From 60d9bf84b7c5f27c47c23a6c7c113833e9055b75 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Mon, 9 Dec 2024 13:08:35 -0700 Subject: [PATCH 1/7] Parse JSON --- .../messaging/DatadogAttributeParser.java | 1 + .../aws/v1/sqs/MessageExtractAdapter.java | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/messaging/DatadogAttributeParser.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/messaging/DatadogAttributeParser.java index 4a901f9b370..f0fded8506c 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/messaging/DatadogAttributeParser.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/messaging/DatadogAttributeParser.java @@ -26,6 +26,7 @@ public static void forEachProperty(AgentPropagation.KeyClassifier classifier, St acceptJsonProperty(classifier, json, "x-datadog-sampling-priority"); } if (Config.get().isDataStreamsEnabled()) { + System.out.println("DSM ENABLED!!!: " + json); acceptJsonProperty(classifier, json, "dd-pathway-ctx-base64"); } } catch (Exception e) { diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java index bea9bdd3c54..e314995f783 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java @@ -2,8 +2,14 @@ import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; import datadog.trace.bootstrap.instrumentation.messaging.DatadogAttributeParser; + +import java.nio.ByteBuffer; +import java.io.IOException; +import java.util.Base64; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +34,32 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie } else if ("Binary".equals(datadog.getDataType())) { DatadogAttributeParser.forEachProperty(classifier, datadog.getBinaryValue()); } + } else { + try { + this.forEachKeyInBody(carrier.getBody(), classifier); + } catch (IOException e) { + log.warn("Error extracting Datadog context from SQS message body", e); + } + } + } + + public void forEachKeyInBody(String body, AgentPropagation.KeyClassifier classifier) throws IOException { + ObjectMapper objectMapper = new ObjectMapper(); + + // Parse the JSON string into a JsonNode + JsonNode rootNode = objectMapper.readTree(body); + + // Navigate to MessageAttributes._datadog + JsonNode messageAttributes = rootNode.path("MessageAttributes").path("_datadog"); + + // Extract Value and Type + String value = messageAttributes.path("Value").asText(); + String type = messageAttributes.path("Type").asText(); + if ("String".equals(type)) { + DatadogAttributeParser.forEachProperty(classifier, value); + } else if ("Binary".equals(type)) { + ByteBuffer decodedValue = ByteBuffer.wrap(Base64.getDecoder().decode(value)); + DatadogAttributeParser.forEachProperty(classifier, decodedValue); } } From fa946284164ffa531d15e1d6dba1830178644dd2 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Mon, 9 Dec 2024 13:42:12 -0700 Subject: [PATCH 2/7] put config behind ff --- .../instrumentation/messaging/DatadogAttributeParser.java | 1 - .../instrumentation/aws/v1/sqs/MessageExtractAdapter.java | 5 ++++- .../trace/api/config/TraceInstrumentationConfig.java | 1 + internal-api/src/main/java/datadog/trace/api/Config.java | 6 ++++++ 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/messaging/DatadogAttributeParser.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/messaging/DatadogAttributeParser.java index f0fded8506c..4a901f9b370 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/messaging/DatadogAttributeParser.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/messaging/DatadogAttributeParser.java @@ -26,7 +26,6 @@ public static void forEachProperty(AgentPropagation.KeyClassifier classifier, St acceptJsonProperty(classifier, json, "x-datadog-sampling-priority"); } if (Config.get().isDataStreamsEnabled()) { - System.out.println("DSM ENABLED!!!: " + json); acceptJsonProperty(classifier, json, "dd-pathway-ctx-base64"); } } catch (Exception e) { diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java index e314995f783..58c06562644 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java @@ -4,6 +4,7 @@ import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import datadog.trace.api.Config; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; import datadog.trace.bootstrap.instrumentation.messaging.DatadogAttributeParser; @@ -18,6 +19,8 @@ public final class MessageExtractAdapter implements AgentPropagation.ContextVisi private static final Logger log = LoggerFactory.getLogger(MessageExtractAdapter.class); public static final MessageExtractAdapter GETTER = new MessageExtractAdapter(); + public static final boolean SHOULD_EXTRACT_CONTEXT_FROM_BODY = + Config.get().isSqsBodyPropagationEnabled(); @Override public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifier) { @@ -34,7 +37,7 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie } else if ("Binary".equals(datadog.getDataType())) { DatadogAttributeParser.forEachProperty(classifier, datadog.getBinaryValue()); } - } else { + } else if (SHOULD_EXTRACT_CONTEXT_FROM_BODY) { try { this.forEachKeyInBody(carrier.getBody(), classifier); } catch (IOException e) { diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java index 0d81bf0ca85..bb9cc7cc902 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java @@ -158,6 +158,7 @@ public final class TraceInstrumentationConfig { public static final String JAX_RS_ADDITIONAL_ANNOTATIONS = "trace.jax-rs.additional.annotations"; /** If set, the instrumentation will set its resource name on the local root too. */ public static final String AXIS_PROMOTE_RESOURCE_NAME = "trace.axis.promote.resource-name"; + public static final String SQS_BODY_PROPAGATION_ENABLED = "sqs.body.propagation.enabled"; private TraceInstrumentationConfig() {} } diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index e925bd5aaf1..8b5a2cc40d8 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -413,6 +413,7 @@ public static String getHostName() { private final boolean awsPropagationEnabled; private final boolean sqsPropagationEnabled; + private final boolean sqsBodyPropagationEnabled; private final boolean kafkaClientPropagationEnabled; private final Set kafkaClientPropagationDisabledTopics; @@ -1571,6 +1572,7 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) awsPropagationEnabled = isPropagationEnabled(true, "aws", "aws-sdk"); sqsPropagationEnabled = isPropagationEnabled(true, "sqs"); + sqsBodyPropagationEnabled = configProvider.getBoolean(SQS_BODY_PROPAGATION_ENABLED, false); kafkaClientPropagationEnabled = isPropagationEnabled(true, "kafka", "kafka.client"); kafkaClientPropagationDisabledTopics = @@ -3048,6 +3050,10 @@ public boolean isSqsPropagationEnabled() { return sqsPropagationEnabled; } + public boolean isSqsBodyPropagationEnabled() { + return sqsBodyPropagationEnabled; + } + public boolean isKafkaClientPropagationEnabled() { return kafkaClientPropagationEnabled; } From 6f658fe6a424b64d9a5f0b3a7145a63b41494d2f Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Mon, 9 Dec 2024 22:18:20 -0700 Subject: [PATCH 3/7] Add test --- .../aws/v1/sqs/MessageExtractAdapter.java | 6 +-- .../src/test/groovy/SqsClientTest.groovy | 48 +++++++++++++++++++ .../config/TraceInstrumentationConfig.java | 1 + 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java index 58c06562644..b097ae14e44 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java @@ -7,9 +7,8 @@ import datadog.trace.api.Config; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; import datadog.trace.bootstrap.instrumentation.messaging.DatadogAttributeParser; - -import java.nio.ByteBuffer; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Base64; import java.util.Map; import org.slf4j.Logger; @@ -46,7 +45,8 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie } } - public void forEachKeyInBody(String body, AgentPropagation.KeyClassifier classifier) throws IOException { + public void forEachKeyInBody(String body, AgentPropagation.KeyClassifier classifier) + throws IOException { ObjectMapper objectMapper = new ObjectMapper(); // Parse the JSON string into a JsonNode diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy index af4cb4f219a..6a0e14613ec 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy @@ -44,6 +44,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase { // Set a service name that gets sorted early with SORT_BY_NAMES injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service") injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, isDataStreamsEnabled().toString()) + injectSysConfig("sqs.body.propagation.enabled", "true") } @Shared @@ -511,6 +512,22 @@ class SqsClientV0DataStreamsTest extends SqsClientTest { } class SqsClientV1DataStreamsForkedTest extends SqsClientTest { + private static final String MESSAGE_WITH_ATTRIBUTES = "{\n" + + " \"Type\" : \"Notification\",\n" + + " \"MessageId\" : \"cb337e2a-1c06-5629-86f5-21fba14fb492\",\n" + + " \"TopicArn\" : \"arn:aws:sns:us-east-1:223300679234:dsm-dev-sns-topic\",\n" + + " \"Message\" : \"Some message\",\n" + + " \"Timestamp\" : \"2024-12-10T03:52:41.662Z\",\n" + + " \"SignatureVersion\" : \"1\",\n" + + " \"Signature\" : \"ZsEewd5gNR8jLC08TenLDp5rhdBtGIdAzWk7j6fzDyUzb/t56R9SBPrNJtjsPO8Ep8v/iGs/wSFUrnm+Zh3N1duc3alR1bKTAbDlzbEBxaHsGcNwzMz14JF7bKLE+3nPIi0/kT8EuIiRevGqPtCG/NEe9oW2dOyvYZvt+L7GC0AS9L0yJp8Ag7NkgNvYbIqPeKcjj8S7WRiV95Useg0P46e5pn5FXmNKPlpIqYN28jnrTZHWUDTiO5RE7lfFcdH2tBaYSR9F/PwA1Mga5NrTxlZp/yDoOlOUFj5zXAtDDpjNTcR48jAu66Mpi1wom7Si7vc3ZsYzN2Z2ig/aUJLaNA==\",\n" + + " \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-some-pem.pem\",\n" + + " \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:7270067952343:dsm-dev-sns-topic:0d82adcc-5b42-4035-81c4-22ccd126fc41\",\n" + + " \"MessageAttributes\" : {\n" + + " \"_datadog\" : {\"Type\":\"Binary\",\"Value\":\"eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI1ODExMzQ0MDA5MDA2NDM1Njk0IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6Ijc3MjQzODMxMjg4OTMyNDAxNDAiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIwIiwieC1kYXRhZG9nLXRhZ3MiOiJfZGQucC50aWQ9Njc1N2JiMDkwMDAwMDAwMCIsInRyYWNlcGFyZW50IjoiMDAtNjc1N2JiMDkwMDAwMDAwMDUwYTYwYTk2MWM2YzRkNmUtNmIzMjg1ODdiYWIzYjM0Yy0wMCIsInRyYWNlc3RhdGUiOiJkZD1zOjA7cDo2YjMyODU4N2JhYjNiMzRjO3QudGlkOjY3NTdiYjA5MDAwMDAwMDAiLCJkZC1wYXRod2F5LWN0eC1iYXNlNjQiOiJkdzdKcjU0VERkcjA5cFRyOVdUMDlwVHI5V1E9In0=\"}\n" + + " }\n" + + "}" + + @Override String expectedOperation(String awsService, String awsOperation) { if (awsService == "SQS") { @@ -537,6 +554,37 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest { int version() { 1 } + + def "Data streams context extracted from message body"() { + setup: + def client = AmazonSQSClientBuilder.standard() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() + def queueUrl = client.createQueue('somequeue').queueUrl + TEST_WRITER.clear() + + when: + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false") + client.sendMessage(queueUrl, MESSAGE_WITH_ATTRIBUTES) + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true") + def messages = client.receiveMessage(queueUrl).messages + messages.forEach {/* consume to create message spans */ } + + TEST_DATA_STREAMS_WRITER.waitForGroups(1) + + then: + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == -2734507826469073289 } + + verifyAll(first) { + edgeTags == ["direction:in", "topic:somequeue", "type:sqs"] + edgeTags.size() == 3 + } + + cleanup: + client.shutdown() + } + } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java index bb9cc7cc902..c20aca90e2a 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java @@ -158,6 +158,7 @@ public final class TraceInstrumentationConfig { public static final String JAX_RS_ADDITIONAL_ANNOTATIONS = "trace.jax-rs.additional.annotations"; /** If set, the instrumentation will set its resource name on the local root too. */ public static final String AXIS_PROMOTE_RESOURCE_NAME = "trace.axis.promote.resource-name"; + public static final String SQS_BODY_PROPAGATION_ENABLED = "sqs.body.propagation.enabled"; private TraceInstrumentationConfig() {} From 21c7767d6f2d82cf7f5eba13ab2f7a5c378a52e0 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Tue, 10 Dec 2024 08:45:58 -0700 Subject: [PATCH 4/7] Add test case when there are no message attributes --- .../src/test/groovy/SqsClientTest.groovy | 31 ++++++++++++++++++- .../config/TraceInstrumentationConfig.java | 2 +- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy index 6a0e14613ec..c250de9af5d 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy @@ -44,7 +44,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase { // Set a service name that gets sorted early with SORT_BY_NAMES injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service") injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, isDataStreamsEnabled().toString()) - injectSysConfig("sqs.body.propagation.enabled", "true") + injectSysConfig("trace.sqs.body.propagation.enabled", "true") } @Shared @@ -585,6 +585,35 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest { client.shutdown() } + def "Data streams context not extracted from message body when message attributes are not present"() { + setup: + def client = AmazonSQSClientBuilder.standard() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() + def queueUrl = client.createQueue('somequeue').queueUrl + TEST_WRITER.clear() + + when: + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false") + client.sendMessage(queueUrl, '{"Message": "sometext"}') + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true") + def messages = client.receiveMessage(queueUrl).messages + messages.forEach {} + + TEST_DATA_STREAMS_WRITER.waitForGroups(1) + + then: + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } + + verifyAll(first) { + edgeTags == ["direction:in", "topic:somequeue", "type:sqs"] + edgeTags.size() == 3 + } + + cleanup: + client.shutdown() + } } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java index c20aca90e2a..7fca0156733 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java @@ -159,7 +159,7 @@ public final class TraceInstrumentationConfig { /** If set, the instrumentation will set its resource name on the local root too. */ public static final String AXIS_PROMOTE_RESOURCE_NAME = "trace.axis.promote.resource-name"; - public static final String SQS_BODY_PROPAGATION_ENABLED = "sqs.body.propagation.enabled"; + public static final String SQS_BODY_PROPAGATION_ENABLED = "trace.sqs.body.propagation.enabled"; private TraceInstrumentationConfig() {} } From e8d3d4e0be5441f0e6d2391d758661adb8d054d6 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Tue, 10 Dec 2024 08:56:48 -0700 Subject: [PATCH 5/7] declare static object mapper --- .../instrumentation/aws/v1/sqs/MessageExtractAdapter.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java index b097ae14e44..fd72ad16d09 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java @@ -16,8 +16,8 @@ public final class MessageExtractAdapter implements AgentPropagation.ContextVisitor { private static final Logger log = LoggerFactory.getLogger(MessageExtractAdapter.class); - public static final MessageExtractAdapter GETTER = new MessageExtractAdapter(); + private static final ObjectMapper MAPPER = new ObjectMapper(); public static final boolean SHOULD_EXTRACT_CONTEXT_FROM_BODY = Config.get().isSqsBodyPropagationEnabled(); @@ -47,10 +47,8 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie public void forEachKeyInBody(String body, AgentPropagation.KeyClassifier classifier) throws IOException { - ObjectMapper objectMapper = new ObjectMapper(); - // Parse the JSON string into a JsonNode - JsonNode rootNode = objectMapper.readTree(body); + JsonNode rootNode = MAPPER.readTree(body); // Navigate to MessageAttributes._datadog JsonNode messageAttributes = rootNode.path("MessageAttributes").path("_datadog"); From af87d964c4ecdaeeabcfcd5c0888d71bafcdb020 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Tue, 10 Dec 2024 08:59:29 -0700 Subject: [PATCH 6/7] add test for non json message --- .../src/test/groovy/SqsClientTest.groovy | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy index c250de9af5d..64fa37fab99 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy @@ -614,6 +614,37 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest { cleanup: client.shutdown() } + + + def "Data streams context not extracted from message body when message is not a Json"() { + setup: + def client = AmazonSQSClientBuilder.standard() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() + def queueUrl = client.createQueue('somequeue').queueUrl + TEST_WRITER.clear() + + when: + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false") + client.sendMessage(queueUrl, '{"Message": "not a json"') + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true") + def messages = client.receiveMessage(queueUrl).messages + messages.forEach {} + + TEST_DATA_STREAMS_WRITER.waitForGroups(1) + + then: + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } + + verifyAll(first) { + edgeTags == ["direction:in", "topic:somequeue", "type:sqs"] + edgeTags.size() == 3 + } + + cleanup: + client.shutdown() + } } From bde98d2d9c72159f507620ca8c36083183ea13b5 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Tue, 10 Dec 2024 09:27:58 -0700 Subject: [PATCH 7/7] remove log & widen exception --- .../instrumentation/aws/v1/sqs/MessageExtractAdapter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java index fd72ad16d09..fe7ba0c5558 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java @@ -39,8 +39,8 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie } else if (SHOULD_EXTRACT_CONTEXT_FROM_BODY) { try { this.forEachKeyInBody(carrier.getBody(), classifier); - } catch (IOException e) { - log.warn("Error extracting Datadog context from SQS message body", e); + } catch (Throwable e) { + log.debug("Error extracting Datadog context from SQS message body", e); } } }