Skip to content

Commit

Permalink
avoid calculate kafka message size when dsm is not enabled (#6179)
Browse files Browse the repository at this point in the history
  • Loading branch information
amarziali authored Nov 9, 2023
1 parent 80b242d commit 2e6162b
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
sortedTags.put(GROUP_TAG, group);
sortedTags.put(TOPIC_TAG, val.topic());
sortedTags.put(TYPE_TAG, "kafka");

final long payloadSize =
span.traceConfig().isDataStreamsEnabled() ? computePayloadSizeBytes(val) : 0;
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, val.timestamp(), computePayloadSizeBytes(val));
.setCheckpoint(span, sortedTags, val.timestamp(), payloadSize);
} else {
span = startSpan(operationName, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.apache.kafka.common.header.Headers;

public final class Utils {
private Utils() {} // prevent instanciation
private Utils() {} // prevent instantiation

// this method is used in kafka-clients and kafka-streams instrumentations
public static long computePayloadSizeBytes(ConsumerRecord<?, ?> val) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,11 @@ public static void start(
}
sortedTags.put(TOPIC_TAG, record.topic());
sortedTags.put(TYPE_TAG, "kafka");
final long payloadSize =
span.traceConfig().isDataStreamsEnabled() ? computePayloadSizeBytes(record.value) : 0;
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(
span, sortedTags, record.timestamp, computePayloadSizeBytes(record.value));
.setCheckpoint(span, sortedTags, record.timestamp, payloadSize);
} else {
span = startSpan(KAFKA_CONSUME, null);
}
Expand Down

0 comments on commit 2e6162b

Please sign in to comment.