diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 5e21781..d8dffe4 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -19,7 +19,7 @@ jobs: name: Build & push Docker image with: image: kafka-retry-job - tags: 1.12.3, latest + tags: 1.12.4, latest registry: ghcr.io username: ${{ secrets.GHCR_USERNAME }} password: ${{ secrets.GHCR_TOKEN }} diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 2a3db68..4d48127 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,6 +1,6 @@ variables: - VERSION: "1.12.3" + VERSION: "1.12.4" DOCKER_IMAGE_VERSION: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH:$VERSION DOCKER_IMAGE_LATEST: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 5e85107..f567f63 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -1,5 +1,15 @@ # Changelog +## [1.12.4](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-03) + +- [\#29](https://github.com/Trendyol/kafka-retry-job/issues/29) Add async programming to consume message in parallel +- [\#30](https://github.com/Trendyol/kafka-retry-job/issues/30) Catch exception based on Partitions +- [\#44](https://github.com/Trendyol/kafka-retry-job/issues/44) Limit message consumption based on Topic instead of Topic Partition + +**Merged pull requests:** + +- Pull Request for the issues #29, #30, #44 [\#43](https://github.com/Trendyol/kafka-retry-job/pull/43) ([ahmetfurkankavraz](https://github.com/ahmetfurkankavraz)) + ## [1.12.3](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-03) - [\#15](https://github.com/Trendyol/kafka-retry-job/issues/15) Fix integration tests diff --git a/src/Services/Implementations/ConfigurationService.cs b/src/Services/Implementations/ConfigurationService.cs index 4a166b3..45d1ff1 100644 --- a/src/Services/Implementations/ConfigurationService.cs +++ b/src/Services/Implementations/ConfigurationService.cs @@ -21,7 +21,7 @@ public ConfigurationService(IConfiguration configuration) public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix"); public string RetryTopicNameInHeader => GetValue("RetryTopicNameInHeader"); - public long MessageConsumeLimitPerTopicPartition => GetValue("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue; + public long MessageConsumeLimitPerTopic => GetValue("MessageConsumeLimitPerTopic") ?? Int64.MaxValue; public bool? EnableAutoCommit => GetValue("EnableAutoCommit") ?? false; public bool? EnableAutoOffsetStore => GetValue("EnableAutoOffsetStore") ?? false; @@ -40,6 +40,7 @@ public ConfigurationService(IConfiguration configuration) public int? MessageTimeoutMs => GetValue("ProducerMessageTimeoutMs"); public int? RequestTimeoutMs => GetValue("ProducerRequestTimeoutMs"); public int? MessageMaxBytes => GetValue("ProducerMessageMaxBytes"); + public int MaxLevelParallelism => GetValue("MaxLevelParallelism") ?? 1; private string GetValueOrThrowInvalidConfigException(string configName) { diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index 101f6fb..ac0d6ce 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Text.RegularExpressions; +using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; using KafkaRetry.Job.Services.Interfaces; @@ -26,92 +28,112 @@ public KafkaRetryJobService(IKafkaService kafkaService, public async Task MoveMessages() { _logService.LogApplicationStarted(); - - using var assignedConsumer = _kafkaService.BuildKafkaConsumer(); - var adminClient = _kafkaService.BuildAdminClient(); - var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(120)); - adminClient.Dispose(); - var errorTopicPartitionsWithLag = GetErrorTopicInfosFromCluster(assignedConsumer, metadata); - var errorTopics = errorTopicPartitionsWithLag.Select(p => p.Item1.Topic).Distinct().ToList(); + var adminClient = _kafkaService.GetKafkaAdminClient(); + var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(120)); + _kafkaService.ReleaseKafkaAdminClient(ref adminClient); + + var consumer = _kafkaService.GetKafkaConsumer(); + var errorTopicsWithLag = GetErrorTopicInfosFromCluster(consumer, metadata); + var errorTopics = errorTopicsWithLag.Keys.ToList(); _logService.LogMatchingErrorTopics(errorTopics); - - using var producer = _kafkaService.BuildKafkaProducer(); - - var utcNow = DateTime.UtcNow; - + var consumerCommitStrategy= _kafkaService.GetConsumerCommitStrategy(); - try + var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopic; + if (messageConsumeLimit <= 0) + { + _logService.LogMessageConsumeLimitIsZero(); + return; + } + + var maxDegreeOfParallelism = _configuration.MaxLevelParallelism; + var semaphore = new SemaphoreSlim(maxDegreeOfParallelism); + var tasks = new List(); + + foreach (var (_, topicPartitionsWithLag) in errorTopicsWithLag) { - var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopicPartition; - if (messageConsumeLimit <= 0) + await semaphore.WaitAsync(); + tasks.Add(Task.Run(async () => + { + try + { + await MoveMessagesForTopic(topicPartitionsWithLag, consumerCommitStrategy); + } + finally + { + semaphore.Release(); + } + })); + } + + await Task.WhenAll(tasks.ToArray()); + + _logService.LogApplicationIsClosing(); + } + + private async Task MoveMessagesForTopic( + List<(TopicPartition, long)> topicPartitionsWithLag, + Action, ConsumeResult> consumerCommitStrategy + ) + { + var messageConsumeLimitPerTopic = _configuration.MessageConsumeLimitPerTopic; + + foreach (var (topicPartition, lag) in topicPartitionsWithLag) + { + if (lag <= 0) { - _logService.LogMessageConsumeLimitIsZero(); - return; + continue; } - - foreach (var (topicPartition, lag) in errorTopicPartitionsWithLag) + var consumer = _kafkaService.GetKafkaConsumer(); + var producer = _kafkaService.GetKafkaProducer(); + var errorTopic = topicPartition.Topic; + + try { - if (lag <= 0) - { - continue; - } - - var messageConsumeLimitForTopicPartition = messageConsumeLimit; _logService.LogStartOfSubscribingTopicPartition(topicPartition); - - var errorTopic = topicPartition.Topic; + var currentLag = lag; - - assignedConsumer.Assign(topicPartition); - while (currentLag > 0 && messageConsumeLimitForTopicPartition > 0) + consumer.Assign(topicPartition); + + while (currentLag > 0 && messageConsumeLimitPerTopic > 0) { - var result = assignedConsumer.Consume(TimeSpan.FromSeconds(3)); + var result = consumer.Consume(TimeSpan.FromSeconds(3)); if (result is null) { - break; + continue; } currentLag -= 1; - messageConsumeLimitForTopicPartition -= 1; - - var resultDate = result.Message.Timestamp.UtcDateTime; - - if (utcNow < resultDate) - { - _logService.LogNewMessageArrived(utcNow); - break; - } + messageConsumeLimitPerTopic -= 1; result.Message.Timestamp = new Timestamp(DateTime.UtcNow); var retryTopic = GetRetryTopicName(result, errorTopic); - + _logService.LogProducingMessage(result, errorTopic, retryTopic); - + await producer.ProduceAsync(retryTopic, result.Message); - consumerCommitStrategy.Invoke(assignedConsumer, result); + consumerCommitStrategy.Invoke(consumer, result); } _logService.LogEndOfSubscribingTopicPartition(topicPartition); } - - assignedConsumer.Unassign(); - - } - catch (Exception e) - { - _logService.LogError(e); - assignedConsumer.Unassign(); - throw; + catch (Exception e) + { + _logService.LogError(e); + } + finally + { + consumer.Unassign(); + _kafkaService.ReleaseKafkaConsumer(ref consumer); + _kafkaService.ReleaseKafkaProducer(ref producer); + } } - - _logService.LogApplicationIsClosing(); } private string GetRetryTopicName(ConsumeResult result , string errorTopic ) @@ -122,7 +144,7 @@ private string GetRetryTopicName(ConsumeResult result , string er errorTopic.ReplaceAtEnd(_configuration.ErrorSuffix, _configuration.RetrySuffix); } - private List<(TopicPartition, long)> GetErrorTopicInfosFromCluster(IConsumer assignedConsumer, Metadata metadata) + private IDictionary> GetErrorTopicInfosFromCluster(IConsumer assignedConsumer, Metadata metadata) { _logService.LogFetchingErrorTopicInfoStarted(); @@ -142,7 +164,12 @@ private string GetRetryTopicName(ConsumeResult result , string er var watermark = assignedConsumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(5)); var lag = tpo.Offset >= 0 ? watermark.High - tpo.Offset : watermark.High - watermark.Low; return (tpo.TopicPartition, lag); - }).ToList(); + }) + .GroupBy(t => t.Item1.Topic) + .ToImmutableDictionary( + t => t.Key, + t => t.ToList() + ); _logService.LogFetchingErrorTopicInfoFinished(); diff --git a/src/Services/Implementations/KafkaService.cs b/src/Services/Implementations/KafkaService.cs index 9560275..9741bf9 100644 --- a/src/Services/Implementations/KafkaService.cs +++ b/src/Services/Implementations/KafkaService.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using Confluent.Kafka; using KafkaRetry.Job.Helpers.KafkaConfigs; using KafkaRetry.Job.Services.Interfaces; @@ -8,13 +9,34 @@ namespace KafkaRetry.Job.Services.Implementations; public class KafkaService : IKafkaService { private readonly ConfigurationService _configuration; + private readonly ConcurrentBag> _consumers = new(); + private readonly ConcurrentBag> _producers = new(); + private readonly ConcurrentBag _adminClients = new(); public KafkaService(ConfigurationService configuration) { _configuration = configuration; } - public IConsumer BuildKafkaConsumer() + ~KafkaService() + { + while (_consumers.TryTake(out var consumer)) + { + consumer.Dispose(); + } + + while (_producers.TryTake(out var producer)) + { + producer.Dispose(); + } + + while (_adminClients.TryTake(out var adminClient)) + { + adminClient.Dispose(); + } + } + + private IConsumer BuildKafkaConsumer() { var bootstrapServers = _configuration.BootstrapServers; var groupId = _configuration.GroupId; @@ -24,7 +46,7 @@ public IConsumer BuildKafkaConsumer() return consumerBuilder.Build(); } - public IProducer BuildKafkaProducer() + private IProducer BuildKafkaProducer() { var bootstrapServers = _configuration.BootstrapServers; var producerConfig = CreateProducerConfig(bootstrapServers); @@ -32,8 +54,8 @@ public IProducer BuildKafkaProducer() return producerBuilder.Build(); } - - public IAdminClient BuildAdminClient() + + private IAdminClient BuildAdminClient() { var bootstrapServers = _configuration.BootstrapServers; var adminClientConfig = CreateAdminClientConfig(bootstrapServers); @@ -41,6 +63,39 @@ public IAdminClient BuildAdminClient() return adminClientBuilder.Build(); } + + public IConsumer GetKafkaConsumer() + { + return _consumers.TryTake(out var consumer) ? consumer : BuildKafkaConsumer(); + } + + public IProducer GetKafkaProducer() + { + return _producers.TryTake(out var producer) ? producer : BuildKafkaProducer(); + } + + public IAdminClient GetKafkaAdminClient() + { + return _adminClients.TryTake(out var adminClient) ? adminClient : BuildAdminClient(); + } + + public void ReleaseKafkaConsumer(ref IConsumer consumer) + { + _consumers.Add(consumer); + consumer = null; + } + + public void ReleaseKafkaProducer(ref IProducer producer) + { + _producers.Add(producer); + producer = null; + } + + public void ReleaseKafkaAdminClient(ref IAdminClient adminClient) + { + _adminClients.Add(adminClient); + adminClient = null; + } private ClientConfig CreateClientConfig(string bootstrapServers) { diff --git a/src/Services/Interfaces/IKafkaService.cs b/src/Services/Interfaces/IKafkaService.cs index 46f9c1a..0b6cb91 100644 --- a/src/Services/Interfaces/IKafkaService.cs +++ b/src/Services/Interfaces/IKafkaService.cs @@ -5,9 +5,12 @@ namespace KafkaRetry.Job.Services.Interfaces { public interface IKafkaService { - IConsumer BuildKafkaConsumer(); - IProducer BuildKafkaProducer(); - IAdminClient BuildAdminClient(); + public IConsumer GetKafkaConsumer(); + public IProducer GetKafkaProducer(); + public IAdminClient GetKafkaAdminClient(); + public void ReleaseKafkaConsumer(ref IConsumer consumer); + public void ReleaseKafkaProducer(ref IProducer producer); + public void ReleaseKafkaAdminClient(ref IAdminClient adminClient); public Action, ConsumeResult> GetConsumerCommitStrategy(); } } \ No newline at end of file