From b5ebf85f537b946df8107ebfe428851323fd7f7a Mon Sep 17 00:00:00 2001 From: Isaiah Inuwa Date: Thu, 31 Oct 2024 15:27:17 -0500 Subject: [PATCH] Update to latest OTel conventions - Swap order of queue name and operation in Activity name - Add messaging.operation.name tag --- .../Impl/Channel.BasicPublish.cs | 4 +- projects/RabbitMQ.Client/Impl/Channel.cs | 4 +- .../Impl/RabbitMQActivitySource.cs | 37 +++++++++++-------- .../TestActivitySource.cs | 6 +-- .../TestOpenTelemetry.cs | 11 ++++-- 5 files changed, 37 insertions(+), 25 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index b7ded1397..491d32796 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -62,7 +62,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken) var cmd = new BasicPublish(exchange, routingKey, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length) + ? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length) : default; ulong publishSequenceNumber = 0; @@ -117,7 +117,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken) var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length) + ? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length) : default; ulong publishSequenceNumber = 0; diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 64be4152e..4e73a9a08 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -918,10 +918,10 @@ await ModelSendAsync(in method, k.CancellationToken) BasicGetResult? result = await k; using Activity? activity = result != null - ? RabbitMQActivitySource.Receive(result.RoutingKey, + ? RabbitMQActivitySource.BasicGet(result.RoutingKey, result.Exchange, result.DeliveryTag, result.BasicProperties, result.Body.Length) - : RabbitMQActivitySource.ReceiveEmpty(queue); + : RabbitMQActivitySource.BasicGetEmpty(queue); activity?.SetStartTime(k.StartTime); diff --git a/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs index 45c11b590..31d076cbd 100644 --- a/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs @@ -15,6 +15,11 @@ public static class RabbitMQActivitySource // https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes internal const string MessageId = "messaging.message.id"; internal const string MessageConversationId = "messaging.message.conversation_id"; + internal const string MessagingOperationName = "messaging.operation.name"; + internal const string MessagingOperationNameBasicDeliver = "deliver"; + internal const string MessagingOperationNameBasicGet = "fetch"; + internal const string MessagingOperationNameBasicGetEmpty = "fetch (empty)"; + internal const string MessagingOperationNameBasicPublish = "publish"; internal const string MessagingOperationType = "messaging.operation.type"; internal const string MessagingOperationTypeSend = "send"; internal const string MessagingOperationTypeProcess = "process"; @@ -56,7 +61,7 @@ public static class RabbitMQActivitySource new KeyValuePair(ProtocolVersion, "0.9.1") }; - internal static Activity? Send(string routingKey, string exchange, int bodySize, + internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, ActivityContext linkedContext = default) { if (!s_publisherSource.HasListeners()) @@ -66,21 +71,21 @@ public static class RabbitMQActivitySource Activity? activity = linkedContext == default ? s_publisherSource.StartRabbitMQActivity( - UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend, + UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish, ActivityKind.Producer) : s_publisherSource.StartLinkedRabbitMQActivity( - UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend, + UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish, ActivityKind.Producer, linkedContext); if (activity != null && activity.IsAllDataRequested) { - PopulateMessagingTags(MessagingOperationTypeSend, routingKey, exchange, 0, bodySize, activity); + PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity); } return activity; } - internal static Activity? ReceiveEmpty(string queue) + internal static Activity? BasicGetEmpty(string queue) { if (!s_subscriberSource.HasListeners()) { @@ -88,19 +93,20 @@ public static class RabbitMQActivitySource } Activity? activity = s_subscriberSource.StartRabbitMQActivity( - UseRoutingKeyAsOperationName ? $"{queue} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive, + UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGetEmpty} {queue}" : MessagingOperationNameBasicGetEmpty, ActivityKind.Consumer); if (activity != null && activity.IsAllDataRequested) { activity .SetTag(MessagingOperationType, MessagingOperationTypeReceive) + .SetTag(MessagingOperationName, MessagingOperationNameBasicGetEmpty) .SetTag(MessagingDestination, "amq.default"); } return activity; } - internal static Activity? Receive(string routingKey, string exchange, ulong deliveryTag, + internal static Activity? BasicGet(string routingKey, string exchange, ulong deliveryTag, IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize) { if (!s_subscriberSource.HasListeners()) @@ -110,11 +116,11 @@ public static class RabbitMQActivitySource // Extract the PropagationContext of the upstream parent from the message headers. Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity( - UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive, ActivityKind.Consumer, + UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGet} {routingKey}" : MessagingOperationNameBasicGet, ActivityKind.Consumer, ContextExtractor(readOnlyBasicProperties)); if (activity != null && activity.IsAllDataRequested) { - PopulateMessagingTags(MessagingOperationTypeReceive, routingKey, exchange, deliveryTag, readOnlyBasicProperties, + PopulateMessagingTags(MessagingOperationTypeReceive, MessagingOperationNameBasicGet, routingKey, exchange, deliveryTag, readOnlyBasicProperties, bodySize, activity); } @@ -131,11 +137,11 @@ public static class RabbitMQActivitySource // Extract the PropagationContext of the upstream parent from the message headers. Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity( - UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeProcess}" : MessagingOperationTypeProcess, + UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicDeliver} {routingKey}" : MessagingOperationNameBasicDeliver, ActivityKind.Consumer, ContextExtractor(basicProperties)); if (activity != null && activity.IsAllDataRequested) { - PopulateMessagingTags(MessagingOperationTypeProcess, routingKey, exchange, + PopulateMessagingTags(MessagingOperationTypeProcess, MessagingOperationNameBasicDeliver, routingKey, exchange, deliveryTag, basicProperties, bodySize, activity); } @@ -157,10 +163,10 @@ public static class RabbitMQActivitySource ?.Start(); } - private static void PopulateMessagingTags(string operation, string routingKey, string exchange, + private static void PopulateMessagingTags(string operationType, string operationName, string routingKey, string exchange, ulong deliveryTag, IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize, Activity activity) { - PopulateMessagingTags(operation, routingKey, exchange, deliveryTag, bodySize, activity); + PopulateMessagingTags(operationType, operationName, routingKey, exchange, deliveryTag, bodySize, activity); if (!string.IsNullOrEmpty(readOnlyBasicProperties.CorrelationId)) { @@ -173,11 +179,12 @@ private static void PopulateMessagingTags(string operation, string routingKey, s } } - private static void PopulateMessagingTags(string operation, string routingKey, string exchange, + private static void PopulateMessagingTags(string operationType, string operationName, string routingKey, string exchange, ulong deliveryTag, int bodySize, Activity activity) { activity - .SetTag(MessagingOperationType, operation) + .SetTag(MessagingOperationType, operationType) + .SetTag(MessagingOperationName, operationName) .SetTag(MessagingDestination, string.IsNullOrEmpty(exchange) ? "amq.default" : exchange) .SetTag(MessagingDestinationRoutingKey, routingKey) .SetTag(MessagingBodySize, bodySize); diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index 1143b9551..ec9cdcf31 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -402,7 +402,7 @@ private static ActivityListener StartActivityListener(List activities) private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName, List activityList, bool isDeliver = false) { - string childName = isDeliver ? "process" : "receive"; + string childName = isDeliver ? "deliver" : "fetch"; Activity[] activities = activityList.ToArray(); Assert.NotEmpty(activities); @@ -418,11 +418,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN } Activity sendActivity = activities.First(x => - x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} send" : "send") && + x.OperationName == (useRoutingKeyAsOperationName ? $"publish {queueName}" : "publish") && x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag && routingKeyTag == $"{queueName}"); Activity receiveActivity = activities.Single(x => - x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") && + x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) && x.Links.First().Context.TraceId == sendActivity.TraceId); Assert.Equal(ActivityKind.Producer, sendActivity.Kind); Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind); diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs index 4ecff875d..4bfc54104 100644 --- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -342,7 +342,8 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName, List activityList, bool isDeliver = false, string baggageGuid = null) { - string childName = isDeliver ? "process" : "receive"; + string childName = isDeliver ? "deliver" : "fetch"; + string childType = isDeliver ? "process" : "receive"; Activity[] activities = activityList.ToArray(); Assert.NotEmpty(activities); foreach (var item in activities) @@ -354,11 +355,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN } Activity sendActivity = activities.First(x => - x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} send" : "send") && + x.OperationName == (useRoutingKeyAsOperationName ? $"publish {queueName}" : "publish") && x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag && routingKeyTag == $"{queueName}"); Activity receiveActivity = activities.Single(x => - x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") && + x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) && x.Links.First().Context.TraceId == sendActivity.TraceId); Assert.Equal(ActivityKind.Producer, sendActivity.Kind); Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind); @@ -380,6 +381,10 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize); AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize); AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize); + AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationType, childType); + AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationName, childName); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationType, "send"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationName, "publish"); } } }