Skip to content

Commit

Permalink
Introduce processing time metric. (#7097)
Browse files Browse the repository at this point in the history
* Introduce processing time metric.

* Move the recording of metrics for a successfully processed message to after the outbox transaction commit.

* Remove unnecessary comment.

* Move the critical time

---------

Co-authored-by: SzymonPobiega <[email protected]>
  • Loading branch information
saratry and SzymonPobiega authored Jul 15, 2024
1 parent 1a053ae commit d7b35c1
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,50 @@ public async Task Should_report_successful_message_metric()
metricsListener.AssertMetric("nservicebus.messaging.successes", 5);
metricsListener.AssertMetric("nservicebus.messaging.fetches", 5);
metricsListener.AssertMetric("nservicebus.messaging.failures", 0);
metricsListener.AssertMetric("nservicebus.messaging.critical_time", 5);
metricsListener.AssertMetric("nservicebus.messaging.processing_time", 5);
metricsListener.AssertMetric("nservicebus.messaging.handler_time", 5);

metricsListener.AssertTags("nservicebus.messaging.fetches",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)),
["nservicebus.discriminator"] = "disc",
["nservicebus.message_type"] = typeof(OutgoingMessage).FullName
});

metricsListener.AssertTags("nservicebus.messaging.successes",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)),
["nservicebus.discriminator"] = "disc",
["nservicebus.message_type"] = typeof(OutgoingMessage).FullName
});

metricsListener.AssertTags("nservicebus.messaging.critical_time",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)),
["nservicebus.discriminator"] = "disc",
["nservicebus.message_type"] = typeof(OutgoingMessage).FullName
});

metricsListener.AssertTags("nservicebus.messaging.processing_time",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)),
["nservicebus.discriminator"] = "disc",
["nservicebus.message_type"] = typeof(OutgoingMessage).FullName
});

metricsListener.AssertTags("nservicebus.messaging.handler_time",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)),
["nservicebus.discriminator"] = "disc",
["nservicebus.message_type"] = typeof(OutgoingMessage).FullName,
["nservicebus.message_handler_type"] = typeof(EndpointWithMetrics.MessageHandler).FullName,
["execution.result"] = "success"
});
}

Expand All @@ -68,9 +98,12 @@ public async Task Should_only_tag_most_concrete_type_on_metric()
metricsListener.AssertMetric("nservicebus.messaging.fetches", 5);
metricsListener.AssertMetric("nservicebus.messaging.failures", 0);

var successEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.queue");
var successType = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_type");
var successHandlerType = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_handler_types");
var successEndpoint =
metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.queue");
var successType =
metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_type");
var successHandlerType =
metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_handler_types");

var fetchedEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.fetches", "nservicebus.queue");

Expand All @@ -90,7 +123,7 @@ class EndpointWithMetrics : EndpointConfigurationBuilder
{
public EndpointWithMetrics() => EndpointSetup<OpenTelemetryEnabledEndpoint>();

class MessageHandler : IHandleMessages<OutgoingMessage>
public class MessageHandler : IHandleMessages<OutgoingMessage>
{
readonly Context testContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,34 @@ public async Task Should_report_failing_message_metrics()
metricsListener.AssertMetric("nservicebus.messaging.fetches", 1);
metricsListener.AssertMetric("nservicebus.messaging.failures", 1);
metricsListener.AssertMetric("nservicebus.messaging.successes", 0);
metricsListener.AssertMetric("nservicebus.messaging.critical_time", 0);
metricsListener.AssertMetric("nservicebus.messaging.processing_time", 0);
metricsListener.AssertMetric("nservicebus.messaging.handler_time", 1);

metricsListener.AssertTags("nservicebus.messaging.fetches",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(FailingEndpoint)),
["nservicebus.discriminator"] = "disc",
["nservicebus.message_type"] = typeof(FailingMessage).FullName
});

