From a403c1557026e85ee4f66dea12d96d11f40ced48 Mon Sep 17 00:00:00 2001 From: Santiago Mola Date: Fri, 23 Aug 2024 13:26:22 +0200 Subject: [PATCH] Skip telemetry heartbeat and add app closing event on shutdown flush --- .../datadog/telemetry/TelemetryRequest.java | 4 +++ .../telemetry/TelemetryRequestBody.java | 9 ++++-- .../datadog/telemetry/TelemetryRunnable.java | 5 ++-- .../datadog/telemetry/TelemetryService.java | 27 +++++++---------- .../TelemetryRunnableSpecification.groovy | 9 ++---- .../TelemetryServiceSpecification.groovy | 30 ++++++++++--------- 6 files changed, 43 insertions(+), 41 deletions(-) diff --git a/telemetry/src/main/java/datadog/telemetry/TelemetryRequest.java b/telemetry/src/main/java/datadog/telemetry/TelemetryRequest.java index 1424b3c735c..f07d13405b8 100644 --- a/telemetry/src/main/java/datadog/telemetry/TelemetryRequest.java +++ b/telemetry/src/main/java/datadog/telemetry/TelemetryRequest.java @@ -230,6 +230,10 @@ public void writeHeartbeat() { requestBody.writeHeartbeatEvent(); } + public void writeAppClosing() { + requestBody.writeAppClosingEvent(); + } + private boolean isWithinSizeLimits() { return requestBody.size() < messageBytesSoftLimit; } diff --git a/telemetry/src/main/java/datadog/telemetry/TelemetryRequestBody.java b/telemetry/src/main/java/datadog/telemetry/TelemetryRequestBody.java index 8f32e9bbec8..2c814e1a5dc 100644 --- a/telemetry/src/main/java/datadog/telemetry/TelemetryRequestBody.java +++ b/telemetry/src/main/java/datadog/telemetry/TelemetryRequestBody.java @@ -147,6 +147,11 @@ public void writeHeartbeatEvent() { endMessageIfBatch(RequestType.APP_HEARTBEAT); } + public void writeAppClosingEvent() { + beginMessageIfBatch(RequestType.APP_CLOSING); + endMessageIfBatch(RequestType.APP_CLOSING); + } + public void beginMetrics() throws IOException { beginMessageIfBatch(RequestType.GENERATE_METRICS); bodyWriter.name("namespace").value(TELEMETRY_NAMESPACE_TAG_TRACER); @@ -338,7 +343,7 @@ private void beginMessageIfBatch(RequestType messageType) { try { bodyWriter.beginObject(); bodyWriter.name("request_type").value(String.valueOf(messageType)); - if (messageType != RequestType.APP_HEARTBEAT) { + if (messageType != RequestType.APP_HEARTBEAT && messageType != RequestType.APP_CLOSING) { bodyWriter.name("payload"); bodyWriter.beginObject(); } @@ -352,7 +357,7 @@ private void endMessageIfBatch(RequestType messageType) { return; } try { - if (messageType != RequestType.APP_HEARTBEAT) { + if (messageType != RequestType.APP_HEARTBEAT && messageType != RequestType.APP_CLOSING) { bodyWriter.endObject(); // payload } bodyWriter.endObject(); // message diff --git a/telemetry/src/main/java/datadog/telemetry/TelemetryRunnable.java b/telemetry/src/main/java/datadog/telemetry/TelemetryRunnable.java index e807bc0cd82..5096da79a66 100644 --- a/telemetry/src/main/java/datadog/telemetry/TelemetryRunnable.java +++ b/telemetry/src/main/java/datadog/telemetry/TelemetryRunnable.java @@ -84,7 +84,6 @@ public void run() { if (startupEventSent) { flushPendingTelemetryData(); - telemetryService.sendAppClosingEvent(); } log.debug("Telemetry thread finished"); } @@ -124,7 +123,7 @@ private void mainLoopIteration() throws InterruptedException { action.doIteration(this.telemetryService); } for (int i = 0; i < MAX_CONSECUTIVE_REQUESTS; i++) { - if (!telemetryService.sendTelemetryEvents()) { + if (!telemetryService.sendTelemetryEvents(false)) { // stop if there is no more data to be sent, or it failed to send a request break; } @@ -157,7 +156,7 @@ private void flushPendingTelemetryData() { for (final TelemetryPeriodicAction action : actions) { action.doIteration(telemetryService); } - telemetryService.sendTelemetryEvents(); + telemetryService.sendTelemetryEvents(true); } interface ThreadSleeper { diff --git a/telemetry/src/main/java/datadog/telemetry/TelemetryService.java b/telemetry/src/main/java/datadog/telemetry/TelemetryService.java index 6e71cb7c6b3..215483d3588 100644 --- a/telemetry/src/main/java/datadog/telemetry/TelemetryService.java +++ b/telemetry/src/main/java/datadog/telemetry/TelemetryService.java @@ -124,19 +124,6 @@ public boolean addDistributionSeries(DistributionSeries series) { return this.distributionSeries.offer(series); } - public void sendAppClosingEvent() { - TelemetryRequest telemetryRequest = - new TelemetryRequest( - this.eventSource, - EventSink.NOOP, - messageBytesSoftLimit, - RequestType.APP_CLOSING, - debug); - if (telemetryRouter.sendRequest(telemetryRequest) != TelemetryClient.Result.SUCCESS) { - log.warn("Couldn't send app-closing event!"); - } - } - // keeps track of unsent events from the previous attempt private BufferedEvents bufferedEvents; @@ -172,10 +159,11 @@ public boolean sendAppStartedEvent() { } /** + * @param isClosing True if this is the last telemetry message, sent on shutdown flush. * @return true - only part of data has been sent because of the request size limit false - all * data has been sent, or it has failed sending a request */ - public boolean sendTelemetryEvents() { + public boolean sendTelemetryEvents(final boolean isClosing) { EventSource eventSource; EventSink eventSink; if (bufferedEvents == null) { @@ -192,7 +180,7 @@ public boolean sendTelemetryEvents() { } TelemetryRequest request; boolean isMoreDataAvailable = false; - if (eventSource.isEmpty()) { + if (eventSource.isEmpty() && !isClosing) { log.debug("Preparing app-heartbeat request"); request = new TelemetryRequest( @@ -202,7 +190,14 @@ public boolean sendTelemetryEvents() { request = new TelemetryRequest( eventSource, eventSink, messageBytesSoftLimit, RequestType.MESSAGE_BATCH, debug); - request.writeHeartbeat(); + if (isClosing) { + // Write early, if moved, ensure it does not get skipped because of size limits. + request.writeAppClosing(); + } else { + // If this is a flush, no need to sent the heartbeat. If it was needed, it would have been + // scheduled already. + request.writeHeartbeat(); + } request.writeConfigurations(); request.writeIntegrations(); request.writeDependencies(); diff --git a/telemetry/src/test/groovy/datadog/telemetry/TelemetryRunnableSpecification.groovy b/telemetry/src/test/groovy/datadog/telemetry/TelemetryRunnableSpecification.groovy index 6ddebef3782..dd11c07fdb8 100644 --- a/telemetry/src/test/groovy/datadog/telemetry/TelemetryRunnableSpecification.groovy +++ b/telemetry/src/test/groovy/datadog/telemetry/TelemetryRunnableSpecification.groovy @@ -66,7 +66,7 @@ class TelemetryRunnableSpecification extends DDSpecification { 1 * periodicAction.doIteration(telemetryService) then: 'two partial and one final telemetry data requests' - 3 * telemetryService.sendTelemetryEvents() >>> [true, true, false] + 3 * telemetryService.sendTelemetryEvents(false) >>> [true, true, false] 1 * timeSource.getCurrentTimeMillis() >> 60 * 1000 + 1 1 * sleeperMock.sleep(9999) 0 * _ @@ -162,7 +162,7 @@ class TelemetryRunnableSpecification extends DDSpecification { 1 * periodicAction.doIteration(telemetryService) then: - 1 * telemetryService.sendTelemetryEvents() + 1 * telemetryService.sendTelemetryEvents(false) 1 * timeSource.getCurrentTimeMillis() >> 120 * 1000 + 7 1 * sleeperMock.sleep(9993) @@ -191,10 +191,7 @@ class TelemetryRunnableSpecification extends DDSpecification { 1 * metricCollector.drain() >> [] 1 * metricCollector.drainDistributionSeries() >> [] 1 * periodicAction.doIteration(telemetryService) - 1 * telemetryService.sendTelemetryEvents() - - then: - 1 * telemetryService.sendAppClosingEvent() + 1 * telemetryService.sendTelemetryEvents(true) 0 * _ } diff --git a/telemetry/src/test/groovy/datadog/telemetry/TelemetryServiceSpecification.groovy b/telemetry/src/test/groovy/datadog/telemetry/TelemetryServiceSpecification.groovy index 4a9e7316f81..dee285f59f1 100644 --- a/telemetry/src/test/groovy/datadog/telemetry/TelemetryServiceSpecification.groovy +++ b/telemetry/src/test/groovy/datadog/telemetry/TelemetryServiceSpecification.groovy @@ -41,7 +41,7 @@ class TelemetryServiceSpecification extends DDSpecification { when: 'second iteration' testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: 'app-heartbeat only' testHttpClient.assertRequestBody(RequestType.APP_HEARTBEAT) @@ -49,7 +49,7 @@ class TelemetryServiceSpecification extends DDSpecification { when: 'third iteration' testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: 'app-heartbeat only' testHttpClient.assertRequestBody(RequestType.APP_HEARTBEAT) @@ -81,7 +81,7 @@ class TelemetryServiceSpecification extends DDSpecification { when: testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH) @@ -99,7 +99,7 @@ class TelemetryServiceSpecification extends DDSpecification { when: 'second iteration heartbeat only' testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: testHttpClient.assertRequestBody(RequestType.APP_HEARTBEAT).assertNoPayload() @@ -108,7 +108,7 @@ class TelemetryServiceSpecification extends DDSpecification { when: 'third iteration metrics data' telemetryService.addMetric(metric) testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH) @@ -143,7 +143,7 @@ class TelemetryServiceSpecification extends DDSpecification { and: 'send messages' testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH) @@ -229,7 +229,7 @@ class TelemetryServiceSpecification extends DDSpecification { when: 'successful batch attempt' testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: 'attempt batch with SUCCESS' testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH) @@ -247,7 +247,7 @@ class TelemetryServiceSpecification extends DDSpecification { when: 'attempt with NOT_FOUND error' testHttpClient.expectRequest(TelemetryClient.Result.NOT_FOUND) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: 'message-batch attempted with heartbeat' testHttpClient.assertRequestBody(RequestType.APP_HEARTBEAT).assertNoPayload() @@ -261,10 +261,12 @@ class TelemetryServiceSpecification extends DDSpecification { when: testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - telemetryService.sendAppClosingEvent() + telemetryService.sendTelemetryEvents(true) then: - testHttpClient.assertRequestBody(RequestType.APP_CLOSING) + testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH) + .assertBatch(1) + .assertFirstMessage(RequestType.APP_CLOSING).hasNoPayload() testHttpClient.assertNoMoreRequests() } @@ -302,7 +304,7 @@ class TelemetryServiceSpecification extends DDSpecification { when: 'send a heartbeat request without telemetry data to measure body size to set stable request size limit' testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: 'get body size' def bodySize = testHttpClient.assertRequestBody(RequestType.APP_HEARTBEAT).bodySize() @@ -320,7 +322,7 @@ class TelemetryServiceSpecification extends DDSpecification { telemetryService.addProductChange(productChange) testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: 'attempt with SUCCESS' testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH) @@ -335,7 +337,7 @@ class TelemetryServiceSpecification extends DDSpecification { when: 'sending second part of data' testHttpClient.expectRequest(TelemetryClient.Result.SUCCESS) - !telemetryService.sendTelemetryEvents() + !telemetryService.sendTelemetryEvents(false) then: testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH) @@ -397,7 +399,7 @@ class TelemetryServiceSpecification extends DDSpecification { when: testHttpClient.expectRequest(resultCode) - telemetryService.sendTelemetryEvents() + telemetryService.sendTelemetryEvents(false) then: testHttpClient.assertRequestBody(RequestType.MESSAGE_BATCH)