Skip to content

Commit

Permalink
Incomplete implementation of MQTT binary mode
Browse files Browse the repository at this point in the history
In some ways this *is* complete - but it assumes a v5 client.
Instead, we should only populate ContentType (and allow binary mode) when using v5.
This probably requires some API design work.

Will fix cloudevents#147 when complete.
  • Loading branch information
jskeet committed Jul 30, 2024
1 parent 81fb9ba commit 5964989
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
<Description>MQTT extensions for CloudNative.CloudEvents</Description>
<PackageTags>cncf;cloudnative;cloudevents;events;mqtt</PackageTags>
<LangVersion>8.0</LangVersion>
<LangVersion>latest</LangVersion>
<Version>3.$(MinorVersion).$(PatchVersion)</Version>
<!-- After the first release of v3, we'll change the major here to 3. -->
<PackageValidationBaselineVersion>2.$(PackageValidationMinor).0</PackageValidationBaselineVersion>
Expand Down
49 changes: 43 additions & 6 deletions src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

using CloudNative.CloudEvents.Core;
using MQTTnet;
using MQTTnet.Packets;
using System;
using System.Collections.Generic;
using System.Linq;

namespace CloudNative.CloudEvents.Mqtt
{
Expand Down Expand Up @@ -38,16 +40,40 @@ public static CloudEvent ToCloudEvent(this MqttApplicationMessage message,
Validation.CheckNotNull(formatter, nameof(formatter));
Validation.CheckNotNull(message, nameof(message));

// TODO: Determine if there's a sensible content type we should apply.
return formatter.DecodeStructuredModeMessage(message.PayloadSegment, contentType: null, extensionAttributes);
// Check if the spec version is specified in user properties.
// If it is, we'll assume it's binary mode. Otherwise, we'll assume it's structured mode.
if (message.UserProperties?.FirstOrDefault(p => p.Name == CloudEventsSpecVersion.SpecVersionAttribute.Name)
is not MqttUserProperty specVersionProperty)
{
// TODO: Determine if there's a sensible content type we should apply.
return formatter.DecodeStructuredModeMessage(message.PayloadSegment, contentType: null, extensionAttributes);
}

var specVersion = CloudEventsSpecVersion.FromVersionId(specVersionProperty.Value)
?? throw new ArgumentException($"Unknown CloudEvents spec version '{specVersionProperty.Value}'", nameof(message));
var cloudEvent = new CloudEvent(specVersion, extensionAttributes);

foreach (var userProperty in message.UserProperties)
{
if (userProperty == specVersionProperty)
{
continue;
}
cloudEvent.SetAttributeFromString(userProperty.Name, userProperty.Value);
}

if (message.PayloadSegment.Array is not null)
{
formatter.DecodeBinaryModeEventData(message.PayloadSegment, cloudEvent);
}
return cloudEvent;
}

// TODO: Support both binary and structured mode.
/// <summary>
/// Converts a CloudEvent to <see cref="MqttApplicationMessage"/>.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Currently only structured mode is supported.</param>
/// <param name="contentMode">Content mode. Both structured mode and binary mode are supported.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
/// <param name="topic">The MQTT topic for the message. May be null.</param>
public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter, string? topic)
Expand All @@ -58,13 +84,24 @@ public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cl
switch (contentMode)
{
case ContentMode.Structured:
var arraySegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType));
return new MqttApplicationMessage
{
Topic = topic,
PayloadSegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeStructuredModeMessage(cloudEvent, out _))
ContentType = contentType.ToString(),
PayloadSegment = arraySegment
};
default:
throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}");
return new MqttApplicationMessage
{
ContentType = formatter.GetOrInferDataContentType(cloudEvent),
UserProperties = cloudEvent.GetPopulatedAttributes()
.Select(pair => new MqttUserProperty(pair.Key.Name, pair.Key.Format(pair.Value)))
.Append(new MqttUserProperty(CloudEventsSpecVersion.SpecVersionAttribute.Name, cloudEvent.SpecVersion.VersionId))
.ToList(),
Topic = topic,
PayloadSegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeBinaryModeEventData(cloudEvent))
};
}
}
}
Expand Down
52 changes: 51 additions & 1 deletion test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using CloudNative.CloudEvents.NewtonsoftJson;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Server;
using System;
using System.Net.Mime;
Expand Down Expand Up @@ -35,7 +36,7 @@ public void Dispose()
}

[Fact]
public async Task MqttSendTest()
public async Task MqttSendTest_Structured()
{

var jsonEventFormatter = new JsonEventFormatter();
Expand All @@ -56,6 +57,7 @@ public async Task MqttSendTest()
.WithClientId("Client1")
.WithTcpServer("127.0.0.1", 52355)
.WithCleanSession()
.WithProtocolVersion(MqttProtocolVersion.V500)
.Build();

TaskCompletionSource<CloudEvent> tcs = new TaskCompletionSource<CloudEvent>();
Expand All @@ -80,5 +82,53 @@ public async Task MqttSendTest()

Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]);
}

[Fact]
public async Task MqttSendTest_Binary()
{

var jsonEventFormatter = new JsonEventFormatter();
var cloudEvent = new CloudEvent
{
Type = "com.github.pull.create",
Source = new Uri("https://github.com/cloudevents/spec/pull/123"),
Id = "A234-1234-1234",
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>",
["comexampleextension1"] = "value"
};

var client = new MqttFactory().CreateMqttClient();

var options = new MqttClientOptionsBuilder()
.WithClientId("Client1")
.WithTcpServer("127.0.0.1", 52355)
.WithCleanSession()
.WithProtocolVersion(MqttProtocolVersion.V500)
.Build();

TaskCompletionSource<CloudEvent> tcs = new TaskCompletionSource<CloudEvent>();
await client.ConnectAsync(options);
client.ApplicationMessageReceivedAsync += args =>
{
tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter));
return Task.CompletedTask;
};

var result = await client.SubscribeAsync("abc");
await client.PublishAsync(cloudEvent.ToMqttApplicationMessage(ContentMode.Binary, new JsonEventFormatter(), topic: "abc"));
var receivedCloudEvent = await tcs.Task;

Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time!.Value);
Assert.Equal(MediaTypeNames.Text.Xml, receivedCloudEvent.DataContentType);
Assert.Equal("<much wow=\"xml\"/>", receivedCloudEvent.Data);

Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]);
}
}
}

0 comments on commit 5964989

Please sign in to comment.