diff --git a/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java b/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java index ee58cbf3538..17843ec398c 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java @@ -20,11 +20,15 @@ import datadog.trace.api.Config; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; public class TracingIterator> implements Iterator { private static final Logger log = LoggerFactory.getLogger(TracingIterator.class); @@ -91,7 +95,9 @@ protected void startNewMessageSpan(Message message) { sortedTags.put(DIRECTION_TAG, DIRECTION_IN); sortedTags.put(TOPIC_TAG, urlFileName(queueUrl)); sortedTags.put(TYPE_TAG, "sqs"); - AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0); + AgentTracer.get() + .getDataStreamsMonitoring() + .setCheckpoint(span, sortedTags, 0, computePayloadSizeBytes(message)); CONSUMER_DECORATE.afterStart(span); CONSUMER_DECORATE.onConsume(span, queueUrl, requestId); @@ -106,6 +112,29 @@ protected void startNewMessageSpan(Message message) { } } + private static long computePayloadSizeBytes(Message message) { + long size = 0; + if (message.body() != null) size += message.body().getBytes(StandardCharsets.UTF_8).length; + + for (Map.Entry attr : message.messageAttributes().entrySet()) { + size += attr.getKey().getBytes(StandardCharsets.UTF_8).length; + + MessageAttributeValue val = attr.getValue(); + if (null != val.stringValue()) + size += val.stringValue().getBytes(StandardCharsets.UTF_8).length; + if (null != val.binaryValue()) + // using Unsafe method to avoid a mem copy + size += val.binaryValue().asByteArrayUnsafe().length; + for (String s : val.stringListValues()) { + size += s.getBytes(StandardCharsets.UTF_8).length; + } + for (SdkBytes b : val.binaryListValues()) { + size += b.asByteArrayUnsafe().length; + } + } + return size; + } + @Override public void remove() { delegate.remove(); diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java index 8b8aba561e3..cd92dd77514 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java @@ -17,6 +17,7 @@ import com.rabbitmq.client.Command; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.LongString; import datadog.trace.api.Config; import datadog.trace.api.naming.SpanNaming; import datadog.trace.bootstrap.instrumentation.api.AgentScope; @@ -27,6 +28,7 @@ import datadog.trace.bootstrap.instrumentation.api.Tags; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.MessagingClientDecorator; +import java.nio.charset.StandardCharsets; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -249,7 +251,11 @@ public static AgentScope startReceivingSpan( sortedTags.put(TYPE_TAG, "rabbitmq"); AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, produceMillis, 0); + .setCheckpoint( + span, + sortedTags, + produceMillis, + (body != null ? body.length : 0) + computeHeadersSizeBytes(headers)); } CONSUMER_DECORATE.afterStart(span); @@ -260,6 +266,19 @@ public static AgentScope startReceivingSpan( return scope; } + private static long computeHeadersSizeBytes(Map headers) { + long size = 0; + for (Map.Entry kv : headers.entrySet()) { + size += kv.getKey().getBytes(StandardCharsets.UTF_8).length; + + Object v = kv.getValue(); + // most things get converted to this type in RMQ headers + if (v instanceof LongString) size += ((LongString) v).getBytes().length; + // todo handle more types ? + } + return size; + } + public static void finishReceivingSpan(AgentScope scope) { AgentSpan span = scope.span(); if (CONSUMER_DECORATE.endToEndDurationsEnabled) {