diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
index 4b09bd46af1..6366973f63e 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
@@ -200,7 +200,8 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize)
           saved.getTimestampNanos(),
           saved.getPathwayLatencyNano(),
           saved.getEdgeLatencyNano(),
-          estimatedPayloadSize);
+          estimatedPayloadSize,
+          saved.getServiceNameOverride());
       // then send the point
       AgentTracer.get().getDataStreamsMonitoring().add(updated);
     }
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java
index 47a2e6d0201..6192485a47e 100644
--- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java
+++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java
@@ -27,7 +27,8 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize)
           saved.getTimestampNanos(),
           saved.getPathwayLatencyNano(),
           saved.getEdgeLatencyNano(),
-          estimatedPayloadSize);
+          estimatedPayloadSize,
+          saved.getServiceNameOverride());
       // then send the point
       AgentTracer.get().getDataStreamsMonitoring().add(updated);
     }
diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy
index 4ddb26b51af..cedcf14724b 100644
--- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy
+++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy
@@ -18,9 +18,12 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
   @SuppressWarnings('UnusedPrivateField')
   private final Set backlogs = []
 
+  private final Set serviceNameOverrides = []
+
   @Override
-  synchronized void writePayload(Collection data) {
+  synchronized void writePayload(Collection data, String serviceNameOverride) {
     log.info("payload written - {}", data)
+    serviceNameOverrides.add(serviceNameOverride)
     this.@payloads.addAll(data)
     data.each { this.@groups.addAll(it.groups) }
     for (StatsBucket bucket : data) {
@@ -32,6 +35,10 @@
     }
   }
 
+  synchronized List getServices() {
+    Collections.unmodifiableList(new ArrayList<>(this.@serviceNameOverrides))
+  }
+
   synchronized List getPayloads() {
     Collections.unmodifiableList(new ArrayList<>(this.@payloads))
   }
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java
index f97989b8798..f831d1cf10d 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java
@@ -12,16 +12,19 @@ public class DataStreamContextExtractor implements HttpCodec.Extractor {
   private final TimeSource timeSource;
   private final Supplier<TraceConfig> traceConfigSupplier;
   private final long hashOfKnownTags;
+  private final String serviceNameOverride;
 
   public DataStreamContextExtractor(
       HttpCodec.Extractor delegate,
       TimeSource timeSource,
       Supplier<TraceConfig> traceConfigSupplier,
-      long hashOfKnownTags) {
+      long hashOfKnownTags,
+      String serviceNameOverride) {
     this.delegate = delegate;
     this.timeSource = timeSource;
     this.traceConfigSupplier = traceConfigSupplier;
     this.hashOfKnownTags = hashOfKnownTags;
+    this.serviceNameOverride = serviceNameOverride;
   }
 
   @Override
@@ -37,7 +40,8 @@ public <C> TagContext extract(C carrier, AgentPropagation.ContextVisitor<C> gett
       if (shouldExtractPathwayContext) {
         DefaultPathwayContext pathwayContext =
-            DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.hashOfKnownTags);
+            DefaultPathwayContext.extract(
+                carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
 
         extracted.withPathwayContext(pathwayContext);
       }
@@ -45,7 +49,8 @@
       return extracted;
     } else if (traceConfigSupplier.get().isDataStreamsEnabled()) {
       DefaultPathwayContext pathwayContext =
-          DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.hashOfKnownTags);
+          DefaultPathwayContext.extract(
+              carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
 
       if (pathwayContext != null) {
         extracted = new TagContext();
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DatastreamsPayloadWriter.java
index 07d040dd9ac..b0ddf1f9870 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DatastreamsPayloadWriter.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DatastreamsPayloadWriter.java
@@ -3,5 +3,5 @@
 import java.util.Collection;
 
 public interface DatastreamsPayloadWriter {
-  void writePayload(Collection<StatsBucket> data);
+  void writePayload(Collection<StatsBucket> data, String serviceNameOverride);
 }
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java
index 9df53fe08b5..5398d420122 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java
@@ -39,6 +39,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -54,11 +55,11 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
   static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5);
 
   private static final StatsPoint REPORT =
-      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0);
+      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0, null);
   private static final StatsPoint POISON_PILL =
-      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0);
+      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0, null);
 
-  private final Map<Long, StatsBucket> timeToBucket = new HashMap<>();
+  private final Map<Long, Map<String, StatsBucket>> timeToBucket = new HashMap<>();
   private final MpscArrayQueue<InboxItem> inbox = new MpscArrayQueue<>(1024);
   private final DatastreamsPayloadWriter payloadWriter;
   private final DDAgentFeaturesDiscovery features;
@@ -74,6 +75,7 @@
   private volatile boolean agentSupportsDataStreams = false;
   private volatile boolean configSupportsDataStreams = false;
   private final ConcurrentHashMap<String, SchemaSampler> schemaSamplers;
+  private static final ThreadLocal<String> serviceNameOverride = new ThreadLocal<>();
 
   public DefaultDataStreamsMonitoring(
       Config config,
@@ -184,10 +186,29 @@ public void setProduceCheckpoint(String type, String target) {
     setProduceCheckpoint(type, target, DataStreamsContextCarrier.NoOp.INSTANCE, false);
   }
 
+  @Override
+  public void setThreadServiceName(String serviceName) {
+    if (serviceName == null) {
+      clearThreadServiceName();
+      return;
+    }
+
+    serviceNameOverride.set(serviceName);
+  }
+
+  @Override
+  public void clearThreadServiceName() {
+    serviceNameOverride.remove();
+  }
+
+  private static String getThreadServiceName() {
+    return serviceNameOverride.get();
+  }
+
   @Override
   public PathwayContext newPathwayContext() {
     if (configSupportsDataStreams) {
-      return new DefaultPathwayContext(timeSource, hashOfKnownTags);
+      return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceName());
     } else {
       return AgentTracer.NoopPathwayContext.INSTANCE;
     }
@@ -196,7 +217,7 @@
   @Override
   public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) {
     return new DataStreamContextExtractor(
-        delegate, timeSource, traceConfigSupplier, hashOfKnownTags);
+        delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceName());
   }
 
   @Override
@@ -212,7 +233,8 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie
             carrier,
             DataStreamsContextCarrierAdapter.INSTANCE,
             this.timeSource,
-            this.hashOfKnownTags);
+            this.hashOfKnownTags,
+            getThreadServiceName());
     ((DDSpan) span).context().mergePathwayContext(pathwayContext);
   }
 }
@@ -226,7 +248,7 @@ public void trackBacklog(LinkedHashMap<String, String> sortedTags, long value) {
       }
       tags.add(tag);
     }
-    inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos()));
+    inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceName()));
   }
 
   @Override
@@ -308,6 +330,15 @@ public void close() {
   }
 
   private class InboxProcessor implements Runnable {
+
+    private StatsBucket getStatsBucket(final long timestamp, final String serviceNameOverride) {
+      long bucket = currentBucket(timestamp);
+      Map<String, StatsBucket> statsBucketMap =
+          timeToBucket.computeIfAbsent(bucket, startTime -> new HashMap<>(1));
+      return statsBucketMap.computeIfAbsent(
+          serviceNameOverride, s -> new StatsBucket(bucket, bucketDurationNanos));
+    }
+
     @Override
     public void run() {
       Thread currentThread = Thread.currentThread();
@@ -335,17 +366,14 @@ public void run() {
         } else if (supportsDataStreams) {
           if (payload instanceof StatsPoint) {
             StatsPoint statsPoint = (StatsPoint) payload;
-            Long bucket = currentBucket(statsPoint.getTimestampNanos());
             StatsBucket statsBucket =
-                timeToBucket.computeIfAbsent(
-                    bucket, startTime -> new StatsBucket(startTime, bucketDurationNanos));
+                getStatsBucket(
+                    statsPoint.getTimestampNanos(), statsPoint.getServiceNameOverride());
             statsBucket.addPoint(statsPoint);
           } else if (payload instanceof Backlog) {
             Backlog backlog = (Backlog) payload;
-            Long bucket = currentBucket(backlog.getTimestampNanos());
             StatsBucket statsBucket =
-                timeToBucket.computeIfAbsent(
-                    bucket, startTime -> new StatsBucket(startTime, bucketDurationNanos));
+                getStatsBucket(backlog.getTimestampNanos(), backlog.getServiceNameOverride());
             statsBucket.addBacklog(backlog);
           }
         }
@@ -363,21 +391,32 @@ private long currentBucket(long timestampNanos) {
 
   private void flush(long timestampNanos) {
     long currentBucket = currentBucket(timestampNanos);
 
-    List<StatsBucket> includedBuckets = new ArrayList<>();
-    Iterator<Map.Entry<Long, StatsBucket>> mapIterator = timeToBucket.entrySet().iterator();
+    // stats are grouped by time buckets and service names
+    Map<String, List<StatsBucket>> includedBuckets = new HashMap<>();
+    Iterator<Map.Entry<Long, Map<String, StatsBucket>>> mapIterator =
+        timeToBucket.entrySet().iterator();
 
     while (mapIterator.hasNext()) {
-      Map.Entry<Long, StatsBucket> entry = mapIterator.next();
-
+      Map.Entry<Long, Map<String, StatsBucket>> entry = mapIterator.next();
       if (entry.getKey() < currentBucket) {
         mapIterator.remove();
-        includedBuckets.add(entry.getValue());
+        for (Map.Entry<String, StatsBucket> buckets : entry.getValue().entrySet()) {
+          if (!includedBuckets.containsKey(buckets.getKey())) {
+            includedBuckets.put(buckets.getKey(), new LinkedList<>());
+          }
+
+          includedBuckets.get(buckets.getKey()).add(buckets.getValue());
+        }
       }
     }
 
     if (!includedBuckets.isEmpty()) {
-      log.debug("Flushing {} buckets", includedBuckets.size());
-      payloadWriter.writePayload(includedBuckets);
+      for (Map.Entry<String, List<StatsBucket>> entry : includedBuckets.entrySet()) {
+        if (!entry.getValue().isEmpty()) {
+          log.debug("Flushing {} buckets ({})", entry.getValue().size(), entry.getKey());
+          payloadWriter.writePayload(entry.getValue(), entry.getKey());
+        }
+      }
     }
   }
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java
index 87847c0b78d..1fec6b9852b 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java
@@ -35,6 +35,7 @@ public class DefaultPathwayContext implements PathwayContext {
   private final Lock lock = new ReentrantLock();
   private final long hashOfKnownTags;
   private final TimeSource timeSource;
+  private final String serviceNameOverride;
   private final GrowingByteArrayOutput outputBuffer =
       GrowingByteArrayOutput.withInitialCapacity(20);
 
@@ -68,9 +69,11 @@
           TagsProcessor.DATASET_NAMESPACE_TAG,
           TagsProcessor.MANUAL_TAG));
 
-  public DefaultPathwayContext(TimeSource timeSource, long hashOfKnownTags) {
+  public DefaultPathwayContext(
+      TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) {
     this.timeSource = timeSource;
     this.hashOfKnownTags = hashOfKnownTags;
+    this.serviceNameOverride = serviceNameOverride;
   }
 
   private DefaultPathwayContext(
@@ -79,8 +82,9 @@ private DefaultPathwayContext(
       long pathwayStartNanos,
       long pathwayStartNanoTicks,
       long edgeStartNanoTicks,
-      long hash) {
-    this(timeSource, hashOfKnownTags);
+      long hash,
+      String serviceNameOverride) {
+    this(timeSource, hashOfKnownTags, serviceNameOverride);
     this.pathwayStartNanos = pathwayStartNanos;
     this.pathwayStartNanoTicks = pathwayStartNanoTicks;
     this.edgeStartNanoTicks = edgeStartNanoTicks;
@@ -126,7 +130,8 @@ public void setCheckpoint(
     // So far, each tag key has only one tag value, so we're initializing the capacity to match
    // the number of tag keys for now. We should revisit this later if it's no longer the case.
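The flush() rewrite above splits aggregation by time bucket first and by service name override second, so each override is flushed as its own payload. Below is a minimal, self-contained sketch of that two-level grouping; the class, field, and method names are illustrative stand-ins, not the tracer's real types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class BucketSplitSketch {
  static final long BUCKET_DURATION_NANOS = 10_000_000_000L; // illustrative bucket size

  // Stand-in for dd-trace-core's StatsBucket; only what the sketch needs.
  static final class Bucket {
    final long startNanos;
    int points;

    Bucket(long startNanos) {
      this.startNanos = startNanos;
    }
  }

  // bucket start time -> (service name override, null for the default service -> bucket)
  static final Map<Long, Map<String, Bucket>> timeToBucket = new HashMap<>();

  static Bucket getStatsBucket(long timestampNanos, String serviceNameOverride) {
    long start = timestampNanos - (timestampNanos % BUCKET_DURATION_NANOS);
    return timeToBucket
        .computeIfAbsent(start, s -> new HashMap<>(1))
        .computeIfAbsent(serviceNameOverride, s -> new Bucket(start));
  }

  // Flush every bucket older than the current one, one payload list per service override.
  static Map<String, List<Bucket>> flush(long nowNanos) {
    long current = nowNanos - (nowNanos % BUCKET_DURATION_NANOS);
    Map<String, List<Bucket>> included = new HashMap<>();
    Iterator<Map.Entry<Long, Map<String, Bucket>>> it = timeToBucket.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Map<String, Bucket>> entry = it.next();
      if (entry.getKey() < current) {
        it.remove();
        for (Map.Entry<String, Bucket> b : entry.getValue().entrySet()) {
          included.computeIfAbsent(b.getKey(), k -> new ArrayList<>()).add(b.getValue());
        }
      }
    }
    return included; // caller emits one payload per key, as flush() does above
  }

  public static void main(String[] args) {
    getStatsBucket(5_000_000_000L, null).points++;
    getStatsBucket(6_000_000_000L, "service-A").points++;
    // Both buckets fall in the [0s, 10s) window but flush as two separate payloads:
    System.out.println(flush(15_000_000_000L).keySet()); // e.g. [null, service-A]
  }
}
```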
    List<String> allTags = new ArrayList<>(sortedTags.size());
-    PathwayHashBuilder pathwayHashBuilder = new PathwayHashBuilder(hashOfKnownTags);
+    PathwayHashBuilder pathwayHashBuilder =
+        new PathwayHashBuilder(hashOfKnownTags, serviceNameOverride);
     DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder();
 
     if (!started) {
@@ -190,7 +195,8 @@ public void setCheckpoint(
               startNanos,
               pathwayLatencyNano,
               edgeLatencyNano,
-              payloadSizeBytes);
+              payloadSizeBytes,
+              serviceNameOverride);
       edgeStartNanoTicks = nanoTicks;
       hash = newHash;
@@ -272,18 +278,21 @@ public String toString() {
   private static class PathwayContextExtractor implements AgentPropagation.KeyClassifier {
     private final TimeSource timeSource;
     private final long hashOfKnownTags;
+    private final String serviceNameOverride;
     private DefaultPathwayContext extractedContext;
 
-    PathwayContextExtractor(TimeSource timeSource, long hashOfKnownTags) {
+    PathwayContextExtractor(
+        TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) {
       this.timeSource = timeSource;
       this.hashOfKnownTags = hashOfKnownTags;
+      this.serviceNameOverride = serviceNameOverride;
     }
 
     @Override
     public boolean accept(String key, String value) {
       if (PathwayContext.PROPAGATION_KEY_BASE64.equalsIgnoreCase(key)) {
         try {
-          extractedContext = strDecode(timeSource, hashOfKnownTags, value);
+          extractedContext = strDecode(timeSource, hashOfKnownTags, serviceNameOverride, value);
         } catch (IOException e) {
           return false;
         }
@@ -296,11 +305,14 @@
   private static class BinaryPathwayContextExtractor
       implements AgentPropagation.BinaryKeyClassifier {
     private final TimeSource timeSource;
     private final long hashOfKnownTags;
+    private final String serviceNameOverride;
     private DefaultPathwayContext extractedContext;
 
-    BinaryPathwayContextExtractor(TimeSource timeSource, long hashOfKnownTags) {
+    BinaryPathwayContextExtractor(
+        TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) {
       this.timeSource = timeSource;
       this.hashOfKnownTags = hashOfKnownTags;
+      this.serviceNameOverride = serviceNameOverride;
     }
 
     @Override
@@ -308,7 +320,7 @@ public boolean accept(String key, byte[] value) {
       // older versions support, should be removed in the future
       if (PathwayContext.PROPAGATION_KEY.equalsIgnoreCase(key)) {
         try {
-          extractedContext = decode(timeSource, hashOfKnownTags, value);
+          extractedContext = decode(timeSource, hashOfKnownTags, serviceNameOverride, value);
         } catch (IOException e) {
           return false;
         }
@@ -316,7 +328,7 @@
       if (PathwayContext.PROPAGATION_KEY_BASE64.equalsIgnoreCase(key)) {
         try {
-          extractedContext = base64Decode(timeSource, hashOfKnownTags, value);
+          extractedContext = base64Decode(timeSource, hashOfKnownTags, serviceNameOverride, value);
         } catch (IOException e) {
           return false;
         }
@@ -329,13 +341,18 @@
   static <C> DefaultPathwayContext extract(
       C carrier,
       AgentPropagation.ContextVisitor<C> getter,
       TimeSource timeSource,
-      long hashOfKnownTags) {
+      long hashOfKnownTags,
+      String serviceNameOverride) {
     if (getter instanceof AgentPropagation.BinaryContextVisitor) {
       return extractBinary(
-          carrier, (AgentPropagation.BinaryContextVisitor) getter, timeSource, hashOfKnownTags);
+          carrier,
+          (AgentPropagation.BinaryContextVisitor) getter,
+          timeSource,
+          hashOfKnownTags,
+          serviceNameOverride);
     }
     PathwayContextExtractor pathwayContextExtractor =
-        new PathwayContextExtractor(timeSource, hashOfKnownTags);
+        new PathwayContextExtractor(timeSource, hashOfKnownTags, serviceNameOverride);
     getter.forEachKey(carrier, pathwayContextExtractor);
 
     if (pathwayContextExtractor.extractedContext == null) {
       log.debug("No context extracted");
@@ -349,9 +366,10 @@
   static <C> DefaultPathwayContext extractBinary(
       C carrier,
       AgentPropagation.BinaryContextVisitor<C> getter,
       TimeSource timeSource,
-      long hashOfKnownTags) {
+      long hashOfKnownTags,
+      String serviceNameOverride) {
     BinaryPathwayContextExtractor pathwayContextExtractor =
-        new BinaryPathwayContextExtractor(timeSource, hashOfKnownTags);
+        new BinaryPathwayContextExtractor(timeSource, hashOfKnownTags, serviceNameOverride);
     getter.forEachKey(carrier, pathwayContextExtractor);
 
     if (pathwayContextExtractor.extractedContext == null) {
       log.debug("No context extracted");
@@ -362,18 +380,22 @@ static DefaultPathwayContext extractBinary(
   private static DefaultPathwayContext strDecode(
-      TimeSource timeSource, long hashOfKnownTags, String data) throws IOException {
+      TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride, String data)
+      throws IOException {
     byte[] base64Bytes = data.getBytes(UTF_8);
-    return base64Decode(timeSource, hashOfKnownTags, base64Bytes);
+    return base64Decode(timeSource, hashOfKnownTags, serviceNameOverride, base64Bytes);
   }
 
   private static DefaultPathwayContext base64Decode(
-      TimeSource timeSource, long hashOfKnownTags, byte[] data) throws IOException {
-    return decode(timeSource, hashOfKnownTags, Base64.getDecoder().decode(data));
+      TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride, byte[] data)
+      throws IOException {
+    return decode(
+        timeSource, hashOfKnownTags, serviceNameOverride, Base64.getDecoder().decode(data));
   }
 
   private static DefaultPathwayContext decode(
-      TimeSource timeSource, long hashOfKnownTags, byte[] data) throws IOException {
+      TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride, byte[] data)
+      throws IOException {
     ByteArrayInput input = ByteArrayInput.wrap(data);
 
     long hash = input.readLongLE();
@@ -397,7 +419,8 @@ private static DefaultPathwayContext decode(
         pathwayStartNanos,
         pathwayStartNanoTicks,
         edgeStartNanoTicks,
-        hash);
+        hash,
+        serviceNameOverride);
   }
 
   static class DataSetHashBuilder {
@@ -412,8 +435,11 @@ public long addValue(String val) {
   private static class PathwayHashBuilder {
     private long hash;
 
-    public PathwayHashBuilder(long baseHash) {
+    public PathwayHashBuilder(long baseHash, String serviceNameOverride) {
       hash = baseHash;
+      if (serviceNameOverride != null) {
+        addTag(serviceNameOverride);
+      }
     }
 
     public void addTag(String tag) {
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java
index 461e5457673..e6dc2a18e05 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java
@@ -56,7 +56,7 @@ public void reset() {
   }
 
   @Override
-  public void writePayload(Collection<StatsBucket> data) {
+  public void writePayload(Collection<StatsBucket> data, String serviceNameOverride) {
     writer.startMap(7);
     /* 1 */
     writer.writeUTF8(ENV);
@@ -64,7 +64,11 @@
     /* 2 */
     writer.writeUTF8(SERVICE);
-    writer.writeUTF8(wellKnownTags.getService());
+    if (serviceNameOverride != null) {
+      writer.writeUTF8(serviceNameOverride.getBytes(ISO_8859_1));
+    } else {
+      writer.writeUTF8(wellKnownTags.getService());
+    }
     /* 3 */
     writer.writeUTF8(LANG);
diff --git
a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index d4e6ccc8f7f..5f01afeafa7 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -46,6 +46,63 @@ class DataStreamsWritingTest extends DDCoreSpecification { requestBodies = [] } + def "Service overrides split buckets"() { + given: + def conditions = new PollingConditions(timeout: 2) + + def testOkhttpClient = OkHttpUtils.buildHttpClient(HttpUrl.get(server.address), 5000L) + + def features = Stub(DDAgentFeaturesDiscovery) { + supportsDataStreams() >> true + } + + def wellKnownTags = new WellKnownTags("runtimeid", "hostname", "test", Config.get().getServiceName(), "version", "java") + + def fakeConfig = Stub(Config) { + getAgentUrl() >> server.address.toString() + getWellKnownTags() >> wellKnownTags + getPrimaryTag() >> "region-1" + } + + def sharedCommObjects = new SharedCommunicationObjects() + sharedCommObjects.featuresDiscovery = features + sharedCommObjects.okHttpClient = testOkhttpClient + sharedCommObjects.createRemaining(fakeConfig) + + def timeSource = new ControllableTimeSource() + + def traceConfig = Mock(TraceConfig) { + isDataStreamsEnabled() >> true + } + def serviceNameOverride = "service-name-override" + + when: + def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig }) + dataStreams.start() + dataStreams.setThreadServiceName(serviceNameOverride) + dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, serviceNameOverride)) + dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130) + timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) + // force flush + dataStreams.report() + dataStreams.close() + dataStreams.clearThreadServiceName() + then: + conditions.eventually { + assert requestBodies.size() == 1 + } + GzipSource gzipSource = new GzipSource(Okio.source(new ByteArrayInputStream(requestBodies[0]))) + + BufferedSource bufferedSource = Okio.buffer(gzipSource) + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream()) + + assert unpacker.unpackMapHeader() == 7 + assert unpacker.unpackString() == "Env" + assert unpacker.unpackString() == "test" + assert unpacker.unpackString() == "Service" + assert unpacker.unpackString() == serviceNameOverride + } + def "Write bucket to mock server"() { given: def conditions = new PollingConditions(timeout: 2) @@ -78,26 +135,24 @@ class DataStreamsWritingTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig }) dataStreams.start() - dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0)) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, null)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null)) dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 100) dataStreams.trackBacklog(new 
LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5)) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5, null)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.report() + dataStreams.close() then: conditions.eventually { assert requestBodies.size() == 1 } - validateMessage(requestBodies[0]) - cleanup: - dataStreams.close() + validateMessage(requestBodies[0]) } def validateMessage(byte[] message) { diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy index 5313f1eb412..36c4dc04287 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy @@ -40,7 +40,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 0, 0, 0, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 0, 0, 0, timeSource.currentTimeNanos, 0, 0, 0, null)) dataStreams.report() then: @@ -131,7 +131,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -177,7 +177,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, bucketDuration) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, 
timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(bucketDuration) then: @@ -220,9 +220,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 3, 4, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 3, 4, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) dataStreams.report() @@ -266,9 +266,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) dataStreams.close() @@ -377,9 +377,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 6, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -434,12 +434,12 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 1, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new 
StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 1, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 1, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 1, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2,1, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5)) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 5, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2,1, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5, null)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, 5, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -508,7 +508,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not supported" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -538,7 +538,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS) dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -584,7 +584,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dataStreams.start() supportsDataStreaming = false dataStreams.onEvent(EventListener.EventType.DOWNGRADED, "") - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -601,7 +601,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS) dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) 
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -646,7 +646,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not enabled" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -662,7 +662,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dsmEnabled = true dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -695,7 +695,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "submitting points after being disabled" payloadWriter.buckets.clear() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -729,7 +729,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not supported" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -746,7 +746,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS) dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -760,7 +760,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dsmEnabled = true dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -805,7 +805,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is 
not supported" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -821,7 +821,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dsmEnabled = true dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 3, timeSource.currentTimeNanos, 0, 0, 0, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -841,7 +841,7 @@ class CapturingPayloadWriter implements DatastreamsPayloadWriter { boolean accepting = true List buckets = new ArrayList<>() - void writePayload(Collection payload) { + void writePayload(Collection payload, String serviceNameOverride) { if (accepting) { buckets.addAll(payload) } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy index 4838c842335..ecdce8e0651 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy @@ -45,7 +45,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "First Set checkpoint starts the context."() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(50) @@ -60,7 +60,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Checkpoint generated"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(50) @@ -86,7 +86,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Checkpoint with payload size"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(25) @@ -107,7 +107,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Multiple checkpoints generated"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(50) @@ -147,7 +147,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Exception thrown when trying to encode unstarted context"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: context.encode() @@ -159,14 +159,14 @@ class DefaultPathwayContextTest extends 
DDCoreSpecification { def "Set checkpoint with dataset tags"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "in"]), pointConsumer) def encoded = context.strEncode() timeSource.advance(MILLISECONDS.toNanos(2)) - def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded) + def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "out"]), pointConsumer) @@ -185,14 +185,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() timeSource.advance(MILLISECONDS.toNanos(2)) - def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded) + def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -213,7 +213,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Set checkpoint with timestamp"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) def timeFromQueue = timeSource.getCurrentTimeMillis() - 200 when: context.setCheckpoint(["type": "internal"], pointConsumer, timeFromQueue) @@ -234,7 +234,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(MILLISECONDS.toNanos(50)) @@ -242,7 +242,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.strEncode() timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded) + def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -261,7 +261,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: def secondEncode = decodedContext.strEncode() timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.strDecode(timeSource, baseHash, secondEncode) + def secondDecode = DefaultPathwayContext.strDecode(timeSource, baseHash, null, secondEncode) 
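A detail worth noting in these decode round-trips: serviceNameOverride is an argument to strDecode/decode rather than a field read from the encoded bytes, so the override never travels on the wire; each process applies its own local value. A small illustrative sketch of that shape (stand-in types; the real private decoder in DefaultPathwayContext reads further timestamp fields after the hash):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class DecodeSketch {
  static final class Decoded {
    final long hash;
    final String serviceNameOverride;

    Decoded(long hash, String serviceNameOverride) {
      this.hash = hash;
      this.serviceNameOverride = serviceNameOverride;
    }
  }

  // The override comes from the decoding side, mirroring
  // decode(timeSource, hashOfKnownTags, serviceNameOverride, data) above.
  static Decoded decode(byte[] data, String localServiceNameOverride) {
    ByteBuffer in = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
    long hash = in.getLong(); // first wire field, like input.readLongLE()
    return new Decoded(hash, localServiceNameOverride);
  }

  public static void main(String[] args) {
    byte[] payload = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(42L).array();
    // The same bytes can be attributed to different services by different consumers:
    System.out.println(decode(payload, null).serviceNameOverride); // null
    System.out.println(decode(payload, "consumer-override").serviceNameOverride);
  }
}
```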
timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -282,7 +282,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) def contextVisitor = new Base64MapContextVisitor() when: @@ -292,7 +292,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.strEncode() Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) + def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -312,7 +312,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def secondEncode = decodedContext.strEncode() carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) + def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -333,14 +333,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(2)) - def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, encoded) + def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -362,7 +362,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(MILLISECONDS.toNanos(50)) @@ -370,7 +370,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, encoded) + def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", 
"type": "kafka"]), pointConsumer) @@ -389,7 +389,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: def secondEncode = decodedContext.encode() timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, secondEncode) + def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, null, secondEncode) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -409,7 +409,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Legacy binary encoding"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) def contextVisitor = new BinaryMapContextVisitor() when: @@ -419,7 +419,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = java.util.Base64.getDecoder().decode(context.encode()) Map carrier = [(PathwayContext.PROPAGATION_KEY): encoded] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash) + def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash, null) then: decodedContext.strEncode() == context.strEncode() @@ -429,7 +429,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) def contextVisitor = new BinaryMapContextVisitor() when: @@ -439,7 +439,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.encode() Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": new byte[0]] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash) + def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -459,7 +459,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def secondEncode = decodedContext.encode() carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash) + def secondDecode = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -480,7 +480,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) def contextVisitor = new Base64MapContextVisitor() when: @@ -490,7 +490,7 @@ class DefaultPathwayContextTest extends 
DDCoreSpecification { def encoded = context.strEncode() Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) + def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["topic": "topic", "type": "sqs"]), pointConsumer) @@ -510,7 +510,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def secondEncode = decodedContext.strEncode() carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) + def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["topic": "topicB", "type": "sqs"]), pointConsumer) @@ -530,7 +530,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Empty tags not set"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) when: timeSource.advance(50) @@ -591,7 +591,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() @@ -637,7 +637,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() @@ -678,7 +678,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) - def context = new DefaultPathwayContext(timeSource, baseHash) + def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() diff --git a/dd-trace-core/src/traceAgentTest/groovy/DataStreamsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/DataStreamsIntegrationTest.groovy index 6f7ac8676a1..8b5fe50e019 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/DataStreamsIntegrationTest.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/DataStreamsIntegrationTest.groovy @@ -51,7 +51,7 @@ class DataStreamsIntegrationTest extends DDSpecification { when: def dataStreams = new 
DefaultDataStreamsMonitoring(sink, sharedCommunicationObjects.featuresDiscovery, timeSource, { traceConfig }, Config.get())
     dataStreams.start()
-    dataStreams.accept(new StatsPoint("testType", "testGroup", "testTopic", 1, 2, timeSource.currentTimeNanos, 0, 0))
+    dataStreams.accept(new StatsPoint("testType", "testGroup", "testTopic", 1, 2, timeSource.currentTimeNanos, 0, 0, null))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java
index 641716a2df3..660c9be04eb 100644
--- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java
+++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java
@@ -42,4 +42,15 @@ void setCheckpoint(
   Schema getSchema(String schemaName, SchemaIterator iterator);
 
   void setProduceCheckpoint(String type, String target);
+
+  /**
+   * setThreadServiceName is used to override the service name for all DataStreams payloads
+   * produced within Thread.currentThread()
+   *
+   * @param serviceName new service name to use for DSM checkpoints.
+   */
+  void setThreadServiceName(String serviceName);
+
+  /** clearThreadServiceName clears the service name override for Thread.currentThread() */
+  void clearThreadServiceName();
 }
diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java
index 256aab3d970..88f2e4e2b99 100644
--- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java
+++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java
@@ -1134,6 +1134,12 @@ public Schema getSchema(String schemaName, SchemaIterator iterator) {
     @Override
     public void setProduceCheckpoint(String type, String target) {}
 
+    @Override
+    public void setThreadServiceName(String serviceName) {}
+
+    @Override
+    public void clearThreadServiceName() {}
+
     @Override
     public void setConsumeCheckpoint(
         String type, String source, DataStreamsContextCarrier carrier) {}
diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Backlog.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Backlog.java
index 333a43f6e87..9b2a26540bc 100644
--- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Backlog.java
+++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Backlog.java
@@ -18,13 +18,20 @@ public long getTimestampNanos() {
     return timestampNanos;
   }
 
+  public String getServiceNameOverride() {
+    return serviceNameOverride;
+  }
+
   private final List<String> sortedTags;
   private final long value;
   private final long timestampNanos;
+  private final String serviceNameOverride;
 
-  public Backlog(List<String> sortedTags, long value, long timestampNanos) {
+  public Backlog(
+      List<String> sortedTags, long value, long timestampNanos, String serviceNameOverride) {
     this.sortedTags = sortedTags;
     this.value = value;
     this.timestampNanos = timestampNanos;
+    this.serviceNameOverride = serviceNameOverride;
   }
 }
diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java
index 1901b160ebb..fb313ddcef2 100644
--- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java
+++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java
@@ -11,6 +11,7 @@ public class StatsPoint implements InboxItem {
   private final long pathwayLatencyNano;
   private final long edgeLatencyNano;
   private final long payloadSizeBytes;
+  private final String serviceNameOverride;
 
   public StatsPoint(
       List<String> edgeTags,
@@ -20,7 +21,8 @@
       long timestampNanos,
       long pathwayLatencyNano,
       long edgeLatencyNano,
-      long payloadSizeBytes) {
+      long payloadSizeBytes,
+      String serviceNameOverride) {
     this.edgeTags = edgeTags;
     this.hash = hash;
     this.parentHash = parentHash;
@@ -29,6 +31,7 @@
     this.pathwayLatencyNano = pathwayLatencyNano;
     this.edgeLatencyNano = edgeLatencyNano;
     this.payloadSizeBytes = payloadSizeBytes;
+    this.serviceNameOverride = serviceNameOverride;
   }
 
   public List<String> getEdgeTags() {
@@ -63,6 +66,10 @@ public long getAggregationHash() {
     return aggregationHash;
   }
 
+  public String getServiceNameOverride() {
+    return serviceNameOverride;
+  }
+
   @Override
   public String toString() {
     return "StatsPoint{"
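Taken together, the intended calling pattern for the new per-thread override API looks like the sketch below. The consumer loop, class name, and service name are illustrative; the API calls are the ones this diff adds to AgentDataStreamsMonitoring (with no-op fallbacks in AgentTracer):

```java
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;

public class ThreadServiceNameExample {
  public static void main(String[] args) {
    // All DSM checkpoints, stats points and backlogs produced on this thread
    // are now reported under "orders-consumer" instead of the default service.
    AgentTracer.get().getDataStreamsMonitoring().setThreadServiceName("orders-consumer");
    try {
      processMessages(); // illustrative message-handling work
    } finally {
      // Always clear: the ThreadLocal would otherwise leak the override into
      // unrelated work if this thread is returned to a pool.
      AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
    }
  }

  static void processMessages() {
    // placeholder for real message handling
  }
}
```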