Skip to content

Commit

Permalink
Merge branch 'hotfix-4.6.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
John Simons committed Jul 17, 2014
2 parents 791019e + 3672d0f commit cf24a0c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 24 deletions.
22 changes: 16 additions & 6 deletions src/NServiceBus.Core.Tests/Timeout/When_pooling_timeouts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace NServiceBus.Core.Tests.Timeout
using Raven.Client.Embedded;

[TestFixture]
[Explicit]
public class When_pooling_timeouts_with_raven : When_pooling_timeouts
{
private IDocumentStore store;
Expand All @@ -37,10 +36,21 @@ public void Cleanup()
{
store.Dispose();
}

[Test]
public void Should_retrieve_all_timeout_messages_that_expired_even_if_it_needs_to_page()
{
expected = 1024 + 5;

Enumerable.Range(1, expected).ToList().ForEach(i => persister.Add(CreateData(DateTime.UtcNow.AddSeconds(-5))));

StartAndStopReceiver(5);

WaitForMessagesThenAssert(5);
}
}

[TestFixture]
[Explicit]
public class When_pooling_timeouts_with_inMemory : When_pooling_timeouts
{
protected override IPersistTimeouts CreateTimeoutPersister()
Expand All @@ -54,7 +64,7 @@ public abstract class When_pooling_timeouts
private IManageTimeouts manager;
private FakeMessageSender messageSender;
readonly Random rand = new Random();
private int expected;
protected int expected;

protected IPersistTimeouts persister;
protected TimeoutPersisterReceiver receiver;
Expand Down Expand Up @@ -187,14 +197,14 @@ private void Push(int total, DateTime time)
Enumerable.Range(1, total).ToList().ForEach(i => manager.PushTimeout(CreateData(time)));
}

private void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1)
protected void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1)
{
receiver.Start();
Thread.Sleep(TimeSpan.FromSeconds(secondsToWaitBeforeCallingStop));
receiver.Stop();
}

private static TimeoutData CreateData(DateTime time)
protected static TimeoutData CreateData(DateTime time)
{
return new TimeoutData
{
Expand All @@ -204,7 +214,7 @@ private static TimeoutData CreateData(DateTime time)
};
}

private void WaitForMessagesThenAssert(int maxSecondsToWait)
protected void WaitForMessagesThenAssert(int maxSecondsToWait)
{
var maxTime = DateTime.Now.AddSeconds(maxSecondsToWait);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,33 +72,56 @@ public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateT
results.AddRange(GetCleanupChunk(startSlice));
}

var skip = 0;
var numberOfRequestsExecutedSoFar = 0;
RavenQueryStatistics stats;
using (var session = OpenSession())
do
{
var query = GetChunkQuery(session)
.Where(t => t.Time > startSlice && t.Time <= now)
.Statistics(out stats)
.Select(t => new
using (var session = OpenSession())
{
session.Advanced.AllowNonAuthoritativeInformation = true;

var query = GetChunkQuery(session)
.Where(
t =>
t.Time > startSlice &&
t.Time <= now)
.Select(t => new { t.Id, t.Time })
.Statistics(out stats);
do
{
t.Id,
t.Time
})
.Take(1024);

results.AddRange(query.ToList()
.Select(arg => new Tuple<string, DateTime>(arg.Id, arg.Time))
);
}

// Set next execution to be now if we haven't consumed the entire thing or received stale results.
results.AddRange(query
.Skip(skip)
.Take(1024)
.ToList()
.Select(arg => new Tuple<string, DateTime>(arg.Id, arg.Time)));

skip += 1024;
} while (skip < stats.TotalResults &&
++numberOfRequestsExecutedSoFar < session.Advanced.MaxNumberOfRequestsPerSession);
}
} while (skip < stats.TotalResults);

// Set next execution to be now if we received stale results.
// Delay the next execution a bit if we results weren't stale and we got the full chunk.
if (stats.TotalResults > 1024 || stats.IsStale)
if (stats.IsStale)
{
nextTimeToRunQuery = now;
}
else
{
nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10);
using (var session = OpenSession())
{
var beginningOfNextChunk = GetChunkQuery(session)
.Where(t => t.Time > now)
.Take(1)
.Select(t => t.Time)
.FirstOrDefault();

nextTimeToRunQuery = (beginningOfNextChunk == default(DateTime))
? DateTime.UtcNow.AddMinutes(10)
: beginningOfNextChunk.ToUniversalTime();
}
}

return results;
Expand Down

0 comments on commit cf24a0c

Please sign in to comment.