Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry telemetry requests if CI Visibility is enabled #8147

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ public void close() {
}

public static class Factory {
public static final Factory NEVER_RETRY = new Factory(0, 0, 0);

private final int maxRetries;
private final long initialDelay;
private final double delayFactor;
Expand Down
32 changes: 24 additions & 8 deletions telemetry/src/main/java/datadog/telemetry/TelemetryClient.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package datadog.telemetry;

import datadog.communication.http.HttpRetryPolicy;
import datadog.communication.http.OkHttpUtils;
import datadog.trace.api.Config;
import datadog.trace.util.Strings;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.TimeUnit;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -17,16 +18,19 @@ public class TelemetryClient {
public enum Result {
SUCCESS,
FAILURE,
NOT_FOUND;
NOT_FOUND,
INTERRUPTED
}

public static TelemetryClient buildAgentClient(OkHttpClient okHttpClient, HttpUrl agentUrl) {
public static TelemetryClient buildAgentClient(
OkHttpClient okHttpClient, HttpUrl agentUrl, HttpRetryPolicy.Factory httpRetryPolicy) {
HttpUrl agentTelemetryUrl =
agentUrl.newBuilder().addPathSegments(AGENT_TELEMETRY_API_ENDPOINT).build();
return new TelemetryClient(okHttpClient, agentTelemetryUrl, null);
return new TelemetryClient(okHttpClient, httpRetryPolicy, agentTelemetryUrl, null);
}

public static TelemetryClient buildIntakeClient(Config config) {
public static TelemetryClient buildIntakeClient(
Config config, HttpRetryPolicy.Factory httpRetryPolicy) {
String apiKey = config.getApiKey();
if (apiKey == null) {
log.debug("Cannot create Telemetry Intake because DD_API_KEY unspecified.");
Expand All @@ -44,7 +48,7 @@ public static TelemetryClient buildIntakeClient(Config config) {

long timeoutMillis = TimeUnit.SECONDS.toMillis(config.getAgentTimeout());
OkHttpClient httpClient = OkHttpUtils.buildHttpClient(url, timeoutMillis);
return new TelemetryClient(httpClient, url, apiKey);
return new TelemetryClient(httpClient, httpRetryPolicy, url, apiKey);
}

private static String buildIntakeTelemetryUrl(Config config) {
Expand All @@ -71,11 +75,17 @@ private static String buildIntakeTelemetryUrl(Config config) {
private static final String DD_TELEMETRY_REQUEST_TYPE = "DD-Telemetry-Request-Type";

private final OkHttpClient okHttpClient;
private final HttpRetryPolicy.Factory httpRetryPolicy;
private final HttpUrl url;
private final String apiKey;

public TelemetryClient(OkHttpClient okHttpClient, HttpUrl url, String apiKey) {
public TelemetryClient(
OkHttpClient okHttpClient,
HttpRetryPolicy.Factory httpRetryPolicy,
HttpUrl url,
String apiKey) {
this.okHttpClient = okHttpClient;
this.httpRetryPolicy = httpRetryPolicy;
this.url = url;
this.apiKey = apiKey;
}
Expand All @@ -92,7 +102,9 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) {

Request httpRequest = httpRequestBuilder.build();
String requestType = httpRequest.header(DD_TELEMETRY_REQUEST_TYPE);
try (Response response = okHttpClient.newCall(httpRequest).execute()) {

try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(okHttpClient, httpRetryPolicy, httpRequest)) {
if (response.code() == 404) {
log.debug("Telemetry endpoint is disabled, dropping {} message.", requestType);
return Result.NOT_FOUND;
Expand All @@ -105,6 +117,10 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) {
response.message());
return Result.FAILURE;
}
} catch (InterruptedIOException e) {
log.debug("Telemetry message {} sending interrupted: {}.", requestType, e.toString());
return Result.INTERRUPTED;

} catch (IOException e) {
log.debug("Telemetry message {} failed with exception: {}.", requestType, e.toString());
return Result.FAILURE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ public TelemetryClient.Result sendRequest(TelemetryRequest request) {
Request.Builder httpRequestBuilder = request.httpRequest();
TelemetryClient.Result result = currentClient.sendHttpRequest(httpRequestBuilder);

boolean requestFailed = result != TelemetryClient.Result.SUCCESS;
boolean requestFailed =
result != TelemetryClient.Result.SUCCESS
// interrupted request is most likely due to telemetry system shutdown,
// we do not want to log errors and reattempt in this case
&& result != TelemetryClient.Result.INTERRUPTED;
if (currentClient == agentClient) {
if (requestFailed) {
reportErrorOnce(currentClient.getUrl(), result);
Expand Down
11 changes: 9 additions & 2 deletions telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.communication.http.HttpRetryPolicy;
import datadog.telemetry.TelemetryRunnable.TelemetryPeriodicAction;
import datadog.telemetry.dependency.DependencyPeriodicAction;
import datadog.telemetry.dependency.DependencyService;
Expand Down Expand Up @@ -81,8 +82,14 @@ public static void startTelemetry(
boolean debug = config.isTelemetryDebugRequestsEnabled();
DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery = sco.featuresDiscovery(config);

TelemetryClient agentClient = TelemetryClient.buildAgentClient(sco.okHttpClient, sco.agentUrl);
TelemetryClient intakeClient = TelemetryClient.buildIntakeClient(config);
HttpRetryPolicy.Factory httpRetryPolicy =
config.isCiVisibilityEnabled()
? new HttpRetryPolicy.Factory(2, 100, 2.0, true)
: HttpRetryPolicy.Factory.NEVER_RETRY;

TelemetryClient agentClient =
TelemetryClient.buildAgentClient(sco.okHttpClient, sco.agentUrl, httpRetryPolicy);
TelemetryClient intakeClient = TelemetryClient.buildIntakeClient(config, httpRetryPolicy);

boolean useIntakeClientByDefault =
config.isCiVisibilityEnabled() && config.isCiVisibilityAgentlessEnabled();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package datadog.telemetry

import datadog.communication.http.HttpRetryPolicy
import datadog.telemetry.api.RequestType
import datadog.trace.api.Config
import okhttp3.HttpUrl
import okhttp3.OkHttpClient
import spock.lang.Specification

class TelemetryClientTest extends Specification {
Expand All @@ -13,7 +17,7 @@ class TelemetryClientTest extends Specification {
config.getSite() >> site

when:
def intakeClient = TelemetryClient.buildIntakeClient(config)
def intakeClient = TelemetryClient.buildIntakeClient(config, HttpRetryPolicy.Factory.NEVER_RETRY)

then:
intakeClient.getUrl().toString() == expectedUrl
Expand All @@ -39,7 +43,7 @@ class TelemetryClientTest extends Specification {
config.getCiVisibilityAgentlessUrl() >> ciVisAgentlessUrl

when:
def intakeClient = TelemetryClient.buildIntakeClient(config)
def intakeClient = TelemetryClient.buildIntakeClient(config, HttpRetryPolicy.Factory.NEVER_RETRY)

then:
intakeClient.getUrl().toString() == expectedUrl
Expand All @@ -51,4 +55,23 @@ class TelemetryClientTest extends Specification {
true | false | "http://ci.visibility.agentless.url" | "https://all-http-intake.logs.datad0g.com/api/v2/apmtelemetry"
true | true | null | "https://all-http-intake.logs.datad0g.com/api/v2/apmtelemetry"
}

// Checks that a TelemetryClient constructed with a retrying HttpRetryPolicy.Factory
// requests a new call from the OkHttpClient three times when every attempt throws.
def "Intake client retries telemetry request if configured to do so"() {
setup:
def httpClient = Mock(OkHttpClient)
// The first argument (2) matches the "2 retries" expectation below; the remaining
// arguments are presumably initial delay, delay factor and a flag whose meaning is
// not visible here — TODO confirm against HttpRetryPolicy.Factory.
def httpRetryPolicy = new HttpRetryPolicy.Factory(2, 50, 1.5, true)
def httpUrl = HttpUrl.get("https://intake.example.com")
def intakeClient = new TelemetryClient(httpClient, httpRetryPolicy, httpUrl, "dummy-api-key")

when:
intakeClient.sendHttpRequest(dummyRequest())

then:
// original request + 2 retries
3 * httpClient.newCall(_) >> { throw new ConnectException("exception") }
}

// Builds an APP_STARTED TelemetryRequest backed by mocked EventSource/EventSink
// and returns whatever its httpRequest() produces, for use as a throwaway payload.
def dummyRequest() {
def telemetryRequest = new TelemetryRequest(Mock(EventSource), Mock(EventSink), 1000, RequestType.APP_STARTED, false)
telemetryRequest.httpRequest()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.telemetry

import datadog.communication.ddagent.DDAgentFeaturesDiscovery
import datadog.communication.http.HttpRetryPolicy
import datadog.telemetry.api.RequestType
import okhttp3.Call
import okhttp3.HttpUrl
Expand Down Expand Up @@ -41,8 +42,8 @@ class TelemetryRouterSpecification extends Specification {
OkHttpClient okHttpClient = Mock()
DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery = Mock()

def agentTelemetryClient = TelemetryClient.buildAgentClient(okHttpClient, agentUrl)
def intakeTelemetryClient = new TelemetryClient(okHttpClient, intakeUrl, apiKey)
def agentTelemetryClient = TelemetryClient.buildAgentClient(okHttpClient, agentUrl, HttpRetryPolicy.Factory.NEVER_RETRY)
def intakeTelemetryClient = new TelemetryClient(okHttpClient, HttpRetryPolicy.Factory.NEVER_RETRY, intakeUrl, apiKey)
def httpClient = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, intakeTelemetryClient, false)

def 'map an http status code to the correct send result'() {
Expand Down Expand Up @@ -70,6 +71,15 @@ class TelemetryRouterSpecification extends Specification {
1 * okHttpClient.newCall(_) >> { throw new IOException("exception") }
}

// Checks that an InterruptedIOException thrown by the mocked OkHttpClient.newCall
// is reported to the caller as TelemetryClient.Result.INTERRUPTED.
// `httpClient` in this spec is the TelemetryRouter field, not the mocked OkHttpClient.
// NOTE(review): java.net.SocketTimeoutException extends InterruptedIOException, so a
// socket timeout would presumably be reported as INTERRUPTED as well (and then not
// treated as a failure by the router) — confirm that this is intended.
def 'catch InterruptedIOException from OkHttpClient and return INTERRUPTED'() {
when:
def result = httpClient.sendRequest(dummyRequest())

then:
result == TelemetryClient.Result.INTERRUPTED
1 * okHttpClient.newCall(_) >> { throw new InterruptedIOException("interrupted") }
}

def 'keep trying to send telemetry to Agent despite of return code when Intake client is null'() {
setup:
def httpClient = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, null, false)
Expand Down Expand Up @@ -172,6 +182,31 @@ class TelemetryRouterSpecification extends Specification {
request.header(apiKeyHeader) == apiKey
}

// Checks that an interrupted request does not change the router's target: both the
// interrupted request and the following one are addressed to intakeUrl with the API key.
// The router is built with `true` as its last constructor argument, presumably the
// "use Intake by default" switch — TODO confirm against TelemetryRouter.
def 'when configured to prefer Intake: do not switch to Agent if request is interrupted'() {
Request request

setup:
def telemetryRouter = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, intakeTelemetryClient, true)

when:
telemetryRouter.sendRequest(dummyRequest())

then:
1 * ddAgentFeaturesDiscovery.discoverIfOutdated()
1 * ddAgentFeaturesDiscovery.supportsTelemetryProxy() >> true
// capture the outgoing request, then simulate an interruption
1 * okHttpClient.newCall(_) >> { args -> request = args[0]; throw new InterruptedIOException("interrupted") }
request.url() == intakeUrl
request.header(apiKeyHeader) == apiKey

when:
telemetryRouter.sendRequest(dummyRequest())

then:
// the second request still carries the Intake URL and API key header
1 * okHttpClient.newCall(_) >> { args -> request = args[0]; mockResponse(200) }
request.url() == intakeUrl
request.header(apiKeyHeader) == apiKey
}

def 'when configured to prefer Intake: switch to Agent if Intake request fails'() {
Request request

Expand Down
Loading