diff --git a/dd-java-agent/instrumentation/spark-executor/build.gradle b/dd-java-agent/instrumentation/spark-executor/build.gradle index 159f7b05907..4525dafc370 100644 --- a/dd-java-agent/instrumentation/spark-executor/build.gradle +++ b/dd-java-agent/instrumentation/spark-executor/build.gradle @@ -33,24 +33,13 @@ ext { dependencies { compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0' compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0' - compileOnly group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0" baseTestImplementation group: 'org.apache.spark', name: "spark-core_2.12", version: "2.4.0" baseTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "2.4.0" - baseTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "2.4.0" - baseTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0" - testImplementation group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0' - testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.0' - testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.4.0.RELEASE' - testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.4.0.RELEASE' latest212DepTestImplementation group: 'org.apache.spark', name: "spark-core_2.12", version: '3.+' latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: '3.+' - latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "3.+" - latest212DepTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.12', version: "2.4.0" latest213DepTestImplementation group: 'org.apache.spark', name: "spark-core_2.13", version: '3.+' latest213DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.13", version: '3.+' - latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.13", version: "3.+" - latest212DepTestImplementation group: 'org.apache.spark', name:'spark-sql-kafka-0-10_2.13', version: "3.+" } diff --git a/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy b/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy index 16dec400604..7e9ae0951f5 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy +++ b/dd-java-agent/instrumentation/spark-executor/src/baseTest/groovy/SparkExecutorTest.groovy @@ -1,38 +1,17 @@ import datadog.trace.agent.test.AgentTestRunner import datadog.trace.bootstrap.instrumentation.api.Tags -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.spark.api.java.function.VoidFunction2 import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.RowFactory import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.types.StructType -import org.junit.ClassRule -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.test.EmbeddedKafkaBroker -import org.springframework.kafka.test.rule.EmbeddedKafkaRule -import org.springframework.kafka.test.utils.KafkaTestUtils -import spock.lang.Shared - class SparkExecutorTest extends AgentTestRunner { - static final SOURCE_TOPIC = "source" - static final SINK_TOPIC = "sink" - - @Shared - @ClassRule - EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, SOURCE_TOPIC, SINK_TOPIC) - EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka @Override void configurePreAgent() { super.configurePreAgent() injectSysConfig("dd.integration.spark-executor.enabled", "true") - injectSysConfig("dd.integration.spark.enabled", "true") - injectSysConfig("dd.integration.kafka.enabled", "true") - injectSysConfig("dd.data.streams.enabled", "true") - injectSysConfig("dd.trace.debug", "true") } private Dataset generateSampleDataframe(SparkSession spark) { @@ -44,57 +23,6 @@ class SparkExecutorTest extends AgentTestRunner { spark.createDataFrame(rows, structType) } - def "test dsm service name override"() { - setup: - def sparkSession = SparkSession.builder() - .config("spark.master", "local[2]") - .config("spark.driver.bindAddress", "localhost") - // .config("spark.sql.shuffle.partitions", "2") - .appName("test-app") - .getOrCreate() - - def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString()) - def producer = new DefaultKafkaProducerFactory(producerProps).createProducer() - - when: - for (int i = 0; i < 100; i++) { - producer.send(new ProducerRecord<>(SOURCE_TOPIC, i, i.toString())) - } - producer.flush() - - def df = sparkSession - .readStream() - .format("kafka") - .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString()) - .option("startingOffsets", "earliest") - .option("failOnDataLoss", "false") - .option("subscribe", SOURCE_TOPIC) - .load() - - def query = df - .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value") - .writeStream() - .format("kafka") - .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString()) - .option("checkpointLocation", "/tmp/" + System.currentTimeMillis().toString()) - .option("topic", SINK_TOPIC) - .trigger(Trigger.Once()) - .foreachBatch(new VoidFunction2, Long>() { - @Override - void call(Dataset rowDataset, Long aLong) throws Exception { - rowDataset.show() - rowDataset.write() - } - }) - .start() - - query.processAllAvailable() - - then: - query.stop() - producer.close() - } - def "generate spark task run spans"() { setup: def sparkSession = SparkSession.builder() diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java index a5b4eea27aa..0f9519c696c 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java @@ -1,12 +1,8 @@ package datadog.trace.instrumentation.spark; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator; -import datadog.trace.util.MethodHandles; -import java.lang.invoke.MethodHandle; -import java.util.Properties; import org.apache.spark.executor.Executor; import org.apache.spark.executor.TaskMetrics; @@ -14,20 +10,6 @@ public class SparkExecutorDecorator extends BaseDecorator { public static final CharSequence SPARK_TASK = UTF8BytesString.create("spark.task"); public static final CharSequence SPARK = UTF8BytesString.create("spark"); public static SparkExecutorDecorator DECORATE = new SparkExecutorDecorator(); - private final String propSparkAppName = "spark.app.name"; - private static final String TASK_DESCRIPTION_CLASSNAME = - "org.apache.spark.scheduler.TaskDescription"; - private static final MethodHandle propertiesField_mh = getFieldGetter(); - - private static MethodHandle getFieldGetter() { - try { - return new MethodHandles(Executor.class.getClassLoader()) - .privateFieldGetter(TASK_DESCRIPTION_CLASSNAME, "properties"); - } catch (Throwable ignored) { - // should be already logged - } - return null; - } @Override protected String[] instrumentationNames() { @@ -44,29 +26,12 @@ protected CharSequence component() { return SPARK; } - public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner, Object taskDescription) { + public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) { span.setTag("task_id", taskRunner.taskId()); span.setTag("task_thread_name", taskRunner.threadName()); - - if (taskDescription != null && propertiesField_mh != null) { - try { - Properties props = (Properties) propertiesField_mh.invoke(taskDescription); - if (props != null) { - String appName = props.getProperty(propSparkAppName); - if (appName != null) { - AgentTracer.get() - .getDataStreamsMonitoring() - .setThreadServiceName(taskRunner.getThreadId(), appName); - } - } - } catch (Throwable ignored) { - } - } } public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) { - AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(taskRunner.getThreadId()); - // task is set by spark in run() by deserializing the task binary coming from the driver if (taskRunner.task() == null) { return; @@ -85,7 +50,7 @@ public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) { span.setTag("app_attempt_id", taskRunner.task().appAttemptId().get()); } span.setTag( - "application_name", taskRunner.task().localProperties().getProperty(propSparkAppName)); + "application_name", taskRunner.task().localProperties().getProperty("spark.app.name")); TaskMetrics metrics = taskRunner.task().metrics(); span.setMetric("spark.executor_deserialize_time", metrics.executorDeserializeTime()); diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java index 8de403a83bf..0a8a6532326 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java @@ -52,13 +52,11 @@ public void methodAdvice(MethodTransformer transformer) { public static final class RunAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope enter( - @Advice.FieldValue("taskDescription") final Object taskDescription, - @Advice.This Executor.TaskRunner taskRunner) { + public static AgentScope enter(@Advice.This Executor.TaskRunner taskRunner) { final AgentSpan span = startSpan("spark-executor", SPARK_TASK); DECORATE.afterStart(span); - DECORATE.onTaskStart(span, taskRunner, taskDescription); + DECORATE.onTaskStart(span, taskRunner); return activateSpan(span); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index a966425f504..5398d420122 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -75,8 +75,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private volatile boolean agentSupportsDataStreams = false; private volatile boolean configSupportsDataStreams = false; private final ConcurrentHashMap schemaSamplers; - private static final ConcurrentHashMap threadServiceNames = - new ConcurrentHashMap<>(); + private static final ThreadLocal serviceNameOverride = new ThreadLocal<>(); public DefaultDataStreamsMonitoring( Config config, @@ -188,29 +187,28 @@ public void setProduceCheckpoint(String type, String target) { } @Override - public void setThreadServiceName(Long threadId, String serviceName) { - // setting service name to null == removing the value + public void setThreadServiceName(String serviceName) { if (serviceName == null) { - clearThreadServiceName(threadId); + clearThreadServiceName(); return; } - threadServiceNames.put(threadId, serviceName); + serviceNameOverride.set(serviceName); } @Override - public void clearThreadServiceName(Long threadId) { - threadServiceNames.remove(threadId); + public void clearThreadServiceName() { + serviceNameOverride.remove(); } - private static String getThreadServiceNameOverride() { - return threadServiceNames.getOrDefault(Thread.currentThread().getId(), null); + private static String getThreadServiceName() { + return serviceNameOverride.get(); } @Override public PathwayContext newPathwayContext() { if (configSupportsDataStreams) { - return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceNameOverride()); + return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceName()); } else { return AgentTracer.NoopPathwayContext.INSTANCE; } @@ -219,7 +217,7 @@ public PathwayContext newPathwayContext() { @Override public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) { return new DataStreamContextExtractor( - delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceNameOverride()); + delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceName()); } @Override @@ -236,7 +234,7 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie DataStreamsContextCarrierAdapter.INSTANCE, this.timeSource, this.hashOfKnownTags, - getThreadServiceNameOverride()); + getThreadServiceName()); ((DDSpan) span).context().mergePathwayContext(pathwayContext); } } @@ -250,8 +248,7 @@ public void trackBacklog(LinkedHashMap sortedTags, long value) { } tags.add(tag); } - inbox.offer( - new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceNameOverride())); + inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceName())); } @Override diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index 51583ff8060..5f01afeafa7 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -79,14 +79,14 @@ class DataStreamsWritingTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig }) dataStreams.start() - dataStreams.setThreadServiceName(Thread.currentThread().getId(), serviceNameOverride) + dataStreams.setThreadServiceName(serviceNameOverride) dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, serviceNameOverride)) dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) // force flush dataStreams.report() dataStreams.close() - dataStreams.clearThreadServiceName(Thread.currentThread().getId()) + dataStreams.clearThreadServiceName() then: conditions.eventually { assert requestBodies.size() == 1 diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java index e6cb15603dc..660c9be04eb 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java @@ -45,17 +45,12 @@ void setCheckpoint( /** * setServiceNameOverride is used override service name for all DataStreams payloads produced - * within given thread + * within Thread.currentThread() * - * @param threadId thread Id * @param serviceName new service name to use for DSM checkpoints. */ - void setThreadServiceName(Long threadId, String serviceName); + void setThreadServiceName(String serviceName); - /** - * clearThreadServiceName clears up threadId -> Service name mapping - * - * @param threadId thread Id - */ - void clearThreadServiceName(Long threadId); + /** clearThreadServiceName clears up service name override for Thread.currentThread() */ + void clearThreadServiceName(); } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index 5d113534179..88f2e4e2b99 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -1135,10 +1135,10 @@ public Schema getSchema(String schemaName, SchemaIterator iterator) { public void setProduceCheckpoint(String type, String target) {} @Override - public void setThreadServiceName(Long threadId, String serviceName) {} + public void setThreadServiceName(String serviceName) {} @Override - public void clearThreadServiceName(Long threadId) {} + public void clearThreadServiceName() {} @Override public void setConsumeCheckpoint(