Skip to content

Commit

Permalink
Update to latest OTel conventions
Browse files Browse the repository at this point in the history
- Swap order of queue name and operation in Activity name
- Add messaging.operation.name tag
  • Loading branch information
iinuwa authored and lukebakken committed Dec 13, 2024
1 parent 354d4bd commit b5ebf85
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 25 deletions.
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
: default;

ulong publishSequenceNumber = 0;
Expand Down Expand Up @@ -117,7 +117,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
: default;

ulong publishSequenceNumber = 0;
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -918,10 +918,10 @@ await ModelSendAsync(in method, k.CancellationToken)
BasicGetResult? result = await k;

using Activity? activity = result != null
? RabbitMQActivitySource.Receive(result.RoutingKey,
? RabbitMQActivitySource.BasicGet(result.RoutingKey,
result.Exchange,
result.DeliveryTag, result.BasicProperties, result.Body.Length)
: RabbitMQActivitySource.ReceiveEmpty(queue);
: RabbitMQActivitySource.BasicGetEmpty(queue);

activity?.SetStartTime(k.StartTime);

Expand Down
37 changes: 22 additions & 15 deletions projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public static class RabbitMQActivitySource
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
internal const string MessageId = "messaging.message.id";
internal const string MessageConversationId = "messaging.message.conversation_id";
internal const string MessagingOperationName = "messaging.operation.name";
internal const string MessagingOperationNameBasicDeliver = "deliver";
internal const string MessagingOperationNameBasicGet = "fetch";
internal const string MessagingOperationNameBasicGetEmpty = "fetch (empty)";
internal const string MessagingOperationNameBasicPublish = "publish";
internal const string MessagingOperationType = "messaging.operation.type";
internal const string MessagingOperationTypeSend = "send";
internal const string MessagingOperationTypeProcess = "process";
Expand Down Expand Up @@ -56,7 +61,7 @@ public static class RabbitMQActivitySource
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
};

internal static Activity? Send(string routingKey, string exchange, int bodySize,
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
ActivityContext linkedContext = default)
{
if (!s_publisherSource.HasListeners())
Expand All @@ -66,41 +71,42 @@ public static class RabbitMQActivitySource

Activity? activity = linkedContext == default
? s_publisherSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
ActivityKind.Producer)
: s_publisherSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
ActivityKind.Producer, linkedContext);
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags(MessagingOperationTypeSend, routingKey, exchange, 0, bodySize, activity);
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity);
}

return activity;

}

internal static Activity? ReceiveEmpty(string queue)
internal static Activity? BasicGetEmpty(string queue)
{
if (!s_subscriberSource.HasListeners())
{
return null;
}

Activity? activity = s_subscriberSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{queue} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGetEmpty} {queue}" : MessagingOperationNameBasicGetEmpty,
ActivityKind.Consumer);
if (activity != null && activity.IsAllDataRequested)
{
activity
.SetTag(MessagingOperationType, MessagingOperationTypeReceive)
.SetTag(MessagingOperationName, MessagingOperationNameBasicGetEmpty)
.SetTag(MessagingDestination, "amq.default");
}

return activity;
}

internal static Activity? Receive(string routingKey, string exchange, ulong deliveryTag,
internal static Activity? BasicGet(string routingKey, string exchange, ulong deliveryTag,
IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize)
{
if (!s_subscriberSource.HasListeners())
Expand All @@ -110,11 +116,11 @@ public static class RabbitMQActivitySource

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive, ActivityKind.Consumer,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGet} {routingKey}" : MessagingOperationNameBasicGet, ActivityKind.Consumer,
ContextExtractor(readOnlyBasicProperties));
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags(MessagingOperationTypeReceive, routingKey, exchange, deliveryTag, readOnlyBasicProperties,
PopulateMessagingTags(MessagingOperationTypeReceive, MessagingOperationNameBasicGet, routingKey, exchange, deliveryTag, readOnlyBasicProperties,
bodySize, activity);
}

