From fd72025912dcce8831037514a23cd851c89ce3bf Mon Sep 17 00:00:00 2001 From: Nikita Tkachenko Date: Fri, 3 Jan 2025 11:59:29 +0100 Subject: [PATCH] Retry telemetry requests if CI Visibility is enabled --- .../communication/http/HttpRetryPolicy.java | 2 + .../datadog/telemetry/TelemetryClient.java | 32 +++++++++++---- .../datadog/telemetry/TelemetryRouter.java | 6 ++- .../datadog/telemetry/TelemetrySystem.java | 11 +++++- .../telemetry/TelemetryClientTest.groovy | 27 ++++++++++++- .../TelemetryRouterSpecification.groovy | 39 ++++++++++++++++++- 6 files changed, 102 insertions(+), 15 deletions(-) diff --git a/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java b/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java index 23ed4c27525..5ff1aca2b38 100644 --- a/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java +++ b/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java @@ -160,6 +160,8 @@ public void close() { } public static class Factory { + public static final Factory NEVER_RETRY = new Factory(0, 0, 0); + private final int maxRetries; private final long initialDelay; private final double delayFactor; diff --git a/telemetry/src/main/java/datadog/telemetry/TelemetryClient.java b/telemetry/src/main/java/datadog/telemetry/TelemetryClient.java index 593a32ed2e0..e06e75ea0b3 100644 --- a/telemetry/src/main/java/datadog/telemetry/TelemetryClient.java +++ b/telemetry/src/main/java/datadog/telemetry/TelemetryClient.java @@ -1,14 +1,15 @@ package datadog.telemetry; +import datadog.communication.http.HttpRetryPolicy; import datadog.communication.http.OkHttpUtils; import datadog.trace.api.Config; import datadog.trace.util.Strings; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.concurrent.TimeUnit; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import okhttp3.Request; -import okhttp3.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,16 +18,19 @@ public class TelemetryClient { public enum Result { SUCCESS, FAILURE, - NOT_FOUND; + NOT_FOUND, + INTERRUPTED } - public static TelemetryClient buildAgentClient(OkHttpClient okHttpClient, HttpUrl agentUrl) { + public static TelemetryClient buildAgentClient( + OkHttpClient okHttpClient, HttpUrl agentUrl, HttpRetryPolicy.Factory httpRetryPolicy) { HttpUrl agentTelemetryUrl = agentUrl.newBuilder().addPathSegments(AGENT_TELEMETRY_API_ENDPOINT).build(); - return new TelemetryClient(okHttpClient, agentTelemetryUrl, null); + return new TelemetryClient(okHttpClient, httpRetryPolicy, agentTelemetryUrl, null); } - public static TelemetryClient buildIntakeClient(Config config) { + public static TelemetryClient buildIntakeClient( + Config config, HttpRetryPolicy.Factory httpRetryPolicy) { String apiKey = config.getApiKey(); if (apiKey == null) { log.debug("Cannot create Telemetry Intake because DD_API_KEY unspecified."); @@ -44,7 +48,7 @@ public static TelemetryClient buildIntakeClient(Config config) { long timeoutMillis = TimeUnit.SECONDS.toMillis(config.getAgentTimeout()); OkHttpClient httpClient = OkHttpUtils.buildHttpClient(url, timeoutMillis); - return new TelemetryClient(httpClient, url, apiKey); + return new TelemetryClient(httpClient, httpRetryPolicy, url, apiKey); } private static String buildIntakeTelemetryUrl(Config config) { @@ -71,11 +75,17 @@ private static String buildIntakeTelemetryUrl(Config config) { private static final String DD_TELEMETRY_REQUEST_TYPE = "DD-Telemetry-Request-Type"; private final OkHttpClient okHttpClient; + private final HttpRetryPolicy.Factory httpRetryPolicy; private final HttpUrl url; private final String apiKey; - public TelemetryClient(OkHttpClient okHttpClient, HttpUrl url, String apiKey) { + public TelemetryClient( + OkHttpClient okHttpClient, + HttpRetryPolicy.Factory httpRetryPolicy, + HttpUrl url, + String apiKey) { this.okHttpClient = okHttpClient; + this.httpRetryPolicy = httpRetryPolicy; this.url = url; this.apiKey = apiKey; } @@ -92,7 +102,9 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) { Request httpRequest = httpRequestBuilder.build(); String requestType = httpRequest.header(DD_TELEMETRY_REQUEST_TYPE); - try (Response response = okHttpClient.newCall(httpRequest).execute()) { + + try (okhttp3.Response response = + OkHttpUtils.sendWithRetries(okHttpClient, httpRetryPolicy, httpRequest)) { if (response.code() == 404) { log.debug("Telemetry endpoint is disabled, dropping {} message.", requestType); return Result.NOT_FOUND; @@ -105,6 +117,10 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) { response.message()); return Result.FAILURE; } + } catch (InterruptedIOException e) { + log.debug("Telemetry message {} sending interrupted: {}.", requestType, e.toString()); + return Result.INTERRUPTED; + } catch (IOException e) { log.debug("Telemetry message {} failed with exception: {}.", requestType, e.toString()); return Result.FAILURE; diff --git a/telemetry/src/main/java/datadog/telemetry/TelemetryRouter.java b/telemetry/src/main/java/datadog/telemetry/TelemetryRouter.java index 343ad7be924..1636f865def 100644 --- a/telemetry/src/main/java/datadog/telemetry/TelemetryRouter.java +++ b/telemetry/src/main/java/datadog/telemetry/TelemetryRouter.java @@ -36,7 +36,11 @@ public TelemetryClient.Result sendRequest(TelemetryRequest request) { Request.Builder httpRequestBuilder = request.httpRequest(); TelemetryClient.Result result = currentClient.sendHttpRequest(httpRequestBuilder); - boolean requestFailed = result != TelemetryClient.Result.SUCCESS; + boolean requestFailed = + result != TelemetryClient.Result.SUCCESS + // interrupted request is most likely due to telemetry system shutdown, + // we do not want to log errors and reattempt in this case + && result != TelemetryClient.Result.INTERRUPTED; if (currentClient == agentClient) { if (requestFailed) { reportErrorOnce(currentClient.getUrl(), result); diff --git a/telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java b/telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java index cea16b09dc8..9605b3366c1 100644 --- a/telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java +++ b/telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java @@ -2,6 +2,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; +import datadog.communication.http.HttpRetryPolicy; import datadog.telemetry.TelemetryRunnable.TelemetryPeriodicAction; import datadog.telemetry.dependency.DependencyPeriodicAction; import datadog.telemetry.dependency.DependencyService; @@ -81,8 +82,14 @@ public static void startTelemetry( boolean debug = config.isTelemetryDebugRequestsEnabled(); DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery = sco.featuresDiscovery(config); - TelemetryClient agentClient = TelemetryClient.buildAgentClient(sco.okHttpClient, sco.agentUrl); - TelemetryClient intakeClient = TelemetryClient.buildIntakeClient(config); + HttpRetryPolicy.Factory httpRetryPolicy = + config.isCiVisibilityEnabled() + ? new HttpRetryPolicy.Factory(2, 100, 2.0, true) + : HttpRetryPolicy.Factory.NEVER_RETRY; + + TelemetryClient agentClient = + TelemetryClient.buildAgentClient(sco.okHttpClient, sco.agentUrl, httpRetryPolicy); + TelemetryClient intakeClient = TelemetryClient.buildIntakeClient(config, httpRetryPolicy); boolean useIntakeClientByDefault = config.isCiVisibilityEnabled() && config.isCiVisibilityAgentlessEnabled(); diff --git a/telemetry/src/test/groovy/datadog/telemetry/TelemetryClientTest.groovy b/telemetry/src/test/groovy/datadog/telemetry/TelemetryClientTest.groovy index dbb2f798ba3..323bef8c131 100644 --- a/telemetry/src/test/groovy/datadog/telemetry/TelemetryClientTest.groovy +++ b/telemetry/src/test/groovy/datadog/telemetry/TelemetryClientTest.groovy @@ -1,6 +1,10 @@ package datadog.telemetry +import datadog.communication.http.HttpRetryPolicy +import datadog.telemetry.api.RequestType import datadog.trace.api.Config +import okhttp3.HttpUrl +import okhttp3.OkHttpClient import spock.lang.Specification class TelemetryClientTest extends Specification { @@ -13,7 +17,7 @@ class TelemetryClientTest extends Specification { config.getSite() >> site when: - def intakeClient = TelemetryClient.buildIntakeClient(config) + def intakeClient = TelemetryClient.buildIntakeClient(config, HttpRetryPolicy.Factory.NEVER_RETRY) then: intakeClient.getUrl().toString() == expectedUrl @@ -39,7 +43,7 @@ class TelemetryClientTest extends Specification { config.getCiVisibilityAgentlessUrl() >> ciVisAgentlessUrl when: - def intakeClient = TelemetryClient.buildIntakeClient(config) + def intakeClient = TelemetryClient.buildIntakeClient(config, HttpRetryPolicy.Factory.NEVER_RETRY) then: intakeClient.getUrl().toString() == expectedUrl @@ -51,4 +55,23 @@ class TelemetryClientTest extends Specification { true | false | "http://ci.visibility.agentless.url" | "https://all-http-intake.logs.datad0g.com/api/v2/apmtelemetry" true | true | null | "https://all-http-intake.logs.datad0g.com/api/v2/apmtelemetry" } + + def "Intake client retries telemetry request if configured to do so"() { + setup: + def httpClient = Mock(OkHttpClient) + def httpRetryPolicy = new HttpRetryPolicy.Factory(2, 50, 1.5, true) + def httpUrl = HttpUrl.get("https://intake.example.com") + def intakeClient = new TelemetryClient(httpClient, httpRetryPolicy, httpUrl, "dummy-api-key") + + when: + intakeClient.sendHttpRequest(dummyRequest()) + + then: + // original request + 2 retries + 3 * httpClient.newCall(_) >> { throw new ConnectException("exception") } + } + + def dummyRequest() { + return new TelemetryRequest(Mock(EventSource), Mock(EventSink), 1000, RequestType.APP_STARTED, false).httpRequest() + } } diff --git a/telemetry/src/test/groovy/datadog/telemetry/TelemetryRouterSpecification.groovy b/telemetry/src/test/groovy/datadog/telemetry/TelemetryRouterSpecification.groovy index 5d6574e8de8..de108fa85d1 100644 --- a/telemetry/src/test/groovy/datadog/telemetry/TelemetryRouterSpecification.groovy +++ b/telemetry/src/test/groovy/datadog/telemetry/TelemetryRouterSpecification.groovy @@ -1,6 +1,7 @@ package datadog.telemetry import datadog.communication.ddagent.DDAgentFeaturesDiscovery +import datadog.communication.http.HttpRetryPolicy import datadog.telemetry.api.RequestType import okhttp3.Call import okhttp3.HttpUrl @@ -41,8 +42,8 @@ class TelemetryRouterSpecification extends Specification { OkHttpClient okHttpClient = Mock() DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery = Mock() - def agentTelemetryClient = TelemetryClient.buildAgentClient(okHttpClient, agentUrl) - def intakeTelemetryClient = new TelemetryClient(okHttpClient, intakeUrl, apiKey) + def agentTelemetryClient = TelemetryClient.buildAgentClient(okHttpClient, agentUrl, HttpRetryPolicy.Factory.NEVER_RETRY) + def intakeTelemetryClient = new TelemetryClient(okHttpClient, HttpRetryPolicy.Factory.NEVER_RETRY, intakeUrl, apiKey) def httpClient = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, intakeTelemetryClient, false) def 'map an http status code to the correct send result'() { @@ -70,6 +71,15 @@ class TelemetryRouterSpecification extends Specification { 1 * okHttpClient.newCall(_) >> { throw new IOException("exception") } } + def 'catch InterruptedIOException from OkHttpClient and return INTERRUPTED'() { + when: + def result = httpClient.sendRequest(dummyRequest()) + + then: + result == TelemetryClient.Result.INTERRUPTED + 1 * okHttpClient.newCall(_) >> { throw new InterruptedIOException("interrupted") } + } + def 'keep trying to send telemetry to Agent despite of return code when Intake client is null'() { setup: def httpClient = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, null, false) @@ -172,6 +182,31 @@ class TelemetryRouterSpecification extends Specification { request.header(apiKeyHeader) == apiKey } + def 'when configured to prefer Intake: do not switch to Agent if request is interrupted'() { + Request request + + setup: + def telemetryRouter = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, intakeTelemetryClient, true) + + when: + telemetryRouter.sendRequest(dummyRequest()) + + then: + 1 * ddAgentFeaturesDiscovery.discoverIfOutdated() + 1 * ddAgentFeaturesDiscovery.supportsTelemetryProxy() >> true + 1 * okHttpClient.newCall(_) >> { args -> request = args[0]; throw new InterruptedIOException("interrupted") } + request.url() == intakeUrl + request.header(apiKeyHeader) == apiKey + + when: + telemetryRouter.sendRequest(dummyRequest()) + + then: + 1 * okHttpClient.newCall(_) >> { args -> request = args[0]; mockResponse(200) } + request.url() == intakeUrl + request.header(apiKeyHeader) == apiKey + } + def 'when configured to prefer Intake: switch to Agent if Intake request fails'() { Request request