Skip to content

Commit

Permalink
add payload size tracking to sqs and rabbit
Browse files Browse the repository at this point in the history
  • Loading branch information
vandonr committed Nov 7, 2023
1 parent ad4d938 commit 11c6917
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

public class TracingIterator<L extends Iterator<Message>> implements Iterator<Message> {
private static final Logger log = LoggerFactory.getLogger(TracingIterator.class);
Expand Down Expand Up @@ -91,7 +95,9 @@ protected void startNewMessageSpan(Message message) {
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
sortedTags.put(TOPIC_TAG, urlFileName(queueUrl));
sortedTags.put(TYPE_TAG, "sqs");
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0);
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, 0, computePayloadSizeBytes(message));

CONSUMER_DECORATE.afterStart(span);
CONSUMER_DECORATE.onConsume(span, queueUrl, requestId);
Expand All @@ -106,6 +112,29 @@ protected void startNewMessageSpan(Message message) {
}
}

private static long computePayloadSizeBytes(Message message) {
long size = 0;
if (message.body() != null) size += message.body().getBytes(StandardCharsets.UTF_8).length;

for (Map.Entry<String, MessageAttributeValue> attr : message.messageAttributes().entrySet()) {
size += attr.getKey().getBytes(StandardCharsets.UTF_8).length;

MessageAttributeValue val = attr.getValue();
if (null != val.stringValue())
size += val.stringValue().getBytes(StandardCharsets.UTF_8).length;
if (null != val.binaryValue())
// using Unsafe method to avoid a mem copy
size += val.binaryValue().asByteArrayUnsafe().length;
for (String s : val.stringListValues()) {
size += s.getBytes(StandardCharsets.UTF_8).length;
}
for (SdkBytes b : val.binaryListValues()) {
size += b.asByteArrayUnsafe().length;
}
}
return size;
}

@Override
public void remove() {
delegate.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.rabbitmq.client.Command;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.LongString;
import datadog.trace.api.Config;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
Expand All @@ -27,6 +28,7 @@
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.MessagingClientDecorator;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -249,7 +251,11 @@ public static AgentScope startReceivingSpan(
sortedTags.put(TYPE_TAG, "rabbitmq");
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, produceMillis, 0);
.setCheckpoint(
span,
sortedTags,
produceMillis,
(body != null ? body.length : 0) + computeHeadersSizeBytes(headers));
}

CONSUMER_DECORATE.afterStart(span);
Expand All @@ -260,6 +266,19 @@ public static AgentScope startReceivingSpan(
return scope;
}

private static long computeHeadersSizeBytes(Map<String, Object> headers) {
long size = 0;
for (Map.Entry<String, Object> kv : headers.entrySet()) {
size += kv.getKey().getBytes(StandardCharsets.UTF_8).length;

Object v = kv.getValue();
// most things get converted to this type in RMQ headers
if (v instanceof LongString) size += ((LongString) v).getBytes().length;
// todo handle more types ?
}
return size;
}

public static void finishReceivingSpan(AgentScope scope) {
AgentSpan span = scope.span();
if (CONSUMER_DECORATE.endToEndDurationsEnabled) {
Expand Down

0 comments on commit 11c6917

Please sign in to comment.