diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java index bea9bdd3c54..fe7ba0c5558 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java @@ -2,16 +2,24 @@ import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import datadog.trace.api.Config; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; import datadog.trace.bootstrap.instrumentation.messaging.DatadogAttributeParser; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Base64; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class MessageExtractAdapter implements AgentPropagation.ContextVisitor { private static final Logger log = LoggerFactory.getLogger(MessageExtractAdapter.class); - public static final MessageExtractAdapter GETTER = new MessageExtractAdapter(); + private static final ObjectMapper MAPPER = new ObjectMapper(); + public static final boolean SHOULD_EXTRACT_CONTEXT_FROM_BODY = + Config.get().isSqsBodyPropagationEnabled(); @Override public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifier) { @@ -28,6 +36,31 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie } else if ("Binary".equals(datadog.getDataType())) { DatadogAttributeParser.forEachProperty(classifier, datadog.getBinaryValue()); } + } else if (SHOULD_EXTRACT_CONTEXT_FROM_BODY) { + try { + this.forEachKeyInBody(carrier.getBody(), classifier); + } catch (Throwable e) { + log.debug("Error extracting Datadog context from SQS message body", e); + } + } + } + + public void forEachKeyInBody(String body, AgentPropagation.KeyClassifier classifier) + throws IOException { + // Parse the JSON string into a JsonNode + JsonNode rootNode = MAPPER.readTree(body); + + // Navigate to MessageAttributes._datadog + JsonNode messageAttributes = rootNode.path("MessageAttributes").path("_datadog"); + + // Extract Value and Type + String value = messageAttributes.path("Value").asText(); + String type = messageAttributes.path("Type").asText(); + if ("String".equals(type)) { + DatadogAttributeParser.forEachProperty(classifier, value); + } else if ("Binary".equals(type)) { + ByteBuffer decodedValue = ByteBuffer.wrap(Base64.getDecoder().decode(value)); + DatadogAttributeParser.forEachProperty(classifier, decodedValue); } } diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy index af4cb4f219a..64fa37fab99 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy @@ -44,6 +44,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase { // Set a service name that gets sorted early with SORT_BY_NAMES injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service") injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, isDataStreamsEnabled().toString()) + injectSysConfig("trace.sqs.body.propagation.enabled", "true") } @Shared @@ -511,6 +512,22 @@ class SqsClientV0DataStreamsTest extends SqsClientTest { } class SqsClientV1DataStreamsForkedTest extends SqsClientTest { + private static final String MESSAGE_WITH_ATTRIBUTES = "{\n" + + " \"Type\" : \"Notification\",\n" + + " \"MessageId\" : \"cb337e2a-1c06-5629-86f5-21fba14fb492\",\n" + + " \"TopicArn\" : \"arn:aws:sns:us-east-1:223300679234:dsm-dev-sns-topic\",\n" + + " \"Message\" : \"Some message\",\n" + + " \"Timestamp\" : \"2024-12-10T03:52:41.662Z\",\n" + + " \"SignatureVersion\" : \"1\",\n" + + " \"Signature\" : \"ZsEewd5gNR8jLC08TenLDp5rhdBtGIdAzWk7j6fzDyUzb/t56R9SBPrNJtjsPO8Ep8v/iGs/wSFUrnm+Zh3N1duc3alR1bKTAbDlzbEBxaHsGcNwzMz14JF7bKLE+3nPIi0/kT8EuIiRevGqPtCG/NEe9oW2dOyvYZvt+L7GC0AS9L0yJp8Ag7NkgNvYbIqPeKcjj8S7WRiV95Useg0P46e5pn5FXmNKPlpIqYN28jnrTZHWUDTiO5RE7lfFcdH2tBaYSR9F/PwA1Mga5NrTxlZp/yDoOlOUFj5zXAtDDpjNTcR48jAu66Mpi1wom7Si7vc3ZsYzN2Z2ig/aUJLaNA==\",\n" + + " \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-some-pem.pem\",\n" + + " \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:7270067952343:dsm-dev-sns-topic:0d82adcc-5b42-4035-81c4-22ccd126fc41\",\n" + + " \"MessageAttributes\" : {\n" + + " \"_datadog\" : {\"Type\":\"Binary\",\"Value\":\"eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI1ODExMzQ0MDA5MDA2NDM1Njk0IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6Ijc3MjQzODMxMjg4OTMyNDAxNDAiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIwIiwieC1kYXRhZG9nLXRhZ3MiOiJfZGQucC50aWQ9Njc1N2JiMDkwMDAwMDAwMCIsInRyYWNlcGFyZW50IjoiMDAtNjc1N2JiMDkwMDAwMDAwMDUwYTYwYTk2MWM2YzRkNmUtNmIzMjg1ODdiYWIzYjM0Yy0wMCIsInRyYWNlc3RhdGUiOiJkZD1zOjA7cDo2YjMyODU4N2JhYjNiMzRjO3QudGlkOjY3NTdiYjA5MDAwMDAwMDAiLCJkZC1wYXRod2F5LWN0eC1iYXNlNjQiOiJkdzdKcjU0VERkcjA5cFRyOVdUMDlwVHI5V1E9In0=\"}\n" + + " }\n" + + "}" + + @Override String expectedOperation(String awsService, String awsOperation) { if (awsService == "SQS") { @@ -537,6 +554,97 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest { int version() { 1 } + + def "Data streams context extracted from message body"() { + setup: + def client = AmazonSQSClientBuilder.standard() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() + def queueUrl = client.createQueue('somequeue').queueUrl + TEST_WRITER.clear() + + when: + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false") + client.sendMessage(queueUrl, MESSAGE_WITH_ATTRIBUTES) + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true") + def messages = client.receiveMessage(queueUrl).messages + messages.forEach {/* consume to create message spans */ } + + TEST_DATA_STREAMS_WRITER.waitForGroups(1) + + then: + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == -2734507826469073289 } + + verifyAll(first) { + edgeTags == ["direction:in", "topic:somequeue", "type:sqs"] + edgeTags.size() == 3 + } + + cleanup: + client.shutdown() + } + + def "Data streams context not extracted from message body when message attributes are not present"() { + setup: + def client = AmazonSQSClientBuilder.standard() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() + def queueUrl = client.createQueue('somequeue').queueUrl + TEST_WRITER.clear() + + when: + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false") + client.sendMessage(queueUrl, '{"Message": "sometext"}') + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true") + def messages = client.receiveMessage(queueUrl).messages + messages.forEach {} + + TEST_DATA_STREAMS_WRITER.waitForGroups(1) + + then: + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } + + verifyAll(first) { + edgeTags == ["direction:in", "topic:somequeue", "type:sqs"] + edgeTags.size() == 3 + } + + cleanup: + client.shutdown() + } + + + def "Data streams context not extracted from message body when message is not a Json"() { + setup: + def client = AmazonSQSClientBuilder.standard() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() + def queueUrl = client.createQueue('somequeue').queueUrl + TEST_WRITER.clear() + + when: + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false") + client.sendMessage(queueUrl, '{"Message": "not a json"') + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true") + def messages = client.receiveMessage(queueUrl).messages + messages.forEach {} + + TEST_DATA_STREAMS_WRITER.waitForGroups(1) + + then: + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } + + verifyAll(first) { + edgeTags == ["direction:in", "topic:somequeue", "type:sqs"] + edgeTags.size() == 3 + } + + cleanup: + client.shutdown() + } } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java index 0d81bf0ca85..7fca0156733 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java @@ -159,5 +159,7 @@ public final class TraceInstrumentationConfig { /** If set, the instrumentation will set its resource name on the local root too. */ public static final String AXIS_PROMOTE_RESOURCE_NAME = "trace.axis.promote.resource-name"; + public static final String SQS_BODY_PROPAGATION_ENABLED = "trace.sqs.body.propagation.enabled"; + private TraceInstrumentationConfig() {} } diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index e925bd5aaf1..8b5a2cc40d8 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -413,6 +413,7 @@ public static String getHostName() { private final boolean awsPropagationEnabled; private final boolean sqsPropagationEnabled; + private final boolean sqsBodyPropagationEnabled; private final boolean kafkaClientPropagationEnabled; private final Set kafkaClientPropagationDisabledTopics; @@ -1571,6 +1572,7 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) awsPropagationEnabled = isPropagationEnabled(true, "aws", "aws-sdk"); sqsPropagationEnabled = isPropagationEnabled(true, "sqs"); + sqsBodyPropagationEnabled = configProvider.getBoolean(SQS_BODY_PROPAGATION_ENABLED, false); kafkaClientPropagationEnabled = isPropagationEnabled(true, "kafka", "kafka.client"); kafkaClientPropagationDisabledTopics = @@ -3048,6 +3050,10 @@ public boolean isSqsPropagationEnabled() { return sqsPropagationEnabled; } + public boolean isSqsBodyPropagationEnabled() { + return sqsBodyPropagationEnabled; + } + public boolean isKafkaClientPropagationEnabled() { return kafkaClientPropagationEnabled; }