Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix/set offset bug #73

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/kafka-net/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class Consumer : IMetadataQueries, IDisposable
private readonly BlockingCollection<Message> _fetchResponseQueue;
private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource();
private readonly ConcurrentDictionary<int, Task> _partitionPollingIndex = new ConcurrentDictionary<int, Task>();
private readonly ConcurrentDictionary<int, long> _partitionOffsetIndex = new ConcurrentDictionary<int, long>();
private readonly ConcurrentDictionary<int, Tuple<long, bool>> _partitionOffsetIndex = new ConcurrentDictionary<int, Tuple<long, bool>>();
private readonly IMetadataQueries _metadataQueries;

private int _disposeCount;
Expand All @@ -33,7 +33,7 @@ public Consumer(ConsumerOptions options, params OffsetPosition[] positions)
_options = options;
_fetchResponseQueue = new BlockingCollection<Message>(_options.ConsumerBufferSize);
_metadataQueries = new MetadataQueries(_options.Router);

SetOffsetPosition(positions);
}

Expand Down Expand Up @@ -62,7 +62,7 @@ public void SetOffsetPosition(params OffsetPosition[] positions)
foreach (var position in positions)
{
var temp = position;
_partitionOffsetIndex.AddOrUpdate(position.PartitionId, i => temp.Offset, (i, l) => temp.Offset);
_partitionOffsetIndex.AddOrUpdate(position.PartitionId, i => new Tuple<long, bool>(temp.Offset, true), (i, l) => new Tuple<long, bool>(temp.Offset, true));
}
}

Expand All @@ -73,7 +73,7 @@ public void SetOffsetPosition(params OffsetPosition[] positions)
/// <remarks>Will only return data if the consumer is actively being consumed.</remarks>
public List<OffsetPosition> GetOffsetPosition()
{
return _partitionOffsetIndex.Select(x => new OffsetPosition { PartitionId = x.Key, Offset = x.Value }).ToList();
return _partitionOffsetIndex.Select(x => new OffsetPosition { PartitionId = x.Key, Offset = x.Value.Item1 }).ToList();
}

private void EnsurePartitionPollingThreads()
Expand Down Expand Up @@ -125,7 +125,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId)
{
//get the current offset, or default to zero if not there.
long offset = 0;
_partitionOffsetIndex.AddOrUpdate(partitionId, i => offset, (i, currentOffset) => { offset = currentOffset; return currentOffset; });
_partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple<long, bool>(offset, false), (i, currentOffset) => { offset = currentOffset.Item1; return new Tuple<long, bool>(currentOffset.Item1, false); });

//build a fetch request for partition at offset
var fetch = new Fetch
Expand All @@ -139,11 +139,11 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId)
var fetches = new List<Fetch> { fetch };

var fetchRequest = new FetchRequest
{
MaxWaitTime = (int)Math.Min((long)int.MaxValue, _options.MaxWaitTimeForMinimumBytes.TotalMilliseconds),
MinBytes = _options.MinimumBytes,
Fetches = fetches
};
{
MaxWaitTime = (int)Math.Min((long)int.MaxValue, _options.MaxWaitTimeForMinimumBytes.TotalMilliseconds),
MinBytes = _options.MinimumBytes,
Fetches = fetches
};

//make request and post to queue
var route = _options.Router.SelectBrokerRoute(topic, partitionId);
Expand All @@ -166,7 +166,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId)
}

var nextOffset = response.Messages.Max(x => x.Meta.Offset) + 1;
_partitionOffsetIndex.AddOrUpdate(partitionId, i => nextOffset, (i, l) => nextOffset);
_partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple<long, bool>(nextOffset, false), (i, l) => l.Item2 ? new Tuple<long, bool>(l.Item1, false) : new Tuple<long, bool>(nextOffset, false));