Expand All @@ -131,11 +137,11 @@ public static class RabbitMQActivitySource

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeProcess}" : MessagingOperationTypeProcess,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicDeliver} {routingKey}" : MessagingOperationNameBasicDeliver,
ActivityKind.Consumer, ContextExtractor(basicProperties));
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags(MessagingOperationTypeProcess, routingKey, exchange,
PopulateMessagingTags(MessagingOperationTypeProcess, MessagingOperationNameBasicDeliver, routingKey, exchange,
deliveryTag, basicProperties, bodySize, activity);
}

Expand All @@ -157,10 +163,10 @@ public static class RabbitMQActivitySource
?.Start();
}

private static void PopulateMessagingTags(string operation, string routingKey, string exchange,
private static void PopulateMessagingTags(string operationType, string operationName, string routingKey, string exchange,
ulong deliveryTag, IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize, Activity activity)
{
PopulateMessagingTags(operation, routingKey, exchange, deliveryTag, bodySize, activity);
PopulateMessagingTags(operationType, operationName, routingKey, exchange, deliveryTag, bodySize, activity);

if (!string.IsNullOrEmpty(readOnlyBasicProperties.CorrelationId))
{
Expand All @@ -173,11 +179,12 @@ private static void PopulateMessagingTags(string operation, string routingKey, s
}
}

private static void PopulateMessagingTags(string operation, string routingKey, string exchange,
private static void PopulateMessagingTags(string operationType, string operationName, string routingKey, string exchange,
ulong deliveryTag, int bodySize, Activity activity)
{
activity
.SetTag(MessagingOperationType, operation)
.SetTag(MessagingOperationType, operationType)
.SetTag(MessagingOperationName, operationName)
.SetTag(MessagingDestination, string.IsNullOrEmpty(exchange) ? "amq.default" : exchange)
.SetTag(MessagingDestinationRoutingKey, routingKey)
.SetTag(MessagingBodySize, bodySize);
Expand Down
6 changes: 3 additions & 3 deletions projects/Test/SequentialIntegration/TestActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ private static ActivityListener StartActivityListener(List<Activity> activities)
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
List<Activity> activityList, bool isDeliver = false)
{
string childName = isDeliver ? "process" : "receive";
string childName = isDeliver ? "deliver" : "fetch";
Activity[] activities = activityList.ToArray();
Assert.NotEmpty(activities);

Expand All @@ -418,11 +418,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
}

Activity sendActivity = activities.First(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} send" : "send") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"publish {queueName}" : "publish") &&
x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag &&
routingKeyTag == $"{queueName}");
Activity receiveActivity = activities.Single(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) &&
x.Links.First().Context.TraceId == sendActivity.TraceId);
Assert.Equal(ActivityKind.Producer, sendActivity.Kind);
Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind);
Expand Down
11 changes: 8 additions & 3 deletions projects/Test/SequentialIntegration/TestOpenTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
List<Activity> activityList, bool isDeliver = false, string baggageGuid = null)
{
string childName = isDeliver ? "process" : "receive";
string childName = isDeliver ? "deliver" : "fetch";
string childType = isDeliver ? "process" : "receive";
Activity[] activities = activityList.ToArray();
Assert.NotEmpty(activities);
foreach (var item in activities)
Expand All @@ -354,11 +355,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
}

Activity sendActivity = activities.First(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} send" : "send") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"publish {queueName}" : "publish") &&
x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag &&
routingKeyTag == $"{queueName}");
Activity receiveActivity = activities.Single(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) &&
x.Links.First().Context.TraceId == sendActivity.TraceId);
Assert.Equal(ActivityKind.Producer, sendActivity.Kind);
Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind);
Expand All @@ -380,6 +381,10 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize);
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize);
AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize);
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationType, childType);
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationName, childName);
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationType, "send");
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationName, "publish");
}
}
}

0 comments on commit b5ebf85

Please sign in to comment.