diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 6cdf75b..df945f0 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -26,7 +26,7 @@ jobs: - uses: actions/checkout@v3 - name: Integration Test - run: go test -v test/integration/integration_test.go + run: go test -timeout=15m -v test/integration/integration_test.go env: INPUT_PUBLISH: false GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/README.md b/README.md index b80b86a..dfb76da 100644 --- a/README.md +++ b/README.md @@ -128,36 +128,37 @@ func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) { ## Configurations -| config | description | default | example | -|------------------------------|----------------------------------------------------------------------------------------------------|----------|--------------------------| -| `logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | info | | -| `consumer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Dialer) | | | -| `consumer.cron` | Cron expression when exception consumer starts to work at | | */1 * * * * | -| `consumer.backOffStrategy` | Define consumer backoff strategy for retry topics | fixed | exponential, linear | -| `consumer.duration` | Work duration exception consumer actively consuming messages | | 20s, 15m, 1h | -| `consumer.topic` | Exception topic names | | exception-topic | -| `consumer.groupId` | Exception consumer group id | | exception-consumer-group | -| `consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | | -| `consumer.concurrency` | Number of goroutines used at listeners | 1 | | -| `consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MinBytes) | 1 | | -| `consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxBytes) | 1 MB | | -| `consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxWait) | 10s | | -| `consumer.commitInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.CommitInterval) | 1s | | -| `consumer.heartbeatInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.HeartbeatInterval) | 3s | | -| `consumer.sessionTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.SessionTimeout) | 30s | | -| `consumer.rebalanceTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RebalanceTimeout) | 30s | | -| `consumer.startOffset` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.StartOffset) | earliest | | -| `consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RetentionTime) | 24h | | -| `producer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Transport) | | | -| `producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchSize) | 100 | | -| `producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchTimeout) | 1s | | -| `sasl.enabled` | It enables sasl authentication mechanism | false | | -| `sasl.authType` | Currently we only support `SCRAM` | "" | | -| `sasl.username` | SCRAM username | "" | | -| `sasl.password` | SCRAM password | "" | | -| `sasl.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | | -| `sasl.intermediateCAPath` | | "" | | -| `sasl.rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#RackAffinityGroupBalancer) | "" | | +| config | description | default | example | +|----------------------------------|----------------------------------------------------------------------------------------------------|----------|--------------------------| +| `logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | info | | +| `consumer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Dialer) | | | +| `consumer.cron` | Cron expression when exception consumer starts to work at | | */1 * * * * | +| `consumer.backOffStrategy` | Define consumer backoff strategy for retry topics | fixed | exponential, linear | +| `consumer.duration` | Work duration exception consumer actively consuming messages | | 20s, 15m, 1h | +| `consumer.topic` | Exception topic names | | exception-topic | +| `consumer.groupId` | Exception consumer group id | | exception-consumer-group | +| `consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | | +| `consumer.concurrency` | Number of goroutines used at listeners | 1 | | +| `consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MinBytes) | 1 | | +| `consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxBytes) | 1 MB | | +| `consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxWait) | 10s | | +| `consumer.commitInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.CommitInterval) | 1s | | +| `consumer.heartbeatInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.HeartbeatInterval) | 3s | | +| `consumer.sessionTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.SessionTimeout) | 30s | | +| `consumer.rebalanceTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RebalanceTimeout) | 30s | | +| `consumer.startOffset` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.StartOffset) | earliest | | +| `consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RetentionTime) | 24h | | +| `consumer.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | | +| `producer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Transport) | | | +| `producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchSize) | 100 | | +| `producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchTimeout) | 1s | | +| `sasl.enabled` | It enables sasl authentication mechanism | false | | +| `sasl.authType` | Currently we only support `SCRAM` | "" | | +| `sasl.username` | SCRAM username | "" | | +| `sasl.password` | SCRAM password | "" | | +| `sasl.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | | +| `sasl.intermediateCAPath` | | "" | | +| `sasl.rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#RackAffinityGroupBalancer) | "" | | ### Exposed Metrics diff --git a/examples/single-consumer-with-header-filter-function/go.mod b/examples/single-consumer-with-header-filter-function/go.mod new file mode 100644 index 0000000..4c31f1e --- /dev/null +++ b/examples/single-consumer-with-header-filter-function/go.mod @@ -0,0 +1,31 @@ +module single-consumer + +go 1.19 + +replace github.com/Trendyol/kafka-cronsumer => ../.. + +require github.com/Trendyol/kafka-cronsumer v0.0.0-00010101000000-000000000000 + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/klauspost/compress v1.16.4 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/prometheus/client_golang v1.16.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/procfs v0.10.1 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/segmentio/kafka-go v0.4.42 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.24.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.7.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect +) diff --git a/examples/single-consumer-with-header-filter-function/go.sum b/examples/single-consumer-with-header-filter-function/go.sum new file mode 100644 index 0000000..e08be6f --- /dev/null +++ b/examples/single-consumer-with-header-filter-function/go.sum @@ -0,0 +1,69 @@ +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/single-consumer-with-header-filter-function/main.go b/examples/single-consumer-with-header-filter-function/main.go new file mode 100644 index 0000000..dac87b1 --- /dev/null +++ b/examples/single-consumer-with-header-filter-function/main.go @@ -0,0 +1,40 @@ +package main + +import ( + "fmt" + cronsumer "github.com/Trendyol/kafka-cronsumer" + "github.com/Trendyol/kafka-cronsumer/pkg/kafka" + "time" +) + +func main() { + config := &kafka.Config{ + Brokers: []string{"localhost:29092"}, + Consumer: kafka.ConsumerConfig{ + GroupID: "sample-consumer", + Topic: "exception", + Cron: "*/1 * * * *", + Duration: 20 * time.Second, + SkipMessageByHeaderFn: SkipMessageByHeaderFn, + }, + LogLevel: "info", + } + + var consumeFn kafka.ConsumeFn = func(message kafka.Message) error { + fmt.Printf("consumer > Message received: %s\n", string(message.Value)) + return nil + } + + c := cronsumer.New(config, consumeFn) + c.Run() +} + +func SkipMessageByHeaderFn(headers []kafka.Header) bool { + for _, header := range headers { + if header.Key == "skipMessage" { + // If a kafka message comes with `skipMessage` header key, it will be skipped! + return true + } + } + return false +} diff --git a/internal/cronsumer.go b/internal/cronsumer.go index c48db8c..bed7343 100644 --- a/internal/cronsumer.go +++ b/internal/cronsumer.go @@ -15,9 +15,10 @@ type kafkaCronsumer struct { consumeFn func(message kafka.Message) error - metric *CronsumerMetric - maxRetry int - deadLetterTopic string + metric *CronsumerMetric + maxRetry int + deadLetterTopic string + skipMessageByHeaderFn kafka.SkipMessageByHeaderFn cfg *kafka.Config } @@ -27,14 +28,15 @@ func newKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) * cfg.Validate() return &kafkaCronsumer{ - cfg: cfg, - messageChannel: make(chan MessageWrapper), - kafkaConsumer: newConsumer(cfg), - kafkaProducer: newProducer(cfg), - consumeFn: c, - metric: &CronsumerMetric{}, - maxRetry: cfg.Consumer.MaxRetry, - deadLetterTopic: cfg.Consumer.DeadLetterTopic, + cfg: cfg, + messageChannel: make(chan MessageWrapper), + kafkaConsumer: newConsumer(cfg), + kafkaProducer: newProducer(cfg), + consumeFn: c, + skipMessageByHeaderFn: cfg.Consumer.SkipMessageByHeaderFn, + metric: &CronsumerMetric{}, + maxRetry: cfg.Consumer.MaxRetry, + deadLetterTopic: cfg.Consumer.DeadLetterTopic, } } @@ -62,6 +64,11 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel msg := NewMessageWrapper(*m, strategyName) + if k.skipMessageByHeaderFn != nil && k.skipMessageByHeaderFn(msg.Headers) { + k.cfg.Logger.Infof("Message is not processed. Header filter applied. Headers: %v", msg.Headers) + continue + } + if msg.ProduceTime >= startTimeUnixNano { (*cancelFuncWrapper)() diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 174e27d..60a7597 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -40,24 +40,25 @@ type SASLConfig struct { } type ConsumerConfig struct { - ClientID string `yaml:"clientId"` - GroupID string `yaml:"groupId"` - Topic string `yaml:"topic"` - DeadLetterTopic string `yaml:"deadLetterTopic"` - MinBytes int `yaml:"minBytes"` - MaxBytes int `yaml:"maxBytes"` - MaxRetry int `yaml:"maxRetry"` - MaxWait time.Duration `yaml:"maxWait"` - CommitInterval time.Duration `yaml:"commitInterval"` - HeartbeatInterval time.Duration `yaml:"heartbeatInterval"` - SessionTimeout time.Duration `yaml:"sessionTimeout"` - RebalanceTimeout time.Duration `yaml:"rebalanceTimeout"` - StartOffset Offset `yaml:"startOffset"` - RetentionTime time.Duration `yaml:"retentionTime"` - Concurrency int `yaml:"concurrency"` - Duration time.Duration `yaml:"duration"` - Cron string `yaml:"cron"` - BackOffStrategy BackoffStrategyInterface `yaml:"backOffStrategy"` + ClientID string `yaml:"clientId"` + GroupID string `yaml:"groupId"` + Topic string `yaml:"topic"` + DeadLetterTopic string `yaml:"deadLetterTopic"` + MinBytes int `yaml:"minBytes"` + MaxBytes int `yaml:"maxBytes"` + MaxRetry int `yaml:"maxRetry"` + MaxWait time.Duration `yaml:"maxWait"` + CommitInterval time.Duration `yaml:"commitInterval"` + HeartbeatInterval time.Duration `yaml:"heartbeatInterval"` + SessionTimeout time.Duration `yaml:"sessionTimeout"` + RebalanceTimeout time.Duration `yaml:"rebalanceTimeout"` + StartOffset Offset `yaml:"startOffset"` + RetentionTime time.Duration `yaml:"retentionTime"` + Concurrency int `yaml:"concurrency"` + Duration time.Duration `yaml:"duration"` + Cron string `yaml:"cron"` + BackOffStrategy BackoffStrategyInterface `yaml:"backOffStrategy"` + SkipMessageByHeaderFn SkipMessageByHeaderFn `yaml:"skipMessageByHeaderFn"` } type ProducerConfig struct { @@ -65,6 +66,8 @@ type ProducerConfig struct { BatchTimeout time.Duration `yaml:"batchTimeout"` } +type SkipMessageByHeaderFn func(headers []Header) bool + func (c *Config) SetDefaults() { if c.Consumer.MaxRetry == 0 { c.Consumer.MaxRetry = 3 diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 0771608..261d347 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -386,6 +386,60 @@ func Test_Should_Discard_Message_When_Retry_Count_Is_Equal_To_MaxRetrys_Value_Wi assertEventually(t, conditionFunc, 30*time.Second, time.Second) } +func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) { + // Given + topic := "exception-header-filter" + _, cleanUp := createTopic(t, topic) + defer cleanUp() + + config := &kafka.Config{ + Brokers: []string{"localhost:9092"}, + Consumer: kafka.ConsumerConfig{ + GroupID: "sample-consumer", + Topic: topic, + Cron: "*/1 * * * *", + Duration: 20 * time.Second, + MaxRetry: 1, + SkipMessageByHeaderFn: func(headers []kafka.Header) bool { + for i := range headers { + if headers[i].Key == "skipMessage" { + return true + } + } + return false + }, + }, + } + + respCh := make(chan kafka.Message) + var consumeFn kafka.ConsumeFn = func(message kafka.Message) error { + fmt.Printf("consumer > Message received. Headers: %v\n", message.Headers) + respCh <- message + return nil + } + + c := cronsumer.New(config, consumeFn) + c.Start() + + // When + producedMessages := []kafka.Message{ + {Topic: topic, Key: []byte("real message")}, + {Topic: topic, Key: []byte("will be skipped message"), Headers: []kafka.Header{{ + Key: "skipMessage", + }}}, + } + if err := c.ProduceBatch(producedMessages); err != nil { + t.Fatalf("error producing batch %s", err) + } + + // Then + actualMessage := <-respCh + + if string(actualMessage.Key) != "real message" { + t.Errorf("Expected: %s, Actual: %s", "real message", actualMessage.Value) + } +} + func getRetryCount(message kafka.Message) int { for _, header := range message.Headers { if header.Key == "x-retry-count" { @@ -441,7 +495,7 @@ func createTopic(t *testing.T, topicName string) (*segmentio.Conn, func()) { } cleanUp := func() { - if err := conn.DeleteTopics(topicName); err != nil { + if err = conn.DeleteTopics(topicName); err != nil { fmt.Println("err deleting topic", err.Error()) } }