Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added SpanProcessor OnEnding callback #6367

Merged
merged 18 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.internal.ExtendedSpanProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -16,8 +17,9 @@
* Implementation of the {@code SpanProcessor} that simply forwards all received events to a list of
* {@code SpanProcessor}s.
*/
final class MultiSpanProcessor implements SpanProcessor {
final class MultiSpanProcessor implements ExtendedSpanProcessor {
private final List<SpanProcessor> spanProcessorsStart;
private final List<ExtendedSpanProcessor> spanProcessorsEnding;
private final List<SpanProcessor> spanProcessorsEnd;
private final List<SpanProcessor> spanProcessorsAll;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
Expand Down Expand Up @@ -58,6 +60,18 @@ public boolean isEndRequired() {
return !spanProcessorsEnd.isEmpty();
}

@Override
public void onEnding(ReadWriteSpan span) {
for (ExtendedSpanProcessor spanProcessor : spanProcessorsEnding) {
spanProcessor.onEnding(span);
}
}

@Override
public boolean isOnEndingRequired() {
return !spanProcessorsEnding.isEmpty();
}

@Override
public CompletableResultCode shutdown() {
if (isShutdown.getAndSet(true)) {
Expand All @@ -83,10 +97,17 @@ private MultiSpanProcessor(List<SpanProcessor> spanProcessors) {
this.spanProcessorsAll = spanProcessors;
this.spanProcessorsStart = new ArrayList<>(spanProcessorsAll.size());
this.spanProcessorsEnd = new ArrayList<>(spanProcessorsAll.size());
this.spanProcessorsEnding = new ArrayList<>(spanProcessorsAll.size());
for (SpanProcessor spanProcessor : spanProcessorsAll) {
if (spanProcessor.isStartRequired()) {
spanProcessorsStart.add(spanProcessor);
}
if (spanProcessor instanceof ExtendedSpanProcessor) {
ExtendedSpanProcessor extendedSpanProcessor = (ExtendedSpanProcessor) spanProcessor;
if (extendedSpanProcessor.isOnEndingRequired()) {
spanProcessorsEnding.add(extendedSpanProcessor);
}
}
if (spanProcessor.isEndRequired()) {
spanProcessorsEnd.add(spanProcessor);
}
Expand All @@ -98,6 +119,8 @@ public String toString() {
return "MultiSpanProcessor{"
+ "spanProcessorsStart="
+ spanProcessorsStart
+ ", spanProcessorsEnding="
+ spanProcessorsEnding
+ ", spanProcessorsEnd="
+ spanProcessorsEnd
+ ", spanProcessorsAll="
Expand Down
66 changes: 49 additions & 17 deletions sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.sdk.trace.internal.ExtendedSpanProcessor;
import io.opentelemetry.sdk.trace.internal.data.ExceptionEventData;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -95,9 +96,24 @@ final class SdkSpan implements ReadWriteSpan {
@GuardedBy("lock")
private long endEpochNanos;

// True if the span is ended.
private enum EndState {
NOT_ENDED,
ENDING,
ENDED
}

@GuardedBy("lock")
private boolean hasEnded;
private EndState hasEnded;

/**
* The thread on which {@link #end()} is called and which will be invoking the {@link
* SpanProcessor}s. This field is used to ensure that only this thread may modify the span while
* it is in state {@link EndState#ENDING} to prevent concurrent updates outside of {@link
* ExtendedSpanProcessor#onEnding(ReadWriteSpan)}.
*/
@GuardedBy("lock")
@Nullable
private Thread spanEndingThread;

private SdkSpan(
SpanContext context,
Expand All @@ -122,7 +138,7 @@ private SdkSpan(
this.kind = kind;
this.spanProcessor = spanProcessor;
this.resource = resource;
this.hasEnded = false;
this.hasEnded = EndState.NOT_ENDED;
this.clock = clock;
this.startEpochNanos = startEpochNanos;
this.attributes = attributes;
Expand Down Expand Up @@ -220,7 +236,7 @@ public SpanData toSpanData() {
status,
name,
endEpochNanos,
hasEnded);
hasEnded == EndState.ENDED);
}
}

Expand All @@ -242,7 +258,7 @@ public Attributes getAttributes() {
@Override
public boolean hasEnded() {
synchronized (lock) {
return hasEnded;
return hasEnded == EndState.ENDED;
}
}

Expand Down Expand Up @@ -288,7 +304,7 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() {
@Override
public long getLatencyNanos() {
synchronized (lock) {
return (hasEnded ? endEpochNanos : clock.now()) - startEpochNanos;
return (hasEnded == EndState.NOT_ENDED ? clock.now() : endEpochNanos) - startEpochNanos;
}
}

Expand All @@ -303,7 +319,7 @@ public <T> ReadWriteSpan setAttribute(AttributeKey<T> key, T value) {
return this;
}
synchronized (lock) {
if (hasEnded) {
if (isSpanUnmodifiableByCurrentThread()) {
logger.log(Level.FINE, "Calling setAttribute() on an ended Span.");
return this;
}
Expand All @@ -318,6 +334,12 @@ public <T> ReadWriteSpan setAttribute(AttributeKey<T> key, T value) {
return this;
}

@GuardedBy("lock")
private boolean isSpanUnmodifiableByCurrentThread() {
return hasEnded == EndState.ENDED
|| (hasEnded == EndState.ENDING && Thread.currentThread() != spanEndingThread);
}
JonasKunz marked this conversation as resolved.
Show resolved Hide resolved

@Override
public ReadWriteSpan addEvent(String name) {
if (name == null) {
Expand Down Expand Up @@ -380,7 +402,7 @@ public ReadWriteSpan addEvent(String name, Attributes attributes, long timestamp

private void addTimedEvent(EventData timedEvent) {
synchronized (lock) {
if (hasEnded) {
if (isSpanUnmodifiableByCurrentThread()) {
logger.log(Level.FINE, "Calling addEvent() on an ended Span.");
return;
}
Expand All @@ -400,7 +422,7 @@ public ReadWriteSpan setStatus(StatusCode statusCode, @Nullable String descripti
return this;
}
synchronized (lock) {
if (hasEnded) {
if (isSpanUnmodifiableByCurrentThread()) {
logger.log(Level.FINE, "Calling setStatus() on an ended Span.");
return this;
} else if (this.status.getStatusCode() == StatusCode.OK) {
Expand Down Expand Up @@ -438,7 +460,7 @@ public ReadWriteSpan updateName(String name) {
return this;
}
synchronized (lock) {
if (hasEnded) {
if (isSpanUnmodifiableByCurrentThread()) {
logger.log(Level.FINE, "Calling updateName() on an ended Span.");
return this;
}
Expand All @@ -463,7 +485,7 @@ public Span addLink(SpanContext spanContext, Attributes attributes) {
spanLimits.getMaxNumberOfAttributesPerLink(),
spanLimits.getMaxAttributeValueLength()));
synchronized (lock) {
if (hasEnded) {
if (isSpanUnmodifiableByCurrentThread()) {
logger.log(Level.FINE, "Calling addLink() on an ended Span.");
return this;
}
Expand Down Expand Up @@ -493,12 +515,22 @@ public void end(long timestamp, TimeUnit unit) {

private void endInternal(long endEpochNanos) {
synchronized (lock) {
if (hasEnded) {
logger.log(Level.FINE, "Calling end() on an ended Span.");
if (hasEnded != EndState.NOT_ENDED) {
logger.log(Level.FINE, "Calling end() on an ended or ending Span.");
return;
}
this.endEpochNanos = endEpochNanos;
hasEnded = true;
spanEndingThread = Thread.currentThread();
JonasKunz marked this conversation as resolved.
Show resolved Hide resolved
hasEnded = EndState.ENDING;
}
if (spanProcessor instanceof ExtendedSpanProcessor) {
ExtendedSpanProcessor extendedSpanProcessor = (ExtendedSpanProcessor) spanProcessor;
if (extendedSpanProcessor.isOnEndingRequired()) {
extendedSpanProcessor.onEnding(this);
}
}
synchronized (lock) {
hasEnded = EndState.ENDED;
}
if (spanProcessor.isEndRequired()) {
spanProcessor.onEnd(this);
Expand All @@ -508,7 +540,7 @@ private void endInternal(long endEpochNanos) {
@Override
public boolean isRecording() {
synchronized (lock) {
return !hasEnded;
return hasEnded != EndState.ENDED;
}
}

Expand All @@ -533,7 +565,7 @@ private List<EventData> getImmutableTimedEvents() {

// if the span has ended, then the events are unmodifiable
// so we can return them directly and save copying all the data.
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
return Collections.unmodifiableList(events);
}

Expand All @@ -547,7 +579,7 @@ private Attributes getImmutableAttributes() {
}
// if the span has ended, then the attributes are unmodifiable,
// so we can return them directly and save copying all the data.
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
return attributes;
}
// otherwise, make a copy of the data into an immutable container.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.trace.internal;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;

/**
* Extended {@link SpanProcessor} with experimental APIs.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface ExtendedSpanProcessor extends SpanProcessor {

/**
* Called when a {@link io.opentelemetry.api.trace.Span} is ended, but before {@link
* SpanProcessor#onEnd(ReadableSpan)} is invoked with an immutable variant of this span. This
* means that the span will still be mutable. Note that the span will only be modifiable
* synchronously from this callback, concurrent modifications from other threads will be
* prevented. Only called if {@link Span#isRecording()} returns true.
*
* <p>This method is called synchronously on the execution thread, should not throw or block the
* execution thread.
*
* @param span the {@code Span} that is just about to be ended.
*/
void onEnding(ReadWriteSpan span);

/**
* Returns {@code true} if this {@link SpanProcessor} requires onEnding events.
*
* @return {@code true} if this {@link SpanProcessor} requires onEnding events.
*/
boolean isOnEndingRequired();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.internal.ExtendedSpanProcessor;
import java.util.Arrays;
import java.util.Collections;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -27,18 +28,20 @@
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class MultiSpanProcessorTest {
@Mock private SpanProcessor spanProcessor1;
@Mock private SpanProcessor spanProcessor2;
@Mock private ExtendedSpanProcessor spanProcessor1;
@Mock private ExtendedSpanProcessor spanProcessor2;
@Mock private ReadableSpan readableSpan;
@Mock private ReadWriteSpan readWriteSpan;

@BeforeEach
void setUp() {
when(spanProcessor1.isStartRequired()).thenReturn(true);
when(spanProcessor1.isOnEndingRequired()).thenReturn(true);
when(spanProcessor1.isEndRequired()).thenReturn(true);
when(spanProcessor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor2.isStartRequired()).thenReturn(true);
when(spanProcessor2.isOnEndingRequired()).thenReturn(true);
when(spanProcessor2.isEndRequired()).thenReturn(true);
when(spanProcessor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
Expand All @@ -61,12 +64,17 @@ void oneSpanProcessor() {

@Test
void twoSpanProcessor() {
SpanProcessor multiSpanProcessor =
SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2));
ExtendedSpanProcessor multiSpanProcessor =
(ExtendedSpanProcessor)
SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2));
multiSpanProcessor.onStart(Context.root(), readWriteSpan);
verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan));
verify(spanProcessor2).onStart(same(Context.root()), same(readWriteSpan));

multiSpanProcessor.onEnding(readWriteSpan);
verify(spanProcessor1).onEnding(same(readWriteSpan));
verify(spanProcessor2).onEnding(same(readWriteSpan));

multiSpanProcessor.onEnd(readableSpan);
verify(spanProcessor1).onEnd(same(readableSpan));
verify(spanProcessor2).onEnd(same(readableSpan));
Expand All @@ -83,9 +91,11 @@ void twoSpanProcessor() {
@Test
void twoSpanProcessor_DifferentRequirements() {
when(spanProcessor1.isEndRequired()).thenReturn(false);
when(spanProcessor2.isOnEndingRequired()).thenReturn(false);
when(spanProcessor2.isStartRequired()).thenReturn(false);
SpanProcessor multiSpanProcessor =
SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2));
ExtendedSpanProcessor multiSpanProcessor =
(ExtendedSpanProcessor)
SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2));

assertThat(multiSpanProcessor.isStartRequired()).isTrue();
assertThat(multiSpanProcessor.isEndRequired()).isTrue();
Expand All @@ -94,6 +104,10 @@ void twoSpanProcessor_DifferentRequirements() {
verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan));
verify(spanProcessor2, times(0)).onStart(any(Context.class), any(ReadWriteSpan.class));

multiSpanProcessor.onEnding(readWriteSpan);
verify(spanProcessor1).onEnding(same(readWriteSpan));
verify(spanProcessor2, times(0)).onEnding(any(ReadWriteSpan.class));

multiSpanProcessor.onEnd(readableSpan);
verify(spanProcessor1, times(0)).onEnd(any(ReadableSpan.class));
verify(spanProcessor2).onEnd(same(readableSpan));
Expand All @@ -117,6 +131,7 @@ void stringRepresentation() {
.hasToString(
"MultiSpanProcessor{"
+ "spanProcessorsStart=[spanProcessor1, spanProcessor1], "
+ "spanProcessorsEnding=[spanProcessor1, spanProcessor1], "
+ "spanProcessorsEnd=[spanProcessor1, spanProcessor1], "
+ "spanProcessorsAll=[spanProcessor1, spanProcessor1]}");
}
Expand Down
Loading
Loading