Skip to content

Commit

Permalink
Simplification of #11804 for Reactive Stream specification support
Browse files Browse the repository at this point in the history
Simplified #11804
 + removed LastWillSubscription, LastWill and FinalSignal
 + introduced Suppressed and Cancelled Throwables
 + updated notes
  • Loading branch information
gregw committed May 28, 2024
1 parent 46dc7b3 commit c216b00
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package org.eclipse.jetty.io.content;

import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -72,7 +70,8 @@ private void onSubscribe(Flow.Subscriber<? super Content.Chunk> subscriber, Cont
content.fail(error);
throw error;
}
LastWillSubscription subscription = new ActiveSubscription(content, subscriber);

ActiveSubscription subscription = new ActiveSubscription(content, subscriber);
// As per rule 1.9, this method must return normally (i.e. not throw).
try
{
Expand All @@ -82,7 +81,7 @@ private void onSubscribe(Flow.Subscriber<? super Content.Chunk> subscriber, Cont
{
// As per rule 2.13, we MUST consider subscription cancelled and
// MUST raise this error condition in a fashion that is adequate for the runtime environment.
subscription.cancel(err, LastWillSubscription.FinalSignal.SUPPRESS);
subscription.cancel(new Suppressed(err));
LOG.error("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
}
}
Expand All @@ -92,11 +91,9 @@ private void onMultiSubscribe(Flow.Subscriber<? super Content.Chunk> subscriber)
// As per rule 1.9, we need to throw a `java.lang.NullPointerException`
// if the `Subscriber` is `null`
if (subscriber == null)
{
throw new NullPointerException("Flow.Subscriber must not be null");
}
LastWillSubscription subscription = new ExhaustedSubscription();

ExhaustedSubscription subscription = new ExhaustedSubscription();
// As per 1.9, this method must return normally (i.e. not throw).
try
{
Expand All @@ -109,76 +106,11 @@ private void onMultiSubscribe(Flow.Subscriber<? super Content.Chunk> subscriber)
{
// As per rule 2.13, we MUST consider subscription cancelled and
// MUST raise this error condition in a fashion that is adequate for the runtime environment.
subscription.cancel(err, LastWillSubscription.FinalSignal.SUPPRESS);
LOG.error("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
}
}

// Subscription that have a reason of the cancellation and information about
// final signal (last will and testament pattern) that must be reported to the Subscriber.
// Relates to https://github.com/reactive-streams/reactive-streams-jvm/issues/271
private interface LastWillSubscription extends Flow.Subscription
{
@Override
default void cancel()
{
cancel(new CancellationException("Subscription was cancelled manually"), FinalSignal.SUPPRESS);
}

default void cancel(Throwable reason, FinalSignal finalSignal)
{
cancel(new LastWill(reason, finalSignal));
}

void cancel(LastWill lastWill);

record LastWill(Throwable reason, FinalSignal finalSignal)
{
public LastWill
{
Objects.requireNonNull(reason, "Last will reason must not be null");
Objects.requireNonNull(finalSignal, "Last will final signal must not be null");
}
}

enum FinalSignal
{
COMPLETE, ERROR, SUPPRESS
}

// Publisher
// 1.6 If a Publisher signals either onError or onComplete on a Subscriber,
// that Subscriber’s Subscription MUST be considered cancelled.
// 2.4 Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST consider the
// Subscription cancelled after having received the signal.
//
// Publisher failed -> FinalSignal.ERROR
// 1.4 If a Publisher fails it MUST signal an onError.
//
// Publisher succeeded -> FinalSignal.COMPLETE
// 1.5 If a Publisher terminates successfully (finite stream) it MUST signal an onComplete.

// Subscriber
// 2.13 In the case that this rule is violated, any associated Subscription to the Subscriber
// MUST be considered as cancelled, and the caller MUST raise this error condition in a
// fashion that is adequate for the runtime environment.
//
// Subscriber.onSubscribe/onNext/onError/onComplete failed -> FinalSignal.SUPPRESS

// Subscription
//
// Subscription.cancel -> FinalSignal.SUPPRESS
// It's not clearly specified in the specification, but according to:
// - the issue: https://github.com/reactive-streams/reactive-streams-jvm/issues/458
// - TCK test 'untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals'
// - 1.8 If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.
//
// Subscription.request with negative argument -> FinalSignal.ERROR
// 3.9 While the Subscription is not cancelled, Subscription.request(long n) MUST signal onError with a
// java.lang.IllegalArgumentException if the argument is <= 0.
}

private static final class ExhaustedSubscription implements LastWillSubscription
private static final class ExhaustedSubscription implements Flow.Subscription
{
@Override
public void request(long n)
Expand All @@ -187,17 +119,17 @@ public void request(long n)
}

@Override
public void cancel(LastWill lastWill)
public void cancel()
{
// As per rules 3.6 and 3.7, after the Subscription is cancelled all operations MUST be NOPs.
}
}

private static final class ActiveSubscription extends IteratingCallback implements LastWillSubscription
private static final class ActiveSubscription extends IteratingCallback implements Flow.Subscription
{
private static final long NO_MORE_DEMAND = -1;
private static final Throwable COMPLETED = new StaticException("Source.Content read fully");
private final AtomicReference<LastWill> cancelled;
private final AtomicReference<Throwable> cancelled;
private final AtomicLong demand;
private Content.Source content;
private Flow.Subscriber<? super Content.Chunk> subscriber;
Expand All @@ -221,25 +153,21 @@ public ActiveSubscription(Content.Source content, Flow.Subscriber<? super Conten
@Override
protected Action process()
{
LastWill cancelled = this.cancelled.get();
Throwable cancelled = this.cancelled.get();
if (cancelled != null)
{
Throwable reason = cancelled.reason;
FinalSignal finalSignal = cancelled.finalSignal;
// As per rule 3.13, Subscription.cancel() MUST request the Publisher to eventually
// drop any references to the corresponding subscriber.
this.demand.set(NO_MORE_DEMAND);
// TODO: HttpChannelState does not satisfy the contract of Content.Source "If read() has returned a last chunk, this is a no operation."
if (finalSignal != FinalSignal.COMPLETE)
this.content.fail(reason);
if (cancelled != COMPLETED)
this.content.fail(cancelled);
this.content = null;
try
{
if (finalSignal == FinalSignal.COMPLETE)
if (cancelled == COMPLETED)
this.subscriber.onComplete();
if (finalSignal == FinalSignal.ERROR)
this.subscriber.onError(reason);
// do nothing on FinalSignal.SUPPRESS
else if (!(cancelled instanceof Suppressed))
this.subscriber.onError(cancelled);
}
catch (Throwable err)
{
Expand All @@ -259,7 +187,7 @@ protected Action process()

if (Content.Chunk.isFailure(chunk))
{
cancel(chunk.getFailure(), FinalSignal.ERROR);
cancel(chunk.getFailure());
chunk.release();
return Action.IDLE;
}
Expand All @@ -270,14 +198,14 @@ protected Action process()
}
catch (Throwable err)
{
cancel(err, FinalSignal.SUPPRESS);
cancel(new Suppressed(err));
LOG.error("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
}
chunk.release();

if (chunk.isLast())
{
cancel(COMPLETED, FinalSignal.COMPLETE);
cancel(COMPLETED);
return Action.IDLE;
}

Expand All @@ -298,7 +226,7 @@ public void request(long n)
if (n <= 0L)
{
String errorMsg = "Flow.Subscriber " + subscriber + " violated rule 3.9: non-positive requests are not allowed.";
cancel(new IllegalArgumentException(errorMsg), FinalSignal.ERROR);
cancel(new IllegalArgumentException(errorMsg));
return;
}

Expand All @@ -309,15 +237,73 @@ public void request(long n)
}

@Override
public void cancel(LastWill lastWill)
public void cancel()
{
cancel(new Cancelled());
}

public void cancel(Throwable cause)
{
// As per rules 3.6 and 3.7, after the Subscription is cancelled all operations MUST be NOPs.
//
// As per rule 3.5, this handles cancellation requests, and is idempotent, thread-safe and not
// synchronously performing heavy computations
if (!cancelled.compareAndSet(null, lastWill))
if (!cancelled.compareAndSet(null, cause))
return;
this.iterate();
}

// Publisher notes
//
// 1.6 If a Publisher signals either onError or onComplete on a Subscriber,
// that Subscriber’s Subscription MUST be considered cancelled.
// 2.4 Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST consider the
// Subscription cancelled after having received the signal.
//
// Publisher failed -> cancel(Throwable)
// 1.4 If a Publisher fails it MUST signal an onError.
//
// Publisher succeeded -> cancel(COMPLETED)
// 1.5 If a Publisher terminates successfully (finite stream) it MUST signal an onComplete.

// Subscriber
// 2.13 In the case that this rule is violated, any associated Subscription to the Subscriber
// MUST be considered as cancelled, and the caller MUST raise this error condition in a
// fashion that is adequate for the runtime environment.
//
// Subscriber.onSubscribe/onNext/onError/onComplete failed -> cancel(new Suppressed(cause))

// Subscription notes
//
// Subscription.cancel -> cancel(new Cancelled())
// It's not clearly specified in the specification, but according to:
// - the issue: https://github.com/reactive-streams/reactive-streams-jvm/issues/458
// - TCK test 'untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals'
// - 1.8 If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.
//
// Subscription.request with negative argument -> cancel(err)
// 3.9 While the Subscription is not cancelled, Subscription.request(long n) MUST signal onError with a
// java.lang.IllegalArgumentException if the argument is <= 0.
}

private static class Suppressed extends Throwable
{
Suppressed(String message)
{
super(message);
}

Suppressed(Throwable cause)
{
super(cause.getMessage(), cause);
}
}

private static class Cancelled extends Suppressed
{
Cancelled()
{
super("Subscription was cancelled");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Content.Chunk read()
@Override
public void demand(Runnable demandCallback)
{
// TODO: recursive stack overflow
// recursive stack overflow not necessary for this test
demandCallback.run();
}

Expand Down

0 comments on commit c216b00

Please sign in to comment.