Skip to content

Commit

Permalink
Added SpanProcessor OnEnding callback (#6367)
Browse files Browse the repository at this point in the history
Co-authored-by: jack-berg <34418638+jack-berg@users.noreply.github.com>
  • Loading branch information
JonasKunz and jack-berg authored Sep 4, 2024
1 parent bc2fad4 commit 09de4bd
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.internal.ExtendedSpanProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -16,8 +17,9 @@
* Implementation of the {@code SpanProcessor} that simply forwards all received events to a list of
* {@code SpanProcessor}s.
*/
final class MultiSpanProcessor implements SpanProcessor {
final class MultiSpanProcessor implements ExtendedSpanProcessor {
private final List<SpanProcessor> spanProcessorsStart;
private final List<ExtendedSpanProcessor> spanProcessorsEnding;
private final List<SpanProcessor> spanProcessorsEnd;
private final List<SpanProcessor> spanProcessorsAll;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
Expand Down Expand Up @@ -58,6 +60,18 @@ public boolean isEndRequired() {
return !spanProcessorsEnd.isEmpty();
}

@Override
public void onEnding(ReadWriteSpan span) {
for (ExtendedSpanProcessor spanProcessor : spanProcessorsEnding) {
spanProcessor.onEnding(span);
}
}

@Override
public boolean isOnEndingRequired() {
return !spanProcessorsEnding.isEmpty();
}

@Override
public CompletableResultCode shutdown() {
if (isShutdown.getAndSet(true)) {
Expand All @@ -83,10 +97,17 @@ private MultiSpanProcessor(List<SpanProcessor> spanProcessors) {
this.spanProcessorsAll = spanProcessors;
this.spanProcessorsStart = new ArrayList<>(spanProcessorsAll.size());
this.spanProcessorsEnd = new ArrayList<>(spanProcessorsAll.size());
this.spanProcessorsEnding = new ArrayList<>(spanProcessorsAll.size());
for (SpanProcessor spanProcessor : spanProcessorsAll) {
if (spanProcessor.isStartRequired()) {
spanProcessorsStart.add(spanProcessor);
}
if (spanProcessor instanceof ExtendedSpanProcessor) {
ExtendedSpanProcessor extendedSpanProcessor = (ExtendedSpanProcessor) spanProcessor;
if (extendedSpanProcessor.isOnEndingRequired()) {
spanProcessorsEnding.add(extendedSpanProcessor);
}
}
if (spanProcessor.isEndRequired()) {
spanProcessorsEnd.add(spanProcessor);
}
Expand All @@ -98,6 +119,8 @@ public String toString() {
return "MultiSpanProcessor{"
+ "spanProcessorsStart="
+ spanProcessorsStart
+ ", spanProcessorsEnding="
+ spanProcessorsEnding
+ ", spanProcessorsEnd="
+ spanProcessorsEnd
+ ", spanProcessorsAll="
Expand Down
66 changes: 49 additions & 17 deletions sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.sdk.trace.internal.ExtendedSpanProcessor;
import io.opentelemetry.sdk.trace.internal.data.ExceptionEventData;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -95,9 +96,24 @@ final class SdkSpan implements ReadWriteSpan {
@GuardedBy("lock")
private long endEpochNanos;

// True if the span is ended.
private enum EndState {
NOT_ENDED,
ENDING,
ENDED
}

@GuardedBy("lock")
private boolean hasEnded;
private EndState hasEnded;

/**
* The thread on which {@link #end()} is called and which will be invoking the {@link
* SpanProcessor}s. This field is used to ensure that only this thread may modify the span while
* it is in state {@link EndState#ENDING} to prevent concurrent updates outside of {@link
* ExtendedSpanProcessor#onEnding(ReadWriteSpan)}.
*/
@GuardedBy("lock")
@Nullable
private Thread spanEndingThread;

private SdkSpan(
SpanContext context,
Expand All @@ -122,7 +138,7 @@ private SdkSpan(
this.kind = kind;
this.spanProcessor = spanProcessor;
this.resource = resource;
this.hasEnded = false;
this.hasEnded = EndState.NOT_ENDED;
this.clock = clock;
this.startEpochNanos = startEpochNanos;
this.attributes = attributes;
Expand Down Expand Up @@ -220,7 +236,7 @@ public SpanData toSpanData() {
status,
name,
endEpochNanos,
hasEnded);
hasEnded == EndState.ENDED);
}
}

Expand All @@ -242,7 +258,7 @@ public Attributes getAttributes() {
@Override
public boolean hasEnded() {
synchronized (lock) {
return hasEnded;
return hasEnded == EndState.ENDED;
}
}

Expand Down Expand Up @@ -288,7 +304,7 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() {
@Override
public long getLatencyNanos() {
synchronized (lock) {
return (hasEnded ? endEpochNanos : clock.now()) - startEpochNanos;
return (hasEnded == EndState.NOT_ENDED ? clock.now() : endEpochNanos) - startEpochNanos;
}
}

Expand All @@ -303,7 +319,7 @@ public <T> ReadWriteSpan setAttribute(AttributeKey<T> key, T value) {
return this;
}
synchronized (lock) {
if (hasEnded) {
if (!isModifiableByCurrentThread()) {
logger.log(Level.FINE, "Calling setAttribute() on an ended Span.");
return this;
}
Expand All @@ -318,6 +334,12 @@ public <T> ReadWriteSpan setAttribute(AttributeKey<T> key, T value) {
return this;
}

@GuardedBy("lock")
private boolean isModifiableByCurrentThread() {
return hasEnded == EndState.NOT_ENDED
|| (hasEnded == EndState.ENDING && Thread.currentThread() == spanEndingThread);
}

@Override
public ReadWriteSpan addEvent(String name) {
if (name == null) {
Expand Down Expand Up @@ -380,7 +402,7 @@ public ReadWriteSpan addEvent(String name, Attributes attributes, long timestamp

private void addTimedEvent(EventData timedEvent) {
synchronized (lock) {
if (hasEnded) {
if (!isModifiableByCurrentThread()) {
logger.log(Level.FINE, "Calling addEvent() on an ended Span.");
return;
}
Expand All @@ -400,7 +422,7 @@ public ReadWriteSpan setStatus(StatusCode statusCode, @Nullable String descripti
return this;
}
synchronized (lock) {
if (hasEnded) {
if (!isModifiableByCurrentThread()) {
logger.log(Level.FINE, "Calling setStatus() on an ended Span.");
return this;
} else if (this.status.getStatusCode() == StatusCode.OK) {
Expand Down Expand Up @@ -438,7 +460,7 @@ public ReadWriteSpan updateName(String name) {
return this;
}
synchronized (lock) {
if (hasEnded) {
if (!isModifiableByCurrentThread()) {
logger.log(Level.FINE, "Calling updateName() on an ended Span.");
return this;
}
Expand All @@ -463,7 +485,7 @@ public Span addLink(SpanContext spanContext, Attributes attributes) {
spanLimits.getMaxNumberOfAttributesPerLink(),
spanLimits.getMaxAttributeValueLength()));
synchronized (lock) {
if (hasEnded) {
if (!isModifiableByCurrentThread()) {
logger.log(Level.FINE, "Calling addLink() on an ended Span.");
return this;
}
Expand Down Expand Up @@ -493,12 +515,22 @@ public void end(long timestamp, TimeUnit unit) {

private void endInternal(long endEpochNanos) {
synchronized (lock) {
if (hasEnded) {
logger.log(Level.FINE, "Calling end() on an ended Span.");
if (hasEnded != EndState.NOT_ENDED) {
logger.log(Level.FINE, "Calling end() on an ended or ending Span.");
return;
}
this.endEpochNanos = endEpochNanos;
hasEnded = true;
spanEndingThread = Thread.currentThread();
hasEnded = EndState.ENDING;
}
if (spanProcessor instanceof ExtendedSpanProcessor) {
ExtendedSpanProcessor extendedSpanProcessor = (ExtendedSpanProcessor) spanProcessor;
if (extendedSpanProcessor.isOnEndingRequired()) {
extendedSpanProcessor.onEnding(this);
}
}
synchronized (lock) {
hasEnded = EndState.ENDED;
}
if (spanProcessor.isEndRequired()) {
spanProcessor.onEnd(this);
Expand All @@ -508,7 +540,7 @@ private void endInternal(long endEpochNanos) {
@Override
public boolean isRecording() {
synchronized (lock) {
return !hasEnded;
return hasEnded != EndState.ENDED;
}
}

Expand All @@ -533,7 +565,7 @@ private List<EventData> getImmutableTimedEvents() {

// if the span has ended, then the events are unmodifiable
// so we can return them directly and save copying all the data.
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
return Collections.unmodifiableList(events);
}

Expand All @@ -547,7 +579,7 @@ private Attributes getImmutableAttributes() {
}
// if the span has ended, then the attributes are unmodifiable,
// so we can return them directly and save copying all the data.
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
return attributes;
}
// otherwise, make a copy of the data into an immutable container.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.trace.internal;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;

/**
* Extended {@link SpanProcessor} with experimental APIs.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface ExtendedSpanProcessor extends SpanProcessor {

/**
* Called when a {@link io.opentelemetry.api.trace.Span} is ended, but before {@link
* SpanProcessor#onEnd(ReadableSpan)} is invoked with an immutable variant of this span. This
* means that the span will still be mutable. Note that the span will only be modifiable
* synchronously from this callback, concurrent modifications from other threads will be
* prevented. Only called if {@link Span#isRecording()} returns true.
*
* <p>This method is called synchronously on the execution thread, should not throw or block the
* execution thread.
*
* @param span the {@code Span} that is just about to be ended.
*/
void onEnding(ReadWriteSpan span);

/**
* Returns {@code true} if this {@link SpanProcessor} requires onEnding events.
*
* @return {@code true} if this {@link SpanProcessor} requires onEnding events.
*/
boolean isOnEndingRequired();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.internal.ExtendedSpanProcessor;
import java.util.Arrays;
import java.util.Collections;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -27,18 +28,20 @@
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class MultiSpanProcessorTest {
@Mock private SpanProcessor spanProcessor1;
@Mock private SpanProcessor spanProcessor2;
@Mock private ExtendedSpanProcessor spanProcessor1;
@Mock private ExtendedSpanProcessor spanProcessor2;
@Mock private ReadableSpan readableSpan;
@Mock private ReadWriteSpan readWriteSpan;

@BeforeEach
void setUp() {
when(spanProcessor1.isStartRequired()).thenReturn(true);
when(spanProcessor1.isOnEndingRequired()).thenReturn(true);
when(spanProcessor1.isEndRequired()).thenReturn(true);
when(spanProcessor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor2.isStartRequired()).thenReturn(true);
when(spanProcessor2.isOnEndingRequired()).thenReturn(true);
when(spanProcessor2.isEndRequired()).thenReturn(true);
when(spanProcessor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
Expand All @@ -61,12 +64,17 @@ void oneSpanProcessor() {

@Test
void twoSpanProcessor() {
SpanProcessor multiSpanProcessor =
SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2));
ExtendedSpanProcessor multiSpanProcessor =
(ExtendedSpanProcessor)
SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2));
multiSpanProcessor.onStart(Context.root(), readWriteSpan);
verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan));
verify(spanProcessor2).onStart(same(Context.root()), same(readWriteSpan));

multiSpanProcessor.onEnding(readWriteSpan);
verify(spanProcessor1).onEnding(same(readWriteSpan));
verify(spanProcessor2).onEnding(same(readWriteSpan));

multiSpanProcessor.onEnd(readableSpan);
verify(spanProcessor1).onEnd(same(readableSpan));
verify(spanProcessor2).onEnd(same(readableSpan));
Expand All @@ -83,9 +91,11 @@ void twoSpanProcessor() {
@Test
void twoSpanProcessor_DifferentRequirements() {
when(spanProcessor1.isEndRequired()).thenReturn(false);
when(spanProcessor2.isOnEndingRequired()).thenReturn(false);
when(spanProcessor2.isStartRequired()).thenReturn(false);
SpanProcessor multiSpanProcessor =
SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2));
ExtendedSpanProcessor multiSpanProcessor =
(ExtendedSpanProcessor)
SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2));

assertThat(multiSpanProcessor.isStartRequired()).isTrue();
assertThat(multiSpanProcessor.isEndRequired()).isTrue();
Expand All @@ -94,6 +104,10 @@ void twoSpanProcessor_DifferentRequirements() {
verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan));
verify(spanProcessor2, times(0)).onStart(any(Context.class), any(ReadWriteSpan.class));

multiSpanProcessor.onEnding(readWriteSpan);
verify(spanProcessor1).onEnding(same(readWriteSpan));
verify(spanProcessor2, times(0)).onEnding(any(ReadWriteSpan.class));

multiSpanProcessor.onEnd(readableSpan);
verify(spanProcessor1, times(0)).onEnd(any(ReadableSpan.class));
verify(spanProcessor2).onEnd(same(readableSpan));
Expand All @@ -117,6 +131,7 @@ void stringRepresentation() {
.hasToString(
"MultiSpanProcessor{"
+ "spanProcessorsStart=[spanProcessor1, spanProcessor1], "
+ "spanProcessorsEnding=[spanProcessor1, spanProcessor1], "
+ "spanProcessorsEnd=[spanProcessor1, spanProcessor1], "
+ "spanProcessorsAll=[spanProcessor1, spanProcessor1]}");
}
Expand Down
Loading

0 comments on commit 09de4bd

Please sign in to comment.