Skip to content

Commit

Permalink
Avoid performing blocking I/O operation on application thread (#8120)
Browse files Browse the repository at this point in the history
* Avoid performing blocking I/O operation on application thread
* Call checkDynamicConfig() at DSM start to load tracer config, then again at Inbox start to merge agent config
* Make test more tolerant of parallel discover requests
* Cleanup test side-effects
  • Loading branch information
mcculls authored Dec 20, 2024
1 parent ab205f6 commit 8d5f5ac
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.communication.ddagent;

import static datadog.communication.ddagent.TracerVersion.TRACER_VERSION;
import static datadog.trace.util.AgentThreadFactory.AGENT_THREAD_GROUP;

import datadog.common.container.ContainerInfo;
import datadog.common.socket.SocketUtils;
Expand All @@ -9,6 +10,7 @@
import datadog.remoteconfig.ConfigurationPoller;
import datadog.remoteconfig.DefaultConfigurationPoller;
import datadog.trace.api.Config;
import datadog.trace.util.AgentTaskScheduler;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import okhttp3.HttpUrl;
Expand Down Expand Up @@ -98,8 +100,11 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
agentUrl,
config.isTraceAgentV05Enabled(),
config.isTracerMetricsEnabled());
if (!"true".equalsIgnoreCase(System.getProperty("dd.test.no.early.discovery"))) {
featuresDiscovery.discover();
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);
}
}
return featuresDiscovery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ public void runEnabledWithDatadogAgent() throws InterruptedException, IOExceptio
ConfigurationPoller configurationPoller =
(ConfigurationPoller) sharedCommunicationObjects.configurationPoller(Config.get());
configurationPoller.start();
RecordedRequest request = datadogAgentServer.takeRequest(5, TimeUnit.SECONDS);
assertNotNull(request);
assertEquals("/info", request.getPath());
request = datadogAgentServer.takeRequest(5, TimeUnit.SECONDS);
assertNotNull(request);
RecordedRequest request;
do {
request = datadogAgentServer.takeRequest(5, TimeUnit.SECONDS);
assertNotNull(request);
} while ("/info".equals(request.getPath()));
assertEquals("/v0.7/config", request.getPath());
DebuggerAgent.stop();
datadogAgentServer.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,7 @@ public DefaultDataStreamsMonitoring(

@Override
public void start() {
if (features.getDataStreamsEndpoint() == null) {
features.discoverIfOutdated();
}

agentSupportsDataStreams = features.supportsDataStreams();
checkDynamicConfig();

if (!configSupportsDataStreams) {
log.debug("Data streams is disabled");
} else if (!agentSupportsDataStreams) {
log.debug("Data streams is disabled or not supported by agent");
}

nextFeatureCheck = timeSource.getCurrentTimeNanos() + FEATURE_CHECK_INTERVAL_NANOS;

cancellation =
AgentTaskScheduler.INSTANCE.scheduleAtFixedRate(
new ReportTask(), this, bucketDurationNanos, bucketDurationNanos, TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -341,6 +327,22 @@ private StatsBucket getStatsBucket(final long timestamp, final String serviceNam

@Override
public void run() {

if (features.getDataStreamsEndpoint() == null) {
features.discoverIfOutdated();
}

agentSupportsDataStreams = features.supportsDataStreams();
checkDynamicConfig();

if (!configSupportsDataStreams) {
log.debug("Data streams is disabled");
} else if (!agentSupportsDataStreams) {
log.debug("Data streams is disabled or not supported by agent");
}

nextFeatureCheck = timeSource.getCurrentTimeNanos() + FEATURE_CHECK_INTERVAL_NANOS;

Thread currentThread = Thread.currentThread();
while (!currentThread.isInterrupted()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import datadog.trace.api.DDTraceId
import datadog.trace.api.DynamicConfig
import datadog.trace.api.time.TimeSource
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.NoopPathwayContext
import datadog.trace.core.datastreams.DataStreamsMonitoring

import static datadog.trace.api.sampling.PrioritySampling.*
import static datadog.trace.api.sampling.SamplingMechanism.*
Expand All @@ -25,7 +26,11 @@ class XRayHttpInjectorTest extends DDCoreSpecification {
setup:
def writer = new ListWriter()
def timeSource = Mock(TimeSource)
def tracer = tracerBuilder().writer(writer).timeSource(timeSource).build()
def tracer = tracerBuilder()
.dataStreamsMonitoring(Mock(DataStreamsMonitoring))
.writer(writer)
.timeSource(timeSource)
.build()
final DDSpanContext mockedContext =
new DDSpanContext(
DDTraceId.from("$traceId"),
Expand Down Expand Up @@ -76,7 +81,11 @@ class XRayHttpInjectorTest extends DDCoreSpecification {
setup:
def writer = new ListWriter()
def timeSource = Mock(TimeSource)
def tracer = tracerBuilder().writer(writer).timeSource(timeSource).build()
def tracer = tracerBuilder()
.dataStreamsMonitoring(Mock(DataStreamsMonitoring))
.writer(writer)
.timeSource(timeSource)
.build()
def headers = [
'X-Amzn-Trace-Id' : "Root=1-00000000-00000000${traceId.padLeft(16, '0')};Parent=${spanId.padLeft(16, '0')}"
]
Expand Down Expand Up @@ -136,7 +145,11 @@ class XRayHttpInjectorTest extends DDCoreSpecification {
setup:
def writer = new ListWriter()
def timeSource = Mock(TimeSource)
def tracer = tracerBuilder().writer(writer).timeSource(timeSource).build()
def tracer = tracerBuilder()
.dataStreamsMonitoring(Mock(DataStreamsMonitoring))
.writer(writer)
.timeSource(timeSource)
.build()
final DDSpanContext mockedContext =
new DDSpanContext(
DDTraceId.from("1"),
Expand Down

0 comments on commit 8d5f5ac

Please sign in to comment.