Skip to content

Commit

Permalink
Merge pull request #7053 from Particular/databus-acceptance-tests-8.2
Browse files Browse the repository at this point in the history
FileShareDataBus acceptance and unit tests for multi environments
  • Loading branch information
abparticular authored Jun 4, 2024
2 parents d676c7c + 45845d1 commit cda8c7e
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
namespace NServiceBus.AcceptanceTests.DataBus
{
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using MessageMutator;
using NUnit.Framework;

public class When_sending_databus_properties_from_different_environments : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_receive_messages_with_largepayload_correctly_from_windows()
{
var payloadToSend = new byte[PayloadSize];

var context = await Scenario.Define<Context>()
.WithEndpoint<WindowsSender>(b => b.When(session => session.Send(new MyMessageWithLargePayload
{
Payload = new DataBusProperty<byte[]>(payloadToSend)
})))
.WithEndpoint<Receiver>()
.Done(c => c.ReceivedPayload != null)
.Run();

Assert.AreEqual(payloadToSend, context.ReceivedPayload, "The large payload should be marshalled correctly using the databus");
}

[Test]
public async Task Should_receive_messages_with_largepayload_correctly_from_linux()
{
var payloadToSend = new byte[PayloadSize];

var context = await Scenario.Define<Context>()
.WithEndpoint<LinuxSender>(b => b.When(session => session.Send(new MyMessageWithLargePayload
{
Payload = new DataBusProperty<byte[]>(payloadToSend)
})))
.WithEndpoint<Receiver>()
.Done(c => c.ReceivedPayload != null)
.Run();

Assert.AreEqual(payloadToSend, context.ReceivedPayload, "The large payload should be marshalled correctly using the databus");
}

const int PayloadSize = 500;

public class Context : ScenarioContext
{
public byte[] ReceivedPayload { get; set; }
}

public class WindowsSender : EndpointConfigurationBuilder
{
public WindowsSender()
{
EndpointSetup<DefaultServer>(builder =>
{
var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
builder.UseDataBus<FileShareDataBus, SystemJsonDataBusSerializer>().BasePath(basePath);
builder.UseSerialization<SystemJsonSerializer>();
builder.ConfigureRouting().RouteToEndpoint(typeof(MyMessageWithLargePayload), typeof(Receiver));
builder.RegisterMessageMutator(new MutateOutgoingForWindows());
});
}

public class MutateOutgoingForWindows : IMutateOutgoingTransportMessages
{
public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
{
var databusHeaderKey = context.OutgoingHeaders.FirstOrDefault(f => f.Key.StartsWith("NServiceBus.DataBus.", StringComparison.OrdinalIgnoreCase)).Key;
context.OutgoingHeaders[databusHeaderKey] = context.OutgoingHeaders[databusHeaderKey].Replace("/", "\\");
return Task.CompletedTask;
}
}
}


public class LinuxSender : EndpointConfigurationBuilder
{
public LinuxSender()
{
EndpointSetup<DefaultServer>(builder =>
{
var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
builder.UseDataBus<FileShareDataBus, SystemJsonDataBusSerializer>().BasePath(basePath);
builder.UseSerialization<SystemJsonSerializer>();
builder.ConfigureRouting().RouteToEndpoint(typeof(MyMessageWithLargePayload), typeof(Receiver));
builder.RegisterMessageMutator(new MutateOutgoingForLinux());
});
}

public class MutateOutgoingForLinux : IMutateOutgoingTransportMessages
{
public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
{
var databusHeaderKey = context.OutgoingHeaders.FirstOrDefault(f => f.Key.StartsWith("NServiceBus.DataBus.", StringComparison.OrdinalIgnoreCase)).Key;
context.OutgoingHeaders[databusHeaderKey] = context.OutgoingHeaders[databusHeaderKey].Replace("\\", "/");
return Task.CompletedTask;
}
}
}

public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>(builder =>
{
var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
builder.UseDataBus<FileShareDataBus, SystemJsonDataBusSerializer>().BasePath(basePath);
builder.UseSerialization<SystemJsonSerializer>();
builder.RegisterMessageMutator(new Mutator());
});
}

public class MyMessageHandler : IHandleMessages<MyMessageWithLargePayload>
{
public MyMessageHandler(Context context)
{
testContext = context;
}

public Task Handle(MyMessageWithLargePayload messageWithLargePayload, IMessageHandlerContext context)
{
testContext.ReceivedPayload = messageWithLargePayload.Payload.Value;

return Task.CompletedTask;
}

Context testContext;
}
public class Mutator : IMutateIncomingTransportMessages
{
public Task MutateIncoming(MutateIncomingTransportMessageContext context)
{
if (context.Body.Length > PayloadSize)
{
throw new Exception("The message body is too large, which means the DataBus was not used to transfer the payload.");
}
return Task.CompletedTask;
}
}
}

public class MyMessageWithLargePayload : ICommand
{
public DataBusProperty<byte[]> Payload { get; set; }
}
}
}
14 changes: 14 additions & 0 deletions src/NServiceBus.Core.Tests/DataBus/FileShare/AcceptanceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ public async Task Should_handle_be_able_to_read_stored_values_concurrently()
});
}

[TestCase('/')]
[TestCase('\\')]
public async Task Should_handle_be_able_to_read_stored_values_when_received_from_different_environments(char pathSeparator)
{
const string content = "Test";

var key = await Put(content, TimeSpan.MaxValue);
using (var stream = await dataBus.Get(key.Replace(Path.DirectorySeparatorChar, pathSeparator)))
using (var streamReader = new StreamReader(stream))
{
Assert.AreEqual(await streamReader.ReadToEndAsync(), content);
}
}

[Test]
public async Task Should_handle_max_ttl()
{
Expand Down

0 comments on commit cda8c7e

Please sign in to comment.