Skip to content

Commit

Permalink
Retry telemetry requests if CI Visibility is enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
nikita-tkachenko-datadog committed Jan 3, 2025
1 parent a8b33d5 commit fd72025
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ public void close() {
}

public static class Factory {
public static final Factory NEVER_RETRY = new Factory(0, 0, 0);

private final int maxRetries;
private final long initialDelay;
private final double delayFactor;
Expand Down
32 changes: 24 additions & 8 deletions telemetry/src/main/java/datadog/telemetry/TelemetryClient.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package datadog.telemetry;

import datadog.communication.http.HttpRetryPolicy;
import datadog.communication.http.OkHttpUtils;
import datadog.trace.api.Config;
import datadog.trace.util.Strings;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.TimeUnit;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -17,16 +18,19 @@ public class TelemetryClient {
public enum Result {
SUCCESS,
FAILURE,
NOT_FOUND;
NOT_FOUND,
INTERRUPTED
}

public static TelemetryClient buildAgentClient(OkHttpClient okHttpClient, HttpUrl agentUrl) {
public static TelemetryClient buildAgentClient(
OkHttpClient okHttpClient, HttpUrl agentUrl, HttpRetryPolicy.Factory httpRetryPolicy) {
HttpUrl agentTelemetryUrl =
agentUrl.newBuilder().addPathSegments(AGENT_TELEMETRY_API_ENDPOINT).build();
return new TelemetryClient(okHttpClient, agentTelemetryUrl, null);
return new TelemetryClient(okHttpClient, httpRetryPolicy, agentTelemetryUrl, null);
}

public static TelemetryClient buildIntakeClient(Config config) {
public static TelemetryClient buildIntakeClient(
Config config, HttpRetryPolicy.Factory httpRetryPolicy) {
String apiKey = config.getApiKey();
if (apiKey == null) {
log.debug("Cannot create Telemetry Intake because DD_API_KEY unspecified.");
Expand All @@ -44,7 +48,7 @@ public static TelemetryClient buildIntakeClient(Config config) {

long timeoutMillis = TimeUnit.SECONDS.toMillis(config.getAgentTimeout());
OkHttpClient httpClient = OkHttpUtils.buildHttpClient(url, timeoutMillis);
return new TelemetryClient(httpClient, url, apiKey);
return new TelemetryClient(httpClient, httpRetryPolicy, url, apiKey);
}

private static String buildIntakeTelemetryUrl(Config config) {
Expand All @@ -71,11 +75,17 @@ private static String buildIntakeTelemetryUrl(Config config) {
private static final String DD_TELEMETRY_REQUEST_TYPE = "DD-Telemetry-Request-Type";

private final OkHttpClient okHttpClient;
private final HttpRetryPolicy.Factory httpRetryPolicy;
private final HttpUrl url;
private final String apiKey;

public TelemetryClient(OkHttpClient okHttpClient, HttpUrl url, String apiKey) {
public TelemetryClient(
OkHttpClient okHttpClient,
HttpRetryPolicy.Factory httpRetryPolicy,
HttpUrl url,
String apiKey) {
this.okHttpClient = okHttpClient;
this.httpRetryPolicy = httpRetryPolicy;
this.url = url;
this.apiKey = apiKey;
}
Expand All @@ -92,7 +102,9 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) {

Request httpRequest = httpRequestBuilder.build();
String requestType = httpRequest.header(DD_TELEMETRY_REQUEST_TYPE);
try (Response response = okHttpClient.newCall(httpRequest).execute()) {

try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(okHttpClient, httpRetryPolicy, httpRequest)) {
if (response.code() == 404) {
log.debug("Telemetry endpoint is disabled, dropping {} message.", requestType);
return Result.NOT_FOUND;
Expand All @@ -105,6 +117,10 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) {
response.message());
return Result.FAILURE;
}
} catch (InterruptedIOException e) {
log.debug("Telemetry message {} sending interrupted: {}.", requestType, e.toString());
return Result.INTERRUPTED;

} catch (IOException e) {
log.debug("Telemetry message {} failed with exception: {}.", requestType, e.toString());
return Result.FAILURE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ public TelemetryClient.Result sendRequest(TelemetryRequest request) {
Request.Builder httpRequestBuilder = request.httpRequest();
TelemetryClient.Result result = currentClient.sendHttpRequest(httpRequestBuilder);

boolean requestFailed = result != TelemetryClient.Result.SUCCESS;
boolean requestFailed =
result != TelemetryClient.Result.SUCCESS
// An interrupted request is most likely caused by telemetry system shutdown;
// in that case we do not want to log errors or reattempt.
&& result != TelemetryClient.Result.INTERRUPTED;
if (currentClient == agentClient) {
if (requestFailed) {
reportErrorOnce(currentClient.getUrl(), result);
Expand Down
11 changes: 9 additions & 2 deletions telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.communication.http.HttpRetryPolicy;
import datadog.telemetry.TelemetryRunnable.TelemetryPeriodicAction;
import datadog.telemetry.dependency.DependencyPeriodicAction;
import datadog.telemetry.dependency.DependencyService;
Expand Down Expand Up @@ -81,8 +82,14 @@ public static void startTelemetry(
boolean debug = config.isTelemetryDebugRequestsEnabled();
DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery = sco.featuresDiscovery(config);

TelemetryClient agentClient = TelemetryClient.buildAgentClient(sco.okHttpClient, sco.agentUrl);
TelemetryClient intakeClient = TelemetryClient.buildIntakeClient(config);
HttpRetryPolicy.Factory httpRetryPolicy =
config.isCiVisibilityEnabled()
? new HttpRetryPolicy.Factory(2, 100, 2.0, true)
: HttpRetryPolicy.Factory.NEVER_RETRY;

TelemetryClient agentClient =
TelemetryClient.buildAgentClient(sco.okHttpClient, sco.agentUrl, httpRetryPolicy);
TelemetryClient intakeClient = TelemetryClient.buildIntakeClient(config, httpRetryPolicy);

boolean useIntakeClientByDefault =
config.isCiVisibilityEnabled() && config.isCiVisibilityAgentlessEnabled();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package datadog.telemetry

import datadog.communication.http.HttpRetryPolicy
import datadog.telemetry.api.RequestType
import datadog.trace.api.Config
import okhttp3.HttpUrl
import okhttp3.OkHttpClient
import spock.lang.Specification

class TelemetryClientTest extends Specification {
Expand All @@ -13,7 +17,7 @@ class TelemetryClientTest extends Specification {
config.getSite() >> site

when:
def intakeClient = TelemetryClient.buildIntakeClient(config)
def intakeClient = TelemetryClient.buildIntakeClient(config, HttpRetryPolicy.Factory.NEVER_RETRY)

then:
intakeClient.getUrl().toString() == expectedUrl
Expand All @@ -39,7 +43,7 @@ class TelemetryClientTest extends Specification {
config.getCiVisibilityAgentlessUrl() >> ciVisAgentlessUrl

when:
def intakeClient = TelemetryClient.buildIntakeClient(config)
def intakeClient = TelemetryClient.buildIntakeClient(config, HttpRetryPolicy.Factory.NEVER_RETRY)

then:
intakeClient.getUrl().toString() == expectedUrl
Expand All @@ -51,4 +55,23 @@ class TelemetryClientTest extends Specification {
true | false | "http://ci.visibility.agentless.url" | "https://all-http-intake.logs.datad0g.com/api/v2/apmtelemetry"
true | true | null | "https://all-http-intake.logs.datad0g.com/api/v2/apmtelemetry"
}

// Verifies that a TelemetryClient built with a retrying HttpRetryPolicy.Factory
// re-attempts a request when the underlying OkHttpClient keeps failing.
def "Intake client retries telemetry request if configured to do so"() {
setup:
// Mocked HTTP client so that no real network traffic is produced.
def httpClient = Mock(OkHttpClient)
// NOTE(review): the first argument is presumably maxRetries (= 2), which matches the
// three newCall invocations expected below; the remaining arguments are presumably
// the initial delay, the delay factor and a boolean flag - confirm against HttpRetryPolicy.Factory.
def httpRetryPolicy = new HttpRetryPolicy.Factory(2, 50, 1.5, true)
def httpUrl = HttpUrl.get("https://intake.example.com")
def intakeClient = new TelemetryClient(httpClient, httpRetryPolicy, httpUrl, "dummy-api-key")

when:
intakeClient.sendHttpRequest(dummyRequest())

then:
// original request + 2 retries
// Every attempt throws ConnectException, so each one counts as a failed call.
3 * httpClient.newCall(_) >> { throw new ConnectException("exception") }
}

// Test helper: creates a TelemetryRequest of type APP_STARTED backed by mocked
// EventSource and EventSink, and returns the result of its httpRequest() call,
// which is what TelemetryClient.sendHttpRequest is invoked with in this spec.
def dummyRequest() {
return new TelemetryRequest(Mock(EventSource), Mock(EventSink), 1000, RequestType.APP_STARTED, false).httpRequest()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.telemetry

import datadog.communication.ddagent.DDAgentFeaturesDiscovery
import datadog.communication.http.HttpRetryPolicy
import datadog.telemetry.api.RequestType
import okhttp3.Call
import okhttp3.HttpUrl
Expand Down Expand Up @@ -41,8 +42,8 @@ class TelemetryRouterSpecification extends Specification {
OkHttpClient okHttpClient = Mock()
DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery = Mock()

def agentTelemetryClient = TelemetryClient.buildAgentClient(okHttpClient, agentUrl)
def intakeTelemetryClient = new TelemetryClient(okHttpClient, intakeUrl, apiKey)
def agentTelemetryClient = TelemetryClient.buildAgentClient(okHttpClient, agentUrl, HttpRetryPolicy.Factory.NEVER_RETRY)
def intakeTelemetryClient = new TelemetryClient(okHttpClient, HttpRetryPolicy.Factory.NEVER_RETRY, intakeUrl, apiKey)
def httpClient = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, intakeTelemetryClient, false)

def 'map an http status code to the correct send result'() {
Expand Down Expand Up @@ -70,6 +71,15 @@ class TelemetryRouterSpecification extends Specification {
1 * okHttpClient.newCall(_) >> { throw new IOException("exception") }
}

// Verifies that an InterruptedIOException thrown by the mocked OkHttpClient is
// reported by the router as Result.INTERRUPTED.
def 'catch InterruptedIOException from OkHttpClient and return INTERRUPTED'() {
when:
// `httpClient` here is the TelemetryRouter under test (spec-level field), not an OkHttpClient.
def result = httpClient.sendRequest(dummyRequest())

then:
result == TelemetryClient.Result.INTERRUPTED
// Exactly one call is expected; it fails with InterruptedIOException.
1 * okHttpClient.newCall(_) >> { throw new InterruptedIOException("interrupted") }
}

def 'keep trying to send telemetry to Agent despite of return code when Intake client is null'() {
setup:
def httpClient = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, null, false)
Expand Down Expand Up @@ -172,6 +182,31 @@ class TelemetryRouterSpecification extends Specification {
request.header(apiKeyHeader) == apiKey
}

// Verifies that an interrupted request does not make the router change clients:
// after an InterruptedIOException on the first send, the second send still targets
// the Intake URL and carries the API key header.
def 'when configured to prefer Intake: do not switch to Agent if request is interrupted'() {
// Captures the Request handed to OkHttpClient.newCall so it can be inspected in `then:`.
Request request

setup:
// NOTE(review): the trailing `true` presumably makes the router use the Intake client
// by default (per the test name) - confirm against the TelemetryRouter constructor.
def telemetryRouter = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, intakeTelemetryClient, true)

when:
telemetryRouter.sendRequest(dummyRequest())

then:
1 * ddAgentFeaturesDiscovery.discoverIfOutdated()
1 * ddAgentFeaturesDiscovery.supportsTelemetryProxy() >> true
// First attempt: record the outgoing request, then simulate an interruption.
1 * okHttpClient.newCall(_) >> { args -> request = args[0]; throw new InterruptedIOException("interrupted") }
request.url() == intakeUrl
request.header(apiKeyHeader) == apiKey

when:
telemetryRouter.sendRequest(dummyRequest())

then:
// Second attempt succeeds with HTTP 200 and must still be addressed to the Intake.
1 * okHttpClient.newCall(_) >> { args -> request = args[0]; mockResponse(200) }
request.url() == intakeUrl
request.header(apiKeyHeader) == apiKey
}

def 'when configured to prefer Intake: switch to Agent if Intake request fails'() {
Request request

Expand Down

0 comments on commit fd72025

Please sign in to comment.