Skip to content

Commit

Permalink
upgrading to latest transactional session testing Transaction.Current…
Browse files Browse the repository at this point in the history
… setup (#1177)

Co-authored-by: Szymon Pobiega <[email protected]>
Co-authored-by: Daniel Marbach <[email protected]>
# Conflicts:
#	src/NServiceBus.Persistence.Sql.TransactionalSession/NServiceBus.Persistence.Sql.TransactionalSession.csproj
#	src/TransactionalSession.MsSqlSystemDataClient.AcceptenceTests/When_using_transactional_session_with_transactionscope.cs
  • Loading branch information
danielmarbach authored Apr 6, 2023
1 parent 4e13d50 commit cf507a4
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net472;net6.0</TargetFrameworks>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>$(SolutionDir)NServiceBus.snk</AssemblyOriginatorKeyFile>
<LangVersion>10.0</LangVersion>
<!-- We want the root namespace to match the transactional session one -->
<RootNamespace>NServiceBus.TransactionalSession</RootNamespace>
<Description>NServiceBus Transactional Session for SQL persistence.</Description>
</PropertyGroup>
<PropertyGroup>
<TargetFrameworks>net472;net6.0</TargetFrameworks>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>$(SolutionDir)NServiceBus.snk</AssemblyOriginatorKeyFile>
<LangVersion>10.0</LangVersion>
<!-- We want the root namespace to match the transactional session one -->
<RootNamespace>NServiceBus.TransactionalSession</RootNamespace>
<Description>NServiceBus Transactional Session for SQL persistence.</Description>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Particular.Packaging" Version="2.2.0" PrivateAssets="All" />
<PackageReference Include="NServiceBus.TransactionalSession" Version="[2.0.0, 3.0.0)" />
<ProjectReference Include="..\SqlPersistence\SqlPersistence.csproj" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\SqlPersistence\SqlPersistence.csproj" />
<PackageReference Include="NServiceBus.TransactionalSession" Version="[2.0.1, 3.0.0)" />
<PackageReference Include="Particular.Packaging" Version="2.3.0" PrivateAssets="All" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<PackageReference Include="Nunit" Version="3.13.3" />
<PackageReference Include="NServiceBus.AcceptanceTesting" Version="8.0.0" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.0.1" />
<PackageReference Include="NServiceBus.TransactionalSession" Version="2.0.0" />
<PackageReference Include="NServiceBus.TransactionalSession" Version="2.0.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
namespace NServiceBus.TransactionalSession.AcceptanceTests
{
using System;
using System.Threading.Tasks;
using System.Transactions;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using Persistence.Sql.ScriptBuilder;

public class When_using_transactional_session_with_transactionscope : NServiceBusAcceptanceTest
{
[OneTimeSetUp]
public void OneTimeSetup()
{
MsSqlMicrosoftDataClientConnectionBuilder.DropDbIfCollationIncorrect();
MsSqlMicrosoftDataClientConnectionBuilder.CreateDbIfNotExists();
}

[Test]
public async Task Should_provide_ambient_transactionscope()
{
await CreateOutboxTable(Conventions.EndpointNamingConvention(typeof(AnEndpoint)));

string rowId = Guid.NewGuid().ToString();

await Scenario.Define<Context>()
.WithEndpoint<AnEndpoint>(s => s.When(async (_, ctx) =>
{
using IServiceScope scope = ctx.ServiceProvider.CreateScope();
using var transactionalSession = scope.ServiceProvider.GetRequiredService<ITransactionalSession>();
var sessionOptions = new SqlPersistenceOpenSessionOptions();
await transactionalSession.Open(sessionOptions);
await transactionalSession.SendLocal(new SampleMessage());
var storageSession = transactionalSession.SynchronizedStorageSession.SqlPersistenceSession();
string insertText =
$@"IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='SomeTable' and xtype='U')
BEGIN
CREATE TABLE [dbo].[SomeTable]([Id] [nvarchar](50) NOT NULL)
END;
INSERT INTO [dbo].[SomeTable] VALUES ('{rowId}')";
using (var insertCommand = new SqlCommand(insertText, (SqlConnection)storageSession.Connection))
{
await insertCommand.ExecuteNonQueryAsync();
}
using (var __ = new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled))
{
using var connection = MsSqlMicrosoftDataClientConnectionBuilder.Build();
await connection.OpenAsync();
using var queryCommand =
new SqlCommand($"SELECT TOP 1 [Id] FROM [dbo].[SomeTable] WITH (READPAST) WHERE [Id]='{rowId}' ", connection);
object result = await queryCommand.ExecuteScalarAsync();
Assert.AreEqual(null, result);
}
await transactionalSession.Commit().ConfigureAwait(false);
}))
.Done(c => c.MessageReceived)
.Run();

using var connection = MsSqlMicrosoftDataClientConnectionBuilder.Build();
await connection.OpenAsync();

using var queryCommand =
new SqlCommand($"SELECT TOP 1 [Id] FROM [dbo].[SomeTable] WHERE [Id]='{rowId}'", connection);
object result = await queryCommand.ExecuteScalarAsync();

Assert.AreEqual(rowId, result);
}


static async Task CreateOutboxTable(string endpointName)
{
string tablePrefix = TestTableNameCleaner.Clean(endpointName);
using var connection = MsSqlMicrosoftDataClientConnectionBuilder.Build();
await connection.OpenAsync().ConfigureAwait(false);

connection.ExecuteCommand(OutboxScriptBuilder.BuildDropScript(BuildSqlDialect.MsSqlServer), tablePrefix);
connection.ExecuteCommand(OutboxScriptBuilder.BuildCreateScript(BuildSqlDialect.MsSqlServer), tablePrefix);
}

class Context : ScenarioContext, IInjectServiceProvider
{
public bool MessageReceived { get; set; }
public bool CompleteMessageReceived { get; set; }
public IServiceProvider ServiceProvider { get; set; }
}

class AnEndpoint : EndpointConfigurationBuilder
{
public AnEndpoint() => EndpointSetup<TransactionSessionWithOutboxEndpoint>(c => c.EnableOutbox().UseTransactionScope());

class SampleHandler : IHandleMessages<SampleMessage>
{
public SampleHandler(Context testContext) => this.testContext = testContext;

public Task Handle(SampleMessage message, IMessageHandlerContext context)
{
testContext.MessageReceived = true;

return Task.CompletedTask;
}

readonly Context testContext;
}

class CompleteTestMessageHandler : IHandleMessages<CompleteTestMessage>
{
public CompleteTestMessageHandler(Context context) => testContext = context;

public Task Handle(CompleteTestMessage message, IMessageHandlerContext context)
{
testContext.CompleteMessageReceived = true;

return Task.CompletedTask;
}

readonly Context testContext;
}
}

class SampleMessage : ICommand
{
}

class CompleteTestMessage : ICommand
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<PackageReference Include="Nunit" Version="3.13.3" />
<PackageReference Include="NServiceBus.AcceptanceTesting" Version="8.0.0" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.0.1" />
<PackageReference Include="NServiceBus.TransactionalSession" Version="2.0.0" />
<PackageReference Include="NServiceBus.TransactionalSession" Version="2.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
namespace NServiceBus.TransactionalSession.AcceptanceTests
{
using System;
using System.Threading.Tasks;
using System.Transactions;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using System.Data.SqlClient;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using Persistence.Sql.ScriptBuilder;

public class When_using_transactional_session_with_transactionscope : NServiceBusAcceptanceTest
{
[OneTimeSetUp]
public void OneTimeSetup()
{
MsSqlSystemDataClientConnectionBuilder.DropDbIfCollationIncorrect();
MsSqlSystemDataClientConnectionBuilder.CreateDbIfNotExists();
}

[Test]
public async Task Should_provide_ambient_transactionscope()
{
await CreateOutboxTable(Conventions.EndpointNamingConvention(typeof(AnEndpoint)));

string rowId = Guid.NewGuid().ToString();

await Scenario.Define<Context>()
.WithEndpoint<AnEndpoint>(s => s.When(async (_, ctx) =>
{
using IServiceScope scope = ctx.ServiceProvider.CreateScope();
using var transactionalSession = scope.ServiceProvider.GetRequiredService<ITransactionalSession>();
var sessionOptions = new SqlPersistenceOpenSessionOptions();
await transactionalSession.Open(sessionOptions);
await transactionalSession.SendLocal(new SampleMessage());
var storageSession = transactionalSession.SynchronizedStorageSession.SqlPersistenceSession();
string insertText =
$@"IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='SomeTable' and xtype='U')
BEGIN
CREATE TABLE [dbo].[SomeTable]([Id] [nvarchar](50) NOT NULL)
END;
INSERT INTO [dbo].[SomeTable] VALUES ('{rowId}')";
using (var insertCommand = new SqlCommand(insertText, (SqlConnection)storageSession.Connection))
{
await insertCommand.ExecuteNonQueryAsync();
}
using (var __ = new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled))
{
using var connection = MsSqlSystemDataClientConnectionBuilder.Build();
await connection.OpenAsync();
using var queryCommand =
new SqlCommand($"SELECT TOP 1 [Id] FROM [dbo].[SomeTable] WITH (READPAST) WHERE [Id]='{rowId}' ", connection);
object result = await queryCommand.ExecuteScalarAsync();
Assert.AreEqual(null, result);
}
await transactionalSession.Commit().ConfigureAwait(false);
}))
.Done(c => c.MessageReceived)
.Run();

using var connection = MsSqlSystemDataClientConnectionBuilder.Build();
await connection.OpenAsync();

using var queryCommand =
new SqlCommand($"SELECT TOP 1 [Id] FROM [dbo].[SomeTable] WHERE [Id]='{rowId}'", connection);
object result = await queryCommand.ExecuteScalarAsync();

Assert.AreEqual(rowId, result);
}


static async Task CreateOutboxTable(string endpointName)
{
string tablePrefix = TestTableNameCleaner.Clean(endpointName);
using var connection = MsSqlSystemDataClientConnectionBuilder.Build();
await connection.OpenAsync().ConfigureAwait(false);

connection.ExecuteCommand(OutboxScriptBuilder.BuildDropScript(BuildSqlDialect.MsSqlServer), tablePrefix);
connection.ExecuteCommand(OutboxScriptBuilder.BuildCreateScript(BuildSqlDialect.MsSqlServer), tablePrefix);
}

class Context : ScenarioContext, IInjectServiceProvider
{
public bool MessageReceived { get; set; }
public bool CompleteMessageReceived { get; set; }
public IServiceProvider ServiceProvider { get; set; }
}

class AnEndpoint : EndpointConfigurationBuilder
{
public AnEndpoint() => EndpointSetup<TransactionSessionWithOutboxEndpoint>(c => c.EnableOutbox().UseTransactionScope());

class SampleHandler : IHandleMessages<SampleMessage>
{
public SampleHandler(Context testContext) => this.testContext = testContext;

public Task Handle(SampleMessage message, IMessageHandlerContext context)
{
testContext.MessageReceived = true;

return Task.CompletedTask;
}

readonly Context testContext;
}

class CompleteTestMessageHandler : IHandleMessages<CompleteTestMessage>
{
public CompleteTestMessageHandler(Context context) => testContext = context;

public Task Handle(CompleteTestMessage message, IMessageHandlerContext context)
{
testContext.CompleteMessageReceived = true;

return Task.CompletedTask;
}

readonly Context testContext;
}
}

class SampleMessage : ICommand
{
}

class CompleteTestMessage : ICommand
{
}
}
}

0 comments on commit cf507a4

Please sign in to comment.