From 3247dba537f42af37ec581aa9fba47368e33be52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Wed, 15 Nov 2023 18:51:05 +0100 Subject: [PATCH 1/8] add instrumentation to allow sending kafka payload size on produce --- .../KafkaProducerInstrumentation.java | 49 ++++++++++++++++--- .../DataStreamContextInjector.java | 23 +++++++-- .../core/propagation/CorePropagation.java | 5 ++ .../instrumentation/api/AgentPropagation.java | 3 ++ .../instrumentation/api/AgentTracer.java | 3 ++ 5 files changed, 73 insertions(+), 10 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 62c74489201..54b82aa85ba 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -15,13 +15,16 @@ import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.isStatic; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.api.Config; +import datadog.trace.bootstrap.CallDepthThreadLocalMap; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; import java.util.LinkedHashMap; import net.bytebuddy.asm.Advice; @@ -29,6 +32,7 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.RecordBatch; @AutoService(Instrumenter.class) @@ -62,6 +66,10 @@ public void adviceTransformations(AdviceTransformation transformation) { .and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord"))) .and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))), KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice"); + + transformation.applyAdvice( + isMethod().and(isPublic()).and(isStatic()).and(named("estimateSizeInBytesUpperBound")), + KafkaProducerInstrumentation.class.getName() + "$EstimateSizeAdvice"); } public static class ProducerAdvice { @@ -82,6 +90,8 @@ public static AgentScope onEnter( if (record.value() == null) { span.setTag(InstrumentationTags.TOMBSTONE, true); } + // save the topic for EstimateSizeAdvice to read it. + span.setBaggageItem("kafka.topic", record.topic()); // Do not inject headers for batch versions below 2 // This is how similar check is being done in Kafka client itself: @@ -93,13 +103,9 @@ public static AgentScope onEnter( if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2 && Config.get().isKafkaClientPropagationEnabled() && !Config.get().isKafkaClientPropagationDisabledForTopic(record.topic())) { - LinkedHashMap sortedTags = new LinkedHashMap<>(); - sortedTags.put(DIRECTION_TAG, DIRECTION_OUT); - sortedTags.put(TOPIC_TAG, record.topic()); - sortedTags.put(TYPE_TAG, "kafka"); try { propagate().inject(span, record.headers(), SETTER); - propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags); + propagate().injectPathwayContext(span, record.headers(), SETTER); } catch (final IllegalStateException e) { // headers must be read-only from reused record. try again with new one. record = @@ -112,7 +118,7 @@ record = record.headers()); propagate().inject(span, record.headers(), SETTER); - propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags); + propagate().injectPathwayContext(span, record.headers(), SETTER); } if (TIME_IN_QUEUE_ENABLED) { SETTER.injectTimeInQueue(record.headers()); @@ -130,4 +136,35 @@ public static void stopSpan( scope.close(); } } + + public static class EstimateSizeAdvice { + + /** + * Instrumentation for the method AbstractRecords.estimateSizeInBytesUpperBound that is called + * as part of sending a kafka payload. This gives us an estimate of the payload size "for free", + * that we send as a metric. + */ + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Return int estimatedPayloadSize) { + // prevent multiple calls + final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(AbstractRecords.class); + if (callDepth > 0) return; + + final AgentSpan encompassingSpan = activeSpan(); + if (encompassingSpan == null) return; + + String topic = encompassingSpan.getBaggageItem("kafka.topic"); + // if not available, the call is not coming from where we expect it. + if (topic == null) return; + + LinkedHashMap sortedTags = new LinkedHashMap<>(); + sortedTags.put(DIRECTION_TAG, DIRECTION_OUT); + sortedTags.put(TOPIC_TAG, topic); + sortedTags.put(TYPE_TAG, "kafka"); + + AgentTracer.get() + .getDataStreamsMonitoring() + .setCheckpoint(encompassingSpan, sortedTags, 0, estimatedPayloadSize); + } + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java index 838c50159e6..b2be01f8353 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java @@ -25,13 +25,28 @@ public void injectPathwayContext( AgentPropagation.Setter setter, LinkedHashMap sortedTags) { PathwayContext pathwayContext = span.context().getPathwayContext(); + if (shouldInject(pathwayContext, span)) { + pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add); + doInject(pathwayContext, span, carrier, setter); + } + } - if (pathwayContext == null - || (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())) { - return; + public void injectPathwayContext( + AgentSpan span, C carrier, AgentPropagation.Setter setter) { + PathwayContext pathwayContext = span.context().getPathwayContext(); + if (shouldInject(pathwayContext, span)) { + // Checkpoint is not set here. Should be done manually later one. + doInject(pathwayContext, span, carrier, setter); } - pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add); + } + + private boolean shouldInject(PathwayContext pathwayContext, AgentSpan span) { + return !(pathwayContext == null + || (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())); + } + private static void doInject( + PathwayContext pathwayContext, AgentSpan span, C carrier, AgentPropagation.Setter setter) { boolean injected = setter instanceof AgentPropagation.BinarySetter ? injectBinaryPathwayContext( diff --git a/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java b/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java index 8453e20446d..bd1750ece37 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java @@ -71,6 +71,11 @@ public void injectPathwayContext( this.dataStreamContextInjector.injectPathwayContext(span, carrier, setter, sortedTags); } + @Override + public void injectPathwayContext(AgentSpan span, C carrier, Setter setter) { + this.dataStreamContextInjector.injectPathwayContext(span, carrier, setter); + } + @Override public AgentSpan.Context.Extracted extract(final C carrier, final ContextVisitor getter) { return extractor.extract(carrier, getter); diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java index 285e6ef4763..e359ca75420 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java @@ -14,6 +14,9 @@ public interface AgentPropagation { void injectPathwayContext( AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags); + /** This method does not set a checkpoint */ + void injectPathwayContext(AgentSpan span, C carrier, Setter setter); + interface Setter { void set(C carrier, String key, String value); } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index 000bc2c7235..8c15a91f6b9 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -867,6 +867,9 @@ public void inject( public void injectPathwayContext( AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags) {} + @Override + public void injectPathwayContext(AgentSpan span, C carrier, Setter setter) {} + @Override public Context.Extracted extract(final C carrier, final ContextVisitor getter) { return NoopContext.INSTANCE; From 619cb0fa33807a221ba60a8a58ea7c58bf57c78f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Wed, 15 Nov 2023 19:04:33 +0100 Subject: [PATCH 2/8] typo --- .../trace/core/datastreams/DataStreamContextInjector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java index b2be01f8353..5d3dd8aeb6a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java @@ -35,7 +35,7 @@ public void injectPathwayContext( AgentSpan span, C carrier, AgentPropagation.Setter setter) { PathwayContext pathwayContext = span.context().getPathwayContext(); if (shouldInject(pathwayContext, span)) { - // Checkpoint is not set here. Should be done manually later one. + // Checkpoint is not set here. Should be done manually later on. doInject(pathwayContext, span, carrier, setter); } } From 46e86513542af13fd20a08d35554f706e96c267c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Thu, 16 Nov 2023 14:19:46 +0100 Subject: [PATCH 3/8] fix depth count --- .../kafka_clients/KafkaProducerInstrumentation.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 54b82aa85ba..84d2ac18792 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -131,6 +131,9 @@ record = @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + // reset depth count for EstimateSizeAdvice + CallDepthThreadLocalMap.reset(AbstractRecords.class); + PRODUCER_DECORATE.onError(scope, throwable); PRODUCER_DECORATE.beforeFinish(scope); scope.close(); From 5ac65d1ae8f54a364585fc89057a751db4e4f276 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Thu, 16 Nov 2023 17:31:45 +0100 Subject: [PATCH 4/8] change instrumented method --- .../KafkaProducerInstrumentation.java | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 84d2ac18792..03bae63881c 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -14,14 +14,13 @@ import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED; import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER; import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPrivate; import static net.bytebuddy.matcher.ElementMatchers.isPublic; -import static net.bytebuddy.matcher.ElementMatchers.isStatic; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.api.Config; -import datadog.trace.bootstrap.CallDepthThreadLocalMap; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; @@ -30,9 +29,9 @@ import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.RecordBatch; @AutoService(Instrumenter.class) @@ -68,14 +67,18 @@ public void adviceTransformations(AdviceTransformation transformation) { KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice"); transformation.applyAdvice( - isMethod().and(isPublic()).and(isStatic()).and(named("estimateSizeInBytesUpperBound")), - KafkaProducerInstrumentation.class.getName() + "$EstimateSizeAdvice"); + isMethod() + .and(isPrivate()) + .and(takesArgument(0, int.class)) + .and(named("ensureValidRecordSize")), + KafkaProducerInstrumentation.class.getName() + "$PayloadSizeAdvice"); } public static class ProducerAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static AgentScope onEnter( + @Advice.This KafkaProducer producer, @Advice.FieldValue("apiVersions") final ApiVersions apiVersions, @Advice.FieldValue("producerConfig") ProducerConfig producerConfig, @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @@ -131,29 +134,23 @@ record = @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { - // reset depth count for EstimateSizeAdvice - CallDepthThreadLocalMap.reset(AbstractRecords.class); - PRODUCER_DECORATE.onError(scope, throwable); PRODUCER_DECORATE.beforeFinish(scope); scope.close(); } } - public static class EstimateSizeAdvice { + public static class PayloadSizeAdvice { /** - * Instrumentation for the method AbstractRecords.estimateSizeInBytesUpperBound that is called - * as part of sending a kafka payload. This gives us an estimate of the payload size "for free", + * Instrumentation for the method KafkaProducer.ensureValidRecordSize that is called as part of + * sending a kafka payload. This gives us access to an estimate of the payload size "for free", * that we send as a metric. */ - @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit(@Advice.Return int estimatedPayloadSize) { - // prevent multiple calls - final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(AbstractRecords.class); - if (callDepth > 0) return; - + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) { final AgentSpan encompassingSpan = activeSpan(); + System.out.println("current span = " + encompassingSpan); if (encompassingSpan == null) return; String topic = encompassingSpan.getBaggageItem("kafka.topic"); From d9f1025080aec027d31c009237174e896a142a8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Fri, 17 Nov 2023 14:10:45 +0100 Subject: [PATCH 5/8] do something that works --- .../KafkaProducerInstrumentation.java | 46 ++++++++++--------- .../DataStreamContextInjector.java | 26 +++++++---- .../datastreams/DefaultPathwayContext.java | 13 +++++- .../core/propagation/CorePropagation.java | 6 ++- .../instrumentation/api/AgentPropagation.java | 4 +- .../instrumentation/api/AgentTracer.java | 11 ++++- .../instrumentation/api/PathwayContext.java | 4 ++ 7 files changed, 74 insertions(+), 36 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 03bae63881c..ce37a96008d 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -25,11 +25,11 @@ import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; +import datadog.trace.bootstrap.instrumentation.api.StatsPoint; import java.util.LinkedHashMap; import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.record.RecordBatch; @@ -78,7 +78,6 @@ public static class ProducerAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static AgentScope onEnter( - @Advice.This KafkaProducer producer, @Advice.FieldValue("apiVersions") final ApiVersions apiVersions, @Advice.FieldValue("producerConfig") ProducerConfig producerConfig, @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @@ -93,8 +92,6 @@ public static AgentScope onEnter( if (record.value() == null) { span.setTag(InstrumentationTags.TOMBSTONE, true); } - // save the topic for EstimateSizeAdvice to read it. - span.setBaggageItem("kafka.topic", record.topic()); // Do not inject headers for batch versions below 2 // This is how similar check is being done in Kafka client itself: @@ -106,9 +103,14 @@ public static AgentScope onEnter( if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2 && Config.get().isKafkaClientPropagationEnabled() && !Config.get().isKafkaClientPropagationDisabledForTopic(record.topic())) { + LinkedHashMap sortedTags = new LinkedHashMap<>(); + sortedTags.put(DIRECTION_TAG, DIRECTION_OUT); + sortedTags.put(TOPIC_TAG, record.topic()); + sortedTags.put(TYPE_TAG, "kafka"); try { propagate().inject(span, record.headers(), SETTER); - propagate().injectPathwayContext(span, record.headers(), SETTER); + propagate() + .injectPathwayContextWithoutSendingStats(span, record.headers(), SETTER, sortedTags); } catch (final IllegalStateException e) { // headers must be read-only from reused record. try again with new one. record = @@ -121,7 +123,8 @@ record = record.headers()); propagate().inject(span, record.headers(), SETTER); - propagate().injectPathwayContext(span, record.headers(), SETTER); + propagate() + .injectPathwayContextWithoutSendingStats(span, record.headers(), SETTER, sortedTags); } if (TIME_IN_QUEUE_ENABLED) { SETTER.injectTimeInQueue(record.headers()); @@ -149,22 +152,21 @@ public static class PayloadSizeAdvice { */ @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) { - final AgentSpan encompassingSpan = activeSpan(); - System.out.println("current span = " + encompassingSpan); - if (encompassingSpan == null) return; - - String topic = encompassingSpan.getBaggageItem("kafka.topic"); - // if not available, the call is not coming from where we expect it. - if (topic == null) return; - - LinkedHashMap sortedTags = new LinkedHashMap<>(); - sortedTags.put(DIRECTION_TAG, DIRECTION_OUT); - sortedTags.put(TOPIC_TAG, topic); - sortedTags.put(TYPE_TAG, "kafka"); - - AgentTracer.get() - .getDataStreamsMonitoring() - .setCheckpoint(encompassingSpan, sortedTags, 0, estimatedPayloadSize); + StatsPoint saved = activeSpan().context().getPathwayContext().getSavedStats(); + if (saved != null) { + // create new stats including the payload size + StatsPoint updated = + new StatsPoint( + saved.getEdgeTags(), + saved.getHash(), + saved.getParentHash(), + saved.getTimestampNanos(), + saved.getPathwayLatencyNano(), + saved.getEdgeLatencyNano(), + estimatedPayloadSize); + // then send the point + AgentTracer.get().getDataStreamsMonitoring().add(updated); + } } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java index 5d3dd8aeb6a..5222733e900 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java @@ -24,18 +24,28 @@ public void injectPathwayContext( C carrier, AgentPropagation.Setter setter, LinkedHashMap sortedTags) { - PathwayContext pathwayContext = span.context().getPathwayContext(); - if (shouldInject(pathwayContext, span)) { - pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add); - doInject(pathwayContext, span, carrier, setter); - } + injectPathwayContext(span, carrier, setter, sortedTags, true); } - public void injectPathwayContext( - AgentSpan span, C carrier, AgentPropagation.Setter setter) { + /** Same as injectPathwayContext, but the stats collected in the StatsPoint are not sent. */ + public void injectPathwayContextWithoutSendingStats( + AgentSpan span, + C carrier, + AgentPropagation.Setter setter, + LinkedHashMap sortedTags) { + injectPathwayContext(span, carrier, setter, sortedTags, false); + } + + private void injectPathwayContext( + AgentSpan span, + C carrier, + AgentPropagation.Setter setter, + LinkedHashMap sortedTags, + boolean sendCheckpoint) { PathwayContext pathwayContext = span.context().getPathwayContext(); if (shouldInject(pathwayContext, span)) { - // Checkpoint is not set here. Should be done manually later on. + pathwayContext.setCheckpoint( + sortedTags, sendCheckpoint ? dataStreamsMonitoring::add : pathwayContext::saveStats); doInject(pathwayContext, span, carrier, setter); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index 7e4beb835fc..340af2c5596 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -44,6 +44,7 @@ public class DefaultPathwayContext implements PathwayContext { private long pathwayStartNanos; private long pathwayStartNanoTicks; private long edgeStartNanoTicks; + private StatsPoint savedStats; private long hash; private boolean started; // state variables used to memoize the pathway hash with @@ -174,7 +175,7 @@ public void setCheckpoint( allTags, newHash, hash, - timeSource.getCurrentTimeNanos(), + startNanos, pathwayLatencyNano, edgeLatencyNano, payloadSizeBytes); @@ -188,6 +189,16 @@ public void setCheckpoint( } } + @Override + public void saveStats(StatsPoint point) { + this.savedStats = point; + } + + @Override + public StatsPoint getSavedStats() { + return this.savedStats; + } + @Override public byte[] encode() throws IOException { lock.lock(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java b/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java index bd1750ece37..5ab4b21da27 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java @@ -72,8 +72,10 @@ public void injectPathwayContext( } @Override - public void injectPathwayContext(AgentSpan span, C carrier, Setter setter) { - this.dataStreamContextInjector.injectPathwayContext(span, carrier, setter); + public void injectPathwayContextWithoutSendingStats( + AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags) { + this.dataStreamContextInjector.injectPathwayContextWithoutSendingStats( + span, carrier, setter, sortedTags); } @Override diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java index e359ca75420..f404ccfd233 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java @@ -14,8 +14,8 @@ public interface AgentPropagation { void injectPathwayContext( AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags); - /** This method does not set a checkpoint */ - void injectPathwayContext(AgentSpan span, C carrier, Setter setter); + void injectPathwayContextWithoutSendingStats( + AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags); interface Setter { void set(C carrier, String key, String value); diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index 8c15a91f6b9..038237eeb9c 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -868,7 +868,8 @@ public void injectPathwayContext( AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags) {} @Override - public void injectPathwayContext(AgentSpan span, C carrier, Setter setter) {} + public void injectPathwayContextWithoutSendingStats( + AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags) {} @Override public Context.Extracted extract(final C carrier, final ContextVisitor getter) { @@ -1085,6 +1086,14 @@ public void setCheckpoint( public void setCheckpoint( LinkedHashMap sortedTags, Consumer pointConsumer) {} + @Override + public void saveStats(StatsPoint point) {} + + @Override + public StatsPoint getSavedStats() { + return null; + } + @Override public byte[] encode() { return null; diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java index 0419c6f7844..b9cd729fc19 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java @@ -27,6 +27,10 @@ void setCheckpoint( // The input tags should be sorted. void setCheckpoint(LinkedHashMap sortedTags, Consumer pointConsumer); + void saveStats(StatsPoint point); + + StatsPoint getSavedStats(); + byte[] encode() throws IOException; String strEncode() throws IOException; From 881aa65b0674d80ad01579dfc27bd3e5373fe89b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Fri, 17 Nov 2023 14:14:48 +0100 Subject: [PATCH 6/8] remove factorization that was done on injection --- .../datastreams/DataStreamContextInjector.java | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java index 5222733e900..ac79aca6c54 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java @@ -43,20 +43,12 @@ private void injectPathwayContext( LinkedHashMap sortedTags, boolean sendCheckpoint) { PathwayContext pathwayContext = span.context().getPathwayContext(); - if (shouldInject(pathwayContext, span)) { - pathwayContext.setCheckpoint( - sortedTags, sendCheckpoint ? dataStreamsMonitoring::add : pathwayContext::saveStats); - doInject(pathwayContext, span, carrier, setter); + if (pathwayContext == null + || (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())) { + return; } - } - - private boolean shouldInject(PathwayContext pathwayContext, AgentSpan span) { - return !(pathwayContext == null - || (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())); - } - - private static void doInject( - PathwayContext pathwayContext, AgentSpan span, C carrier, AgentPropagation.Setter setter) { + pathwayContext.setCheckpoint( + sortedTags, sendCheckpoint ? dataStreamsMonitoring::add : pathwayContext::saveStats); boolean injected = setter instanceof AgentPropagation.BinarySetter ? injectBinaryPathwayContext( From 473c660b015e0d294f587bcb1fe015fee10f4e4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Tue, 19 Dec 2023 16:58:40 +0100 Subject: [PATCH 7/8] add tests and comments --- .../src/latestDepTest/groovy/KafkaClientTest.groovy | 6 ++++++ .../kafka_clients/KafkaProducerInstrumentation.java | 6 +++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTest.groovy index eafe94a23db..6aa13bbd17a 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTest.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTest.groovy @@ -182,6 +182,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 4 + payloadSize.minValue > 0.0 } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } @@ -194,6 +195,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 5 + payloadSize.minValue > 0.0 } cleanup: @@ -321,6 +323,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 4 + payloadSize.minValue > 0.0 } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } @@ -333,6 +336,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 5 + payloadSize.minValue > 0.0 } cleanup: @@ -981,6 +985,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 4 + payloadSize.minValue > 0.0 } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } @@ -993,6 +998,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 5 + payloadSize.minValue > 0.0 } cleanup: diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 7e534ccd7b4..7603a6d1af4 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -83,7 +83,8 @@ public void adviceTransformations(AdviceTransformation transformation) { isMethod() .and(isPrivate()) .and(takesArgument(0, int.class)) - .and(named("ensureValidRecordSize")), + .and(named("ensureValidRecordSize")), // intercepting this call allows us to see the + // estimated message size KafkaProducerInstrumentation.class.getName() + "$PayloadSizeAdvice"); } @@ -130,6 +131,9 @@ public static AgentScope onEnter( try { propagate().inject(span, record.headers(), SETTER); if (STREAMING_CONTEXT.empty() || STREAMING_CONTEXT.isSinkTopic(record.topic())) { + // inject the context in the headers, but delay sending the stats until we know the + // message size. + // The stats are saved in the pathway context and sent in PayloadSizeAdvice. propagate() .injectPathwayContextWithoutSendingStats( span, record.headers(), SETTER, sortedTags); From 60e046dffa4645033c9f9df2d14c60c1a82f1c6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Thu, 28 Dec 2023 13:01:20 +0100 Subject: [PATCH 8/8] fix parameter --- .../trace/core/datastreams/DataStreamContextInjector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java index 8963ff7b54f..d354a72c457 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java @@ -35,7 +35,7 @@ public void injectPathwayContext( long defaultTimestamp, long payloadSizeBytes) { injectPathwayContext( - span, carrier, setter, sortedTags, defaultTimestamp, payloadSizeBytes, false); + span, carrier, setter, sortedTags, defaultTimestamp, payloadSizeBytes, true); } /** Same as injectPathwayContext, but the stats collected in the StatsPoint are not sent. */