Skip to content

Commit

Permalink
Fixes the ingestion of physical messages containing multiple logical …
Browse files Browse the repository at this point in the history
…messages (#7129) (#7130)

* The IncomingPipelineMetricTags add a new tag only when the key is not yet in the collection for dealing with the use case of a payload containing multiple logical messages.

* Remove acceptance test for now

* Add failing test

---------

Co-authored-by: Laila Bougria <[email protected]>
Co-authored-by: sara pellegrini <[email protected]>
  • Loading branch information
3 people authored Jul 31, 2024
1 parent a8cfa7e commit 581218c
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
namespace NServiceBus.Core.Tests.Pipeline.Incoming;

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using MessageInterfaces.MessageMapper.Reflection;
using NServiceBus.Pipeline;
using NUnit.Framework;
using Serialization;
using Testing;
using Transport;
using Unicast.Messages;

[TestFixture]
public class IncomingPipelineMetricTagsTests
{
[Test]
public async Task Should_not_fail_when_handling_more_than_one_logical_message()
{
var registry = new MessageMetadataRegistry(new Conventions().IsMessageType, true);

registry.RegisterMessageTypesFoundIn(
[
typeof(MyMessage)
]);

var context = new TestableIncomingPhysicalMessageContext
{
Message = new IncomingMessage("messageId", new Dictionary<string, string>
{
{ Headers.EnclosedMessageTypes, typeof(MyMessage).AssemblyQualifiedName }
}, new ReadOnlyMemory<byte>(new byte[] { 1 }))
};

var messageMapper = new MessageMapper();
var behavior = new DeserializeMessageConnector(new MessageDeserializerResolver(new FakeSerializer(), []), new LogicalMessageFactory(registry, messageMapper), registry, messageMapper, false);

await behavior.Invoke(context, c =>
{
c.Extensions.Get<IncomingPipelineMetricTags>().Add("Same", "Same");
return Task.CompletedTask;
});

Assert.That(true, Is.True);
}

class MyMessage : IMessage { }

class FakeSerializer : IMessageSerializer
{
public string ContentType { get; }
public void Serialize(object message, Stream stream) => throw new NotImplementedException();

public object[] Deserialize(ReadOnlyMemory<byte> body, IList<Type> messageTypes = null) => [new MyMessage(), new MyMessage()];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ public sealed class IncomingPipelineMetricTags
Dictionary<string, KeyValuePair<string, object?>>? tags;

/// <summary>
/// Adds the specified tag and value to the collection.
/// Adds the specified tag and value to the collection if not already present.
/// </summary>
/// <param name="tagKey">The tag to add.</param>
/// <param name="value">The value assigned to the tag.</param>
public void Add(string tagKey, object value)
{
tags ??= [];
tags.Add(tagKey, new(tagKey, value));
// We are using tryAdd to mitigate multiple logical messages transmitted in a single physical message
tags.TryAdd(tagKey, new(tagKey, value));
}

/// <summary>
Expand Down

0 comments on commit 581218c

Please sign in to comment.