diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index d899a88..a9d98ce 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -72,9 +72,6 @@ private async Task MoveMessagesForTopic( Action, ConsumeResult> consumerCommitStrategy ) { - var consumer = _kafkaService.GetKafkaConsumer(); - var producer = _kafkaService.GetKafkaProducer(); - var messageConsumeLimitPerTopic = _configuration.MessageConsumeLimitPerTopic; foreach (var (topicPartition, lag) in topicPartitionsWithLag) @@ -83,6 +80,8 @@ Action, ConsumeResult> consumerCommitStr { continue; } + var consumer = _kafkaService.GetKafkaConsumer(); + var producer = _kafkaService.GetKafkaProducer(); var errorTopic = topicPartition.Topic; try @@ -95,7 +94,7 @@ Action, ConsumeResult> consumerCommitStr while (currentLag > 0 && messageConsumeLimitPerTopic > 0) { - var result = consumer.Consume(TimeSpan.FromSeconds(10)); + var result = consumer.Consume(TimeSpan.FromSeconds(3)); if (result is null) {