From 8d5f5acd234cd259b4b341a5ed32a9c9cf7d3b8b Mon Sep 17 00:00:00 2001 From: Stuart McCulloch Date: Fri, 20 Dec 2024 10:47:46 +0000 Subject: [PATCH] Avoid performing blocking I/O operation on application thread (#8120) * Avoid performing blocking I/O operation on application thread * Call checkDynamicConfig() at DSM start to load tracer config, then again at Inbox start to merge agent config * Make test more tolerant of parallel discover requests * Cleanup test side-effects --- .../ddagent/SharedCommunicationObjects.java | 9 ++++-- .../debugger/agent/DebuggerAgentTest.java | 10 +++---- .../DefaultDataStreamsMonitoring.java | 30 ++++++++++--------- .../propagation/XRayHttpInjectorTest.groovy | 19 ++++++++++-- 4 files changed, 44 insertions(+), 24 deletions(-) diff --git a/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java b/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java index 8c02098fe81..1a44228d98a 100644 --- a/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java +++ b/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java @@ -1,6 +1,7 @@ package datadog.communication.ddagent; import static datadog.communication.ddagent.TracerVersion.TRACER_VERSION; +import static datadog.trace.util.AgentThreadFactory.AGENT_THREAD_GROUP; import datadog.common.container.ContainerInfo; import datadog.common.socket.SocketUtils; @@ -9,6 +10,7 @@ import datadog.remoteconfig.ConfigurationPoller; import datadog.remoteconfig.DefaultConfigurationPoller; import datadog.trace.api.Config; +import datadog.trace.util.AgentTaskScheduler; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import okhttp3.HttpUrl; @@ -98,8 +100,11 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) { agentUrl, config.isTraceAgentV05Enabled(), config.isTracerMetricsEnabled()); - if (!"true".equalsIgnoreCase(System.getProperty("dd.test.no.early.discovery"))) { - featuresDiscovery.discover(); + if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) { + featuresDiscovery.discover(); // safe to run on same thread + } else { + // avoid performing blocking I/O operation on application thread + AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover); } } return featuresDiscovery; diff --git a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/agent/DebuggerAgentTest.java b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/agent/DebuggerAgentTest.java index e2e237cd12d..15672f703f8 100644 --- a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/agent/DebuggerAgentTest.java +++ b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/agent/DebuggerAgentTest.java @@ -122,11 +122,11 @@ public void runEnabledWithDatadogAgent() throws InterruptedException, IOExceptio ConfigurationPoller configurationPoller = (ConfigurationPoller) sharedCommunicationObjects.configurationPoller(Config.get()); configurationPoller.start(); - RecordedRequest request = datadogAgentServer.takeRequest(5, TimeUnit.SECONDS); - assertNotNull(request); - assertEquals("/info", request.getPath()); - request = datadogAgentServer.takeRequest(5, TimeUnit.SECONDS); - assertNotNull(request); + RecordedRequest request; + do { + request = datadogAgentServer.takeRequest(5, TimeUnit.SECONDS); + assertNotNull(request); + } while ("/info".equals(request.getPath())); assertEquals("/v0.7/config", request.getPath()); DebuggerAgent.stop(); datadogAgentServer.shutdown(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 5398d420122..00b9c4504b4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -136,21 +136,7 @@ public DefaultDataStreamsMonitoring( @Override public void start() { - if (features.getDataStreamsEndpoint() == null) { - features.discoverIfOutdated(); - } - - agentSupportsDataStreams = features.supportsDataStreams(); checkDynamicConfig(); - - if (!configSupportsDataStreams) { - log.debug("Data streams is disabled"); - } else if (!agentSupportsDataStreams) { - log.debug("Data streams is disabled or not supported by agent"); - } - - nextFeatureCheck = timeSource.getCurrentTimeNanos() + FEATURE_CHECK_INTERVAL_NANOS; - cancellation = AgentTaskScheduler.INSTANCE.scheduleAtFixedRate( new ReportTask(), this, bucketDurationNanos, bucketDurationNanos, TimeUnit.NANOSECONDS); @@ -341,6 +327,22 @@ private StatsBucket getStatsBucket(final long timestamp, final String serviceNam @Override public void run() { + + if (features.getDataStreamsEndpoint() == null) { + features.discoverIfOutdated(); + } + + agentSupportsDataStreams = features.supportsDataStreams(); + checkDynamicConfig(); + + if (!configSupportsDataStreams) { + log.debug("Data streams is disabled"); + } else if (!agentSupportsDataStreams) { + log.debug("Data streams is disabled or not supported by agent"); + } + + nextFeatureCheck = timeSource.getCurrentTimeNanos() + FEATURE_CHECK_INTERVAL_NANOS; + Thread currentThread = Thread.currentThread(); while (!currentThread.isInterrupted()) { try { diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/propagation/XRayHttpInjectorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/propagation/XRayHttpInjectorTest.groovy index 16226ef6362..8425d3e4bd0 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/propagation/XRayHttpInjectorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/propagation/XRayHttpInjectorTest.groovy @@ -6,6 +6,7 @@ import datadog.trace.api.DDTraceId import datadog.trace.api.DynamicConfig import datadog.trace.api.time.TimeSource import datadog.trace.bootstrap.instrumentation.api.AgentTracer.NoopPathwayContext +import datadog.trace.core.datastreams.DataStreamsMonitoring import static datadog.trace.api.sampling.PrioritySampling.* import static datadog.trace.api.sampling.SamplingMechanism.* @@ -25,7 +26,11 @@ class XRayHttpInjectorTest extends DDCoreSpecification { setup: def writer = new ListWriter() def timeSource = Mock(TimeSource) - def tracer = tracerBuilder().writer(writer).timeSource(timeSource).build() + def tracer = tracerBuilder() + .dataStreamsMonitoring(Mock(DataStreamsMonitoring)) + .writer(writer) + .timeSource(timeSource) + .build() final DDSpanContext mockedContext = new DDSpanContext( DDTraceId.from("$traceId"), @@ -76,7 +81,11 @@ class XRayHttpInjectorTest extends DDCoreSpecification { setup: def writer = new ListWriter() def timeSource = Mock(TimeSource) - def tracer = tracerBuilder().writer(writer).timeSource(timeSource).build() + def tracer = tracerBuilder() + .dataStreamsMonitoring(Mock(DataStreamsMonitoring)) + .writer(writer) + .timeSource(timeSource) + .build() def headers = [ 'X-Amzn-Trace-Id' : "Root=1-00000000-00000000${traceId.padLeft(16, '0')};Parent=${spanId.padLeft(16, '0')}" ] @@ -136,7 +145,11 @@ class XRayHttpInjectorTest extends DDCoreSpecification { setup: def writer = new ListWriter() def timeSource = Mock(TimeSource) - def tracer = tracerBuilder().writer(writer).timeSource(timeSource).build() + def tracer = tracerBuilder() + .dataStreamsMonitoring(Mock(DataStreamsMonitoring)) + .writer(writer) + .timeSource(timeSource) + .build() final DDSpanContext mockedContext = new DDSpanContext( DDTraceId.from("1"),