From 17eacbca8bd20099ec25edf700af8b195837367d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?=
Date: Thu, 19 Oct 2023 11:00:29 +0200
Subject: [PATCH] couple of fixes following advice from andrea

- mutualize method between client and streams
- fix code that would not have been inlined
- add class cast check just to be sure
---
 .../KafkaConsumerInstrumentation.java         |  3 +-
 .../kafka_clients/TracingIterator.java        | 14 +--------
 .../instrumentation/kafka_clients/Utils.java  | 19 ++++++++++++
 .../KafkaStreamTaskInstrumentation.java       | 31 ++++++-------------
 4 files changed, 32 insertions(+), 35 deletions(-)
 create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java

diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java
index bf840a76e835..a3161879a866 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java
@@ -48,7 +48,8 @@ public String[] helperClassNames() {
       packageName + ".TracingIterator",
       packageName + ".TracingList",
       packageName + ".TracingListIterator",
-      packageName + ".Base64Decoder"
+      packageName + ".Base64Decoder",
+      packageName + ".Utils"
     };
   }
 
diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java
index a9523bae0531..7e14f23d69d8 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java
@@ -13,6 +13,7 @@
 import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_DELIVER;
 import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
 import static datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter.GETTER;
+import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import datadog.trace.api.Config;
@@ -20,12 +21,9 @@
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan.Context;
 import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
-import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,16 +112,6 @@ protected void startNewRecordSpan(ConsumerRecord val) {
     }
   }
 
-  private static long computePayloadSizeBytes(ConsumerRecord val) {
-    long headersSize = 0;
-    Headers headers = val.headers();
-    if (headers != null)
-      for (Header h : headers) {
-        headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length;
-      }
-    return headersSize + val.serializedKeySize() + val.serializedValueSize();
-  }
-
   @Override
   public void remove() {
     delegateIterator.remove();
diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java
new file mode 100644
index 000000000000..a07d333be577
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java
@@ -0,0 +1,19 @@
+package datadog.trace.instrumentation.kafka_clients;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+
+public class Utils {
+  // this method is used in kafka-clients and kafka-streams instrumentations
+  public static long computePayloadSizeBytes(ConsumerRecord val) {
+    long headersSize = 0;
+    Headers headers = val.headers();
+    if (headers != null)
+      for (Header h : headers) {
+        headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length;
+      }
+    return headersSize + val.serializedKeySize() + val.serializedValueSize();
+  }
+}
diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java
index 12ee5fbf083f..871b3fdc2403 100644
--- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java
@@ -9,6 +9,7 @@
 import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG;
 import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
 import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
+import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes;
 import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.BROKER_DECORATE;
 import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE;
 import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.KAFKA_CONSUME;
@@ -32,14 +33,11 @@
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
 import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import datadog.trace.instrumentation.kafka_clients.TracingIterableDelegator;
-import java.nio.charset.StandardCharsets;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import net.bytebuddy.asm.Advice;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -63,6 +61,7 @@ public String instrumentedType() {
   public String[] helperClassNames() {
     return new String[] {
       "datadog.trace.instrumentation.kafka_clients.TracingIterableDelegator",
+      "datadog.trace.instrumentation.kafka_clients.Utils",
       packageName + ".KafkaStreamsDecorator",
       packageName + ".ProcessorRecordContextVisitor",
       packageName + ".StampedRecordContextVisitor",
@@ -268,16 +267,6 @@ public static void start(
       InstrumentationContext.get(StreamTask.class, StreamTaskContext.class)
           .put(task, streamTaskContext);
     }
-
-    private static long computePayloadSizeBytes(ConsumerRecord val) {
-      long headersSize = 0;
-      Headers headers = val.headers();
-      if (headers != null)
-        for (Header h : headers) {
-          headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length;
-        }
-      return headersSize + val.serializedKeySize() + val.serializedValueSize();
-    }
   }
 
   /** Very similar to StartSpanAdvice, but with a different argument type for record. */
@@ -323,17 +312,17 @@ public static void start(
         sortedTags.put(TOPIC_TAG, record.topic());
         sortedTags.put(TYPE_TAG, "kafka");
 
-        // the type ProcessorRecordContext inherits from RecordMetadata starting with 2.7
-        // we need access to that type to be able to get the payload size info
-        RecordMetadata metadata = (RecordMetadata) (Object) record;
+        long payloadSize = 0;
+        // we have to go through Object to get the RecordMetadata here because the class of `record`
+        // only implements it after 2.7 (and this class is only used if v >= 2.7)
+        if ((Object) record instanceof RecordMetadata) { // should always be true
+          RecordMetadata metadata = (RecordMetadata) (Object) record;
+          payloadSize = metadata.serializedKeySize() + metadata.serializedValueSize();
+        }
 
         AgentTracer.get()
             .getDataStreamsMonitoring()
-            .setCheckpoint(
-                span,
-                sortedTags,
-                record.timestamp(),
-                metadata.serializedKeySize() + metadata.serializedValueSize());
+            .setCheckpoint(span, sortedTags, record.timestamp(), payloadSize);
       } else {
         span = startSpan(KAFKA_CONSUME, null);
       }