Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pending traces to tracer flare #8053

Draft
wants to merge 17 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -448,4 +448,8 @@ public static long getDurationNano(CoreSpan<?> span) {
PendingTrace trace = (PendingTrace) traceCollector;
return trace.getLastWriteTime() - span.getStartTime();
}

public Iterable<DDSpan> getSpans() {
return spans;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@

import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.api.time.TimeSource;
import datadog.trace.core.monitor.HealthMetrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipOutputStream;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
Expand Down Expand Up @@ -54,6 +59,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {

private volatile boolean closed = false;
private final AtomicInteger flushCounter = new AtomicInteger(0);
private final AtomicInteger dumpCounter = new AtomicInteger(0);

private final LongRunningTracesTracker runningTracesTracker;

Expand All @@ -78,6 +84,7 @@ public void enqueue(Element pendingTrace) {

@Override
public void start() {
TracerFlare.addReporter(new TracerDump(this));
worker.start();
}

Expand Down Expand Up @@ -130,6 +137,27 @@ public void accept(Element pendingTrace) {
}
}

private static final class DumpDrain
implements MessagePassingQueue.Consumer<Element>, MessagePassingQueue.Supplier<Element> {
private static final DumpDrain DUMP_DRAIN = new DumpDrain();
private static final List<Element> data = new ArrayList<>();
private int index = 0;

@Override
public void accept(Element pendingTrace) {
data.add(pendingTrace);
mhlidd marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public Element get() {
if (index < data.size()) {
return data.get(index++);
}
return null; // Should never reach here or else queue may break according to
// MessagePassingQueue docs
}
}

private static final class FlushElement implements Element {
static FlushElement FLUSH_ELEMENT = new FlushElement();

Expand Down Expand Up @@ -162,6 +190,38 @@ public boolean writeOnBufferFull() {
}
}

private static final class DumpElement implements Element {
static DumpElement DUMP_ELEMENT = new DumpElement();

@Override
public long oldestFinishedTime() {
return 0;
}

@Override
public boolean lastReferencedNanosAgo(long nanos) {
return false;
}

@Override
public void write() {}

@Override
public DDSpan getRootSpan() {
return null;
}

@Override
public boolean setEnqueued(boolean enqueued) {
return true;
}

@Override
public boolean writeOnBufferFull() {
return true;
}
}

private final class Worker implements Runnable {

@Override
Expand All @@ -182,11 +242,18 @@ public void run() {

if (pendingTrace instanceof FlushElement) {
// Since this is an MPSC queue, the drain needs to be called on the consumer thread
queue.drain(WriteDrain.WRITE_DRAIN);
queue.drain(WriteDrain.WRITE_DRAIN, 50);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I was trying to use the MPSC drain method that takes in a limit to the number of elements that get drained. Do you think that I should avoid doing this and using the Java stream API instead?

flushCounter.incrementAndGet();
continue;
}

if (pendingTrace instanceof DumpElement) {
queue.drain(DumpDrain.DUMP_DRAIN);
queue.fill(DumpDrain.DUMP_DRAIN, DumpDrain.data.size());
dumpCounter.incrementAndGet();
continue;
}

// The element is no longer in the queue
pendingTrace.setEnqueued(false);

Expand Down Expand Up @@ -235,6 +302,55 @@ public DelayingPendingTraceBuffer(
config, bufferSize, sharedCommunicationObjects, healthMetrics)
: null;
}

static class TracerDump implements TracerFlare.Reporter {

private final DelayingPendingTraceBuffer buffer;

public TracerDump(DelayingPendingTraceBuffer buffer) {
this.buffer = buffer;
}

@Override
public void addReportToFlare(ZipOutputStream zip) throws IOException {
TracerFlare.addText(zip, "trace_dump.txt", getDumpText());
}

private String getDumpText() {
if (buffer.worker.isAlive()) {
int count = buffer.dumpCounter.get();
int loop = 1;
boolean signaled = buffer.queue.offer(DumpElement.DUMP_ELEMENT);
while (!buffer.closed && !signaled) {
buffer.yieldOrSleep(loop++);
signaled = buffer.queue.offer(DumpElement.DUMP_ELEMENT);
}
int newCount = buffer.dumpCounter.get();
while (!buffer.closed && count >= newCount) {
buffer.yieldOrSleep(loop++);
newCount = buffer.dumpCounter.get();
}
}

DumpDrain.data.sort(
(span1, span2) ->
mhlidd marked this conversation as resolved.
Show resolved Hide resolved
Long.compare(
span1.getRootSpan().getStartTime(),
mhlidd marked this conversation as resolved.
Show resolved Hide resolved
span2.getRootSpan().getStartTime())); // Sort by oldest trace first

StringBuilder dumpText = new StringBuilder();
for (Element e : DumpDrain.data) {
if (e instanceof PendingTrace) {
PendingTrace trace = (PendingTrace) e;
for (DDSpan span : trace.getSpans()) {
dumpText.append(span.toString()).append('\n');
}
}
}
DumpDrain.data.clear(); // releasing memory used for ArrayList in drain
return dumpText.toString();
}
}
}

static class DiscardingPendingTraceBuffer extends PendingTraceBuffer {
Expand Down
Loading