From 6580c7ad51921b769da187b88780bac4eb564bd2 Mon Sep 17 00:00:00 2001 From: Emre Odabas Date: Thu, 1 Feb 2024 00:46:49 +0300 Subject: [PATCH 01/12] feat: adding message filter ability via configured message header key --- internal/cronsumer.go | 7 ++++ internal/message_header.go | 9 ++++ pkg/kafka/config.go | 20 +++++---- test/integration/integration_test.go | 63 ++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 7 deletions(-) diff --git a/internal/cronsumer.go b/internal/cronsumer.go index c48db8c..f2cf609 100644 --- a/internal/cronsumer.go +++ b/internal/cronsumer.go @@ -60,6 +60,13 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel return } + if k.cfg.HeaderFilter != nil { + if FilterMessage(m.Headers, *k.cfg.HeaderFilter) { + k.cfg.Logger.Warnf("msg is skipped cause of message filter. Headers: %v", m.Headers) + return + } + } + msg := NewMessageWrapper(*m, strategyName) if msg.ProduceTime >= startTimeUnixNano { diff --git a/internal/message_header.go b/internal/message_header.go index f871a66..c51032a 100644 --- a/internal/message_header.go +++ b/internal/message_header.go @@ -93,3 +93,12 @@ func getMessageProduceTime(message *segmentio.Message) int64 { return 0 } + +func FilterMessage(headers []segmentio.Header, filter kafka.HeaderFilter) bool { + for i := range headers { + if headers[i].Key == filter.Key && string(headers[i].Value) == filter.Value { + return false + } + } + return true +} diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 174e27d..0da6062 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -20,13 +20,14 @@ const ( ) type Config struct { - Brokers []string `yaml:"brokers"` - Consumer ConsumerConfig `yaml:"consumer"` - Producer ProducerConfig `yaml:"producer"` - SASL SASLConfig `yaml:"sasl"` - LogLevel logger.Level `yaml:"logLevel"` - Logger logger.Interface `yaml:"-"` - ClientID string `yaml:"clientId"` + Brokers []string `yaml:"brokers"` + Consumer ConsumerConfig `yaml:"consumer"` + Producer ProducerConfig `yaml:"producer"` + SASL SASLConfig `yaml:"sasl"` + LogLevel logger.Level `yaml:"logLevel"` + Logger logger.Interface `yaml:"-"` + ClientID string `yaml:"clientId"` + HeaderFilter *HeaderFilter `yaml:"headerFilter"` } type SASLConfig struct { @@ -65,6 +66,11 @@ type ProducerConfig struct { BatchTimeout time.Duration `yaml:"batchTimeout"` } +type HeaderFilter struct { + Key string `yaml:"key"` + Value string `yaml:"value"` +} + func (c *Config) SetDefaults() { if c.Consumer.MaxRetry == 0 { c.Consumer.MaxRetry = 3 diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 0771608..247ee24 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -386,6 +386,69 @@ func Test_Should_Discard_Message_When_Retry_Count_Is_Equal_To_MaxRetrys_Value_Wi assertEventually(t, conditionFunc, 30*time.Second, time.Second) } +func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) { + // Given + topic := "exception-header-filter" + key, value := "filter_key", "filter_value" + conn, cleanUp := createTopic(t, topic) + defer cleanUp() + + maxRetry := 1 + config := &kafka.Config{ + Brokers: []string{"localhost:9092"}, + Consumer: kafka.ConsumerConfig{ + GroupID: "sample-consumer", + Topic: topic, + Cron: "*/1 * * * *", + Duration: 20 * time.Second, + MaxRetry: maxRetry, + }, + LogLevel: "info", + HeaderFilter: &kafka.HeaderFilter{ + Key: key, + Value: value, + }, + } + + respCh := make(chan kafka.Message) + var consumeFn 
kafka.ConsumeFn = func(message kafka.Message) error { + fmt.Printf("consumer > Message received. Headers: %v\n", message.Headers) + respCh <- message + return nil + } + + c := cronsumer.New(config, consumeFn) + c.Start() + + // When + producedMessages := []kafka.Message{ + {Topic: topic, Value: []byte("some message"), Key: []byte("some key")}, + {Topic: topic, Value: []byte("real message"), Key: []byte("some key"), Headers: []kafka.Header{{ + Key: key, + Value: []byte(value), + }, + }}, + } + if err := c.ProduceBatch(producedMessages); err != nil { + fmt.Println("Produce err", err.Error()) + } + + // Then + actualMessage := <-respCh + if string(actualMessage.Value) != "real message" { + t.Errorf("Expected: %s, Actual: %s", value, actualMessage.Value) + } + + var expectedOffset int64 = 2 + conditionFunc := func() bool { + lastOffset, _ := conn.ReadLastOffset() + fmt.Println("lastOffset", lastOffset) + return lastOffset == expectedOffset + } + + assertEventually(t, conditionFunc, 30*time.Second, time.Second) +} + func getRetryCount(message kafka.Message) int { for _, header := range message.Headers { if header.Key == "x-retry-count" { From 8162b77a17448a388cd57f4e6342e4c911b34912 Mon Sep 17 00:00:00 2001 From: emreodabas Date: Thu, 1 Feb 2024 16:56:49 +0300 Subject: [PATCH 02/12] feat: Users could use header filter function for filtering messages --- internal/cronsumer.go | 12 +++++------- internal/message_header.go | 9 --------- pkg/kafka/config.go | 21 +++++++++------------ test/integration/integration_test.go | 12 ++++++++---- 4 files changed, 22 insertions(+), 32 deletions(-) diff --git a/internal/cronsumer.go b/internal/cronsumer.go index f2cf609..2933ae0 100644 --- a/internal/cronsumer.go +++ b/internal/cronsumer.go @@ -60,15 +60,13 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel return } - if k.cfg.HeaderFilter != nil { - if FilterMessage(m.Headers, *k.cfg.HeaderFilter) { - k.cfg.Logger.Warnf("msg is skipped cause of message filter. Headers: %v", m.Headers) - return - } - } - msg := NewMessageWrapper(*m, strategyName) + if k.cfg.Consumer.HeaderFilterFn != nil && k.cfg.Consumer.HeaderFilterFn(msg.Headers) { + k.cfg.Logger.Warnf("Message is not processed. Header filter applied. 
Headers: %v", msg.Headers) + return + } + if msg.ProduceTime >= startTimeUnixNano { (*cancelFuncWrapper)() diff --git a/internal/message_header.go b/internal/message_header.go index c51032a..f871a66 100644 --- a/internal/message_header.go +++ b/internal/message_header.go @@ -93,12 +93,3 @@ func getMessageProduceTime(message *segmentio.Message) int64 { return 0 } - -func FilterMessage(headers []segmentio.Header, filter kafka.HeaderFilter) bool { - for i := range headers { - if headers[i].Key == filter.Key && string(headers[i].Value) == filter.Value { - return false - } - } - return true -} diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 0da6062..5a3c6ab 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -20,14 +20,13 @@ const ( ) type Config struct { - Brokers []string `yaml:"brokers"` - Consumer ConsumerConfig `yaml:"consumer"` - Producer ProducerConfig `yaml:"producer"` - SASL SASLConfig `yaml:"sasl"` - LogLevel logger.Level `yaml:"logLevel"` - Logger logger.Interface `yaml:"-"` - ClientID string `yaml:"clientId"` - HeaderFilter *HeaderFilter `yaml:"headerFilter"` + Brokers []string `yaml:"brokers"` + Consumer ConsumerConfig `yaml:"consumer"` + Producer ProducerConfig `yaml:"producer"` + SASL SASLConfig `yaml:"sasl"` + LogLevel logger.Level `yaml:"logLevel"` + Logger logger.Interface `yaml:"-"` + ClientID string `yaml:"clientId"` } type SASLConfig struct { @@ -59,6 +58,7 @@ type ConsumerConfig struct { Duration time.Duration `yaml:"duration"` Cron string `yaml:"cron"` BackOffStrategy BackoffStrategyInterface `yaml:"backOffStrategy"` + HeaderFilterFn HeaderFilterFn `yaml:"headerFilterFn"` } type ProducerConfig struct { @@ -66,10 +66,7 @@ type ProducerConfig struct { BatchTimeout time.Duration `yaml:"batchTimeout"` } -type HeaderFilter struct { - Key string `yaml:"key"` - Value string `yaml:"value"` -} +type HeaderFilterFn func(headers []Header) bool func (c *Config) SetDefaults() { if c.Consumer.MaxRetry == 0 { diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 247ee24..da5a54c 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -402,12 +402,16 @@ func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) { Cron: "*/1 * * * *", Duration: 20 * time.Second, MaxRetry: maxRetry, + HeaderFilterFn: func(headers []kafka.Header) bool { + for i := range headers { + if headers[i].Key == key && string(headers[i].Value) == value { + return false + } + } + return true + }, }, LogLevel: "info", - HeaderFilter: &kafka.HeaderFilter{ - Key: key, - Value: value, - }, } respCh := make(chan kafka.Message) From 14a91c3f3c9466b45632d431fc7bde05c1c98e22 Mon Sep 17 00:00:00 2001 From: Emre Kosen Date: Sun, 4 Feb 2024 23:24:17 +0300 Subject: [PATCH 03/12] feat: add example for consumer with header filter function --- README.md | 1 + .../go.mod | 31 +++++++++ .../go.sum | 69 +++++++++++++++++++ .../main.go | 29 ++++++++ examples/single-consumer/main.go | 19 +++-- 5 files changed, 145 insertions(+), 4 deletions(-) create mode 100644 examples/single-consumer-with-header-filter-function/go.mod create mode 100644 examples/single-consumer-with-header-filter-function/go.sum create mode 100644 examples/single-consumer-with-header-filter-function/main.go diff --git a/README.md b/README.md index b80b86a..49aa173 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,7 @@ func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) { | `consumer.rebalanceTimeout` | [see 
doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RebalanceTimeout) | 30s | | | `consumer.startOffset` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.StartOffset) | earliest | | | `consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RetentionTime) | 24h | | +| `consumer.HeaderFilterFn` | | nil | | | `producer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Transport) | | | | `producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchSize) | 100 | | | `producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchTimeout) | 1s | | diff --git a/examples/single-consumer-with-header-filter-function/go.mod b/examples/single-consumer-with-header-filter-function/go.mod new file mode 100644 index 0000000..4c31f1e --- /dev/null +++ b/examples/single-consumer-with-header-filter-function/go.mod @@ -0,0 +1,31 @@ +module single-consumer + +go 1.19 + +replace github.com/Trendyol/kafka-cronsumer => ../.. + +require github.com/Trendyol/kafka-cronsumer v0.0.0-00010101000000-000000000000 + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/klauspost/compress v1.16.4 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/prometheus/client_golang v1.16.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/procfs v0.10.1 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/segmentio/kafka-go v0.4.42 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.24.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.7.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect +) diff --git a/examples/single-consumer-with-header-filter-function/go.sum b/examples/single-consumer-with-header-filter-function/go.sum new file mode 100644 index 0000000..e08be6f --- /dev/null +++ b/examples/single-consumer-with-header-filter-function/go.sum @@ -0,0 +1,69 @@ +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= 
+github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys 
v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/single-consumer-with-header-filter-function/main.go b/examples/single-consumer-with-header-filter-function/main.go new file mode 100644 index 0000000..7ba89c8 --- /dev/null +++ b/examples/single-consumer-with-header-filter-function/main.go @@ -0,0 +1,29 @@ +package main + +import ( + "fmt" + cronsumer "github.com/Trendyol/kafka-cronsumer" + "github.com/Trendyol/kafka-cronsumer/pkg/kafka" + "time" +) + +func main() { + config := &kafka.Config{ + Brokers: []string{"localhost:29092"}, + Consumer: kafka.ConsumerConfig{ + GroupID: "sample-consumer", + Topic: "exception", + Cron: "*/1 * * * *", + Duration: 20 * time.Second, + }, + LogLevel: "info", + } + + var consumeFn kafka.ConsumeFn = func(message kafka.Message) error { + fmt.Printf("consumer > Message received: %s\n", string(message.Value)) + return nil + } + + c := cronsumer.New(config, consumeFn) + c.Run() +} diff --git a/examples/single-consumer/main.go b/examples/single-consumer/main.go index 7ba89c8..546035e 100644 --- a/examples/single-consumer/main.go +++ b/examples/single-consumer/main.go @@ -7,14 +7,25 @@ import ( "time" ) +func SampleHeaderFilterFn(headers []kafka.Header) bool { + for i, header := range headers { + if header.Key == "key" && string(headers[i].Value) == "value" { + // Will consume message if the required condition is met + return false + } + } + return true +} + func main() { config := &kafka.Config{ Brokers: []string{"localhost:29092"}, Consumer: kafka.ConsumerConfig{ - GroupID: 
"sample-consumer", - Topic: "exception", - Cron: "*/1 * * * *", - Duration: 20 * time.Second, + GroupID: "sample-consumer", + Topic: "exception", + Cron: "*/1 * * * *", + Duration: 20 * time.Second, + HeaderFilterFn: SampleHeaderFilterFn, }, LogLevel: "info", } From 71171a8fcc82896f88eec45215aefe1e9e1fc3ab Mon Sep 17 00:00:00 2001 From: Emre Kosen Date: Sun, 4 Feb 2024 23:28:07 +0300 Subject: [PATCH 04/12] feat: update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 49aa173..f4ddc7b 100644 --- a/README.md +++ b/README.md @@ -148,7 +148,7 @@ func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) { | `consumer.rebalanceTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RebalanceTimeout) | 30s | | | `consumer.startOffset` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.StartOffset) | earliest | | | `consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RetentionTime) | 24h | | -| `consumer.HeaderFilterFn` | | nil | | +| `consumer.HeaderFilterFn` | Function to filter messages based on headers | nil | | | `producer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Transport) | | | | `producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchSize) | 100 | | | `producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchTimeout) | 1s | | From 5ba8c618b045bb922eb2f6838fc331722afa5273 Mon Sep 17 00:00:00 2001 From: Emre Kosen Date: Sun, 4 Feb 2024 23:35:49 +0300 Subject: [PATCH 05/12] feat: fix wrong example file --- .../main.go | 19 +++++++++++++++---- examples/single-consumer/main.go | 19 ++++--------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/examples/single-consumer-with-header-filter-function/main.go b/examples/single-consumer-with-header-filter-function/main.go index 7ba89c8..546035e 100644 --- a/examples/single-consumer-with-header-filter-function/main.go +++ b/examples/single-consumer-with-header-filter-function/main.go @@ -7,14 +7,25 @@ import ( "time" ) +func SampleHeaderFilterFn(headers []kafka.Header) bool { + for i, header := range headers { + if header.Key == "key" && string(headers[i].Value) == "value" { + // Will consume message if the required condition is met + return false + } + } + return true +} + func main() { config := &kafka.Config{ Brokers: []string{"localhost:29092"}, Consumer: kafka.ConsumerConfig{ - GroupID: "sample-consumer", - Topic: "exception", - Cron: "*/1 * * * *", - Duration: 20 * time.Second, + GroupID: "sample-consumer", + Topic: "exception", + Cron: "*/1 * * * *", + Duration: 20 * time.Second, + HeaderFilterFn: SampleHeaderFilterFn, }, LogLevel: "info", } diff --git a/examples/single-consumer/main.go b/examples/single-consumer/main.go index 546035e..7ba89c8 100644 --- a/examples/single-consumer/main.go +++ b/examples/single-consumer/main.go @@ -7,25 +7,14 @@ import ( "time" ) -func SampleHeaderFilterFn(headers []kafka.Header) bool { - for i, header := range headers { - if header.Key == "key" && string(headers[i].Value) == "value" { - // Will consume message if the required condition is met - return false - } - } - return true -} - func main() { config := &kafka.Config{ Brokers: []string{"localhost:29092"}, Consumer: kafka.ConsumerConfig{ - GroupID: "sample-consumer", - Topic: "exception", - Cron: "*/1 * * * *", 
- Duration: 20 * time.Second, - HeaderFilterFn: SampleHeaderFilterFn, + GroupID: "sample-consumer", + Topic: "exception", + Cron: "*/1 * * * *", + Duration: 20 * time.Second, }, LogLevel: "info", } From c2af82283e75fb83fca69b6bf32f1040f3204428 Mon Sep 17 00:00:00 2001 From: Emre Kosen Date: Mon, 5 Feb 2024 00:59:22 +0300 Subject: [PATCH 06/12] feat: change log level and add condition to integration test --- internal/cronsumer.go | 2 +- test/integration/integration_test.go | 20 +++++++++----------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/internal/cronsumer.go b/internal/cronsumer.go index 2933ae0..f384259 100644 --- a/internal/cronsumer.go +++ b/internal/cronsumer.go @@ -63,7 +63,7 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel msg := NewMessageWrapper(*m, strategyName) if k.cfg.Consumer.HeaderFilterFn != nil && k.cfg.Consumer.HeaderFilterFn(msg.Headers) { - k.cfg.Logger.Warnf("Message is not processed. Header filter applied. Headers: %v", msg.Headers) + k.cfg.Logger.Infof("Message is not processed. Header filter applied. Headers: %v", msg.Headers) return } diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index da5a54c..51e3f3d 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -390,7 +390,7 @@ func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) { // Given topic := "exception-header-filter" key, value := "filter_key", "filter_value" - conn, cleanUp := createTopic(t, topic) + _, cleanUp := createTopic(t, topic) defer cleanUp() maxRetry := 1 @@ -414,7 +414,8 @@ func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) { LogLevel: "info", } - respCh := make(chan kafka.Message) + respCh := make(chan kafka.Message, 2) + defer close(respCh) var consumeFn kafka.ConsumeFn = func(message kafka.Message) error { fmt.Printf("consumer > Message received. 
Headers: %v\n", message.Headers) respCh <- message @@ -438,19 +439,16 @@ func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) { } // Then + conditionFunc := func() bool { + messageCount := len(respCh) + return messageCount == 1 + } + assertEventually(t, conditionFunc, 30*time.Second, time.Second) + actualMessage := <-respCh if string(actualMessage.Value) != "real message" { t.Errorf("Expected: %s, Actual: %s", value, actualMessage.Value) } - - var expectedOffset int64 = 2 - conditionFunc := func() bool { - lastOffset, _ := conn.ReadLastOffset() - fmt.Println("lastOffset", lastOffset) - return lastOffset == expectedOffset - } - - assertEventually(t, conditionFunc, 30*time.Second, time.Second) } func getRetryCount(message kafka.Message) int { From f30425b3277a5bdff00c36077fd7418a8fb34348 Mon Sep 17 00:00:00 2001 From: Emre Kosen Date: Mon, 5 Feb 2024 13:09:50 +0300 Subject: [PATCH 07/12] feat: move header filter function to cronsumer struct --- internal/cronsumer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/cronsumer.go b/internal/cronsumer.go index f384259..9f809a4 100644 --- a/internal/cronsumer.go +++ b/internal/cronsumer.go @@ -18,6 +18,7 @@ type kafkaCronsumer struct { metric *CronsumerMetric maxRetry int deadLetterTopic string + headerFilterFn kafka.HeaderFilterFn cfg *kafka.Config } @@ -32,6 +33,7 @@ func newKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) * kafkaConsumer: newConsumer(cfg), kafkaProducer: newProducer(cfg), consumeFn: c, + headerFilterFn: cfg.Consumer.HeaderFilterFn, metric: &CronsumerMetric{}, maxRetry: cfg.Consumer.MaxRetry, deadLetterTopic: cfg.Consumer.DeadLetterTopic, @@ -62,7 +64,7 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel msg := NewMessageWrapper(*m, strategyName) - if k.cfg.Consumer.HeaderFilterFn != nil && k.cfg.Consumer.HeaderFilterFn(msg.Headers) { + if k.headerFilterFn != nil && k.headerFilterFn(msg.Headers) { k.cfg.Logger.Infof("Message is not processed. Header filter applied. 
Headers: %v", msg.Headers) return } From a5c9e976d3c3591ecf165873a739d4b8db713645 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Sun, 11 Feb 2024 12:45:27 +0300 Subject: [PATCH 08/12] feat: change from headerFilterFn to skipMessageByHeaderFn for clarification --- .../main.go | 30 +++++++------- internal/cronsumer.go | 28 ++++++------- pkg/kafka/config.go | 40 +++++++++---------- test/integration/integration_test.go | 38 ++++++------------ 4 files changed, 62 insertions(+), 74 deletions(-) diff --git a/examples/single-consumer-with-header-filter-function/main.go b/examples/single-consumer-with-header-filter-function/main.go index 546035e..dac87b1 100644 --- a/examples/single-consumer-with-header-filter-function/main.go +++ b/examples/single-consumer-with-header-filter-function/main.go @@ -7,25 +7,15 @@ import ( "time" ) -func SampleHeaderFilterFn(headers []kafka.Header) bool { - for i, header := range headers { - if header.Key == "key" && string(headers[i].Value) == "value" { - // Will consume message if the required condition is met - return false - } - } - return true -} - func main() { config := &kafka.Config{ Brokers: []string{"localhost:29092"}, Consumer: kafka.ConsumerConfig{ - GroupID: "sample-consumer", - Topic: "exception", - Cron: "*/1 * * * *", - Duration: 20 * time.Second, - HeaderFilterFn: SampleHeaderFilterFn, + GroupID: "sample-consumer", + Topic: "exception", + Cron: "*/1 * * * *", + Duration: 20 * time.Second, + SkipMessageByHeaderFn: SkipMessageByHeaderFn, }, LogLevel: "info", } @@ -38,3 +28,13 @@ func main() { c := cronsumer.New(config, consumeFn) c.Run() } + +func SkipMessageByHeaderFn(headers []kafka.Header) bool { + for _, header := range headers { + if header.Key == "skipMessage" { + // If a kafka message comes with `skipMessage` header key, it will be skipped! + return true + } + } + return false +} diff --git a/internal/cronsumer.go b/internal/cronsumer.go index 9f809a4..eea6933 100644 --- a/internal/cronsumer.go +++ b/internal/cronsumer.go @@ -15,10 +15,10 @@ type kafkaCronsumer struct { consumeFn func(message kafka.Message) error - metric *CronsumerMetric - maxRetry int - deadLetterTopic string - headerFilterFn kafka.HeaderFilterFn + metric *CronsumerMetric + maxRetry int + deadLetterTopic string + skipMessageByHeaderFn kafka.SkipMessageByHeaderFn cfg *kafka.Config } @@ -28,15 +28,15 @@ func newKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) * cfg.Validate() return &kafkaCronsumer{ - cfg: cfg, - messageChannel: make(chan MessageWrapper), - kafkaConsumer: newConsumer(cfg), - kafkaProducer: newProducer(cfg), - consumeFn: c, - headerFilterFn: cfg.Consumer.HeaderFilterFn, - metric: &CronsumerMetric{}, - maxRetry: cfg.Consumer.MaxRetry, - deadLetterTopic: cfg.Consumer.DeadLetterTopic, + cfg: cfg, + messageChannel: make(chan MessageWrapper), + kafkaConsumer: newConsumer(cfg), + kafkaProducer: newProducer(cfg), + consumeFn: c, + skipMessageByHeaderFn: cfg.Consumer.SkipMessageByHeaderFn, + metric: &CronsumerMetric{}, + maxRetry: cfg.Consumer.MaxRetry, + deadLetterTopic: cfg.Consumer.DeadLetterTopic, } } @@ -64,7 +64,7 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel msg := NewMessageWrapper(*m, strategyName) - if k.headerFilterFn != nil && k.headerFilterFn(msg.Headers) { + if k.skipMessageByHeaderFn != nil && k.skipMessageByHeaderFn(msg.Headers) { k.cfg.Logger.Infof("Message is not processed. Header filter applied. 
Headers: %v", msg.Headers) return } diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 5a3c6ab..77cd702 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -40,25 +40,25 @@ type SASLConfig struct { } type ConsumerConfig struct { - ClientID string `yaml:"clientId"` - GroupID string `yaml:"groupId"` - Topic string `yaml:"topic"` - DeadLetterTopic string `yaml:"deadLetterTopic"` - MinBytes int `yaml:"minBytes"` - MaxBytes int `yaml:"maxBytes"` - MaxRetry int `yaml:"maxRetry"` - MaxWait time.Duration `yaml:"maxWait"` - CommitInterval time.Duration `yaml:"commitInterval"` - HeartbeatInterval time.Duration `yaml:"heartbeatInterval"` - SessionTimeout time.Duration `yaml:"sessionTimeout"` - RebalanceTimeout time.Duration `yaml:"rebalanceTimeout"` - StartOffset Offset `yaml:"startOffset"` - RetentionTime time.Duration `yaml:"retentionTime"` - Concurrency int `yaml:"concurrency"` - Duration time.Duration `yaml:"duration"` - Cron string `yaml:"cron"` - BackOffStrategy BackoffStrategyInterface `yaml:"backOffStrategy"` - HeaderFilterFn HeaderFilterFn `yaml:"headerFilterFn"` + ClientID string `yaml:"clientId"` + GroupID string `yaml:"groupId"` + Topic string `yaml:"topic"` + DeadLetterTopic string `yaml:"deadLetterTopic"` + MinBytes int `yaml:"minBytes"` + MaxBytes int `yaml:"maxBytes"` + MaxRetry int `yaml:"maxRetry"` + MaxWait time.Duration `yaml:"maxWait"` + CommitInterval time.Duration `yaml:"commitInterval"` + HeartbeatInterval time.Duration `yaml:"heartbeatInterval"` + SessionTimeout time.Duration `yaml:"sessionTimeout"` + RebalanceTimeout time.Duration `yaml:"rebalanceTimeout"` + StartOffset Offset `yaml:"startOffset"` + RetentionTime time.Duration `yaml:"retentionTime"` + Concurrency int `yaml:"concurrency"` + Duration time.Duration `yaml:"duration"` + Cron string `yaml:"cron"` + BackOffStrategy BackoffStrategyInterface `yaml:"backOffStrategy"` + SkipMessageByHeaderFn SkipMessageByHeaderFn `yaml:"headerFilterFn"` } type ProducerConfig struct { @@ -66,7 +66,7 @@ type ProducerConfig struct { BatchTimeout time.Duration `yaml:"batchTimeout"` } -type HeaderFilterFn func(headers []Header) bool +type SkipMessageByHeaderFn func(headers []Header) bool func (c *Config) SetDefaults() { if c.Consumer.MaxRetry == 0 { diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 51e3f3d..d23acce 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -389,11 +389,9 @@ func Test_Should_Discard_Message_When_Retry_Count_Is_Equal_To_MaxRetrys_Value_Wi func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) { // Given topic := "exception-header-filter" - key, value := "filter_key", "filter_value" _, cleanUp := createTopic(t, topic) defer cleanUp() - maxRetry := 1 config := &kafka.Config{ Brokers: []string{"localhost:9092"}, Consumer: kafka.ConsumerConfig{ @@ -401,21 +399,18 @@ func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) { Topic: topic, Cron: "*/1 * * * *", Duration: 20 * time.Second, - MaxRetry: maxRetry, - HeaderFilterFn: func(headers []kafka.Header) bool { + SkipMessageByHeaderFn: func(headers []kafka.Header) bool { for i := range headers { - if headers[i].Key == key && string(headers[i].Value) == value { - return false + if headers[i].Key == "skipMessage" { + return true } } - return true + return false }, }, - LogLevel: "info", } - respCh := make(chan kafka.Message, 2) - defer close(respCh) + respCh := make(chan kafka.Message) var consumeFn kafka.ConsumeFn = func(message 
kafka.Message) error { fmt.Printf("consumer > Message received. Headers: %v\n", message.Headers) respCh <- message @@ -427,27 +422,20 @@ func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) { // When producedMessages := []kafka.Message{ - {Topic: topic, Value: []byte("some message"), Key: []byte("some key")}, - {Topic: topic, Value: []byte("real message"), Key: []byte("some key"), Headers: []kafka.Header{{ - Key: key, - Value: []byte(value), - }, - }}, + {Topic: topic, Key: []byte("real message")}, + {Topic: topic, Key: []byte("will be skipped message"), Headers: []kafka.Header{{ + Key: "skipMessage", + }}}, } if err := c.ProduceBatch(producedMessages); err != nil { - fmt.Println("Produce err", err.Error()) + t.Fatalf("error producing batch %s", err) } // Then - conditionFunc := func() bool { - messageCount := len(respCh) - return messageCount == 1 - } - assertEventually(t, conditionFunc, 30*time.Second, time.Second) - actualMessage := <-respCh - if string(actualMessage.Value) != "real message" { - t.Errorf("Expected: %s, Actual: %s", value, actualMessage.Value) + + if string(actualMessage.Key) != "real message" { + t.Errorf("Expected: %s, Actual: %s", "real message", actualMessage.Value) } } From bee6a49095c4288c26115c81e95d2e72bdf4578f Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Sun, 11 Feb 2024 13:27:51 +0300 Subject: [PATCH 09/12] chore: integration --- test/integration/integration_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index d23acce..261d347 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -399,6 +399,7 @@ func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) { Topic: topic, Cron: "*/1 * * * *", Duration: 20 * time.Second, + MaxRetry: 1, SkipMessageByHeaderFn: func(headers []kafka.Header) bool { for i := range headers { if headers[i].Key == "skipMessage" { @@ -494,7 +495,7 @@ func createTopic(t *testing.T, topicName string) (*segmentio.Conn, func()) { } cleanUp := func() { - if err := conn.DeleteTopics(topicName); err != nil { + if err = conn.DeleteTopics(topicName); err != nil { fmt.Println("err deleting topic", err.Error()) } } From cebb2e9a8aa2d2a2980bb2a0abf1fd8fcf686731 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Sun, 11 Feb 2024 13:38:51 +0300 Subject: [PATCH 10/12] feat: add continue instead of return --- README.md | 62 +++++++++++++++++++++---------------------- internal/cronsumer.go | 2 +- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index f4ddc7b..fce600e 100644 --- a/README.md +++ b/README.md @@ -128,37 +128,37 @@ func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) { ## Configurations -| config | description | default | example | -|------------------------------|----------------------------------------------------------------------------------------------------|----------|--------------------------| -| `logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | info | | -| `consumer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Dialer) | | | -| `consumer.cron` | Cron expression when exception consumer starts to work at | | */1 * * * * | -| `consumer.backOffStrategy` | Define consumer backoff strategy for retry topics | fixed | exponential, linear | -| `consumer.duration` | Work duration exception consumer actively consuming messages | 
| 20s, 15m, 1h | -| `consumer.topic` | Exception topic names | | exception-topic | -| `consumer.groupId` | Exception consumer group id | | exception-consumer-group | -| `consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | | -| `consumer.concurrency` | Number of goroutines used at listeners | 1 | | -| `consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MinBytes) | 1 | | -| `consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxBytes) | 1 MB | | -| `consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxWait) | 10s | | -| `consumer.commitInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.CommitInterval) | 1s | | -| `consumer.heartbeatInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.HeartbeatInterval) | 3s | | -| `consumer.sessionTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.SessionTimeout) | 30s | | -| `consumer.rebalanceTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RebalanceTimeout) | 30s | | -| `consumer.startOffset` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.StartOffset) | earliest | | -| `consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RetentionTime) | 24h | | -| `consumer.HeaderFilterFn` | Function to filter messages based on headers | nil | | -| `producer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Transport) | | | -| `producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchSize) | 100 | | -| `producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchTimeout) | 1s | | -| `sasl.enabled` | It enables sasl authentication mechanism | false | | -| `sasl.authType` | Currently we only support `SCRAM` | "" | | -| `sasl.username` | SCRAM username | "" | | -| `sasl.password` | SCRAM password | "" | | -| `sasl.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | | -| `sasl.intermediateCAPath` | | "" | | -| `sasl.rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#RackAffinityGroupBalancer) | "" | | +| config | description | default | example | +|----------------------------------|----------------------------------------------------------------------------------------------------|----------|--------------------------| +| `logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | info | | +| `consumer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Dialer) | | | +| `consumer.cron` | Cron expression when exception consumer starts to work at | | */1 * * * * | +| `consumer.backOffStrategy` | Define consumer backoff strategy for retry topics | fixed | exponential, linear | +| `consumer.duration` | Work duration exception consumer actively consuming messages | | 20s, 15m, 1h | +| `consumer.topic` | Exception topic names | | exception-topic | +| `consumer.groupId` | Exception consumer group id | | exception-consumer-group | +| `consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | | +| `consumer.concurrency` | Number of goroutines used at listeners | 1 | | +| `consumer.minBytes` | [see 
doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MinBytes) | 1 | | +| `consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxBytes) | 1 MB | | +| `consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxWait) | 10s | | +| `consumer.commitInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.CommitInterval) | 1s | | +| `consumer.heartbeatInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.HeartbeatInterval) | 3s | | +| `consumer.sessionTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.SessionTimeout) | 30s | | +| `consumer.rebalanceTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RebalanceTimeout) | 30s | | +| `consumer.startOffset` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.StartOffset) | earliest | | +| `consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RetentionTime) | 24h | | +| `consumer.SkipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | | +| `producer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Transport) | | | +| `producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchSize) | 100 | | +| `producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchTimeout) | 1s | | +| `sasl.enabled` | It enables sasl authentication mechanism | false | | +| `sasl.authType` | Currently we only support `SCRAM` | "" | | +| `sasl.username` | SCRAM username | "" | | +| `sasl.password` | SCRAM password | "" | | +| `sasl.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | | +| `sasl.intermediateCAPath` | | "" | | +| `sasl.rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#RackAffinityGroupBalancer) | "" | | ### Exposed Metrics diff --git a/internal/cronsumer.go b/internal/cronsumer.go index eea6933..bed7343 100644 --- a/internal/cronsumer.go +++ b/internal/cronsumer.go @@ -66,7 +66,7 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel if k.skipMessageByHeaderFn != nil && k.skipMessageByHeaderFn(msg.Headers) { k.cfg.Logger.Infof("Message is not processed. Header filter applied. 
Headers: %v", msg.Headers) - return + continue } if msg.ProduceTime >= startTimeUnixNano { From a17bdf581e8b99c898a57f9c82ee2aad3683ddb1 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Sun, 11 Feb 2024 13:40:19 +0300 Subject: [PATCH 11/12] chore: readme --- README.md | 2 +- pkg/kafka/config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index fce600e..dfb76da 100644 --- a/README.md +++ b/README.md @@ -148,7 +148,7 @@ func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) { | `consumer.rebalanceTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RebalanceTimeout) | 30s | | | `consumer.startOffset` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.StartOffset) | earliest | | | `consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RetentionTime) | 24h | | -| `consumer.SkipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | | +| `consumer.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | | | `producer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Transport) | | | | `producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchSize) | 100 | | | `producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchTimeout) | 1s | | diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 77cd702..60a7597 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -58,7 +58,7 @@ type ConsumerConfig struct { Duration time.Duration `yaml:"duration"` Cron string `yaml:"cron"` BackOffStrategy BackoffStrategyInterface `yaml:"backOffStrategy"` - SkipMessageByHeaderFn SkipMessageByHeaderFn `yaml:"headerFilterFn"` + SkipMessageByHeaderFn SkipMessageByHeaderFn `yaml:"skipMessageByHeaderFn"` } type ProducerConfig struct { From 1b89837d9f368978c9c3423552d89e5013c8d0a0 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Sun, 11 Feb 2024 13:53:01 +0300 Subject: [PATCH 12/12] chore: add timeout to the integration --- .github/workflows/integration-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 6cdf75b..df945f0 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -26,7 +26,7 @@ jobs: - uses: actions/checkout@v3 - name: Integration Test - run: go test -v test/integration/integration_test.go + run: go test -timeout=15m -v test/integration/integration_test.go env: INPUT_PUBLISH: false GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file