// sleep is not needed if responses were received
continue;
Expand Down
46 changes: 35 additions & 11 deletions src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public void ConsumerShouldConsumeInSameOrderAsProduced()
[Test]
public void ConsumerShouldBeAbleToSeekBackToEarlierOffset()
{
var expected = new List<string> { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19" };
var expectedFrom0 = new List<string> { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19" };
var expectedFrom250 = new List<string> { "250", "251", "252", "253", "254", "255", "256", "257", "258", "259", "260", "261", "262", "263", "264", "265", "266", "267", "268", "269" };
var expectedFrom500 = new List<string> { "500", "501", "502", "503", "504", "505", "506", "507", "508", "509", "510", "511", "512", "513", "514", "515", "516", "517", "518", "519" };
var testId = Guid.NewGuid().ToString();

using (var router = new BrokerRouter(new KafkaOptions(IntegrationConfig.IntegrationUri)))
Expand All @@ -86,32 +88,54 @@ public void ConsumerShouldBeAbleToSeekBackToEarlierOffset()
var offsets = producer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result
.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray();

var initialOffset = offsets.First().Offset;

using (var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets))
{
for (int i = 0; i < 20; i++)
{
producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, new[] { new Message(i.ToString(), testId) }).Wait();
}
int iter = 0;
producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, new int[1000].Select(i => new Message(iter++.ToString(), testId))).Wait();

consumer.Consume().Take(900).ToList();

var sentMessages = consumer.Consume().Take(20).ToList();
consumer.SetOffsetPosition(offsets);
var sentMessages = consumer.Consume().SkipWhile(x => x.Meta.Offset != offsets.First().Offset).Take(20).ToList();

//ensure the produced messages arrived
Console.WriteLine("Message order: {0}", string.Join(", ", sentMessages.Select(x => x.Value.ToUtf8String()).ToList()));

Assert.That(sentMessages.Count, Is.EqualTo(20));
Assert.That(sentMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expected));
Assert.That(sentMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expectedFrom0));
Assert.That(sentMessages.Any(x => x.Key.ToUtf8String() != testId), Is.False);

consumer.Consume().Take(900).ToList();

//seek back to initial offset
offsets.First().Offset = initialOffset + 500;
consumer.SetOffsetPosition(offsets);

var resetPositionMessages = consumer.Consume().SkipWhile(x => x.Meta.Offset != offsets.First().Offset).Take(20).ToList();

//ensure all produced messages arrive again
Console.WriteLine("Message order: {0}", string.Join(", ", resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList()));

Assert.That(resetPositionMessages.Count, Is.EqualTo(20));
var actual = resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList();
Assert.That(actual, Is.EqualTo(expectedFrom500));
Assert.That(resetPositionMessages.Any(x => x.Key.ToUtf8String() != testId), Is.False);

consumer.Consume().Take(400).ToList();

//seek back to initial offset
offsets.First().Offset = initialOffset + 250;
consumer.SetOffsetPosition(offsets);

var resetPositionMessages = consumer.Consume().Take(20).ToList();
resetPositionMessages = consumer.Consume().SkipWhile(x => x.Meta.Offset != offsets.First().Offset).Take(20).ToList();

//ensure all produced messages arrive again
Console.WriteLine("Message order: {0}", string.Join(", ", resetPositionMessages.Select(x => x.Value).ToList()));
Console.WriteLine("Message order: {0}", string.Join(", ", resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList()));

Assert.That(resetPositionMessages.Count, Is.EqualTo(20));
Assert.That(resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expected));
Assert.That(resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expectedFrom250));
Assert.That(resetPositionMessages.Any(x => x.Key.ToUtf8String() != testId), Is.False);
}
}
Expand Down Expand Up @@ -198,7 +222,7 @@ public async void ConsumerShouldMoveToNextAvailableOffsetWhenQueryingForNextMess
offsets.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray()))
{
Console.WriteLine("Sending {0} test messages", expectedCount);
var response = await producer.SendMessageAsync(IntegrationConfig.IntegrationTopic,
var response = await producer.SendMessageAsync(IntegrationConfig.IntegrationTopic,
Enumerable.Range(0, expectedCount).Select(x => new Message(x.ToString())));

Assert.That(response.Any(x => x.Error != (int)ErrorResponseCode.NoError), Is.False, "Error occured sending test messages to server.");
Expand Down