Skip to content

Commit

Permalink
feat: change skip message log level to debug and add Pretty for writi…
Browse files Browse the repository at this point in the history
…ng headers (#137)

* feat: change skip message log level to debug and add Pretty for writing headers
  • Loading branch information
Abdulsametileri authored Aug 1, 2024
1 parent 5284a50 commit f5c5370
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 7 deletions.
12 changes: 6 additions & 6 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,19 +205,19 @@ func (c *base) startConsume() {
continue
}

incomingMessage := &IncomingMessage{
kafkaMessage: m,
message: fromKafkaMessage(m),
}

if c.skipMessageByHeaderFn != nil && c.skipMessageByHeaderFn(m.Headers) {
c.logger.Infof("Message is not processed. Header filter applied. Headers: %v", m.Headers)
c.logger.Debugf("Message is not processed. Header filter applied. Headers: %v", incomingMessage.message.Headers.Pretty())
if err = c.r.CommitMessages([]kafka.Message{*m}); err != nil {
c.logger.Errorf("Commit Error %s,", err.Error())
}
continue
}

incomingMessage := &IncomingMessage{
kafkaMessage: m,
message: fromKafkaMessage(m),
}

if c.distributedTracingEnabled {
incomingMessage.message.Context = c.propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(m))
}
Expand Down
15 changes: 14 additions & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package kafka

import (
"context"
"fmt"
"strings"
"time"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
Expand All @@ -15,6 +17,17 @@ const (

type Header = protocol.Header

type Headers []Header

// Pretty Writes every header key and value, it is useful for debugging purpose
func (hs Headers) Pretty() string {
headerStrings := make([]string, len(hs))
for i := range hs {
headerStrings[i] = fmt.Sprintf("%s: %s", hs[i].Key, string(hs[i].Value))
}
return strings.Join(headerStrings, ", ")
}

type Message struct {
Time time.Time
WriterData interface{}
Expand All @@ -24,7 +37,7 @@ type Message struct {
Topic string
Key []byte
Value []byte
Headers []Header
Headers Headers
Partition int
Offset int64
HighWaterMark int64
Expand Down
16 changes: 16 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,19 @@ func TestMessage_toRetryableMessage(t *testing.T) {
}
})
}

func TestHeaders_Pretty(t *testing.T) {
// Given
headers := Headers{
{Key: "key1", Value: []byte("value1")},
{Key: "key2", Value: []byte("value2")},
}

// When
result := headers.Pretty()

// Then
if result != "key1: value1, key2: value2" {
t.Error("result must be `key1: value1, key2: value2`")
}
}

0 comments on commit f5c5370

Please sign in to comment.