diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 99b9afcb..05037442 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -21,7 +21,7 @@ public class Consumer : IMetadataQueries, IDisposable private readonly BlockingCollection<Message> _fetchResponseQueue; private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource(); private readonly ConcurrentDictionary<int, Task> _partitionPollingIndex = new ConcurrentDictionary<int, Task>(); - private readonly ConcurrentDictionary<int, long> _partitionOffsetIndex = new ConcurrentDictionary<int, long>(); + private readonly ConcurrentDictionary<int, Tuple<long, bool>> _partitionOffsetIndex = new ConcurrentDictionary<int, Tuple<long, bool>>(); private readonly IMetadataQueries _metadataQueries; private int _disposeCount; @@ -33,7 +33,7 @@ public Consumer(ConsumerOptions options, params OffsetPosition[] positions) _options = options; _fetchResponseQueue = new BlockingCollection<Message>(_options.ConsumerBufferSize); _metadataQueries = new MetadataQueries(_options.Router); - + SetOffsetPosition(positions); } @@ -62,7 +62,7 @@ public void SetOffsetPosition(params OffsetPosition[] positions) foreach (var position in positions) { var temp = position; - _partitionOffsetIndex.AddOrUpdate(position.PartitionId, i => temp.Offset, (i, l) => temp.Offset); + _partitionOffsetIndex.AddOrUpdate(position.PartitionId, i => new Tuple<long, bool>(temp.Offset, true), (i, l) => new Tuple<long, bool>(temp.Offset, true)); } } @@ -73,7 +73,7 @@ public void SetOffsetPosition(params OffsetPosition[] positions) /// Will only return data if the consumer is actively being consumed. 
public List<OffsetPosition> GetOffsetPosition() { - return _partitionOffsetIndex.Select(x => new OffsetPosition { PartitionId = x.Key, Offset = x.Value }).ToList(); + return _partitionOffsetIndex.Select(x => new OffsetPosition { PartitionId = x.Key, Offset = x.Value.Item1 }).ToList(); } private void EnsurePartitionPollingThreads() @@ -125,7 +125,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) { //get the current offset, or default to zero if not there. long offset = 0; - _partitionOffsetIndex.AddOrUpdate(partitionId, i => offset, (i, currentOffset) => { offset = currentOffset; return currentOffset; }); + _partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple<long, bool>(offset, false), (i, currentOffset) => { offset = currentOffset.Item1; return new Tuple<long, bool>(currentOffset.Item1, false); }); //build a fetch request for partition at offset var fetch = new Fetch @@ -139,11 +139,11 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) var fetches = new List<Fetch> { fetch }; var fetchRequest = new FetchRequest - { - MaxWaitTime = (int)Math.Min((long)int.MaxValue, _options.MaxWaitTimeForMinimumBytes.TotalMilliseconds), - MinBytes = _options.MinimumBytes, - Fetches = fetches - }; + { + MaxWaitTime = (int)Math.Min((long)int.MaxValue, _options.MaxWaitTimeForMinimumBytes.TotalMilliseconds), + MinBytes = _options.MinimumBytes, + Fetches = fetches + }; //make request and post to queue var route = _options.Router.SelectBrokerRoute(topic, partitionId); @@ -166,7 +166,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) } var nextOffset = response.Messages.Max(x => x.Meta.Offset) + 1; - _partitionOffsetIndex.AddOrUpdate(partitionId, i => nextOffset, (i, l) => nextOffset); + _partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple<long, bool>(nextOffset, false), (i, l) => l.Item2 ? 
new Tuple<long, bool>(l.Item1, false) : new Tuple<long, bool>(nextOffset, false)); // sleep is not needed if responses were received continue; diff --git a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs index 9a88d9d0..58a1493d 100644 --- a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs +++ b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs @@ -77,7 +77,9 @@ public void ConsumerShouldConsumeInSameOrderAsProduced() [Test] public void ConsumerShouldBeAbleToSeekBackToEarlierOffset() { - var expected = new List<string> { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19" }; + var expectedFrom0 = new List<string> { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19" }; + var expectedFrom250 = new List<string> { "250", "251", "252", "253", "254", "255", "256", "257", "258", "259", "260", "261", "262", "263", "264", "265", "266", "267", "268", "269" }; + var expectedFrom500 = new List<string> { "500", "501", "502", "503", "504", "505", "506", "507", "508", "509", "510", "511", "512", "513", "514", "515", "516", "517", "518", "519" }; var testId = Guid.NewGuid().ToString(); using (var router = new BrokerRouter(new KafkaOptions(IntegrationConfig.IntegrationUri))) @@ -86,32 +88,54 @@ public void ConsumerShouldBeAbleToSeekBackToEarlierOffset() var offsets = producer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result .Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray(); + var initialOffset = offsets.First().Offset; + using (var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets)) { - for (int i = 0; i < 20; i++) - { - producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, new[] { new Message(i.ToString(), testId) }).Wait(); - } + int iter = 0; + producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, new 
int[1000].Select(i => new Message(iter++.ToString(), testId))).Wait(); + + consumer.Consume().Take(900).ToList(); - var sentMessages = consumer.Consume().Take(20).ToList(); + consumer.SetOffsetPosition(offsets); + var sentMessages = consumer.Consume().SkipWhile(x => x.Meta.Offset != offsets.First().Offset).Take(20).ToList(); //ensure the produced messages arrived Console.WriteLine("Message order: {0}", string.Join(", ", sentMessages.Select(x => x.Value.ToUtf8String()).ToList())); Assert.That(sentMessages.Count, Is.EqualTo(20)); - Assert.That(sentMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expected)); + Assert.That(sentMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expectedFrom0)); Assert.That(sentMessages.Any(x => x.Key.ToUtf8String() != testId), Is.False); + consumer.Consume().Take(900).ToList(); + + //seek back to initial offset + offsets.First().Offset = initialOffset + 500; + consumer.SetOffsetPosition(offsets); + + var resetPositionMessages = consumer.Consume().SkipWhile(x => x.Meta.Offset != offsets.First().Offset).Take(20).ToList(); + + //ensure all produced messages arrive again + Console.WriteLine("Message order: {0}", string.Join(", ", resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList())); + + Assert.That(resetPositionMessages.Count, Is.EqualTo(20)); + var actual = resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList(); + Assert.That(actual, Is.EqualTo(expectedFrom500)); + Assert.That(resetPositionMessages.Any(x => x.Key.ToUtf8String() != testId), Is.False); + + consumer.Consume().Take(400).ToList(); + //seek back to initial offset + offsets.First().Offset = initialOffset + 250; consumer.SetOffsetPosition(offsets); - var resetPositionMessages = consumer.Consume().Take(20).ToList(); + resetPositionMessages = consumer.Consume().SkipWhile(x => x.Meta.Offset != offsets.First().Offset).Take(20).ToList(); //ensure all produced messages arrive again - Console.WriteLine("Message order: {0}", 
string.Join(", ", resetPositionMessages.Select(x => x.Value).ToList())); + Console.WriteLine("Message order: {0}", string.Join(", ", resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList())); Assert.That(resetPositionMessages.Count, Is.EqualTo(20)); - Assert.That(resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expected)); + Assert.That(resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expectedFrom250)); Assert.That(resetPositionMessages.Any(x => x.Key.ToUtf8String() != testId), Is.False); } } @@ -198,7 +222,7 @@ public async void ConsumerShouldMoveToNextAvailableOffsetWhenQueryingForNextMess offsets.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray())) { Console.WriteLine("Sending {0} test messages", expectedCount); - var response = await producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, + var response = await producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, Enumerable.Range(0, expectedCount).Select(x => new Message(x.ToString()))); Assert.That(response.Any(x => x.Error != (int)ErrorResponseCode.NoError), Is.False, "Error occured sending test messages to server.");