metricsListener.AssertTags("nservicebus.messaging.failures",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(FailingEndpoint)),
["nservicebus.discriminator"] = "disc",
["nservicebus.message_type"] = typeof(FailingMessage).FullName,
["error.type"] = typeof(SimulatedException).FullName,
});

metricsListener.AssertTags("nservicebus.messaging.handler_time",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(FailingEndpoint)),
["nservicebus.discriminator"] = "disc",
["nservicebus.message_type"] = typeof(FailingMessage).FullName,
["execution.result"] = "failure",
["error.type"] = typeof(SimulatedException).FullName,
});
}
Expand All @@ -55,7 +77,6 @@ public Task Handle(FailingMessage message, IMessageHandlerContext context)
}

const string ErrorMessage = "oh no!";

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"Note": "Changes to metrics API should result in an update to NServiceBusMeter version.",
"ActivitySourceVersion": "0.2.0",
"MetricsSourceName": "NServiceBus.Core.Pipeline.Incoming",
"MetricsSourceVersion": "0.2.0",
"Tags": [
"error.type",
"execution.result",
Expand All @@ -15,6 +16,7 @@
"nservicebus.messaging.failures => Counter",
"nservicebus.messaging.fetches => Counter",
"nservicebus.messaging.handler_time => Histogram, Unit: s",
"nservicebus.messaging.processing_time => Histogram, Unit: s",
"nservicebus.messaging.successes => Counter",
"nservicebus.recoverability.delayed => Counter",
"nservicebus.recoverability.error => Counter",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class TestingMetricListener : IDisposable
readonly MeterListener meterListener;
public List<Instrument> metrics = [];
public string version = "";
public string metricsSourceName = "";

public TestingMetricListener(string sourceName)
{
Expand All @@ -25,6 +26,7 @@ public TestingMetricListener(string sourceName)
listener.EnableMeasurementEvents(instrument);
metrics.Add(instrument);
version = instrument.Meter.Version;
metricsSourceName = instrument.Meter.Name;
}
}
};
Expand Down
3 changes: 2 additions & 1 deletion src/NServiceBus.Core.Tests/OpenTelemetry/MeterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public void Verify_MeterAPI()
Approver.Verify(new
{
Note = "Changes to metrics API should result in an update to NServiceBusMeter version.",
ActivitySourceVersion = metricsListener.version,
MetricsSourceName = metricsListener.metricsSourceName,
MetricsSourceVersion = metricsListener.version,
Tags = meterTags,
Metrics = metrics
});
Expand Down
36 changes: 31 additions & 5 deletions src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class IncomingPipelineMetrics
const string TotalFailures = "nservicebus.messaging.failures";
const string MessageHandlerTime = "nservicebus.messaging.handler_time";
const string CriticalTime = "nservicebus.messaging.critical_time";
const string ProcessingTime = "nservicebus.messaging.processing_time";
const string RecoverabilityImmediate = "nservicebus.recoverability.immediate";
const string RecoverabilityDelayed = "nservicebus.recoverability.delayed";
const string RecoverabilityError = "nservicebus.recoverability.error";
Expand All @@ -30,6 +31,8 @@ public IncomingPipelineMetrics(IMeterFactory meterFactory, string queueName, str
"The time in seconds for the execution of the business code.");
criticalTime = meter.CreateHistogram<double>(CriticalTime, "s",
"The time in seconds between when the message was sent until processed by the endpoint.");
processingTime = meter.CreateHistogram<double>(ProcessingTime, "s",
"The time in seconds between when the message was fetched from the input queue until successfully processed by the endpoint.");
totalImmediateRetries = meter.CreateCounter<long>(RecoverabilityImmediate,
description: "Total number of immediate retries requested.");
totalDelayedRetries = meter.CreateCounter<long>(RecoverabilityDelayed,
Expand All @@ -47,13 +50,35 @@ public void AddDefaultIncomingPipelineMetricTags(IncomingPipelineMetricTags inco
incomingPipelineMetricsTags.Add(MeterTags.EndpointDiscriminator, endpointDiscriminator ?? "");
}

public void RecordMessageSuccessfullyProcessed(ITransportReceiveContext context, IncomingPipelineMetricTags incomingPipelineMetricTags)
/// <summary>
/// Records <paramref name="elapsed"/>, expressed in seconds, on the processing time histogram.
/// </summary>
/// <param name="context">
/// The transport receive context; the <c>IncomingPipelineMetricTags</c> stored in its extensions
/// supply the queue, discriminator, message type and handler types tags.
/// </param>
/// <param name="elapsed">The measured processing duration to record.</param>
/// <remarks>
/// Does nothing when the histogram is not enabled. The measurement is always tagged
/// with an execution result of "success".
/// </remarks>
public void RecordProcessingTime(ITransportReceiveContext context, TimeSpan elapsed)
{
    // Only look up and build tags when the histogram is enabled.
    if (processingTime.Enabled)
    {
        var metricTags = context.Extensions.Get<IncomingPipelineMetricTags>();

        TagList histogramTags;
        histogramTags.Add(new(MeterTags.ExecutionResult, "success"));

        // Copy over the tags collected while the message travelled through the pipeline.
        metricTags.ApplyTags(ref histogramTags, [
            MeterTags.QueueName,
            MeterTags.EndpointDiscriminator,
            MeterTags.MessageType,
            MeterTags.MessageHandlerTypes]);

        // The histogram is declared with unit "s", so convert the TimeSpan to seconds.
        var elapsedSeconds = elapsed.TotalSeconds;
        processingTime.Record(elapsedSeconds, histogramTags);
    }
}

public void RecordCriticalTimeAndTotalProcessed(ITransportReceiveContext context)
{
if (!totalProcessedSuccessfully.Enabled && !criticalTime.Enabled)
{
return;
}

var incomingPipelineMetricTags = context.Extensions.Get<IncomingPipelineMetricTags>();

TagList tags;
tags.Add(new(MeterTags.ExecutionResult, "success"));
incomingPipelineMetricTags.ApplyTags(ref tags, [
Expand All @@ -66,10 +91,9 @@ public void RecordMessageSuccessfullyProcessed(ITransportReceiveContext context,
{
totalProcessedSuccessfully.Add(1, tags);
}
var completedAt = DateTimeOffset.UtcNow;
if (criticalTime.Enabled)
{
var completedAt = DateTimeOffset.UtcNow;

if (context.Message.Headers.TryGetDeliverAt(out var startTime)
|| context.Message.Headers.TryGetTimeSent(out startTime))
{
Expand All @@ -96,7 +120,7 @@ public void RecordMessageProcessingFailure(IncomingPipelineMetricTags incomingPi
MeterTags.MessageHandlerTypes]);
totalFailures.Add(1, tags);

// the critical time is intentionally not recorded in case of failure
// the processing time and critical time are intentionally not recorded in case of failure
}

public void RecordFetchedMessage(IncomingPipelineMetricTags incomingPipelineMetricTags)
Expand All @@ -109,7 +133,8 @@ public void RecordFetchedMessage(IncomingPipelineMetricTags incomingPipelineMetr
TagList tags;
incomingPipelineMetricTags.ApplyTags(ref tags, [
MeterTags.EndpointDiscriminator,
MeterTags.QueueName]);
MeterTags.QueueName,
MeterTags.MessageType]);

totalFetched.Add(1, tags);
}
Expand Down Expand Up @@ -217,6 +242,7 @@ public void RecordSendToErrorQueue(IRecoverabilityContext recoverabilityContext)
readonly Counter<long> totalFailures;
readonly Histogram<double> messageHandlerTime;
readonly Histogram<double> criticalTime;
readonly Histogram<double> processingTime;
readonly Counter<long> totalImmediateRetries;
readonly Counter<long> totalDelayedRetries;
readonly Counter<long> totalSentToErrorQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public TransportReceiveToPhysicalMessageConnector(IOutboxStorage outboxStorage,

public async Task Invoke(ITransportReceiveContext context, Func<IIncomingPhysicalMessageContext, Task> next)
{
var processingStartedAt = DateTimeOffset.UtcNow;
var messageId = context.Message.MessageId;
var physicalMessageContext = this.CreateIncomingPhysicalMessageContext(context.Message, context);

Expand All @@ -34,14 +35,14 @@ public async Task Invoke(ITransportReceiveContext context, Func<IIncomingPhysica
context.Extensions.Set(outboxTransaction);
await next(physicalMessageContext).ConfigureAwait(false);

context.Extensions.TryGet<IncomingPipelineMetricTags>(out IncomingPipelineMetricTags incomingPipelineMetricsTags);
incomingPipelineMetrics.RecordMessageSuccessfullyProcessed(context, incomingPipelineMetricsTags);

var outboxMessage = new OutboxMessage(messageId, ConvertToOutboxOperations(pendingTransportOperations.Operations));
await outboxStorage.Store(outboxMessage, outboxTransaction, context.Extensions, context.CancellationToken).ConfigureAwait(false);

context.Extensions.Remove<IOutboxTransaction>();
await outboxTransaction.Commit(context.CancellationToken).ConfigureAwait(false);

var processingCompletedAt = DateTimeOffset.UtcNow;
incomingPipelineMetrics.RecordProcessingTime(context, processingCompletedAt - processingStartedAt);
}

physicalMessageContext.Extensions.Remove<PendingTransportOperations>();
Expand All @@ -68,6 +69,11 @@ public async Task Invoke(ITransportReceiveContext context, Func<IIncomingPhysica
}

await outboxStorage.SetAsDispatched(messageId, context.Extensions, context.CancellationToken).ConfigureAwait(false);

if (pendingTransportOperations.HasOperations || deduplicationEntry == null)
{
incomingPipelineMetrics.RecordCriticalTimeAndTotalProcessed(context);
}
}

static void ConvertToPendingOperations(OutboxMessage deduplicationEntry, PendingTransportOperations pendingTransportOperations)
Expand Down
12 changes: 8 additions & 4 deletions src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ class MainPipelineExecutor(
public async Task Invoke(MessageContext messageContext, CancellationToken cancellationToken = default)
{
var pipelineStartedAt = DateTimeOffset.UtcNow;

using var activity = activityFactory.StartIncomingPipelineActivity(messageContext);

var incomingPipelineMetricsTags = messageContext.Extensions.Get<IncomingPipelineMetricTags>();

incomingPipelineMetrics.AddDefaultIncomingPipelineMetricTags(incomingPipelineMetricsTags);
incomingPipelineMetrics.RecordFetchedMessage(incomingPipelineMetricsTags);

var childScope = rootBuilder.CreateAsyncScope();
await using (childScope.ConfigureAwait(false))
Expand Down Expand Up @@ -64,10 +62,16 @@ public async Task Invoke(MessageContext messageContext, CancellationToken cancel

ex.Data["Pipeline canceled"] = transportReceiveContext.CancellationToken.IsCancellationRequested;

incomingPipelineMetrics.RecordMessageProcessingFailure(incomingPipelineMetricsTags, ex);

if (!ex.IsCausedBy(transportReceiveContext.CancellationToken))
{
incomingPipelineMetrics.RecordMessageProcessingFailure(incomingPipelineMetricsTags, ex);
}
throw;
}
finally
{
incomingPipelineMetrics.RecordFetchedMessage(incomingPipelineMetricsTags);
}

var completedAt = DateTimeOffset.UtcNow;
await receivePipelineNotification.Raise(new ReceivePipelineCompleted(message, pipelineStartedAt, completedAt), cancellationToken).ConfigureAwait(false);
Expand Down

0 comments on commit d7b35c1

Please sign in to comment.