Skip to content

Commit

Permalink
Add API for configuring rate limiting (#6196)
Browse files Browse the repository at this point in the history
Add basic system outage rate limiter

Co-authored-by: Szymon Pobiega <[email protected]>
Co-authored-by: Michał Wójcik <[email protected]>
  • Loading branch information
3 people authored Dec 15, 2021
1 parent 0fae8bb commit 7d94fcf
Show file tree
Hide file tree
Showing 17 changed files with 810 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
namespace NServiceBus.AcceptanceTests.Core.Recoverability
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NUnit.Framework;

public class When_messages_never_succeed : NServiceBusAcceptanceTest
{
public static int NumberOfConsecutiveFailuresBeforeThrottling = 1;
public static TimeSpan TimeToWaitBetweenThrottledAttempts = TimeSpan.FromSeconds(0);

[Test]
public async Task Should_throttle_pipeline_after_configured_number_of_consecutive_failures()
{
NumberOfConsecutiveFailuresBeforeThrottling = 5;

var context = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithFailingHandler>(b => b
.DoNotFailOnErrorMessages()
.When(async (session, ctx) =>
{
for (var x = 0; x < 10; x++)
{
await session.SendLocal(new InitiatingMessage
{
Id = ctx.TestRunId
});
}
})
)
.Done(c => c.ThrottleModeEntered && c.FailuresBeforeThrottling >= NumberOfConsecutiveFailuresBeforeThrottling)
.Run();
}

[Test]
public async Task Should_not_throttle_pipeline_if_number_of_consecutive_failures_is_below_threshold()
{
NumberOfConsecutiveFailuresBeforeThrottling = 100;

var context = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithFailingHandler>(b => b
.DoNotFailOnErrorMessages()
.When(async (session, ctx) =>
{
for (var x = 0; x < 10; x++)
{
await session.SendLocal(new InitiatingMessage
{
Id = ctx.TestRunId
});
}
})
)
.Done(c => c.FailuresBeforeThrottling == 10 && !c.ThrottleModeEntered)
.Run();
}

class Context : ScenarioContext
{
public bool ThrottleModeEntered { get; set; }
public static int failuresBeforeThrottling;
public DateTime LastProcessedTimeStamp { get; set; }
public TimeSpan TimeBetweenProcessingAttempts { get; set; }

public int FailuresBeforeThrottling => failuresBeforeThrottling;
}

class EndpointWithFailingHandler : EndpointConfigurationBuilder
{
public EndpointWithFailingHandler()
{
EndpointSetup<DefaultServer>((config, context) =>
{
config.LimitMessageProcessingConcurrencyTo(3);
var scenarioContext = (Context)context.ScenarioContext;
var recoverability = config.Recoverability();
recoverability.Immediate(i => i.NumberOfRetries(0));
recoverability.Delayed(d => d.NumberOfRetries(0));
var rateLimitingSettings = new RateLimitSettings(TimeToWaitBetweenThrottledAttempts, () =>
{
scenarioContext.ThrottleModeEntered = true;
return Task.FromResult(0);
});
recoverability.OnConsecutiveFailures(NumberOfConsecutiveFailuresBeforeThrottling, rateLimitingSettings);
});
}

class InitiatingHandler : IHandleMessages<InitiatingMessage>
{
public InitiatingHandler(Context testContext)
{
this.testContext = testContext;
}
public Task Handle(InitiatingMessage initiatingMessage, IMessageHandlerContext context)
{
if (testContext.ThrottleModeEntered)
{
testContext.TimeBetweenProcessingAttempts = DateTime.Now - testContext.LastProcessedTimeStamp;
}

testContext.LastProcessedTimeStamp = DateTime.Now;

Interlocked.Increment(ref Context.failuresBeforeThrottling);

throw new SimulatedException("THIS IS A MESSAGE THAT WILL NEVER SUCCEED");
}
Context testContext;
}
}

public class InitiatingMessage : IMessage
{
public Guid Id { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
namespace NServiceBus.AcceptanceTests.Core.Recoverability
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NUnit.Framework;

public class When_messages_never_succeed_with_delays_specified : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_wait_the_configured_delay_between_processing_attempts_in_throttled_mode()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithFailingHandler>(b => b
.DoNotFailOnErrorMessages()
.When(async (session, ctx) =>
{
for (var x = 0; x < 5; x++)
{
await session.SendLocal(new InitiatingMessage
{
Id = ctx.TestRunId
});
}
})
)
.Done(c => c.ThrottleModeEntered && c.TimeBetweenProcessingAttempts >= TimeSpan.FromSeconds(2))
.Run();
}

class Context : ScenarioContext
{
public bool ThrottleModeEntered { get; set; }
public static int failuresBeforeThrottling;
public DateTime LastProcessedTimeStamp { get; set; }
public TimeSpan TimeBetweenProcessingAttempts { get; set; }
}

class EndpointWithFailingHandler : EndpointConfigurationBuilder
{
public EndpointWithFailingHandler()
{
EndpointSetup<DefaultServer>((config, context) =>
{
var scenarioContext = (Context)context.ScenarioContext;
config.LimitMessageProcessingConcurrencyTo(1);
var recoverability = config.Recoverability();
recoverability.Immediate(i => i.NumberOfRetries(0));
recoverability.Delayed(d => d.NumberOfRetries(0));
var rateLimitingSettings = new RateLimitSettings(TimeSpan.FromSeconds(2), () =>
{
scenarioContext.ThrottleModeEntered = true;
return Task.FromResult(0);
});
recoverability.OnConsecutiveFailures(1, rateLimitingSettings);
});
}

class InitiatingHandler : IHandleMessages<InitiatingMessage>
{
public InitiatingHandler(Context context)
{
testContext = context;
}
public Task Handle(InitiatingMessage initiatingMessage, IMessageHandlerContext context)
{
if (testContext.ThrottleModeEntered)
{
testContext.TimeBetweenProcessingAttempts = DateTime.Now - testContext.LastProcessedTimeStamp;
}

testContext.LastProcessedTimeStamp = DateTime.Now;

Interlocked.Increment(ref Context.failuresBeforeThrottling);

throw new SimulatedException("THIS IS A MESSAGE THAT WILL NEVER SUCCEED");
}
Context testContext;
}
}

public class InitiatingMessage : IMessage
{
public Guid Id { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
namespace NServiceBus.AcceptanceTests.Core.Recoverability
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NUnit.Framework;

public class When_messages_succeed_after_throttling : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_end_throttling_mode_after_a_single_successful_message()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithFailingHandler>(b => b
.DoNotFailOnErrorMessages()
.When(async (session, ctx) =>
{
for (var x = 0; x < 10; x++)
{
await session.SendLocal(new InitiatingMessage
{
Id = ctx.TestRunId
});
}
})
)
.Done(c => c.MessageProcessedNormally && c.ThrottleModeEnded)
.Run();
}

class Context : ScenarioContext
{
public bool ThrottleModeEntered { get; set; }
public bool ThrottleModeEnded { get; set; }
public bool MessageProcessedNormally { get; set; }
}

class EndpointWithFailingHandler : EndpointConfigurationBuilder
{
public EndpointWithFailingHandler()
{
EndpointSetup<DefaultServer>((config, context) =>
{
var scenarioContext = (Context)context.ScenarioContext;
config.LimitMessageProcessingConcurrencyTo(3);
var recoverability = config.Recoverability();
recoverability.Immediate(i => i.NumberOfRetries(0));
recoverability.Delayed(d => d.NumberOfRetries(0));
var rateLimitingSettings = new RateLimitSettings(TimeSpan.FromSeconds(5), () =>
{
scenarioContext.ThrottleModeEntered = true;
return Task.FromResult(0);
},
() =>
{
scenarioContext.ThrottleModeEnded = true;
return Task.FromResult(0);
});
recoverability.OnConsecutiveFailures(2, rateLimitingSettings);
});
}

class InitiatingHandler : IHandleMessages<InitiatingMessage>
{
public InitiatingHandler(Context testContext)
{
this.testContext = testContext;
}
public Task Handle(InitiatingMessage initiatingMessage, IMessageHandlerContext context)
{
if (!testContext.ThrottleModeEntered)
{
throw new SimulatedException("THIS IS A MESSAGE THAT WILL NEVER SUCCEED");
}

testContext.MessageProcessedNormally = true;

return Task.FromResult(0);
}
Context testContext;
}
}

public class InitiatingMessage : IMessage
{
public Guid Id { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,13 @@ namespace NServiceBus
{
public PublishOptions() { }
}
public class RateLimitSettings
{
public RateLimitSettings(System.Nullable<System.TimeSpan> timeToWaitBetweenThrottledAttempts = null, System.Func<System.Threading.Tasks.Task> onRateLimitStarted = null, System.Func<System.Threading.Tasks.Task> onRateLimitEnded = null) { }
public System.Func<System.Threading.Tasks.Task> OnRateLimitEnded { get; }
public System.Func<System.Threading.Tasks.Task> OnRateLimitStarted { get; }
public System.TimeSpan TimeToWaitBetweenThrottledAttempts { get; }
}
public class ReceivePipelineCompleted
{
public ReceivePipelineCompleted(NServiceBus.Transport.IncomingMessage processedMessage, System.DateTime startedAt, System.DateTime completedAt) { }
Expand Down Expand Up @@ -844,6 +851,7 @@ namespace NServiceBus
public NServiceBus.RecoverabilitySettings DisableLegacyRetriesSatellite() { }
public NServiceBus.RecoverabilitySettings Failed(System.Action<NServiceBus.RetryFailedSettings> customizations) { }
public NServiceBus.RecoverabilitySettings Immediate(System.Action<NServiceBus.ImmediateRetriesSettings> customizations) { }
public NServiceBus.RecoverabilitySettings OnConsecutiveFailures(int numberOfConsecutiveFailures, NServiceBus.RateLimitSettings settings) { }
}
public class static RecoverabilitySettingsExtensions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,13 @@ namespace NServiceBus
{
public PublishOptions() { }
}
public class RateLimitSettings
{
public RateLimitSettings(System.Nullable<System.TimeSpan> timeToWaitBetweenThrottledAttempts = null, System.Func<System.Threading.Tasks.Task> onRateLimitStarted = null, System.Func<System.Threading.Tasks.Task> onRateLimitEnded = null) { }
public System.Func<System.Threading.Tasks.Task> OnRateLimitEnded { get; }
public System.Func<System.Threading.Tasks.Task> OnRateLimitStarted { get; }
public System.TimeSpan TimeToWaitBetweenThrottledAttempts { get; }
}
public class ReceivePipelineCompleted
{
public ReceivePipelineCompleted(NServiceBus.Transport.IncomingMessage processedMessage, System.DateTime startedAt, System.DateTime completedAt) { }
Expand Down Expand Up @@ -844,6 +851,7 @@ namespace NServiceBus
public NServiceBus.RecoverabilitySettings DisableLegacyRetriesSatellite() { }
public NServiceBus.RecoverabilitySettings Failed(System.Action<NServiceBus.RetryFailedSettings> customizations) { }
public NServiceBus.RecoverabilitySettings Immediate(System.Action<NServiceBus.ImmediateRetriesSettings> customizations) { }
public NServiceBus.RecoverabilitySettings OnConsecutiveFailures(int numberOfConsecutiveFailures, NServiceBus.RateLimitSettings settings) { }
}
public class static RecoverabilitySettingsExtensions
{
Expand Down
1 change: 0 additions & 1 deletion src/NServiceBus.Core.Tests/StandardsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Runtime.CompilerServices;
using NServiceBus.Features;
using NServiceBus.Logging;
using NServiceBus.Pipeline;
using NUnit.Framework;

[TestFixture]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
public class TransportReceiverTests
{
[SetUp]
public void SetUp()
public async Task SetUp()
{
pump = new Pump();

receiver = new TransportReceiver("FakeReceiver", pump, new PushSettings("queue", "queue", true, TransportTransactionMode.SendsAtomicWithReceive), new PushRuntimeSettings(), null, null, null);
receiver = new TransportReceiver("FakeReceiver", () => pump, new PushSettings("queue", "queue", true, TransportTransactionMode.SendsAtomicWithReceive), new PushRuntimeSettings(), null, null, null, new ConsecutiveFailuresConfiguration());
await receiver.Init();
}

[Test]
Expand Down Expand Up @@ -76,7 +77,7 @@ class Pump : IPushMessages, IDisposable

public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<ErrorHandleResult>> onError, CriticalError criticalError, PushSettings settings)
{
throw new NotImplementedException();
return Task.FromResult(0);
}

public void Start(PushRuntimeSettings limitations)
Expand Down
Loading

0 comments on commit 7d94fcf

Please sign in to comment.