From 03812c7a888769711f1b4d41c5aba540f03229cf Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Wed, 17 Apr 2024 14:54:36 +0200 Subject: [PATCH 1/2] Extended onEnd to allow for explicit calling of next span processor --- .../sdk/trace/MultiSpanProcessor.java | 26 ++++++++++++++++--- .../sdk/trace/SpanProcessor.java | 6 +++++ .../sdk/trace/MultiSpanProcessorTest.java | 3 +++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java index 705351e83f5..2ff75dca441 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java @@ -11,6 +11,8 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * Implementation of the {@code SpanProcessor} that simply forwards all received events to a list of @@ -19,6 +21,13 @@ final class MultiSpanProcessor implements SpanProcessor { private final List spanProcessorsStart; private final List spanProcessorsEnd; + + /** + * Will invoke {@link SpanProcessor#onEnd(ReadableSpan, Consumer)} of all processors from {@link #spanProcessorsEnd} + * in order. The output from the first processor is passed to the second, the output form the second to the third ans so on. + * The output of the last processor is passed to the {@link Consumer} provided as second argument to this biconsumer. + */ + private BiConsumer> processorsEndInvoker; private final List spanProcessorsAll; private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -46,11 +55,15 @@ public boolean isStartRequired() { return !spanProcessorsStart.isEmpty(); } + + @Override + public void onEnd(ReadableSpan span, Consumer spanOutput) { + processorsEndInvoker.accept(span, spanOutput); + } + @Override public void onEnd(ReadableSpan readableSpan) { - for (SpanProcessor spanProcessor : spanProcessorsEnd) { - spanProcessor.onEnd(readableSpan); - } + onEnd(readableSpan, span -> {}); } @Override @@ -91,6 +104,13 @@ private MultiSpanProcessor(List spanProcessors) { spanProcessorsEnd.add(spanProcessor); } } + processorsEndInvoker = (span, drain) -> drain.accept(span); + for (int i=spanProcessorsEnd.size() - 1; i>=0; i--) { + BiConsumer> nextStage = processorsEndInvoker; + SpanProcessor processor = spanProcessorsEnd.get(i); + processorsEndInvoker = (span, finalOutput) -> + processor.onEnd(span, outputSpan -> nextStage.accept(outputSpan, finalOutput)); + } } @Override diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java index ba9fb037c84..b23a8fa5d7d 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java @@ -13,6 +13,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import javax.annotation.concurrent.ThreadSafe; /** @@ -67,6 +68,11 @@ static SpanProcessor composite(Iterable processors) { */ boolean isStartRequired(); + default void onEnd(ReadableSpan span, Consumer spanOutput) { + onEnd(span); + spanOutput.accept(span); + } + /** * Called when a {@link io.opentelemetry.api.trace.Span} is ended, if the {@link * Span#isRecording()} returns true. diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java index c51fe61c954..ae24c9554af 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java @@ -8,6 +8,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -36,8 +37,10 @@ class MultiSpanProcessorTest { void setUp() { when(spanProcessor1.isStartRequired()).thenReturn(true); when(spanProcessor1.isEndRequired()).thenReturn(true); + doCallRealMethod().when(spanProcessor1).onEnd(any(), any()); when(spanProcessor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); when(spanProcessor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + doCallRealMethod().when(spanProcessor2).onEnd(any(), any()); when(spanProcessor2.isStartRequired()).thenReturn(true); when(spanProcessor2.isEndRequired()).thenReturn(true); when(spanProcessor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); From 99cccc25fdd57592cfe4d441050f44db1cf9be8e Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Wed, 17 Apr 2024 15:00:24 +0200 Subject: [PATCH 2/2] Typos and spotless --- .../sdk/trace/MultiSpanProcessor.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java index 2ff75dca441..136fb67db52 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java @@ -23,11 +23,13 @@ final class MultiSpanProcessor implements SpanProcessor { private final List spanProcessorsEnd; /** - * Will invoke {@link SpanProcessor#onEnd(ReadableSpan, Consumer)} of all processors from {@link #spanProcessorsEnd} - * in order. The output from the first processor is passed to the second, the output form the second to the third ans so on. - * The output of the last processor is passed to the {@link Consumer} provided as second argument to this biconsumer. + * Will invoke {@link SpanProcessor#onEnd(ReadableSpan, Consumer)} of all processors from {@link + * #spanProcessorsEnd} in order. The output from the first processor is passed to the second, the + * output from the second to the third and so on. The output of the last processor is passed to + * the {@link Consumer} provided as second argument to this biconsumer. */ private BiConsumer> processorsEndInvoker; + private final List spanProcessorsAll; private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -55,7 +57,6 @@ public boolean isStartRequired() { return !spanProcessorsStart.isEmpty(); } - @Override public void onEnd(ReadableSpan span, Consumer spanOutput) { processorsEndInvoker.accept(span, spanOutput); @@ -105,11 +106,12 @@ private MultiSpanProcessor(List spanProcessors) { } } processorsEndInvoker = (span, drain) -> drain.accept(span); - for (int i=spanProcessorsEnd.size() - 1; i>=0; i--) { + for (int i = spanProcessorsEnd.size() - 1; i >= 0; i--) { BiConsumer> nextStage = processorsEndInvoker; SpanProcessor processor = spanProcessorsEnd.get(i); - processorsEndInvoker = (span, finalOutput) -> - processor.onEnd(span, outputSpan -> nextStage.accept(outputSpan, finalOutput)); + processorsEndInvoker = + (span, finalOutput) -> + processor.onEnd(span, outputSpan -> nextStage.accept(outputSpan, finalOutput)); } }