Skip to content

Commit

Permalink
Delay starting logs-intake backend until remote I/O is allowed
Browse files Browse the repository at this point in the history
  • Loading branch information
mcculls committed Dec 28, 2024
1 parent 9b97d7a commit 2cdc507
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ public void execute() {
maybeStartAppSec(scoClass, sco);
maybeStartIast(instrumentation, scoClass, sco);
maybeStartCiVisibility(instrumentation, scoClass, sco);
maybeStartLogsIntake(scoClass, sco);
maybeInstallLogsIntake(scoClass, sco);
// start debugger before remote config to subscribe to it before starting to poll
maybeStartDebugger(instrumentation, scoClass, sco);
maybeStartRemoteConfig(scoClass, sco);
Expand Down Expand Up @@ -865,17 +865,18 @@ private static void maybeStartCiVisibility(Instrumentation inst, Class<?> scoCla
}
}

private static void maybeStartLogsIntake(Class<?> scoClass, Object sco) {
private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
if (agentlessLogSubmissionEnabled) {
StaticEventLogger.begin("Logs Intake");

try {
final Class<?> logsIntakeSystemClass =
AGENT_CLASSLOADER.loadClass("datadog.trace.logging.intake.LogsIntakeSystem");
final Method logsIntakeInstallerMethod = logsIntakeSystemClass.getMethod("start", scoClass);
final Method logsIntakeInstallerMethod =
logsIntakeSystemClass.getMethod("install", scoClass);
logsIntakeInstallerMethod.invoke(null, sco);
} catch (final Throwable e) {
log.warn("Not starting Logs Intake subsystem", e);
log.warn("Not installing Logs Intake subsystem", e);
}

StaticEventLogger.end("Logs Intake");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package datadog.trace.logging.intake;

import datadog.communication.BackendApi;
import datadog.communication.BackendApiFactory;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
Expand All @@ -12,18 +11,16 @@ public class LogsIntakeSystem {

private static final Logger LOGGER = LoggerFactory.getLogger(LogsIntakeSystem.class);

public static void start(SharedCommunicationObjects sco) {
public static void install(SharedCommunicationObjects sco) {
Config config = Config.get();
if (!config.isAgentlessLogSubmissionEnabled()) {
LOGGER.debug("Agentless logs intake is disabled");
return;
}

BackendApiFactory apiFactory = new BackendApiFactory(config, sco);
BackendApi backendApi = apiFactory.createBackendApi(BackendApiFactory.Intake.LOGS);
LogsDispatcher dispatcher = new LogsDispatcher(backendApi);
LogsWriterImpl writer = new LogsWriterImpl(config, dispatcher);
writer.start();
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory);
sco.whenReady(writer::start);

LogsIntake.registerWriter(writer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import static datadog.trace.util.AgentThreadFactory.AGENT_THREAD_GROUP;

import datadog.communication.BackendApi;
import datadog.communication.BackendApiFactory;
import datadog.communication.BackendApiFactory.Intake;
import datadog.trace.api.Config;
import datadog.trace.api.logging.intake.LogsWriter;
import datadog.trace.util.AgentThreadFactory;
Expand All @@ -23,12 +26,12 @@ public class LogsWriterImpl implements LogsWriter {
private static final int ENQUEUE_LOG_TIMEOUT_MILLIS = 1_000;

private final Map<String, Object> commonTags;
private final LogsDispatcher logsDispatcher;
private final BackendApiFactory apiFactory;
private final BlockingQueue<Map<String, Object>> messageQueue;
private final Thread messagePollingThread;

public LogsWriterImpl(Config config, LogsDispatcher logsDispatcher) {
this.logsDispatcher = logsDispatcher;
public LogsWriterImpl(Config config, BackendApiFactory apiFactory) {
this.apiFactory = apiFactory;

commonTags = new HashMap<>();
commonTags.put("ddsource", "java");
Expand Down Expand Up @@ -84,6 +87,9 @@ public void log(Map<String, Object> message) {
}

private void logPollingLoop() {
BackendApi backendApi = apiFactory.createBackendApi(Intake.LOGS);
LogsDispatcher logsDispatcher = new LogsDispatcher(backendApi);

while (!Thread.currentThread().isInterrupted()) {
try {
List<Map<String, Object>> batch = new ArrayList<>();
Expand Down

0 comments on commit 2cdc507

Please sign in to comment.