Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(elasticsearch): wait for #2724

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 89 additions & 66 deletions modules/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package elasticsearch

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"os"
Expand All @@ -15,6 +17,7 @@ const (
defaultTCPPort = "9300"
defaultPassword = "changeme"
defaultUsername = "elastic"
defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt"
minimalImageVersion = "7.9.2"
)

Expand All @@ -32,7 +35,7 @@ type ElasticsearchContainer struct {
}

// Deprecated: use Run instead
// RunContainer creates an instance of the Couchbase container type
// RunContainer creates an instance of the Elasticsearch container type
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*ElasticsearchContainer, error) {
return Run(ctx, "docker.elastic.co/elasticsearch/elasticsearch:7.9.2", opts...)
}
Expand All @@ -50,54 +53,41 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
defaultHTTPPort + "/tcp",
defaultTCPPort + "/tcp",
},
// regex that
// matches 8.3 JSON logging with started message and some follow up content within the message field
// matches 8.0 JSON logging with no whitespace between message field and content
// matches 7.x JSON logging with whitespace between message field and content
// matches 6.x text logging with node name in brackets and just a 'started' message till the end of the line
WaitingFor: wait.ForLog(`.*("message":\s?"started(\s|")?.*|]\sstarted\n)`).AsRegexp(),
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{
{
// the container needs a post create hook to set the default JVM options in a file
PostCreates: []testcontainers.ContainerHook{},
PostReadies: []testcontainers.ContainerHook{},
},
},
},
Started: true,
}

// Gather all config options (defaults and then apply provided options)
settings := defaultOptions()
options := defaultOptions()
for _, opt := range opts {
if apply, ok := opt.(Option); ok {
apply(settings)
apply(options)
}
if err := opt.Customize(&req); err != nil {
return nil, err
}
}

// Transfer the certificate settings to the container request
err := configureCertificate(settings, &req)
if err != nil {
return nil, err
}

// Transfer the password settings to the container request
err = configurePassword(settings, &req)
if err != nil {
if err := configurePassword(options, &req); err != nil {
return nil, err
}

if isAtLeastVersion(req.Image, 7) {
req.LifecycleHooks[0].PostCreates = append(req.LifecycleHooks[0].PostCreates, configureJvmOpts)
req.LifecycleHooks = append(req.LifecycleHooks,
testcontainers.ContainerLifecycleHooks{
PostCreates: []testcontainers.ContainerHook{configureJvmOpts},
},
)
}

// Set the default waiting strategy if not already set.
setWaitFor(options, &req.ContainerRequest)

container, err := testcontainers.GenericContainer(ctx, req)
var esContainer *ElasticsearchContainer
if container != nil {
esContainer = &ElasticsearchContainer{Container: container, Settings: *settings}
esContainer = &ElasticsearchContainer{Container: container, Settings: *options}
}
if err != nil {
return esContainer, fmt.Errorf("generic container: %w", err)
Expand All @@ -110,6 +100,61 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
return esContainer, nil
}

// certWriter is a helper that writes the details of a CA cert to options.
type certWriter struct {
options *Options
certPool *x509.CertPool
}

// Read reads the CA cert from the reader and appends it to the options.
func (w *certWriter) Read(r io.Reader) error {
buf, err := io.ReadAll(r)
if err != nil {
return fmt.Errorf("read CA cert: %w", err)
}

w.options.CACert = buf
w.certPool.AppendCertsFromPEM(w.options.CACert)

return nil
}

// setWaitFor sets the req.WaitingFor strategy based on settings.
func setWaitFor(options *Options, req *testcontainers.ContainerRequest) {
var strategies []wait.Strategy
if req.WaitingFor != nil {
// Custom waiting strategy, ensure we honour it.
strategies = append(strategies, req.WaitingFor)
}

waitHTTP := wait.ForHTTP("/").WithPort(defaultHTTPPort)
if sslRequired(req) {
waitHTTP = waitHTTP.WithTLS(true).WithAllowInsecure(true)
cw := &certWriter{
options: options,
certPool: x509.NewCertPool(),
}

waitHTTP = waitHTTP.
WithTLS(true, &tls.Config{RootCAs: cw.certPool})

strategies = append(strategies, wait.ForFile(defaultCaCertPath).WithMatcher(cw.Read))
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

if options.Password != "" || options.Username != "" {
waitHTTP = waitHTTP.WithBasicAuth(options.Username, options.Password)
}

strategies = append(strategies, waitHTTP)

if len(strategies) > 1 {
req.WaitingFor = wait.ForAll(strategies...)
return
}

req.WaitingFor = strategies[0]
}

// configureAddress sets the address of the Elasticsearch container.
// If the certificate is set, it will use https as protocol, otherwise http.
func (c *ElasticsearchContainer) configureAddress(ctx context.Context) error {
Expand All @@ -133,50 +178,28 @@ func (c *ElasticsearchContainer) configureAddress(ctx context.Context) error {
return nil
}

// configureCertificate transfers the certificate settings to the container request.
// For that, it defines a post start hook that copies the certificate from the container to the host.
// The certificate is only available since version 8, and will be located in a well-known location.
func configureCertificate(settings *Options, req *testcontainers.GenericContainerRequest) error {
if isAtLeastVersion(req.Image, 8) {
// These configuration keys explicitly disable CA generation.
// If any are set we skip the file retrieval.
configKeys := []string{
"xpack.security.enabled",
"xpack.security.http.ssl.enabled",
"xpack.security.transport.ssl.enabled",
}
for _, configKey := range configKeys {
if value, ok := req.Env[configKey]; ok {
if value == "false" {
return nil
}
// sslRequired returns true if the SSL is required, otherwise false.
func sslRequired(req *testcontainers.ContainerRequest) bool {
if !isAtLeastVersion(req.Image, 8) {
return false
}

// These configuration keys explicitly disable CA generation.
// If any are set we skip the file retrieval.
configKeys := []string{
"xpack.security.enabled",
"xpack.security.http.ssl.enabled",
"xpack.security.transport.ssl.enabled",
}
for _, configKey := range configKeys {
if value, ok := req.Env[configKey]; ok {
if value == "false" {
return false
}
}

// The container needs a post ready hook to copy the certificate from the container to the host.
// This certificate is only available since version 8
req.LifecycleHooks[0].PostReadies = append(req.LifecycleHooks[0].PostReadies,
func(ctx context.Context, container testcontainers.Container) error {
const defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt"

readCloser, err := container.CopyFileFromContainer(ctx, defaultCaCertPath)
if err != nil {
return err
}

// receive the bytes from the default location
certBytes, err := io.ReadAll(readCloser)
if err != nil {
return err
}

settings.CACert = certBytes

return nil
})
}

return nil
return true
}

// configurePassword transfers the password settings to the container request.
Expand Down
1 change: 0 additions & 1 deletion modules/elasticsearch/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type Options struct {

func defaultOptions() *Options {
return &Options{
CACert: nil,
Username: defaultUsername,
}
}
Expand Down