Skip to content

Commit

Permalink
feat: disable instant retry for non transactional retry mode (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri authored Aug 1, 2024
1 parent ff0c6b1 commit 5284a50
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
12 changes: 8 additions & 4 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,14 @@ func (b *batchConsumer) process(chunkMessages []*Message) {
consumeErr := b.consumeFn(chunkMessages)

if consumeErr != nil {
b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
// Try to process same messages again for resolving transient network errors etc.
if consumeErr = b.consumeFn(chunkMessages); consumeErr != nil {
b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
if b.transactionalRetry {
b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
// Try to process same messages again for resolving transient network errors etc.
if consumeErr = b.consumeFn(chunkMessages); consumeErr != nil {
b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
b.metric.TotalUnprocessedMessagesCounter += int64(len(chunkMessages))
}
} else {
b.metric.TotalUnprocessedMessagesCounter += int64(len(chunkMessages))
}

Expand Down
12 changes: 8 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,14 @@ func (c *consumer) process(message *Message) {
consumeErr := c.consumeFn(message)

if consumeErr != nil {
c.logger.Warnf("Consume Function Err %s, Message will be retried", consumeErr.Error())
// Try to process same message again
if consumeErr = c.consumeFn(message); consumeErr != nil {
c.logger.Warnf("Consume Function Again Err %s, message is sending to exception/retry topic %s", consumeErr.Error(), c.retryTopic)
if c.transactionalRetry {
c.logger.Warnf("Consume Function Err %s, Message will be retried", consumeErr.Error())
// Try to process same message again
if consumeErr = c.consumeFn(message); consumeErr != nil {
c.logger.Warnf("Consume Function Again Err %s, message is sending to exception/retry topic %s", consumeErr.Error(), c.retryTopic)
c.metric.TotalUnprocessedMessagesCounter++
}
} else {
c.metric.TotalUnprocessedMessagesCounter++
}
}
Expand Down
4 changes: 2 additions & 2 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func Test_consumer_process(t *testing.T) {
// Given
gotOnlyOneTimeException := true
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), transactionalRetry: true},
consumeFn: func(*Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
Expand All @@ -47,7 +47,7 @@ func Test_consumer_process(t *testing.T) {

// Then
if c.metric.TotalProcessedMessagesCounter != 1 {
t.Fatalf("Total Processed Message Counter must equal to 3")
t.Fatalf("Total Processed Message Counter must equal to 1")
}
if c.metric.TotalUnprocessedMessagesCounter != 0 {
t.Fatalf("Total Unprocessed Message Counter must equal to 0")
Expand Down

0 comments on commit 5284a50

Please sign in to comment.