Skip to content

Commit

Permalink
Trendyol#29 prevents to initialize kafka consumers for empty topic pa…
Browse files Browse the repository at this point in the history
…rtitions
  • Loading branch information
furkan.kavraz committed May 20, 2024
1 parent e60a4c0 commit 2eecccf
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions src/Services/Implementations/KafkaRetryJobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ private async Task MoveMessagesForTopic(
Action<IConsumer<string,string>, ConsumeResult<string,string>> consumerCommitStrategy
)
{
var consumer = _kafkaService.GetKafkaConsumer();
var producer = _kafkaService.GetKafkaProducer();

var messageConsumeLimitPerTopic = _configuration.MessageConsumeLimitPerTopic;

foreach (var (topicPartition, lag) in topicPartitionsWithLag)
Expand All @@ -83,6 +80,8 @@ Action<IConsumer<string,string>, ConsumeResult<string,string>> consumerCommitStr
{
continue;
}
var consumer = _kafkaService.GetKafkaConsumer();
var producer = _kafkaService.GetKafkaProducer();
var errorTopic = topicPartition.Topic;

try
Expand All @@ -95,7 +94,7 @@ Action<IConsumer<string,string>, ConsumeResult<string,string>> consumerCommitStr

while (currentLag > 0 && messageConsumeLimitPerTopic > 0)
{
var result = consumer.Consume(TimeSpan.FromSeconds(10));
var result = consumer.Consume(TimeSpan.FromSeconds(3));

if (result is null)
{
Expand Down

0 comments on commit 2eecccf

Please sign in to comment.