From 81fb9bafb6d73bac86acecb132fb108954926bff Mon Sep 17 00:00:00 2001 From: Jon Skeet Date: Tue, 30 Jul 2024 11:35:39 +0100 Subject: [PATCH] Update CloudNative.CloudEvents.Mqtt to MQTTnet version 4.3.6.1152 This involves a new major version of CloudNative.CloudEvents.Mqtt. --- .../CloudNative.CloudEvents.Mqtt.csproj | 5 ++++- .../MqttExtensions.cs | 10 ++++----- .../Core/BinaryDataUtilities.cs | 11 +++++++++- .../Mqtt/MqttTest.cs | 22 ++++++++++--------- 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj b/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj index 5638bd3..2e62d13 100644 --- a/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj +++ b/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj @@ -5,11 +5,14 @@ MQTT extensions for CloudNative.CloudEvents cncf;cloudnative;cloudevents;events;mqtt 8.0 + 3.$(MinorVersion).$(PatchVersion) + + 2.$(PackageValidationMinor).0 enable - + diff --git a/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs b/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs index 9effda8..57f7f86 100644 --- a/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs +++ b/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs @@ -1,4 +1,4 @@ -// Copyright (c) Cloud Native Foundation. +// Copyright (c) Cloud Native Foundation. // Licensed under the Apache 2.0 license. // See LICENSE file in the project root for full license information. @@ -39,10 +39,10 @@ public static CloudEvent ToCloudEvent(this MqttApplicationMessage message, Validation.CheckNotNull(message, nameof(message)); // TODO: Determine if there's a sensible content type we should apply. - return formatter.DecodeStructuredModeMessage(message.Payload, contentType: null, extensionAttributes); + return formatter.DecodeStructuredModeMessage(message.PayloadSegment, contentType: null, extensionAttributes); } - // TODO: Update to a newer version of MQTTNet and support both binary and structured mode? + // TODO: Support both binary and structured mode. /// /// Converts a CloudEvent to . /// @@ -61,11 +61,11 @@ public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cl return new MqttApplicationMessage { Topic = topic, - Payload = BinaryDataUtilities.AsArray(formatter.EncodeStructuredModeMessage(cloudEvent, out _)) + PayloadSegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeStructuredModeMessage(cloudEvent, out _)) }; default: throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}"); } } } -} \ No newline at end of file +} diff --git a/src/CloudNative.CloudEvents/Core/BinaryDataUtilities.cs b/src/CloudNative.CloudEvents/Core/BinaryDataUtilities.cs index c1915a9..5dfe119 100644 --- a/src/CloudNative.CloudEvents/Core/BinaryDataUtilities.cs +++ b/src/CloudNative.CloudEvents/Core/BinaryDataUtilities.cs @@ -114,7 +114,16 @@ public static byte[] AsArray(ReadOnlyMemory memory) } // Note: when this returns, the Array property of the returned segment is guaranteed not to be null. - private static ArraySegment GetArraySegment(ReadOnlyMemory memory) => + + /// + /// Returns the data from as a byte array, return the underlying array + /// if there is one, or creating a copy otherwise. This method should be used with care, due to the + /// "sometimes shared, sometimes not" nature of the result. (It is generally safe to use this with the result + /// of encoding a CloudEvent, assuming the same memory is not used elsewhere.) + /// + /// The memory to obtain the data from. + /// The data in as an array segment. + public static ArraySegment GetArraySegment(ReadOnlyMemory memory) => MemoryMarshal.TryGetArray(memory, out var segment) && segment.Array is not null ? segment : new ArraySegment(memory.ToArray()); diff --git a/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs b/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs index d9dd3af..071d4f7 100644 --- a/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs +++ b/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs @@ -1,12 +1,10 @@ -// Copyright (c) Cloud Native Foundation. +// Copyright (c) Cloud Native Foundation. // Licensed under the Apache 2.0 license. // See LICENSE file in the project root for full license information. using CloudNative.CloudEvents.NewtonsoftJson; using MQTTnet; using MQTTnet.Client; -using MQTTnet.Client.Options; -using MQTTnet.Client.Receiving; using MQTTnet.Server; using System; using System.Net.Mime; @@ -18,16 +16,17 @@ namespace CloudNative.CloudEvents.Mqtt.UnitTests { public class MqttTest : IDisposable { - private readonly IMqttServer mqttServer; + private readonly MqttServer mqttServer; public MqttTest() { var optionsBuilder = new MqttServerOptionsBuilder() .WithConnectionBacklog(100) + .WithDefaultEndpoint() .WithDefaultEndpointPort(52355); - this.mqttServer = new MqttFactory().CreateMqttServer(); - mqttServer.StartAsync(optionsBuilder.Build()).GetAwaiter().GetResult(); + this.mqttServer = new MqttFactory().CreateMqttServer(optionsBuilder.Build()); + mqttServer.StartAsync().GetAwaiter().GetResult(); } public void Dispose() @@ -55,14 +54,17 @@ public async Task MqttSendTest() var options = new MqttClientOptionsBuilder() .WithClientId("Client1") - .WithTcpServer("localhost", 52355) + .WithTcpServer("127.0.0.1", 52355) .WithCleanSession() .Build(); TaskCompletionSource tcs = new TaskCompletionSource(); await client.ConnectAsync(options); - client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate( - args => tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter))); + client.ApplicationMessageReceivedAsync += args => + { + tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter)); + return Task.CompletedTask; + }; var result = await client.SubscribeAsync("abc"); await client.PublishAsync(cloudEvent.ToMqttApplicationMessage(ContentMode.Structured, new JsonEventFormatter(), topic: "abc")); @@ -79,4 +81,4 @@ public async Task MqttSendTest() Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]); } } -} \ No newline at end of file +}