From 2309c4ecaf2f0d2949b478dc1be1be1f75fe0c74 Mon Sep 17 00:00:00 2001 From: Santiago Jimenez Giraldo Date: Fri, 12 Jan 2024 12:03:35 +0100 Subject: [PATCH] feat: expose Redpanda's listener in the docker network (#1994) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: implement WithListener for Redpanda module New function WithListener lets you add a new listener to the redpanda container that can be used within any of the docker networks of the container Signed-off-by: Santiago Jimenez Giraldo * test: test listeners for Redpanda Container Add test for new WithListener function, validate connectivity and couple of asserts in the construction of the listener Signed-off-by: Santiago Jimenez Giraldo * docs: add documentation for WithListener option Redpanda Add documentation for the additional listener option (WithListener) for the Redpanda module Signed-off-by: Santiago Jimenez Giraldo * fix: run make lint --------- Signed-off-by: Santiago Jimenez Giraldo Co-authored-by: Manuel de la Peña --- docs/modules/redpanda.md | 20 ++++ modules/redpanda/mounts/redpanda.yaml.tpl | 13 +++ modules/redpanda/options.go | 39 +++++++- modules/redpanda/redpanda.go | 49 ++++++++- modules/redpanda/redpanda_test.go | 117 ++++++++++++++++++++++ 5 files changed, 232 insertions(+), 6 deletions(-) diff --git a/docs/modules/redpanda.md b/docs/modules/redpanda.md index c7d8cd36d9..ad2223ce83 100644 --- a/docs/modules/redpanda.md +++ b/docs/modules/redpanda.md @@ -51,6 +51,26 @@ for Redpanda. E.g. `testcontainers.WithImage("docker.redpanda.com/redpandadata/r If you need to enable TLS use `WithTLS` with a valid PEM encoded certificate and key. +#### Additional Listener + +There are scenarios where additional listeners are needed, for example if you +want to consume/from another container in the same network + +You can use the `WithListener` option to add a listener to the Redpanda container. + +[Register additional listener](../../modules/redpanda/redpanda_test.go) inside_block:withListenerRP + + +Container defined in the same network + +[Start Kcat container](../../modules/redpanda/redpanda_test.go) inside_block:withListenerKcat + + +Produce messages using the new registered listener + +[Produce/consume via registered listener](../../modules/redpanda/redpanda_test.go) inside_block:withListenerExec + + ### Container Methods The Redpanda container exposes the following methods: diff --git a/modules/redpanda/mounts/redpanda.yaml.tpl b/modules/redpanda/mounts/redpanda.yaml.tpl index a19d21ce19..935e1923ef 100644 --- a/modules/redpanda/mounts/redpanda.yaml.tpl +++ b/modules/redpanda/mounts/redpanda.yaml.tpl @@ -19,6 +19,13 @@ redpanda: port: 9093 authentication_method: {{ if .KafkaAPI.EnableAuthorization }}sasl{{ else }}none{{ end }} + {{ range .KafkaAPI.Listeners }} + - address: 0.0.0.0 + name: {{ .Address }} + port: {{ .Port }} + authentication_method: {{ .AuthenticationMethod }} + {{ end }} + advertised_kafka_api: - address: {{ .KafkaAPI.AdvertisedHost }} name: external @@ -26,6 +33,12 @@ redpanda: - address: 127.0.0.1 name: internal port: 9093 + {{ range .KafkaAPI.Listeners }} + - address: {{ .Address }} + name: {{ .Address }} + port: {{ .Port }} + {{ end }} + {{ if .EnableTLS }} admin_api_tls: diff --git a/modules/redpanda/options.go b/modules/redpanda/options.go index 379492b95d..1d4afcf8af 100644 --- a/modules/redpanda/options.go +++ b/modules/redpanda/options.go @@ -1,6 +1,11 @@ package redpanda -import "github.com/testcontainers/testcontainers-go" +import ( + "net" + "strconv" + + "github.com/testcontainers/testcontainers-go" +) type options struct { // Superusers is a list of service account names. @@ -29,7 +34,12 @@ type options struct { // EnableTLS is a flag to enable TLS. EnableTLS bool + cert, key []byte + + // Listeners is a list of custom listeners that can be provided to access the + // containers form within docker networks + Listeners []listener } func defaultOptions() options { @@ -41,6 +51,7 @@ func defaultOptions() options { ServiceAccounts: make(map[string]string, 0), AutoCreateTopics: false, EnableTLS: false, + Listeners: make([]listener, 0), } } @@ -86,6 +97,8 @@ func WithEnableKafkaAuthorization() Option { } } +// WithEnableSchemaRegistryHTTPBasicAuth enables HTTP basic authentication for +// Schema Registry. func WithEnableSchemaRegistryHTTPBasicAuth() Option { return func(o *options) { o.SchemaRegistryAuthenticationMethod = "http_basic" @@ -106,3 +119,27 @@ func WithTLS(cert, key []byte) Option { o.key = key } } + +// WithListener adds a custom listener to the Redpanda containers. Listener +// will be aliases to all networks, so they can be accessed from within docker +// networks. At leas one network must be attached to the container, if not an +// error will be thrown when starting the container. +func WithListener(lis string) Option { + host, port, err := net.SplitHostPort(lis) + if err != nil { + return func(o *options) {} + } + + portInt, err := strconv.Atoi(port) + if err != nil { + return func(o *options) {} + } + + return func(o *options) { + o.Listeners = append(o.Listeners, listener{ + Address: host, + Port: portInt, + AuthenticationMethod: o.KafkaAuthenticationMethod, + }) + } +} diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index fd885fbc07..3a30a24e75 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -5,6 +5,7 @@ import ( "context" _ "embed" "fmt" + "math" "os" "path/filepath" "text/template" @@ -31,6 +32,7 @@ const ( defaultKafkaAPIPort = "9092/tcp" defaultAdminAPIPort = "9644/tcp" defaultSchemaRegistryPort = "8081/tcp" + defaultDockerKafkaApiPort = "29092" redpandaDir = "/etc/redpanda" entrypointFile = "/entrypoint-tc.sh" @@ -98,6 +100,12 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to create entrypoint file: %w", err) } + // 4. Register extra kafka listeners if provided, network aliases will be + // set + if err := registerListeners(ctx, settings, req); err != nil { + return nil, fmt.Errorf("failed to register listeners: %w", err) + } + // Bootstrap config file contains cluster configurations which will only be considered // the very first time you start a cluster. bootstrapConfigPath := filepath.Join(tmpDir, bootstrapConfigFile) @@ -122,7 +130,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize }, ) - // 4. Create certificate and key for TLS connections. + // 5. Create certificate and key for TLS connections. if settings.EnableTLS { certPath := filepath.Join(tmpDir, certFile) if err := os.WriteFile(certPath, settings.cert, 0o600); err != nil { @@ -152,7 +160,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, err } - // 5. Get mapped port for the Kafka API, so that we can render and then mount + // 6. Get mapped port for the Kafka API, so that we can render and then mount // the Redpanda config with the advertised Kafka address. hostIP, err := container.Host(ctx) if err != nil { @@ -164,7 +172,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to get mapped Kafka port: %w", err) } - // 6. Render redpanda.yaml config and mount it. + // 7. Render redpanda.yaml config and mount it. nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int()) if err != nil { return nil, fmt.Errorf("failed to render node config: %w", err) @@ -175,7 +183,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to copy redpanda.yaml into container: %w", err) } - // 6. Wait until Redpanda is ready to serve requests + // 8. Wait until Redpanda is ready to serve requests err = wait.ForAll( wait.ForListeningPort(defaultKafkaAPIPort), wait.ForLog("Successfully started Redpanda!").WithPollInterval(100*time.Millisecond)). @@ -185,7 +193,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to wait for Redpanda readiness: %w", err) } - // 7. Create Redpanda Service Accounts if configured to do so. + // 9. Create Redpanda Service Accounts if configured to do so. if len(settings.ServiceAccounts) > 0 { adminAPIPort, err := container.MappedPort(ctx, nat.Port(defaultAdminAPIPort)) if err != nil { @@ -252,6 +260,29 @@ func renderBootstrapConfig(settings options) ([]byte, error) { return bootstrapConfig.Bytes(), nil } +// registerListeners validates that the provided listeners are valid and set network aliases for the provided addresses. +// The container must be attached to at least one network. +func registerListeners(ctx context.Context, settings options, req testcontainers.GenericContainerRequest) error { + if len(settings.Listeners) == 0 { + return nil + } + + if len(req.Networks) == 0 { + return fmt.Errorf("container must be attached to at least one network") + } + + for _, listener := range settings.Listeners { + if listener.Port < 0 || listener.Port > math.MaxUint16 { + return fmt.Errorf("invalid port on listener %s:%d (must be between 0 and 65535)", listener.Address, listener.Port) + } + + for _, network := range req.Networks { + req.NetworkAliases[network] = append(req.NetworkAliases[network], listener.Address) + } + } + return nil +} + // renderNodeConfig renders the redpanda.yaml node config and returns it as // byte array. func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ([]byte, error) { @@ -262,6 +293,7 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) AdvertisedPort: advertisedKafkaPort, AuthenticationMethod: settings.KafkaAuthenticationMethod, EnableAuthorization: settings.KafkaEnableAuthorization, + Listeners: settings.Listeners, }, SchemaRegistry: redpandaConfigTplParamsSchemaRegistry{ AuthenticationMethod: settings.SchemaRegistryAuthenticationMethod, @@ -300,8 +332,15 @@ type redpandaConfigTplParamsKafkaAPI struct { AdvertisedPort int AuthenticationMethod string EnableAuthorization bool + Listeners []listener } type redpandaConfigTplParamsSchemaRegistry struct { AuthenticationMethod string } + +type listener struct { + Address string + Port int + AuthenticationMethod string +} diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index 7ad424b3bd..ced487cadd 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "io" "net/http" "strings" "testing" @@ -16,6 +17,9 @@ import ( "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sasl/scram" + + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/network" ) func TestRedpanda(t *testing.T) { @@ -278,6 +282,119 @@ func TestRedpandaWithTLS(t *testing.T) { require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition) } +func TestRedpandaListener_Simple(t *testing.T) { + ctx := context.Background() + + // 1. Create network + rpNetwork, err := network.New(ctx, network.WithCheckDuplicate()) + require.NoError(t, err) + + // 2. Start Redpanda container + // withListenerRP { + container, err := RunContainer(ctx, + testcontainers.WithImage("redpandadata/redpanda:v23.2.18"), + network.WithNetwork([]string{"redpanda-host"}, rpNetwork), + WithListener("redpanda:29092"), WithAutoCreateTopics(), + ) + // } + require.NoError(t, err) + + // 3. Start KCat container + // withListenerKcat { + kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kcat:7.4.1", + Networks: []string{ + rpNetwork.Name, + }, + Entrypoint: []string{ + "sh", + }, + Cmd: []string{ + "-c", + "tail -f /dev/null", + }, + }, + Started: true, + }) + // } + + require.NoError(t, err) + + // 4. Copy message to kcat + err = kcat.CopyToContainer(ctx, []byte("Message produced by kcat"), "/tmp/msgs.txt", 700) + require.NoError(t, err) + + // 5. Produce message to Redpanda + // withListenerExec { + _, _, err = kcat.Exec(ctx, []string{"kcat", "-b", "redpanda:29092", "-t", "msgs", "-P", "-l", "/tmp/msgs.txt"}) + // } + + require.NoError(t, err) + + // 6. Consume message from Redpanda + _, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", "redpanda:29092", "-C", "-t", "msgs", "-c", "1"}) + require.NoError(t, err) + + // 7. Read Message from stdout + out, err := io.ReadAll(stdout) + require.NoError(t, err) + + require.Contains(t, string(out), "Message produced by kcat") + + t.Cleanup(func() { + if err := kcat.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate kcat container: %s", err) + } + if err := container.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate redpanda container: %s", err) + } + + if err := rpNetwork.Remove(ctx); err != nil { + t.Fatalf("failed to remove network: %s", err) + } + }) +} + +func TestRedpandaListener_InvalidPort(t *testing.T) { + ctx := context.Background() + + // 1. Create network + RPNetwork, err := network.New(ctx, network.WithCheckDuplicate()) + require.NoError(t, err) + + // 2. Attempt Start Redpanda container + _, err = RunContainer(ctx, + testcontainers.WithImage("redpandadata/redpanda:v23.2.18"), + WithListener("redpanda:99092"), + network.WithNetwork([]string{"redpanda-host"}, RPNetwork), + ) + + require.Error(t, err) + + require.Contains(t, err.Error(), "invalid port on listener redpanda:99092") + + t.Cleanup(func() { + if err := RPNetwork.Remove(ctx); err != nil { + t.Fatalf("failed to remove network: %s", err) + } + }) +} + +func TestRedpandaListener_NoNetwork(t *testing.T) { + ctx := context.Background() + + // 1. Attempt Start Redpanda container + _, err := RunContainer(ctx, + testcontainers.WithImage("redpandadata/redpanda:v23.2.18"), + WithListener("redpanda:99092"), + ) + + require.Error(t, err) + + require.Contains(t, err.Error(), "container must be attached to at least one network") +} + // localhostCert is a PEM-encoded TLS cert with SAN IPs // generated from src/crypto/tls: // go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,localhost --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h