Skip to content

Commit

Permalink
Fix outbox transaction transaction scope handling (#1197) (#1201)
Browse files Browse the repository at this point in the history
* Adjust the tests to verify before and after transactional session commit behavior

* Fix TransactionScopeSqlOutboxTransaction to complete the scope on commit

* Remove unnecessary method which was no longer called

* Remove no longer used dispose callback

* Async variants
  • Loading branch information
danielmarbach authored May 17, 2023
1 parent 52b8cee commit a8adf48
Show file tree
Hide file tree
Showing 17 changed files with 653 additions and 172 deletions.
2 changes: 1 addition & 1 deletion src/NServiceBus.Persistence.Sql.sln
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TransactionalSession.MsSqlM
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TransactionalSession.Tests", "TransactionalSession.Tests\TransactionalSession.Tests.csproj", "{DE280E3C-E711-4543-BF53-09D7840C3033}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TransactionalSession.MsSqlSystemDataClient.AcceptenceTests", "TransactionalSession.MsSqlSystemDataClient.AcceptenceTests\TransactionalSession.MsSqlSystemDataClient.AcceptenceTests.csproj", "{AB33F7C1-FF29-4958-B176-81AC227451A5}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TransactionalSession.MsSqlSystemDataClient.AcceptanceTests", "TransactionalSession.MsSqlSystemDataClient.AcceptanceTests\TransactionalSession.MsSqlSystemDataClient.AcceptanceTests.csproj", "{AB33F7C1-FF29-4958-B176-81AC227451A5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
14 changes: 0 additions & 14 deletions src/SqlPersistence/Outbox/AdoNetSqlOutboxTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
using System.Threading;
using System.Threading.Tasks;
using NServiceBus.Extensibility;
using NServiceBus.Logging;
using NServiceBus.Outbox;

class AdoNetSqlOutboxTransaction : ISqlOutboxTransaction
{
static ILog Log = LogManager.GetLogger<AdoNetSqlOutboxTransaction>();

IConnectionManager connectionManager;
IsolationLevel isolationLevel;
ConcurrencyControlStrategy concurrencyControlStrategy;
Expand Down Expand Up @@ -41,17 +38,6 @@ public async Task Begin(ContextBag context, CancellationToken cancellationToken
public Task Complete(OutboxMessage outboxMessage, ContextBag context, CancellationToken cancellationToken = default) =>
concurrencyControlStrategy.Complete(outboxMessage, Connection, Transaction, context, cancellationToken);

public void BeginSynchronizedSession(ContextBag context)
{
if (System.Transactions.Transaction.Current != null)
{
Log.Warn("The endpoint is configured to use Outbox but a TransactionScope has been detected. " +
"In order to make the Outbox compatible with TransactionScope, use " +
"config.EnableOutbox().UseTransactionScope(). " +
"Do not use config.UnitOfWork().WrapHandlersInATransactionScope().");
}
}

public void Dispose()
{
Transaction?.Dispose();
Expand Down
1 change: 0 additions & 1 deletion src/SqlPersistence/Outbox/ISqlOutboxTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ interface ISqlOutboxTransaction : IOutboxTransaction
void Prepare(ContextBag context);
Task Begin(ContextBag context, CancellationToken cancellationToken = default);
Task Complete(OutboxMessage outboxMessage, ContextBag context, CancellationToken cancellationToken = default);
void BeginSynchronizedSession(ContextBag context);
}
31 changes: 6 additions & 25 deletions src/SqlPersistence/Outbox/TransactionScopeSqlOutboxTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@
using System.Threading.Tasks;
using System.Transactions;
using NServiceBus.Extensibility;
using NServiceBus.Logging;
using NServiceBus.Outbox;

class TransactionScopeSqlOutboxTransaction : ISqlOutboxTransaction
{
static ILog Log = LogManager.GetLogger<TransactionScopeSqlOutboxTransaction>();

IConnectionManager connectionManager;
IsolationLevel isolationLevel;
ConcurrencyControlStrategy concurrencyControlStrategy;
TransactionScope transactionScope;
Transaction ambientTransaction;
bool commit;
TimeSpan transactionTimeout;

public TransactionScopeSqlOutboxTransaction(ConcurrencyControlStrategy concurrencyControlStrategy,
Expand Down Expand Up @@ -55,34 +51,19 @@ public async Task Begin(ContextBag context, CancellationToken cancellationToken
public Task Complete(OutboxMessage outboxMessage, ContextBag context, CancellationToken cancellationToken = default) =>
concurrencyControlStrategy.Complete(outboxMessage, Connection, null, context, cancellationToken);

public void BeginSynchronizedSession(ContextBag context)
{
if (System.Transactions.Transaction.Current != null && System.Transactions.Transaction.Current != ambientTransaction)
{
Log.Warn("The endpoint is configured to use Outbox with TransactionScope but a different TransactionScope " +
"has been detected in the current context. " +
"Do not use config.UnitOfWork().WrapHandlersInATransactionScope().");
}
}

public void Dispose()
{
transactionScope?.Dispose();
Connection?.Dispose();
if (transactionScope != null)
{
if (commit)
{
transactionScope.Complete();
}
transactionScope.Dispose();
transactionScope = null;
ambientTransaction = null;
}
transactionScope = null;
ambientTransaction = null;
}

public Task Commit(CancellationToken cancellationToken = default)
{
commit = true;
transactionScope?.Complete();
// we need to dispose it after completion in order to execute the transaction after marking it as completed
transactionScope?.Dispose();
return Task.CompletedTask;
}
}
26 changes: 15 additions & 11 deletions src/SqlPersistence/SynchronizedStorage/StorageSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ class StorageSession : ICompletableSynchronizedStorageSession, ISqlStorageSessio
{
bool ownsTransaction;
Func<ISqlStorageSession, CancellationToken, Task> onSaveChangesCallback = (_, __) => Task.CompletedTask;
Action disposedCallback = () => { };
readonly IConnectionManager connectionManager;
readonly SqlDialect dialect;

Expand Down Expand Up @@ -47,7 +46,7 @@ public void OnSaveChanges(Func<ISqlStorageSession, CancellationToken, Task> call
#pragma warning restore PS0013 // A Func used as a method parameter with a Task, ValueTask, or ValueTask<T> return type argument should have at least one CancellationToken parameter type argument unless it has a parameter type argument implementing ICancellableContext

public ValueTask<bool> TryOpen(IOutboxTransaction transaction, ContextBag context,
CancellationToken cancellationToken = new CancellationToken())
CancellationToken cancellationToken = default)
{
if (transaction is not ISqlOutboxTransaction outboxTransaction)
{
Expand All @@ -61,7 +60,7 @@ public ValueTask<bool> TryOpen(IOutboxTransaction transaction, ContextBag contex
}

public async ValueTask<bool> TryOpen(TransportTransaction transportTransaction, ContextBag context,
CancellationToken cancellationToken = new CancellationToken())
CancellationToken cancellationToken = default)
{
(bool wasAdapted, DbConnection connection, DbTransaction transaction, bool ownsTx) =
await dialect.TryAdaptTransportConnection(transportTransaction, context, connectionManager, cancellationToken)
Expand All @@ -77,21 +76,30 @@ await dialect.TryAdaptTransportConnection(transportTransaction, context, connect
return true;
}

public async Task Open(ContextBag contextBag, CancellationToken cancellationToken = new CancellationToken())
public async Task Open(ContextBag contextBag, CancellationToken cancellationToken = default)
{
Connection = await connectionManager.OpenConnection(contextBag.GetIncomingMessage(), cancellationToken).ConfigureAwait(false);
#if NET
Transaction = await Connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
#else
Transaction = Connection.BeginTransaction();
#endif
ownsTransaction = true;
}

public async Task CompleteAsync(CancellationToken cancellationToken = default)
{
await onSaveChangesCallback(this, cancellationToken).ConfigureAwait(false);

if (ownsTransaction)
if (ownsTransaction && Transaction != null)
{
Transaction?.Commit();
Transaction?.Dispose();
#if NET
await Transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
await Transaction.DisposeAsync().ConfigureAwait(false);
#else
Transaction.Commit();
Transaction.Dispose();
#endif
Connection.Dispose();
}
}
Expand All @@ -103,9 +111,5 @@ public void Dispose()
Transaction?.Dispose();
Connection?.Dispose();
}

disposedCallback();
}

public void OnDisposed(Action callback) => disposedCallback = callback;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
namespace NServiceBus.TransactionalSession.AcceptanceTests
{
using System.Threading.Tasks;
using AcceptanceTesting.Customization;
using Microsoft.Data.SqlClient;
using Persistence.Sql.ScriptBuilder;

static class OutboxHelpers
{
public static async Task CreateOutboxTable<TEndpoint>()
=> await CreateOutboxTable(Conventions.EndpointNamingConvention(typeof(TEndpoint)));

public static async Task CreateOutboxTable(string endpointName)
{
string tablePrefix = TestTableNameCleaner.Clean(endpointName);
using var connection = MsSqlMicrosoftDataClientConnectionBuilder.Build();
await connection.OpenAsync().ConfigureAwait(false);

connection.ExecuteCommand(OutboxScriptBuilder.BuildDropScript(BuildSqlDialect.MsSqlServer), tablePrefix);
connection.ExecuteCommand(OutboxScriptBuilder.BuildCreateScript(BuildSqlDialect.MsSqlServer), tablePrefix);
}

public static async Task CreateDataTable()
{
var createTable = @"IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='SomeTable' and xtype='U')
BEGIN
CREATE TABLE [dbo].[SomeTable]([Id] [nvarchar](50) NOT NULL)
END;";
using var connection = MsSqlMicrosoftDataClientConnectionBuilder.Build();
await connection.OpenAsync();
using var createTableCommand = new SqlCommand(createTable, connection);
await createTableCommand.ExecuteNonQueryAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
public class When_using_transactional_session : NServiceBusAcceptanceTest
{
[OneTimeSetUp]
public void OneTimeSetup()
public async Task OneTimeSetup()
{
MsSqlMicrosoftDataClientConnectionBuilder.DropDbIfCollationIncorrect();
MsSqlMicrosoftDataClientConnectionBuilder.CreateDbIfNotExists();

await OutboxHelpers.CreateDataTable();
}

[TestCase(true)]
Expand Down Expand Up @@ -57,19 +59,21 @@ CREATE TABLE [dbo].[SomeTable]([Id] [nvarchar](50) NOT NULL)
await insertCommand.ExecuteNonQueryAsync();
}
// the transactional operations should not be visible before commit
var resultBeforeCommit = await QueryInsertedEntry(rowId);
Assert.AreEqual(null, resultBeforeCommit);
await transactionalSession.Commit().ConfigureAwait(false);
// the transactional operations should be visible after commit
var resultBeforeAfterCommit = await QueryInsertedEntry(rowId);
Assert.AreEqual(rowId, resultBeforeAfterCommit);
}))
.Done(c => c.MessageReceived)
.Run();

using var connection = MsSqlMicrosoftDataClientConnectionBuilder.Build();
await connection.OpenAsync();

using var queryCommand =
new SqlCommand($"SELECT TOP 1 [Id] FROM [dbo].[SomeTable] WHERE [Id]='{rowId}'", connection);
object result = await queryCommand.ExecuteScalarAsync();

Assert.AreEqual(rowId, result);
var resultAfterDispose = await QueryInsertedEntry(rowId);
Assert.AreEqual(rowId, resultAfterDispose);
}

[TestCase(true)]
Expand All @@ -96,12 +100,7 @@ await Scenario.Define<Context>()
ISqlStorageSession storageSession = scope.ServiceProvider.GetRequiredService<ISqlStorageSession>();
string insertText =
$@"IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='SomeTable' and xtype='U')
BEGIN
CREATE TABLE [dbo].[SomeTable]([Id] [nvarchar](50) NOT NULL)
END;
INSERT INTO [dbo].[SomeTable] VALUES ('{rowId}')";
string insertText = $@"INSERT INTO [dbo].[SomeTable] VALUES ('{rowId}')";
using (var insertCommand = new SqlCommand(insertText,
(SqlConnection)storageSession.Connection,
Expand All @@ -110,19 +109,33 @@ CREATE TABLE [dbo].[SomeTable]([Id] [nvarchar](50) NOT NULL)
await insertCommand.ExecuteNonQueryAsync();
}
// the transactional operations should not be visible before commit
var resultBeforeCommit = await QueryInsertedEntry(rowId);
Assert.AreEqual(null, resultBeforeCommit);
await transactionalSession.Commit().ConfigureAwait(false);
// the transactional operations should be visible after commit
var resultBeforeAfterCommit = await QueryInsertedEntry(rowId);
Assert.AreEqual(rowId, resultBeforeAfterCommit);
}))
.Done(c => c.MessageReceived)
.Run();

var resultAfterDispose = await QueryInsertedEntry(rowId);
Assert.AreEqual(rowId, resultAfterDispose);
}

static async Task<string> QueryInsertedEntry(string rowId)
{
using var connection = MsSqlMicrosoftDataClientConnectionBuilder.Build();

await connection.OpenAsync();

using var queryCommand =
new SqlCommand($"SELECT TOP 1 [Id] FROM [dbo].[SomeTable] WHERE [Id]='{rowId}'", connection);
object result = await queryCommand.ExecuteScalarAsync();

Assert.AreEqual(rowId, result);
new SqlCommand($"SET TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT TOP 1 [Id] FROM [dbo].[SomeTable] WITH (READPAST) WHERE [Id]='{rowId}' ",
connection);
return (string)await queryCommand.ExecuteScalarAsync();
}

[TestCase(true)]
Expand Down
Loading

0 comments on commit a8adf48

Please sign in to comment.