Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip telemetry heartbeat and add app closing event on shutdown flush #7496

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ public void writeHeartbeat() {
requestBody.writeHeartbeatEvent();
}

public void writeAppClosing() {
requestBody.writeAppClosingEvent();
}

private boolean isWithinSizeLimits() {
return requestBody.size() < messageBytesSoftLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public void writeHeartbeatEvent() {
endMessageIfBatch(RequestType.APP_HEARTBEAT);
}

public void writeAppClosingEvent() {
beginMessageIfBatch(RequestType.APP_CLOSING);
endMessageIfBatch(RequestType.APP_CLOSING);
}

public void beginMetrics() throws IOException {
beginMessageIfBatch(RequestType.GENERATE_METRICS);
bodyWriter.name("namespace").value(TELEMETRY_NAMESPACE_TAG_TRACER);
Expand Down Expand Up @@ -338,7 +343,7 @@ private void beginMessageIfBatch(RequestType messageType) {
try {
bodyWriter.beginObject();
bodyWriter.name("request_type").value(String.valueOf(messageType));
if (messageType != RequestType.APP_HEARTBEAT) {
if (messageType != RequestType.APP_HEARTBEAT && messageType != RequestType.APP_CLOSING) {
bodyWriter.name("payload");
bodyWriter.beginObject();
}
Expand All @@ -352,7 +357,7 @@ private void endMessageIfBatch(RequestType messageType) {
return;
}
try {
if (messageType != RequestType.APP_HEARTBEAT) {
if (messageType != RequestType.APP_HEARTBEAT && messageType != RequestType.APP_CLOSING) {
bodyWriter.endObject(); // payload
}
bodyWriter.endObject(); // message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public void run() {

if (startupEventSent) {
flushPendingTelemetryData();
telemetryService.sendAppClosingEvent();
}
log.debug("Telemetry thread finished");
}
Expand Down Expand Up @@ -124,7 +123,7 @@ private void mainLoopIteration() throws InterruptedException {
action.doIteration(this.telemetryService);
}
for (int i = 0; i < MAX_CONSECUTIVE_REQUESTS; i++) {
if (!telemetryService.sendTelemetryEvents()) {
if (!telemetryService.sendTelemetryEvents(false)) {
// stop if there is no more data to be sent, or it failed to send a request
break;
}
Expand Down Expand Up @@ -157,7 +156,7 @@ private void flushPendingTelemetryData() {
for (final TelemetryPeriodicAction action : actions) {
action.doIteration(telemetryService);
}
telemetryService.sendTelemetryEvents();
telemetryService.sendTelemetryEvents(true);
}

interface ThreadSleeper {
Expand Down
27 changes: 11 additions & 16 deletions telemetry/src/main/java/datadog/telemetry/TelemetryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,6 @@ public boolean addDistributionSeries(DistributionSeries series) {
return this.distributionSeries.offer(series);
}

public void sendAppClosingEvent() {
TelemetryRequest telemetryRequest =
new TelemetryRequest(
this.eventSource,
EventSink.NOOP,
messageBytesSoftLimit,
RequestType.APP_CLOSING,
debug);
if (telemetryRouter.sendRequest(telemetryRequest) != TelemetryClient.Result.SUCCESS) {
log.warn("Couldn't send app-closing event!");
}
}

// keeps track of unsent events from the previous attempt
private BufferedEvents bufferedEvents;

Expand Down Expand Up @@ -172,10 +159,11 @@ public boolean sendAppStartedEvent() {
}

/**
* @param isClosing True if this is the last telemetry message, sent on shutdown flush.
* @return true - only part of data has been sent because of the request size limit false - all
* data has been sent, or it has failed sending a request
*/
public boolean sendTelemetryEvents() {
public boolean sendTelemetryEvents(final boolean isClosing) {
EventSource eventSource;
EventSink eventSink;
if (bufferedEvents == null) {
Expand All @@ -192,7 +180,7 @@ public boolean sendTelemetryEvents() {
}
TelemetryRequest request;
boolean isMoreDataAvailable = false;
if (eventSource.isEmpty()) {
if (eventSource.isEmpty() && !isClosing) {
log.debug("Preparing app-heartbeat request");
request =
new TelemetryRequest(
Expand All @@ -202,7 +190,14 @@ public boolean sendTelemetryEvents() {
request =
new TelemetryRequest(
eventSource, eventSink, messageBytesSoftLimit, RequestType.MESSAGE_BATCH, debug);
request.writeHeartbeat();
if (isClosing) {
// Write early, if moved, ensure it does not get skipped because of size limits.
request.writeAppClosing();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a side effect, this changes the request type for the app closing event, it won't be app-closing anymore but a message-batch.

Can this cause any issues further down the line in the backend (probably not)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. The only problem is that we won't be optimizing the message size for the case where there is only an app-closing message. I think that will usually not be the case because of metrics, and anyway it's not that much overhead.

} else {
// If this is a flush, no need to sent the heartbeat. If it was needed, it would have been
// scheduled already.
request.writeHeartbeat();
}
request.writeConfigurations();
request.writeIntegrations();
request.writeDependencies();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class TelemetryRunnableSpecification extends DDSpecification {
1 * periodicAction.doIteration(telemetryService)

then: 'two partial and one final telemetry data requests'
3 * telemetryService.sendTelemetryEvents() >>> [true, true, false]
3 * telemetryService.sendTelemetryEvents(false) >>> [true, true, false]
1 * timeSource.getCurrentTimeMillis() >> 60 * 1000 + 1
1 * sleeperMock.sleep(9999)
0 * _
Expand Down Expand Up @@ -162,7 +162,7 @@ class TelemetryRunnableSpecification extends DDSpecification {
1 * periodicAction.doIteration(telemetryService)

then:
1 * telemetryService.sendTelemetryEvents()
1 * telemetryService.sendTelemetryEvents(false)
1 * timeSource.getCurrentTimeMillis() >> 120 * 1000 + 7
1 * sleeperMock.sleep(9993)

Expand Down Expand Up @@ -191,10 +191,7 @@ class TelemetryRunnableSpecification extends DDSpecification {
1 * metricCollector.drain() >> []
1 * metricCollector.drainDistributionSeries() >> []
1 * periodicAction.doIteration(telemetryService)
1 * telemetryService.sendTelemetryEvents()

then:
1 * telemetryService.sendAppClosingEvent()
1 * telemetryService.sendTelemetryEvents(true)
0 * _
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ class TelemetryServiceSpecification extends DDSpecification {

when: 'second iteration'
testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then: 'app-heartbeat only'
testHttpClient.assertRequestBody(RequestType.APP_HEARTBEAT)
testHttpClient.assertNoMoreRequests()

when: 'third iteration'
testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then: 'app-heartbeat only'
testHttpClient.assertRequestBody(RequestType.APP_HEARTBEAT)
Expand Down Expand Up @@ -81,7 +81,7 @@ class TelemetryServiceSpecification extends DDSpecification {

when:
testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then:
testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH)
Expand All @@ -99,7 +99,7 @@ class TelemetryServiceSpecification extends DDSpecification {

when: 'second iteration heartbeat only'
testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then:
testHttpClient.assertRequestBody(RequestType.APP_HEARTBEAT).assertNoPayload()
Expand All @@ -108,7 +108,7 @@ class TelemetryServiceSpecification extends DDSpecification {
when: 'third iteration metrics data'
telemetryService.addMetric(metric)
testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then:
testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH)
Expand Down Expand Up @@ -143,7 +143,7 @@ class TelemetryServiceSpecification extends DDSpecification {

and: 'send messages'
testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then:
testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH)
Expand Down Expand Up @@ -229,7 +229,7 @@ class TelemetryServiceSpecification extends DDSpecification {

when: 'successful batch attempt'
testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then: 'attempt batch with SUCCESS'
testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH)
Expand All @@ -247,7 +247,7 @@ class TelemetryServiceSpecification extends DDSpecification {

when: 'attempt with NOT_FOUND error'
testHttpClient.expectRequest(TelemetryClient.Result.NOT_FOUND)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then: 'message-batch attempted with heartbeat'
testHttpClient.assertRequestBody(RequestType.APP_HEARTBEAT).assertNoPayload()
Expand All @@ -261,10 +261,12 @@ class TelemetryServiceSpecification extends DDSpecification {

when:
testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
telemetryService.sendAppClosingEvent()
telemetryService.sendTelemetryEvents(true)

then:
testHttpClient.assertRequestBody(RequestType.APP_CLOSING)
testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH)
.assertBatch(1)
.assertFirstMessage(RequestType.APP_CLOSING).hasNoPayload()
testHttpClient.assertNoMoreRequests()
}

Expand Down Expand Up @@ -302,7 +304,7 @@ class TelemetryServiceSpecification extends DDSpecification {

when: 'send a heartbeat request without telemetry data to measure body size to set stable request size limit'
testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then: 'get body size'
def bodySize = testHttpClient.assertRequestBody(RequestType.APP_HEARTBEAT).bodySize()
Expand All @@ -320,7 +322,7 @@ class TelemetryServiceSpecification extends DDSpecification {
telemetryService.addProductChange(productChange)

testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then: 'attempt with SUCCESS'
testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH)
Expand All @@ -335,7 +337,7 @@ class TelemetryServiceSpecification extends DDSpecification {

when: 'sending second part of data'
testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS)
!telemetryService.sendTelemetryEvents()
!telemetryService.sendTelemetryEvents(false)

then:
testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH)
Expand Down Expand Up @@ -397,7 +399,7 @@ class TelemetryServiceSpecification extends DDSpecification {

when:
testHttpClient.expectRequest(resultCode)
telemetryService.sendTelemetryEvents()
telemetryService.sendTelemetryEvents(false)

then:
testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH)
Expand Down
Loading