chore: add unit tests (#46)
* chore: add unit tests

* chore: add unit tests

* chore: make lint

* chore: test fixed
henesgokdag authored Oct 4, 2023
1 parent a866978 commit f10fc17
Showing 6 changed files with 506 additions and 0 deletions.
36 changes: 36 additions & 0 deletions internal/collector_test.go
@@ -0,0 +1,36 @@
package internal

import (
    "reflect"
    "testing"

    "github.com/prometheus/client_golang/prometheus"
)

func Test_NewCollector(t *testing.T) {
    cronsumerMetric := &CronsumerMetric{
        TotalRetriedMessagesCounter:   0,
        TotalDiscardedMessagesCounter: 0,
    }
    expectedTotalRetriedMessagesCounter := prometheus.NewDesc(
        prometheus.BuildFQName(Name, "retried_messages_total", "current"),
        "Total number of retried messages.",
        []string{},
        nil,
    )
    expectedTotalDiscardedMessagesCounter := prometheus.NewDesc(
        prometheus.BuildFQName(Name, "discarded_messages_total", "current"),
        "Total number of discarded messages.",
        []string{},
        nil,
    )

    collector := NewCollector(cronsumerMetric)

    if !reflect.DeepEqual(collector.totalDiscardedMessagesCounter, expectedTotalDiscardedMessagesCounter) {
        t.Errorf("Expected: %+v, Actual: %+v", expectedTotalDiscardedMessagesCounter, collector.totalDiscardedMessagesCounter)
    }
    if !reflect.DeepEqual(collector.totalRetriedMessagesCounter, expectedTotalRetriedMessagesCounter) {
        t.Errorf("Expected: %+v, Actual: %+v", expectedTotalRetriedMessagesCounter, collector.totalRetriedMessagesCounter)
    }
}
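
Outside the test, descriptors like these only become visible through a registry. A minimal sketch of exposing the collectors over HTTP, assuming the public GetMetricCollectors() returns values implementing prometheus.Collector; the helper name, port, and endpoint below are illustrative, not part of this commit:

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// serveMetrics registers the given collectors (for kafka-cronsumer, the
// values returned by GetMetricCollectors) and serves them on /metrics.
func serveMetrics(collectors ...prometheus.Collector) error {
    registry := prometheus.NewRegistry()
    registry.MustRegister(collectors...)
    http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
    return http.ListenAndServe(":8080", nil)
}

func main() {
    // In real use the collectors come from a running cronsumer instance.
    log.Fatal(serveMetrics())
}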
37 changes: 37 additions & 0 deletions internal/cron_test.go
@@ -0,0 +1,37 @@
package internal

import (
    "testing"
    "time"

    "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)

func Test_GetMetricsCollector(t *testing.T) {
    // Given
    kafkaConfig := &kafka.Config{
        Brokers: []string{"localhost:29092"},
        Consumer: kafka.ConsumerConfig{
            GroupID:  "sample-consumer",
            Topic:    "exception",
            Cron:     "@every 1s",
            Duration: 20 * time.Second,
        },
        LogLevel: "info",
    }

    var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
        return nil
    }

    // When
    c := NewCronsumer(kafkaConfig, firstConsumerFn)

    c.Start()

    collector := c.GetMetricCollectors()
    // Then
    if collector == nil {
        t.Errorf("Expected not nil: %+v", collector)
    }
}
236 changes: 236 additions & 0 deletions internal/cronsumer_test.go
@@ -0,0 +1,236 @@
package internal

import (
    "context"
    "reflect"
    "testing"
    "time"

    "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
    "github.com/Trendyol/kafka-cronsumer/pkg/logger"
    segmentio "github.com/segmentio/kafka-go"
)

func Test_Produce_Max_Retry_Count_Reach(t *testing.T) {
    // Given
    kafkaConfig := &kafka.Config{
        Brokers: []string{"localhost:29092"},
        Consumer: kafka.ConsumerConfig{
            GroupID:  "sample-consumer",
            Topic:    "exception",
            Cron:     "@every 1s",
            Duration: 20 * time.Second,
        },
        LogLevel: "info",
        Logger:   logger.New("info"),
    }

    var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
        return nil
    }
    c := &kafkaCronsumer{
        cfg:             kafkaConfig,
        messageChannel:  make(chan MessageWrapper),
        kafkaConsumer:   mockConsumer{},
        kafkaProducer:   newProducer(kafkaConfig),
        consumeFn:       firstConsumerFn,
        metric:          &CronsumerMetric{},
        maxRetry:        1,
        deadLetterTopic: kafkaConfig.Consumer.DeadLetterTopic,
    }
    m := MessageWrapper{
        Message: kafka.Message{
            Headers: []kafka.Header{
                {Key: RetryHeaderKey, Value: []byte("1")},
            },
            Topic: "exception",
        },
        RetryCount: 1,
    }

    // When
    c.produce(m)

    // Then
    if !reflect.DeepEqual(c.metric.TotalDiscardedMessagesCounter, int64(1)) {
        t.Errorf("Expected: %+v, Actual: %+v", int64(1), c.metric.TotalDiscardedMessagesCounter)
    }
}

func Test_Produce_Max_Retry_Count_Reach_Dead_Letter_Topic_Feature_Enabled(t *testing.T) {
    // Given
    var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
        return nil
    }
    c := &kafkaCronsumer{
        cfg: &kafka.Config{
            Logger: logger.New("info"),
        },
        messageChannel:  make(chan MessageWrapper),
        kafkaConsumer:   mockConsumer{},
        kafkaProducer:   &mockProducer{},
        consumeFn:       firstConsumerFn,
        metric:          &CronsumerMetric{},
        maxRetry:        1,
        deadLetterTopic: "abc",
    }
    m := MessageWrapper{
        Message: kafka.Message{
            Headers: []kafka.Header{
                {Key: RetryHeaderKey, Value: []byte("1")},
            },
            Topic: "exception",
        },
        RetryCount: 1,
    }

    // When
    c.produce(m)

    // Then
    if !reflect.DeepEqual(c.metric.TotalDiscardedMessagesCounter, int64(1)) {
        t.Errorf("Expected: %+v, Actual: %+v", int64(1), c.metric.TotalDiscardedMessagesCounter)
    }
}

func Test_Produce_With_Retry(t *testing.T) {
    // Given
    kafkaConfig := &kafka.Config{
        Brokers: []string{"localhost:29092"},
        Consumer: kafka.ConsumerConfig{
            GroupID:  "sample-consumer",
            Topic:    "exception",
            Cron:     "@every 1s",
            Duration: 20 * time.Second,
        },
        LogLevel: "info",
        Logger:   logger.New("info"),
    }

    var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
        return nil
    }
    producer := newMockProducer()
    c := &kafkaCronsumer{
        cfg:             kafkaConfig,
        messageChannel:  make(chan MessageWrapper),
        kafkaConsumer:   mockConsumer{},
        kafkaProducer:   &producer,
        consumeFn:       firstConsumerFn,
        metric:          &CronsumerMetric{},
        maxRetry:        3,
        deadLetterTopic: kafkaConfig.Consumer.DeadLetterTopic,
    }
    m := MessageWrapper{
        Message: kafka.Message{
            Headers: []kafka.Header{
                {Key: RetryHeaderKey, Value: []byte("1")},
            },
            Topic: "exception",
        },
        RetryCount: 1,
    }

    // When
    c.produce(m)

    // Then
    if !reflect.DeepEqual(c.metric.TotalRetriedMessagesCounter, int64(1)) {
        t.Errorf("Expected: %+v, Actual: %+v", int64(1), c.metric.TotalRetriedMessagesCounter)
    }
}
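
Taken together, the three produce tests pin down the branching on RetryCount versus maxRetry: at or past the limit the message is counted as discarded (and, when a dead-letter topic is configured, routed there), otherwise it is counted as retried. A hedged sketch of that decision in the same package; produceSketch is an illustrative name, not the actual implementation:

// Reconstruction of the behaviour the tests assert, not the real produce.
func produceSketch(c *kafkaCronsumer, m MessageWrapper) {
    if m.RetryCount >= c.maxRetry {
        if c.deadLetterTopic != "" {
            // Route the exhausted message to the dead-letter topic here.
        }
        c.metric.TotalDiscardedMessagesCounter++
        return
    }
    // Re-produce with an incremented retry header and count the retry.
    c.metric.TotalRetriedMessagesCounter++
}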

func Test_Recover_Message(t *testing.T) {
    // Given
    kafkaConfig := &kafka.Config{
        Brokers: []string{"localhost:29092"},
        Consumer: kafka.ConsumerConfig{
            GroupID:  "sample-consumer",
            Topic:    "exception",
            Cron:     "@every 1s",
            Duration: 20 * time.Second,
        },
        LogLevel: "info",
        Logger:   logger.New("info"),
    }

    var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
        return nil
    }
    producer := newMockProducer()
    c := &kafkaCronsumer{
        cfg:             kafkaConfig,
        messageChannel:  make(chan MessageWrapper),
        kafkaConsumer:   mockConsumer{},
        kafkaProducer:   &producer,
        consumeFn:       firstConsumerFn,
        metric:          &CronsumerMetric{},
        maxRetry:        3,
        deadLetterTopic: kafkaConfig.Consumer.DeadLetterTopic,
    }
    m := MessageWrapper{
        Message: kafka.Message{
            Headers: []kafka.Header{
                {Key: RetryHeaderKey, Value: []byte("1")},
            },
            Topic: "exception",
        },
        RetryCount: 1,
    }

    // When
    c.recoverMessage(m)

    // Then
    if !reflect.DeepEqual(c.metric.TotalDiscardedMessagesCounter, int64(0)) {
        t.Errorf("Expected: %+v, Actual: %+v", int64(0), c.metric.TotalDiscardedMessagesCounter)
    }
    if !reflect.DeepEqual(c.metric.TotalRetriedMessagesCounter, int64(0)) {
        t.Errorf("Expected: %+v, Actual: %+v", int64(0), c.metric.TotalRetriedMessagesCounter)
    }
}

// mockConsumer is a no-op consumer test double.
type mockConsumer struct{}

func (c mockConsumer) Stop() {
}

func (c mockConsumer) ReadMessage(ctx context.Context) (*MessageWrapper, error) {
    return &MessageWrapper{}, nil
}

// mockProducer is a producer test double whose methods all succeed without
// touching Kafka.
type mockProducer struct {
    w   *segmentio.Writer
    cfg *kafka.Config
}

func newMockProducer() mockProducer {
    // The writer is configured with minimal batch settings, but the no-op
    // methods below never use it, so no connection to "abc" is attempted.
    producer := &segmentio.Writer{
        Addr:                   segmentio.TCP("abc"),
        Balancer:               &segmentio.LeastBytes{},
        BatchTimeout:           1,
        BatchSize:              1,
        AllowAutoTopicCreation: true,
    }

    return mockProducer{
        w:   producer,
        cfg: &kafka.Config{},
    }
}

func (k *mockProducer) ProduceWithRetryOption(message MessageWrapper, increaseRetry bool) error {
    return nil
}

func (k *mockProducer) Produce(m kafka.Message) error {
    return nil
}

func (k *mockProducer) ProduceBatch(messages []kafka.Message) error {
    return nil
}

func (k *mockProducer) Close() {
}
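
The interfaces these doubles stand in for can be read off their method sets. A hedged reconstruction with illustrative names; the real definitions live elsewhere in the internal package and may differ:

// Inferred from mockConsumer's methods.
type consumerSketch interface {
    ReadMessage(ctx context.Context) (*MessageWrapper, error)
    Stop()
}

// Inferred from mockProducer's methods.
type producerSketch interface {
    ProduceWithRetryOption(message MessageWrapper, increaseRetry bool) error
    Produce(m kafka.Message) error
    ProduceBatch(messages []kafka.Message) error
    Close()
}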
38 changes: 38 additions & 0 deletions internal/message_header_test.go
@@ -1,9 +1,11 @@
package internal

import (
    "bytes"
    "strconv"
    "testing"

    pkg "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/protocol"
)
@@ -97,3 +99,39 @@ func Test_getRetryCount(t *testing.T) {
        }
    })
}

func Test_ToHeaders(t *testing.T) {
    // Given
    expected := []pkg.Header{
        {Key: "x-retry-count", Value: []byte("1")},
    }
    // When
    actual := ToHeaders(expected)
    actualHeader := actual[0]
    expectedHeader := expected[0]
    // Then
    if actualHeader.Key != expectedHeader.Key {
        t.Errorf("Expected: %s, Actual: %s", expectedHeader.Key, actualHeader.Key)
    }
    if !bytes.Equal(actualHeader.Value, expectedHeader.Value) {
        t.Errorf("Expected: %s, Actual: %s", expectedHeader.Value, actualHeader.Value)
    }
}

func Test_FromHeaders(t *testing.T) {
    // Given
    expected := []kafka.Header{
        {Key: "x-retry-count", Value: []byte("1")},
    }
    // When
    actual := FromHeaders(expected)
    actualHeader := actual[0]
    expectedHeader := expected[0]
    // Then
    if actualHeader.Key != expectedHeader.Key {
        t.Errorf("Expected: %s, Actual: %s", expectedHeader.Key, actualHeader.Key)
    }
    if !bytes.Equal(actualHeader.Value, expectedHeader.Value) {
        t.Errorf("Expected: %s, Actual: %s", expectedHeader.Value, actualHeader.Value)
    }
}
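
Both tests assert that keys and values survive the conversion in each direction. A plausible reconstruction of the functions under test, assuming they are plain slice mappings between the segmentio and pkg header types; the sketch names are illustrative and the real implementations live in message_header.go:

func fromHeadersSketch(headers []kafka.Header) []pkg.Header {
    out := make([]pkg.Header, 0, len(headers))
    for _, h := range headers {
        out = append(out, pkg.Header{Key: h.Key, Value: h.Value})
    }
    return out
}

func toHeadersSketch(headers []pkg.Header) []kafka.Header {
    out := make([]kafka.Header, 0, len(headers))
    for _, h := range headers {
        out = append(out, kafka.Header{Key: h.Key, Value: h.Value})
    }
    return out
}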