Feature/producer interceptor #144

Closed
wants to merge 5 commits into from
14 changes: 14 additions & 0 deletions batch_producer_interceptor.go
@@ -0,0 +1,14 @@
package kafka

import (
    "context"
)

type BatchProducerInterceptorContext struct {
    Context context.Context
    Message *Message
}

type BatchProducerInterceptor interface {
    OnProduce(ctx BatchProducerInterceptorContext)
}
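
For orientation: a minimal sketch of how this batch contract could be implemented, mirroring the context-struct pattern of the non-batch interceptor below. The type and header names are illustrative and not part of this PR; the only assumption is the interface and context struct defined above.

    package main

    import "github.com/Trendyol/kafka-konsumer/v2"

    // headerStampingBatchInterceptor is a hypothetical implementation used
    // purely to illustrate the BatchProducerInterceptor contract.
    type headerStampingBatchInterceptor struct{}

    func (h *headerStampingBatchInterceptor) OnProduce(ctx kafka.BatchProducerInterceptorContext) {
        // The context carries a *Message, so the interceptor can mutate
        // headers in place before the batch is written.
        ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
            Key:   "x-batch-intercepted", // illustrative header name
            Value: []byte("true"),
        })
    }

Note that nothing in this PR's producer.go consumes BatchProducerInterceptor yet; ProduceBatch dispatches through ProducerInterceptor instead.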
2 changes: 1 addition & 1 deletion examples/with-deadletter/main.go
@@ -21,7 +21,7 @@ func main() {
        Writer: kafka.WriterConfig{
            Brokers: []string{"localhost:29092"},
        },
-   })
+   }, nil)

    _ = producer.Produce(context.Background(), kafka.Message{
        Topic: topicName,
2 changes: 1 addition & 1 deletion examples/with-kafka-producer/main.go
@@ -10,7 +10,7 @@ func main() {
        Writer: kafka.WriterConfig{
            Brokers: []string{"localhost:29092"},
        },
-   })
+   }, nil)

    const topicName = "standart-topic"

2 changes: 1 addition & 1 deletion examples/with-kafka-transactional-retry-disabled/main.go
@@ -13,7 +13,7 @@ import (
func main() {
    producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
        Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"},
-   })
+   }, nil)

    producer.ProduceBatch(context.Background(), []kafka.Message{
        {Key: []byte("key1"), Value: []byte("message1")},
28 changes: 19 additions & 9 deletions producer.go
@@ -18,10 +18,11 @@ type Writer interface {
}

type producer struct {
-   w Writer
+   w           Writer
+   interceptor *ProducerInterceptor
}

-func NewProducer(cfg *ProducerConfig) (Producer, error) {
+func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Producer, error) {
    kafkaWriter := &kafka.Writer{
        Addr:  kafka.TCP(cfg.Writer.Brokers...),
        Topic: cfg.Writer.Topic,
@@ -51,7 +52,7 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
        kafkaWriter.Transport = transport
    }

-   p := &producer{w: kafkaWriter}
+   p := &producer{w: kafkaWriter, interceptor: interceptor}

    if cfg.DistributedTracingEnabled {
        otelWriter, err := NewOtelProducer(cfg, kafkaWriter)
@@ -64,18 +65,27 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
    return p, nil
}

-func (c *producer) Produce(ctx context.Context, message Message) error {
-   return c.w.WriteMessages(ctx, message.toKafkaMessage())
+func (p *producer) Produce(ctx context.Context, message Message) error {
+   if p.interceptor != nil {
+       (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &message})
+   }
+
+   return p.w.WriteMessages(ctx, message.toKafkaMessage())
}

-func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error {
+func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error {
    kafkaMessages := make([]kafka.Message, 0, len(messages))
    for i := range messages {
+       if p.interceptor != nil {
+           (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &messages[i]})
+       }
+
        kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage())
    }
-   return c.w.WriteMessages(ctx, kafkaMessages...)
+
+   return p.w.WriteMessages(ctx, kafkaMessages...)
}

-func (c *producer) Close() error {
-   return c.w.Close()
+func (p *producer) Close() error {
+   return p.w.Close()
}
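
Because NewProducer now takes a *ProducerInterceptor (a pointer to an interface value), callers opt out by passing an explicit nil, as the updated examples above do, and opt in by passing the address of an interface variable; that pointer is also why the methods dereference with (*p.interceptor).OnProduce. A minimal wiring sketch, reusing the mock interceptor from this PR's test stub (the broker address and topic are illustrative):

    package main

    import (
        "context"

        "github.com/Trendyol/kafka-konsumer/v2"
        stub "github.com/Trendyol/kafka-konsumer/v2/test/stub-data"
    )

    func main() {
        // NewMockProducerInterceptor returns a kafka.ProducerInterceptor;
        // NewProducer wants its address, so pass a pointer to the variable.
        interceptor := stub.NewMockProducerInterceptor()

        producer, err := kafka.NewProducer(&kafka.ProducerConfig{
            Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"},
        }, &interceptor) // or nil to keep the old, interceptor-free behavior
        if err != nil {
            panic(err)
        }
        defer producer.Close()

        // OnProduce runs before the write, so the stub's x-source-app header
        // is already attached when the message reaches the broker.
        _ = producer.Produce(context.Background(), kafka.Message{Value: []byte("hello")})
    }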
14 changes: 14 additions & 0 deletions producer_interceptor.go
@@ -0,0 +1,14 @@
package kafka

import (
    "context"
)

type ProducerInterceptorContext struct {
    Context context.Context
    Message *Message
}

type ProducerInterceptor interface {
    OnProduce(ctx ProducerInterceptorContext)
}
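
The interface takes a single ProducerInterceptorContext rather than separate arguments, so implementations receive the caller's context.Context and a mutable *Message together. A hypothetical observer-style implementation (the type name and log format are illustrative; the PR's own example implementation is the test stub at the end of this diff):

    package main

    import (
        "log"

        "github.com/Trendyol/kafka-konsumer/v2"
    )

    type loggingInterceptor struct{}

    // Compile-time check that loggingInterceptor satisfies the interface.
    var _ kafka.ProducerInterceptor = (*loggingInterceptor)(nil)

    func (l *loggingInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
        // Observe without mutating: log the key and payload size of every
        // message passing through Produce or ProduceBatch.
        log.Printf("producing key=%q size=%d", ctx.Message.Key, len(ctx.Message.Value))
    }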
39 changes: 38 additions & 1 deletion producer_test.go
@@ -2,6 +2,8 @@ package kafka

import (
    "context"
+   stubData "github.com/Trendyol/kafka-konsumer/v2/test/stub-data"
+   "github.com/gofiber/fiber/v2/utils"
    "testing"

    "github.com/segmentio/kafka-go"
@@ -20,6 +22,27 @@ func Test_producer_Produce_Successfully(t *testing.T) {
    }
}

+func Test_producer_Produce_interceptor_Successfully(t *testing.T) {
+   // Given
+   mw := &mockWriter{}
+   msg := Message{Headers: make([]Header, 0)}
+   msg.Headers = append(msg.Headers, kafka.Header{
+       Key:   "x-correlation-id",
+       Value: []byte(utils.UUIDv4()),
+   })
+   interceptor := stubData.NewMockProducerInterceptor()
+
+   p := producer{w: mw, interceptor: &interceptor}
+
+   // When
+   err := p.Produce(context.Background(), msg)
+
+   // Then
+   if err != nil {
+       t.Fatalf("Producing err %s", err.Error())
+   }
+}
+
func Test_producer_ProduceBatch_Successfully(t *testing.T) {
    // Given
    mw := &mockWriter{}
@@ -33,6 +56,20 @@ func Test_producer_ProduceBatch_Successfully(t *testing.T) {
    }
}

+func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) {
+   // Given
+   mw := &mockWriter{}
+   interceptor := stubData.NewMockProducerInterceptor()
+   p := producer{w: mw, interceptor: &interceptor}
+
+   // When
+   err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}})
+   // Then
+   if err != nil {
+       t.Fatalf("Batch Producing err %s", err.Error())
+   }
+}
+
func Test_producer_Close_Successfully(t *testing.T) {
    // Given
    mw := &mockWriter{}
@@ -48,7 +85,7 @@ func Test_producer_Close_Successfully(t *testing.T) {

type mockWriter struct{}

-func (m *mockWriter) WriteMessages(_ context.Context, _ ...kafka.Message) error {
+func (m *mockWriter) WriteMessages(_ context.Context, msg ...kafka.Message) error {
    return nil
}

134 changes: 107 additions & 27 deletions test/integration/integration_test.go
@@ -6,6 +6,7 @@ import (
    "errors"
    "fmt"
    "github.com/Trendyol/kafka-konsumer/v2"
+   stub_data "github.com/Trendyol/kafka-konsumer/v2/test/stub-data"
    segmentio "github.com/segmentio/kafka-go"
    "testing"
    "time"
@@ -14,40 +15,99 @@ import (
func Test_Should_Produce_Successfully(t *testing.T) {
    // Given
    t.Parallel()
-   topic := "produce-topic"
    brokerAddress := "localhost:9092"

-   producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
-       Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}},
-       Transport: &kafka.TransportConfig{
-           MetadataTopics: []string{
-               topic,
-           },
-       },
-   })
+   t.Run("without interceptor", func(t *testing.T) {
+       // Given
+       topic := "produce-topic"
+       producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
+           Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}},
+           Transport: &kafka.TransportConfig{
+               MetadataTopics: []string{
+                   topic,
+               },
+           },
+       }, nil)

-   // When
-   err := producer.Produce(context.Background(), kafka.Message{
-       Key:   []byte("1"),
-       Value: []byte(`foo`),
-   })
+       // When
+       err := producer.Produce(context.Background(), kafka.Message{
+           Key:   []byte("1"),
+           Value: []byte(`foo`),
+       })

-   // Then
-   if err != nil {
-       t.Fatalf("Error while producing err %s", err.Error())
-   }
+       // Then
+       if err != nil {
+           t.Fatalf("Error while producing err %s", err.Error())
+       }
+   })
+
+   t.Run("with interceptor", func(t *testing.T) {
+       // Given
+       topic := "produce-interceptor-topic"
+       consumerGroup := "produce-topic-cg"
+       interceptor := stub_data.NewMockProducerInterceptor()
+
+       producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
+           Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}},
+           Transport: &kafka.TransportConfig{
+               MetadataTopics: []string{
+                   topic,
+               },
+           },
+       }, &interceptor)
+
+       // When
+       err := producer.Produce(context.Background(), kafka.Message{
+           Key:   []byte("1"),
+           Value: []byte(`foo`),
+       })
+
+       messageCh := make(chan *kafka.Message)
+
+       consumerCfg := &kafka.ConsumerConfig{
+           Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup},
+           ConsumeFn: func(message *kafka.Message) error {
+               messageCh <- message
+               return nil
+           },
+       }
+
+       consumer, _ := kafka.NewConsumer(consumerCfg)
+       defer consumer.Stop()
+
+       consumer.Consume()
+
+       // Then
+       if err != nil {
+           t.Fatalf("Error while producing err %s", err.Error())
+       }
+
+       actual := <-messageCh
+       if string(actual.Value) != "foo" {
+           t.Fatalf("Value does not equal %s", actual.Value)
+       }
+       if string(actual.Key) != "1" {
+           t.Fatalf("Key does not equal %s", actual.Key)
+       }
+       if len(actual.Headers) != 1 {
+           t.Fatalf("Header size does not equal %d", len(actual.Headers))
+       }
+       if string(actual.Headers[0].Key) != stub_data.XSourceAppKey {
+           t.Fatalf("Header key does not equal %s", actual.Headers[0].Key)
+       }
+       if string(actual.Headers[0].Value) != stub_data.XSourceAppValue {
+           t.Fatalf("Header value does not equal %s", actual.Headers[0].Value)
+       }
+   })
}

func Test_Should_Batch_Produce_Successfully(t *testing.T) {
    // Given
    t.Parallel()
    topic := "batch-produce-topic"
    brokerAddress := "localhost:9092"

-   producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
-       Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}})

-   // When
    msgs := []kafka.Message{
        {
            Key:   []byte("1"),
@@ -59,13 +119,33 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) {
        },
    }

-   // When
-   err := producer.ProduceBatch(context.Background(), msgs)
+   t.Run("without interceptor", func(t *testing.T) {
+       producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
+           Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, nil)

-   // Then
-   if err != nil {
-       t.Fatalf("Error while producing err %s", err.Error())
-   }
+       // When
+       err := producer.ProduceBatch(context.Background(), msgs)
+
+       // Then
+       if err != nil {
+           t.Fatalf("Error while producing err %s", err.Error())
+       }
+   })
+
+   t.Run("with interceptor", func(t *testing.T) {
+       interceptor := stub_data.NewMockProducerInterceptor()
+
+       producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
+           Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, &interceptor)
+
+       // When
+       err := producer.ProduceBatch(context.Background(), msgs)
+
+       // Then
+       if err != nil {
+           t.Fatalf("Error while producing err %s", err.Error())
+       }
+   })
}

func Test_Should_Consume_Message_Successfully(t *testing.T) {
21 changes: 21 additions & 0 deletions test/stub-data/producer_interceptor_stub.go
@@ -0,0 +1,21 @@
package test

import "github.com/Trendyol/kafka-konsumer/v2"

type MockProducerInterceptor struct{}

const (
    XSourceAppKey   = "x-source-app"
    XSourceAppValue = "kafka-konsumer"
)

func (i *MockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
    ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
        Key:   XSourceAppKey,
        Value: []byte(XSourceAppValue),
    })
}

func NewMockProducerInterceptor() kafka.ProducerInterceptor {
    return &MockProducerInterceptor{}
}