Skip to content

Commit

Permalink
feat: enhance Message Could Not Reader error by adding topic and cg i…
Browse files Browse the repository at this point in the history
…nfo (#138)

* feat: enhance Message Could not read error with topic and cg

* chore: remove prefix xxx from example topic

* chore: fix lint
  • Loading branch information
Abdulsametileri authored Aug 5, 2024
1 parent f5c5370 commit 7491cce
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 5 deletions.
6 changes: 3 additions & 3 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func Test_batchConsumer_startBatch(t *testing.T) {
concurrency: 1,
},
messageGroupLimit: 3,
consumeFn: func(messages []*Message) error {
consumeFn: func(_ []*Message) error {
numberOfBatch++
return nil
},
Expand Down Expand Up @@ -101,7 +101,7 @@ func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) {
concurrency: 1,
},
messageGroupLimit: 2,
consumeFn: func(messages []*Message) error {
consumeFn: func(_ []*Message) error {
numberOfBatch++
return nil
},
Expand Down Expand Up @@ -179,7 +179,7 @@ func Test_batchConsumer_process(t *testing.T) {
gotOnlyOneTimeException := true
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []*Message) error {
consumeFn: func(_ []*Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
return errors.New("simulate only one time exception")
Expand Down
5 changes: 4 additions & 1 deletion consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type base struct {
consumerState state
metricPrefix string
mu sync.Mutex
consumerCfg *ConsumerConfig
}

func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
Expand Down Expand Up @@ -138,6 +139,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn,
metricPrefix: cfg.MetricPrefix,
mu: sync.Mutex{},
consumerCfg: cfg,
}

if cfg.DistributedTracingEnabled {
Expand Down Expand Up @@ -201,7 +203,8 @@ func (c *base) startConsume() {
}

c.metric.TotalErrorCountDuringFetchingMessage++
c.logger.Warnf("Message could not read, err %s", err.Error())
//nolint:lll
c.logger.Warnf("Message could not read, err %s, from topics %s with consumer group %s", err.Error(), c.consumerCfg.getTopics(), c.consumerCfg.Reader.GroupID)
continue
}

Expand Down
1 change: 1 addition & 0 deletions consumer_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func Test_base_startConsume(t *testing.T) {
logger: NewZapLogger(LogLevelError),
consumerState: stateRunning,
metric: &ConsumerMetric{},
consumerCfg: &ConsumerConfig{},
}
b.context, b.cancelFn = context.WithCancel(context.Background())

Expand Down
2 changes: 1 addition & 1 deletion consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestConsumerConfig_newCronsumerConfig(t *testing.T) {
// Given
cfg := ConsumerConfig{
RetryConfiguration: RetryConfiguration{
SkipMessageByHeaderFn: func(headers []Header) bool {
SkipMessageByHeaderFn: func(_ []Header) bool {
return false
},
},
Expand Down

0 comments on commit 7491cce

Please sign in to comment.