From f25743841def5134c9058687a744f42e0b3e9908 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Tue, 9 Apr 2024 12:52:27 +0200 Subject: [PATCH 01/14] Implemented beforeEnd callback --- .../sdk/trace/MultiSpanProcessor.java | 17 ++++++ .../io/opentelemetry/sdk/trace/SdkSpan.java | 43 +++++++++------ .../sdk/trace/SpanProcessor.java | 20 +++++++ .../sdk/trace/MultiSpanProcessorTest.java | 11 ++++ .../opentelemetry/sdk/trace/SdkSpanTest.java | 53 +++++++++++++++++++ 5 files changed, 127 insertions(+), 17 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java index 705351e83f5..025f77cb5c6 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java @@ -18,6 +18,7 @@ */ final class MultiSpanProcessor implements SpanProcessor { private final List spanProcessorsStart; + private final List spanProcessorsBeforeEnd; private final List spanProcessorsEnd; private final List spanProcessorsAll; private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -58,6 +59,18 @@ public boolean isEndRequired() { return !spanProcessorsEnd.isEmpty(); } + @Override + public void beforeEnd(ReadWriteSpan span) { + for (SpanProcessor spanProcessor : spanProcessorsBeforeEnd) { + spanProcessor.beforeEnd(span); + } + } + + @Override + public boolean isBeforeEndRequired() { + return !spanProcessorsBeforeEnd.isEmpty(); + } + @Override public CompletableResultCode shutdown() { if (isShutdown.getAndSet(true)) { @@ -83,10 +96,14 @@ private MultiSpanProcessor(List spanProcessors) { this.spanProcessorsAll = spanProcessors; this.spanProcessorsStart = new ArrayList<>(spanProcessorsAll.size()); this.spanProcessorsEnd = new ArrayList<>(spanProcessorsAll.size()); + this.spanProcessorsBeforeEnd = new ArrayList<>(spanProcessorsAll.size()); for (SpanProcessor spanProcessor : spanProcessorsAll) { if (spanProcessor.isStartRequired()) { spanProcessorsStart.add(spanProcessor); } + if (spanProcessor.isBeforeEndRequired()) { + spanProcessorsBeforeEnd.add(spanProcessor); + } if (spanProcessor.isEndRequired()) { spanProcessorsEnd.add(spanProcessor); } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java index dfa8e9689a0..976a273a250 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java @@ -95,9 +95,14 @@ final class SdkSpan implements ReadWriteSpan { @GuardedBy("lock") private long endEpochNanos; - // True if the span is ended. + private enum EndState { + NOT_ENDED, + ENDING, + ENDED + } + @GuardedBy("lock") - private boolean hasEnded; + private EndState hasEnded; private SdkSpan( SpanContext context, @@ -122,7 +127,7 @@ private SdkSpan( this.kind = kind; this.spanProcessor = spanProcessor; this.resource = resource; - this.hasEnded = false; + this.hasEnded = EndState.NOT_ENDED; this.clock = clock; this.startEpochNanos = startEpochNanos; this.attributes = attributes; @@ -220,7 +225,7 @@ public SpanData toSpanData() { status, name, endEpochNanos, - hasEnded); + hasEnded == EndState.ENDED); } } @@ -235,7 +240,7 @@ public T getAttribute(AttributeKey key) { @Override public boolean hasEnded() { synchronized (lock) { - return hasEnded; + return hasEnded == EndState.ENDED; } } @@ -281,7 +286,7 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() { @Override public long getLatencyNanos() { synchronized (lock) { - return (hasEnded ? endEpochNanos : clock.now()) - startEpochNanos; + return (hasEnded == EndState.NOT_ENDED ? clock.now() : endEpochNanos) - startEpochNanos; } } @@ -296,7 +301,7 @@ public ReadWriteSpan setAttribute(AttributeKey key, T value) { return this; } synchronized (lock) { - if (hasEnded) { + if (hasEnded == EndState.ENDED) { logger.log(Level.FINE, "Calling setAttribute() on an ended Span."); return this; } @@ -373,7 +378,7 @@ public ReadWriteSpan addEvent(String name, Attributes attributes, long timestamp private void addTimedEvent(EventData timedEvent) { synchronized (lock) { - if (hasEnded) { + if (hasEnded == EndState.ENDED) { logger.log(Level.FINE, "Calling addEvent() on an ended Span."); return; } @@ -393,7 +398,7 @@ public ReadWriteSpan setStatus(StatusCode statusCode, @Nullable String descripti return this; } synchronized (lock) { - if (hasEnded) { + if (hasEnded == EndState.ENDED) { logger.log(Level.FINE, "Calling setStatus() on an ended Span."); return this; } else if (this.status.getStatusCode() == StatusCode.OK) { @@ -431,7 +436,7 @@ public ReadWriteSpan updateName(String name) { return this; } synchronized (lock) { - if (hasEnded) { + if (hasEnded == EndState.ENDED) { logger.log(Level.FINE, "Calling updateName() on an ended Span."); return this; } @@ -456,7 +461,7 @@ public Span addLink(SpanContext spanContext, Attributes attributes) { spanLimits.getMaxNumberOfAttributesPerLink(), spanLimits.getMaxAttributeValueLength())); synchronized (lock) { - if (hasEnded) { + if (hasEnded == EndState.ENDED) { logger.log(Level.FINE, "Calling addLink() on an ended Span."); return this; } @@ -486,12 +491,16 @@ public void end(long timestamp, TimeUnit unit) { private void endInternal(long endEpochNanos) { synchronized (lock) { - if (hasEnded) { - logger.log(Level.FINE, "Calling end() on an ended Span."); + if (hasEnded != EndState.NOT_ENDED) { + logger.log(Level.FINE, "Calling end() on an ended or ending Span."); return; } + hasEnded = EndState.ENDING; this.endEpochNanos = endEpochNanos; - hasEnded = true; + if (spanProcessor.isBeforeEndRequired()) { + spanProcessor.beforeEnd(this); + } + hasEnded = EndState.ENDED; } if (spanProcessor.isEndRequired()) { spanProcessor.onEnd(this); @@ -501,7 +510,7 @@ private void endInternal(long endEpochNanos) { @Override public boolean isRecording() { synchronized (lock) { - return !hasEnded; + return hasEnded != EndState.ENDED; } } @@ -526,7 +535,7 @@ private List getImmutableTimedEvents() { // if the span has ended, then the events are unmodifiable // so we can return them directly and save copying all the data. - if (hasEnded) { + if (hasEnded == EndState.ENDED) { return Collections.unmodifiableList(events); } @@ -540,7 +549,7 @@ private Attributes getImmutableAttributes() { } // if the span has ended, then the attributes are unmodifiable, // so we can return them directly and save copying all the data. - if (hasEnded) { + if (hasEnded == EndState.ENDED) { return attributes; } // otherwise, make a copy of the data into an immutable container. diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java index ba9fb037c84..0faac0fac61 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java @@ -85,6 +85,26 @@ static SpanProcessor composite(Iterable processors) { */ boolean isEndRequired(); + + /** + * Called just before a {@link io.opentelemetry.api.trace.Span} is ended, if the {@link + * Span#isRecording()} returns true. This means that the span will still be mutable + * + *

This method is called synchronously on the execution thread, should not throw or block the + * execution thread. + * + * @param span the {@code Span} that is just about to be ended. + */ + default void beforeEnd(ReadWriteSpan span) { + } + + /** + * Returns {@code true} if this {@link SpanProcessor} requires before-end events. + */ + default boolean isBeforeEndRequired() { + return false; + } + /** * Processes all span events that have not yet been processed and closes used resources. * diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java index c51fe61c954..198a5febdfa 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java @@ -35,10 +35,12 @@ class MultiSpanProcessorTest { @BeforeEach void setUp() { when(spanProcessor1.isStartRequired()).thenReturn(true); + when(spanProcessor1.isBeforeEndRequired()).thenReturn(true); when(spanProcessor1.isEndRequired()).thenReturn(true); when(spanProcessor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); when(spanProcessor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); when(spanProcessor2.isStartRequired()).thenReturn(true); + when(spanProcessor2.isBeforeEndRequired()).thenReturn(true); when(spanProcessor2.isEndRequired()).thenReturn(true); when(spanProcessor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); when(spanProcessor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); @@ -67,6 +69,10 @@ void twoSpanProcessor() { verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan)); verify(spanProcessor2).onStart(same(Context.root()), same(readWriteSpan)); + multiSpanProcessor.beforeEnd(readWriteSpan); + verify(spanProcessor1).beforeEnd(same(readWriteSpan)); + verify(spanProcessor2).beforeEnd(same(readWriteSpan)); + multiSpanProcessor.onEnd(readableSpan); verify(spanProcessor1).onEnd(same(readableSpan)); verify(spanProcessor2).onEnd(same(readableSpan)); @@ -83,6 +89,7 @@ void twoSpanProcessor() { @Test void twoSpanProcessor_DifferentRequirements() { when(spanProcessor1.isEndRequired()).thenReturn(false); + when(spanProcessor2.isBeforeEndRequired()).thenReturn(false); when(spanProcessor2.isStartRequired()).thenReturn(false); SpanProcessor multiSpanProcessor = SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2)); @@ -94,6 +101,10 @@ void twoSpanProcessor_DifferentRequirements() { verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan)); verify(spanProcessor2, times(0)).onStart(any(Context.class), any(ReadWriteSpan.class)); + multiSpanProcessor.beforeEnd(readWriteSpan); + verify(spanProcessor1).beforeEnd(same(readWriteSpan)); + verify(spanProcessor2, times(0)).beforeEnd(any(ReadWriteSpan.class)); + multiSpanProcessor.onEnd(readableSpan); verify(spanProcessor1, times(0)).onEnd(any(ReadableSpan.class)); verify(spanProcessor2).onEnd(same(readableSpan)); diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java index 42302f54b82..34c6755a83c 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java @@ -17,6 +17,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -57,6 +59,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import javax.annotation.Nullable; import org.junit.jupiter.api.BeforeEach; @@ -106,6 +110,7 @@ void setUp() { expectedAttributes = builder.build(); testClock = TestClock.create(Instant.ofEpochSecond(0, START_EPOCH_NANOS)); when(spanProcessor.isStartRequired()).thenReturn(true); + when(spanProcessor.isBeforeEndRequired()).thenReturn(true); when(spanProcessor.isEndRequired()).thenReturn(true); } @@ -139,6 +144,54 @@ void endSpanTwice_DoNotCrash() { assertThat(span.hasEnded()).isTrue(); } + @Test + void beforeEnd_spanStillMutable() { + SdkSpan span = createTestSpan(SpanKind.INTERNAL); + + AttributeKey dummyAttrib = AttributeKey.stringKey("processor_foo"); + + AtomicBoolean endedStateInProcessor = new AtomicBoolean(); + doAnswer(invocation -> { + ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class); + assertThat(sp.hasEnded()).isFalse(); + sp.end(); //should have no effect, nested end should be detected + endedStateInProcessor.set(sp.hasEnded()); + sp.setAttribute(dummyAttrib, "bar"); + return null; + }).when(spanProcessor).beforeEnd(any()); + + span.end(); + verify(spanProcessor).beforeEnd(same(span)); + assertThat(span.hasEnded()).isTrue(); + assertThat(endedStateInProcessor.get()).isFalse(); + assertThat(span.getAttribute(dummyAttrib)).isEqualTo("bar"); + } + + @Test + void beforeEnd_latencyPinned() { + SdkSpan span = createTestSpan(SpanKind.INTERNAL); + + AtomicLong spanLatencyInProcessor = new AtomicLong(); + doAnswer(invocation -> { + ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class); + + testClock.advance(Duration.ofSeconds(100)); + spanLatencyInProcessor.set(sp.getLatencyNanos()); + return null; + }).when(spanProcessor).beforeEnd(any()); + + testClock.advance(Duration.ofSeconds(1)); + long expectedDuration = testClock.now() - START_EPOCH_NANOS; + + assertThat(span.getLatencyNanos()).isEqualTo(expectedDuration); + + span.end(); + verify(spanProcessor).beforeEnd(same(span)); + assertThat(span.hasEnded()).isTrue(); + assertThat(span.getLatencyNanos()).isEqualTo(expectedDuration); + assertThat(spanLatencyInProcessor.get()).isEqualTo(expectedDuration); + } + @Test void toSpanData_ActiveSpan() { SdkSpan span = createTestSpan(SpanKind.INTERNAL); From 55f5244b2ed53366955491e011a3eeb60f7b0e80 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Thu, 4 Jul 2024 14:21:23 +0200 Subject: [PATCH 02/14] Rename to onEnding --- .../sdk/trace/MultiSpanProcessor.java | 8 ++++---- .../io/opentelemetry/sdk/trace/SdkSpan.java | 4 ++-- .../opentelemetry/sdk/trace/SpanProcessor.java | 4 ++-- .../sdk/trace/MultiSpanProcessorTest.java | 18 +++++++++--------- .../opentelemetry/sdk/trace/SdkSpanTest.java | 10 +++++----- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java index 025f77cb5c6..4e9cce74153 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java @@ -60,14 +60,14 @@ public boolean isEndRequired() { } @Override - public void beforeEnd(ReadWriteSpan span) { + public void onEnding(ReadWriteSpan span) { for (SpanProcessor spanProcessor : spanProcessorsBeforeEnd) { - spanProcessor.beforeEnd(span); + spanProcessor.onEnding(span); } } @Override - public boolean isBeforeEndRequired() { + public boolean isOnEndingRequired() { return !spanProcessorsBeforeEnd.isEmpty(); } @@ -101,7 +101,7 @@ private MultiSpanProcessor(List spanProcessors) { if (spanProcessor.isStartRequired()) { spanProcessorsStart.add(spanProcessor); } - if (spanProcessor.isBeforeEndRequired()) { + if (spanProcessor.isOnEndingRequired()) { spanProcessorsBeforeEnd.add(spanProcessor); } if (spanProcessor.isEndRequired()) { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java index 976a273a250..d1e19798a0d 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java @@ -497,8 +497,8 @@ private void endInternal(long endEpochNanos) { } hasEnded = EndState.ENDING; this.endEpochNanos = endEpochNanos; - if (spanProcessor.isBeforeEndRequired()) { - spanProcessor.beforeEnd(this); + if (spanProcessor.isOnEndingRequired()) { + spanProcessor.onEnding(this); } hasEnded = EndState.ENDED; } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java index 0faac0fac61..b5c36ed2f17 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java @@ -95,13 +95,13 @@ static SpanProcessor composite(Iterable processors) { * * @param span the {@code Span} that is just about to be ended. */ - default void beforeEnd(ReadWriteSpan span) { + default void onEnding(ReadWriteSpan span) { } /** * Returns {@code true} if this {@link SpanProcessor} requires before-end events. */ - default boolean isBeforeEndRequired() { + default boolean isOnEndingRequired() { return false; } diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java index 198a5febdfa..aaeab095d2d 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java @@ -35,12 +35,12 @@ class MultiSpanProcessorTest { @BeforeEach void setUp() { when(spanProcessor1.isStartRequired()).thenReturn(true); - when(spanProcessor1.isBeforeEndRequired()).thenReturn(true); + when(spanProcessor1.isOnEndingRequired()).thenReturn(true); when(spanProcessor1.isEndRequired()).thenReturn(true); when(spanProcessor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); when(spanProcessor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); when(spanProcessor2.isStartRequired()).thenReturn(true); - when(spanProcessor2.isBeforeEndRequired()).thenReturn(true); + when(spanProcessor2.isOnEndingRequired()).thenReturn(true); when(spanProcessor2.isEndRequired()).thenReturn(true); when(spanProcessor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); when(spanProcessor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); @@ -69,9 +69,9 @@ void twoSpanProcessor() { verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan)); verify(spanProcessor2).onStart(same(Context.root()), same(readWriteSpan)); - multiSpanProcessor.beforeEnd(readWriteSpan); - verify(spanProcessor1).beforeEnd(same(readWriteSpan)); - verify(spanProcessor2).beforeEnd(same(readWriteSpan)); + multiSpanProcessor.onEnding(readWriteSpan); + verify(spanProcessor1).onEnding(same(readWriteSpan)); + verify(spanProcessor2).onEnding(same(readWriteSpan)); multiSpanProcessor.onEnd(readableSpan); verify(spanProcessor1).onEnd(same(readableSpan)); @@ -89,7 +89,7 @@ void twoSpanProcessor() { @Test void twoSpanProcessor_DifferentRequirements() { when(spanProcessor1.isEndRequired()).thenReturn(false); - when(spanProcessor2.isBeforeEndRequired()).thenReturn(false); + when(spanProcessor2.isOnEndingRequired()).thenReturn(false); when(spanProcessor2.isStartRequired()).thenReturn(false); SpanProcessor multiSpanProcessor = SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2)); @@ -101,9 +101,9 @@ void twoSpanProcessor_DifferentRequirements() { verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan)); verify(spanProcessor2, times(0)).onStart(any(Context.class), any(ReadWriteSpan.class)); - multiSpanProcessor.beforeEnd(readWriteSpan); - verify(spanProcessor1).beforeEnd(same(readWriteSpan)); - verify(spanProcessor2, times(0)).beforeEnd(any(ReadWriteSpan.class)); + multiSpanProcessor.onEnding(readWriteSpan); + verify(spanProcessor1).onEnding(same(readWriteSpan)); + verify(spanProcessor2, times(0)).onEnding(any(ReadWriteSpan.class)); multiSpanProcessor.onEnd(readableSpan); verify(spanProcessor1, times(0)).onEnd(any(ReadableSpan.class)); diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java index 34c6755a83c..7e9fe7f73fc 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java @@ -110,7 +110,7 @@ void setUp() { expectedAttributes = builder.build(); testClock = TestClock.create(Instant.ofEpochSecond(0, START_EPOCH_NANOS)); when(spanProcessor.isStartRequired()).thenReturn(true); - when(spanProcessor.isBeforeEndRequired()).thenReturn(true); + when(spanProcessor.isOnEndingRequired()).thenReturn(true); when(spanProcessor.isEndRequired()).thenReturn(true); } @@ -158,10 +158,10 @@ void beforeEnd_spanStillMutable() { endedStateInProcessor.set(sp.hasEnded()); sp.setAttribute(dummyAttrib, "bar"); return null; - }).when(spanProcessor).beforeEnd(any()); + }).when(spanProcessor).onEnding(any()); span.end(); - verify(spanProcessor).beforeEnd(same(span)); + verify(spanProcessor).onEnding(same(span)); assertThat(span.hasEnded()).isTrue(); assertThat(endedStateInProcessor.get()).isFalse(); assertThat(span.getAttribute(dummyAttrib)).isEqualTo("bar"); @@ -178,7 +178,7 @@ void beforeEnd_latencyPinned() { testClock.advance(Duration.ofSeconds(100)); spanLatencyInProcessor.set(sp.getLatencyNanos()); return null; - }).when(spanProcessor).beforeEnd(any()); + }).when(spanProcessor).onEnding(any()); testClock.advance(Duration.ofSeconds(1)); long expectedDuration = testClock.now() - START_EPOCH_NANOS; @@ -186,7 +186,7 @@ void beforeEnd_latencyPinned() { assertThat(span.getLatencyNanos()).isEqualTo(expectedDuration); span.end(); - verify(spanProcessor).beforeEnd(same(span)); + verify(spanProcessor).onEnding(same(span)); assertThat(span.hasEnded()).isTrue(); assertThat(span.getLatencyNanos()).isEqualTo(expectedDuration); assertThat(spanLatencyInProcessor.get()).isEqualTo(expectedDuration); From f3fd785ad04520e2b2e87dad153be5924d78460d Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Wed, 7 Aug 2024 11:56:36 +0200 Subject: [PATCH 03/14] spotless and documentation clarifications --- .../io/opentelemetry/sdk/trace/SdkSpan.java | 2 +- .../sdk/trace/SpanProcessor.java | 16 ++++++--- .../opentelemetry/sdk/trace/SdkSpanTest.java | 36 +++++++++++-------- 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java index 17398154cd3..eb2bc74b9ce 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java @@ -293,7 +293,7 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() { @Override public long getLatencyNanos() { synchronized (lock) { - return (hasEnded == EndState.NOT_ENDED ? clock.now() : endEpochNanos) - startEpochNanos; + return (hasEnded == EndState.NOT_ENDED ? clock.now() : endEpochNanos) - startEpochNanos; } } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java index b5c36ed2f17..bb7b692ae2c 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java @@ -85,21 +85,27 @@ static SpanProcessor composite(Iterable processors) { */ boolean isEndRequired(); - /** * Called just before a {@link io.opentelemetry.api.trace.Span} is ended, if the {@link - * Span#isRecording()} returns true. This means that the span will still be mutable + * Span#isRecording()} returns true. This means that the span will still be mutable. Note that the + * span will only be modifiable synchronously from this callback, concurrent modifications from + * other threads will be prevented. * *

This method is called synchronously on the execution thread, should not throw or block the * execution thread. * + *

Note: This method is experimental and might be subject to future changes. + * * @param span the {@code Span} that is just about to be ended. */ - default void onEnding(ReadWriteSpan span) { - } + default void onEnding(ReadWriteSpan span) {} /** - * Returns {@code true} if this {@link SpanProcessor} requires before-end events. + * Returns {@code true} if this {@link SpanProcessor} requires onEnding events. + * + *

Note: This method is experimental and might be subject to future changes. + * + * @return {@code true} if this {@link SpanProcessor} requires onEnding events. */ default boolean isOnEndingRequired() { return false; diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java index 4531b79690a..5b0373d6719 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java @@ -152,14 +152,17 @@ void beforeEnd_spanStillMutable() { AttributeKey dummyAttrib = AttributeKey.stringKey("processor_foo"); AtomicBoolean endedStateInProcessor = new AtomicBoolean(); - doAnswer(invocation -> { - ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class); - assertThat(sp.hasEnded()).isFalse(); - sp.end(); //should have no effect, nested end should be detected - endedStateInProcessor.set(sp.hasEnded()); - sp.setAttribute(dummyAttrib, "bar"); - return null; - }).when(spanProcessor).onEnding(any()); + doAnswer( + invocation -> { + ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class); + assertThat(sp.hasEnded()).isFalse(); + sp.end(); // should have no effect, nested end should be detected + endedStateInProcessor.set(sp.hasEnded()); + sp.setAttribute(dummyAttrib, "bar"); + return null; + }) + .when(spanProcessor) + .onEnding(any()); span.end(); verify(spanProcessor).onEnding(same(span)); @@ -173,13 +176,16 @@ void beforeEnd_latencyPinned() { SdkSpan span = createTestSpan(SpanKind.INTERNAL); AtomicLong spanLatencyInProcessor = new AtomicLong(); - doAnswer(invocation -> { - ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class); - - testClock.advance(Duration.ofSeconds(100)); - spanLatencyInProcessor.set(sp.getLatencyNanos()); - return null; - }).when(spanProcessor).onEnding(any()); + doAnswer( + invocation -> { + ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class); + + testClock.advance(Duration.ofSeconds(100)); + spanLatencyInProcessor.set(sp.getLatencyNanos()); + return null; + }) + .when(spanProcessor) + .onEnding(any()); testClock.advance(Duration.ofSeconds(1)); long expectedDuration = testClock.now() - START_EPOCH_NANOS; From c7ed9707f005a4356b106b83787a0b16159540ee Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Wed, 7 Aug 2024 12:20:31 +0200 Subject: [PATCH 04/14] Added jApiCMp output --- docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt index 4e569d20a7f..84e2d5c8e6c 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt @@ -1,2 +1,5 @@ Comparing source compatibility of opentelemetry-sdk-trace-1.41.0-SNAPSHOT.jar against opentelemetry-sdk-trace-1.40.0.jar -No changes. \ No newline at end of file +*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.trace.SpanProcessor (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) boolean isOnEndingRequired() + +++ NEW METHOD: PUBLIC(+) void onEnding(io.opentelemetry.sdk.trace.ReadWriteSpan) From d623112a0170591112debf9988f8fc646b8bfddc Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Tue, 13 Aug 2024 09:48:59 +0200 Subject: [PATCH 05/14] Prevent concurrent modifications from other threads instead of holding lock --- .../opentelemetry-sdk-trace.txt | 5 ++- .../io/opentelemetry/sdk/trace/SdkSpan.java | 37 ++++++++++++++----- .../opentelemetry/sdk/trace/SdkSpanTest.java | 36 +++++++++++++++++- 3 files changed, 66 insertions(+), 12 deletions(-) diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt index 98ead6c6c5b..44f812e6b70 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt @@ -1,2 +1,5 @@ Comparing source compatibility of opentelemetry-sdk-trace-1.42.0-SNAPSHOT.jar against opentelemetry-sdk-trace-1.41.0.jar -No changes. \ No newline at end of file +*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.trace.SpanProcessor (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) boolean isOnEndingRequired() + +++ NEW METHOD: PUBLIC(+) void onEnding(io.opentelemetry.sdk.trace.ReadWriteSpan) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java index eb2bc74b9ce..c0b1d6bbfd0 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java @@ -104,6 +104,16 @@ private enum EndState { @GuardedBy("lock") private EndState hasEnded; + /** + * The thread on which {@link #end()} is called and which will be invoking the {@link + * SpanProcessor}s. This field is used to ensure that only this thread may modify the span while + * it is in state {@link EndState#ENDING} to prevent concurrent updates outside of {@link + * SpanProcessor#onEnding(ReadWriteSpan)}. + */ + @GuardedBy("lock") + @Nullable + private Thread spanEndingThread; + private SdkSpan( SpanContext context, String name, @@ -308,7 +318,7 @@ public ReadWriteSpan setAttribute(AttributeKey key, T value) { return this; } synchronized (lock) { - if (hasEnded == EndState.ENDED) { + if (isSpanUnmodifiableByCurrentThread()) { logger.log(Level.FINE, "Calling setAttribute() on an ended Span."); return this; } @@ -323,6 +333,12 @@ public ReadWriteSpan setAttribute(AttributeKey key, T value) { return this; } + @GuardedBy("lock") + private boolean isSpanUnmodifiableByCurrentThread() { + return hasEnded == EndState.ENDED + || (hasEnded == EndState.ENDING && Thread.currentThread() != spanEndingThread); + } + @Override public ReadWriteSpan addEvent(String name) { if (name == null) { @@ -385,7 +401,7 @@ public ReadWriteSpan addEvent(String name, Attributes attributes, long timestamp private void addTimedEvent(EventData timedEvent) { synchronized (lock) { - if (hasEnded == EndState.ENDED) { + if (isSpanUnmodifiableByCurrentThread()) { logger.log(Level.FINE, "Calling addEvent() on an ended Span."); return; } @@ -405,7 +421,7 @@ public ReadWriteSpan setStatus(StatusCode statusCode, @Nullable String descripti return this; } synchronized (lock) { - if (hasEnded == EndState.ENDED) { + if (isSpanUnmodifiableByCurrentThread()) { logger.log(Level.FINE, "Calling setStatus() on an ended Span."); return this; } else if (this.status.getStatusCode() == StatusCode.OK) { @@ -443,7 +459,7 @@ public ReadWriteSpan updateName(String name) { return this; } synchronized (lock) { - if (hasEnded == EndState.ENDED) { + if (isSpanUnmodifiableByCurrentThread()) { logger.log(Level.FINE, "Calling updateName() on an ended Span."); return this; } @@ -468,7 +484,7 @@ public Span addLink(SpanContext spanContext, Attributes attributes) { spanLimits.getMaxNumberOfAttributesPerLink(), spanLimits.getMaxAttributeValueLength())); synchronized (lock) { - if (hasEnded == EndState.ENDED) { + if (isSpanUnmodifiableByCurrentThread()) { logger.log(Level.FINE, "Calling addLink() on an ended Span."); return this; } @@ -502,11 +518,14 @@ private void endInternal(long endEpochNanos) { logger.log(Level.FINE, "Calling end() on an ended or ending Span."); return; } - hasEnded = EndState.ENDING; this.endEpochNanos = endEpochNanos; - if (spanProcessor.isOnEndingRequired()) { - spanProcessor.onEnding(this); - } + spanEndingThread = Thread.currentThread(); + hasEnded = EndState.ENDING; + } + if (spanProcessor.isOnEndingRequired()) { + spanProcessor.onEnding(this); + } + synchronized (lock) { hasEnded = EndState.ENDED; } if (spanProcessor.isEndRequired()) { diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java index 5b0373d6719..54a03db1db8 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java @@ -146,7 +146,7 @@ void endSpanTwice_DoNotCrash() { } @Test - void beforeEnd_spanStillMutable() { + void onEnding_spanStillMutable() { SdkSpan span = createTestSpan(SpanKind.INTERNAL); AttributeKey dummyAttrib = AttributeKey.stringKey("processor_foo"); @@ -172,7 +172,39 @@ void beforeEnd_spanStillMutable() { } @Test - void beforeEnd_latencyPinned() { + void onEnding_concurrentModificationsPrevented() { + SdkSpan span = createTestSpan(SpanKind.INTERNAL); + + AttributeKey syncAttrib = AttributeKey.stringKey("sync_foo"); + AttributeKey concurrentAttrib = AttributeKey.stringKey("concurrent_foo"); + + doAnswer( + invocation -> { + ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class); + + Thread concurrent = + new Thread( + () -> { + sp.setAttribute(concurrentAttrib, "concurrent_bar"); + }); + concurrent.start(); + concurrent.join(); + + sp.setAttribute(syncAttrib, "sync_bar"); + + return null; + }) + .when(spanProcessor) + .onEnding(any()); + + span.end(); + verify(spanProcessor).onEnding(same(span)); + assertThat(span.getAttribute(concurrentAttrib)).isNull(); + assertThat(span.getAttribute(syncAttrib)).isEqualTo("sync_bar"); + } + + @Test + void onEnding_latencyPinned() { SdkSpan span = createTestSpan(SpanKind.INTERNAL); AtomicLong spanLatencyInProcessor = new AtomicLong(); From 78e715bac707958527784260282abfd9a95f2d22 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Tue, 20 Aug 2024 13:23:14 +0200 Subject: [PATCH 06/14] Moved onEnding to ExtendedSpanProcessor interface --- .../opentelemetry-sdk-trace.txt | 5 +-- .../sdk/trace/MultiSpanProcessor.java | 20 ++++++--- .../io/opentelemetry/sdk/trace/SdkSpan.java | 8 +++- .../sdk/trace/SpanProcessor.java | 26 ----------- .../trace/internal/ExtendedSpanProcessor.java | 44 +++++++++++++++++++ .../sdk/trace/MultiSpanProcessorTest.java | 15 ++++--- .../opentelemetry/sdk/trace/SdkSpanTest.java | 3 +- 7 files changed, 75 insertions(+), 46 deletions(-) create mode 100644 sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt index 44f812e6b70..98ead6c6c5b 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt @@ -1,5 +1,2 @@ Comparing source compatibility of opentelemetry-sdk-trace-1.42.0-SNAPSHOT.jar against opentelemetry-sdk-trace-1.41.0.jar -*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.trace.SpanProcessor (not serializable) - === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 - +++ NEW METHOD: PUBLIC(+) boolean isOnEndingRequired() - +++ NEW METHOD: PUBLIC(+) void onEnding(io.opentelemetry.sdk.trace.ReadWriteSpan) +No changes. \ No newline at end of file diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java index aaf4c6c4516..65b37096568 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java @@ -7,6 +7,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.internal.ExtendedSpanProcessor; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -16,9 +17,9 @@ * Implementation of the {@code SpanProcessor} that simply forwards all received events to a list of * {@code SpanProcessor}s. */ -final class MultiSpanProcessor implements SpanProcessor { +final class MultiSpanProcessor implements ExtendedSpanProcessor { private final List spanProcessorsStart; - private final List spanProcessorsBeforeEnd; + private final List spanProcessorsEnding; private final List spanProcessorsEnd; private final List spanProcessorsAll; private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -61,14 +62,14 @@ public boolean isEndRequired() { @Override public void onEnding(ReadWriteSpan span) { - for (SpanProcessor spanProcessor : spanProcessorsBeforeEnd) { + for (ExtendedSpanProcessor spanProcessor : spanProcessorsEnding) { spanProcessor.onEnding(span); } } @Override public boolean isOnEndingRequired() { - return !spanProcessorsBeforeEnd.isEmpty(); + return !spanProcessorsEnding.isEmpty(); } @Override @@ -96,13 +97,16 @@ private MultiSpanProcessor(List spanProcessors) { this.spanProcessorsAll = spanProcessors; this.spanProcessorsStart = new ArrayList<>(spanProcessorsAll.size()); this.spanProcessorsEnd = new ArrayList<>(spanProcessorsAll.size()); - this.spanProcessorsBeforeEnd = new ArrayList<>(spanProcessorsAll.size()); + this.spanProcessorsEnding = new ArrayList<>(spanProcessorsAll.size()); for (SpanProcessor spanProcessor : spanProcessorsAll) { if (spanProcessor.isStartRequired()) { spanProcessorsStart.add(spanProcessor); } - if (spanProcessor.isOnEndingRequired()) { - spanProcessorsBeforeEnd.add(spanProcessor); + if (spanProcessor instanceof ExtendedSpanProcessor) { + ExtendedSpanProcessor extendedSpanProcessor = (ExtendedSpanProcessor) spanProcessor; + if (extendedSpanProcessor.isOnEndingRequired()) { + spanProcessorsEnding.add(extendedSpanProcessor); + } } if (spanProcessor.isEndRequired()) { spanProcessorsEnd.add(spanProcessor); @@ -115,6 +119,8 @@ public String toString() { return "MultiSpanProcessor{" + "spanProcessorsStart=" + spanProcessorsStart + + ", spanProcessorsEnding=" + + spanProcessorsEnding + ", spanProcessorsEnd=" + spanProcessorsEnd + ", spanProcessorsAll=" diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java index c0b1d6bbfd0..9b7fc09967c 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java @@ -23,6 +23,7 @@ import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.sdk.trace.internal.ExtendedSpanProcessor; import io.opentelemetry.sdk.trace.internal.data.ExceptionEventData; import java.util.ArrayList; import java.util.Collections; @@ -522,8 +523,11 @@ private void endInternal(long endEpochNanos) { spanEndingThread = Thread.currentThread(); hasEnded = EndState.ENDING; } - if (spanProcessor.isOnEndingRequired()) { - spanProcessor.onEnding(this); + if (spanProcessor instanceof ExtendedSpanProcessor) { + ExtendedSpanProcessor extendedSpanProcessor = (ExtendedSpanProcessor) spanProcessor; + if (extendedSpanProcessor.isOnEndingRequired()) { + extendedSpanProcessor.onEnding(this); + } } synchronized (lock) { hasEnded = EndState.ENDED; diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java index bb7b692ae2c..ba9fb037c84 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java @@ -85,32 +85,6 @@ static SpanProcessor composite(Iterable processors) { */ boolean isEndRequired(); - /** - * Called just before a {@link io.opentelemetry.api.trace.Span} is ended, if the {@link - * Span#isRecording()} returns true. This means that the span will still be mutable. Note that the - * span will only be modifiable synchronously from this callback, concurrent modifications from - * other threads will be prevented. - * - *

This method is called synchronously on the execution thread, should not throw or block the - * execution thread. - * - *

Note: This method is experimental and might be subject to future changes. - * - * @param span the {@code Span} that is just about to be ended. - */ - default void onEnding(ReadWriteSpan span) {} - - /** - * Returns {@code true} if this {@link SpanProcessor} requires onEnding events. - * - *

Note: This method is experimental and might be subject to future changes. - * - * @return {@code true} if this {@link SpanProcessor} requires onEnding events. - */ - default boolean isOnEndingRequired() { - return false; - } - /** * Processes all span events that have not yet been processed and closes used resources. * diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java new file mode 100644 index 00000000000..0466bee571d --- /dev/null +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.internal; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.SpanProcessor; + +/** + * A specialization of {@link SpanProcessor} providing more extension points. + * + *

Note that this interface is considered experimental and therefore should only be used at the + * risk of its methods being changed or removed. If it stabilized, the interface is likely removed + * and the methods are promoted to {@link SpanProcessor}. + */ +public interface ExtendedSpanProcessor extends SpanProcessor { + + /** + * Called just before a {@link io.opentelemetry.api.trace.Span} is ended, if the {@link + * Span#isRecording()} returns true. This means that the span will still be mutable. Note that the + * span will only be modifiable synchronously from this callback, concurrent modifications from + * other threads will be prevented. + * + *

This method is called synchronously on the execution thread, should not throw or block the + * execution thread. + * + *

Note: This method is experimental and might be subject to future changes. + * + * @param span the {@code Span} that is just about to be ended. + */ + void onEnding(ReadWriteSpan span); + + /** + * Returns {@code true} if this {@link SpanProcessor} requires onEnding events. + * + *

Note: This method is experimental and might be subject to future changes. + * + * @return {@code true} if this {@link SpanProcessor} requires onEnding events. + */ + boolean isOnEndingRequired(); +} diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java index aaeab095d2d..e813e624890 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java @@ -14,6 +14,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.internal.ExtendedSpanProcessor; import java.util.Arrays; import java.util.Collections; import org.junit.jupiter.api.BeforeEach; @@ -27,8 +28,8 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) class MultiSpanProcessorTest { - @Mock private SpanProcessor spanProcessor1; - @Mock private SpanProcessor spanProcessor2; + @Mock private ExtendedSpanProcessor spanProcessor1; + @Mock private ExtendedSpanProcessor spanProcessor2; @Mock private ReadableSpan readableSpan; @Mock private ReadWriteSpan readWriteSpan; @@ -63,8 +64,9 @@ void oneSpanProcessor() { @Test void twoSpanProcessor() { - SpanProcessor multiSpanProcessor = - SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2)); + ExtendedSpanProcessor multiSpanProcessor = + (ExtendedSpanProcessor) + SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2)); multiSpanProcessor.onStart(Context.root(), readWriteSpan); verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan)); verify(spanProcessor2).onStart(same(Context.root()), same(readWriteSpan)); @@ -91,8 +93,9 @@ void twoSpanProcessor_DifferentRequirements() { when(spanProcessor1.isEndRequired()).thenReturn(false); when(spanProcessor2.isOnEndingRequired()).thenReturn(false); when(spanProcessor2.isStartRequired()).thenReturn(false); - SpanProcessor multiSpanProcessor = - SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2)); + ExtendedSpanProcessor multiSpanProcessor = + (ExtendedSpanProcessor) + SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2)); assertThat(multiSpanProcessor.isStartRequired()).isTrue(); assertThat(multiSpanProcessor.isEndRequired()).isTrue(); diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java index 54a03db1db8..7814074269b 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java @@ -44,6 +44,7 @@ import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.sdk.trace.internal.ExtendedSpanProcessor; import io.opentelemetry.sdk.trace.internal.data.ExceptionEventData; import java.io.PrintWriter; import java.io.StringWriter; @@ -94,7 +95,7 @@ class SdkSpanTest { private final Map attributes = new HashMap<>(); private Attributes expectedAttributes; private final LinkData link = LinkData.create(spanContext); - @Mock private SpanProcessor spanProcessor; + @Mock private ExtendedSpanProcessor spanProcessor; private TestClock testClock; From 40f1cef347ac8562399163de656446ea75b5d4df Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Tue, 20 Aug 2024 13:35:39 +0200 Subject: [PATCH 07/14] Fixed string representation test --- .../java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java index e813e624890..34ac6e1c03d 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java @@ -131,6 +131,7 @@ void stringRepresentation() { .hasToString( "MultiSpanProcessor{" + "spanProcessorsStart=[spanProcessor1, spanProcessor1], " + + "spanProcessorsEnding=[spanProcessor1, spanProcessor1], " + "spanProcessorsEnd=[spanProcessor1, spanProcessor1], " + "spanProcessorsAll=[spanProcessor1, spanProcessor1]}"); } From 2f686cf4cbbde99a31f451f348aa0844d0c53e12 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Wed, 21 Aug 2024 10:21:49 +0200 Subject: [PATCH 08/14] Apply suggestions from code review Co-authored-by: jack-berg <34418638+jack-berg@users.noreply.github.com> --- sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java | 2 +- .../opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java index 9b7fc09967c..f3490f085b9 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java @@ -109,7 +109,7 @@ private enum EndState { * The thread on which {@link #end()} is called and which will be invoking the {@link * SpanProcessor}s. This field is used to ensure that only this thread may modify the span while * it is in state {@link EndState#ENDING} to prevent concurrent updates outside of {@link - * SpanProcessor#onEnding(ReadWriteSpan)}. + * ExtendedSpanProcessor#onEnding(ReadWriteSpan)}. */ @GuardedBy("lock") @Nullable diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java index 0466bee571d..9b5d6c38f1c 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java @@ -10,7 +10,7 @@ import io.opentelemetry.sdk.trace.SpanProcessor; /** - * A specialization of {@link SpanProcessor} providing more extension points. + * Extended {@link SpanProcessor} with experimental APIs. * *

Note that this interface is considered experimental and therefore should only be used at the * risk of its methods being changed or removed. If it stabilized, the interface is likely removed From 300b44ccaf8f756a8b47d32cb8bca93b8569d42b Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Wed, 21 Aug 2024 10:30:16 +0200 Subject: [PATCH 09/14] Review fixes --- .../trace/internal/ExtendedSpanProcessor.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java index 9b5d6c38f1c..c426655671d 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java @@ -5,30 +5,28 @@ package io.opentelemetry.sdk.trace.internal; -import io.opentelemetry.api.trace.Span; import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; /** - * Extended {@link SpanProcessor} with experimental APIs. + * Extended {@link SpanProcessor} with experimental APIs. * - *

Note that this interface is considered experimental and therefore should only be used at the - * risk of its methods being changed or removed. If it stabilized, the interface is likely removed - * and the methods are promoted to {@link SpanProcessor}. + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. */ public interface ExtendedSpanProcessor extends SpanProcessor { /** - * Called just before a {@link io.opentelemetry.api.trace.Span} is ended, if the {@link - * Span#isRecording()} returns true. This means that the span will still be mutable. Note that the - * span will only be modifiable synchronously from this callback, concurrent modifications from - * other threads will be prevented. + * Called when a {@link io.opentelemetry.api.trace.Span} is ended, but before {@link + * SpanProcessor#onEnd(ReadableSpan)} is invoked with an immutable variant of this span. This + * means that the span will still be mutable. Note that the span will only be modifiable + * synchronously from this callback, concurrent modifications from other threads will be + * prevented. Only called if the {@link * Span#isRecording()} returns true. * *

This method is called synchronously on the execution thread, should not throw or block the * execution thread. * - *

Note: This method is experimental and might be subject to future changes. - * * @param span the {@code Span} that is just about to be ended. */ void onEnding(ReadWriteSpan span); @@ -36,8 +34,6 @@ public interface ExtendedSpanProcessor extends SpanProcessor { /** * Returns {@code true} if this {@link SpanProcessor} requires onEnding events. * - *

Note: This method is experimental and might be subject to future changes. - * * @return {@code true} if this {@link SpanProcessor} requires onEnding events. */ boolean isOnEndingRequired(); From c3b878438d2a4a94b95d1df7c485def2b6eb8c48 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Wed, 21 Aug 2024 12:22:52 +0200 Subject: [PATCH 10/14] Fix javadoc --- .../sdk/trace/internal/ExtendedSpanProcessor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java index c426655671d..5c608bf8275 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessor.java @@ -5,6 +5,7 @@ package io.opentelemetry.sdk.trace.internal; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; @@ -22,7 +23,7 @@ public interface ExtendedSpanProcessor extends SpanProcessor { * SpanProcessor#onEnd(ReadableSpan)} is invoked with an immutable variant of this span. This * means that the span will still be mutable. Note that the span will only be modifiable * synchronously from this callback, concurrent modifications from other threads will be - * prevented. Only called if the {@link * Span#isRecording()} returns true. + * prevented. Only called if {@link Span#isRecording()} returns true. * *

This method is called synchronously on the execution thread, should not throw or block the * execution thread. From 8240cab6514daaf47009bd7d56578020e3797051 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 30 Aug 2024 09:59:09 +0200 Subject: [PATCH 11/14] Added test case to demonstrate usage --- .../ExtendedSpanProcessorUsageTest.java | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java new file mode 100644 index 00000000000..ac7aabf69fa --- /dev/null +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java @@ -0,0 +1,85 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.internal; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import org.junit.jupiter.api.Test; + +/** Demonstrating usage of {@link ExtendedSpanProcessor}. */ +public class ExtendedSpanProcessorUsageTest { + + private static final AttributeKey FOO_KEY = AttributeKey.stringKey("foo"); + private static final AttributeKey BAR_KEY = AttributeKey.stringKey("bar"); + + private static class CopyFooToBarProcessor implements ExtendedSpanProcessor { + + @Override + public void onStart(Context parentContext, ReadWriteSpan span) {} + + @Override + public boolean isStartRequired() { + return false; + } + + @Override + public void onEnd(ReadableSpan span) {} + + @Override + public boolean isEndRequired() { + return false; + } + + @Override + public void onEnding(ReadWriteSpan span) { + String val = span.getAttribute(FOO_KEY); + span.setAttribute(BAR_KEY, val); + } + + @Override + public boolean isOnEndingRequired() { + return true; + } + } + + @Test + public void testExtendedSpanProcessorUsage() { + InMemorySpanExporter exporter = InMemorySpanExporter.create(); + + try (SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .addSpanProcessor(new CopyFooToBarProcessor()) + .build()) { + + Tracer tracer = tracerProvider.get("dummy-tracer"); + Span span = tracer.spanBuilder("my-span").startSpan(); + + span.setAttribute(FOO_KEY, "Hello!"); + + span.end(); + + assertThat(exporter.getFinishedSpanItems()) + .hasSize(1) + .first() + .satisfies( + spanData -> { + assertThat(spanData.getAttributes()) + .containsEntry(FOO_KEY, "Hello!") + .containsEntry(BAR_KEY, "Hello!"); + }); + } + } +} From 4aa663c0e67a740ca17c0ba9093cfc65c5980782 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Mon, 2 Sep 2024 09:22:43 +0200 Subject: [PATCH 12/14] Apply suggestions from code review Co-authored-by: jack-berg <34418638+jack-berg@users.noreply.github.com> --- .../sdk/trace/internal/ExtendedSpanProcessorUsageTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java index ac7aabf69fa..7f5d928fb31 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java @@ -19,7 +19,7 @@ import org.junit.jupiter.api.Test; /** Demonstrating usage of {@link ExtendedSpanProcessor}. */ -public class ExtendedSpanProcessorUsageTest { +class ExtendedSpanProcessorUsageTest { private static final AttributeKey FOO_KEY = AttributeKey.stringKey("foo"); private static final AttributeKey BAR_KEY = AttributeKey.stringKey("bar"); @@ -55,7 +55,7 @@ public boolean isOnEndingRequired() { } @Test - public void testExtendedSpanProcessorUsage() { + public void extendedSpanProcessorUsage() { InMemorySpanExporter exporter = InMemorySpanExporter.create(); try (SdkTracerProvider tracerProvider = From fc67a4d82243997334ff13f872947a133425a822 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Mon, 2 Sep 2024 09:25:44 +0200 Subject: [PATCH 13/14] Make test method package private --- .../sdk/trace/internal/ExtendedSpanProcessorUsageTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java index 7f5d928fb31..3185aa9a1df 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/internal/ExtendedSpanProcessorUsageTest.java @@ -55,7 +55,7 @@ public boolean isOnEndingRequired() { } @Test - public void extendedSpanProcessorUsage() { + void extendedSpanProcessorUsage() { InMemorySpanExporter exporter = InMemorySpanExporter.create(); try (SdkTracerProvider tracerProvider = From 80aa0592849cd008070413d93c83ef51bd0dc3bd Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Mon, 2 Sep 2024 09:26:05 +0200 Subject: [PATCH 14/14] Invert isSpanUnmodifiableByCurrentThread --- .../java/io/opentelemetry/sdk/trace/SdkSpan.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java index f3490f085b9..1580b05c465 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java @@ -319,7 +319,7 @@ public ReadWriteSpan setAttribute(AttributeKey key, T value) { return this; } synchronized (lock) { - if (isSpanUnmodifiableByCurrentThread()) { + if (!isModifiableByCurrentThread()) { logger.log(Level.FINE, "Calling setAttribute() on an ended Span."); return this; } @@ -335,9 +335,9 @@ public ReadWriteSpan setAttribute(AttributeKey key, T value) { } @GuardedBy("lock") - private boolean isSpanUnmodifiableByCurrentThread() { - return hasEnded == EndState.ENDED - || (hasEnded == EndState.ENDING && Thread.currentThread() != spanEndingThread); + private boolean isModifiableByCurrentThread() { + return hasEnded == EndState.NOT_ENDED + || (hasEnded == EndState.ENDING && Thread.currentThread() == spanEndingThread); } @Override @@ -402,7 +402,7 @@ public ReadWriteSpan addEvent(String name, Attributes attributes, long timestamp private void addTimedEvent(EventData timedEvent) { synchronized (lock) { - if (isSpanUnmodifiableByCurrentThread()) { + if (!isModifiableByCurrentThread()) { logger.log(Level.FINE, "Calling addEvent() on an ended Span."); return; } @@ -422,7 +422,7 @@ public ReadWriteSpan setStatus(StatusCode statusCode, @Nullable String descripti return this; } synchronized (lock) { - if (isSpanUnmodifiableByCurrentThread()) { + if (!isModifiableByCurrentThread()) { logger.log(Level.FINE, "Calling setStatus() on an ended Span."); return this; } else if (this.status.getStatusCode() == StatusCode.OK) { @@ -460,7 +460,7 @@ public ReadWriteSpan updateName(String name) { return this; } synchronized (lock) { - if (isSpanUnmodifiableByCurrentThread()) { + if (!isModifiableByCurrentThread()) { logger.log(Level.FINE, "Calling updateName() on an ended Span."); return this; } @@ -485,7 +485,7 @@ public Span addLink(SpanContext spanContext, Attributes attributes) { spanLimits.getMaxNumberOfAttributesPerLink(), spanLimits.getMaxAttributeValueLength())); synchronized (lock) { - if (isSpanUnmodifiableByCurrentThread()) { + if (!isModifiableByCurrentThread()) { logger.log(Level.FINE, "Calling addLink() on an ended Span."); return this; }