Skip to content

Commit

Permalink
Merge pull request #7620 from DataDog/chris.agocs/parse_128_bit_trace…
Browse files Browse the repository at this point in the history
…_id_from_lambda_extension

Parse 128 bit trace Id retuned by lambda extension
  • Loading branch information
agocs authored Sep 27, 2024
2 parents aa1e54c + 0ba15ad commit 3fbcd3e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ public AgentSpan blackholeSpan() {

@Override
public AgentSpan.Context notifyExtensionStart(Object event) {
return LambdaHandler.notifyStartInvocation(event, propagationTagsFactory);
return LambdaHandler.notifyStartInvocation(this, event);
}

@Override
Expand Down
48 changes: 12 additions & 36 deletions dd-trace-core/src/main/java/datadog/trace/lambda/LambdaHandler.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package datadog.trace.lambda;

import static datadog.trace.api.TracePropagationStyle.DATADOG;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import datadog.trace.api.DDSpanId;
import datadog.trace.api.DDTags;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.core.propagation.ExtractedContext;
import datadog.trace.core.propagation.PropagationTags;
import datadog.trace.core.CoreTracer;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
Expand Down Expand Up @@ -44,7 +40,6 @@ public class LambdaHandler {
private static final String DATADOG_INVOCATION_ERROR_MSG = "x-datadog-invocation-error-msg";
private static final String DATADOG_INVOCATION_ERROR_TYPE = "x-datadog-invocation-error-type";
private static final String DATADOG_INVOCATION_ERROR_STACK = "x-datadog-invocation-error-stack";
private static final String DATADOG_TAGS_KEY = "x-datadog-tags";

private static final String START_INVOCATION = "/lambda/start-invocation";
private static final String END_INVOCATION = "/lambda/end-invocation";
Expand Down Expand Up @@ -73,8 +68,7 @@ public class LambdaHandler {

private static String EXTENSION_BASE_URL = "http://127.0.0.1:8124";

public static AgentSpan.Context notifyStartInvocation(
Object event, PropagationTags.Factory propagationTagsFactory) {
public static AgentSpan.Context notifyStartInvocation(CoreTracer tracer, Object event) {
RequestBody body = RequestBody.create(jsonMediaType, writeValueAsString(event));
try (Response response =
HTTP_CLIENT
Expand All @@ -86,33 +80,16 @@ public static AgentSpan.Context notifyStartInvocation(
.build())
.execute()) {
if (response.isSuccessful()) {
final String traceID = response.headers().get(DATADOG_TRACE_ID);
final String priority = response.headers().get(DATADOG_SAMPLING_PRIORITY);
if (null != traceID && null != priority) {
int samplingPriority = PrioritySampling.UNSET;
try {
samplingPriority = Integer.parseInt(priority);
} catch (final NumberFormatException ignored) {
log.warn("could not read the sampling priority, defaulting to UNSET");
}
log.debug(
"notifyStartInvocation success, found traceID = {} and samplingPriority = {}",
traceID,
samplingPriority);
PropagationTags propagationTags =
propagationTagsFactory.fromHeaderValue(
PropagationTags.HeaderType.DATADOG, response.headers().get(DATADOG_TAGS_KEY));
return new ExtractedContext(
DDTraceId.from(traceID),
DDSpanId.ZERO,
samplingPriority,
null,
propagationTags,
DATADOG);
} else {
log.debug(
"could not find traceID or sampling priority in notifyStartInvocation, not injecting the context");
}

return tracer
.propagate()
.extract(
response.headers(),
(carrier, classifier) -> {
for (String headerName : carrier.names()) {
classifier.accept(headerName, carrier.get(headerName));
}
});
}
} catch (Throwable ignored) {
log.error("could not reach the extension");
Expand All @@ -121,7 +98,6 @@ public static AgentSpan.Context notifyStartInvocation(
}

public static boolean notifyEndInvocation(AgentSpan span, Object result, boolean isError) {

if (null == span || null == span.getSamplingPriority()) {
log.error(
"could not notify the extension as the lambda span is null or no sampling priority has been found");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package datadog.trace.lambda

import datadog.trace.api.Config
import datadog.trace.api.DDSpanId
import datadog.trace.api.DDTags
import datadog.trace.api.DDTraceId
import datadog.trace.core.propagation.PropagationTags
import datadog.trace.core.CoreTracer
import datadog.trace.core.test.DDCoreSpecification
import datadog.trace.core.DDSpan
import com.amazonaws.services.lambda.runtime.events.SQSEvent
Expand All @@ -30,8 +29,7 @@ class LambdaHandlerTest extends DDCoreSpecification {

def "test start invocation success"() {
given:
Config config = Mock(Config)
config.getxDatadogTagsMaxLength() >> 512
CoreTracer ct = tracerBuilder().build()

def server = httpServer {
handlers {
Expand All @@ -47,24 +45,58 @@ class LambdaHandlerTest extends DDCoreSpecification {
LambdaHandler.setExtensionBaseUrl(server.address.toString())

when:
def objTest = LambdaHandler.notifyStartInvocation(obj, PropagationTags.factory(config))
def objTest = LambdaHandler.notifyStartInvocation(ct, obj)

then:
objTest.getTraceId().toString() == traceId
objTest.getSamplingPriority() == samplingPriority

cleanup:
server.close()
ct.close()

where:
traceId | samplingPriority | obj
"1234" | 2 | new TestObject()
}

def "test start invocation with 128 bit trace ID"() {
given:
CoreTracer ct = tracerBuilder().build()

def server = httpServer {
handlers {
post("/lambda/start-invocation") {
response
.status(200)
.addHeader("x-datadog-trace-id", "5744042798732701615")
.addHeader("x-datadog-sampling-priority", "2")
.addHeader("x-datadog-tags", "_dd.p.tid=1914fe7789eb32be")
.send()
}
}
}
LambdaHandler.setExtensionBaseUrl(server.address.toString())

when:
def objTest = LambdaHandler.notifyStartInvocation(ct, obj)

then:
objTest.getTraceId().toHexString() == traceId
objTest.getSamplingPriority() == samplingPriority

cleanup:
server.close()
ct.close()

where:
traceId | samplingPriority | obj
"1914fe7789eb32be4fb6f07e011a6faf" | 2 | new TestObject()
}

def "test start invocation failure"() {
given:
Config config = Mock(Config)
config.getxDatadogTagsMaxLength() >> 512
CoreTracer ct = tracerBuilder().build()

def server = httpServer {
handlers {
Expand All @@ -78,13 +110,14 @@ class LambdaHandlerTest extends DDCoreSpecification {
LambdaHandler.setExtensionBaseUrl(server.address.toString())

when:
def objTest = LambdaHandler.notifyStartInvocation(obj, PropagationTags.factory(config))
def objTest = LambdaHandler.notifyStartInvocation(ct, obj)

then:
objTest == expected

cleanup:
server.close()
ct.close()

where:
expected | obj
Expand Down

0 comments on commit 3fbcd3e

Please sign in to comment.