diff --git a/source/concurrency/asyncscope.d b/source/concurrency/asyncscope.d index 6e1e25e..1adcf7b 100644 --- a/source/concurrency/asyncscope.d +++ b/source/concurrency/asyncscope.d @@ -10,8 +10,9 @@ private enum Flag { } auto asyncScope() @safe { + import concurrency.sender : Promise; // ensure NRVO - auto as = shared AsyncScope(new shared StopSource()); + auto as = shared AsyncScope(new shared Promise!void); return as; } @@ -66,9 +67,8 @@ public: cleanup.syncWait(); } - this(shared StopSource stopSource) @safe shared { - completion = new shared Promise!void; - this.stopSource = stopSource; + this(shared Promise!void completion) @safe shared { + this.completion = completion; } auto cleanup() @safe shared { @@ -93,7 +93,7 @@ public: return stopSource.stop(); } - bool spawn(Sender)(Sender s) shared @trusted { + bool spawn(Sender)(Sender s) @trusted shared { import concurrency.sender : connectHeap; with (flag.update(0, Flag.tick)) { if ((oldState & Flag.stopped) == 1) { @@ -133,7 +133,7 @@ struct AsyncScopeReceiver { auto getStopToken() nothrow @safe { import concurrency.stoptoken : StopToken; - return StopToken(s.stopSource); + return s.stopSource.token(); } auto getScheduler() nothrow @safe { diff --git a/source/concurrency/data/queue/mpsc.d b/source/concurrency/data/queue/mpsc.d index c303720..e2caf56 100644 --- a/source/concurrency/data/queue/mpsc.d +++ b/source/concurrency/data/queue/mpsc.d @@ -2,7 +2,7 @@ module concurrency.data.queue.mpsc; struct MPSCQueueProducer(Node) { private MPSCQueue!(Node) q; - void push(Node* node) shared @trusted { + void push(Node* node) @trusted shared { (cast() q).push(node); } } diff --git a/source/concurrency/data/queue/waitable.d b/source/concurrency/data/queue/waitable.d index f2026dd..57f6ad2 100644 --- a/source/concurrency/data/queue/waitable.d +++ b/source/concurrency/data/queue/waitable.d @@ -61,7 +61,7 @@ struct WaitableQueueProducer(Q) { private shared Q q; private shared Semaphore sema; - bool push(Q.ElementType t) shared @trusted { + bool push(Q.ElementType t) @trusted shared { bool r = (cast() q).push(t); if (r) (cast() sema).notify(); diff --git a/source/concurrency/executor.d b/source/concurrency/executor.d index 98c66fb..8546e5d 100644 --- a/source/concurrency/executor.d +++ b/source/concurrency/executor.d @@ -1,7 +1,7 @@ module concurrency.executor; alias VoidFunction = void function() @safe; -alias VoidDelegate = void delegate() shared @safe; +alias VoidDelegate = void delegate() @safe shared; interface Executor { void execute(VoidFunction fn) @safe; diff --git a/source/concurrency/fork.d b/source/concurrency/fork.d index 1ea9320..f9a3577 100644 --- a/source/concurrency/fork.d +++ b/source/concurrency/fork.d @@ -63,7 +63,8 @@ version(Posix) if (afterFork) afterFork(pid); - auto cb = token.onStop(() @trusted shared nothrow => + shared StopCallback cb; + cb.register(token, () @trusted shared nothrow => cast(void) kill(pid, SIGINT)); int status; auto ret = waitpid(pid, &status, 0); @@ -102,7 +103,7 @@ version(Posix) import concurrency.utils : closure; executeInNewThread( - cast(void delegate() shared @safe) &this.run); + cast(void delegate() @safe shared) &this.run); } } diff --git a/source/concurrency/nursery.d b/source/concurrency/nursery.d index fbbc25e..7b9dadc 100644 --- a/source/concurrency/nursery.d +++ b/source/concurrency/nursery.d @@ -1,8 +1,8 @@ module concurrency.nursery; -import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop; +import concurrency.stoptoken : StopSource, StopToken, StopCallback; import concurrency.thread : LocalThreadExecutor; -import concurrency.receiver : getStopToken; +// import concurrency.receiver : getStopToken; import concurrency.scheduler : SchedulerObjectBase; import std.typecons : Nullable; @@ -14,7 +14,7 @@ import std.typecons : Nullable; /// When cancellation happens all Senders are waited on for completion. /// Senders can be added to the Nursery at any time. /// Senders are only started when the Nursery itself is being awaited on. -class Nursery : StopSource { +class Nursery { import concurrency.sender : isSender, OperationalStateBase; import core.sync.mutex : Mutex; import concepts; @@ -22,6 +22,7 @@ class Nursery : StopSource { alias Value = void; private { + shared StopSource stopSource; Node[] operations; struct Node { OperationalStateBase state; @@ -38,7 +39,7 @@ class Nursery : StopSource { shared size_t counter = 0; Throwable throwable; // first throwable from sender, if any ReceiverObject receiver; - StopCallback stopCallback; + shared StopCallback stopCallback; Nursery assumeThreadSafe() @trusted shared nothrow { return cast(Nursery) this; } @@ -50,8 +51,8 @@ class Nursery : StopSource { with (assumeThreadSafe) mutex = new Mutex(); } - override bool stop() nothrow @trusted { - auto result = super.stop(); + bool stop() nothrow @trusted { + auto result = stopSource.stop(); if (result) (cast(shared) this).done(-1); @@ -59,12 +60,13 @@ class Nursery : StopSource { return result; } - override bool stop() nothrow @trusted shared { + bool stop() nothrow @trusted shared { return (cast(Nursery) this).stop(); } - StopToken getStopToken() nothrow @trusted shared { - return StopToken(cast(Nursery) this); + shared(StopToken) getStopToken() nothrow @safe shared { + return stopSource.token(); + // return shared StopToken(stopSource); } private auto getScheduler() nothrow @trusted shared { @@ -94,9 +96,9 @@ class Nursery : StopSource { if (isDone) { throwable = null; receiver = null; - if (stopCallback) - stopCallback.dispose(); - stopCallback = null; + // if (stopCallback) + stopCallback.dispose(); + // stopCallback = null; } mutex.unlock_nothrow(); @@ -104,7 +106,7 @@ class Nursery : StopSource { if (isDone && localReceiver !is null) { if (localThrowable !is null) { localReceiver.setError(localThrowable); - } else if (isStopRequested()) { + } else if (stopSource.isStopRequested()) { localReceiver.setDone(); } else { try { @@ -122,7 +124,7 @@ class Nursery : StopSource { run(sender.get()); } - void run(Sender)(Sender sender) shared @trusted { + void run(Sender)(Sender sender) @trusted shared { import concepts; static assert(models!(Sender, isSender)); import std.typecons : Nullable; @@ -161,7 +163,7 @@ class Nursery : StopSource { auto connect(Receiver)( return Receiver receiver - ) shared @trusted return scope { + ) @trusted shared return scope { final class ReceiverImpl : ReceiverObject { Receiver receiver; SchedulerObjectBase scheduler; @@ -187,21 +189,24 @@ class Nursery : StopSource { scheduler = receiver.getScheduler().toSchedulerObject(); return scheduler; } + + shared(StopToken) getStopToken() nothrow @safe { + return receiver.getStopToken(); + } } - auto stopToken = receiver.getStopToken(); - auto cb = (() @trusted => stopToken - .onStop(() shared nothrow @trusted => cast(void) this.stop()))(); - return NurseryOp(this, cb, new ReceiverImpl(receiver)); + return NurseryOp(this, new ReceiverImpl(receiver)); } private - void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared { + void setReceiver(ReceiverObject r) nothrow @trusted shared { with (assumeThreadSafe) { mutex.lock_nothrow(); assert(this.receiver is null, "Cannot await a nursery twice."); receiver = r; - stopCallback = cb; + auto dg = () nothrow @safe shared => cast(void) this.stop(); + auto stopToken = r.getStopToken; + stopCallback.register(stopToken, dg); auto ops = operations.dup(); mutex.unlock_nothrow(); @@ -217,6 +222,7 @@ private interface ReceiverObject { void setDone() nothrow @safe; void setError(Throwable e) nothrow @safe; SchedulerObjectBase getScheduler() nothrow @safe; + shared(StopToken) getStopToken() nothrow @safe; } private struct NurseryReceiver(Value) { @@ -228,7 +234,7 @@ private struct NurseryReceiver(Value) { } static if (is(Value == void)) { - void setValue() shared @safe { + void setValue() @safe shared { (cast() this).setDone(); } @@ -236,7 +242,7 @@ private struct NurseryReceiver(Value) { (cast() this).setDone(); } } else { - void setValue(Value val) shared @trusted { + void setValue(Value val) @trusted shared { (cast() this).setDone(); } @@ -264,25 +270,24 @@ private struct NurseryReceiver(Value) { private struct NurseryOp { shared Nursery nursery; - StopCallback cb; ReceiverObject receiver; @disable this(ref return scope typeof(this) rhs); @disable this(this); - this(return shared Nursery n, StopCallback cb, - ReceiverObject r) @safe return scope { + this(shared Nursery n, + ReceiverObject r) @safe { nursery = n; - this.cb = cb; + // this.cb = cb; receiver = r; } void start() nothrow scope @trusted { import core.atomic : atomicLoad; if (nursery.busy.atomicLoad == 0) { - cb.dispose(); + // cb.dispose(); receiver.setDone(); } else - nursery.setReceiver(receiver, cb); + nursery.setReceiver(receiver);//, cb); } } diff --git a/source/concurrency/operations/race.d b/source/concurrency/operations/race.d index 830963b..00b324b 100644 --- a/source/concurrency/operations/race.d +++ b/source/concurrency/operations/race.d @@ -66,32 +66,31 @@ private struct RaceOp(Receiver, Senders...) { this(ref return scope typeof(this) rhs); this(Receiver receiver, return Senders senders, bool noDropouts) @trusted scope { + state = State!R(noDropouts); this.receiver = receiver; - state = new State!(R)(noDropouts); static if (Senders.length > 1) { foreach (i, Sender; Senders) { ops[i] = senders[i].connect( - ElementReceiver!(Sender)(receiver, state, Senders.length)); + ElementReceiver!(Sender)(receiver, &state, Senders.length)); } } else { ops.length = senders[0].length; foreach (i; 0 .. senders[0].length) { ops[i] = senders[0][i].connect( - ElementReceiver(receiver, state, senders[0].length)); + ElementReceiver(receiver, &state, senders[0].length)); } } } void start() @trusted nothrow scope { - import concurrency.stoptoken : StopSource; if (receiver.getStopToken().isStopRequested) { receiver.setDone(); return; } + auto token = receiver.getStopToken(); // butt ugly cast, but it won't take the second overload - state.cb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &state.stop); - receiver.getStopToken().onStop(state.cb); + state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop); static if (Senders.length > 1) { foreach (i, _; Senders) { @@ -122,9 +121,10 @@ struct RaceSender(Senders...) } } -private class State(Value) : StopSource { +private struct State(Value) { import concurrency.bitfield; - InPlaceStopCallback cb; + shared StopSource stopSource; + shared StopCallback cb; shared SharedBitField!Flags bitfield; static if (!is(Value == void)) Value value; @@ -149,10 +149,10 @@ private enum Counter : size_t { private struct RaceReceiver(Receiver, InnerValue, Value) { import core.atomic : atomicOp, atomicLoad, MemoryOrder; Receiver receiver; - State!(Value) state; + State!(Value)* state; size_t senderCount; auto getStopToken() { - return StopToken(state); + return state.stopSource.token(); } private bool isValueProduced(size_t state) { @@ -177,7 +177,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) { else state.value = Value(value); release(); // must release before calling .stop - state.stop(); + state.stopSource.stop(); } else release(); @@ -191,7 +191,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) { with (state.bitfield.update(Flags.value_produced, Counter.tick)) { bool last = isLast(newState); if (!isValueProduced(oldState)) { - state.stop(); + state.stopSource.stop(); } if (last) @@ -203,7 +203,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) { with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) { bool last = isLast(newState); if (state.noDropouts && !isDoneOrErrorProduced(oldState)) { - state.stop(); + state.stopSource.stop(); } if (last) @@ -218,7 +218,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) { state.exception = exception; if (state.noDropouts) { release(); // release before stop - state.stop(); + state.stopSource.stop(); } } diff --git a/source/concurrency/operations/stopon.d b/source/concurrency/operations/stopon.d index 8d979cd..d03eb6b 100644 --- a/source/concurrency/operations/stopon.d +++ b/source/concurrency/operations/stopon.d @@ -7,14 +7,14 @@ import concurrency.stoptoken; import concepts; import std.traits; -auto stopOn(Sender)(Sender sender, StopToken stopToken) { +auto stopOn(Sender)(Sender sender, shared StopToken stopToken) { return StopOn!(Sender)(sender, stopToken); } private struct StopOnReceiver(Receiver, Value) { private { Receiver receiver; - StopToken stopToken; + shared StopToken stopToken; } static if (is(Value == void)) { @@ -35,7 +35,7 @@ private struct StopOnReceiver(Receiver, Value) { receiver.setError(e); } - auto getStopToken() nothrow @trusted { + auto getStopToken() nothrow @safe { return stopToken; } @@ -46,7 +46,7 @@ struct StopOn(Sender) if (models!(Sender, isSender)) { static assert(models!(typeof(this), isSender)); alias Value = Sender.Value; Sender sender; - StopToken stopToken; + shared StopToken stopToken; auto connect(Receiver)(return Receiver receiver) @safe return scope { alias R = StopOnReceiver!(Receiver, Sender.Value); // ensure NRVO diff --git a/source/concurrency/operations/stopwhen.d b/source/concurrency/operations/stopwhen.d index 85fca0c..7e738fa 100644 --- a/source/concurrency/operations/stopwhen.d +++ b/source/concurrency/operations/stopwhen.d @@ -29,11 +29,10 @@ private struct StopWhenOp(Receiver, Sender, Trigger) { this(Receiver receiver, return Sender source, return Trigger trigger) @trusted scope { this.receiver = receiver; - state = new State!(Sender.Value)(); sourceOp = source - .connect(SourceReceiver!(Receiver, Sender.Value)(receiver, state)); + .connect(SourceReceiver!(Receiver, Sender.Value)(receiver, &state)); triggerOp = trigger - .connect(TriggerReceiver!(Receiver, Sender.Value)(receiver, state)); + .connect(TriggerReceiver!(Receiver, Sender.Value)(receiver, &state)); } void start() @trusted nothrow scope { @@ -42,9 +41,9 @@ private struct StopWhenOp(Receiver, Sender, Trigger) { return; } + auto token = receiver.getStopToken(); // butt ugly cast, but it won't take the second overload - state.cb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &state.stop); - receiver.getStopToken().onStop(state.cb); + state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop); sourceOp.start; triggerOp.start; @@ -65,9 +64,11 @@ struct StopWhenSender(Sender, Trigger) } } -private class State(Value) : StopSource { +// refactor to use StopSource +private struct State(Value) { import concurrency.bitfield; - InPlaceStopCallback cb; + shared StopSource stopSource; + shared StopCallback cb; shared SharedBitField!Flags bitfield; static if (!is(Value == void)) Value value; @@ -117,15 +118,15 @@ private bool isLast(size_t state) @safe nothrow pure { private struct TriggerReceiver(Receiver, Value) { Receiver receiver; - State!(Value) state; + State!(Value)* state; auto getStopToken() { - return StopToken(state); + return state.stopSource.token(); } void setValue() @safe nothrow { with (state.bitfield.update(Flags.tick)) { if (!isLast(oldState)) - state.stop(); + state.stopSource.stop(); else state.process(receiver, newState); } @@ -134,7 +135,7 @@ private struct TriggerReceiver(Receiver, Value) { void setDone() @safe nothrow { with (state.bitfield.update(Flags.doneOrError_produced, Flags.tick)) { if (!isLast(oldState)) - state.stop(); + state.stopSource.stop(); else state.process(receiver, newState); } @@ -146,7 +147,7 @@ private struct TriggerReceiver(Receiver, Value) { if (!isDoneOrErrorProduced(oldState)) { state.exception = exception; release(); // release before stop - state.stop(); + state.stopSource.stop(); } else { release(); if (last) @@ -161,9 +162,9 @@ private struct TriggerReceiver(Receiver, Value) { private struct SourceReceiver(Receiver, Value) { import core.atomic : atomicOp, atomicLoad, MemoryOrder; Receiver receiver; - State!(Value) state; + State!(Value)* state; auto getStopToken() { - return StopToken(state); + return state.stopSource.token(); } static if (!is(Value == void)) @@ -174,7 +175,7 @@ private struct SourceReceiver(Receiver, Value) { release(); if (!last) - state.stop(); + state.stopSource.stop(); else if (isDoneOrErrorProduced(oldState)) state.process(receiver, oldState); else @@ -187,7 +188,7 @@ private struct SourceReceiver(Receiver, Value) { with (state.bitfield.update(Flags.value_produced | Flags.tick)) { bool last = isLast(oldState); if (!last) - state.stop(); + state.stopSource.stop(); else if (isDoneOrErrorProduced(oldState)) state.process(receiver, oldState); else @@ -199,7 +200,7 @@ private struct SourceReceiver(Receiver, Value) { with (state.bitfield.update(Flags.doneOrError_produced | Flags.tick)) { bool last = isLast(oldState); if (!last) - state.stop(); + state.stopSource.stop(); else state.process(receiver, newState); } @@ -214,7 +215,7 @@ private struct SourceReceiver(Receiver, Value) { release(); if (!last) - state.stop(); + state.stopSource.stop(); else state.process(receiver, newState); } diff --git a/source/concurrency/operations/toshared.d b/source/concurrency/operations/toshared.d index 2c56656..ce9141f 100644 --- a/source/concurrency/operations/toshared.d +++ b/source/concurrency/operations/toshared.d @@ -82,7 +82,7 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if ((newState >> 2) == 0) { auto localStopSource = state.inst; release(); // release early - localStopSource.stop(); + localStopSource.stopSource.stop(); return false; } else { auto localReceiver = state.inst; @@ -145,14 +145,15 @@ struct SharedSenderOp(Sender, Scheduler, ResetLogic resetLogic, Receiver) { alias Props = Properties!(Sender); SharedSender!(Sender, Scheduler, resetLogic) parent; Receiver receiver; - StopCallback cb; + shared StopCallback cb; @disable this(ref return scope typeof(this) rhs); @disable this(this); void start() nothrow @trusted scope { parent.add(&(cast(shared) this).onValue); - cb = receiver.getStopToken.onStop(&(cast(shared) this).onStop); + auto stopToken = receiver.getStopToken; + cb.register(stopToken, &(cast(shared) this).onStop); } void onStop() nothrow @trusted shared { @@ -223,8 +224,8 @@ private struct SharedSenderReceiver(Sender, Scheduler, ResetLogic resetLogic) { state.process!(resetLogic)(v); } - StopToken getStopToken() @trusted nothrow { - return StopToken(state.inst); + shared(StopToken) getStopToken() @safe nothrow { + return state.inst.stopSource.token(); } Scheduler getScheduler() @safe nothrow scope { @@ -255,7 +256,7 @@ private template process(ResetLogic resetLogic) { auto localState = state.inst; InternalValue v = localState.value.match!((typeof(null)) => assert(0, "not happening"), v => v); release(oldState & (~0x3)); // release early and remove all ticks - if (localState.isStopRequested) + if (localState.stopSource.isStopRequested) (() @trusted => v = Done())(); foreach (dg; localState.dgs[]) dg(v); @@ -263,12 +264,13 @@ private template process(ResetLogic resetLogic) { } } -private class SharedSenderInstState(Sender) : StopSource { +private class SharedSenderInstState(Sender) { import concurrency.slist; import std.traits : ReturnType; import std.sumtype; alias Props = Properties!(Sender); shared SList!(Props.DG) dgs; + shared StopSource stopSource; SumType!(typeof(null), Props.InternalValue) value; this() { this.dgs = new shared SList!(Props.DG); diff --git a/source/concurrency/operations/whenall.d b/source/concurrency/operations/whenall.d index c8c60ec..90de34b 100644 --- a/source/concurrency/operations/whenall.d +++ b/source/concurrency/operations/whenall.d @@ -117,12 +117,11 @@ private struct WhenAllOp(Receiver, Senders...) { this(return Receiver receiver, return Senders senders) @trusted scope return { this.receiver = receiver; - state = new WhenAllState!R(); static if (Senders.length > 1) { foreach (i, Sender; Senders) { ops[i] = senders[i].connect( WhenAllReceiver!(Receiver, Sender.Value, - R)(receiver, state, i, Senders.length)); + R)(receiver, &state, i, Senders.length)); } } else { static if (!is(ArrayElement!(Senders).Value : void)) @@ -131,21 +130,20 @@ private struct WhenAllOp(Receiver, Senders...) { foreach (i; 0 .. senders[0].length) { ops[i] = senders[0][i].connect( WhenAllReceiver!(Receiver, ArrayElement!(Senders).Value, - R)(receiver, state, i, senders[0].length)); + R)(receiver, &state, i, senders[0].length)); } } } void start() @trusted nothrow scope { - import concurrency.stoptoken : StopSource; if (receiver.getStopToken().isStopRequested) { receiver.setDone(); return; } + auto token = receiver.getStopToken(); // butt ugly cast, but it won't take the second overload - state.cb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &state.stop); - receiver.getStopToken().onStop(state.cb); + state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop); static if (Senders.length > 1) { foreach (i, _; Senders) { @@ -178,9 +176,10 @@ struct WhenAllSender(Senders...) } } -private class WhenAllState(Value) : StopSource { +private struct WhenAllState(Value) { import concurrency.bitfield; - InPlaceStopCallback cb; + shared StopSource stopSource; + shared StopCallback cb; static if (is(typeof(Value.values))) Value value; Throwable exception; @@ -190,11 +189,11 @@ private class WhenAllState(Value) : StopSource { private struct WhenAllReceiver(Receiver, InnerValue, Value) { import core.atomic : atomicOp, atomicLoad, MemoryOrder; Receiver receiver; - WhenAllState!(Value) state; + WhenAllState!(Value)* state; size_t senderIndex; size_t senderCount; auto getStopToken() { - return StopToken(state); + return state.stopSource.token(); } private bool isValueProduced(size_t state) { @@ -233,7 +232,7 @@ private struct WhenAllReceiver(Receiver, InnerValue, Value) { with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) { bool last = isLast(newState); if (!isDoneOrErrorProduced(oldState)) - state.stop(); + state.stopSource.stop(); if (last) process(newState); } @@ -245,7 +244,7 @@ private struct WhenAllReceiver(Receiver, InnerValue, Value) { if (!isDoneOrErrorProduced(oldState)) { state.exception = exception; release(); // must release before calling .stop - state.stop(); + state.stopSource.stop(); } else release(); if (last) diff --git a/source/concurrency/operations/withchild.d b/source/concurrency/operations/withchild.d index 3ee5d38..01aba01 100644 --- a/source/concurrency/operations/withchild.d +++ b/source/concurrency/operations/withchild.d @@ -24,7 +24,7 @@ struct WithChildSender(SenderParent, SenderChild) import concurrency.operations.stopon; // ensure NRVO auto op = - whenAll(b.stopOn(receiver.getStopToken), a).stopOn(StopToken()) + whenAll(b.stopOn(receiver.getStopToken), a).stopOn(shared StopToken()) .connect(receiver); return op; } diff --git a/source/concurrency/operations/withstopsource.d b/source/concurrency/operations/withstopsource.d index 0bb8554..beaa7c8 100644 --- a/source/concurrency/operations/withstopsource.d +++ b/source/concurrency/operations/withstopsource.d @@ -7,18 +7,9 @@ import concurrency.stoptoken; import concepts; import std.traits; -template withStopSource(Sender) { - auto withStopSource(Sender sender, StopSource stopSource) { - return SSSender!(Sender, StopSource)(sender, stopSource); - } - - auto withStopSource(Sender sender, shared StopSource stopSource) @trusted { - return SSSender!(Sender, StopSource)(sender, cast() stopSource); - } - - auto withStopSource(Sender sender, ref shared InPlaceStopSource stopSource) @trusted { - return SSSender!(Sender, shared InPlaceStopSource*)(sender, &stopSource); - } +// TODO: return scope? +auto withStopSource(Sender)(Sender sender, ref shared StopSource stopSource) @trusted { + return SSSender!(Sender)(sender, &stopSource); } private struct SSReceiver(Receiver, Value) { @@ -50,24 +41,26 @@ private struct SSReceiver(Receiver, Value) { receiver.setError(e); } - auto getStopToken() nothrow @trusted scope { - return StopToken(state.combinedStopSource); + auto getStopToken() nothrow @safe scope { + return state.combinedStopSource.token(); } mixin ForwardExtensionPoints!receiver; private void resetStopCallback() { - state.cbs[0].dispose(); - state.cbs[1].dispose(); + state.left.dispose(); + state.right.dispose(); } } struct SSState { - shared InPlaceStopSource combinedStopSource; - InPlaceStopCallback[2] cbs; + shared StopSource combinedStopSource; + shared StopCallback left; + shared StopCallback right; + ~this() @safe scope @nogc nothrow {} } -struct SSOp(Receiver, OuterStopSource, Sender) { +struct SSOp(Receiver, Sender) { SSState state; alias Op = OpType!(Sender, SSReceiver!(Receiver, Sender.Value)); Op op; @@ -76,22 +69,19 @@ struct SSOp(Receiver, OuterStopSource, Sender) { this(ref return scope typeof(this) rhs); @disable this(this); - this(Receiver receiver, OuterStopSource outerStopSource, Sender sender) @trusted { - state.cbs[0] = InPlaceStopCallback(cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop); - state.cbs[1] = InPlaceStopCallback(cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop); - - receiver.getStopToken().onStop(state.cbs[0]); - static if (is(OuterStopSource == shared InPlaceStopSource*)) { - onStop(*outerStopSource, state.cbs[1]); - } else { - outerStopSource.onStop(state.cbs[1]); - } + ~this() @trusted scope @nogc nothrow {} + this(Receiver receiver, shared StopSource* outerStopSource, Sender sender) @trusted { + auto token = receiver.getStopToken(); + auto outerToken = outerStopSource.token(); + + state.left.register(token, cast(void delegate() @safe shared nothrow)&state.combinedStopSource.stop); + state.right.register(outerToken, cast(void delegate() @safe shared nothrow)&state.combinedStopSource.stop); try { op = sender.connect(SSReceiver!(Receiver, Sender.Value)(receiver, &state)); } catch (Exception e) { - state.cbs[0].dispose(); - state.cbs[1].dispose(); + state.left.dispose(); + state.right.dispose(); throw e; } } @@ -102,14 +92,14 @@ struct SSOp(Receiver, OuterStopSource, Sender) { } } -struct SSSender(Sender, StopSource) if (models!(Sender, isSender)) { +struct SSSender(Sender) if (models!(Sender, isSender)) { static assert(models!(typeof(this), isSender)); alias Value = Sender.Value; Sender sender; - StopSource stopSource; + shared StopSource* stopSource; auto connect(Receiver)(return Receiver receiver) @trusted return scope { // ensure NRVO - auto op = SSOp!(Receiver, StopSource, Sender)(receiver, stopSource, sender); + auto op = SSOp!(Receiver, Sender)(receiver, stopSource, sender); return op; } } diff --git a/source/concurrency/operations/withstoptoken.d b/source/concurrency/operations/withstoptoken.d index de49c45..09733a6 100644 --- a/source/concurrency/operations/withstoptoken.d +++ b/source/concurrency/operations/withstoptoken.d @@ -8,7 +8,7 @@ import concepts; import std.traits; import concurrency.utils : isThreadSafeFunction; -deprecated("function passed is not shared @safe delegate or @safe function.") +deprecated("function passed is not @safe shared delegate or @safe function.") auto withStopToken(Sender, Fun)(Sender sender, Fun fun) if (!isThreadSafeFunction!Fun) { return STSender!(Sender, Fun)(sender, fun); @@ -41,7 +41,7 @@ private struct STReceiver(Receiver, Value, Fun) { } else { import std.typecons : isTuple; enum isExpandable = isTuple!Value && __traits(compiles, { - fun(StopToken.init, Value.init.expand); + fun(shared(StopToken).init, Value.init.expand); }); void setValue(Value value) @safe { static if (is(ReturnType!Fun == void)) { diff --git a/source/concurrency/receiver.d b/source/concurrency/receiver.d index ced0818..4c7df1a 100644 --- a/source/concurrency/receiver.d +++ b/source/concurrency/receiver.d @@ -17,11 +17,6 @@ void checkReceiver(T)() { enum isReceiver(T) = is(typeof(checkReceiver!T)); -auto getStopToken(Receiver)(Receiver r) nothrow @safe if (isReceiver!Receiver) { - import concurrency.stoptoken : NeverStopToken; - return NeverStopToken(); -} - mixin template ForwardExtensionPoints(alias receiver) { auto getStopToken() nothrow @safe { return receiver.getStopToken(); @@ -43,7 +38,7 @@ interface ReceiverObjectBase(T) { void setValue(T value = T.init) @safe; void setDone() nothrow @safe; void setError(Throwable e) nothrow @safe; - StopToken getStopToken() nothrow @safe; + shared(StopToken) getStopToken() nothrow @safe; SchedulerObjectBase getScheduler() scope nothrow @safe; } diff --git a/source/concurrency/scheduler.d b/source/concurrency/scheduler.d index 1e5157a..d09d20a 100644 --- a/source/concurrency/scheduler.d +++ b/source/concurrency/scheduler.d @@ -23,6 +23,33 @@ interface SchedulerObjectBase { SenderObjectBase!void scheduleAfter(Duration d) @safe; } + +// We can pull the LocalThreadExecutor (and its schedule/scheduleAfter) out into a specialized context. +// Just like we did with the iouring context + +// The interesting bit is that the syncWait algorithm then might be inferred as @nogc + +// The question remains how we would want to integrate these. +// With iouring we created a runner that would take a sender and would inject the scheduler and allow itself to steal the current thread. + +// That last part is important, we don't want to spawn a thread just to run timers, we can do it perfectly fine on the current thread. +// Same with iouring or other event loops. + +// That said, we can, if we want to, move the event loop to another thread. + +// The only thing we can't do is cross schedule timers from one thread to another. +// Well, that is not true, we can create two context objects that expose a Scheduler + + + + + + +// Guess we just have to write it and see.... + +// Dietmar Kuhl used a iocontext with a run function that allows running it on the current thread. +// In rant I had the iocontext's runner return a sender so you could await that. + class SchedulerObject(S) : SchedulerObjectBase { import concurrency.sender : toSenderObject; S scheduler; @@ -50,7 +77,7 @@ enum TimerTrigger { cancel } -alias TimerDelegate = void delegate(TimerTrigger) shared @safe; +alias TimerDelegate = void delegate(TimerTrigger) @safe shared; import concurrency.timingwheels : ListElement; alias Timer = ListElement!(TimerDelegate); @@ -64,6 +91,7 @@ auto localThreadScheduler() { alias LocalThreadScheduler = typeof(localThreadScheduler()); struct SchedulerAdapter(Worker) { + static assert(models!(typeof(this), isScheduler)); import concurrency.receiver : setValueOrError; import concurrency.executor : VoidDelegate; import core.time : Duration; @@ -101,7 +129,7 @@ struct SchedulerAdapter(Worker) { return ScheduleSender(worker); } - auto schedule() shared @trusted { + auto schedule() @trusted shared { return (cast() this).schedule(); } @@ -109,7 +137,7 @@ struct SchedulerAdapter(Worker) { return ScheduleAfterSender!(Worker)(worker, dur); } - auto scheduleAfter(Duration dur) shared @trusted { + auto scheduleAfter(Duration dur) @trusted shared { return (cast() this).scheduleAfter(dur); } } @@ -117,7 +145,7 @@ struct SchedulerAdapter(Worker) { struct ScheduleAfterOp(Worker, Receiver) { import std.traits : ReturnType; import concurrency.bitfield : SharedBitField; - import concurrency.stoptoken : InPlaceStopCallback, onStop; + import concurrency.stoptoken : StopCallback; import concurrency.receiver : setValueOrError; enum Flags { @@ -131,20 +159,21 @@ struct ScheduleAfterOp(Worker, Receiver) { Duration dur; Receiver receiver; Timer timer; - InPlaceStopCallback stopCb; + shared StopCallback stopCb; shared SharedBitField!Flags flags; @disable this(ref return scope typeof(this) rhs); @disable this(this); + // ~this() @safe scope {} void start() @trusted scope nothrow { if (receiver.getStopToken().isStopRequested) { receiver.setDone(); return; } - stopCb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &stop); - receiver.getStopToken().onStop(stopCb); + auto token = receiver.getStopToken(); + stopCb.register(token, cast(void delegate() nothrow @safe shared) &stop); try { timer.userdata = cast(void delegate(TimerTrigger) @safe shared) &trigger; diff --git a/source/concurrency/sender.d b/source/concurrency/sender.d index a3ff2b5..36e4bf7 100644 --- a/source/concurrency/sender.d +++ b/source/concurrency/sender.d @@ -68,8 +68,8 @@ void checkSender(T)() @safe { void setError(Throwable e) @safe nothrow {} - StopToken getStopToken() @safe nothrow { - return StopToken.init; + shared(StopToken) getStopToken() @safe nothrow { + return shared(StopToken).init; } Scheduler getScheduler() @safe nothrow { @@ -235,7 +235,7 @@ interface SenderObjectBase(T) { receiver.setError(e); } - StopToken getStopToken() nothrow { + shared(StopToken) getStopToken() nothrow { return receiver.getStopToken(); } @@ -449,9 +449,9 @@ struct PromiseSenderOp(T, Receiver) { alias InternalValue = Sender.InternalValue; shared Sender parent; Receiver receiver; - StopCallback cb; + shared StopCallback cb; shared SharedBitField!Flags bitfield; - @disable + @disable this(ref return scope typeof(this) rhs); @disable this(this); @@ -480,8 +480,10 @@ struct PromiseSenderOp(T, Receiver) { bool triggeredInline = parent.add(&(cast(shared) this).onValue); // if triggeredInline there is no point in setting up the stop callback - if (!triggeredInline) - cb = receiver.getStopToken.onStop(&(cast(shared) this).onStop); + if (!triggeredInline) { + auto stopToken = receiver.getStopToken; + cb.register(stopToken, &(cast(shared) this).onStop); + } with (bitfield.add(Flags.setup)) { if (has(Flags.stop)) { @@ -523,8 +525,7 @@ struct PromiseSenderOp(T, Receiver) { // being executed. // This means that when it completes we are the only one // calling the receiver's termination functions. - if (cb) - cb.dispose(); + cb.dispose(); value.match!((Sender.ValueRep v) { try { static if (is(Sender.Value == void)) @@ -645,7 +646,7 @@ class Promise(T) { this.dgs = new shared SList!DG; } - auto sender() shared @safe nothrow { + auto sender() @safe shared nothrow { return shared PromiseSender!T(this); } } diff --git a/source/concurrency/signal.d b/source/concurrency/signal.d index 672774d..bfc4e21 100644 --- a/source/concurrency/signal.d +++ b/source/concurrency/signal.d @@ -2,7 +2,7 @@ module concurrency.signal; import concurrency.stoptoken; -ref shared(InPlaceStopSource) globalStopSource() @trusted { +ref shared(StopSource) globalStopSource() @trusted { import core.atomic : atomicLoad, cas; if (globalSourcePtr.atomicLoad is null) { @@ -13,7 +13,7 @@ ref shared(InPlaceStopSource) globalStopSource() @trusted { return *globalSourcePtr; } - shared InPlaceStopSource* empty = null; + shared StopSource* empty = null; if (ptr.cas(empty, &globalSource)) { setupCtrlCHandler(globalSource); globalSourcePtr = &globalSource; @@ -24,19 +24,8 @@ ref shared(InPlaceStopSource) globalStopSource() @trusted { return *globalSourcePtr; } -/// Returns true if first to set (otherwise it is ignored) -bool setGlobalStopSource(ref shared InPlaceStopSource stopSource) @safe { - import core.atomic : cas; - auto ptr = getGlobalStopSourcePointer(); - shared InPlaceStopSource* expected; - if (!ptr.cas(expected, &stopSource)) - return false; - globalSource = stopSource; - return true; -} - /// Sets the stopSource to be called when receiving an interrupt -void setupCtrlCHandler(ref shared InPlaceStopSource stopSource) @trusted { +void setupCtrlCHandler(ref shared StopSource stopSource) @trusted { import core.atomic; auto old = atomicExchange(&SignalHandler.signalStopSource, &stopSource); @@ -68,19 +57,19 @@ void setupCtrlCHandler(ref shared InPlaceStopSource stopSource) @trusted { } } -private static shared InPlaceStopSource* globalSourcePtr; -private static shared InPlaceStopSource globalSource; +private static shared StopSource* globalSourcePtr; +private static shared StopSource globalSource; // we export this function so that dynamic libraries can load it to access // the host's globalStopSource pointer. // Otherwise they would access their own local instance. // should not be called directly by usercode, instead use `globalStopSource`. export extern(C) -shared (InPlaceStopSource*)* concurrency_globalStopSourcePointer() @safe @nogc { +shared (StopSource*)* concurrency_globalStopSourcePointer() @safe @nogc { return &globalSourcePtr; } -private shared (InPlaceStopSource*)* getGlobalStopSourcePointer() @safe @nogc { +private shared (StopSource*)* getGlobalStopSourcePointer() @safe @nogc { import concurrency.utils : dynamicLoad; return dynamicLoad!concurrency_globalStopSourcePointer()(); } @@ -155,7 +144,7 @@ struct SignalHandler { SignalHandler.notify(ABORT); } - private static shared InPlaceStopSource* signalStopSource; + private static shared StopSource* signalStopSource; private static shared Thread handlerThread; private static void launchHandlerThread() { if (handlerThread.atomicLoad !is null) diff --git a/source/concurrency/stoptoken.d b/source/concurrency/stoptoken.d index f6e92ee..119587e 100644 --- a/source/concurrency/stoptoken.d +++ b/source/concurrency/stoptoken.d @@ -1,259 +1,201 @@ module concurrency.stoptoken; -// originally this code is from https://github.com/josuttis/jthread by Nicolai Josuttis -// it is licensed under the Creative Commons Attribution 4.0 Internation License http://creativecommons.org/licenses/by/4.0 - -struct InPlaceStopSource { - @disable this(ref return scope typeof(this) rhs); - private shared StopState state; - bool stop() nothrow @safe shared { - return state.request_stop(); - } +// Cancellation is an important concept when running asynchronous tasks. +// When an asynchronous operation is no longer needed it is desirable to +// cancel it to avoid doing unnecessary work, potentionally freeing system +// resources and maintaining responsiveness in the process. - bool stop() nothrow @safe { - with (assumeSafeShared) { - return stop(); - } - } +// There are several reasons why an asynchronous operation might become +// irrelevant, including: - bool isStopRequested() nothrow @safe @nogc shared { - return state.is_stop_requested(); - } +// - An error in one part of an application might render the results of +// other ongoing asynchronous tasks irrelevant. - bool isStopRequested() nothrow @safe @nogc { - with (assumeSafeShared) { - return isStopRequested(); - } - } +// - Clients or end-users might want to abort an earlier request they made. - /// resets the internal state, only do this if you are sure nothing else is looking at this... - void reset(this t)() @system @nogc shared { - this.state = StopState(); - } +// - Outstanding work might need to be cancelled due to stale data. - private ref assumeSafeShared() @trusted @nogc nothrow { - return cast(shared)this; - } +// - Work might become irrelevant due to errors in external systems. - void assertNoCallbacks() @safe shared { - state.assertNoCallbacks(); - } +// - As part of control flow in an asynchronous algorithm. Similar to how +// you might have an `break` or early `return` in a synchronous piece of +// code. - void assertNoCallbacks() @safe { - with (assumeSafeShared) - assertNoCallbacks(); - } -} +// This module implements cooperative cancellation by providing a StopSource, +// StopToken and StopCallback. -class StopSource { - package(concurrency) shared InPlaceStopSource source; +// - the StopSource represents the source of a cancellation request. +// - the StopToken is used to determine if a cancellation request has been +// initiated. +// - the StopCallback is invoked when a cancellation request has been +// initiated. - bool stop() nothrow @safe { - return source.stop(); - } +// Implementation details: - bool stop() nothrow @trusted shared { - return source.stop(); - } +// Allocations can have a non-neglible overhead when running many short-lived +// asynchronous tasks. For this reason these objects are designed to be used +// in-place on the stack. - bool isStopRequested() nothrow @safe @nogc { - return source.isStopRequested; - } +// To ensure program safety and correctness we need to uphold that a +// `StopSource` outlives any associated `StopToken` or `StopCallback`, +// otherwise we might open ourselves to use-after-free errors. - bool isStopRequested() nothrow @trusted @nogc shared { - return source.isStopRequested; - } +// We achieve that by: - /// resets the internal state, only do this if you are sure nothing else is looking at this... - void reset(this t)() @system @nogc { - return (cast(shared)source).reset(); - } +// - having a StopCallback deregister itself from a StopSource on destruction +// - having a StopSource deregister any StopCallbacks still registered on +// destruction +// - relying on `scope` StopTokens to ensure they don't outlive the +// StopSource +// - disabling copy and assignment constructors for the StopSource and +// StopToken +// - relying on dip1000 - void assertNoCallbacks() @safe shared { - source.assertNoCallbacks(); - } -} +/// A StopSource represents the source of a cancellation request. You can think +/// of it as the entity that initiates the request for cancellation. +/// Once a stop is requested, it cannot be withdrawn. Additional stop requests +/// have no effect. +struct StopSource { + @disable this(ref return scope StopSource rhs) @safe; + @disable this(ref return scope shared StopSource rhs) shared @safe; + public shared StopState state; -struct StopToken { - package(concurrency) shared StopState* state; - this(StopSource source) nothrow @safe @nogc { - if (source !is null) { - this.state = &source.source.state; - isStopPossible = true; - } + ~this() nothrow @safe @nogc shared scope { + assertNoCallbacks(); } - this(shared StopSource source) nothrow @trusted @nogc { - this(cast()source); + /// Returns an associated StopToken + shared(StopToken) token() nothrow @safe @nogc shared return { + return shared StopToken(this); } - this(ref shared StopState state) nothrow @trusted @nogc { - isStopPossible = true; - this.state = &state; + /// Trigger a cancellation request. This will trigger any StopCallback that was + /// registered through a StopToken associated with this StopSource. + /// Additionally any calls to the `isStopRequested` method on an associated + /// StopToken will return true. + /// + /// Returns true if this is the first cancellation request. + bool stop() nothrow @safe shared scope { + return state.requestStop(); } - this (ref shared InPlaceStopSource stopSource) nothrow @trusted @nogc { - this(stopSource.state); + /// Returns true if a cancellation request has been triggered. + bool isStopRequested() nothrow @safe @nogc shared scope { + return state.isStopRequested(); } - this (shared InPlaceStopSource* stopSource) nothrow @trusted @nogc { - if (stopSource !is null) { - isStopPossible = true; - this.state = &stopSource.state; - } + /// Resets the internal state. To ensure no dangling state it will first trigger + /// any registered StopCallbacks. + void reset() nothrow @safe shared scope { + stop(); + this.state = StopState(); } - bool isStopRequested() nothrow @safe @nogc { - return isStopPossible && state.is_stop_requested(); + /// Primarily used in unittests or debug builds to ensure there are no + /// dangling callbacks. + /// Because both the StopSource and StopCallbacks are to be placed + /// on the stack, a programming error can result in hard to debug + /// use-after-free errors. + /// This method allows to catch those errors before they becomes + /// much harder to debug. + void assertNoCallbacks() nothrow @safe @nogc shared scope { + state.assertEmpty(); } - const bool isStopPossible; -} - -struct NeverStopToken { - enum isStopRequested = false; - enum isStopPossible = false; -} - -StopCallback onStop( - StopSource stopSource, - void delegate() nothrow @safe shared callback -) nothrow @safe { - auto cb = new StopCallback(callback); - return onStop(stopSource, cb); -} - -StopCallback onStop(StopSource stopSource, - void function() nothrow @safe callback) nothrow @trusted { - import std.functional : toDelegate; - return stopSource - .onStop(cast(void delegate() nothrow @safe shared) callback.toDelegate); -} - -StopCallback onStop( - ref shared InPlaceStopSource stopSource, - void delegate() nothrow @safe shared callback -) nothrow @safe { - auto cb = new StopCallback(callback); - stopSource.state.onStop(cb.callback); - return cb; -} - -StopCallback onStop(StopToken)( - StopToken stopToken, - void delegate() nothrow @safe shared callback -) nothrow @safe { - auto cb = new StopCallback(callback); - - onStop(stopToken, cb); - - return cb; + @disable void opAssign(shared StopSource rhs) nothrow @safe @nogc shared; + @disable void opAssign(ref shared StopSource rhs) nothrow @safe @nogc shared; } -StopCallback onStop(StopToken)( - StopToken stopToken, - void function() nothrow @safe callback -) nothrow @trusted { - import std.functional : toDelegate; - return stopToken - .onStop(cast(void delegate() nothrow @safe shared) callback.toDelegate); -} +/// A StopToken is associated with a StopSource and is used to determine +/// whether a cancellation request has been initiated. +/// It is up to the task to check this token and react accordingly. +/// +/// Cancellation can be checked either by calling `isStopRequested` or +/// registering a StopCallback. The StopCallback will be called when the +/// underlying StopSource is triggered. +/// +/// A StopToken can be obtained by calling the `token` method on a StopSource. +struct StopToken { + private shared StopState* state; -StopCallback onStop(StopToken)(StopToken stopToken, - StopCallback cb) nothrow @safe { - if (stopToken.isStopPossible) { - stopToken.onStop(cb.callback); + private this(return ref shared StopSource stopSource) nothrow @safe @nogc shared { + this.state = &stopSource.state; } - return cb; -} -void onStop(StopToken)(StopToken stopToken, - ref InPlaceStopCallback cb) nothrow @safe { - if (stopToken.isStopPossible) { - (*stopToken.state).onStop(cb); + /// Returns true if a cancellation request has been initiated with the associated StopSource + bool isStopRequested() nothrow @safe @nogc scope shared { + return state && state.isStopRequested(); } } -StopCallback onStop(StopSource stopSource, StopCallback cb) nothrow @safe { - onStop(stopSource.source.state, cb.callback); - return cb; -} - -void onStop(StopSource stopSource, ref InPlaceStopCallback cb) nothrow @safe { - onStop(stopSource.source.state, cb); -} - -void onStop(ref shared StopState state, ref InPlaceStopCallback cb) nothrow @trusted { // TODO: @safe - // TODO: shared - if (state.try_add_callback(cast(shared)cb, true)) - cb.state = &state; -} +/// A StopCallback can be associated with a StopToken, and contains a callback +/// that will be invoked when a stop request in the underlying StopSource's +/// is triggered. +/// +/// There is no guarantee which execution context the callback is invoked on. +struct StopCallback { + @disable this(ref return scope shared StopCallback rhs); + @disable this(ref return scope StopCallback rhs); + + ~this() @safe scope @nogc nothrow shared { + dispose(); + } + + /// Register the StopCallback with a StopToken and a specified callback. + /// + /// The supplied callback will be invoked exactly once when the StopToken + /// is triggered. That invocation might happen before this function returns. + /// + /// A subsequent call to `register` will first ensure any existing registered + /// callback is unregistered. Note that the existing callback might still get + /// invoked if the existing StopToken is triggered at the same time. + /// + /// In cases where this is unwanted you can call `dispose` yourself before + /// registering a new callback. + void register(return ref shared StopToken token, void delegate() nothrow @safe shared callback) nothrow @safe scope return shared { + dispose(); + + /// TODO: before we continue here we need to be 100% sure the previous callback + /// isn't invoked, or ensure we wait until it is done. + + this.callback = callback; -struct InPlaceStopCallback { - @disable this(ref return scope typeof(this) rhs); + if (token.state && token.state.tryAddCallback(cast(shared)this)) + this.state = token.state; + } - void dispose() nothrow @trusted @nogc { + /// Dispose the stopcallback, deregistering it from an associated StopToken, + /// if any. + /// + /// The registered callback could still be triggered while this function is + /// executing, but it will have been ran to completion before it returns. + void dispose() shared nothrow @trusted @nogc scope { import core.atomic : cas; + // TODO: might have to reset prev or next if state is null if (state is null) return; auto local = state; - static if (__traits(compiles, cas(&state, local, null))) { - if (!cas(&state, local, null)) { - assert(state is null); - return; - } - } else { - if (!cas(cast(shared) &state, cast(shared) local, null)) { - assert(state is null); - return; - } + if (!cas(&state, local, null)) { + assert(state is null); + return; } - - local.remove_callback(this); - } - - void dispose() shared nothrow @trusted @nogc { - (cast() this).dispose(); - } - - this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc { - this.callback = callback; + local.removeCallback(this); } + @disable void opAssign(ref shared StopCallback rhs) nothrow @safe @nogc shared; + @disable void opAssign(shared StopCallback rhs) nothrow @safe @nogc shared; private: - - void delegate() nothrow shared @safe callback; + void delegate() nothrow @safe shared callback; shared StopState* state; - shared InPlaceStopCallback* next_ = null; - shared InPlaceStopCallback** prev_ = null; - shared bool* isRemoved_ = null; + shared StopCallback* next = null; + shared StopCallback** prev = null; + shared bool* isRemoved = null; shared bool callbackFinishedExecuting = false; - - void execute() nothrow @safe shared { - callback(); - } -} - -class StopCallback { - this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc { - this.callback = InPlaceStopCallback(callback); - } - - void dispose() nothrow @trusted @nogc { - callback.dispose(); - } - - void dispose() shared nothrow @trusted @nogc { - callback.dispose(); - } -private: - - InPlaceStopCallback callback; } -private void spin_yield() nothrow @trusted @nogc { +private void spinYield() nothrow @trusted @nogc { // TODO: could use the pause asm instruction // it is available in LDC as intrinsic... but not in DMD import core.thread : Thread; @@ -261,71 +203,39 @@ private void spin_yield() nothrow @trusted @nogc { Thread.yield(); } +/// The StopState is the internal state object used in the StopSource. +/// It contains the linked list of StopCallbacks and the methods to +/// add, remove and trigger the callbacks in a thread-safe manner. +/// +/// It is based on the code from https://github.com/josuttis/jthread +/// by Nicolai Josuttis, licensed under the Creative Commons Attribution +/// 4.0 Internation License http://creativecommons.org/licenses/by/4.0 private struct StopState { import core.thread : Thread; import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp, atomicFetchAdd, atomicFetchSub; + import core.atomic : casWeak; - static if (__traits(compiles, () { - import core.atomic : casWeak; - }) && __traits(compiles, () { - import core.internal.atomic - : atomicCompareExchangeWeakNoResult; - })) - import core.atomic : casWeak; - else - auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)( - T* here, - V1 ifThis, - V2 writeThis - ) pure nothrow @nogc @safe { - import core.atomic : cas; - - static if (__traits(compiles, - cas!(M1, M2)(here, ifThis, writeThis))) - return cas!(M1, M2)(here, ifThis, writeThis); - else - return cas(here, ifThis, writeThis); - } - -public: - void add_token_reference() nothrow @safe @nogc shared { - state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment); - } - - void remove_token_reference() nothrow @safe @nogc shared { - state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment); - } - - void add_source_reference() nothrow @safe @nogc shared { - state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment); - } - - void remove_source_reference() nothrow @safe @nogc shared { - state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment); - } - - bool request_stop() nothrow @safe shared { - if (!try_lock_and_signal_until_signalled()) { + bool requestStop() nothrow @safe shared scope { + if (!tryLockAndSignalUntilSignalled()) { // Stop has already been requested. return false; } // Set the 'stop_requested' signal and acquired the lock. + storeSignallingThread(); - assumeThreadSafe.signallingThread_ = Thread.getThis(); - - while (head_ !is null) { + while (head !is null) { // Dequeue the head of the queue - auto cb = head_; - head_ = cb.next_; - const bool anyMore = head_ !is null; + auto cb = head; + head = cb.next; + const bool anyMore = head !is null; if (anyMore) { - (() @trusted => head_.prev_ = - &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime". But this is this, how can it have a longer lifetime... + // @trusted due to error "`scope` applies to first indirection only" + (() @trusted => head.prev = &head)(); } // Mark this item as removed from the list. - cb.prev_ = null; + cb.prev = null; // Don't hold lock while executing callback // so we don't block other threads from deregistering callbacks. @@ -334,18 +244,18 @@ public: // TRICKY: Need to store a flag on the stack here that the callback // can use to signal that the destructor was executed inline // during the call. If the destructor was executed inline then - // it's not safe to dereference cb after execute() returns. + // it's not safe to dereference cb after callback() returns. // If the destructor runs on some other thread then the other // thread will block waiting for this thread to signal that the // callback has finished executing. shared bool isRemoved = false; - (() @trusted => cb.isRemoved_ = + (() @trusted => cb.isRemoved = &isRemoved)(); // the pointer to the stack here is removed 3 lines down. - cb.execute(); + cb.callback(); if (!isRemoved) { - cb.isRemoved_ = null; + cb.isRemoved = null; cb.callbackFinishedExecuting .atomicStore!(MemoryOrder.rel)(true); } @@ -366,67 +276,56 @@ public: return true; } - bool is_stop_requested() nothrow @safe @nogc shared { - return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq)); + bool isStopRequested() nothrow @safe @nogc shared scope { + return isStopRequested(state.atomicLoad!(MemoryOrder.acq)); } - bool is_stop_requestable() nothrow @safe @nogc shared { - return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq)); - } - - bool try_add_callback(ref shared InPlaceStopCallback cb, - bool incrementRefCountIfSuccessful) nothrow @trusted shared { + bool tryAddCallback(ref return scope shared StopCallback cb) nothrow @trusted shared { ulong oldState; do { goto load_state; do { - spin_yield(); + spinYield(); load_state: - oldState = state_.atomicLoad!(MemoryOrder.acq); - if (is_stop_requested(oldState)) { - cb.execute(); - return false; - } else if (!is_stop_requestable(oldState)) { + oldState = state.atomicLoad!(MemoryOrder.acq); + if (isStopRequested(oldState)) { + cb.callback(); return false; } - } while (is_locked(oldState)); + } while (isLocked(oldState)); } while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)( - &state_, oldState, oldState | locked_flag)); + &state, oldState, oldState | lockedFlag)); // Push callback onto callback list. - cb.next_ = head_; - if (cb.next_ !is null) { - cb.next_.prev_ = &cb.next_; + cb.next = head; + if (cb.next !is null) { + cb.next.prev = &cb.next; } () @trusted { - cb.prev_ = &head_; + cb.prev = &head; }(); - head_ = &cb; + head = &cb; - if (incrementRefCountIfSuccessful) { - unlock_and_increment_token_ref_count(); - } else { - unlock(); - } + unlock(); // Successfully added the callback. return true; } - void remove_callback(ref InPlaceStopCallback cb) nothrow @safe @nogc shared { + void removeCallback(ref shared StopCallback cb) nothrow @safe @nogc shared { lock(); - if (cb.prev_ !is null) { + if (cb.prev !is null) { // Still registered, not yet executed // Just remove from the list. - *cb.prev_ = cb.next_; - if (cb.next_ !is null) { - cb.next_.prev_ = cb.prev_; + *cb.prev = cb.next; + if (cb.next !is null) { + cb.next.prev = cb.prev; } - unlock_and_decrement_token_ref_count(); + unlock(); return; } @@ -436,105 +335,119 @@ public: // Callback has either already executed or is executing // concurrently on another thread. - if (assumeThreadSafe.signallingThread_ is Thread.getThis()) { + if (isSignallingThread()) { // Callback executed on this thread or is still currently executing // and is deregistering itself from within the callback. - if (cb.isRemoved_ !is null) { - // Currently inside the callback, let the request_stop() method + if (cb.isRemoved !is null) { + // Currently inside the callback, let the requestStop() method // know the object is about to be destructed and that it should // not try to access the object when the callback returns. - *cb.isRemoved_ = true; + *cb.isRemoved = true; } } else { // Callback is currently executing on another thread, // block until it finishes executing. while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) { - spin_yield(); + spinYield(); } } + } - remove_token_reference(); + bool isFinished(ref shared StopCallback cb) nothrow @safe @nogc shared { + return cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq); } -private: - void assertNoCallbacks() @safe shared { + void assertEmpty() @safe shared @nogc nothrow scope { + // Return early if head is zero. + if (head.atomicLoad!(MemoryOrder.acq) is null) { + return; + } + + // Otherwise we have to do a bit of cleanup before asserting. + // `assertEmpty` is typically called during tests or debug builds only, + // right before destroying the StopSource and its StopState. + // + // In case it is violated we can't just assert however. Both StopCallbacks + // and StopSources sit on the stack and callbacks must be deregistered + // before either one is deconstructed, otherwise the callback might try + // to deregister itself from a dangling StopSource, potentially from + // another thread or fiber. + // + // This can result in complex to debug use-after-free errors. + // In order to reduce that we clean up those callbacks before asserting. + // + // There is no guarantee other callbacks won't be registered concurrently, + // either in between unlock and assert, or even afterwards, but this is + // primarily to be used for detecting errors to aid throubleshooting, + // not as a failsafe mechanism. lock(); - auto empty = head_ is null; + shared StopState* blank = null; + while (head !is null) { + // Go through all pointers and dispose, this is to + // avoid a segfault when doing the assert. + auto next = head.next; + atomicStore(head.state, blank); + head = next; + } unlock(); - assert(empty, "StopSource has lingering callbacks"); - } - - static bool is_locked(ulong state) nothrow @safe @nogc { - return (state & locked_flag) != 0; + assert(false, "StopSource has lingering callbacks"); } - static bool is_stop_requested(ulong state) nothrow @safe @nogc { - return (state & stop_requested_flag) != 0; + static bool isLocked(ulong state) nothrow @safe @nogc { + return (state & lockedFlag) != 0; } - static bool is_stop_requestable(ulong state) nothrow @safe @nogc { - // Interruptible if it has already been interrupted or if there are - // still interrupt_source instances in existence. - return is_stop_requested(state) || (state >= source_ref_increment); + static bool isStopRequested(ulong state) nothrow @safe @nogc { + return (state & stopRequestedFlag) != 0; } - bool try_lock_and_signal_until_signalled() nothrow @safe @nogc shared { + bool tryLockAndSignalUntilSignalled() nothrow @safe @nogc shared scope { ulong oldState; do { - oldState = state_.atomicLoad!(MemoryOrder.acq); - if (is_stop_requested(oldState)) + oldState = state.atomicLoad!(MemoryOrder.acq); + if (isStopRequested(oldState)) return false; - while (is_locked(oldState)) { - spin_yield(); - oldState = state_.atomicLoad!(MemoryOrder.acq); - if (is_stop_requested(oldState)) + while (isLocked(oldState)) { + spinYield(); + oldState = state.atomicLoad!(MemoryOrder.acq); + if (isStopRequested(oldState)) return false; } } while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)( - &state_, oldState, - oldState | stop_requested_flag | locked_flag)); + &state, oldState, + oldState | stopRequestedFlag | lockedFlag)); return true; } - void lock() nothrow @safe @nogc shared { + void lock() nothrow @safe @nogc shared scope { ulong oldState; do { - oldState = state_.atomicLoad!(MemoryOrder.raw); - while (is_locked(oldState)) { - spin_yield(); - oldState = state_.atomicLoad!(MemoryOrder.raw); + oldState = state.atomicLoad!(MemoryOrder.raw); + while (isLocked(oldState)) { + spinYield(); + oldState = state.atomicLoad!(MemoryOrder.raw); } } while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)( - (&state_), oldState, oldState | locked_flag)); + (&state), oldState, oldState | lockedFlag)); } - void unlock() nothrow @safe @nogc shared { - state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag); + void unlock() nothrow @safe @nogc shared scope { + state.atomicFetchSub!(MemoryOrder.rel)(lockedFlag); } - void unlock_and_increment_token_ref_count() nothrow @safe @nogc shared { - state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment); - } + enum stopRequestedFlag = 1L; + enum lockedFlag = 2L; + + shared ulong state = 0; + StopCallback* head = null; + Thread signallingThread; - void unlock_and_decrement_token_ref_count() nothrow @safe @nogc shared { - state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment); + private void storeSignallingThread() @nogc nothrow @trusted shared scope { + (cast()signallingThread) = Thread.getThis(); } - enum stop_requested_flag = 1L; - enum locked_flag = 2L; - enum token_ref_increment = 4L; - enum source_ref_increment = 1L << 33u; - - // bit 0 - stop-requested - // bit 1 - locked - // bits 2-32 - token ref count (31 bits) - // bits 33-63 - source ref count (31 bits) - shared ulong state_ = source_ref_increment; - InPlaceStopCallback* head_ = null; - Thread signallingThread_; - - ref assumeThreadSafe() @trusted nothrow @nogc shared { - return cast()this; + private bool isSignallingThread() @nogc nothrow @trusted shared scope { + return (cast()signallingThread) is Thread.getThis(); } } diff --git a/source/concurrency/stream/cycle.d b/source/concurrency/stream/cycle.d index a9e0d3f..05a21d4 100644 --- a/source/concurrency/stream/cycle.d +++ b/source/concurrency/stream/cycle.d @@ -1,13 +1,14 @@ module concurrency.stream.cycle; import concurrency.stream.stream; +import concurrency.stoptoken : StopToken; import std.range : ElementType; struct Cycle(Range) { alias T = ElementType!Range; alias DG = CollectDelegate!(T); Range range; - void loop(StopToken)(DG emit, StopToken stopToken) @safe { + void loop(DG emit, shared StopToken stopToken) @safe { for (; !stopToken.isStopRequested;) { foreach (item; range) { emit(item); diff --git a/source/concurrency/stream/flatmapbase.d b/source/concurrency/stream/flatmapbase.d index 4775a85..1c83fe8 100644 --- a/source/concurrency/stream/flatmapbase.d +++ b/source/concurrency/stream/flatmapbase.d @@ -3,7 +3,7 @@ module concurrency.stream.flatmapbase; import concurrency.stream.stream; import concurrency.sender : OpType, isSender; import concurrency.receiver : ForwardExtensionPoints; -import concurrency.stoptoken : StopSource, StopToken; +import concurrency.stoptoken : StopToken; import std.traits : ReturnType; import concurrency.utils : isThreadSafeFunction; import concepts; @@ -42,7 +42,7 @@ template FlatMapBaseStreamOp(Stream, Fun, OnOverlap overlap) { static if (is(Properties.ElementType == void)) void item() { - if (state.isStopRequested) + if (state.stopSource.isStopRequested) return; state.onItem(); with (state.bitfield.lock()) { @@ -58,7 +58,7 @@ template FlatMapBaseStreamOp(Stream, Fun, OnOverlap overlap) { else void item(Properties.ElementType t) { - if (state.isStopRequested) + if (state.stopSource.isStopRequested) return; state.onItem(); with (state.bitfield.lock()) { @@ -104,7 +104,7 @@ private bool isDoneOrErrorProduced(size_t state) @safe @nogc nothrow pure { } final class State(TStreamSenderValue, TSenderValue, Receiver, - OnOverlap overlap) : StopSource { + OnOverlap overlap) { import concurrency.bitfield; import concurrency.stoptoken; import std.exception : assumeWontThrow; @@ -118,30 +118,31 @@ final class State(TStreamSenderValue, TSenderValue, Receiver, StreamSenderValue value; Throwable throwable; Semaphore semaphore; - StopCallback cb; + shared StopSource stopSource; + shared StopCallback cb; static if (overlap == OnOverlap.latest) - shared InPlaceStopSource innerStopSource; + shared StopSource innerStopSource; shared SharedBitField!Flags bitfield; this(DG dg, Receiver receiver) { this.dg = dg; this.receiver = receiver; semaphore = new Semaphore(1); bitfield = SharedBitField!Flags(Counter.tick); - cb = receiver.getStopToken - .onStop(cast(void delegate() nothrow @safe shared) &stop); + auto stopToken = receiver.getStopToken; + cb.register(stopToken, cast(void delegate() nothrow @safe shared) &stop); } - override bool stop() nothrow @trusted { + bool stop() nothrow @trusted { return (cast(shared) this).stop(); } - override bool stop() nothrow @trusted shared { + bool stop() nothrow @trusted shared { static if (overlap == OnOverlap.latest) { - auto r = super.stop(); + auto r = stopSource.stop(); innerStopSource.stop(); return r; } else { - return super.stop(); + return stopSource.stop(); } } @@ -174,11 +175,11 @@ final class State(TStreamSenderValue, TSenderValue, Receiver, } } - private StopToken getSenderStopToken() @safe nothrow { + private shared(StopToken) getSenderStopToken() @safe nothrow { static if (overlap == OnOverlap.latest) { - return StopToken(innerStopSource); + return innerStopSource.token(); } else { - return StopToken(this); + return stopSource.token(); } } } @@ -210,7 +211,7 @@ struct StreamReceiver(State) { if (!isDoneOrErrorProduced(oldState)) { state.throwable = t; release(); // must release before calling .stop - state.stop(); + state.stopSource.stop(); } else release(); if (last) @@ -222,14 +223,14 @@ struct StreamReceiver(State) { with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) { bool last = isLast(newState); if (!isDoneOrErrorProduced(oldState)) - state.stop(); + state.stopSource.stop(); if (last) state.process(newState); } } - StopToken getStopToken() @safe nothrow { - return StopToken(state); + shared(StopToken) getStopToken() @safe nothrow { + return state.stopSource.token(); } private auto receiver() { @@ -259,7 +260,7 @@ struct InnerSenderReceiver(State) { if (!isDoneOrErrorProduced(oldState)) { state.throwable = t; release(); // must release before calling .stop - state.stop(); + state.stopSource.stop(); } else release(); if (last) @@ -271,7 +272,7 @@ struct InnerSenderReceiver(State) { void setDone() @safe nothrow { static if (State.onOverlap == OnOverlap.latest) { - if (!state.isStopRequested) { + if (!state.stopSource.isStopRequested) { state.bitfield.add(Counter.tick); notify(); return; @@ -281,7 +282,7 @@ struct InnerSenderReceiver(State) { with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) { bool last = isLast(newState); if (!isDoneOrErrorProduced(oldState)) - state.stop(); + state.stopSource.stop(); if (last) state.process(newState); else diff --git a/source/concurrency/stream/package.d b/source/concurrency/stream/package.d index f789c74..dc2402e 100644 --- a/source/concurrency/stream/package.d +++ b/source/concurrency/stream/package.d @@ -61,7 +61,7 @@ auto infiniteStream(T)(T t) { alias DG = CollectDelegate!(T); struct Loop { T val; - void loop(StopToken)(DG emit, StopToken stopToken) { + void loop(DG emit, shared StopToken stopToken) { while (!stopToken.isStopRequested) emit(val); } @@ -75,7 +75,7 @@ auto iotaStream(T)(T start, T end) { alias DG = CollectDelegate!(T); struct Loop { T b, e; - void loop(StopToken)(DG emit, StopToken stopToken) { + void loop(DG emit, shared StopToken stopToken) { foreach (i; b .. e) { emit(i); if (stopToken.isStopRequested) @@ -92,7 +92,7 @@ auto arrayStream(T)(T[] arr) { alias DG = CollectDelegate!(T); struct Loop { T[] arr; - void loop(StopToken)(DG emit, StopToken stopToken) @safe { + void loop(DG emit, shared StopToken stopToken) @safe { foreach (item; arr) { emit(item); if (stopToken.isStopRequested) @@ -305,14 +305,14 @@ shared struct SharedStream(T) { shared SharedStream!T source; DG dg; Receiver receiver; - StopCallback cb; + shared StopCallback cb; @disable this(ref return scope typeof(this) rhs); @disable this(this); void start() nothrow @trusted { auto stopToken = receiver.getStopToken(); - cb = stopToken.onStop(&(cast(shared) this).onStop); + cb.register(stopToken, &(cast(shared) this).onStop); if (stopToken.isStopRequested) { cb.dispose(); receiver.setDone(); diff --git a/source/concurrency/stream/scan.d b/source/concurrency/stream/scan.d index b167a27..5a35520 100644 --- a/source/concurrency/stream/scan.d +++ b/source/concurrency/stream/scan.d @@ -30,7 +30,7 @@ template ScanStreamOp(Stream, Fun, Seed) { @disable this(this); this(Stream stream, Fun scanFn, Seed seed, DG dg, - Receiver receiver) @trusted { + Receiver receiver) @trusted return scope { this.scanFn = scanFn; this.acc = seed; this.dg = dg; diff --git a/source/concurrency/stream/take.d b/source/concurrency/stream/take.d index eeadc87..cfd9369 100644 --- a/source/concurrency/stream/take.d +++ b/source/concurrency/stream/take.d @@ -3,7 +3,7 @@ module concurrency.stream.take; import concurrency.stream.stream; import concurrency.sender : OpType; import concurrency.receiver : ForwardExtensionPoints; -import concurrency.stoptoken : InPlaceStopSource; +import concurrency.stoptoken : StopSource; import concepts; /// takes the first n values from a stream or until cancelled @@ -17,7 +17,7 @@ auto take(Stream)(Stream stream, size_t n) if (models!(Stream, isStream)) { struct TakeReceiver(Receiver, Value) { Receiver receiver; - shared InPlaceStopSource* stopSource; + shared StopSource* stopSource; static if (is(Value == void)) void setValue() @safe { receiver.setValue(); @@ -51,12 +51,12 @@ template TakeOp(Stream) { struct TakeOp(Receiver) { import concurrency.operations : withStopSource, SSSender; import std.traits : ReturnType; - alias SS = SSSender!(Properties.Sender, shared InPlaceStopSource*); + alias SS = SSSender!(Properties.Sender); alias Op = OpType!(SS, TakeReceiver!(Receiver, Properties.Sender.Value)); size_t n; Properties.DG dg; - shared InPlaceStopSource stopSource; + shared StopSource stopSource; Op op; @disable this(ref return scope typeof(this) rhs); diff --git a/source/concurrency/stream/throttling.d b/source/concurrency/stream/throttling.d index c9b1a91..56fef90 100644 --- a/source/concurrency/stream/throttling.d +++ b/source/concurrency/stream/throttling.d @@ -75,9 +75,9 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, alias InnerReceiver = TimerReceiver!(typeof(this), Properties.ElementType, emitLogic, timerLogic); - shared InPlaceStopSource stopSource; - shared InPlaceStopSource timerStopSource; - StopCallback cb; + shared StopSource stopSource; + shared StopSource timerStopSource; + shared StopCallback cb; Throwable throwable; alias Op = OpType!(Properties.Sender, SenderReceiver!(typeof(this), Properties.Value)); @@ -261,9 +261,9 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, } void start() @trusted nothrow scope { - cb = receiver.getStopToken().onStop( - cast(void delegate() nothrow @safe shared) &this.stop - ); // butt ugly cast, but it won't take the second overload + auto stopToken = receiver.getStopToken(); + // butt ugly cast, but it won't take the second overload + cb.register(stopToken, cast(void delegate() nothrow @safe shared) &this.stop); op.start(); } } @@ -310,7 +310,7 @@ struct TimerReceiver(Op, ElementType, ThrottleEmitLogic emitLogic, } auto getStopToken() { - return StopToken(state.timerStopSource); + return state.timerStopSource.token(); } auto getScheduler() { @@ -350,7 +350,7 @@ struct SenderReceiver(Op, Value) { } auto getStopToken() { - return StopToken(state.stopSource); + return state.stopSource.token(); } auto getScheduler() { diff --git a/source/concurrency/stream/transform.d b/source/concurrency/stream/transform.d index bfcb342..5b60dde 100644 --- a/source/concurrency/stream/transform.d +++ b/source/concurrency/stream/transform.d @@ -3,7 +3,6 @@ module concurrency.stream.transform; import concurrency.stream.stream; import concurrency.sender : OpType; import concurrency.receiver : ForwardExtensionPoints; -import concurrency.stoptoken : StopSource; import std.traits : ReturnType; import concurrency.utils : isThreadSafeFunction; import concepts; diff --git a/source/concurrency/syncwait.d b/source/concurrency/syncwait.d index be93ea9..e0cedd1 100644 --- a/source/concurrency/syncwait.d +++ b/source/concurrency/syncwait.d @@ -25,7 +25,7 @@ package struct SyncWaitReceiver2(Value) { } State* state; - shared InPlaceStopSource* stopSource; + shared StopSource* stopSource; void setDone() nothrow @safe { state.canceled = true; state.worker.stop(); @@ -48,7 +48,7 @@ package struct SyncWaitReceiver2(Value) { } auto getStopToken() nothrow @safe @nogc { - return StopToken(*stopSource); + return stopSource.token(); } auto getScheduler() nothrow @safe { @@ -113,6 +113,10 @@ struct Result(T) { return std.sumtype.match!((T t) => t, function T(x) { throw new Exception("Unexpected value"); })(result); } + auto trustedGet(T)() { + return std.sumtype.match!((T t) => t, function T(x) { assert(0, "nah"); })(result); + } + auto assumeOk() { return value(); } @@ -128,18 +132,18 @@ template match(Handlers...) { /// Start the Sender and waits until it completes, cancels, or has an error. auto syncWait(Sender)(auto ref Sender sender, - shared ref InPlaceStopSource stopSource) { + shared ref StopSource stopSource) { return syncWaitImpl(sender, stopSource); } auto syncWait(Sender)(auto scope ref Sender sender) @trusted { import concurrency.signal : globalStopSource; - shared InPlaceStopSource childStopSource; - auto cb = InPlaceStopCallback(() shared { + shared StopSource childStopSource; + shared parentStopToken = globalStopSource.token(); + shared StopCallback cb; + cb.register(parentStopToken, () shared { childStopSource.stop(); }); - StopToken parentStopToken = StopToken(globalStopSource); - parentStopToken.onStop(cb); try { auto result = syncWaitImpl(sender, childStopSource); @@ -156,7 +160,7 @@ auto syncWait(Sender)(auto scope ref Sender sender) @trusted { private Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender, - ref shared InPlaceStopSource stopSource) @safe { + ref shared StopSource stopSource) @safe { static assert(models!(Sender, isSender)); import concurrency.signal; import core.stdc.signal : SIGTERM, SIGINT; diff --git a/source/concurrency/syncwaitsimple.d b/source/concurrency/syncwaitsimple.d new file mode 100644 index 0000000..1ba3207 --- /dev/null +++ b/source/concurrency/syncwaitsimple.d @@ -0,0 +1,401 @@ +module concurrency.syncwaitsimple; + +import concurrency.stoptoken; +import concurrency.sender; +import concurrency.thread; +import concepts; +import std.sumtype; +import concurrency.syncwait : Result; + +/** + * + +We can simply assume that the Sender that runs in syncWait is blocking, why? Because if it isn't then it needs to run +async, and where does it run if syncWait didn't give it a execution context to run in? + +Ergo, it runs on the main thread. + +Currently we have ThreadSender and FiberSender. For ThreadSender we probably want to do this differently and instead +have something that owns the Thread, like a pool (even a pool for one), and now we have something that owns the threads +and needs to join on them, thus blocking the main thread. + +For FiberSender the situation is different. The Fiber can be considered blocking as well though, since if it yields it +always yields *to* something. By default the syncWait doens't expose additional execution contexts, so where can it run? +Only on the main thread. + + +The question then becomes, how do we inject these SingleThreadTimers, ThreadPools and EventLoops in the async graph, and +have them block until syncWait completes? + +As we saw with iouring, the EventLoop itself can be modelled as a Sender. + + +In C++ they model the executors as separate entities. + +This has the benefit that one can easily schedule (parts of) the async graph onto a specific scheduler. + +For instance, while the parsing of an HTTP request can happen mostly on the io scheduler, the handling might +be best moved to a thread pool, until the request is done and is about to start on the response, at which point +rescheduling it on the io scheduler could happen. + +Such a thread pool is created somewhere and then passed to sub async computations. + + +However, do note that such a thread pool can be gotton by using letValue, and then has its lifetime constrained. +The extra nesting isn't nice, and it might best be done with an immovable, non-copyable stack item. + + + +{ + timer_single_thread context; + + context.run(just(...)).syncWait(); +} + +{ + auto context = thread_pool(4); + + context.run(just(...)).syncWait(); +} + +{ + auto context = iouring(512); + + context.run(just(...)).syncWait(); +} + +Now, run iouring, with a thread_pool for tasks and a timer_single_thread + +{ + timer_single_thread timer; + auto pool = thread_pool(4); + auto context = iouring(512); + + auto scheduler = pool.getScheduler(); + + timer.run(context.run(server(pool)).on(scheduler)).syncWait; +} + +Here timer and context by default block the thread, but because we run the +iouring on the thread_pool scheduler only timer blocks the thread. + + +A timer/iouring is something that outlives the sender it runs while providing +scheduling. + +If something provides a NullScheduler it means it isn't scheduling things on the +current thread. + + + */ + + +/* + +Lets say we accept the semaphore thing + +*/ + +import concurrency.syncwait : Cancelled, Completed; + +private struct State(Value) { + static if (!is(Value == void)) + alias Store = SumType!(Value, Cancelled, Throwable); + else + alias Store = SumType!(Completed, Cancelled, Throwable); + + shared StopSource* stopSource; + import core.sync.semaphore; + Store result; + InPlaceSemaphore semaphore; + + this(shared ref StopSource stopSource) { + import core.lifetime; + + this.stopSource = &stopSource; + semaphore = InPlaceSemaphore(0); + } + + private void notify() nothrow @trusted @nogc { + semaphore.notify(); + } + + private void wait() nothrow @trusted @nogc { + semaphore.wait(); + } +} + +package struct SyncWaitReceiver(Value) { + private State!(Value)* state; + void setDone() nothrow @trusted { + state.result = Cancelled(); + state.notify(); + } + + void setError(Throwable e) nothrow @trusted { + state.result = e; + state.notify(); + } + + static if (is(Value == void)) + void setValue() nothrow @trusted { + state.result = Completed(); + state.notify(); + } + + else + void setValue(Value value) nothrow @trusted { + state.result = value; + state.notify(); + } + + auto getStopToken() nothrow @safe @nogc { + return state.stopSource.token(); + } + + auto getScheduler() nothrow @trusted { + import concurrency.scheduler : NullScheduler; + + return NullScheduler(); + } +} + +/// Start the Sender and waits until it completes, cancels, or has an error. +auto syncWaitSimple(Sender)(auto ref Sender sender, + shared ref StopSource stopSource) { + return syncWaitImpl(sender, stopSource); +} + +auto syncWaitSimple(Sender)(auto scope ref Sender sender) @trusted { + import concurrency.signal : globalStopSource; + shared StopSource childStopSource; + auto cb = shared StopCallback(() shared { + childStopSource.stop(); + }); + shared parentStopToken = StopToken(globalStopSource); + cb.onStop(paentStopToken); + + try { + auto result = syncWaitImpl(sender, childStopSource); + + version (unittest) childStopSource.assertNoCallbacks; + // detach stopSource + cb.dispose(); + return result; + } catch (Throwable t) { // @suppress(dscanner.suspicious.catch_em_all) + cb.dispose(); + throw t; + } +} + +private +Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender, + ref shared StopSource stopSource) @trusted { + static assert(models!(Sender, isSender)); + + alias Value = Sender.Value; + auto state = State!Value(stopSource); + + scope receiver = SyncWaitReceiver!(Value)(&state); + auto op = sender.connect(receiver); + op.start(); + + state.wait(); + + return state.result.match!((Throwable exception) { + if (auto e = cast(Exception) exception) + return Result!Value(e); + throw exception; + }, + (r) => Result!Value(r)); +} + +import core.sync.exception; +import core.time; + +version (OSX) + version = Darwin; +else version (iOS) + version = Darwin; +else version (TVOS) + version = Darwin; +else version (WatchOS) + version = Darwin; + +version (Windows) +{ + import core.sys.windows.basetsd /+: HANDLE+/; + import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, INFINITE, + ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/; + import core.sys.windows.windef /+: BOOL, DWORD+/; + import core.sys.windows.winerror /+: WAIT_TIMEOUT+/; +} +else version (Darwin) +{ + import core.sync.config; + import core.stdc.errno; + import core.sys.posix.time; + import core.sys.darwin.mach.semaphore; +} +else version (Posix) +{ + import core.sync.config; + import core.stdc.errno; + import core.sys.posix.pthread; + import core.sys.posix.semaphore; +} +else +{ + static assert(false, "Platform not supported"); +} + + +//////////////////////////////////////////////////////////////////////////////// +// Semaphore +// +// void wait(); +// void notify(); +//////////////////////////////////////////////////////////////////////////////// + + +/** + * This class represents a general counting semaphore as concieved by Edsger + * Dijkstra. As per Mesa type monitors however, "signal" has been replaced + * with "notify" to indicate that control is not transferred to the waiter when + * a notification is sent. + */ +struct InPlaceSemaphore { + //////////////////////////////////////////////////////////////////////////// + // Initialization + //////////////////////////////////////////////////////////////////////////// + + + /** + * Initializes a semaphore object with the specified initial count. + * + * Params: + * count = The initial count for the semaphore. + * + * Throws: + * SyncError on error. + */ + this( uint count ) @nogc nothrow @trusted + { + version (Windows) + { + m_hndl = CreateSemaphoreA( null, count, int.max, null ); + assert( m_hndl != m_hndl.init, "Unable to create semaphore" ); + } + else version (Darwin) + { + auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count ); + assert( rc == 0, "Unable to create semaphore" ); + } + else version (Posix) + { + int rc = sem_init( &m_hndl, 0, count ); + assert( rc == 0, "Unable to create semaphore" ); + } + } + + + ~this() @nogc nothrow @trusted + { + version (Windows) + { + BOOL rc = CloseHandle( m_hndl ); + assert( rc, "Unable to destroy semaphore" ); + } + else version (Darwin) + { + auto rc = semaphore_destroy( mach_task_self(), m_hndl ); + assert( !rc, "Unable to destroy semaphore" ); + } + else version (Posix) + { + int rc = sem_destroy( &m_hndl ); + assert( !rc, "Unable to destroy semaphore" ); + } + } + + + //////////////////////////////////////////////////////////////////////////// + // General Actions + //////////////////////////////////////////////////////////////////////////// + + + /** + * Wait until the current count is above zero, then atomically decrement + * the count by one and return. + * + * Returns: + * false on error. + */ + bool wait() nothrow @nogc @trusted + { + version (Windows) + { + DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); + return rc == WAIT_OBJECT_0; + } + else version (Darwin) + { + while ( true ) + { + auto rc = semaphore_wait( m_hndl ); + if ( !rc ) + return true; + if ( rc == KERN_ABORTED && errno == EINTR ) + continue; + return false; + } + } + else version (Posix) + { + while ( true ) + { + if ( !sem_wait( &m_hndl ) ) + return true; + if ( errno != EINTR ) + return false; + } + } + } + + /** + * Atomically increment the current count by one. This will notify one + * waiter, if there are any in the queue. + * + * Returns: + * false on error. + */ + bool notify() nothrow @nogc @trusted + { + version (Windows) + { + if ( !ReleaseSemaphore( m_hndl, 1, null ) ) + return false; + } + else version (Darwin) + { + if ( semaphore_signal( m_hndl ) ) + return false; + } + else version (Posix) + { + if ( sem_post( &m_hndl ) ) + return false; + } + return true; + } + + /// Aliases the operating-system-specific semaphore type. + version (Windows) alias Handle = HANDLE; + /// ditto + else version (Darwin) alias Handle = semaphore_t; + /// ditto + else version (Posix) alias Handle = sem_t; + + /// Handle to the system-specific semaphore. + private Handle m_hndl; +} diff --git a/source/concurrency/utils.d b/source/concurrency/utils.d index 154afc4..977175f 100644 --- a/source/concurrency/utils.d +++ b/source/concurrency/utils.d @@ -44,9 +44,9 @@ auto closure(Fun, Args...)(Fun fun, Args args) @trusted auto cl = cast(shared) new Closure!(Fun, Args)(fun, args); /// need to cast to @safe because a @trusted delegate doesn't fit a @safe one... static if (hasFunctionAttributes!(Fun, "nothrow")) - alias ResultType = void delegate() nothrow shared @safe; + alias ResultType = void delegate() nothrow @safe shared; else - alias ResultType = void delegate() shared @safe; + alias ResultType = void delegate() @safe shared; return cast(ResultType) &cl.apply; } diff --git a/tests/ut/concurrency/asyncscope.d b/tests/ut/concurrency/asyncscope.d index 25eab59..4d5b403 100644 --- a/tests/ut/concurrency/asyncscope.d +++ b/tests/ut/concurrency/asyncscope.d @@ -180,7 +180,7 @@ auto waitingTask() { import concurrency.thread : ThreadSender; import concurrency.operations : withStopToken; - return ThreadSender().withStopToken((StopToken token) @trusted { + return ThreadSender().withStopToken((shared StopToken token) @trusted { import core.thread : Thread; while (!token.isStopRequested) { Thread.yield(); diff --git a/tests/ut/concurrency/fork.d b/tests/ut/concurrency/fork.d index 4c01675..f5e0d2c 100644 --- a/tests/ut/concurrency/fork.d +++ b/tests/ut/concurrency/fork.d @@ -18,7 +18,7 @@ unittest { @("sync_wait.fork.exception") @trusted unittest { import core.stdc.stdlib; - ForkSender(getLocalThreadExecutor(), () shared @trusted { + ForkSender(getLocalThreadExecutor(), () @trusted shared { exit(1); }).syncWait.assumeOk.shouldThrow(); } diff --git a/tests/ut/concurrency/mpsc.d b/tests/ut/concurrency/mpsc.d index 153c70a..0f8b221 100644 --- a/tests/ut/concurrency/mpsc.d +++ b/tests/ut/concurrency/mpsc.d @@ -29,7 +29,7 @@ auto intSummer(Q)(Q q) { import concurrency.stoptoken : StopToken; import core.time : msecs; - return just(q).withStopToken((StopToken stopToken, Q q) shared @safe { + return just(q).withStopToken((shared StopToken stopToken, Q q) @safe shared { int sum = 0; while (!stopToken.isStopRequested()) { if (auto node = q.pop()) { @@ -54,8 +54,8 @@ unittest { import core.time : msecs; auto q = new MPSCQueue!Node(); - q.intSummer.stopWhen(intProducer(q, 50000)).syncWait.value.should - == 1250025000; + q.intSummer.stopWhen(intProducer(q, 50_000)).syncWait.value.should + == 1_250_025_000; q.empty.should == true; } @@ -66,10 +66,10 @@ unittest { auto q = new MPSCQueue!Node(); q .intSummer - .stopWhen(whenAll(intProducer(q, 10000), intProducer(q, 10000), - intProducer(q, 10000), intProducer(q, 10000), )) + .stopWhen(whenAll(intProducer(q, 10_000), intProducer(q, 10_000), + intProducer(q, 10_000), intProducer(q, 10_000), )) .syncWait .value - .should == 200020000; + .should == 200_020_000; q.empty.should == true; } diff --git a/tests/ut/concurrency/nursery.d b/tests/ut/concurrency/nursery.d index ac70dc5..5faee93 100644 --- a/tests/ut/concurrency/nursery.d +++ b/tests/ut/concurrency/nursery.d @@ -52,8 +52,8 @@ unittest { unittest { auto nursery = new shared Nursery(); shared(int) global; - nursery.run(ThreadSender().then(() shared @safe { - nursery.run(ValueSender!(int)(5).then((int c) shared @safe { + nursery.run(ThreadSender().then(() @safe shared { + nursery.run(ValueSender!(int)(5).then((int c) @safe shared { global = c; })); })); @@ -66,7 +66,7 @@ unittest { @("run.thread.stop.internal") @safe unittest { auto nursery = new shared Nursery(); - nursery.run(ThreadSender().then(() shared @safe => nursery.stop())); + nursery.run(ThreadSender().then(() @safe shared => nursery.stop())); nursery.syncWait.isCancelled.should == true; nursery.getStopToken().isStopRequested().shouldBeTrue(); } @@ -74,8 +74,8 @@ unittest { @("run.thread.stop.external") @safe unittest { auto nursery = new shared Nursery(); - shared stopSource = InPlaceStopSource(); - nursery.run(ThreadSender().then(() shared @safe => stopSource.stop())); + shared stopSource = StopSource(); + nursery.run(ThreadSender().then(() @safe shared => stopSource.stop())); nursery.syncWait(stopSource).isCancelled.should == true; nursery.getStopToken().isStopRequested().shouldBeTrue(); stopSource.isStopRequested().shouldBeTrue(); @@ -85,12 +85,12 @@ unittest { unittest { import core.thread : Thread; auto nursery = new shared Nursery(); - auto thread1 = ThreadSender().then(() shared @trusted { + auto thread1 = ThreadSender().then(() @trusted shared { auto token = nursery.getStopToken(); while (!token.isStopRequested()) Thread.yield(); }); - auto thread2 = ThreadSender().then(() shared @safe => nursery.stop()); + auto thread2 = ThreadSender().then(() @safe shared => nursery.stop()); nursery.run(thread1); nursery.run(thread2); nursery.syncWait.isCancelled.should == true; @@ -115,17 +115,17 @@ unittest { unittest { import core.thread : Thread; auto nursery = new shared Nursery(); - auto thread1 = ThreadSender().then(() shared @trusted { + auto thread1 = ThreadSender().then(() @trusted shared { auto token = nursery.getStopToken(); while (!token.isStopRequested()) Thread.yield(); }); auto thread2 = - ThreadSender().withStopToken((StopToken token) shared @trusted { + ThreadSender().withStopToken((shared StopToken token) @trusted shared { while (!token.isStopRequested()) Thread.yield(); }); - auto thread3 = ThreadSender().then(() shared @safe { + auto thread3 = ThreadSender().then(() @safe shared { throw new Exception("Error should stop everyone"); }); nursery.run(thread1); @@ -139,11 +139,11 @@ unittest { @("withStopSource.1") @safe unittest { import core.thread : Thread; - shared InPlaceStopSource stopSource; + shared StopSource stopSource; auto nursery = new shared Nursery(); auto thread1 = - ThreadSender().withStopToken((StopToken stopToken) shared @trusted { + ThreadSender().withStopToken((shared StopToken stopToken) @trusted shared { while (!stopToken.isStopRequested) Thread.yield(); }).withStopSource(stopSource); @@ -160,11 +160,11 @@ unittest { @("withStopSource.2") @safe unittest { import core.thread : Thread; - shared InPlaceStopSource stopSource; + shared StopSource stopSource; auto nursery = new shared Nursery(); auto thread1 = - ThreadSender().withStopToken((StopToken stopToken) shared @trusted { + ThreadSender().withStopToken((shared StopToken stopToken) @trusted shared { while (!stopToken.isStopRequested) Thread.yield(); }).withStopSource(stopSource); diff --git a/tests/ut/concurrency/operations.d b/tests/ut/concurrency/operations.d index 6a2a824..9113853 100644 --- a/tests/ut/concurrency/operations.d +++ b/tests/ut/concurrency/operations.d @@ -57,7 +57,7 @@ unittest { unittest { race(ValueSender!int(4), ValueSender!int(5)).syncWait.value.should == 4; auto fastThread = ThreadSender().then(() shared => 1); - auto slowThread = ThreadSender().then(() shared @trusted { + auto slowThread = ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); return 2; }); @@ -80,7 +80,7 @@ unittest { @("race.exception.double") @safe unittest { - auto slow = ThreadSender().then(() shared @trusted { + auto slow = ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); throw new Exception("Slow"); }); @@ -92,7 +92,7 @@ unittest { @("race.cancel-other") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -103,7 +103,7 @@ unittest { @("race.cancel") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -189,7 +189,7 @@ unittest { @("whenAll.cancel") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -197,7 +197,7 @@ unittest { whenAll(waiting, DoneSender()).syncWait.isCancelled.should == true; whenAll(ThrowingSender(), waiting).syncWait.assumeOk.shouldThrow; whenAll(waiting, ThrowingSender()).syncWait.assumeOk.shouldThrow; - auto waitingInt = ThreadSender().withStopToken((StopToken token) @trusted { + auto waitingInt = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -211,12 +211,12 @@ unittest { @("whenAll.stop") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } }); - shared source = InPlaceStopSource(); + shared source = StopSource(); auto stopper = justFrom(() shared => source.stop()); whenAll(waiting, stopper).withStopSource(source).syncWait.isCancelled.should == true; @@ -366,26 +366,26 @@ unittest { @("withStopToken.oob") @safe unittest { auto oob = OutOfBandValueSender!int(44); - oob.withStopToken((StopToken stopToken, int t) => t).syncWait.value.should + oob.withStopToken((shared StopToken stopToken, int t) => t).syncWait.value.should == 44; } @("withStopSource.oob") @safe unittest { auto oob = OutOfBandValueSender!int(45); - shared source = InPlaceStopSource(); + shared source = StopSource(); oob.withStopSource(source).syncWait.value.should == 45; } @("withStopSource.tuple") @safe unittest { - just(14, 53).withStopToken((StopToken s, Tuple!(int, int) t) => t[0] * t[1]) + just(14, 53).withStopToken((shared StopToken s, Tuple!(int, int) t) => t[0] * t[1]) .syncWait.value.should == 742; } @("value.withstoptoken.via.thread") @safe unittest { - ValueSender!int(4).withStopToken((StopToken s, int i) { + ValueSender!int(4).withStopToken((shared StopToken s, int i) { throw new Exception("Badness"); }).via(ThreadSender()).syncWait.assumeOk.shouldThrowWithMessage("Badness"); } @@ -398,7 +398,7 @@ unittest { @("raceAll") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -431,7 +431,7 @@ unittest { import concurrency.scheduler : ManualTimeWorker; auto worker = new shared ManualTimeWorker(); - shared InPlaceStopSource source; + shared StopSource source; auto driver = justFrom(() shared { worker.timeUntilNextEvent().should == 10.msecs.nullable; source.stop(); @@ -492,13 +492,13 @@ unittest { @("stopOn") @safe unittest { - shared sourceInner = InPlaceStopSource(); - shared sourceOuter = InPlaceStopSource(); + shared sourceInner = StopSource(); + shared sourceOuter = StopSource(); shared bool b; whenAll( delay(5.msecs).then(() shared => b = true) - .stopOn(StopToken(sourceInner)), + .stopOn(sourceInner.token()), just(() => sourceOuter.stop()) ).syncWait(sourceOuter).assumeOk; b.should == true; @@ -506,7 +506,7 @@ unittest { shared bool d; whenAll( delay(5.msecs).then(() shared => b = true) - .stopOn(StopToken(sourceInner)), + .stopOn(sourceInner.token()), just(() => sourceInner.stop()) ).syncWait(sourceOuter).assumeOk; d.should == false; @@ -520,41 +520,41 @@ unittest { import core.sync.event : Event; bool parentAfterChild; Event childEvent, parentEvent; - this() shared @trusted { + this() @trusted shared { (cast() childEvent).initialize(false, false); (cast() parentEvent).initialize(false, false); } - void signalChild() shared @trusted { + void signalChild() @trusted shared { (cast() childEvent).set(); } - void waitChild() shared @trusted { + void waitChild() @trusted shared { (cast() childEvent).wait(); } - void signalParent() shared @trusted { + void signalParent() @trusted shared { (cast() parentEvent).set(); } - void waitParent() shared @trusted { + void waitParent() @trusted shared { (cast() parentEvent).wait(); } } auto state = new shared State(); - shared source = InPlaceStopSource(); + shared source = StopSource(); import std.stdio; auto child = just(state) - .withStopToken((StopToken token, shared State state) @trusted { + .withStopToken((shared StopToken token, shared State state) @trusted { while (!token.isStopRequested) {} state.signalParent(); state.waitChild(); }).via(ThreadSender()); auto parent = - just(state).withStopToken((StopToken token, shared State state) { + just(state).withStopToken((shared StopToken token, shared State state) { state.waitParent(); state.parentAfterChild.atomicStore(token.isStopRequested == false); state.signalChild(); @@ -634,7 +634,7 @@ unittest { @("stopWhen.source.value") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -647,7 +647,7 @@ unittest { @("stopWhen.source.error") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -661,7 +661,7 @@ unittest { @("stopWhen.source.cancelled") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -672,7 +672,7 @@ unittest { @("stopWhen.trigger.error") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -687,7 +687,7 @@ unittest { @("stopWhen.trigger.cancelled.value") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } diff --git a/tests/ut/concurrency/scheduler.d b/tests/ut/concurrency/scheduler.d index 83dc0e0..5ffd4a8 100644 --- a/tests/ut/concurrency/scheduler.d +++ b/tests/ut/concurrency/scheduler.d @@ -22,7 +22,7 @@ unittest { @("scheduleAfter.stop-before-add") @safe unittest { import concurrency.sender : delay, justFrom; - shared source = InPlaceStopSource(); + shared source = StopSource(); whenAll(justFrom(() shared => source.stop), delay(10.msecs)) .syncWait(source); } diff --git a/tests/ut/concurrency/sender.d b/tests/ut/concurrency/sender.d index 5ea3e04..08dfbcb 100644 --- a/tests/ut/concurrency/sender.d +++ b/tests/ut/concurrency/sender.d @@ -35,6 +35,17 @@ unittest { == true; } +@("syncWaitSimple.value") @safe +unittest { + import concurrency.syncwaitsimple; + import concurrency.signal : globalStopSource; + + auto stopSource = &globalStopSource(); + + int result = (() @nogc => ValueSender!(int)(77).syncWaitSimple(*stopSource).trustedGet!int)(); + result.shouldEqual(77); +} + @("value.start.attributes.1") @safe nothrow @nogc unittest { ValueSender!(int)(5).connect(NullReceiver!int()).start(); @@ -174,11 +185,11 @@ unittest { shared bool g;stopSource auto waiting = - ThreadSender().withStopToken((StopToken token) shared @trusted { + ThreadSender().withStopToken((shared StopToken token) @trusted shared { while (!token.isStopRequested) {} g.atomicStore(true); }); - shared source = InPlaceStopSource(); + shared source = StopSource(); auto stopper = justFrom(() shared => source.stop()); whenAll(waiting.toShared().withStopSource(source), stopper) @@ -333,7 +344,7 @@ unittest { import concurrency.stoptoken; just(14, 52).syncWait.value.should == tuple(14, 52); just(14, 53).then((int a, int b) => a * b).syncWait.value.should == 742; - just(14, 54).withStopToken((StopToken s, int a, int b) => a * b).syncWait + just(14, 54).withStopToken((shared StopToken s, int a, int b) => a * b).syncWait .value.should == 756; } diff --git a/tests/ut/concurrency/stoptoken.d b/tests/ut/concurrency/stoptoken.d index 7b16f02..93955e2 100644 --- a/tests/ut/concurrency/stoptoken.d +++ b/tests/ut/concurrency/stoptoken.d @@ -2,96 +2,164 @@ module ut.concurrency.stoptoken; import concurrency.stoptoken; import unit_threaded; +import std.encoding; -@("stopsource.stoptoken.stop") @safe +@("stopsource.isStopRequested.stop") @safe unittest { - auto source = shared InPlaceStopSource(); - auto token = StopToken(source); - token.isStopRequested.should == false; + auto source = shared StopSource(); + source.isStopRequested.should == false; source.stop(); - token.isStopRequested.should == true; + source.isStopRequested.should == true; } -@("stopsource.stopcallback.stop") @safe +@("stopsource.isStopRequested.reset") @safe unittest { - import core.atomic; - auto source = shared InPlaceStopSource(); - auto token = StopToken(source); - shared bool set; - token.onStop(() shared { set.atomicStore(true); }); - set.atomicLoad.should == false; + auto source = shared StopSource(); source.stop(); - set.atomicLoad.should == true; + source.isStopRequested.should == true; + source.reset(); + source.isStopRequested.should == false; } -@("stopsource.inplacestopcallback.stop") @safe +@("stopsource.reset") @safe unittest { - import core.atomic; - auto source = shared InPlaceStopSource(); - auto token = StopToken(source); - shared bool set; - InPlaceStopCallback cb = InPlaceStopCallback(() shared { set.atomicStore(true); }); - token.onStop(cb); - set.atomicLoad.should == false; - source.stop(); - set.atomicLoad.should == true; + auto source = shared StopSource(); + auto token = source.token(); + shared StopCallback cb; + cb.register(token, () shared { }); + source.reset(); + source.assertNoCallbacks(); } -@("inplacestopsource.stoptoken.stop") @safe +@("stopsource.no-copy") @safe unittest { - auto source = shared InPlaceStopSource(); - auto token = StopToken(source); + auto source = shared StopSource(); + /// Disallow copy + static assert(!__traits(compiles, { shared s2 = source; })); +} + +@("stopsource.no-assign") @safe +unittest { + auto source = shared StopSource(); + auto source2 = shared StopSource(); + /// Disallow assign + static assert(!__traits(compiles, { source = shared StopSource(); })); + static assert(!__traits(compiles, { source = source2; })); +} + +@("stoptoken.isStopRequested") @safe +unittest { + auto source = shared StopSource(); + auto token = source.token(); token.isStopRequested.should == false; source.stop(); - token.isStopPossible.should == true; + token.isStopRequested.should == true; +} + +@("stoptoken.lifetime") @safe +unittest { + shared StopToken token; + { + auto source = shared StopSource(); + /// Error: address of variable `source` assigned to `token` with longer lifetime + static assert(!__traits(compiles, () @safe { token = source.token(); })); + } } -@("inplacestopsource.stopcallback.stop") @safe +@("stoptoken.scope.no-escape") @safe +unittest { + shared StopToken t1; + { + shared source = shared StopSource(); + auto token = source.token(); + auto t2 = token; + + // assert that token can't escape the StopSource + static assert(!__traits(compiles, t1 = token)); + // assert that a copy can't escape the StopSource + static assert(!__traits(compiles, t1 = t2)); + } +} + +@("stopcallback.lifetime") @safe +unittest { + shared StopCallback cb; + { + auto source = shared StopSource(); + auto token = source.token(); + /// We don't allow address of variable `token` to be assigned to `cb` with longer lifetime + static assert(!__traits(compiles, () @safe { cb.place(() shared { }, token); })); + } +} + +@("stopcallback.opassign") @safe +unittest { + shared StopCallback cb1; + shared StopCallback cb2; + StopCallback cb3; + /// Disallow assign + static assert(!__traits(compiles, { cb1 = shared StopCallback(); })); + static assert(!__traits(compiles, { cb1 = cb2; })); + static assert(!__traits(compiles, { cb1 = StopCallback(); })); + static assert(!__traits(compiles, { cb1 = cb3; })); +} + +@("stopsource.StopCallback.stop") @safe unittest { import core.atomic; - auto source = shared InPlaceStopSource(); - auto token = StopToken(source); + auto source = shared StopSource(); + auto token = source.token(); shared bool set; - token.onStop(() shared { set.atomicStore(true); }); + shared StopCallback cb; + cb.register(token, () shared { set.atomicStore(true); }); set.atomicLoad.should == false; source.stop(); set.atomicLoad.should == true; } -@("inplacestopsource.inplacestopcallback.stop") @safe +@("stopsource.StopCallback.reset") @safe unittest { import core.atomic; - auto source = shared InPlaceStopSource(); - auto token = StopToken(source); + auto source = shared StopSource(); + auto token = source.token(); shared bool set; - InPlaceStopCallback cb = InPlaceStopCallback(() shared { set.atomicStore(true); }); - token.onStop(cb); + shared StopCallback cb; + cb.register(token, () shared { set.atomicStore(true); }); set.atomicLoad.should == false; - source.stop(); + source.reset(); set.atomicLoad.should == true; } -@("inplacestopsource.stopcallback.dispose") @safe +@("StopSource.stoptoken.stop") @safe +unittest { + auto source = shared StopSource(); + auto token = source.token(); + token.isStopRequested.should == false; + source.stop(); + token.isStopRequested.should == true; +} + +@("StopSource.StopCallback.stop") @safe unittest { import core.atomic; - auto source = shared InPlaceStopSource(); - auto token = StopToken(source); + auto source = shared StopSource(); + auto token = source.token(); shared bool set; - auto cb = token.onStop(() shared { set.atomicStore(true); }); + shared StopCallback cb; + cb.register(token, () shared { set.atomicStore(true); }); set.atomicLoad.should == false; - cb.dispose(); source.stop(); - set.atomicLoad.should == false; + set.atomicLoad.should == true; } -@("inplacestopsource.inplacestopcallback.dispose") @safe +@("StopSource.StopCallback.dispose") @safe unittest { import core.atomic; - auto source = shared InPlaceStopSource(); - auto token = StopToken(source); + auto source = shared StopSource(); + auto token = source.token(); shared bool set; - InPlaceStopCallback cb = InPlaceStopCallback(() shared { set.atomicStore(true); }); - token.onStop(cb); + shared StopCallback cb; + cb.register(token, () shared { set.atomicStore(true); }); set.atomicLoad.should == false; cb.dispose(); source.stop(); diff --git a/tests/ut/concurrency/stream.d b/tests/ut/concurrency/stream.d index 7d81cc7..43ec372 100644 --- a/tests/ut/concurrency/stream.d +++ b/tests/ut/concurrency/stream.d @@ -47,7 +47,7 @@ unittest { unittest { import concurrency.operations : withStopSource; shared int g = 0; - auto source = new shared StopSource(); + auto source = shared StopSource(); infiniteStream(5).collect((int n) shared { if (g < 14) g.atomicOp!"+="(n); @@ -82,7 +82,7 @@ unittest { unittest { struct Loop { size_t b, e; - void loop(DG, StopToken)(DG emit, StopToken stopToken) { + void loop(DG)(DG emit, shared StopToken stopToken) { foreach (i; b .. e) emit(i); } @@ -828,7 +828,7 @@ unittest { unittest { import concurrency.stream.defer; static struct S { - auto opCall() shared @safe { + auto opCall() @safe shared { import concurrency.sender; return just(1); } diff --git a/tests/ut/concurrency/utils.d b/tests/ut/concurrency/utils.d index a7ebfca..63c8e09 100644 --- a/tests/ut/concurrency/utils.d +++ b/tests/ut/concurrency/utils.d @@ -25,7 +25,7 @@ unittest { auto systemSharedClosure = (int i) shared => i * system() * k; static assert(!isThreadSafeFunction!systemSharedClosure); - auto trustedSharedClosure = (int i) shared @trusted => i * system() * k; + auto trustedSharedClosure = (int i) @trusted shared => i * system() * k; static assert(isThreadSafeFunction!trustedSharedClosure); } diff --git a/tests/ut/concurrency/waitable.d b/tests/ut/concurrency/waitable.d index 5e06b95..40eb26d 100644 --- a/tests/ut/concurrency/waitable.d +++ b/tests/ut/concurrency/waitable.d @@ -33,7 +33,7 @@ auto intSummer(Q)(Q q) { import concurrency.stoptoken : StopToken; import core.time : msecs; - return just(q).withStopToken((StopToken stopToken, Q q) shared @safe { + return just(q).withStopToken((shared StopToken stopToken, Q q) @safe shared { int sum = 0; while (!stopToken.isStopRequested()) { if (auto node = q.pop(100.msecs)) { @@ -58,8 +58,8 @@ unittest { import core.time : msecs; auto q = new WaitableQueue!(MPSCQueue!Node)(); - q.intSummer.stopWhen(intProducer(q, 50000)).syncWait.value.should - == 1250025000; + q.intSummer.stopWhen(intProducer(q, 50_000)).syncWait.value.should + == 1_250_025_000; q.empty.should == true; } @@ -70,10 +70,10 @@ unittest { auto q = new WaitableQueue!(MPSCQueue!Node)(); q .intSummer - .stopWhen(whenAll(intProducer(q, 10000), intProducer(q, 10000), - intProducer(q, 10000), intProducer(q, 10000), )) + .stopWhen(whenAll(intProducer(q, 10_000), intProducer(q, 10_000), + intProducer(q, 10_000), intProducer(q, 10_000), )) .syncWait .value - .should == 200020000; + .should == 200_020_000; q.empty.should == true; } diff --git a/tests/ut/ut_runner.d b/tests/ut/ut_runner.d index d5f67d9..8986fd2 100644 --- a/tests/ut/ut_runner.d +++ b/tests/ut/ut_runner.d @@ -17,6 +17,7 @@ int main(string[] args) { "ut.concurrency.waitable", "ut.concurrency.asyncscope", "concurrency.timingwheels", + "concurrency.stoptoken", "ut.concurrency.stoptoken", ); }