From 5ac65d1ae8f54a364585fc89057a751db4e4f276 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Thu, 16 Nov 2023 17:31:45 +0100 Subject: [PATCH] change instrumented method --- .../KafkaProducerInstrumentation.java | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 84d2ac18792..03bae63881c 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -14,14 +14,13 @@ import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED; import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER; import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPrivate; import static net.bytebuddy.matcher.ElementMatchers.isPublic; -import static net.bytebuddy.matcher.ElementMatchers.isStatic; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.api.Config; -import datadog.trace.bootstrap.CallDepthThreadLocalMap; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; @@ -30,9 +29,9 @@ import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.RecordBatch; @AutoService(Instrumenter.class) @@ -68,14 +67,18 @@ public void adviceTransformations(AdviceTransformation transformation) { KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice"); transformation.applyAdvice( - isMethod().and(isPublic()).and(isStatic()).and(named("estimateSizeInBytesUpperBound")), - KafkaProducerInstrumentation.class.getName() + "$EstimateSizeAdvice"); + isMethod() + .and(isPrivate()) + .and(takesArgument(0, int.class)) + .and(named("ensureValidRecordSize")), + KafkaProducerInstrumentation.class.getName() + "$PayloadSizeAdvice"); } public static class ProducerAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static AgentScope onEnter( + @Advice.This KafkaProducer producer, @Advice.FieldValue("apiVersions") final ApiVersions apiVersions, @Advice.FieldValue("producerConfig") ProducerConfig producerConfig, @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @@ -131,29 +134,23 @@ record = @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { - // reset depth count for EstimateSizeAdvice - CallDepthThreadLocalMap.reset(AbstractRecords.class); - PRODUCER_DECORATE.onError(scope, throwable); PRODUCER_DECORATE.beforeFinish(scope); scope.close(); } } - public static class EstimateSizeAdvice { + public static class PayloadSizeAdvice { /** - * Instrumentation for the method AbstractRecords.estimateSizeInBytesUpperBound that is called - * as part of sending a kafka payload. This gives us an estimate of the payload size "for free", + * Instrumentation for the method KafkaProducer.ensureValidRecordSize that is called as part of + * sending a kafka payload. This gives us access to an estimate of the payload size "for free", * that we send as a metric. */ - @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit(@Advice.Return int estimatedPayloadSize) { - // prevent multiple calls - final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(AbstractRecords.class); - if (callDepth > 0) return; - + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) { final AgentSpan encompassingSpan = activeSpan(); + System.out.println("current span = " + encompassingSpan); if (encompassingSpan == null) return; String topic = encompassingSpan.getBaggageItem("kafka.topic");