
Merge pull request #7798 from DataDog/kr-igor/dsm-service-name-override
Support service name overrides for DSM
kr-igor authored Nov 18, 2024
2 parents c4bc3cc + 6856dc5 commit 6181783
Showing 16 changed files with 291 additions and 122 deletions.
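
What the change does, at a glance: DSM checkpoints, backlogs, and pathway contexts now capture an optional per-thread service name override, stats are bucketed per override, and each group is flushed as its own payload. The sketch below is a hypothetical caller-side illustration, not code from this PR; it assumes the monitoring object returned by AgentTracer.get().getDataStreamsMonitoring() exposes the new setThreadServiceName/clearThreadServiceName methods (the @Override annotations later in this diff suggest the interface declares them).

```java
// Hypothetical usage sketch (not part of this PR): scope a DSM service name override
// to work running on the current thread, then always restore the default.
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;

public final class DsmServiceOverrideScope {
  private DsmServiceOverrideScope() {}

  public static void runAs(String overrideServiceName, Runnable work) {
    // Every DSM checkpoint/backlog recorded on this thread will carry the override.
    AgentTracer.get().getDataStreamsMonitoring().setThreadServiceName(overrideServiceName);
    try {
      work.run();
    } finally {
      // Restore the default so later tasks on a pooled thread use the configured service name.
      AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
    }
  }
}
```

Per the setter added later in this diff, passing null to setThreadServiceName behaves the same as calling clearThreadServiceName.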
----------------------------------------
@@ -200,7 +200,8 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize)
           saved.getTimestampNanos(),
           saved.getPathwayLatencyNano(),
           saved.getEdgeLatencyNano(),
-          estimatedPayloadSize);
+          estimatedPayloadSize,
+          saved.getServiceNameOverride());
       // then send the point
       AgentTracer.get().getDataStreamsMonitoring().add(updated);
     }
----------------------------------------
@@ -27,7 +27,8 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize)
           saved.getTimestampNanos(),
           saved.getPathwayLatencyNano(),
           saved.getEdgeLatencyNano(),
-          estimatedPayloadSize);
+          estimatedPayloadSize,
+          saved.getServiceNameOverride());
       // then send the point
       AgentTracer.get().getDataStreamsMonitoring().add(updated);
     }
----------------------------------------
@@ -18,9 +18,12 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
   @SuppressWarnings('UnusedPrivateField')
   private final Set<String> backlogs = []
 
+  private final Set<String> serviceNameOverrides = []
+
   @Override
-  synchronized void writePayload(Collection<StatsBucket> data) {
+  synchronized void writePayload(Collection<StatsBucket> data, String serviceNameOverride) {
     log.info("payload written - {}", data)
+    serviceNameOverrides.add(serviceNameOverride)
     this.@payloads.addAll(data)
     data.each { this.@groups.addAll(it.groups) }
     for (StatsBucket bucket : data) {
@@ -32,6 +35,10 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
     }
   }
 
+  synchronized List<String> getServices() {
+    Collections.unmodifiableList(new ArrayList<>(this.@serviceNameOverrides))
+  }
+
   synchronized List<StatsBucket> getPayloads() {
     Collections.unmodifiableList(new ArrayList<>(this.@payloads))
   }
----------------------------------------
@@ -12,16 +12,19 @@ public class DataStreamContextExtractor implements HttpCodec.Extractor {
   private final TimeSource timeSource;
   private final Supplier<TraceConfig> traceConfigSupplier;
   private final long hashOfKnownTags;
+  private final String serviceNameOverride;
 
   public DataStreamContextExtractor(
       HttpCodec.Extractor delegate,
       TimeSource timeSource,
       Supplier<TraceConfig> traceConfigSupplier,
-      long hashOfKnownTags) {
+      long hashOfKnownTags,
+      String serviceNameOverride) {
     this.delegate = delegate;
     this.timeSource = timeSource;
     this.traceConfigSupplier = traceConfigSupplier;
     this.hashOfKnownTags = hashOfKnownTags;
+    this.serviceNameOverride = serviceNameOverride;
   }
 
   @Override
@@ -37,15 +40,17 @@ public <C> TagContext extract(C carrier, AgentPropagation.ContextVisitor<C> gett
 
       if (shouldExtractPathwayContext) {
         DefaultPathwayContext pathwayContext =
-            DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.hashOfKnownTags);
+            DefaultPathwayContext.extract(
+                carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
 
         extracted.withPathwayContext(pathwayContext);
       }
 
       return extracted;
     } else if (traceConfigSupplier.get().isDataStreamsEnabled()) {
       DefaultPathwayContext pathwayContext =
-          DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.hashOfKnownTags);
+          DefaultPathwayContext.extract(
+              carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
 
       if (pathwayContext != null) {
         extracted = new TagContext();
----------------------------------------
@@ -3,5 +3,5 @@
 import java.util.Collection;
 
 public interface DatastreamsPayloadWriter {
-  void writePayload(Collection<StatsBucket> data);
+  void writePayload(Collection<StatsBucket> data, String serviceNameOverride);
 }
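
DatastreamsPayloadWriter now receives the service name override alongside each batch of stats buckets, so a single flush cycle can produce one payload per service. A minimal, purely illustrative implementation of the updated interface follows; the class name and logging behaviour are invented for the example, and the tracer's real writer is not part of the rendered diff.

```java
// Illustrative only: a trivial writer against the updated interface. The package
// declaration is omitted; only the writePayload signature comes from this diff.
import java.util.Collection;

public class StdoutDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
  @Override
  public void writePayload(Collection<StatsBucket> data, String serviceNameOverride) {
    // A null override is expected to mean "use the tracer's configured service name".
    String service = serviceNameOverride == null ? "<default service>" : serviceNameOverride;
    System.out.println("DSM flush: " + data.size() + " stats bucket(s) for service " + service);
  }
}
```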
----------------------------------------
@@ -39,6 +39,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -54,11 +55,11 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
   static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5);
 
   private static final StatsPoint REPORT =
-      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0);
+      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0, null);
   private static final StatsPoint POISON_PILL =
-      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0);
+      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0, null);
 
-  private final Map<Long, StatsBucket> timeToBucket = new HashMap<>();
+  private final Map<Long, Map<String, StatsBucket>> timeToBucket = new HashMap<>();
   private final MpscArrayQueue<InboxItem> inbox = new MpscArrayQueue<>(1024);
   private final DatastreamsPayloadWriter payloadWriter;
   private final DDAgentFeaturesDiscovery features;
@@ -74,6 +75,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
   private volatile boolean agentSupportsDataStreams = false;
   private volatile boolean configSupportsDataStreams = false;
   private final ConcurrentHashMap<String, SchemaSampler> schemaSamplers;
+  private static final ThreadLocal<String> serviceNameOverride = new ThreadLocal<>();
 
   public DefaultDataStreamsMonitoring(
       Config config,
@@ -184,10 +186,29 @@ public void setProduceCheckpoint(String type, String target) {
     setProduceCheckpoint(type, target, DataStreamsContextCarrier.NoOp.INSTANCE, false);
   }
 
+  @Override
+  public void setThreadServiceName(String serviceName) {
+    if (serviceName == null) {
+      clearThreadServiceName();
+      return;
+    }
+
+    serviceNameOverride.set(serviceName);
+  }
+
+  @Override
+  public void clearThreadServiceName() {
+    serviceNameOverride.remove();
+  }
+
+  private static String getThreadServiceName() {
+    return serviceNameOverride.get();
+  }
+
   @Override
   public PathwayContext newPathwayContext() {
     if (configSupportsDataStreams) {
-      return new DefaultPathwayContext(timeSource, hashOfKnownTags);
+      return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceName());
     } else {
       return AgentTracer.NoopPathwayContext.INSTANCE;
     }
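
The override lives in a static ThreadLocal, so it is scoped to the calling thread: setThreadServiceName(null) simply clears it, and getThreadServiceName() returns null on threads that never set one. The standalone demo below is a hypothetical example, not tracer code; it only illustrates the per-thread isolation this design relies on.

```java
// Standalone demonstration of ThreadLocal isolation (hypothetical example, not tracer code).
public class ThreadLocalOverrideDemo {
  private static final ThreadLocal<String> serviceNameOverride = new ThreadLocal<>();

  public static void main(String[] args) throws InterruptedException {
    Runnable worker = () -> {
      // Each worker thread only ever sees the value it set itself.
      serviceNameOverride.set(Thread.currentThread().getName() + "-service");
      System.out.println(Thread.currentThread().getName() + " -> " + serviceNameOverride.get());
      serviceNameOverride.remove(); // mirrors clearThreadServiceName()
    };
    Thread a = new Thread(worker, "consumer-a");
    Thread b = new Thread(worker, "consumer-b");
    a.start();
    b.start();
    a.join();
    b.join();
    // The main thread never set an override, so it still sees null.
    System.out.println("main -> " + serviceNameOverride.get());
  }
}
```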
@@ -196,7 +217,7 @@ public PathwayContext newPathwayContext() {
   @Override
   public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) {
     return new DataStreamContextExtractor(
-        delegate, timeSource, traceConfigSupplier, hashOfKnownTags);
+        delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceName());
   }
 
   @Override
@@ -212,7 +233,8 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie
             carrier,
             DataStreamsContextCarrierAdapter.INSTANCE,
             this.timeSource,
-            this.hashOfKnownTags);
+            this.hashOfKnownTags,
+            getThreadServiceName());
     ((DDSpan) span).context().mergePathwayContext(pathwayContext);
   }
 }
@@ -226,7 +248,7 @@ public void trackBacklog(LinkedHashMap<String, String> sortedTags, long value) {
       }
       tags.add(tag);
     }
-    inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos()));
+    inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceName()));
   }
 
   @Override
@@ -308,6 +330,15 @@ public void close() {
   }
 
   private class InboxProcessor implements Runnable {
+
+    private StatsBucket getStatsBucket(final long timestamp, final String serviceNameOverride) {
+      long bucket = currentBucket(timestamp);
+      Map<String, StatsBucket> statsBucketMap =
+          timeToBucket.computeIfAbsent(bucket, startTime -> new HashMap<>(1));
+      return statsBucketMap.computeIfAbsent(
+          serviceNameOverride, s -> new StatsBucket(bucket, bucketDurationNanos));
+    }
+
     @Override
     public void run() {
       Thread currentThread = Thread.currentThread();
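
Stats are now keyed twice: by the start of the time bucket, then by the service name override, where a null override (no override set) is a legal HashMap key representing the default service. Below is a compact, self-contained sketch of that two-level computeIfAbsent pattern; the counter values and bucket width are placeholders for the example, not values taken from the tracer.

```java
// Hypothetical sketch of two-level bucketing: time bucket -> service override -> aggregate.
import java.util.HashMap;
import java.util.Map;

public class TwoLevelBucketingSketch {
  // Placeholder bucket width for the example; the tracer derives its own bucket duration.
  private static final long BUCKET_DURATION_NANOS = 10_000_000_000L;

  private final Map<Long, Map<String, Long>> pointCounts = new HashMap<>();

  public void record(long timestampNanos, String serviceNameOverride) {
    long bucketStart = timestampNanos - (timestampNanos % BUCKET_DURATION_NANOS);
    pointCounts
        .computeIfAbsent(bucketStart, start -> new HashMap<>(1))
        // HashMap permits null keys, so points without an override aggregate together.
        .merge(serviceNameOverride, 1L, Long::sum);
  }

  public Map<String, Long> bucketFor(long bucketStartNanos) {
    return pointCounts.getOrDefault(bucketStartNanos, new HashMap<>());
  }
}
```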
@@ -335,17 +366,14 @@ public void run() {
         } else if (supportsDataStreams) {
           if (payload instanceof StatsPoint) {
             StatsPoint statsPoint = (StatsPoint) payload;
-            Long bucket = currentBucket(statsPoint.getTimestampNanos());
             StatsBucket statsBucket =
-                timeToBucket.computeIfAbsent(
-                    bucket, startTime -> new StatsBucket(startTime, bucketDurationNanos));
+                getStatsBucket(
+                    statsPoint.getTimestampNanos(), statsPoint.getServiceNameOverride());
             statsBucket.addPoint(statsPoint);
           } else if (payload instanceof Backlog) {
             Backlog backlog = (Backlog) payload;
-            Long bucket = currentBucket(backlog.getTimestampNanos());
             StatsBucket statsBucket =
-                timeToBucket.computeIfAbsent(
-                    bucket, startTime -> new StatsBucket(startTime, bucketDurationNanos));
+                getStatsBucket(backlog.getTimestampNanos(), backlog.getServiceNameOverride());
             statsBucket.addBacklog(backlog);
           }
         }
@@ -363,21 +391,32 @@ private long currentBucket(long timestampNanos) {
     private void flush(long timestampNanos) {
       long currentBucket = currentBucket(timestampNanos);
 
-      List<StatsBucket> includedBuckets = new ArrayList<>();
-      Iterator<Map.Entry<Long, StatsBucket>> mapIterator = timeToBucket.entrySet().iterator();
+      // stats are grouped by time buckets and service names
+      Map<String, List<StatsBucket>> includedBuckets = new HashMap<>();
+      Iterator<Map.Entry<Long, Map<String, StatsBucket>>> mapIterator =
+          timeToBucket.entrySet().iterator();
 
       while (mapIterator.hasNext()) {
-        Map.Entry<Long, StatsBucket> entry = mapIterator.next();
-
+        Map.Entry<Long, Map<String, StatsBucket>> entry = mapIterator.next();
         if (entry.getKey() < currentBucket) {
           mapIterator.remove();
-          includedBuckets.add(entry.getValue());
+          for (Map.Entry<String, StatsBucket> buckets : entry.getValue().entrySet()) {
+            if (!includedBuckets.containsKey(buckets.getKey())) {
+              includedBuckets.put(buckets.getKey(), new LinkedList<>());
+            }
+
+            includedBuckets.get(buckets.getKey()).add(buckets.getValue());
+          }
         }
       }
 
       if (!includedBuckets.isEmpty()) {
-        log.debug("Flushing {} buckets", includedBuckets.size());
-        payloadWriter.writePayload(includedBuckets);
+        for (Map.Entry<String, List<StatsBucket>> entry : includedBuckets.entrySet()) {
+          if (!entry.getValue().isEmpty()) {
+            log.debug("Flushing {} buckets ({})", entry.getValue(), entry.getKey());
+            payloadWriter.writePayload(entry.getValue(), entry.getKey());
+          }
+        }
       }
     }
 
(Diffs for the remaining changed files are not shown in this view.)
