Skip to content

Commit

Permalink
Add ability to pause remote components until SharedCommunicationObjec…
Browse files Browse the repository at this point in the history
…ts is ready
  • Loading branch information
mcculls committed Dec 28, 2024
1 parent 40c7583 commit 84d2b6d
Showing 1 changed file with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import datadog.remoteconfig.DefaultConfigurationPoller;
import datadog.trace.api.Config;
import datadog.trace.util.AgentTaskScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import okhttp3.HttpUrl;
Expand All @@ -21,12 +23,23 @@
public class SharedCommunicationObjects {
private static final Logger log = LoggerFactory.getLogger(SharedCommunicationObjects.class);

private final List<Runnable> pausedComponents = new ArrayList<>();
private volatile boolean paused;

public OkHttpClient okHttpClient;
public HttpUrl agentUrl;
public Monitoring monitoring;
private DDAgentFeaturesDiscovery featuresDiscovery;
private ConfigurationPoller configurationPoller;

public SharedCommunicationObjects() {
this(false);
}

public SharedCommunicationObjects(boolean paused) {
this.paused = paused;
}

public void createRemaining(Config config) {
if (monitoring == null) {
monitoring = Monitoring.DISABLED;
Expand All @@ -46,6 +59,32 @@ public void createRemaining(Config config) {
}
}

public void whenReady(Runnable callback) {
if (paused) {
synchronized (pausedComponents) {
if (paused) {
pausedComponents.add(callback);
return;
}
}
}
callback.run(); // not paused, run immediately
}

public void resume() {
paused = false;
synchronized (pausedComponents) {
for (Runnable callback : pausedComponents) {
try {
callback.run();
} catch (Throwable e) {
log.warn("Problem resuming remote component {}", callback, e);
}
}
pausedComponents.clear();
}
}

private static HttpUrl parseAgentUrl(Config config) {
String agentUrl = config.getAgentUrl();
if (agentUrl.startsWith("unix:")) {
Expand Down Expand Up @@ -100,11 +139,16 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
agentUrl,
config.isTraceAgentV05Enabled(),
config.isTracerMetricsEnabled());
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread

if (paused) {
// defer remote discovery until remote I/O is allowed
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);
}
}
}
return featuresDiscovery;
Expand Down

0 comments on commit 84d2b6d

Please sign in to comment.