diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 9cccaae..0981922 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -34,7 +34,7 @@ func Test_batchConsumer_startBatch(t *testing.T) { concurrency: 1, }, messageGroupLimit: 3, - consumeFn: func(messages []*Message) error { + consumeFn: func(_ []*Message) error { numberOfBatch++ return nil }, @@ -101,7 +101,7 @@ func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) { concurrency: 1, }, messageGroupLimit: 2, - consumeFn: func(messages []*Message) error { + consumeFn: func(_ []*Message) error { numberOfBatch++ return nil }, @@ -179,7 +179,7 @@ func Test_batchConsumer_process(t *testing.T) { gotOnlyOneTimeException := true bc := batchConsumer{ base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)}, - consumeFn: func(messages []*Message) error { + consumeFn: func(_ []*Message) error { if gotOnlyOneTimeException { gotOnlyOneTimeException = false return errors.New("simulate only one time exception") diff --git a/consumer_base.go b/consumer_base.go index 87a3bbd..083e2df 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -84,6 +84,7 @@ type base struct { consumerState state metricPrefix string mu sync.Mutex + consumerCfg *ConsumerConfig } func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { @@ -138,6 +139,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn, metricPrefix: cfg.MetricPrefix, mu: sync.Mutex{}, + consumerCfg: cfg, } if cfg.DistributedTracingEnabled { @@ -201,7 +203,8 @@ func (c *base) startConsume() { } c.metric.TotalErrorCountDuringFetchingMessage++ - c.logger.Warnf("Message could not read, err %s", err.Error()) + //nolint:lll + c.logger.Warnf("Message could not read, err %s, from topics %s with consumer group %s", err.Error(), c.consumerCfg.getTopics(), c.consumerCfg.Reader.GroupID) continue } diff --git a/consumer_base_test.go b/consumer_base_test.go index c0c2c70..40e0000 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -23,6 +23,7 @@ func Test_base_startConsume(t *testing.T) { logger: NewZapLogger(LogLevelError), consumerState: stateRunning, metric: &ConsumerMetric{}, + consumerCfg: &ConsumerConfig{}, } b.context, b.cancelFn = context.WithCancel(context.Background()) diff --git a/consumer_config_test.go b/consumer_config_test.go index 97e03ad..98a8333 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -98,7 +98,7 @@ func TestConsumerConfig_newCronsumerConfig(t *testing.T) { // Given cfg := ConsumerConfig{ RetryConfiguration: RetryConfiguration{ - SkipMessageByHeaderFn: func(headers []Header) bool { + SkipMessageByHeaderFn: func(_ []Header) bool { return false }, },