From ba30c88cdc1e92bbb91b412cc3040ec3e31813bc Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Thu, 17 Oct 2024 15:38:34 -0500 Subject: [PATCH 01/14] Support service name overrides for DSM --- .../KafkaProducerInstrumentation.java | 3 +- .../kafka_clients38/PayloadSizeAdvice.java | 3 +- .../RecordingDatastreamsPayloadWriter.groovy | 2 +- .../DataStreamContextExtractor.java | 11 ++- .../datastreams/DatastreamsPayloadWriter.java | 2 +- .../DefaultDataStreamsMonitoring.java | 74 ++++++++++++++----- .../datastreams/DefaultPathwayContext.java | 70 ++++++++++++------ .../MsgPackDatastreamsPayloadWriter.java | 8 +- .../datastreams/DataStreamsWritingTest.groovy | 66 +++++++++++++++-- .../DefaultDataStreamsMonitoringTest.groovy | 52 ++++++------- .../groovy/DataStreamsIntegrationTest.groovy | 2 +- .../api/AgentDataStreamsMonitoring.java | 16 ++++ .../instrumentation/api/AgentTracer.java | 6 ++ .../instrumentation/api/Backlog.java | 9 ++- .../instrumentation/api/StatsPoint.java | 9 ++- 15 files changed, 248 insertions(+), 85 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 4b09bd46af1..6366973f63e 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -200,7 +200,8 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) saved.getTimestampNanos(), saved.getPathwayLatencyNano(), saved.getEdgeLatencyNano(), - estimatedPayloadSize); + estimatedPayloadSize, + saved.getServiceNameOverride()); // then send the point AgentTracer.get().getDataStreamsMonitoring().add(updated); } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java index 47a2e6d0201..6192485a47e 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java @@ -27,7 +27,8 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) saved.getTimestampNanos(), saved.getPathwayLatencyNano(), saved.getEdgeLatencyNano(), - estimatedPayloadSize); + estimatedPayloadSize, + saved.getServiceNameOverride()); // then send the point AgentTracer.get().getDataStreamsMonitoring().add(updated); } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy index 4ddb26b51af..396985b24cd 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy 
@@ -19,7 +19,7 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter { private final Set backlogs = [] @Override - synchronized void writePayload(Collection data) { + synchronized void writePayload(Collection data, String serviceNameOverride) { log.info("payload written - {}", data) this.@payloads.addAll(data) data.each { this.@groups.addAll(it.groups) } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java index f97989b8798..f831d1cf10d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java @@ -12,16 +12,19 @@ public class DataStreamContextExtractor implements HttpCodec.Extractor { private final TimeSource timeSource; private final Supplier traceConfigSupplier; private final long hashOfKnownTags; + private final String serviceNameOverride; public DataStreamContextExtractor( HttpCodec.Extractor delegate, TimeSource timeSource, Supplier traceConfigSupplier, - long hashOfKnownTags) { + long hashOfKnownTags, + String serviceNameOverride) { this.delegate = delegate; this.timeSource = timeSource; this.traceConfigSupplier = traceConfigSupplier; this.hashOfKnownTags = hashOfKnownTags; + this.serviceNameOverride = serviceNameOverride; } @Override @@ -37,7 +40,8 @@ public TagContext extract(C carrier, AgentPropagation.ContextVisitor gett if (shouldExtractPathwayContext) { DefaultPathwayContext pathwayContext = - DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.hashOfKnownTags); + DefaultPathwayContext.extract( + carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride); extracted.withPathwayContext(pathwayContext); } @@ -45,7 +49,8 @@ public TagContext extract(C carrier, AgentPropagation.ContextVisitor gett return extracted; } else if (traceConfigSupplier.get().isDataStreamsEnabled()) { DefaultPathwayContext pathwayContext = - DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.hashOfKnownTags); + DefaultPathwayContext.extract( + carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride); if (pathwayContext != null) { extracted = new TagContext(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DatastreamsPayloadWriter.java index 07d040dd9ac..b0ddf1f9870 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DatastreamsPayloadWriter.java @@ -3,5 +3,5 @@ import java.util.Collection; public interface DatastreamsPayloadWriter { - void writePayload(Collection data); + void writePayload(Collection data, String serviceNameOverride); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 9df53fe08b5..fbc3e0e609f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -39,6 +39,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import 
java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -54,11 +55,11 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5); private static final StatsPoint REPORT = - new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0); + new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0, null); private static final StatsPoint POISON_PILL = - new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0); + new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0, null); - private final Map timeToBucket = new HashMap<>(); + private final Map> timeToBucket = new HashMap<>(); private final MpscArrayQueue inbox = new MpscArrayQueue<>(1024); private final DatastreamsPayloadWriter payloadWriter; private final DDAgentFeaturesDiscovery features; @@ -74,6 +75,8 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private volatile boolean agentSupportsDataStreams = false; private volatile boolean configSupportsDataStreams = false; private final ConcurrentHashMap schemaSamplers; + private static final ConcurrentHashMap threadServiceNames = + new ConcurrentHashMap<>(); public DefaultDataStreamsMonitoring( Config config, @@ -184,10 +187,24 @@ public void setProduceCheckpoint(String type, String target) { setProduceCheckpoint(type, target, DataStreamsContextCarrier.NoOp.INSTANCE, false); } + @Override + public void setThreadServiceName(Long threadId, String serviceName) { + threadServiceNames.put(threadId, serviceName); + } + + @Override + public void clearThreadServiceName(Long threadId) { + threadServiceNames.remove(threadId); + } + + private static String getThreadServiceNameOverride() { + return threadServiceNames.getOrDefault(Thread.currentThread().getId(), null); + } + @Override public PathwayContext newPathwayContext() { if (configSupportsDataStreams) { - return new DefaultPathwayContext(timeSource, hashOfKnownTags); + return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceNameOverride()); } else { return AgentTracer.NoopPathwayContext.INSTANCE; } @@ -196,7 +213,7 @@ public PathwayContext newPathwayContext() { @Override public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) { return new DataStreamContextExtractor( - delegate, timeSource, traceConfigSupplier, hashOfKnownTags); + delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceNameOverride()); } @Override @@ -212,7 +229,8 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie carrier, DataStreamsContextCarrierAdapter.INSTANCE, this.timeSource, - this.hashOfKnownTags); + this.hashOfKnownTags, + getThreadServiceNameOverride()); ((DDSpan) span).context().mergePathwayContext(pathwayContext); } } @@ -226,7 +244,8 @@ public void trackBacklog(LinkedHashMap sortedTags, long value) { } tags.add(tag); } - inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos())); + inbox.offer( + new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceNameOverride())); } @Override @@ -308,6 +327,15 @@ public void close() { } private class InboxProcessor implements Runnable { + + private StatsBucket getStatsBucket(final long timestamp, final String serviceNameOverride) { + long bucket = currentBucket(timestamp); + Map statsBucketMap = + timeToBucket.computeIfAbsent(bucket, startTime -> new HashMap<>(1)); + return statsBucketMap.computeIfAbsent( + serviceNameOverride, s -> new 
StatsBucket(bucket, bucketDurationNanos)); + } + @Override public void run() { Thread currentThread = Thread.currentThread(); @@ -335,17 +363,14 @@ public void run() { } else if (supportsDataStreams) { if (payload instanceof StatsPoint) { StatsPoint statsPoint = (StatsPoint) payload; - Long bucket = currentBucket(statsPoint.getTimestampNanos()); StatsBucket statsBucket = - timeToBucket.computeIfAbsent( - bucket, startTime -> new StatsBucket(startTime, bucketDurationNanos)); + getStatsBucket( + statsPoint.getTimestampNanos(), statsPoint.getServiceNameOverride()); statsBucket.addPoint(statsPoint); } else if (payload instanceof Backlog) { Backlog backlog = (Backlog) payload; - Long bucket = currentBucket(backlog.getTimestampNanos()); StatsBucket statsBucket = - timeToBucket.computeIfAbsent( - bucket, startTime -> new StatsBucket(startTime, bucketDurationNanos)); + getStatsBucket(backlog.getTimestampNanos(), backlog.getServiceNameOverride()); statsBucket.addBacklog(backlog); } } @@ -363,21 +388,30 @@ private long currentBucket(long timestampNanos) { private void flush(long timestampNanos) { long currentBucket = currentBucket(timestampNanos); - List includedBuckets = new ArrayList<>(); - Iterator> mapIterator = timeToBucket.entrySet().iterator(); + // stats are grouped by time buckets and service names + Map> includedBuckets = new HashMap<>(); + Iterator>> mapIterator = + timeToBucket.entrySet().iterator(); while (mapIterator.hasNext()) { - Map.Entry entry = mapIterator.next(); - + Map.Entry> entry = mapIterator.next(); if (entry.getKey() < currentBucket) { mapIterator.remove(); - includedBuckets.add(entry.getValue()); + for (Map.Entry buckets : entry.getValue().entrySet()) { + if (!includedBuckets.containsKey(buckets.getKey())) { + includedBuckets.put(buckets.getKey(), new LinkedList<>()); + } + + includedBuckets.get(buckets.getKey()).add(buckets.getValue()); + } } } if (!includedBuckets.isEmpty()) { - log.debug("Flushing {} buckets", includedBuckets.size()); - payloadWriter.writePayload(includedBuckets); + for (Map.Entry> entry : includedBuckets.entrySet()) { + log.debug("Flushing {} buckets", entry.getValue()); + payloadWriter.writePayload(entry.getValue(), entry.getKey()); + } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index 87847c0b78d..1fec6b9852b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -35,6 +35,7 @@ public class DefaultPathwayContext implements PathwayContext { private final Lock lock = new ReentrantLock(); private final long hashOfKnownTags; private final TimeSource timeSource; + private final String serviceNameOverride; private final GrowingByteArrayOutput outputBuffer = GrowingByteArrayOutput.withInitialCapacity(20); @@ -68,9 +69,11 @@ public class DefaultPathwayContext implements PathwayContext { TagsProcessor.DATASET_NAMESPACE_TAG, TagsProcessor.MANUAL_TAG)); - public DefaultPathwayContext(TimeSource timeSource, long hashOfKnownTags) { + public DefaultPathwayContext( + TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) { this.timeSource = timeSource; this.hashOfKnownTags = hashOfKnownTags; + this.serviceNameOverride = serviceNameOverride; } private DefaultPathwayContext( @@ -79,8 +82,9 @@ private DefaultPathwayContext( long pathwayStartNanos, long 
pathwayStartNanoTicks, long edgeStartNanoTicks, - long hash) { - this(timeSource, hashOfKnownTags); + long hash, + String serviceNameOverride) { + this(timeSource, hashOfKnownTags, serviceNameOverride); this.pathwayStartNanos = pathwayStartNanos; this.pathwayStartNanoTicks = pathwayStartNanoTicks; this.edgeStartNanoTicks = edgeStartNanoTicks; @@ -126,7 +130,8 @@ public void setCheckpoint( // So far, each tag key has only one tag value, so we're initializing the capacity to match // the number of tag keys for now. We should revisit this later if it's no longer the case. List allTags = new ArrayList<>(sortedTags.size()); - PathwayHashBuilder pathwayHashBuilder = new PathwayHashBuilder(hashOfKnownTags); + PathwayHashBuilder pathwayHashBuilder = + new PathwayHashBuilder(hashOfKnownTags, serviceNameOverride); DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder(); if (!started) { @@ -190,7 +195,8 @@ public void setCheckpoint( startNanos, pathwayLatencyNano, edgeLatencyNano, - payloadSizeBytes); + payloadSizeBytes, + serviceNameOverride); edgeStartNanoTicks = nanoTicks; hash = newHash; @@ -272,18 +278,21 @@ public String toString() { private static class PathwayContextExtractor implements AgentPropagation.KeyClassifier { private final TimeSource timeSource; private final long hashOfKnownTags; + private final String serviceNameOverride; private DefaultPathwayContext extractedContext; - PathwayContextExtractor(TimeSource timeSource, long hashOfKnownTags) { + PathwayContextExtractor( + TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) { this.timeSource = timeSource; this.hashOfKnownTags = hashOfKnownTags; + this.serviceNameOverride = serviceNameOverride; } @Override public boolean accept(String key, String value) { if (PathwayContext.PROPAGATION_KEY_BASE64.equalsIgnoreCase(key)) { try { - extractedContext = strDecode(timeSource, hashOfKnownTags, value); + extractedContext = strDecode(timeSource, hashOfKnownTags, serviceNameOverride, value); } catch (IOException e) { return false; } @@ -296,11 +305,14 @@ private static class BinaryPathwayContextExtractor implements AgentPropagation.BinaryKeyClassifier { private final TimeSource timeSource; private final long hashOfKnownTags; + private final String serviceNameOverride; private DefaultPathwayContext extractedContext; - BinaryPathwayContextExtractor(TimeSource timeSource, long hashOfKnownTags) { + BinaryPathwayContextExtractor( + TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) { this.timeSource = timeSource; this.hashOfKnownTags = hashOfKnownTags; + this.serviceNameOverride = serviceNameOverride; } @Override @@ -308,7 +320,7 @@ public boolean accept(String key, byte[] value) { // older versions support, should be removed in the future if (PathwayContext.PROPAGATION_KEY.equalsIgnoreCase(key)) { try { - extractedContext = decode(timeSource, hashOfKnownTags, value); + extractedContext = decode(timeSource, hashOfKnownTags, serviceNameOverride, value); } catch (IOException e) { return false; } @@ -316,7 +328,7 @@ public boolean accept(String key, byte[] value) { if (PathwayContext.PROPAGATION_KEY_BASE64.equalsIgnoreCase(key)) { try { - extractedContext = base64Decode(timeSource, hashOfKnownTags, value); + extractedContext = base64Decode(timeSource, hashOfKnownTags, serviceNameOverride, value); } catch (IOException e) { return false; } @@ -329,13 +341,18 @@ static DefaultPathwayContext extract( C carrier, AgentPropagation.ContextVisitor getter, TimeSource timeSource, - long hashOfKnownTags) { + 
long hashOfKnownTags, + String serviceNameOverride) { if (getter instanceof AgentPropagation.BinaryContextVisitor) { return extractBinary( - carrier, (AgentPropagation.BinaryContextVisitor) getter, timeSource, hashOfKnownTags); + carrier, + (AgentPropagation.BinaryContextVisitor) getter, + timeSource, + hashOfKnownTags, + serviceNameOverride); } PathwayContextExtractor pathwayContextExtractor = - new PathwayContextExtractor(timeSource, hashOfKnownTags); + new PathwayContextExtractor(timeSource, hashOfKnownTags, serviceNameOverride); getter.forEachKey(carrier, pathwayContextExtractor); if (pathwayContextExtractor.extractedContext == null) { log.debug("No context extracted"); @@ -349,9 +366,10 @@ static DefaultPathwayContext extractBinary( C carrier, AgentPropagation.BinaryContextVisitor getter, TimeSource timeSource, - long hashOfKnownTags) { + long hashOfKnownTags, + String serviceNameOverride) { BinaryPathwayContextExtractor pathwayContextExtractor = - new BinaryPathwayContextExtractor(timeSource, hashOfKnownTags); + new BinaryPathwayContextExtractor(timeSource, hashOfKnownTags, serviceNameOverride); getter.forEachKey(carrier, pathwayContextExtractor); if (pathwayContextExtractor.extractedContext == null) { log.debug("No context extracted"); @@ -362,18 +380,22 @@ static DefaultPathwayContext extractBinary( } private static DefaultPathwayContext strDecode( - TimeSource timeSource, long hashOfKnownTags, String data) throws IOException { + TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride, String data) + throws IOException { byte[] base64Bytes = data.getBytes(UTF_8); - return base64Decode(timeSource, hashOfKnownTags, base64Bytes); + return base64Decode(timeSource, hashOfKnownTags, serviceNameOverride, base64Bytes); } private static DefaultPathwayContext base64Decode( - TimeSource timeSource, long hashOfKnownTags, byte[] data) throws IOException { - return decode(timeSource, hashOfKnownTags, Base64.getDecoder().decode(data)); + TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride, byte[] data) + throws IOException { + return decode( + timeSource, hashOfKnownTags, serviceNameOverride, Base64.getDecoder().decode(data)); } private static DefaultPathwayContext decode( - TimeSource timeSource, long hashOfKnownTags, byte[] data) throws IOException { + TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride, byte[] data) + throws IOException { ByteArrayInput input = ByteArrayInput.wrap(data); long hash = input.readLongLE(); @@ -397,7 +419,8 @@ private static DefaultPathwayContext decode( pathwayStartNanos, pathwayStartNanoTicks, edgeStartNanoTicks, - hash); + hash, + serviceNameOverride); } static class DataSetHashBuilder { @@ -412,8 +435,11 @@ public long addValue(String val) { private static class PathwayHashBuilder { private long hash; - public PathwayHashBuilder(long baseHash) { + public PathwayHashBuilder(long baseHash, String serviceNameOverride) { hash = baseHash; + if (serviceNameOverride != null) { + addTag(serviceNameOverride); + } } public void addTag(String tag) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 461e5457673..e6dc2a18e05 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -56,7 +56,7 @@ public 
void reset() { } @Override - public void writePayload(Collection data) { + public void writePayload(Collection data, String serviceNameOverride) { writer.startMap(7); /* 1 */ writer.writeUTF8(ENV); @@ -64,7 +64,11 @@ public void writePayload(Collection data) { /* 2 */ writer.writeUTF8(SERVICE); - writer.writeUTF8(wellKnownTags.getService()); + if (serviceNameOverride != null) { + writer.writeUTF8(serviceNameOverride.getBytes(ISO_8859_1)); + } else { + writer.writeUTF8(wellKnownTags.getService()); + } /* 3 */ writer.writeUTF8(LANG); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index d4e6ccc8f7f..d6623cb8647 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -46,6 +46,62 @@ class DataStreamsWritingTest extends DDCoreSpecification { requestBodies = [] } + def "Service overrides split buckets"() { + given: + def conditions = new PollingConditions(timeout: 2) + + def testOkhttpClient = OkHttpUtils.buildHttpClient(HttpUrl.get(server.address), 5000L) + + def features = Stub(DDAgentFeaturesDiscovery) { + supportsDataStreams() >> true + } + + def wellKnownTags = new WellKnownTags("runtimeid", "hostname", "test", Config.get().getServiceName(), "version", "java") + + def fakeConfig = Stub(Config) { + getAgentUrl() >> server.address.toString() + getWellKnownTags() >> wellKnownTags + getPrimaryTag() >> "region-1" + } + + def sharedCommObjects = new SharedCommunicationObjects() + sharedCommObjects.featuresDiscovery = features + sharedCommObjects.okHttpClient = testOkhttpClient + sharedCommObjects.createRemaining(fakeConfig) + + def timeSource = new ControllableTimeSource() + + def traceConfig = Mock(TraceConfig) { + isDataStreamsEnabled() >> true + } + def serviceNameOverride = "service-name-override" + + when: + def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig }) + dataStreams.start() + dataStreams.setThreadServiceName(Thread.currentThread().getId(), serviceNameOverride) + dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, serviceNameOverride)) + dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130) + timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) + // force flush + dataStreams.report() + dataStreams.close() + then: + conditions.eventually { + assert requestBodies.size() == 1 + } + GzipSource gzipSource = new GzipSource(Okio.source(new ByteArrayInputStream(requestBodies[0]))) + + BufferedSource bufferedSource = Okio.buffer(gzipSource) + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream()) + + assert unpacker.unpackMapHeader() == 7 + assert unpacker.unpackString() == "Env" + assert unpacker.unpackString() == "test" + assert unpacker.unpackString() == "Service" + assert unpacker.unpackString() == serviceNameOverride + } + def "Write bucket to mock server"() { given: def conditions = new PollingConditions(timeout: 2) @@ -78,15 +134,15 @@ class DataStreamsWritingTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig }) dataStreams.start() - dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0)) - 
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, null)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null)) dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 100) dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5)) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5, null)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy index 5313f1eb412..36c4dc04287 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy @@ -40,7 +40,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 0, 0, 0, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 0, 0, 0, timeSource.currentTimeNanos, 0, 0, 0, null)) dataStreams.report() then: @@ -131,7 +131,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -177,7 +177,7 @@ class DefaultDataStreamsMonitoringTest extends 
DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, bucketDuration) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(bucketDuration) then: @@ -220,9 +220,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 3, 4, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 3, 4, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) dataStreams.report() @@ -266,9 +266,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) dataStreams.close() @@ -377,9 +377,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -434,12 +434,12 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, 
features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 1, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 1, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 1, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 1, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2,1, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5)) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 5, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2,1, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5, null)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 5, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -508,7 +508,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not supported" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -538,7 +538,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS) dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -584,7 +584,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dataStreams.start() supportsDataStreaming = false dataStreams.onEvent(EventListener.EventType.DOWNGRADED, "") - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -601,7 +601,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS) dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", 
"group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -646,7 +646,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not enabled" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -662,7 +662,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dsmEnabled = true dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -695,7 +695,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "submitting points after being disabled" payloadWriter.buckets.clear() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -729,7 +729,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not supported" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -746,7 +746,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS) dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -760,7 +760,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dsmEnabled = true dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, 
timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -805,7 +805,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not supported" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -821,7 +821,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dsmEnabled = true dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -841,7 +841,7 @@ class CapturingPayloadWriter implements DatastreamsPayloadWriter { boolean accepting = true List buckets = new ArrayList<>() - void writePayload(Collection payload) { + void writePayload(Collection payload, String serviceNameOverride) { if (accepting) { buckets.addAll(payload) } diff --git a/dd-trace-core/src/traceAgentTest/groovy/DataStreamsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/DataStreamsIntegrationTest.groovy index 6f7ac8676a1..8b5fe50e019 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/DataStreamsIntegrationTest.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/DataStreamsIntegrationTest.groovy @@ -51,7 +51,7 @@ class DataStreamsIntegrationTest extends DDSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, sharedCommunicationObjects.featuresDiscovery, timeSource, { traceConfig }, Config.get()) dataStreams.start() - dataStreams.accept(new StatsPoint("testType", "testGroup", "testTopic", 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.accept(new StatsPoint("testType", "testGroup", "testTopic", 1, 2, timeSource.currentTimeNanos, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java index 641716a2df3..e6cb15603dc 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java @@ -42,4 +42,20 @@ void setCheckpoint( Schema getSchema(String schemaName, SchemaIterator iterator); void setProduceCheckpoint(String type, String target); + + /** + * setThreadServiceName is used to override the service name for all DataStreams payloads produced + * within the given thread + * + * @param threadId thread Id + * @param serviceName new service name to use for DSM checkpoints. 
+ */ + void setThreadServiceName(Long threadId, String serviceName); + + /** + * clearThreadServiceName clears the threadId -> service name mapping + * + * @param threadId thread Id + */ + void clearThreadServiceName(Long threadId); } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index 256aab3d970..5d113534179 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -1134,6 +1134,12 @@ public Schema getSchema(String schemaName, SchemaIterator iterator) { @Override public void setProduceCheckpoint(String type, String target) {} + @Override + public void setThreadServiceName(Long threadId, String serviceName) {} + + @Override + public void clearThreadServiceName(Long threadId) {} + @Override public void setConsumeCheckpoint( String type, String source, DataStreamsContextCarrier carrier) {} diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Backlog.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Backlog.java index 333a43f6e87..9b2a26540bc 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Backlog.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Backlog.java @@ -18,13 +18,20 @@ public long getTimestampNanos() { return timestampNanos; } + public String getServiceNameOverride() { + return serviceNameOverride; + } + private final List sortedTags; private final long value; private final long timestampNanos; + private final String serviceNameOverride; - public Backlog(List sortedTags, long value, long timestampNanos) { + public Backlog( + List sortedTags, long value, long timestampNanos, String serviceNameOverride) { this.sortedTags = sortedTags; this.value = value; this.timestampNanos = timestampNanos; + this.serviceNameOverride = serviceNameOverride; } } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java index 1901b160ebb..fb313ddcef2 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java @@ -11,6 +11,7 @@ public class StatsPoint implements InboxItem { private final long pathwayLatencyNano; private final long edgeLatencyNano; private final long payloadSizeBytes; + private final String serviceNameOverride; public StatsPoint( List edgeTags, @@ -20,7 +21,8 @@ public StatsPoint( long timestampNanos, long pathwayLatencyNano, long edgeLatencyNano, - long payloadSizeBytes) { + long payloadSizeBytes, + String serviceNameOverride) { this.edgeTags = edgeTags; this.hash = hash; this.parentHash = parentHash; @@ -29,6 +31,7 @@ public StatsPoint( this.pathwayLatencyNano = pathwayLatencyNano; this.edgeLatencyNano = edgeLatencyNano; this.payloadSizeBytes = payloadSizeBytes; + this.serviceNameOverride = serviceNameOverride; } public List getEdgeTags() { @@ -63,6 +66,10 @@ public long getAggregationHash() { return aggregationHash; } + public String getServiceNameOverride() { + return serviceNameOverride; + } + @Override public String toString() { return "StatsPoint{" From a0e834efb4cb7cf0ff073c0f8c15f1bbd9837e2f Mon Sep 17 00:00:00 2001 
From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Thu, 17 Oct 2024 18:16:16 -0500 Subject: [PATCH 02/14] Set thread service name for spark tasks --- .../spark/SparkExecutorDecorator.java | 25 +++++++++++++++++-- .../spark/SparkExecutorInstrumentation.java | 6 +++-- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java index 0f9519c696c..e1dda5459d1 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java @@ -1,8 +1,11 @@ package datadog.trace.instrumentation.spark; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator; +import java.lang.reflect.Field; +import java.util.Properties; import org.apache.spark.executor.Executor; import org.apache.spark.executor.TaskMetrics; @@ -10,6 +13,7 @@ public class SparkExecutorDecorator extends BaseDecorator { public static final CharSequence SPARK_TASK = UTF8BytesString.create("spark.task"); public static final CharSequence SPARK = UTF8BytesString.create("spark"); public static SparkExecutorDecorator DECORATE = new SparkExecutorDecorator(); + private final String propSparkAppName = "spark.app.name"; @Override protected String[] instrumentationNames() { @@ -26,12 +30,29 @@ protected CharSequence component() { return SPARK; } - public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) { + public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner, Object taskDescription) { span.setTag("task_id", taskRunner.taskId()); span.setTag("task_thread_name", taskRunner.threadName()); + + if (taskDescription != null) { + try { + Field prop = taskDescription.getClass().getDeclaredField("properties"); + prop.setAccessible(true); + Properties props = (Properties) prop.get(taskDescription); + String appName = props.getProperty(propSparkAppName); + if (appName != null) { + AgentTracer.get() + .getDataStreamsMonitoring() + .setThreadServiceName(taskRunner.getThreadId(), appName); + } + } catch (Exception ignored) { + } + } } public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) { + AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(taskRunner.getThreadId()); + // task is set by spark in run() by deserializing the task binary coming from the driver if (taskRunner.task() == null) { return; @@ -50,7 +71,7 @@ public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) { span.setTag("app_attempt_id", taskRunner.task().appAttemptId().get()); } span.setTag( - "application_name", taskRunner.task().localProperties().getProperty("spark.app.name")); + "application_name", taskRunner.task().localProperties().getProperty(propSparkAppName)); TaskMetrics metrics = taskRunner.task().metrics(); span.setMetric("spark.executor_deserialize_time", metrics.executorDeserializeTime()); diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java 
b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java index 0a8a6532326..8de403a83bf 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java @@ -52,11 +52,13 @@ public void methodAdvice(MethodTransformer transformer) { public static final class RunAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope enter(@Advice.This Executor.TaskRunner taskRunner) { + public static AgentScope enter( + @Advice.FieldValue("taskDescription") final Object taskDescription, + @Advice.This Executor.TaskRunner taskRunner) { final AgentSpan span = startSpan("spark-executor", SPARK_TASK); DECORATE.afterStart(span); - DECORATE.onTaskStart(span, taskRunner); + DECORATE.onTaskStart(span, taskRunner, taskDescription); return activateSpan(span); } From 3703c422653c2922935e5eb523f0eed0b61a4faf Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Mon, 28 Oct 2024 13:14:09 -0500 Subject: [PATCH 03/14] Fixed tests --- .../DefaultPathwayContextTest.groovy | 66 +++++++++---------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy index 4838c842335..ecdce8e0651 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy @@ -45,7 +45,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "First Set checkpoint starts the context."() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(50) @@ -60,7 +60,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Checkpoint generated"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(50) @@ -86,7 +86,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Checkpoint with payload size"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(25) @@ -107,7 +107,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Multiple checkpoints generated"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(50) @@ -147,7 +147,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Exception thrown when trying to encode unstarted context"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: context.encode() @@ -159,14 +159,14 @@ 
class DefaultPathwayContextTest extends DDCoreSpecification { def "Set checkpoint with dataset tags"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "in"]), pointConsumer) def encoded = context.strEncode() timeSource.advance(MILLISECONDS.toNanos(2)) - def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded) + def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "out"]), pointConsumer) @@ -185,14 +185,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() timeSource.advance(MILLISECONDS.toNanos(2)) - def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded) + def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -213,7 +213,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Set checkpoint with timestamp"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) def timeFromQueue = timeSource.getCurrentTimeMillis() - 200 when: context.setCheckpoint(["type": "internal"], pointConsumer, timeFromQueue) @@ -234,7 +234,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(MILLISECONDS.toNanos(50)) @@ -242,7 +242,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.strEncode() timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded) + def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -261,7 +261,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: def secondEncode = decodedContext.strEncode() timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.strDecode(timeSource, baseHash, secondEncode) + def secondDecode = DefaultPathwayContext.strDecode(timeSource, baseHash, null, 
secondEncode) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -282,7 +282,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) def contextVisitor = new Base64MapContextVisitor() when: @@ -292,7 +292,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.strEncode() Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) + def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -312,7 +312,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def secondEncode = decodedContext.strEncode() carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) + def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -333,14 +333,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(2)) - def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, encoded) + def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -362,7 +362,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(MILLISECONDS.toNanos(50)) @@ -370,7 +370,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, encoded) + def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", 
"topic": "topic", "type": "kafka"]), pointConsumer) @@ -389,7 +389,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: def secondEncode = decodedContext.encode() timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, secondEncode) + def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, null, secondEncode) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -409,7 +409,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Legacy binary encoding"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) def contextVisitor = new BinaryMapContextVisitor() when: @@ -419,7 +419,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = java.util.Base64.getDecoder().decode(context.encode()) Map carrier = [(PathwayContext.PROPAGATION_KEY): encoded] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash) + def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash, null) then: decodedContext.strEncode() == context.strEncode() @@ -429,7 +429,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) def contextVisitor = new BinaryMapContextVisitor() when: @@ -439,7 +439,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.encode() Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": new byte[0]] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash) + def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -459,7 +459,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def secondEncode = decodedContext.encode() carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash) + def secondDecode = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -480,7 +480,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) def contextVisitor = new Base64MapContextVisitor() when: @@ -490,7 +490,7 @@ class 
DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.strEncode() Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) + def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["topic": "topic", "type": "sqs"]), pointConsumer) @@ -510,7 +510,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def secondEncode = decodedContext.strEncode() carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) + def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["topic": "topicB", "type": "sqs"]), pointConsumer) @@ -530,7 +530,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Empty tags not set"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(50) @@ -591,7 +591,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() @@ -637,7 +637,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() @@ -678,7 +678,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() From 71d5678fab36fb85da8bc0cb06a2f9aa6e5acd52 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Mon, 28 Oct 2024 13:51:05 -0500 Subject: [PATCH 04/14] Restructured tests --- .../trace/core/datastreams/DataStreamsWritingTest.groovy | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy 
b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index d6623cb8647..490258606a7 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -145,15 +145,13 @@ class DataStreamsWritingTest extends DDCoreSpecification { dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() + dataStreams.close() then: conditions.eventually { assert requestBodies.size() == 1 } validateMessage(requestBodies[0]) - - cleanup: - dataStreams.close() } def validateMessage(byte[] message) { From f8c7b949118a79b835b0d385b6472d87210b4da4 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Mon, 28 Oct 2024 14:20:40 -0500 Subject: [PATCH 05/14] Updated failing test --- .../trace/core/datastreams/DataStreamsWritingTest.groovy | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index 490258606a7..ff8150d5a7a 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -144,14 +144,16 @@ class DataStreamsWritingTest extends DDCoreSpecification { dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5, null)) dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.report() dataStreams.close() then: conditions.eventually { - assert requestBodies.size() == 1 + assert requestBodies.size() > 0 + } + + for (int i = 0; i < requestBodies.size(); i++) { + validateMessage(requestBodies[i]) } - validateMessage(requestBodies[0]) } def validateMessage(byte[] message) { From dacc353187815fb2cf511d3a17ab6f2d2a8d1abd Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Mon, 28 Oct 2024 15:02:10 -0500 Subject: [PATCH 06/14] Fixed tests --- .../core/datastreams/DefaultDataStreamsMonitoring.java | 6 ++++-- .../trace/core/datastreams/DataStreamsWritingTest.groovy | 6 ++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index fbc3e0e609f..70ad439988c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -409,8 +409,10 @@ private void flush(long timestampNanos) { if (!includedBuckets.isEmpty()) { for (Map.Entry> entry : includedBuckets.entrySet()) { - log.debug("Flushing {} buckets", entry.getValue()); - payloadWriter.writePayload(entry.getValue(), entry.getKey()); + if (!entry.getValue().isEmpty()) { + log.debug("Flushing {} 
buckets", entry.getValue()); + payloadWriter.writePayload(entry.getValue(), entry.getKey()); + } } } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index ff8150d5a7a..a800d6dc197 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -148,12 +148,10 @@ class DataStreamsWritingTest extends DDCoreSpecification { then: conditions.eventually { - assert requestBodies.size() > 0 + assert requestBodies.size() == 1 } - for (int i = 0; i < requestBodies.size(); i++) { - validateMessage(requestBodies[i]) - } + validateMessage(requestBodies[0]) } def validateMessage(byte[] message) { From 5c484ede3c85036ec6db9f2434dab65ad6452b9d Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Mon, 28 Oct 2024 16:59:37 -0500 Subject: [PATCH 07/14] Reset service name override in test --- .../datadog/trace/core/datastreams/DataStreamsWritingTest.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index a800d6dc197..31c010d1e41 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -83,6 +83,7 @@ class DataStreamsWritingTest extends DDCoreSpecification { dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, serviceNameOverride)) dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) + dataStreams.setThreadServiceName(Thread.currentThread().getId(), null) // force flush dataStreams.report() dataStreams.close() From 545c9284d42a5997ff58eb9d8b7534c72028f241 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Tue, 29 Oct 2024 09:52:15 -0500 Subject: [PATCH 08/14] Test now uses clearThreadServiceName --- .../core/datastreams/DefaultDataStreamsMonitoring.java | 6 ++++++ .../trace/core/datastreams/DataStreamsWritingTest.groovy | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 70ad439988c..01f1c5781fa 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -189,6 +189,12 @@ public void setProduceCheckpoint(String type, String target) { @Override public void setThreadServiceName(Long threadId, String serviceName) { + // setting service name to null == removing the value + if (serviceName == null) { + clearThreadServiceName(threadId); + return; + } + threadServiceNames.put(threadId, serviceName); } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index 
31c010d1e41..51583ff8060 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -83,10 +83,10 @@ class DataStreamsWritingTest extends DDCoreSpecification { dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, serviceNameOverride)) dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.setThreadServiceName(Thread.currentThread().getId(), null) // force flush dataStreams.report() dataStreams.close() + dataStreams.clearThreadServiceName(Thread.currentThread().getId()) then: conditions.eventually { assert requestBodies.size() == 1 From cef6dc72fc9631a26a37851e725b8e849ce9d01e Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:28:06 -0500 Subject: [PATCH 09/14] Log service name --- .../trace/core/datastreams/DefaultDataStreamsMonitoring.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 01f1c5781fa..a966425f504 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -416,7 +416,7 @@ private void flush(long timestampNanos) { if (!includedBuckets.isEmpty()) { for (Map.Entry> entry : includedBuckets.entrySet()) { if (!entry.getValue().isEmpty()) { - log.debug("Flushing {} buckets", entry.getValue()); + log.debug("Flushing {} buckets ({})", entry.getValue(), entry.getKey()); payloadWriter.writePayload(entry.getValue(), entry.getKey()); } } From 02ce8d81a5efde4cc639aebfbf8caa6d41ab1387 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Wed, 6 Nov 2024 10:16:01 -0600 Subject: [PATCH 10/14] Use unreflected method to get properties --- .../spark/SparkExecutorDecorator.java | 64 ++++++++++++++++--- 1 file changed, 54 insertions(+), 10 deletions(-) diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java index e1dda5459d1..45a870e72ea 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java @@ -4,16 +4,60 @@ import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; import java.lang.reflect.Field; import java.util.Properties; import org.apache.spark.executor.Executor; import org.apache.spark.executor.TaskMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SparkExecutorDecorator extends BaseDecorator { + private static final Logger log = 
LoggerFactory.getLogger(SparkExecutorDecorator.class); + public static final CharSequence SPARK_TASK = UTF8BytesString.create("spark.task"); public static final CharSequence SPARK = UTF8BytesString.create("spark"); public static SparkExecutorDecorator DECORATE = new SparkExecutorDecorator(); private final String propSparkAppName = "spark.app.name"; + private static final String TASK_DESCRIPTION_CLASSNAME = + "org.apache.spark.scheduler.TaskDescription"; + private static final MethodHandles.Lookup lookup = MethodHandles.lookup(); + private static final MethodHandle propertiesField_mh; + + private static Class initClass() { + try { + return Class.forName( + SparkExecutorDecorator.TASK_DESCRIPTION_CLASSNAME, + false, + SparkExecutorDecorator.class.getClassLoader()); + } catch (ClassNotFoundException e) { + log.debug("Can't find class '{}'", TASK_DESCRIPTION_CLASSNAME, e); + } + return null; + } + + private static MethodHandle getFieldGetter() { + Class cls = initClass(); + + try { + if (cls != null) { + Field field = cls.getDeclaredField("properties"); + field.setAccessible(true); + + return lookup.unreflectGetter(field); + } + + } catch (NoSuchFieldException | IllegalAccessException e) { + log.debug("Can't find and unreflect declared field for '{}'", TASK_DESCRIPTION_CLASSNAME); + } + + return null; + } + + static { + propertiesField_mh = getFieldGetter(); + } @Override protected String[] instrumentationNames() { @@ -34,18 +78,18 @@ public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner, Object t span.setTag("task_id", taskRunner.taskId()); span.setTag("task_thread_name", taskRunner.threadName()); - if (taskDescription != null) { + if (taskDescription != null && propertiesField_mh != null) { try { - Field prop = taskDescription.getClass().getDeclaredField("properties"); - prop.setAccessible(true); - Properties props = (Properties) prop.get(taskDescription); - String appName = props.getProperty(propSparkAppName); - if (appName != null) { - AgentTracer.get() - .getDataStreamsMonitoring() - .setThreadServiceName(taskRunner.getThreadId(), appName); + Properties props = (Properties) propertiesField_mh.invoke(taskDescription); + if (props != null) { + String appName = props.getProperty(propSparkAppName); + if (appName != null) { + AgentTracer.get() + .getDataStreamsMonitoring() + .setThreadServiceName(taskRunner.getThreadId(), appName); + } } - } catch (Exception ignored) { + } catch (Throwable ignored) { } } } From a6d89db8c8878d03825f74e6585455dd2dfa3349 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Fri, 8 Nov 2024 09:27:27 +0100 Subject: [PATCH 11/14] Simplify methodhandle initialization --- .../spark/SparkExecutorDecorator.java | 42 +++---------------- 1 file changed, 6 insertions(+), 36 deletions(-) diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java index 45a870e72ea..a5b4eea27aa 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java @@ -4,61 +4,31 @@ import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator; 
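
Patches 10 and 11 both read the private TaskDescription#properties field through a MethodHandle getter that is resolved once and cached, so the per-task lookup of spark.app.name is a single invoke rather than repeated reflection. A minimal, self-contained sketch of that pattern follows; the class and field names mirror the patch, but this is an illustrative sketch, not the tracer's actual helper:

    import java.lang.invoke.MethodHandle;
    import java.lang.invoke.MethodHandles;
    import java.lang.reflect.Field;
    import java.util.Properties;

    final class TaskPropertiesSketch {
      // Resolved once at class initialization and reused for every task.
      private static final MethodHandle PROPERTIES_GETTER = resolveGetter();

      private static MethodHandle resolveGetter() {
        try {
          Class<?> cls = Class.forName("org.apache.spark.scheduler.TaskDescription");
          Field field = cls.getDeclaredField("properties");
          field.setAccessible(true);
          return MethodHandles.lookup().unreflectGetter(field);
        } catch (Exception e) {
          // Instrumentation degrades gracefully if Spark internals change.
          return null;
        }
      }

      static String appName(Object taskDescription) throws Throwable {
        if (PROPERTIES_GETTER == null || taskDescription == null) {
          return null;
        }
        Properties props = (Properties) PROPERTIES_GETTER.invoke(taskDescription);
        return props == null ? null : props.getProperty("spark.app.name");
      }
    }

Patch 11 then replaces this hand-rolled lookup with the shared datadog.trace.util.MethodHandles utility (privateFieldGetter), which removes the class-loading and logging boilerplate from the decorator itself.
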
+import datadog.trace.util.MethodHandles; import java.lang.invoke.MethodHandle; -import java.lang.invoke.MethodHandles; -import java.lang.reflect.Field; import java.util.Properties; import org.apache.spark.executor.Executor; import org.apache.spark.executor.TaskMetrics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SparkExecutorDecorator extends BaseDecorator { - private static final Logger log = LoggerFactory.getLogger(SparkExecutorDecorator.class); - public static final CharSequence SPARK_TASK = UTF8BytesString.create("spark.task"); public static final CharSequence SPARK = UTF8BytesString.create("spark"); public static SparkExecutorDecorator DECORATE = new SparkExecutorDecorator(); private final String propSparkAppName = "spark.app.name"; private static final String TASK_DESCRIPTION_CLASSNAME = "org.apache.spark.scheduler.TaskDescription"; - private static final MethodHandles.Lookup lookup = MethodHandles.lookup(); - private static final MethodHandle propertiesField_mh; - - private static Class initClass() { - try { - return Class.forName( - SparkExecutorDecorator.TASK_DESCRIPTION_CLASSNAME, - false, - SparkExecutorDecorator.class.getClassLoader()); - } catch (ClassNotFoundException e) { - log.debug("Can't find class '{}'", TASK_DESCRIPTION_CLASSNAME, e); - } - return null; - } + private static final MethodHandle propertiesField_mh = getFieldGetter(); private static MethodHandle getFieldGetter() { - Class cls = initClass(); - try { - if (cls != null) { - Field field = cls.getDeclaredField("properties"); - field.setAccessible(true); - - return lookup.unreflectGetter(field); - } - - } catch (NoSuchFieldException | IllegalAccessException e) { - log.debug("Can't find and unreflect declared field for '{}'", TASK_DESCRIPTION_CLASSNAME); + return new MethodHandles(Executor.class.getClassLoader()) + .privateFieldGetter(TASK_DESCRIPTION_CLASSNAME, "properties"); + } catch (Throwable ignored) { + // should be already logged } - return null; } - static { - propertiesField_mh = getFieldGetter(); - } - @Override protected String[] instrumentationNames() { return new String[] {"spark-executor"}; From c2eb6cfff1cb5893453dc3330b321da6a1f6413a Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Mon, 11 Nov 2024 10:07:38 -0600 Subject: [PATCH 12/14] wip: Added unit test --- .../spark-executor/build.gradle | 11 +++ .../baseTest/groovy/SparkExecutorTest.groovy | 72 +++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/dd-java-agent/instrumentation/spark-executor/build.gradle b/dd-java-agent/instrumentation/spark-executor/build.gradle index 4525dafc370..159f7b05907 100644 --- a/dd-java-agent/instrumentation/spark-executor/build.gradle +++ b/dd-java-agent/instrumentation/spark-executor/build.gradle @@ -33,13 +33,24 @@ ext { dependencies { compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0' compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0' + compileOnly group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0" baseTestImplementation group: 'org.apache.spark', name: "spark-core_2.12", version: "2.4.0" baseTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "2.4.0" + baseTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "2.4.0" + baseTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0" + testImplementation group: 'org.apache.kafka', name: 'kafka_2.12', 
version: '2.4.0' + testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.0' + testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.4.0.RELEASE' + testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.4.0.RELEASE' latest212DepTestImplementation group: 'org.apache.spark', name: "spark-core_2.12", version: '3.+' latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: '3.+' + latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "3.+" + latest212DepTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0" latest213DepTestImplementation group: 'org.apache.spark', name: "spark-core_2.13", version: '3.+' latest213DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.13", version: '3.+' + latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.13", version: "3.+" + latest212DepTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.13', version: "3.+" } diff --git a/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy b/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy index 7e9ae0951f5..16dec400604 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy +++ b/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy @@ -1,17 +1,38 @@ import datadog.trace.agent.test.AgentTestRunner import datadog.trace.bootstrap.instrumentation.api.Tags +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.spark.api.java.function.VoidFunction2 import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.RowFactory import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.types.StructType +import org.junit.ClassRule +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.rule.EmbeddedKafkaRule +import org.springframework.kafka.test.utils.KafkaTestUtils +import spock.lang.Shared + class SparkExecutorTest extends AgentTestRunner { + static final SOURCE_TOPIC = "source" + static final SINK_TOPIC = "sink" + + @Shared + @ClassRule + EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, SOURCE_TOPIC, SINK_TOPIC) + EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka @Override void configurePreAgent() { super.configurePreAgent() injectSysConfig("dd.integration.spark-executor.enabled", "true") + injectSysConfig("dd.integration.spark.enabled", "true") + injectSysConfig("dd.integration.kafka.enabled", "true") + injectSysConfig("dd.data.streams.enabled", "true") + injectSysConfig("dd.trace.debug", "true") } private Dataset generateSampleDataframe(SparkSession spark) { @@ -23,6 +44,57 @@ class SparkExecutorTest extends AgentTestRunner { spark.createDataFrame(rows, structType) } + def "test dsm service name override"() { + setup: + def sparkSession = SparkSession.builder() + .config("spark.master", "local[2]") + .config("spark.driver.bindAddress", "localhost") + // .config("spark.sql.shuffle.partitions", "2") + .appName("test-app") + .getOrCreate() + + def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString()) + def producer = new 
DefaultKafkaProducerFactory(producerProps).createProducer() + + when: + for (int i = 0; i < 100; i++) { + producer.send(new ProducerRecord<>(SOURCE_TOPIC, i, i.toString())) + } + producer.flush() + + def df = sparkSession + .readStream() + .format("kafka") + .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString()) + .option("startingOffsets", "earliest") + .option("failOnDataLoss", "false") + .option("subscribe", SOURCE_TOPIC) + .load() + + def query = df + .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString()) + .option("checkpointLocation", "/tmp/" + System.currentTimeMillis().toString()) + .option("topic", SINK_TOPIC) + .trigger(Trigger.Once()) + .foreachBatch(new VoidFunction2, Long>() { + @Override + void call(Dataset rowDataset, Long aLong) throws Exception { + rowDataset.show() + rowDataset.write() + } + }) + .start() + + query.processAllAvailable() + + then: + query.stop() + producer.close() + } + def "generate spark task run spans"() { setup: def sparkSession = SparkSession.builder() From 0e9f3b6c9ae1d0a2ac2756f2a6f40292385887af Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Fri, 15 Nov 2024 16:20:08 -0600 Subject: [PATCH 13/14] Added service name overrides to test writer --- .../datastreams/RecordingDatastreamsPayloadWriter.groovy | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy index 396985b24cd..cedcf14724b 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy @@ -18,9 +18,12 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter { @SuppressWarnings('UnusedPrivateField') private final Set backlogs = [] + private final Set serviceNameOverrides = [] + @Override synchronized void writePayload(Collection data, String serviceNameOverride) { log.info("payload written - {}", data) + serviceNameOverrides.add(serviceNameOverride) this.@payloads.addAll(data) data.each { this.@groups.addAll(it.groups) } for (StatsBucket bucket : data) { @@ -32,6 +35,10 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter { } } + synchronized List getServices() { + Collections.unmodifiableList(new ArrayList<>(this.@serviceNameOverrides)) + } + synchronized List getPayloads() { Collections.unmodifiableList(new ArrayList<>(this.@payloads)) } From 6856dc5a9068d4952961cbbe808d6dd9f0264640 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Fri, 15 Nov 2024 16:50:42 -0600 Subject: [PATCH 14/14] Reverted instrumentation-specific code; switched to thread local --- .../spark-executor/build.gradle | 11 --- .../baseTest/groovy/SparkExecutorTest.groovy | 72 ------------------- .../spark/SparkExecutorDecorator.java | 39 +--------- .../spark/SparkExecutorInstrumentation.java | 6 +- .../DefaultDataStreamsMonitoring.java | 27 ++++--- .../datastreams/DataStreamsWritingTest.groovy | 4 +- .../api/AgentDataStreamsMonitoring.java | 13 ++-- .../instrumentation/api/AgentTracer.java | 4 +- 8 files changed, 24 
insertions(+), 152 deletions(-) diff --git a/dd-java-agent/instrumentation/spark-executor/build.gradle b/dd-java-agent/instrumentation/spark-executor/build.gradle index 159f7b05907..4525dafc370 100644 --- a/dd-java-agent/instrumentation/spark-executor/build.gradle +++ b/dd-java-agent/instrumentation/spark-executor/build.gradle @@ -33,24 +33,13 @@ ext { dependencies { compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0' compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0' - compileOnly group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0" baseTestImplementation group: 'org.apache.spark', name: "spark-core_2.12", version: "2.4.0" baseTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "2.4.0" - baseTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "2.4.0" - baseTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0" - testImplementation group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0' - testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.0' - testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.4.0.RELEASE' - testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.4.0.RELEASE' latest212DepTestImplementation group: 'org.apache.spark', name: "spark-core_2.12", version: '3.+' latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: '3.+' - latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "3.+" - latest212DepTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0" latest213DepTestImplementation group: 'org.apache.spark', name: "spark-core_2.13", version: '3.+' latest213DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.13", version: '3.+' - latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.13", version: "3.+" - latest212DepTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.13', version: "3.+" } diff --git a/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy b/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy index 16dec400604..7e9ae0951f5 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy +++ b/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy @@ -1,38 +1,17 @@ import datadog.trace.agent.test.AgentTestRunner import datadog.trace.bootstrap.instrumentation.api.Tags -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.spark.api.java.function.VoidFunction2 import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.RowFactory import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.types.StructType -import org.junit.ClassRule -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.test.EmbeddedKafkaBroker -import org.springframework.kafka.test.rule.EmbeddedKafkaRule -import org.springframework.kafka.test.utils.KafkaTestUtils -import spock.lang.Shared - class SparkExecutorTest extends AgentTestRunner { - static final SOURCE_TOPIC = "source" - static final SINK_TOPIC = "sink" - - @Shared - @ClassRule - 
EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, SOURCE_TOPIC, SINK_TOPIC) - EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka @Override void configurePreAgent() { super.configurePreAgent() injectSysConfig("dd.integration.spark-executor.enabled", "true") - injectSysConfig("dd.integration.spark.enabled", "true") - injectSysConfig("dd.integration.kafka.enabled", "true") - injectSysConfig("dd.data.streams.enabled", "true") - injectSysConfig("dd.trace.debug", "true") } private Dataset generateSampleDataframe(SparkSession spark) { @@ -44,57 +23,6 @@ class SparkExecutorTest extends AgentTestRunner { spark.createDataFrame(rows, structType) } - def "test dsm service name override"() { - setup: - def sparkSession = SparkSession.builder() - .config("spark.master", "local[2]") - .config("spark.driver.bindAddress", "localhost") - // .config("spark.sql.shuffle.partitions", "2") - .appName("test-app") - .getOrCreate() - - def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString()) - def producer = new DefaultKafkaProducerFactory(producerProps).createProducer() - - when: - for (int i = 0; i < 100; i++) { - producer.send(new ProducerRecord<>(SOURCE_TOPIC, i, i.toString())) - } - producer.flush() - - def df = sparkSession - .readStream() - .format("kafka") - .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString()) - .option("startingOffsets", "earliest") - .option("failOnDataLoss", "false") - .option("subscribe", SOURCE_TOPIC) - .load() - - def query = df - .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value") - .writeStream() - .format("kafka") - .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString()) - .option("checkpointLocation", "/tmp/" + System.currentTimeMillis().toString()) - .option("topic", SINK_TOPIC) - .trigger(Trigger.Once()) - .foreachBatch(new VoidFunction2, Long>() { - @Override - void call(Dataset rowDataset, Long aLong) throws Exception { - rowDataset.show() - rowDataset.write() - } - }) - .start() - - query.processAllAvailable() - - then: - query.stop() - producer.close() - } - def "generate spark task run spans"() { setup: def sparkSession = SparkSession.builder() diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java index a5b4eea27aa..0f9519c696c 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java @@ -1,12 +1,8 @@ package datadog.trace.instrumentation.spark; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator; -import datadog.trace.util.MethodHandles; -import java.lang.invoke.MethodHandle; -import java.util.Properties; import org.apache.spark.executor.Executor; import org.apache.spark.executor.TaskMetrics; @@ -14,20 +10,6 @@ public class SparkExecutorDecorator extends BaseDecorator { public static final CharSequence SPARK_TASK = UTF8BytesString.create("spark.task"); public static final CharSequence SPARK = UTF8BytesString.create("spark"); public static 
SparkExecutorDecorator DECORATE = new SparkExecutorDecorator(); - private final String propSparkAppName = "spark.app.name"; - private static final String TASK_DESCRIPTION_CLASSNAME = - "org.apache.spark.scheduler.TaskDescription"; - private static final MethodHandle propertiesField_mh = getFieldGetter(); - - private static MethodHandle getFieldGetter() { - try { - return new MethodHandles(Executor.class.getClassLoader()) - .privateFieldGetter(TASK_DESCRIPTION_CLASSNAME, "properties"); - } catch (Throwable ignored) { - // should be already logged - } - return null; - } @Override protected String[] instrumentationNames() { @@ -44,29 +26,12 @@ protected CharSequence component() { return SPARK; } - public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner, Object taskDescription) { + public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) { span.setTag("task_id", taskRunner.taskId()); span.setTag("task_thread_name", taskRunner.threadName()); - - if (taskDescription != null && propertiesField_mh != null) { - try { - Properties props = (Properties) propertiesField_mh.invoke(taskDescription); - if (props != null) { - String appName = props.getProperty(propSparkAppName); - if (appName != null) { - AgentTracer.get() - .getDataStreamsMonitoring() - .setThreadServiceName(taskRunner.getThreadId(), appName); - } - } - } catch (Throwable ignored) { - } - } } public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) { - AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(taskRunner.getThreadId()); - // task is set by spark in run() by deserializing the task binary coming from the driver if (taskRunner.task() == null) { return; @@ -85,7 +50,7 @@ public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) { span.setTag("app_attempt_id", taskRunner.task().appAttemptId().get()); } span.setTag( - "application_name", taskRunner.task().localProperties().getProperty(propSparkAppName)); + "application_name", taskRunner.task().localProperties().getProperty("spark.app.name")); TaskMetrics metrics = taskRunner.task().metrics(); span.setMetric("spark.executor_deserialize_time", metrics.executorDeserializeTime()); diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java index 8de403a83bf..0a8a6532326 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java @@ -52,13 +52,11 @@ public void methodAdvice(MethodTransformer transformer) { public static final class RunAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope enter( - @Advice.FieldValue("taskDescription") final Object taskDescription, - @Advice.This Executor.TaskRunner taskRunner) { + public static AgentScope enter(@Advice.This Executor.TaskRunner taskRunner) { final AgentSpan span = startSpan("spark-executor", SPARK_TASK); DECORATE.afterStart(span); - DECORATE.onTaskStart(span, taskRunner, taskDescription); + DECORATE.onTaskStart(span, taskRunner); return activateSpan(span); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java 
b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index a966425f504..5398d420122 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -75,8 +75,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private volatile boolean agentSupportsDataStreams = false; private volatile boolean configSupportsDataStreams = false; private final ConcurrentHashMap schemaSamplers; - private static final ConcurrentHashMap threadServiceNames = - new ConcurrentHashMap<>(); + private static final ThreadLocal serviceNameOverride = new ThreadLocal<>(); public DefaultDataStreamsMonitoring( Config config, @@ -188,29 +187,28 @@ public void setProduceCheckpoint(String type, String target) { } @Override - public void setThreadServiceName(Long threadId, String serviceName) { - // setting service name to null == removing the value + public void setThreadServiceName(String serviceName) { if (serviceName == null) { - clearThreadServiceName(threadId); + clearThreadServiceName(); return; } - threadServiceNames.put(threadId, serviceName); + serviceNameOverride.set(serviceName); } @Override - public void clearThreadServiceName(Long threadId) { - threadServiceNames.remove(threadId); + public void clearThreadServiceName() { + serviceNameOverride.remove(); } - private static String getThreadServiceNameOverride() { - return threadServiceNames.getOrDefault(Thread.currentThread().getId(), null); + private static String getThreadServiceName() { + return serviceNameOverride.get(); } @Override public PathwayContext newPathwayContext() { if (configSupportsDataStreams) { - return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceNameOverride()); + return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceName()); } else { return AgentTracer.NoopPathwayContext.INSTANCE; } @@ -219,7 +217,7 @@ public PathwayContext newPathwayContext() { @Override public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) { return new DataStreamContextExtractor( - delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceNameOverride()); + delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceName()); } @Override @@ -236,7 +234,7 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie DataStreamsContextCarrierAdapter.INSTANCE, this.timeSource, this.hashOfKnownTags, - getThreadServiceNameOverride()); + getThreadServiceName()); ((DDSpan) span).context().mergePathwayContext(pathwayContext); } } @@ -250,8 +248,7 @@ public void trackBacklog(LinkedHashMap sortedTags, long value) { } tags.add(tag); } - inbox.offer( - new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceNameOverride())); + inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceName())); } @Override diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index 51583ff8060..5f01afeafa7 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -79,14 +79,14 @@ class DataStreamsWritingTest extends DDCoreSpecification { when: 
def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig }) dataStreams.start() - dataStreams.setThreadServiceName(Thread.currentThread().getId(), serviceNameOverride) + dataStreams.setThreadServiceName(serviceNameOverride) dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, serviceNameOverride)) dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) // force flush dataStreams.report() dataStreams.close() - dataStreams.clearThreadServiceName(Thread.currentThread().getId()) + dataStreams.clearThreadServiceName() then: conditions.eventually { assert requestBodies.size() == 1 diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java index e6cb15603dc..660c9be04eb 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java @@ -45,17 +45,12 @@ void setCheckpoint( /** * setServiceNameOverride is used override service name for all DataStreams payloads produced - * within given thread + * within Thread.currentThread() * - * @param threadId thread Id * @param serviceName new service name to use for DSM checkpoints. */ - void setThreadServiceName(Long threadId, String serviceName); + void setThreadServiceName(String serviceName); - /** - * clearThreadServiceName clears up threadId -> Service name mapping - * - * @param threadId thread Id - */ - void clearThreadServiceName(Long threadId); + /** clearThreadServiceName clears up service name override for Thread.currentThread() */ + void clearThreadServiceName(); } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index 5d113534179..88f2e4e2b99 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -1135,10 +1135,10 @@ public Schema getSchema(String schemaName, SchemaIterator iterator) { public void setProduceCheckpoint(String type, String target) {} @Override - public void setThreadServiceName(Long threadId, String serviceName) {} + public void setThreadServiceName(String serviceName) {} @Override - public void clearThreadServiceName(Long threadId) {} + public void clearThreadServiceName() {} @Override public void setConsumeCheckpoint(