diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/build.gradle b/dd-java-agent/instrumentation/spark/spark_2.12/build.gradle index a9a87a99ef1..e92abde7e15 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/build.gradle +++ b/dd-java-agent/instrumentation/spark/spark_2.12/build.gradle @@ -41,13 +41,24 @@ dependencies { testImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion" testImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "$sparkVersion" + testImplementation group: 'org.apache.spark', name:"spark-sql-kafka-0-10_$scalaVersion", version: "$sparkVersion" + testImplementation group: 'org.apache.kafka', name: "kafka_$scalaVersion", version: '2.4.0' + testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.0' + testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.4.0.RELEASE' + testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.4.0.RELEASE' + + testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-clients-0.11') + test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "2.4.8" test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "2.4.8" test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "2.4.8" + test_spark24Implementation group: 'org.apache.spark', name:"spark-sql-kafka-0-10_$scalaVersion", version: "2.4.8" test_spark32Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "3.2.4" test_spark32Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "3.2.4" test_spark32Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "3.2.4" + test_spark24Implementation group: 'org.apache.spark', name:"spark-sql-kafka-0-10_$scalaVersion", version: "3.2.4" + // We do not support netty versions older than this because of a change to the number of parameters to the // PooledByteBufAllocator constructor. See this PR where the new constructor (the only one we support) was introduced: // https://github.com/netty/netty/pull/10267 @@ -56,6 +67,7 @@ dependencies { latestDepTestImplementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: '+' latestDepTestImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: '+' latestDepTestImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: '+' + latestDepTestImplementation group: 'org.apache.spark', name:"spark-sql-kafka-0-10_$scalaVersion", version: "+" } tasks.named("test").configure { diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java index 15e6fa5a80f..b8ce59643d2 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java @@ -20,6 +20,7 @@ public String[] helperClassNames() { packageName + ".RemoveEldestHashMap", packageName + ".SparkAggregatedTaskMetrics", packageName + ".SparkConfAllowList", + packageName + ".SparkConfUtils", packageName + ".SparkSQLUtils", packageName + ".SparkSQLUtils$SparkPlanInfoForStage", packageName + ".SparkSQLUtils$AccumulatorWithStage", diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkStreamingKafkaTest.groovy b/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkStreamingKafkaTest.groovy new file mode 100644 index 00000000000..cf22b2c4db1 --- /dev/null +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkStreamingKafkaTest.groovy @@ -0,0 +1,89 @@ +import datadog.trace.agent.test.AgentTestRunner +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.spark.api.java.function.VoidFunction2 +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.streaming.Trigger +import org.junit.Rule +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.rule.EmbeddedKafkaRule +import org.springframework.kafka.test.utils.KafkaTestUtils + +class SparkStreamingKafkaTest extends AgentTestRunner { + static final SOURCE_TOPIC = "source" + static final SINK_TOPIC = "sink" + + @Override + boolean isDataStreamsEnabled() { + return true + } + + @Rule + EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, SOURCE_TOPIC, SINK_TOPIC) + EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka + + @Override + void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.integration.spark.enabled", "true") + injectSysConfig("dd.integration.kafka.enabled", "true") + } + + def "test dsm checkpoints are correctly set"() { + setup: + def appName = "test-app" + def sparkSession = SparkSession.builder() + .config("spark.master", "local[2]") + .config("spark.driver.bindAddress", "localhost") + .appName(appName) + .getOrCreate() + + def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString()) + def producer = new DefaultKafkaProducerFactory(producerProps).createProducer() + + when: + for (int i = 0; i < 100; i++) { + producer.send(new ProducerRecord<>(SOURCE_TOPIC, i, i.toString())) + } + producer.flush() + + def df = sparkSession + .readStream() + .format("kafka") + .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString()) + .option("startingOffsets", "earliest") + .option("failOnDataLoss", "false") + .option("subscribe", SOURCE_TOPIC) + .load() + + def query = df + .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString()) + .option("checkpointLocation", "/tmp/" + System.currentTimeMillis().toString()) + .option("topic", SINK_TOPIC) + .trigger(Trigger.Once()) + .foreachBatch(new VoidFunction2, Long>() { + @Override + void call(Dataset rowDataset, Long aLong) throws Exception { + rowDataset.show() + rowDataset.write() + } + }) + .start() + + query.processAllAvailable() + + then: + query.stop() + producer.close() + + // check that checkpoints were written with a service name override == "SparkAppName" + assert TEST_DATA_STREAMS_WRITER.payloads.size() > 0 + assert TEST_DATA_STREAMS_WRITER.services.size() == 1 + assert TEST_DATA_STREAMS_WRITER.services.get(0) == appName + } +} diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java index 0d80eb7553c..e0036f79dca 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java @@ -20,6 +20,7 @@ public String[] helperClassNames() { packageName + ".RemoveEldestHashMap", packageName + ".SparkAggregatedTaskMetrics", packageName + ".SparkConfAllowList", + packageName + ".SparkConfUtils", packageName + ".SparkSQLUtils", packageName + ".SparkSQLUtils$SparkPlanInfoForStage", packageName + ".SparkSQLUtils$AccumulatorWithStage", diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 9572f0cef1a..3fc6070c076 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -4,10 +4,12 @@ import static datadog.trace.core.datastreams.TagsProcessor.PARTITION_TAG; import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG; import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG; +import static datadog.trace.instrumentation.spark.SparkConfUtils.getDatabricksClusterName; +import static datadog.trace.instrumentation.spark.SparkConfUtils.getIsRunningOnDatabricks; +import static datadog.trace.instrumentation.spark.SparkConfUtils.getServiceNameOverride; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import datadog.trace.api.Config; import datadog.trace.api.DDTags; import datadog.trace.api.DDTraceId; import datadog.trace.api.sampling.PrioritySampling; @@ -110,8 +112,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private final boolean isRunningOnDatabricks; private final String databricksClusterName; - private final String databricksServiceName; - private final String sparkServiceName; + private final String serviceNameOverride; private boolean lastJobFailed = false; private String lastJobFailedMessage; @@ -130,10 +131,9 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp this.appId = appId; this.sparkVersion = sparkVersion; - isRunningOnDatabricks = sparkConf.contains("spark.databricks.sparkContextId"); - databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null); - databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName); - sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks); + isRunningOnDatabricks = getIsRunningOnDatabricks(sparkConf); + databricksClusterName = getDatabricksClusterName(sparkConf); + serviceNameOverride = getServiceNameOverride(sparkConf); // If JVM exiting with System.exit(code), it bypass the code closing the application span // @@ -924,10 +924,8 @@ private AgentTracer.SpanBuilder buildSparkSpan(String spanName, Properties prope AgentTracer.SpanBuilder builder = tracer.buildSpan(spanName).withSpanType("spark").withTag("app_id", appId); - if (databricksServiceName != null) { - builder.withServiceName(databricksServiceName); - } else if (sparkServiceName != null) { - builder.withServiceName(sparkServiceName); + if (serviceNameOverride != null) { + builder.withServiceName(serviceNameOverride); } addPropertiesTags(builder, properties); @@ -1153,45 +1151,6 @@ private static String getBatchIdFromBatchKey(String batchKey) { return batchKey.substring(batchKey.lastIndexOf(".") + 1); } - private static String getDatabricksServiceName(SparkConf conf, String databricksClusterName) { - if (Config.get().isServiceNameSetByUser()) { - return null; - } - - String serviceName = null; - String runName = getDatabricksRunName(conf); - if (runName != null) { - serviceName = "databricks.job-cluster." + runName; - } else if (databricksClusterName != null) { - serviceName = "databricks.all-purpose-cluster." + databricksClusterName; - } - - return serviceName; - } - - private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDatabricks) { - // If config is not set or running on databricks, not changing the service name - if (!Config.get().useSparkAppNameAsService() || isRunningOnDatabricks) { - return null; - } - - // Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM - String serviceName = Config.get().getServiceName(); - if (Config.get().isServiceNameSetByUser() - && !"spark".equals(serviceName) - && !"hadoop".equals(serviceName)) { - log.debug("Service '{}' explicitly set by user, not using the application name", serviceName); - return null; - } - - String sparkAppName = conf.get("spark.app.name", null); - if (sparkAppName != null) { - log.info("Using Spark application name '{}' as the Datadog service name", sparkAppName); - } - - return sparkAppName; - } - private static void reportKafkaOffsets( final String appName, final AgentSpan span, final SourceProgress progress) { if (!span.traceConfig().isDataStreamsEnabled() @@ -1234,34 +1193,4 @@ private static void reportKafkaOffsets( } } } - - private static String getDatabricksRunName(SparkConf conf) { - String allTags = conf.get("spark.databricks.clusterUsageTags.clusterAllTags", null); - if (allTags == null) { - return null; - } - - try { - // Using the jackson JSON lib used by spark - // https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0 - JsonNode jsonNode = objectMapper.readTree(allTags); - - for (JsonNode node : jsonNode) { - String key = node.get("key").asText(); - if ("RunName".equals(key)) { - // Databricks jobs launched by Azure Data Factory have an uuid at the end of the name - return removeUuidFromEndOfString(node.get("value").asText()); - } - } - } catch (Exception ignored) { - } - - return null; - } - - @SuppressForbidden // called at most once per spark application - private static String removeUuidFromEndOfString(String input) { - return input.replaceAll( - "_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", ""); - } } diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkConfUtils.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkConfUtils.java new file mode 100644 index 00000000000..ac74e7eb1e3 --- /dev/null +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkConfUtils.java @@ -0,0 +1,100 @@ +package datadog.trace.instrumentation.spark; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import datadog.trace.api.Config; +import de.thetaphi.forbiddenapis.SuppressForbidden; +import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SparkConfUtils { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final Logger log = LoggerFactory.getLogger(SparkConfUtils.class); + + public static boolean getIsRunningOnDatabricks(SparkConf sparkConf) { + return sparkConf.contains("spark.databricks.sparkContextId"); + } + + public static String getDatabricksClusterName(SparkConf sparkConf) { + return sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null); + } + + public static String getDatabricksServiceName(SparkConf conf, String databricksClusterName) { + if (Config.get().isServiceNameSetByUser()) { + return null; + } + + String serviceName = null; + String runName = getDatabricksRunName(conf); + if (runName != null) { + serviceName = "databricks.job-cluster." + runName; + } else if (databricksClusterName != null) { + serviceName = "databricks.all-purpose-cluster." + databricksClusterName; + } + + return serviceName; + } + + public static String getSparkServiceName(SparkConf conf, boolean isRunningOnDatabricks) { + // If config is not set or running on databricks, not changing the service name + if (!Config.get().useSparkAppNameAsService() || isRunningOnDatabricks) { + return null; + } + + // Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM + String serviceName = Config.get().getServiceName(); + if (Config.get().isServiceNameSetByUser() + && !"spark".equals(serviceName) + && !"hadoop".equals(serviceName)) { + log.debug("Service '{}' explicitly set by user, not using the application name", serviceName); + return null; + } + + String sparkAppName = conf.get("spark.app.name", null); + if (sparkAppName != null) { + log.info("Using Spark application name '{}' as the Datadog service name", sparkAppName); + } + + return sparkAppName; + } + + public static String getServiceNameOverride(SparkConf conf) { + boolean isRunningOnDatabricks = getIsRunningOnDatabricks(conf); + String databricksClusterName = getDatabricksClusterName(conf); + String databricksServiceName = getDatabricksServiceName(conf, databricksClusterName); + String sparkServiceName = getSparkServiceName(conf, isRunningOnDatabricks); + + return databricksServiceName != null ? databricksServiceName : sparkServiceName; + } + + private static String getDatabricksRunName(SparkConf conf) { + String allTags = conf.get("spark.databricks.clusterUsageTags.clusterAllTags", null); + if (allTags == null) { + return null; + } + + try { + // Using the jackson JSON lib used by spark + // https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0 + JsonNode jsonNode = objectMapper.readTree(allTags); + + for (JsonNode node : jsonNode) { + String key = node.get("key").asText(); + if ("RunName".equals(key)) { + // Databricks jobs launched by Azure Data Factory have an uuid at the end of the name + return removeUuidFromEndOfString(node.get("value").asText()); + } + } + } catch (Exception ignored) { + } + + return null; + } + + @SuppressForbidden // called at most once per spark application + private static String removeUuidFromEndOfString(String input) { + return input.replaceAll( + "_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", ""); + } +} diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSessionInstrumentation.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSessionInstrumentation.java new file mode 100644 index 00000000000..fe2254eed80 --- /dev/null +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSessionInstrumentation.java @@ -0,0 +1,50 @@ +package datadog.trace.instrumentation.spark; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.instrumentation.spark.SparkSessionUtils.SESSION_UTILS; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import net.bytebuddy.asm.Advice; +import org.apache.spark.SparkContext; + +@AutoService(InstrumenterModule.class) +public class SparkSessionInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType { + + public SparkSessionInstrumentation() { + super("spark-streaming"); + } + + @Override + public String instrumentedType() { + return "org.apache.spark.sql.SparkSession"; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".SparkSessionUtils", packageName + ".SparkConfUtils", + }; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isConstructor().and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))), + SparkSessionInstrumentation.class.getName() + "$SessionCreatedAdvice"); + } + + public static final class SessionCreatedAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope enter(@Advice.Argument(0) SparkContext context) { + SESSION_UTILS.updatePreferredServiceName(context); + return null; + } + } +} diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSessionUtils.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSessionUtils.java new file mode 100644 index 00000000000..8ae0fc2329f --- /dev/null +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSessionUtils.java @@ -0,0 +1,16 @@ +package datadog.trace.instrumentation.spark; + +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import org.apache.spark.SparkContext; + +public class SparkSessionUtils { + public static SparkSessionUtils SESSION_UTILS = new SparkSessionUtils(); + + public void updatePreferredServiceName(SparkContext context) { + // we are not using `updatePreferredServiceName` here, since it will + // update service names for all spans. + AgentTracer.get() + .getDataStreamsMonitoring() + .setPreferredServiceName(SparkConfUtils.getServiceNameOverride(context.conf())); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java index f831d1cf10d..f97989b8798 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java @@ -12,19 +12,16 @@ public class DataStreamContextExtractor implements HttpCodec.Extractor { private final TimeSource timeSource; private final Supplier traceConfigSupplier; private final long hashOfKnownTags; - private final String serviceNameOverride; public DataStreamContextExtractor( HttpCodec.Extractor delegate, TimeSource timeSource, Supplier traceConfigSupplier, - long hashOfKnownTags, - String serviceNameOverride) { + long hashOfKnownTags) { this.delegate = delegate; this.timeSource = timeSource; this.traceConfigSupplier = traceConfigSupplier; this.hashOfKnownTags = hashOfKnownTags; - this.serviceNameOverride = serviceNameOverride; } @Override @@ -40,8 +37,7 @@ public TagContext extract(C carrier, AgentPropagation.ContextVisitor gett if (shouldExtractPathwayContext) { DefaultPathwayContext pathwayContext = - DefaultPathwayContext.extract( - carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride); + DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.hashOfKnownTags); extracted.withPathwayContext(pathwayContext); } @@ -49,8 +45,7 @@ public TagContext extract(C carrier, AgentPropagation.ContextVisitor gett return extracted; } else if (traceConfigSupplier.get().isDataStreamsEnabled()) { DefaultPathwayContext pathwayContext = - DefaultPathwayContext.extract( - carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride); + DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.hashOfKnownTags); if (pathwayContext != null) { extracted = new TagContext(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 5398d420122..6c82985e2b9 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -74,8 +74,9 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private volatile boolean supportsDataStreams = false; private volatile boolean agentSupportsDataStreams = false; private volatile boolean configSupportsDataStreams = false; + private static volatile String preferredServiceName = null; private final ConcurrentHashMap schemaSamplers; - private static final ThreadLocal serviceNameOverride = new ThreadLocal<>(); + private static final ThreadLocal threadServiceName = new ThreadLocal<>(); public DefaultDataStreamsMonitoring( Config config, @@ -193,22 +194,40 @@ public void setThreadServiceName(String serviceName) { return; } - serviceNameOverride.set(serviceName); + threadServiceName.set(serviceName); } @Override public void clearThreadServiceName() { - serviceNameOverride.remove(); + threadServiceName.remove(); } - private static String getThreadServiceName() { - return serviceNameOverride.get(); + @Override + public void setPreferredServiceName(String serviceName) { + if (serviceName == null || serviceName.isEmpty()) { + clearPreferredServiceName(); + return; + } + + preferredServiceName = serviceName; + } + + @Override + public void clearPreferredServiceName() { + preferredServiceName = null; + } + + public static String getServiceNameOverride() { + String perThreadName = threadServiceName.get(); + return perThreadName != null + ? perThreadName + : preferredServiceName != null ? preferredServiceName : null; } @Override public PathwayContext newPathwayContext() { if (configSupportsDataStreams) { - return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceName()); + return new DefaultPathwayContext(timeSource, hashOfKnownTags); } else { return AgentTracer.NoopPathwayContext.INSTANCE; } @@ -217,7 +236,7 @@ public PathwayContext newPathwayContext() { @Override public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) { return new DataStreamContextExtractor( - delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceName()); + delegate, timeSource, traceConfigSupplier, hashOfKnownTags); } @Override @@ -233,8 +252,7 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie carrier, DataStreamsContextCarrierAdapter.INSTANCE, this.timeSource, - this.hashOfKnownTags, - getThreadServiceName()); + this.hashOfKnownTags); ((DDSpan) span).context().mergePathwayContext(pathwayContext); } } @@ -248,7 +266,8 @@ public void trackBacklog(LinkedHashMap sortedTags, long value) { } tags.add(tag); } - inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceName())); + inbox.offer( + new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getServiceNameOverride())); } @Override diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index 1fec6b9852b..f56763322cb 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -1,5 +1,6 @@ package datadog.trace.core.datastreams; +import static datadog.trace.core.datastreams.DefaultDataStreamsMonitoring.getServiceNameOverride; import static java.nio.charset.StandardCharsets.ISO_8859_1; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -69,11 +70,10 @@ public class DefaultPathwayContext implements PathwayContext { TagsProcessor.DATASET_NAMESPACE_TAG, TagsProcessor.MANUAL_TAG)); - public DefaultPathwayContext( - TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) { + public DefaultPathwayContext(TimeSource timeSource, long hashOfKnownTags) { this.timeSource = timeSource; this.hashOfKnownTags = hashOfKnownTags; - this.serviceNameOverride = serviceNameOverride; + this.serviceNameOverride = getServiceNameOverride(); } private DefaultPathwayContext( @@ -82,9 +82,8 @@ private DefaultPathwayContext( long pathwayStartNanos, long pathwayStartNanoTicks, long edgeStartNanoTicks, - long hash, - String serviceNameOverride) { - this(timeSource, hashOfKnownTags, serviceNameOverride); + long hash) { + this(timeSource, hashOfKnownTags); this.pathwayStartNanos = pathwayStartNanos; this.pathwayStartNanoTicks = pathwayStartNanoTicks; this.edgeStartNanoTicks = edgeStartNanoTicks; @@ -278,21 +277,18 @@ public String toString() { private static class PathwayContextExtractor implements AgentPropagation.KeyClassifier { private final TimeSource timeSource; private final long hashOfKnownTags; - private final String serviceNameOverride; private DefaultPathwayContext extractedContext; - PathwayContextExtractor( - TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) { + PathwayContextExtractor(TimeSource timeSource, long hashOfKnownTags) { this.timeSource = timeSource; this.hashOfKnownTags = hashOfKnownTags; - this.serviceNameOverride = serviceNameOverride; } @Override public boolean accept(String key, String value) { if (PathwayContext.PROPAGATION_KEY_BASE64.equalsIgnoreCase(key)) { try { - extractedContext = strDecode(timeSource, hashOfKnownTags, serviceNameOverride, value); + extractedContext = strDecode(timeSource, hashOfKnownTags, value); } catch (IOException e) { return false; } @@ -305,14 +301,11 @@ private static class BinaryPathwayContextExtractor implements AgentPropagation.BinaryKeyClassifier { private final TimeSource timeSource; private final long hashOfKnownTags; - private final String serviceNameOverride; private DefaultPathwayContext extractedContext; - BinaryPathwayContextExtractor( - TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) { + BinaryPathwayContextExtractor(TimeSource timeSource, long hashOfKnownTags) { this.timeSource = timeSource; this.hashOfKnownTags = hashOfKnownTags; - this.serviceNameOverride = serviceNameOverride; } @Override @@ -320,7 +313,7 @@ public boolean accept(String key, byte[] value) { // older versions support, should be removed in the future if (PathwayContext.PROPAGATION_KEY.equalsIgnoreCase(key)) { try { - extractedContext = decode(timeSource, hashOfKnownTags, serviceNameOverride, value); + extractedContext = decode(timeSource, hashOfKnownTags, value); } catch (IOException e) { return false; } @@ -328,7 +321,7 @@ public boolean accept(String key, byte[] value) { if (PathwayContext.PROPAGATION_KEY_BASE64.equalsIgnoreCase(key)) { try { - extractedContext = base64Decode(timeSource, hashOfKnownTags, serviceNameOverride, value); + extractedContext = base64Decode(timeSource, hashOfKnownTags, value); } catch (IOException e) { return false; } @@ -341,18 +334,13 @@ static DefaultPathwayContext extract( C carrier, AgentPropagation.ContextVisitor getter, TimeSource timeSource, - long hashOfKnownTags, - String serviceNameOverride) { + long hashOfKnownTags) { if (getter instanceof AgentPropagation.BinaryContextVisitor) { return extractBinary( - carrier, - (AgentPropagation.BinaryContextVisitor) getter, - timeSource, - hashOfKnownTags, - serviceNameOverride); + carrier, (AgentPropagation.BinaryContextVisitor) getter, timeSource, hashOfKnownTags); } PathwayContextExtractor pathwayContextExtractor = - new PathwayContextExtractor(timeSource, hashOfKnownTags, serviceNameOverride); + new PathwayContextExtractor(timeSource, hashOfKnownTags); getter.forEachKey(carrier, pathwayContextExtractor); if (pathwayContextExtractor.extractedContext == null) { log.debug("No context extracted"); @@ -366,10 +354,9 @@ static DefaultPathwayContext extractBinary( C carrier, AgentPropagation.BinaryContextVisitor getter, TimeSource timeSource, - long hashOfKnownTags, - String serviceNameOverride) { + long hashOfKnownTags) { BinaryPathwayContextExtractor pathwayContextExtractor = - new BinaryPathwayContextExtractor(timeSource, hashOfKnownTags, serviceNameOverride); + new BinaryPathwayContextExtractor(timeSource, hashOfKnownTags); getter.forEachKey(carrier, pathwayContextExtractor); if (pathwayContextExtractor.extractedContext == null) { log.debug("No context extracted"); @@ -380,22 +367,18 @@ static DefaultPathwayContext extractBinary( } private static DefaultPathwayContext strDecode( - TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride, String data) - throws IOException { + TimeSource timeSource, long hashOfKnownTags, String data) throws IOException { byte[] base64Bytes = data.getBytes(UTF_8); - return base64Decode(timeSource, hashOfKnownTags, serviceNameOverride, base64Bytes); + return base64Decode(timeSource, hashOfKnownTags, base64Bytes); } private static DefaultPathwayContext base64Decode( - TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride, byte[] data) - throws IOException { - return decode( - timeSource, hashOfKnownTags, serviceNameOverride, Base64.getDecoder().decode(data)); + TimeSource timeSource, long hashOfKnownTags, byte[] data) throws IOException { + return decode(timeSource, hashOfKnownTags, Base64.getDecoder().decode(data)); } private static DefaultPathwayContext decode( - TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride, byte[] data) - throws IOException { + TimeSource timeSource, long hashOfKnownTags, byte[] data) throws IOException { ByteArrayInput input = ByteArrayInput.wrap(data); long hash = input.readLongLE(); @@ -419,8 +402,7 @@ private static DefaultPathwayContext decode( pathwayStartNanos, pathwayStartNanoTicks, edgeStartNanoTicks, - hash, - serviceNameOverride); + hash); } static class DataSetHashBuilder { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java index 79f73df8876..c372f702ce9 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java @@ -60,4 +60,16 @@ public Collection getGroups() { public Collection, Long>> getBacklogs() { return backlogs.entrySet(); } + + @Override + public String toString() { + return "StatsBucket{" + + "startTimeNanos=" + + startTimeNanos + + ", bucketDurationNanos=" + + bucketDurationNanos + + ", backlogs=" + + backlogs.size() + + "}"; + } } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java index 660c9be04eb..bfc390fcf4f 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java @@ -53,4 +53,15 @@ void setCheckpoint( /** clearThreadServiceName clears up service name override for Thread.currentThread() */ void clearThreadServiceName(); + + /** + * setPreferredServiceName is used override service name for all DataStreams payloads produced, no + * matter what thread they were created in. It has lower priority then per-thread overrides. + * + * @param serviceName new service name to use for DSM checkpoints. + */ + void setPreferredServiceName(String serviceName); + + /** clearPreferredServiceName removes the service name override */ + void clearPreferredServiceName(); } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index 0518c92563b..20095e1474d 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -1135,6 +1135,12 @@ public void setThreadServiceName(String serviceName) {} @Override public void clearThreadServiceName() {} + @Override + public void setPreferredServiceName(String serviceName) {} + + @Override + public void clearPreferredServiceName() {} + @Override public void setConsumeCheckpoint( String type, String source, DataStreamsContextCarrier carrier) {} diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java index fb313ddcef2..1cd8ca13fcc 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java @@ -90,6 +90,8 @@ public String toString() { + edgeLatencyNano + ", payloadSizeBytes=" + payloadSizeBytes - + '}'; + + ", serviceNameOverride=" + + serviceNameOverride + + "'}"; } }