diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java index 0a947999a68..13c2e48b0c8 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java @@ -448,4 +448,8 @@ public static long getDurationNano(CoreSpan span) { PendingTrace trace = (PendingTrace) traceCollector; return trace.getLastWriteTime() - span.getStartTime(); } + + public Iterable getSpans() { + return spans; + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java index ac48eb96337..8596a6e8d95 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java @@ -3,13 +3,21 @@ import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_MONITOR; import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; import static datadog.trace.util.AgentThreadFactory.newAgentThread; +import static java.util.Comparator.comparingLong; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.api.flare.TracerFlare; import datadog.trace.api.time.TimeSource; import datadog.trace.core.monitor.HealthMetrics; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.zip.ZipOutputStream; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscBlockingConsumerArrayQueue; import org.slf4j.Logger; @@ -54,6 +62,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { private volatile boolean closed = false; private final AtomicInteger flushCounter = new AtomicInteger(0); + private final AtomicInteger dumpCounter = new AtomicInteger(0); private final LongRunningTracesTracker runningTracesTracker; @@ -78,6 +87,7 @@ public void enqueue(Element pendingTrace) { @Override public void start() { + TracerFlare.addReporter(new TracerDump(this)); worker.start(); } @@ -130,6 +140,27 @@ public void accept(Element pendingTrace) { } } + private static final class DumpDrain + implements MessagePassingQueue.Consumer, MessagePassingQueue.Supplier { + private static final DumpDrain DUMP_DRAIN = new DumpDrain(); + private static final List DATA = new ArrayList<>(); + private int index = 0; + + @Override + public void accept(Element pendingTrace) { + DATA.add(pendingTrace); + } + + @Override + public Element get() { + if (index < DATA.size()) { + return DATA.get(index++); + } + return null; // Should never reach here or else queue may break according to + // MessagePassingQueue docs + } + } + private static final class FlushElement implements Element { static FlushElement FLUSH_ELEMENT = new FlushElement(); @@ -162,6 +193,38 @@ public boolean writeOnBufferFull() { } } + private static final class DumpElement implements Element { + static DumpElement DUMP_ELEMENT = new DumpElement(); + + @Override + public long oldestFinishedTime() { + return 0; + } + + @Override + public boolean lastReferencedNanosAgo(long nanos) { + return false; + } + + @Override + public void write() {} + + @Override + public DDSpan getRootSpan() { + return null; + } + + @Override + public boolean setEnqueued(boolean enqueued) { + return true; + } + + @Override + public boolean writeOnBufferFull() { + return true; + } + } + private final class Worker implements Runnable { @Override @@ -182,11 +245,18 @@ public void run() { if (pendingTrace instanceof FlushElement) { // Since this is an MPSC queue, the drain needs to be called on the consumer thread - queue.drain(WriteDrain.WRITE_DRAIN); + queue.drain(WriteDrain.WRITE_DRAIN, 50); flushCounter.incrementAndGet(); continue; } + if (pendingTrace instanceof DumpElement) { + queue.drain(DumpDrain.DUMP_DRAIN); + queue.fill(DumpDrain.DUMP_DRAIN, DumpDrain.DATA.size()); + dumpCounter.incrementAndGet(); + continue; + } + // The element is no longer in the queue pendingTrace.setEnqueued(false); @@ -208,7 +278,7 @@ public void run() { // Trace has been unmodified long enough, go ahead and write whatever is finished. pendingTrace.write(); } else { - // Trace is too new. Requeue it and sleep to avoid a hot loop. + // Trace is too new. Requeue it and sleep to avoid a hot loop. enqueue(pendingTrace); Thread.sleep(SLEEP_TIME_MS); } @@ -277,4 +347,59 @@ public static PendingTraceBuffer discarding() { public abstract void flush(); public abstract void enqueue(Element pendingTrace); + + private static class TracerDump implements TracerFlare.Reporter { + private static final Comparator TRACE_BY_START_TIME = + comparingLong(trace -> trace.getRootSpan().getStartTime()); + private static final Predicate NOT_PENDING_TRACE = + element -> !(element instanceof PendingTrace); + private final DelayingPendingTraceBuffer buffer; + + private TracerDump(DelayingPendingTraceBuffer buffer) { + this.buffer = buffer; + } + + @Override + public void prepareForFlare() { + if (buffer.worker.isAlive()) { + int count = buffer.dumpCounter.get(); + int loop = 1; + boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DumpElement.DUMP_ELEMENT); + while (!buffer.closed && !signaled) { + buffer.yieldOrSleep(loop++); + signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DumpElement.DUMP_ELEMENT); + } + int newCount = buffer.dumpCounter.get(); + while (!buffer.closed && count >= newCount) { + buffer.yieldOrSleep(loop++); + newCount = buffer.dumpCounter.get(); + } + } + } + + @Override + public void addReportToFlare(ZipOutputStream zip) throws IOException { + TracerFlare.addText(zip, "trace_dump.txt", getDumpText()); + } + + private String getDumpText() { + // Removing elements from the drain that are not instances of PendingTrace + DelayingPendingTraceBuffer.DumpDrain.DATA.removeIf(NOT_PENDING_TRACE); + // Storing oldest traces first + DelayingPendingTraceBuffer.DumpDrain.DATA.sort((TRACE_BY_START_TIME).reversed()); + + StringBuilder dumpText = new StringBuilder(); + for (Element e : DelayingPendingTraceBuffer.DumpDrain.DATA) { + if (e instanceof PendingTrace) { + PendingTrace trace = (PendingTrace) e; + for (DDSpan span : trace.getSpans()) { + dumpText.append(span.toString()).append('\n'); + } + } + } + // Releasing memory used for ArrayList in drain + DelayingPendingTraceBuffer.DumpDrain.DATA.clear(); + return dumpText.toString(); + } + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy index c1a2e139b7d..b15574f76ee 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy @@ -168,7 +168,7 @@ class LongRunningTracesTrackerTest extends DDSpecification { PendingTrace newTraceToTrack() { PendingTrace trace = factory.create(DDTraceId.ONE) - PendingTraceBufferTest::newSpanOf(trace, PrioritySampling.SAMPLER_KEEP) + PendingTraceBufferTest::newSpanOf(trace, PrioritySampling.SAMPLER_KEEP, 0) return trace } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy index c9062be094a..a927d7b14a1 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy @@ -5,6 +5,7 @@ import datadog.communication.monitor.Monitoring import datadog.trace.SamplingPriorityMetadataChecker import datadog.trace.api.DDSpanId import datadog.trace.api.DDTraceId +import datadog.trace.api.flare.TracerFlare import datadog.trace.api.sampling.PrioritySampling import datadog.trace.api.time.SystemTimeSource import datadog.trace.bootstrap.instrumentation.api.AgentTracer.NoopPathwayContext @@ -20,8 +21,13 @@ import spock.util.concurrent.PollingConditions import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import java.util.zip.ZipInputStream +import java.util.zip.ZipOutputStream +import static datadog.trace.api.sampling.PrioritySampling.UNSET +import static datadog.trace.api.sampling.PrioritySampling.USER_KEEP import static datadog.trace.core.PendingTraceBuffer.BUFFER_SIZE +import static java.nio.charset.StandardCharsets.UTF_8 @Timeout(5) class PendingTraceBufferTest extends DDSpecification { @@ -143,7 +149,7 @@ class PendingTraceBufferTest extends DDSpecification { def "priority sampling is always sent"() { setup: - def parent = addContinuation(newSpanOf(factory.create(DDTraceId.ONE), PrioritySampling.USER_KEEP)) + def parent = addContinuation(newSpanOf(factory.create(DDTraceId.ONE), USER_KEEP, 0)) def metadataChecker = new SamplingPriorityMetadataChecker() when: "Fill the buffer - Only children - Priority taken from root" @@ -443,6 +449,36 @@ class PendingTraceBufferTest extends DDSpecification { } } + def "testing tracer flare dump"() { + setup: + TracerFlare.addReporter {} // exercises default methods + def dumpReporter = Mock(PendingTraceBuffer.TracerDump) + TracerFlare.addReporter(dumpReporter) + def trace = factory.create(DDTraceId.ONE) + def parent = newSpanOf(trace, UNSET, System.currentTimeMillis() * 1000) + def child = newSpanOf(parent) + + when: + parent.finish() + buffer.start() + def entries = buildAndExtractZip() + + then: + 1 * dumpReporter.prepareForFlare() + 1 * dumpReporter.addReportToFlare(_) + 1 * dumpReporter.cleanupAfterFlare() + entries.size() == 1 + (entries["trace_dump.txt"] as String).startsWith("DDSpan [ t_id=1, s_id=1, p_id=0 ]") // TODO + + then: + child.finish() + + then: + trace.size() == 0 + trace.pendingReferenceCount == 0 + } + + def addContinuation(DDSpan span) { def scope = scopeManager.activate(span, ScopeSource.INSTRUMENTATION, true) continuations << scope.capture() @@ -451,10 +487,10 @@ class PendingTraceBufferTest extends DDSpecification { } static DDSpan newSpanOf(PendingTrace trace) { - return newSpanOf(trace, PrioritySampling.UNSET) + return newSpanOf(trace, UNSET, 0) } - static DDSpan newSpanOf(PendingTrace trace, int samplingPriority) { + static DDSpan newSpanOf(PendingTrace trace, int samplingPriority, long timestampMicro) { def context = new DDSpanContext( trace.traceId, 1, @@ -475,7 +511,7 @@ class PendingTraceBufferTest extends DDSpecification { NoopPathwayContext.INSTANCE, false, PropagationTags.factory().empty()) - return DDSpan.create("test", 0, context, null) + return DDSpan.create("test", timestampMicro, context, null) } static DDSpan newSpanOf(DDSpan parent) { @@ -488,7 +524,7 @@ class PendingTraceBufferTest extends DDSpecification { "fakeService", "fakeOperation", "fakeResource", - PrioritySampling.UNSET, + UNSET, null, Collections.emptyMap(), false, @@ -502,4 +538,27 @@ class PendingTraceBufferTest extends DDSpecification { PropagationTags.factory().empty()) return DDSpan.create("test", 0, context, null) } + + def buildAndExtractZip() { + TracerFlare.prepareForFlare() + def out = new ByteArrayOutputStream() + try (ZipOutputStream zip = new ZipOutputStream(out)) { + TracerFlare.addReportsToFlare(zip) + } finally { + TracerFlare.cleanupAfterFlare() + } + + def entries = [:] + + def zip = new ZipInputStream(new ByteArrayInputStream(out.toByteArray())) + def entry + while (entry = zip.nextEntry) { + def bytes = new ByteArrayOutputStream() + bytes << zip + entries.put(entry.name, entry.name.endsWith(".bin") + ? bytes.toByteArray() : new String(bytes.toByteArray(), UTF_8)) + } + + return entries + } }