From 7ad9f18365f59975a0a2e17502eaeb1d8abdfaef Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Tue, 26 Sep 2023 21:15:08 +0200 Subject: [PATCH 1/2] Revamp Stoptoken StopSource and StopCallbacks --- source/concurrency/asyncscope.d | 12 +- source/concurrency/data/queue/mpsc.d | 2 +- source/concurrency/data/queue/waitable.d | 2 +- source/concurrency/executor.d | 2 +- source/concurrency/fork.d | 5 +- source/concurrency/nursery.d | 66 +- source/concurrency/operations/race.d | 31 +- source/concurrency/operations/stopon.d | 8 +- source/concurrency/operations/stopwhen.d | 40 +- source/concurrency/operations/toshared.d | 16 +- source/concurrency/operations/whenall.d | 26 +- source/concurrency/operations/withchild.d | 2 +- .../concurrency/operations/withstopsource.d | 99 +-- source/concurrency/operations/withstoptoken.d | 4 +- source/concurrency/receiver.d | 7 +- source/concurrency/scheduler.d | 48 +- source/concurrency/sender.d | 21 +- source/concurrency/signal.d | 48 +- source/concurrency/stoptoken.d | 589 ++++++++---------- source/concurrency/stream/cycle.d | 3 +- source/concurrency/stream/flatmapbase.d | 47 +- source/concurrency/stream/package.d | 10 +- source/concurrency/stream/scan.d | 2 +- source/concurrency/stream/take.d | 8 +- source/concurrency/stream/throttling.d | 21 +- source/concurrency/stream/transform.d | 1 - source/concurrency/syncwait.d | 55 +- source/concurrency/thread.d | 2 +- source/concurrency/utils.d | 8 +- tests/ut/concurrency/asyncscope.d | 2 +- tests/ut/concurrency/fork.d | 2 +- tests/ut/concurrency/mpsc.d | 12 +- tests/ut/concurrency/nursery.d | 38 +- tests/ut/concurrency/operations.d | 77 ++- tests/ut/concurrency/scheduler.d | 2 +- tests/ut/concurrency/sender.d | 16 +- tests/ut/concurrency/stoptoken.d | 167 +++++ tests/ut/concurrency/stream.d | 6 +- tests/ut/concurrency/thread.d | 17 - tests/ut/concurrency/utils.d | 2 +- tests/ut/concurrency/waitable.d | 12 +- tests/ut/ut_runner.d | 2 + 42 files changed, 828 insertions(+), 712 deletions(-) create mode 100644 tests/ut/concurrency/stoptoken.d diff --git a/source/concurrency/asyncscope.d b/source/concurrency/asyncscope.d index 6e1e25e..1adcf7b 100644 --- a/source/concurrency/asyncscope.d +++ b/source/concurrency/asyncscope.d @@ -10,8 +10,9 @@ private enum Flag { } auto asyncScope() @safe { + import concurrency.sender : Promise; // ensure NRVO - auto as = shared AsyncScope(new shared StopSource()); + auto as = shared AsyncScope(new shared Promise!void); return as; } @@ -66,9 +67,8 @@ public: cleanup.syncWait(); } - this(shared StopSource stopSource) @safe shared { - completion = new shared Promise!void; - this.stopSource = stopSource; + this(shared Promise!void completion) @safe shared { + this.completion = completion; } auto cleanup() @safe shared { @@ -93,7 +93,7 @@ public: return stopSource.stop(); } - bool spawn(Sender)(Sender s) shared @trusted { + bool spawn(Sender)(Sender s) @trusted shared { import concurrency.sender : connectHeap; with (flag.update(0, Flag.tick)) { if ((oldState & Flag.stopped) == 1) { @@ -133,7 +133,7 @@ struct AsyncScopeReceiver { auto getStopToken() nothrow @safe { import concurrency.stoptoken : StopToken; - return StopToken(s.stopSource); + return s.stopSource.token(); } auto getScheduler() nothrow @safe { diff --git a/source/concurrency/data/queue/mpsc.d b/source/concurrency/data/queue/mpsc.d index c303720..e2caf56 100644 --- a/source/concurrency/data/queue/mpsc.d +++ b/source/concurrency/data/queue/mpsc.d @@ -2,7 +2,7 @@ module concurrency.data.queue.mpsc; struct MPSCQueueProducer(Node) { private MPSCQueue!(Node) q; - void push(Node* node) shared @trusted { + void push(Node* node) @trusted shared { (cast() q).push(node); } } diff --git a/source/concurrency/data/queue/waitable.d b/source/concurrency/data/queue/waitable.d index f2026dd..57f6ad2 100644 --- a/source/concurrency/data/queue/waitable.d +++ b/source/concurrency/data/queue/waitable.d @@ -61,7 +61,7 @@ struct WaitableQueueProducer(Q) { private shared Q q; private shared Semaphore sema; - bool push(Q.ElementType t) shared @trusted { + bool push(Q.ElementType t) @trusted shared { bool r = (cast() q).push(t); if (r) (cast() sema).notify(); diff --git a/source/concurrency/executor.d b/source/concurrency/executor.d index 98c66fb..8546e5d 100644 --- a/source/concurrency/executor.d +++ b/source/concurrency/executor.d @@ -1,7 +1,7 @@ module concurrency.executor; alias VoidFunction = void function() @safe; -alias VoidDelegate = void delegate() shared @safe; +alias VoidDelegate = void delegate() @safe shared; interface Executor { void execute(VoidFunction fn) @safe; diff --git a/source/concurrency/fork.d b/source/concurrency/fork.d index 1ea9320..f9a3577 100644 --- a/source/concurrency/fork.d +++ b/source/concurrency/fork.d @@ -63,7 +63,8 @@ version(Posix) if (afterFork) afterFork(pid); - auto cb = token.onStop(() @trusted shared nothrow => + shared StopCallback cb; + cb.register(token, () @trusted shared nothrow => cast(void) kill(pid, SIGINT)); int status; auto ret = waitpid(pid, &status, 0); @@ -102,7 +103,7 @@ version(Posix) import concurrency.utils : closure; executeInNewThread( - cast(void delegate() shared @safe) &this.run); + cast(void delegate() @safe shared) &this.run); } } diff --git a/source/concurrency/nursery.d b/source/concurrency/nursery.d index 17ff76b..7b9dadc 100644 --- a/source/concurrency/nursery.d +++ b/source/concurrency/nursery.d @@ -1,8 +1,8 @@ module concurrency.nursery; -import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop; +import concurrency.stoptoken : StopSource, StopToken, StopCallback; import concurrency.thread : LocalThreadExecutor; -import concurrency.receiver : getStopToken; +// import concurrency.receiver : getStopToken; import concurrency.scheduler : SchedulerObjectBase; import std.typecons : Nullable; @@ -14,7 +14,7 @@ import std.typecons : Nullable; /// When cancellation happens all Senders are waited on for completion. /// Senders can be added to the Nursery at any time. /// Senders are only started when the Nursery itself is being awaited on. -class Nursery : StopSource { +class Nursery { import concurrency.sender : isSender, OperationalStateBase; import core.sync.mutex : Mutex; import concepts; @@ -22,6 +22,7 @@ class Nursery : StopSource { alias Value = void; private { + shared StopSource stopSource; Node[] operations; struct Node { OperationalStateBase state; @@ -38,7 +39,7 @@ class Nursery : StopSource { shared size_t counter = 0; Throwable throwable; // first throwable from sender, if any ReceiverObject receiver; - StopCallback stopCallback; + shared StopCallback stopCallback; Nursery assumeThreadSafe() @trusted shared nothrow { return cast(Nursery) this; } @@ -50,8 +51,8 @@ class Nursery : StopSource { with (assumeThreadSafe) mutex = new Mutex(); } - override bool stop() nothrow @trusted { - auto result = super.stop(); + bool stop() nothrow @trusted { + auto result = stopSource.stop(); if (result) (cast(shared) this).done(-1); @@ -59,12 +60,13 @@ class Nursery : StopSource { return result; } - override bool stop() nothrow @trusted shared { + bool stop() nothrow @trusted shared { return (cast(Nursery) this).stop(); } - StopToken getStopToken() nothrow @trusted shared { - return StopToken(cast(Nursery) this); + shared(StopToken) getStopToken() nothrow @safe shared { + return stopSource.token(); + // return shared StopToken(stopSource); } private auto getScheduler() nothrow @trusted shared { @@ -94,9 +96,9 @@ class Nursery : StopSource { if (isDone) { throwable = null; receiver = null; - if (stopCallback) - stopCallback.dispose(); - stopCallback = null; + // if (stopCallback) + stopCallback.dispose(); + // stopCallback = null; } mutex.unlock_nothrow(); @@ -104,7 +106,7 @@ class Nursery : StopSource { if (isDone && localReceiver !is null) { if (localThrowable !is null) { localReceiver.setError(localThrowable); - } else if (isStopRequested()) { + } else if (stopSource.isStopRequested()) { localReceiver.setDone(); } else { try { @@ -122,7 +124,7 @@ class Nursery : StopSource { run(sender.get()); } - void run(Sender)(Sender sender) shared @trusted { + void run(Sender)(Sender sender) @trusted shared { import concepts; static assert(models!(Sender, isSender)); import std.typecons : Nullable; @@ -161,7 +163,7 @@ class Nursery : StopSource { auto connect(Receiver)( return Receiver receiver - ) shared @trusted return scope { + ) @trusted shared return scope { final class ReceiverImpl : ReceiverObject { Receiver receiver; SchedulerObjectBase scheduler; @@ -187,21 +189,24 @@ class Nursery : StopSource { scheduler = receiver.getScheduler().toSchedulerObject(); return scheduler; } + + shared(StopToken) getStopToken() nothrow @safe { + return receiver.getStopToken(); + } } - auto stopToken = receiver.getStopToken(); - auto cb = (() @trusted => stopToken - .onStop(() shared nothrow @trusted => cast(void) this.stop()))(); - return NurseryOp(this, cb, new ReceiverImpl(receiver)); + return NurseryOp(this, new ReceiverImpl(receiver)); } private - void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared { + void setReceiver(ReceiverObject r) nothrow @trusted shared { with (assumeThreadSafe) { mutex.lock_nothrow(); assert(this.receiver is null, "Cannot await a nursery twice."); receiver = r; - stopCallback = cb; + auto dg = () nothrow @safe shared => cast(void) this.stop(); + auto stopToken = r.getStopToken; + stopCallback.register(stopToken, dg); auto ops = operations.dup(); mutex.unlock_nothrow(); @@ -217,6 +222,7 @@ private interface ReceiverObject { void setDone() nothrow @safe; void setError(Throwable e) nothrow @safe; SchedulerObjectBase getScheduler() nothrow @safe; + shared(StopToken) getStopToken() nothrow @safe; } private struct NurseryReceiver(Value) { @@ -228,7 +234,7 @@ private struct NurseryReceiver(Value) { } static if (is(Value == void)) { - void setValue() shared @safe { + void setValue() @safe shared { (cast() this).setDone(); } @@ -236,7 +242,7 @@ private struct NurseryReceiver(Value) { (cast() this).setDone(); } } else { - void setValue(Value val) shared @trusted { + void setValue(Value val) @trusted shared { (cast() this).setDone(); } @@ -264,24 +270,24 @@ private struct NurseryReceiver(Value) { private struct NurseryOp { shared Nursery nursery; - StopCallback cb; ReceiverObject receiver; @disable this(ref return scope typeof(this) rhs); @disable this(this); - this(return shared Nursery n, StopCallback cb, - ReceiverObject r) @safe return scope { + this(shared Nursery n, + ReceiverObject r) @safe { nursery = n; - this.cb = cb; + // this.cb = cb; receiver = r; } void start() nothrow scope @trusted { import core.atomic : atomicLoad; - if (nursery.busy.atomicLoad == 0) + if (nursery.busy.atomicLoad == 0) { + // cb.dispose(); receiver.setDone(); - else - nursery.setReceiver(receiver, cb); + } else + nursery.setReceiver(receiver);//, cb); } } diff --git a/source/concurrency/operations/race.d b/source/concurrency/operations/race.d index 6e7f158..00b324b 100644 --- a/source/concurrency/operations/race.d +++ b/source/concurrency/operations/race.d @@ -66,32 +66,32 @@ private struct RaceOp(Receiver, Senders...) { this(ref return scope typeof(this) rhs); this(Receiver receiver, return Senders senders, bool noDropouts) @trusted scope { + state = State!R(noDropouts); this.receiver = receiver; - state = new State!(R)(noDropouts); static if (Senders.length > 1) { foreach (i, Sender; Senders) { ops[i] = senders[i].connect( - ElementReceiver!(Sender)(receiver, state, Senders.length)); + ElementReceiver!(Sender)(receiver, &state, Senders.length)); } } else { ops.length = senders[0].length; foreach (i; 0 .. senders[0].length) { ops[i] = senders[0][i].connect( - ElementReceiver(receiver, state, senders[0].length)); + ElementReceiver(receiver, &state, senders[0].length)); } } } void start() @trusted nothrow scope { - import concurrency.stoptoken : StopSource; if (receiver.getStopToken().isStopRequested) { receiver.setDone(); return; } - state.cb = receiver.getStopToken().onStop( - cast(void delegate() nothrow @safe shared) &state.stop - ); // butt ugly cast, but it won't take the second overload + auto token = receiver.getStopToken(); + // butt ugly cast, but it won't take the second overload + state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop); + static if (Senders.length > 1) { foreach (i, _; Senders) { ops[i].start(); @@ -121,9 +121,10 @@ struct RaceSender(Senders...) } } -private class State(Value) : StopSource { +private struct State(Value) { import concurrency.bitfield; - StopCallback cb; + shared StopSource stopSource; + shared StopCallback cb; shared SharedBitField!Flags bitfield; static if (!is(Value == void)) Value value; @@ -148,10 +149,10 @@ private enum Counter : size_t { private struct RaceReceiver(Receiver, InnerValue, Value) { import core.atomic : atomicOp, atomicLoad, MemoryOrder; Receiver receiver; - State!(Value) state; + State!(Value)* state; size_t senderCount; auto getStopToken() { - return StopToken(state); + return state.stopSource.token(); } private bool isValueProduced(size_t state) { @@ -176,7 +177,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) { else state.value = Value(value); release(); // must release before calling .stop - state.stop(); + state.stopSource.stop(); } else release(); @@ -190,7 +191,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) { with (state.bitfield.update(Flags.value_produced, Counter.tick)) { bool last = isLast(newState); if (!isValueProduced(oldState)) { - state.stop(); + state.stopSource.stop(); } if (last) @@ -202,7 +203,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) { with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) { bool last = isLast(newState); if (state.noDropouts && !isDoneOrErrorProduced(oldState)) { - state.stop(); + state.stopSource.stop(); } if (last) @@ -217,7 +218,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) { state.exception = exception; if (state.noDropouts) { release(); // release before stop - state.stop(); + state.stopSource.stop(); } } diff --git a/source/concurrency/operations/stopon.d b/source/concurrency/operations/stopon.d index 8d979cd..d03eb6b 100644 --- a/source/concurrency/operations/stopon.d +++ b/source/concurrency/operations/stopon.d @@ -7,14 +7,14 @@ import concurrency.stoptoken; import concepts; import std.traits; -auto stopOn(Sender)(Sender sender, StopToken stopToken) { +auto stopOn(Sender)(Sender sender, shared StopToken stopToken) { return StopOn!(Sender)(sender, stopToken); } private struct StopOnReceiver(Receiver, Value) { private { Receiver receiver; - StopToken stopToken; + shared StopToken stopToken; } static if (is(Value == void)) { @@ -35,7 +35,7 @@ private struct StopOnReceiver(Receiver, Value) { receiver.setError(e); } - auto getStopToken() nothrow @trusted { + auto getStopToken() nothrow @safe { return stopToken; } @@ -46,7 +46,7 @@ struct StopOn(Sender) if (models!(Sender, isSender)) { static assert(models!(typeof(this), isSender)); alias Value = Sender.Value; Sender sender; - StopToken stopToken; + shared StopToken stopToken; auto connect(Receiver)(return Receiver receiver) @safe return scope { alias R = StopOnReceiver!(Receiver, Sender.Value); // ensure NRVO diff --git a/source/concurrency/operations/stopwhen.d b/source/concurrency/operations/stopwhen.d index 3eb9ee5..7e738fa 100644 --- a/source/concurrency/operations/stopwhen.d +++ b/source/concurrency/operations/stopwhen.d @@ -29,11 +29,10 @@ private struct StopWhenOp(Receiver, Sender, Trigger) { this(Receiver receiver, return Sender source, return Trigger trigger) @trusted scope { this.receiver = receiver; - state = new State!(Sender.Value)(); sourceOp = source - .connect(SourceReceiver!(Receiver, Sender.Value)(receiver, state)); + .connect(SourceReceiver!(Receiver, Sender.Value)(receiver, &state)); triggerOp = trigger - .connect(TriggerReceiver!(Receiver, Sender.Value)(receiver, state)); + .connect(TriggerReceiver!(Receiver, Sender.Value)(receiver, &state)); } void start() @trusted nothrow scope { @@ -42,9 +41,10 @@ private struct StopWhenOp(Receiver, Sender, Trigger) { return; } - state.cb = receiver.getStopToken().onStop( - cast(void delegate() nothrow @safe shared) &state.stop - ); // butt ugly cast, but it won't take the second overload + auto token = receiver.getStopToken(); + // butt ugly cast, but it won't take the second overload + state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop); + sourceOp.start; triggerOp.start; } @@ -64,9 +64,11 @@ struct StopWhenSender(Sender, Trigger) } } -private class State(Value) : StopSource { +// refactor to use StopSource +private struct State(Value) { import concurrency.bitfield; - StopCallback cb; + shared StopSource stopSource; + shared StopCallback cb; shared SharedBitField!Flags bitfield; static if (!is(Value == void)) Value value; @@ -116,15 +118,15 @@ private bool isLast(size_t state) @safe nothrow pure { private struct TriggerReceiver(Receiver, Value) { Receiver receiver; - State!(Value) state; + State!(Value)* state; auto getStopToken() { - return StopToken(state); + return state.stopSource.token(); } void setValue() @safe nothrow { with (state.bitfield.update(Flags.tick)) { if (!isLast(oldState)) - state.stop(); + state.stopSource.stop(); else state.process(receiver, newState); } @@ -133,7 +135,7 @@ private struct TriggerReceiver(Receiver, Value) { void setDone() @safe nothrow { with (state.bitfield.update(Flags.doneOrError_produced, Flags.tick)) { if (!isLast(oldState)) - state.stop(); + state.stopSource.stop(); else state.process(receiver, newState); } @@ -145,7 +147,7 @@ private struct TriggerReceiver(Receiver, Value) { if (!isDoneOrErrorProduced(oldState)) { state.exception = exception; release(); // release before stop - state.stop(); + state.stopSource.stop(); } else { release(); if (last) @@ -160,9 +162,9 @@ private struct TriggerReceiver(Receiver, Value) { private struct SourceReceiver(Receiver, Value) { import core.atomic : atomicOp, atomicLoad, MemoryOrder; Receiver receiver; - State!(Value) state; + State!(Value)* state; auto getStopToken() { - return StopToken(state); + return state.stopSource.token(); } static if (!is(Value == void)) @@ -173,7 +175,7 @@ private struct SourceReceiver(Receiver, Value) { release(); if (!last) - state.stop(); + state.stopSource.stop(); else if (isDoneOrErrorProduced(oldState)) state.process(receiver, oldState); else @@ -186,7 +188,7 @@ private struct SourceReceiver(Receiver, Value) { with (state.bitfield.update(Flags.value_produced | Flags.tick)) { bool last = isLast(oldState); if (!last) - state.stop(); + state.stopSource.stop(); else if (isDoneOrErrorProduced(oldState)) state.process(receiver, oldState); else @@ -198,7 +200,7 @@ private struct SourceReceiver(Receiver, Value) { with (state.bitfield.update(Flags.doneOrError_produced | Flags.tick)) { bool last = isLast(oldState); if (!last) - state.stop(); + state.stopSource.stop(); else state.process(receiver, newState); } @@ -213,7 +215,7 @@ private struct SourceReceiver(Receiver, Value) { release(); if (!last) - state.stop(); + state.stopSource.stop(); else state.process(receiver, newState); } diff --git a/source/concurrency/operations/toshared.d b/source/concurrency/operations/toshared.d index 2c56656..ce9141f 100644 --- a/source/concurrency/operations/toshared.d +++ b/source/concurrency/operations/toshared.d @@ -82,7 +82,7 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if ((newState >> 2) == 0) { auto localStopSource = state.inst; release(); // release early - localStopSource.stop(); + localStopSource.stopSource.stop(); return false; } else { auto localReceiver = state.inst; @@ -145,14 +145,15 @@ struct SharedSenderOp(Sender, Scheduler, ResetLogic resetLogic, Receiver) { alias Props = Properties!(Sender); SharedSender!(Sender, Scheduler, resetLogic) parent; Receiver receiver; - StopCallback cb; + shared StopCallback cb; @disable this(ref return scope typeof(this) rhs); @disable this(this); void start() nothrow @trusted scope { parent.add(&(cast(shared) this).onValue); - cb = receiver.getStopToken.onStop(&(cast(shared) this).onStop); + auto stopToken = receiver.getStopToken; + cb.register(stopToken, &(cast(shared) this).onStop); } void onStop() nothrow @trusted shared { @@ -223,8 +224,8 @@ private struct SharedSenderReceiver(Sender, Scheduler, ResetLogic resetLogic) { state.process!(resetLogic)(v); } - StopToken getStopToken() @trusted nothrow { - return StopToken(state.inst); + shared(StopToken) getStopToken() @safe nothrow { + return state.inst.stopSource.token(); } Scheduler getScheduler() @safe nothrow scope { @@ -255,7 +256,7 @@ private template process(ResetLogic resetLogic) { auto localState = state.inst; InternalValue v = localState.value.match!((typeof(null)) => assert(0, "not happening"), v => v); release(oldState & (~0x3)); // release early and remove all ticks - if (localState.isStopRequested) + if (localState.stopSource.isStopRequested) (() @trusted => v = Done())(); foreach (dg; localState.dgs[]) dg(v); @@ -263,12 +264,13 @@ private template process(ResetLogic resetLogic) { } } -private class SharedSenderInstState(Sender) : StopSource { +private class SharedSenderInstState(Sender) { import concurrency.slist; import std.traits : ReturnType; import std.sumtype; alias Props = Properties!(Sender); shared SList!(Props.DG) dgs; + shared StopSource stopSource; SumType!(typeof(null), Props.InternalValue) value; this() { this.dgs = new shared SList!(Props.DG); diff --git a/source/concurrency/operations/whenall.d b/source/concurrency/operations/whenall.d index f6708ae..90de34b 100644 --- a/source/concurrency/operations/whenall.d +++ b/source/concurrency/operations/whenall.d @@ -117,12 +117,11 @@ private struct WhenAllOp(Receiver, Senders...) { this(return Receiver receiver, return Senders senders) @trusted scope return { this.receiver = receiver; - state = new WhenAllState!R(); static if (Senders.length > 1) { foreach (i, Sender; Senders) { ops[i] = senders[i].connect( WhenAllReceiver!(Receiver, Sender.Value, - R)(receiver, state, i, Senders.length)); + R)(receiver, &state, i, Senders.length)); } } else { static if (!is(ArrayElement!(Senders).Value : void)) @@ -131,21 +130,21 @@ private struct WhenAllOp(Receiver, Senders...) { foreach (i; 0 .. senders[0].length) { ops[i] = senders[0][i].connect( WhenAllReceiver!(Receiver, ArrayElement!(Senders).Value, - R)(receiver, state, i, senders[0].length)); + R)(receiver, &state, i, senders[0].length)); } } } void start() @trusted nothrow scope { - import concurrency.stoptoken : StopSource; if (receiver.getStopToken().isStopRequested) { receiver.setDone(); return; } - state.cb = receiver.getStopToken().onStop( - cast(void delegate() nothrow @safe shared) &state.stop - ); // butt ugly cast, but it won't take the second overload + auto token = receiver.getStopToken(); + // butt ugly cast, but it won't take the second overload + state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop); + static if (Senders.length > 1) { foreach (i, _; Senders) { ops[i].start(); @@ -177,9 +176,10 @@ struct WhenAllSender(Senders...) } } -private class WhenAllState(Value) : StopSource { +private struct WhenAllState(Value) { import concurrency.bitfield; - StopCallback cb; + shared StopSource stopSource; + shared StopCallback cb; static if (is(typeof(Value.values))) Value value; Throwable exception; @@ -189,11 +189,11 @@ private class WhenAllState(Value) : StopSource { private struct WhenAllReceiver(Receiver, InnerValue, Value) { import core.atomic : atomicOp, atomicLoad, MemoryOrder; Receiver receiver; - WhenAllState!(Value) state; + WhenAllState!(Value)* state; size_t senderIndex; size_t senderCount; auto getStopToken() { - return StopToken(state); + return state.stopSource.token(); } private bool isValueProduced(size_t state) { @@ -232,7 +232,7 @@ private struct WhenAllReceiver(Receiver, InnerValue, Value) { with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) { bool last = isLast(newState); if (!isDoneOrErrorProduced(oldState)) - state.stop(); + state.stopSource.stop(); if (last) process(newState); } @@ -244,7 +244,7 @@ private struct WhenAllReceiver(Receiver, InnerValue, Value) { if (!isDoneOrErrorProduced(oldState)) { state.exception = exception; release(); // must release before calling .stop - state.stop(); + state.stopSource.stop(); } else release(); if (last) diff --git a/source/concurrency/operations/withchild.d b/source/concurrency/operations/withchild.d index 3ee5d38..01aba01 100644 --- a/source/concurrency/operations/withchild.d +++ b/source/concurrency/operations/withchild.d @@ -24,7 +24,7 @@ struct WithChildSender(SenderParent, SenderChild) import concurrency.operations.stopon; // ensure NRVO auto op = - whenAll(b.stopOn(receiver.getStopToken), a).stopOn(StopToken()) + whenAll(b.stopOn(receiver.getStopToken), a).stopOn(shared StopToken()) .connect(receiver); return op; } diff --git a/source/concurrency/operations/withstopsource.d b/source/concurrency/operations/withstopsource.d index 93825f1..beaa7c8 100644 --- a/source/concurrency/operations/withstopsource.d +++ b/source/concurrency/operations/withstopsource.d @@ -7,26 +7,15 @@ import concurrency.stoptoken; import concepts; import std.traits; -template withStopSource(Sender) { - auto withStopSource(Sender sender, StopSource stopSource) { - return SSSender!(Sender, StopSource)(sender, stopSource); - } - - auto withStopSource(Sender sender, shared StopSource stopSource) @trusted { - return SSSender!(Sender, StopSource)(sender, cast() stopSource); - } - - auto withStopSource(Sender sender, ref InPlaceStopSource stopSource) @trusted { - return SSSender!(Sender, InPlaceStopSource*)(sender, &stopSource); - } +// TODO: return scope? +auto withStopSource(Sender)(Sender sender, ref shared StopSource stopSource) @trusted { + return SSSender!(Sender)(sender, &stopSource); } -private struct SSReceiver(Receiver, Value, OuterStopSource) { +private struct SSReceiver(Receiver, Value) { private { Receiver receiver; - OuterStopSource stopSource; - StopSource combinedSource; - StopCallback[2] cbs; + SSState* state; } static if (is(Value == void)) { @@ -52,49 +41,65 @@ private struct SSReceiver(Receiver, Value, OuterStopSource) { receiver.setError(e); } - auto getStopToken() nothrow @trusted scope { - import core.atomic; - if (this.combinedSource is null) { - auto local = new StopSource(); - if (cas(&this.combinedSource, cast(StopSource) null, local)) { - auto stop = - cast(void delegate() shared nothrow @safe) &local.stop; - cbs[0] = receiver.getStopToken().onStop(stop); - cbs[1] = StopToken(stopSource).onStop(stop); - if (atomicLoad(this.combinedSource) is null) { - cbs[0].dispose(); - cbs[1].dispose(); - } - } else { - cbs[0].dispose(); - cbs[1].dispose(); - } - } - - return StopToken(combinedSource); + auto getStopToken() nothrow @safe scope { + return state.combinedStopSource.token(); } mixin ForwardExtensionPoints!receiver; + private void resetStopCallback() { - import core.atomic; - if (atomicExchange(&this.combinedSource, cast(StopSource) null)) { - if (cbs[0]) - cbs[0].dispose(); - if (cbs[1]) - cbs[1].dispose(); + state.left.dispose(); + state.right.dispose(); + } +} + +struct SSState { + shared StopSource combinedStopSource; + shared StopCallback left; + shared StopCallback right; + ~this() @safe scope @nogc nothrow {} +} + +struct SSOp(Receiver, Sender) { + SSState state; + alias Op = OpType!(Sender, SSReceiver!(Receiver, Sender.Value)); + Op op; + + @disable + this(ref return scope typeof(this) rhs); + @disable + this(this); + ~this() @trusted scope @nogc nothrow {} + this(Receiver receiver, shared StopSource* outerStopSource, Sender sender) @trusted { + auto token = receiver.getStopToken(); + auto outerToken = outerStopSource.token(); + + state.left.register(token, cast(void delegate() @safe shared nothrow)&state.combinedStopSource.stop); + state.right.register(outerToken, cast(void delegate() @safe shared nothrow)&state.combinedStopSource.stop); + + try { + op = sender.connect(SSReceiver!(Receiver, Sender.Value)(receiver, &state)); + } catch (Exception e) { + state.left.dispose(); + state.right.dispose(); + throw e; } } + + void start() @safe nothrow { + // because we start op only afterwards, we can be sure that the cb's are created beforehand + op.start(); + } } -struct SSSender(Sender, StopSource) if (models!(Sender, isSender)) { +struct SSSender(Sender) if (models!(Sender, isSender)) { static assert(models!(typeof(this), isSender)); alias Value = Sender.Value; Sender sender; - StopSource stopSource; - auto connect(Receiver)(return Receiver receiver) @safe return scope { - alias R = SSReceiver!(Receiver, Sender.Value, StopSource); + shared StopSource* stopSource; + auto connect(Receiver)(return Receiver receiver) @trusted return scope { // ensure NRVO - auto op = sender.connect(R(receiver, stopSource)); + auto op = SSOp!(Receiver, Sender)(receiver, stopSource, sender); return op; } } diff --git a/source/concurrency/operations/withstoptoken.d b/source/concurrency/operations/withstoptoken.d index de49c45..09733a6 100644 --- a/source/concurrency/operations/withstoptoken.d +++ b/source/concurrency/operations/withstoptoken.d @@ -8,7 +8,7 @@ import concepts; import std.traits; import concurrency.utils : isThreadSafeFunction; -deprecated("function passed is not shared @safe delegate or @safe function.") +deprecated("function passed is not @safe shared delegate or @safe function.") auto withStopToken(Sender, Fun)(Sender sender, Fun fun) if (!isThreadSafeFunction!Fun) { return STSender!(Sender, Fun)(sender, fun); @@ -41,7 +41,7 @@ private struct STReceiver(Receiver, Value, Fun) { } else { import std.typecons : isTuple; enum isExpandable = isTuple!Value && __traits(compiles, { - fun(StopToken.init, Value.init.expand); + fun(shared(StopToken).init, Value.init.expand); }); void setValue(Value value) @safe { static if (is(ReturnType!Fun == void)) { diff --git a/source/concurrency/receiver.d b/source/concurrency/receiver.d index ced0818..4c7df1a 100644 --- a/source/concurrency/receiver.d +++ b/source/concurrency/receiver.d @@ -17,11 +17,6 @@ void checkReceiver(T)() { enum isReceiver(T) = is(typeof(checkReceiver!T)); -auto getStopToken(Receiver)(Receiver r) nothrow @safe if (isReceiver!Receiver) { - import concurrency.stoptoken : NeverStopToken; - return NeverStopToken(); -} - mixin template ForwardExtensionPoints(alias receiver) { auto getStopToken() nothrow @safe { return receiver.getStopToken(); @@ -43,7 +38,7 @@ interface ReceiverObjectBase(T) { void setValue(T value = T.init) @safe; void setDone() nothrow @safe; void setError(Throwable e) nothrow @safe; - StopToken getStopToken() nothrow @safe; + shared(StopToken) getStopToken() nothrow @safe; SchedulerObjectBase getScheduler() scope nothrow @safe; } diff --git a/source/concurrency/scheduler.d b/source/concurrency/scheduler.d index e754db9..d09d20a 100644 --- a/source/concurrency/scheduler.d +++ b/source/concurrency/scheduler.d @@ -23,6 +23,33 @@ interface SchedulerObjectBase { SenderObjectBase!void scheduleAfter(Duration d) @safe; } + +// We can pull the LocalThreadExecutor (and its schedule/scheduleAfter) out into a specialized context. +// Just like we did with the iouring context + +// The interesting bit is that the syncWait algorithm then might be inferred as @nogc + +// The question remains how we would want to integrate these. +// With iouring we created a runner that would take a sender and would inject the scheduler and allow itself to steal the current thread. + +// That last part is important, we don't want to spawn a thread just to run timers, we can do it perfectly fine on the current thread. +// Same with iouring or other event loops. + +// That said, we can, if we want to, move the event loop to another thread. + +// The only thing we can't do is cross schedule timers from one thread to another. +// Well, that is not true, we can create two context objects that expose a Scheduler + + + + + + +// Guess we just have to write it and see.... + +// Dietmar Kuhl used a iocontext with a run function that allows running it on the current thread. +// In rant I had the iocontext's runner return a sender so you could await that. + class SchedulerObject(S) : SchedulerObjectBase { import concurrency.sender : toSenderObject; S scheduler; @@ -50,7 +77,7 @@ enum TimerTrigger { cancel } -alias TimerDelegate = void delegate(TimerTrigger) shared @safe; +alias TimerDelegate = void delegate(TimerTrigger) @safe shared; import concurrency.timingwheels : ListElement; alias Timer = ListElement!(TimerDelegate); @@ -64,6 +91,7 @@ auto localThreadScheduler() { alias LocalThreadScheduler = typeof(localThreadScheduler()); struct SchedulerAdapter(Worker) { + static assert(models!(typeof(this), isScheduler)); import concurrency.receiver : setValueOrError; import concurrency.executor : VoidDelegate; import core.time : Duration; @@ -101,7 +129,7 @@ struct SchedulerAdapter(Worker) { return ScheduleSender(worker); } - auto schedule() shared @trusted { + auto schedule() @trusted shared { return (cast() this).schedule(); } @@ -109,7 +137,7 @@ struct SchedulerAdapter(Worker) { return ScheduleAfterSender!(Worker)(worker, dur); } - auto scheduleAfter(Duration dur) shared @trusted { + auto scheduleAfter(Duration dur) @trusted shared { return (cast() this).scheduleAfter(dur); } } @@ -117,7 +145,7 @@ struct SchedulerAdapter(Worker) { struct ScheduleAfterOp(Worker, Receiver) { import std.traits : ReturnType; import concurrency.bitfield : SharedBitField; - import concurrency.stoptoken : StopCallback, onStop; + import concurrency.stoptoken : StopCallback; import concurrency.receiver : setValueOrError; enum Flags { @@ -127,26 +155,25 @@ struct ScheduleAfterOp(Worker, Receiver) { setup = 0x4, } - // alias Timer = ReturnType!(Worker.addTimer); Worker worker; Duration dur; Receiver receiver; Timer timer; - StopCallback stopCb; + shared StopCallback stopCb; shared SharedBitField!Flags flags; @disable this(ref return scope typeof(this) rhs); @disable this(this); + // ~this() @safe scope {} void start() @trusted scope nothrow { if (receiver.getStopToken().isStopRequested) { receiver.setDone(); return; } - stopCb = - receiver.getStopToken() - .onStop(cast(void delegate() nothrow @safe shared) &stop); + auto token = receiver.getStopToken(); + stopCb.register(token, cast(void delegate() nothrow @safe shared) &stop); try { timer.userdata = cast(void delegate(TimerTrigger) @safe shared) &trigger; @@ -312,7 +339,8 @@ auto withBaseScheduler(T, P)(auto ref T t, auto ref P p) { static assert( false, "Neither " ~ T.stringof ~ " nor " ~ P.stringof - ~ " are full schedulers. Chain the sender with a .withScheduler and ensure the Scheduler passes the isScheduler check." + ~ " are full schedulers. Chain the sender with a .withScheduler" + ~ " and ensure the Scheduler passes the isScheduler check." ); } diff --git a/source/concurrency/sender.d b/source/concurrency/sender.d index 951ff87..7fea57e 100644 --- a/source/concurrency/sender.d +++ b/source/concurrency/sender.d @@ -68,8 +68,8 @@ void checkSender(T)() @safe { void setError(Throwable e) @safe nothrow {} - StopToken getStopToken() @safe nothrow { - return StopToken.init; + shared(StopToken) getStopToken() @safe nothrow { + return shared(StopToken).init; } Scheduler getScheduler() @safe nothrow { @@ -235,7 +235,7 @@ interface SenderObjectBase(T) { receiver.setError(e); } - StopToken getStopToken() nothrow { + shared(StopToken) getStopToken() nothrow { return receiver.getStopToken(); } @@ -449,9 +449,9 @@ struct PromiseSenderOp(T, Receiver) { alias InternalValue = Sender.InternalValue; shared Sender parent; Receiver receiver; - StopCallback cb; + shared StopCallback cb; shared SharedBitField!Flags bitfield; - @disable + @disable this(ref return scope typeof(this) rhs); @disable this(this); @@ -480,8 +480,10 @@ struct PromiseSenderOp(T, Receiver) { bool triggeredInline = parent.add(&(cast(shared) this).onValue); // if triggeredInline there is no point in setting up the stop callback - if (!triggeredInline) - cb = receiver.getStopToken.onStop(&(cast(shared) this).onStop); + if (!triggeredInline) { + auto stopToken = receiver.getStopToken; + cb.register(stopToken, &(cast(shared) this).onStop); + } with (bitfield.add(Flags.setup)) { if (has(Flags.stop)) { @@ -523,8 +525,7 @@ struct PromiseSenderOp(T, Receiver) { // being executed. // This means that when it completes we are the only one // calling the receiver's termination functions. - if (cb) - cb.dispose(); + cb.dispose(); value.match!((Sender.ValueRep v) { try { static if (is(Sender.Value == void)) @@ -639,7 +640,7 @@ class Promise(T) { this.dgs = new shared SList!DG; } - auto sender() shared @safe nothrow { + auto sender() @safe shared nothrow { return shared PromiseSender!T(this); } } diff --git a/source/concurrency/signal.d b/source/concurrency/signal.d index 97acfb5..689c2f8 100644 --- a/source/concurrency/signal.d +++ b/source/concurrency/signal.d @@ -2,47 +2,28 @@ module concurrency.signal; import concurrency.stoptoken; -shared(StopSource) globalStopSource() @trusted { +ref shared(StopSource) globalStopSource() @trusted { import core.atomic : atomicLoad, cas; - if (globalSource.atomicLoad is null) { - import concurrency.utils : dynamicLoad; + if (globalSourcePtr.atomicLoad is null) { auto ptr = getGlobalStopSourcePointer(); - if (auto source = (*ptr).atomicLoad) { - globalSource = source; - return globalSource; - } - - auto tmp = new shared StopSource(); - if (ptr.cas(cast(shared StopSource) null, tmp)) { - setupCtrlCHandler(tmp); - globalSource = tmp; + shared StopSource* empty = null; + if (ptr.cas(empty, &globalSource)) { + setupCtrlCHandler(globalSource); + globalSourcePtr = &globalSource; } else - globalSource = (*ptr).atomicLoad; + globalSourcePtr = (*ptr).atomicLoad; } - return globalSource; -} - -/// Returns true if first to set (otherwise it is ignored) -bool setGlobalStopSource(shared StopSource stopSource) @safe { - import core.atomic : cas; - auto ptr = getGlobalStopSourcePointer(); - if (!ptr.cas(cast(shared StopSource) null, stopSource)) - return false; - globalSource = stopSource; - return true; + return *globalSourcePtr; } /// Sets the stopSource to be called when receiving an interrupt -void setupCtrlCHandler(shared StopSource stopSource) @trusted { +void setupCtrlCHandler(ref shared StopSource stopSource) @trusted { import core.atomic; - if (stopSource is null) - return; - - auto old = atomicExchange(&SignalHandler.signalStopSource, stopSource); + auto old = atomicExchange(&SignalHandler.signalStopSource, &stopSource); if (old !is null) return; @@ -71,14 +52,15 @@ void setupCtrlCHandler(shared StopSource stopSource) @trusted { } } +private static shared StopSource* globalSourcePtr; private static shared StopSource globalSource; // a mixin for OS-specific visibility private mixin template globalStopSourcePointerImpl() { // should not be called directly by usercode, call `getGlobalStopSourcePointer` instead pragma(inline, false) - extern(C) shared(StopSource*) concurrency_globalStopSourcePointer() @safe { - return &globalSource; + extern(C) shared(StopSource*)* concurrency_globalStopSourcePointer() @safe { + return &globalSourcePtr; } } @@ -91,7 +73,7 @@ version(Windows) { // their own definition. private mixin globalStopSourcePointerImpl; - private shared(StopSource*) getGlobalStopSourcePointer() @safe { + private shared(StopSource*)* getGlobalStopSourcePointer() @safe { import concurrency.utils : dynamicLoad; return dynamicLoad!concurrency_globalStopSourcePointer()(); } @@ -175,7 +157,7 @@ struct SignalHandler { SignalHandler.notify(ABORT); } - private static shared StopSource signalStopSource; + private static shared StopSource* signalStopSource; private static shared Thread handlerThread; private static void launchHandlerThread() { if (handlerThread.atomicLoad !is null) diff --git a/source/concurrency/stoptoken.d b/source/concurrency/stoptoken.d index a874b13..119587e 100644 --- a/source/concurrency/stoptoken.d +++ b/source/concurrency/stoptoken.d @@ -1,236 +1,201 @@ module concurrency.stoptoken; -// originally this code is from https://github.com/josuttis/jthread by Nicolai Josuttis -// it is licensed under the Creative Commons Attribution 4.0 Internation License http://creativecommons.org/licenses/by/4.0 - -struct InPlaceStopSource { - @disable this(ref return scope typeof(this) rhs); - private stop_state state; - bool stop() nothrow @safe { - return state.request_stop(); - } +// Cancellation is an important concept when running asynchronous tasks. +// When an asynchronous operation is no longer needed it is desirable to +// cancel it to avoid doing unnecessary work, potentionally freeing system +// resources and maintaining responsiveness in the process. - bool stop() nothrow @safe shared { - with (assumeThreadSafe) { - return stop(); - } - } +// There are several reasons why an asynchronous operation might become +// irrelevant, including: - bool isStopRequested() nothrow @safe @nogc { - return state.is_stop_requested(); - } +// - An error in one part of an application might render the results of +// other ongoing asynchronous tasks irrelevant. - bool isStopRequested() nothrow @safe @nogc shared { - with (assumeThreadSafe) { - return isStopRequested(); - } - } +// - Clients or end-users might want to abort an earlier request they made. - /// resets the internal state, only do this if you are sure nothing else is looking at this... - void reset(this t)() @system @nogc { - this.state = stop_state(); - } +// - Outstanding work might need to be cancelled due to stale data. - private ref assumeThreadSafe() @trusted @nogc nothrow shared { - return cast()this; - } -} +// - Work might become irrelevant due to errors in external systems. -class StopSource { - private InPlaceStopSource source; +// - As part of control flow in an asynchronous algorithm. Similar to how +// you might have an `break` or early `return` in a synchronous piece of +// code. - bool stop() nothrow @safe { - return source.stop(); - } +// This module implements cooperative cancellation by providing a StopSource, +// StopToken and StopCallback. - bool stop() nothrow @trusted shared { - return source.stop(); - } +// - the StopSource represents the source of a cancellation request. +// - the StopToken is used to determine if a cancellation request has been +// initiated. +// - the StopCallback is invoked when a cancellation request has been +// initiated. - bool isStopRequested() nothrow @safe @nogc { - return source.isStopRequested; - } +// Implementation details: - bool isStopRequested() nothrow @trusted @nogc shared { - return source.isStopRequested; - } +// Allocations can have a non-neglible overhead when running many short-lived +// asynchronous tasks. For this reason these objects are designed to be used +// in-place on the stack. - /// resets the internal state, only do this if you are sure nothing else is looking at this... - void reset(this t)() @system @nogc { - return source.reset(); - } -} +// To ensure program safety and correctness we need to uphold that a +// `StopSource` outlives any associated `StopToken` or `StopCallback`, +// otherwise we might open ourselves to use-after-free errors. -struct StopToken { - package(concurrency) stop_state* state; - this(StopSource source) nothrow @safe @nogc { - if (source !is null) { - this.state = &source.source.state; - isStopPossible = true; - } - } +// We achieve that by: - this(shared StopSource source) nothrow @trusted @nogc { - this(cast()source); - } +// - having a StopCallback deregister itself from a StopSource on destruction +// - having a StopSource deregister any StopCallbacks still registered on +// destruction +// - relying on `scope` StopTokens to ensure they don't outlive the +// StopSource +// - disabling copy and assignment constructors for the StopSource and +// StopToken +// - relying on dip1000 - this(ref stop_state state) nothrow @trusted @nogc { - isStopPossible = true; - this.state = &state; - } +/// A StopSource represents the source of a cancellation request. You can think +/// of it as the entity that initiates the request for cancellation. +/// Once a stop is requested, it cannot be withdrawn. Additional stop requests +/// have no effect. +struct StopSource { + @disable this(ref return scope StopSource rhs) @safe; + @disable this(ref return scope shared StopSource rhs) shared @safe; + public shared StopState state; - this (ref InPlaceStopSource stopSource) nothrow @trusted @nogc { - this(stopSource.state); + ~this() nothrow @safe @nogc shared scope { + assertNoCallbacks(); } - this (InPlaceStopSource* stopSource) nothrow @trusted @nogc { - if (stopSource !is null) { - isStopPossible = true; - this.state = &stopSource.state; - } + /// Returns an associated StopToken + shared(StopToken) token() nothrow @safe @nogc shared return { + return shared StopToken(this); } - bool isStopRequested() nothrow @safe @nogc { - return isStopPossible && state.is_stop_requested(); + /// Trigger a cancellation request. This will trigger any StopCallback that was + /// registered through a StopToken associated with this StopSource. + /// Additionally any calls to the `isStopRequested` method on an associated + /// StopToken will return true. + /// + /// Returns true if this is the first cancellation request. + bool stop() nothrow @safe shared scope { + return state.requestStop(); } - const bool isStopPossible; -} - -struct NeverStopToken { - enum isStopRequested = false; - enum isStopPossible = false; -} - -StopCallback onStop( - StopSource stopSource, - void delegate() nothrow @safe shared callback -) nothrow @safe { - auto cb = new StopCallback(callback); - return onStop(stopSource, cb); -} - -StopCallback onStop(StopSource stopSource, - void function() nothrow @safe callback) nothrow @trusted { - import std.functional : toDelegate; - return stopSource - .onStop(cast(void delegate() nothrow @safe shared) callback.toDelegate); -} + /// Returns true if a cancellation request has been triggered. + bool isStopRequested() nothrow @safe @nogc shared scope { + return state.isStopRequested(); + } -StopCallback onStop(StopToken)( - StopToken stopToken, - void delegate() nothrow @safe shared callback -) nothrow @safe { - auto cb = new StopCallback(callback); + /// Resets the internal state. To ensure no dangling state it will first trigger + /// any registered StopCallbacks. + void reset() nothrow @safe shared scope { + stop(); + this.state = StopState(); + } - onStop(stopToken, cb); + /// Primarily used in unittests or debug builds to ensure there are no + /// dangling callbacks. + /// Because both the StopSource and StopCallbacks are to be placed + /// on the stack, a programming error can result in hard to debug + /// use-after-free errors. + /// This method allows to catch those errors before they becomes + /// much harder to debug. + void assertNoCallbacks() nothrow @safe @nogc shared scope { + state.assertEmpty(); + } - return cb; + @disable void opAssign(shared StopSource rhs) nothrow @safe @nogc shared; + @disable void opAssign(ref shared StopSource rhs) nothrow @safe @nogc shared; } -StopCallback onStop(StopToken)( - StopToken stopToken, - void function() nothrow @safe callback -) nothrow @trusted { - import std.functional : toDelegate; - return stopToken - .onStop(cast(void delegate() nothrow @safe shared) callback.toDelegate); -} +/// A StopToken is associated with a StopSource and is used to determine +/// whether a cancellation request has been initiated. +/// It is up to the task to check this token and react accordingly. +/// +/// Cancellation can be checked either by calling `isStopRequested` or +/// registering a StopCallback. The StopCallback will be called when the +/// underlying StopSource is triggered. +/// +/// A StopToken can be obtained by calling the `token` method on a StopSource. +struct StopToken { + private shared StopState* state; -StopCallback onStop(StopToken)(StopToken stopToken, - StopCallback cb) nothrow @safe { - if (stopToken.isStopPossible) { - stopToken.onStop(cb.callback); + private this(return ref shared StopSource stopSource) nothrow @safe @nogc shared { + this.state = &stopSource.state; } - return cb; -} -void onStop(StopToken)(StopToken stopToken, - ref InPlaceStopCallback cb) nothrow @safe { - if (stopToken.isStopPossible) { - (*stopToken.state).onStop(cb); + /// Returns true if a cancellation request has been initiated with the associated StopSource + bool isStopRequested() nothrow @safe @nogc scope shared { + return state && state.isStopRequested(); } } -StopCallback onStop(StopSource stopSource, StopCallback cb) nothrow @safe { - onStop(stopSource.source.state, cb.callback); - return cb; -} - -void onStop(StopSource stopSource, ref InPlaceStopCallback cb) nothrow @safe { - onStop(stopSource.source.state, cb); -} - -void onStop(ref stop_state state, ref InPlaceStopCallback cb) nothrow @trusted { // TODO: @safe - if (state.try_add_callback(cb, true)) - cb.state = &state; -} +/// A StopCallback can be associated with a StopToken, and contains a callback +/// that will be invoked when a stop request in the underlying StopSource's +/// is triggered. +/// +/// There is no guarantee which execution context the callback is invoked on. +struct StopCallback { + @disable this(ref return scope shared StopCallback rhs); + @disable this(ref return scope StopCallback rhs); + + ~this() @safe scope @nogc nothrow shared { + dispose(); + } + + /// Register the StopCallback with a StopToken and a specified callback. + /// + /// The supplied callback will be invoked exactly once when the StopToken + /// is triggered. That invocation might happen before this function returns. + /// + /// A subsequent call to `register` will first ensure any existing registered + /// callback is unregistered. Note that the existing callback might still get + /// invoked if the existing StopToken is triggered at the same time. + /// + /// In cases where this is unwanted you can call `dispose` yourself before + /// registering a new callback. + void register(return ref shared StopToken token, void delegate() nothrow @safe shared callback) nothrow @safe scope return shared { + dispose(); + + /// TODO: before we continue here we need to be 100% sure the previous callback + /// isn't invoked, or ensure we wait until it is done. + + this.callback = callback; -struct InPlaceStopCallback { - @disable this(ref return scope typeof(this) rhs); + if (token.state && token.state.tryAddCallback(cast(shared)this)) + this.state = token.state; + } - void dispose() nothrow @trusted @nogc { + /// Dispose the stopcallback, deregistering it from an associated StopToken, + /// if any. + /// + /// The registered callback could still be triggered while this function is + /// executing, but it will have been ran to completion before it returns. + void dispose() shared nothrow @trusted @nogc scope { import core.atomic : cas; + // TODO: might have to reset prev or next if state is null if (state is null) return; auto local = state; - static if (__traits(compiles, cas(&state, local, null))) { - if (!cas(&state, local, null)) { - assert(state is null); - return; - } - } else { - if (!cas(cast(shared) &state, cast(shared) local, null)) { - assert(state is null); - return; - } + if (!cas(&state, local, null)) { + assert(state is null); + return; } - - local.remove_callback(this); - } - - void dispose() shared nothrow @trusted @nogc { - (cast() this).dispose(); - } - - this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc { - this.callback = callback; + local.removeCallback(this); } + @disable void opAssign(ref shared StopCallback rhs) nothrow @safe @nogc shared; + @disable void opAssign(shared StopCallback rhs) nothrow @safe @nogc shared; private: + void delegate() nothrow @safe shared callback; + shared StopState* state; - void delegate() nothrow shared @safe callback; - stop_state* state; - - InPlaceStopCallback* next_ = null; - InPlaceStopCallback** prev_ = null; - bool* isRemoved_ = null; + shared StopCallback* next = null; + shared StopCallback** prev = null; + shared bool* isRemoved = null; shared bool callbackFinishedExecuting = false; - - void execute() nothrow @safe { - callback(); - } } -class StopCallback { - this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc { - this.callback = InPlaceStopCallback(callback); - } - - void dispose() nothrow @trusted @nogc { - callback.dispose(); - } - - void dispose() shared nothrow @trusted @nogc { - callback.dispose(); - } -private: - - InPlaceStopCallback callback; -} - -private void spin_yield() nothrow @trusted @nogc { +private void spinYield() nothrow @trusted @nogc { // TODO: could use the pause asm instruction // it is available in LDC as intrinsic... but not in DMD import core.thread : Thread; @@ -238,71 +203,39 @@ private void spin_yield() nothrow @trusted @nogc { Thread.yield(); } -private struct stop_state { +/// The StopState is the internal state object used in the StopSource. +/// It contains the linked list of StopCallbacks and the methods to +/// add, remove and trigger the callbacks in a thread-safe manner. +/// +/// It is based on the code from https://github.com/josuttis/jthread +/// by Nicolai Josuttis, licensed under the Creative Commons Attribution +/// 4.0 Internation License http://creativecommons.org/licenses/by/4.0 +private struct StopState { import core.thread : Thread; import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp, atomicFetchAdd, atomicFetchSub; + import core.atomic : casWeak; - static if (__traits(compiles, () { - import core.atomic : casWeak; - }) && __traits(compiles, () { - import core.internal.atomic - : atomicCompareExchangeWeakNoResult; - })) - import core.atomic : casWeak; - else - auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)( - T* here, - V1 ifThis, - V2 writeThis - ) pure nothrow @nogc @safe { - import core.atomic : cas; - - static if (__traits(compiles, - cas!(M1, M2)(here, ifThis, writeThis))) - return cas!(M1, M2)(here, ifThis, writeThis); - else - return cas(here, ifThis, writeThis); - } - -public: - void add_token_reference() nothrow @safe @nogc { - state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment); - } - - void remove_token_reference() nothrow @safe @nogc { - state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment); - } - - void add_source_reference() nothrow @safe @nogc { - state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment); - } - - void remove_source_reference() nothrow @safe @nogc { - state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment); - } - - bool request_stop() nothrow @safe { - if (!try_lock_and_signal_until_signalled()) { + bool requestStop() nothrow @safe shared scope { + if (!tryLockAndSignalUntilSignalled()) { // Stop has already been requested. return false; } // Set the 'stop_requested' signal and acquired the lock. + storeSignallingThread(); - signallingThread_ = Thread.getThis(); - - while (head_ !is null) { + while (head !is null) { // Dequeue the head of the queue - auto cb = head_; - head_ = cb.next_; - const bool anyMore = head_ !is null; + auto cb = head; + head = cb.next; + const bool anyMore = head !is null; if (anyMore) { - (() @trusted => head_.prev_ = - &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime". But this is this, how can it have a longer lifetime... + // @trusted due to error "`scope` applies to first indirection only" + (() @trusted => head.prev = &head)(); } // Mark this item as removed from the list. - cb.prev_ = null; + cb.prev = null; // Don't hold lock while executing callback // so we don't block other threads from deregistering callbacks. @@ -311,18 +244,18 @@ public: // TRICKY: Need to store a flag on the stack here that the callback // can use to signal that the destructor was executed inline // during the call. If the destructor was executed inline then - // it's not safe to dereference cb after execute() returns. + // it's not safe to dereference cb after callback() returns. // If the destructor runs on some other thread then the other // thread will block waiting for this thread to signal that the // callback has finished executing. - bool isRemoved = false; - (() @trusted => cb.isRemoved_ = + shared bool isRemoved = false; + (() @trusted => cb.isRemoved = &isRemoved)(); // the pointer to the stack here is removed 3 lines down. - cb.execute(); + cb.callback(); if (!isRemoved) { - cb.isRemoved_ = null; + cb.isRemoved = null; cb.callbackFinishedExecuting .atomicStore!(MemoryOrder.rel)(true); } @@ -343,67 +276,56 @@ public: return true; } - bool is_stop_requested() nothrow @safe @nogc { - return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq)); - } - - bool is_stop_requestable() nothrow @safe @nogc { - return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq)); + bool isStopRequested() nothrow @safe @nogc shared scope { + return isStopRequested(state.atomicLoad!(MemoryOrder.acq)); } - bool try_add_callback(ref InPlaceStopCallback cb, - bool incrementRefCountIfSuccessful) nothrow @trusted { + bool tryAddCallback(ref return scope shared StopCallback cb) nothrow @trusted shared { ulong oldState; do { goto load_state; do { - spin_yield(); + spinYield(); load_state: - oldState = state_.atomicLoad!(MemoryOrder.acq); - if (is_stop_requested(oldState)) { - cb.execute(); - return false; - } else if (!is_stop_requestable(oldState)) { + oldState = state.atomicLoad!(MemoryOrder.acq); + if (isStopRequested(oldState)) { + cb.callback(); return false; } - } while (is_locked(oldState)); + } while (isLocked(oldState)); } while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)( - &state_, oldState, oldState | locked_flag)); + &state, oldState, oldState | lockedFlag)); // Push callback onto callback list. - cb.next_ = head_; - if (cb.next_ !is null) { - cb.next_.prev_ = &cb.next_; + cb.next = head; + if (cb.next !is null) { + cb.next.prev = &cb.next; } () @trusted { - cb.prev_ = &head_; + cb.prev = &head; }(); - head_ = &cb; + head = &cb; - if (incrementRefCountIfSuccessful) { - unlock_and_increment_token_ref_count(); - } else { - unlock(); - } + unlock(); // Successfully added the callback. return true; } - void remove_callback(ref InPlaceStopCallback cb) nothrow @safe @nogc { + void removeCallback(ref shared StopCallback cb) nothrow @safe @nogc shared { lock(); - if (cb.prev_ !is null) { + if (cb.prev !is null) { // Still registered, not yet executed // Just remove from the list. - *cb.prev_ = cb.next_; - if (cb.next_ !is null) { - cb.next_.prev_ = cb.prev_; + *cb.prev = cb.next; + if (cb.next !is null) { + cb.next.prev = cb.prev; } - unlock_and_decrement_token_ref_count(); + unlock(); return; } @@ -413,94 +335,119 @@ public: // Callback has either already executed or is executing // concurrently on another thread. - if (signallingThread_ is Thread.getThis()) { + if (isSignallingThread()) { // Callback executed on this thread or is still currently executing // and is deregistering itself from within the callback. - if (cb.isRemoved_ !is null) { - // Currently inside the callback, let the request_stop() method + if (cb.isRemoved !is null) { + // Currently inside the callback, let the requestStop() method // know the object is about to be destructed and that it should // not try to access the object when the callback returns. - *cb.isRemoved_ = true; + *cb.isRemoved = true; } } else { // Callback is currently executing on another thread, // block until it finishes executing. while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) { - spin_yield(); + spinYield(); } } + } - remove_token_reference(); + bool isFinished(ref shared StopCallback cb) nothrow @safe @nogc shared { + return cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq); } -private: - static bool is_locked(ulong state) nothrow @safe @nogc { - return (state & locked_flag) != 0; + void assertEmpty() @safe shared @nogc nothrow scope { + // Return early if head is zero. + if (head.atomicLoad!(MemoryOrder.acq) is null) { + return; + } + + // Otherwise we have to do a bit of cleanup before asserting. + // `assertEmpty` is typically called during tests or debug builds only, + // right before destroying the StopSource and its StopState. + // + // In case it is violated we can't just assert however. Both StopCallbacks + // and StopSources sit on the stack and callbacks must be deregistered + // before either one is deconstructed, otherwise the callback might try + // to deregister itself from a dangling StopSource, potentially from + // another thread or fiber. + // + // This can result in complex to debug use-after-free errors. + // In order to reduce that we clean up those callbacks before asserting. + // + // There is no guarantee other callbacks won't be registered concurrently, + // either in between unlock and assert, or even afterwards, but this is + // primarily to be used for detecting errors to aid throubleshooting, + // not as a failsafe mechanism. + lock(); + shared StopState* blank = null; + while (head !is null) { + // Go through all pointers and dispose, this is to + // avoid a segfault when doing the assert. + auto next = head.next; + atomicStore(head.state, blank); + head = next; + } + unlock(); + assert(false, "StopSource has lingering callbacks"); } - static bool is_stop_requested(ulong state) nothrow @safe @nogc { - return (state & stop_requested_flag) != 0; + static bool isLocked(ulong state) nothrow @safe @nogc { + return (state & lockedFlag) != 0; } - static bool is_stop_requestable(ulong state) nothrow @safe @nogc { - // Interruptible if it has already been interrupted or if there are - // still interrupt_source instances in existence. - return is_stop_requested(state) || (state >= source_ref_increment); + static bool isStopRequested(ulong state) nothrow @safe @nogc { + return (state & stopRequestedFlag) != 0; } - bool try_lock_and_signal_until_signalled() nothrow @safe @nogc { + bool tryLockAndSignalUntilSignalled() nothrow @safe @nogc shared scope { ulong oldState; do { - oldState = state_.atomicLoad!(MemoryOrder.acq); - if (is_stop_requested(oldState)) + oldState = state.atomicLoad!(MemoryOrder.acq); + if (isStopRequested(oldState)) return false; - while (is_locked(oldState)) { - spin_yield(); - oldState = state_.atomicLoad!(MemoryOrder.acq); - if (is_stop_requested(oldState)) + while (isLocked(oldState)) { + spinYield(); + oldState = state.atomicLoad!(MemoryOrder.acq); + if (isStopRequested(oldState)) return false; } } while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)( - &state_, oldState, - oldState | stop_requested_flag | locked_flag)); + &state, oldState, + oldState | stopRequestedFlag | lockedFlag)); return true; } - void lock() nothrow @safe @nogc { + void lock() nothrow @safe @nogc shared scope { ulong oldState; do { - oldState = state_.atomicLoad!(MemoryOrder.raw); - while (is_locked(oldState)) { - spin_yield(); - oldState = state_.atomicLoad!(MemoryOrder.raw); + oldState = state.atomicLoad!(MemoryOrder.raw); + while (isLocked(oldState)) { + spinYield(); + oldState = state.atomicLoad!(MemoryOrder.raw); } } while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)( - (&state_), oldState, oldState | locked_flag)); + (&state), oldState, oldState | lockedFlag)); } - void unlock() nothrow @safe @nogc { - state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag); + void unlock() nothrow @safe @nogc shared scope { + state.atomicFetchSub!(MemoryOrder.rel)(lockedFlag); } - void unlock_and_increment_token_ref_count() nothrow @safe @nogc { - state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment); - } + enum stopRequestedFlag = 1L; + enum lockedFlag = 2L; - void unlock_and_decrement_token_ref_count() nothrow @safe @nogc { - state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment); + shared ulong state = 0; + StopCallback* head = null; + Thread signallingThread; + + private void storeSignallingThread() @nogc nothrow @trusted shared scope { + (cast()signallingThread) = Thread.getThis(); } - enum stop_requested_flag = 1L; - enum locked_flag = 2L; - enum token_ref_increment = 4L; - enum source_ref_increment = 1L << 33u; - - // bit 0 - stop-requested - // bit 1 - locked - // bits 2-32 - token ref count (31 bits) - // bits 33-63 - source ref count (31 bits) - shared ulong state_ = source_ref_increment; - InPlaceStopCallback* head_ = null; - Thread signallingThread_; + private bool isSignallingThread() @nogc nothrow @trusted shared scope { + return (cast()signallingThread) is Thread.getThis(); + } } diff --git a/source/concurrency/stream/cycle.d b/source/concurrency/stream/cycle.d index a9e0d3f..05a21d4 100644 --- a/source/concurrency/stream/cycle.d +++ b/source/concurrency/stream/cycle.d @@ -1,13 +1,14 @@ module concurrency.stream.cycle; import concurrency.stream.stream; +import concurrency.stoptoken : StopToken; import std.range : ElementType; struct Cycle(Range) { alias T = ElementType!Range; alias DG = CollectDelegate!(T); Range range; - void loop(StopToken)(DG emit, StopToken stopToken) @safe { + void loop(DG emit, shared StopToken stopToken) @safe { for (; !stopToken.isStopRequested;) { foreach (item; range) { emit(item); diff --git a/source/concurrency/stream/flatmapbase.d b/source/concurrency/stream/flatmapbase.d index dba68a2..1c83fe8 100644 --- a/source/concurrency/stream/flatmapbase.d +++ b/source/concurrency/stream/flatmapbase.d @@ -3,7 +3,7 @@ module concurrency.stream.flatmapbase; import concurrency.stream.stream; import concurrency.sender : OpType, isSender; import concurrency.receiver : ForwardExtensionPoints; -import concurrency.stoptoken : StopSource, StopToken; +import concurrency.stoptoken : StopToken; import std.traits : ReturnType; import concurrency.utils : isThreadSafeFunction; import concepts; @@ -42,7 +42,7 @@ template FlatMapBaseStreamOp(Stream, Fun, OnOverlap overlap) { static if (is(Properties.ElementType == void)) void item() { - if (state.isStopRequested) + if (state.stopSource.isStopRequested) return; state.onItem(); with (state.bitfield.lock()) { @@ -58,7 +58,7 @@ template FlatMapBaseStreamOp(Stream, Fun, OnOverlap overlap) { else void item(Properties.ElementType t) { - if (state.isStopRequested) + if (state.stopSource.isStopRequested) return; state.onItem(); with (state.bitfield.lock()) { @@ -104,7 +104,7 @@ private bool isDoneOrErrorProduced(size_t state) @safe @nogc nothrow pure { } final class State(TStreamSenderValue, TSenderValue, Receiver, - OnOverlap overlap) : StopSource { + OnOverlap overlap) { import concurrency.bitfield; import concurrency.stoptoken; import std.exception : assumeWontThrow; @@ -118,32 +118,31 @@ final class State(TStreamSenderValue, TSenderValue, Receiver, StreamSenderValue value; Throwable throwable; Semaphore semaphore; - StopCallback cb; + shared StopSource stopSource; + shared StopCallback cb; static if (overlap == OnOverlap.latest) - StopSource innerStopSource; + shared StopSource innerStopSource; shared SharedBitField!Flags bitfield; this(DG dg, Receiver receiver) { this.dg = dg; this.receiver = receiver; semaphore = new Semaphore(1); - static if (overlap == OnOverlap.latest) - innerStopSource = new StopSource(); bitfield = SharedBitField!Flags(Counter.tick); - cb = receiver.getStopToken - .onStop(cast(void delegate() nothrow @safe shared) &stop); + auto stopToken = receiver.getStopToken; + cb.register(stopToken, cast(void delegate() nothrow @safe shared) &stop); } - override bool stop() nothrow @trusted { + bool stop() nothrow @trusted { return (cast(shared) this).stop(); } - override bool stop() nothrow @trusted shared { + bool stop() nothrow @trusted shared { static if (overlap == OnOverlap.latest) { - auto r = super.stop(); + auto r = stopSource.stop(); innerStopSource.stop(); return r; } else { - return super.stop(); + return stopSource.stop(); } } @@ -176,11 +175,11 @@ final class State(TStreamSenderValue, TSenderValue, Receiver, } } - private StopToken getSenderStopToken() @safe nothrow { + private shared(StopToken) getSenderStopToken() @safe nothrow { static if (overlap == OnOverlap.latest) { - return StopToken(innerStopSource); + return innerStopSource.token(); } else { - return StopToken(this); + return stopSource.token(); } } } @@ -212,7 +211,7 @@ struct StreamReceiver(State) { if (!isDoneOrErrorProduced(oldState)) { state.throwable = t; release(); // must release before calling .stop - state.stop(); + state.stopSource.stop(); } else release(); if (last) @@ -224,14 +223,14 @@ struct StreamReceiver(State) { with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) { bool last = isLast(newState); if (!isDoneOrErrorProduced(oldState)) - state.stop(); + state.stopSource.stop(); if (last) state.process(newState); } } - StopToken getStopToken() @safe nothrow { - return StopToken(state); + shared(StopToken) getStopToken() @safe nothrow { + return state.stopSource.token(); } private auto receiver() { @@ -261,7 +260,7 @@ struct InnerSenderReceiver(State) { if (!isDoneOrErrorProduced(oldState)) { state.throwable = t; release(); // must release before calling .stop - state.stop(); + state.stopSource.stop(); } else release(); if (last) @@ -273,7 +272,7 @@ struct InnerSenderReceiver(State) { void setDone() @safe nothrow { static if (State.onOverlap == OnOverlap.latest) { - if (!state.isStopRequested) { + if (!state.stopSource.isStopRequested) { state.bitfield.add(Counter.tick); notify(); return; @@ -283,7 +282,7 @@ struct InnerSenderReceiver(State) { with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) { bool last = isLast(newState); if (!isDoneOrErrorProduced(oldState)) - state.stop(); + state.stopSource.stop(); if (last) state.process(newState); else diff --git a/source/concurrency/stream/package.d b/source/concurrency/stream/package.d index f789c74..dc2402e 100644 --- a/source/concurrency/stream/package.d +++ b/source/concurrency/stream/package.d @@ -61,7 +61,7 @@ auto infiniteStream(T)(T t) { alias DG = CollectDelegate!(T); struct Loop { T val; - void loop(StopToken)(DG emit, StopToken stopToken) { + void loop(DG emit, shared StopToken stopToken) { while (!stopToken.isStopRequested) emit(val); } @@ -75,7 +75,7 @@ auto iotaStream(T)(T start, T end) { alias DG = CollectDelegate!(T); struct Loop { T b, e; - void loop(StopToken)(DG emit, StopToken stopToken) { + void loop(DG emit, shared StopToken stopToken) { foreach (i; b .. e) { emit(i); if (stopToken.isStopRequested) @@ -92,7 +92,7 @@ auto arrayStream(T)(T[] arr) { alias DG = CollectDelegate!(T); struct Loop { T[] arr; - void loop(StopToken)(DG emit, StopToken stopToken) @safe { + void loop(DG emit, shared StopToken stopToken) @safe { foreach (item; arr) { emit(item); if (stopToken.isStopRequested) @@ -305,14 +305,14 @@ shared struct SharedStream(T) { shared SharedStream!T source; DG dg; Receiver receiver; - StopCallback cb; + shared StopCallback cb; @disable this(ref return scope typeof(this) rhs); @disable this(this); void start() nothrow @trusted { auto stopToken = receiver.getStopToken(); - cb = stopToken.onStop(&(cast(shared) this).onStop); + cb.register(stopToken, &(cast(shared) this).onStop); if (stopToken.isStopRequested) { cb.dispose(); receiver.setDone(); diff --git a/source/concurrency/stream/scan.d b/source/concurrency/stream/scan.d index b167a27..5a35520 100644 --- a/source/concurrency/stream/scan.d +++ b/source/concurrency/stream/scan.d @@ -30,7 +30,7 @@ template ScanStreamOp(Stream, Fun, Seed) { @disable this(this); this(Stream stream, Fun scanFn, Seed seed, DG dg, - Receiver receiver) @trusted { + Receiver receiver) @trusted return scope { this.scanFn = scanFn; this.acc = seed; this.dg = dg; diff --git a/source/concurrency/stream/take.d b/source/concurrency/stream/take.d index 5c9e5e4..cfd9369 100644 --- a/source/concurrency/stream/take.d +++ b/source/concurrency/stream/take.d @@ -3,7 +3,7 @@ module concurrency.stream.take; import concurrency.stream.stream; import concurrency.sender : OpType; import concurrency.receiver : ForwardExtensionPoints; -import concurrency.stoptoken : InPlaceStopSource; +import concurrency.stoptoken : StopSource; import concepts; /// takes the first n values from a stream or until cancelled @@ -17,7 +17,7 @@ auto take(Stream)(Stream stream, size_t n) if (models!(Stream, isStream)) { struct TakeReceiver(Receiver, Value) { Receiver receiver; - InPlaceStopSource* stopSource; + shared StopSource* stopSource; static if (is(Value == void)) void setValue() @safe { receiver.setValue(); @@ -51,12 +51,12 @@ template TakeOp(Stream) { struct TakeOp(Receiver) { import concurrency.operations : withStopSource, SSSender; import std.traits : ReturnType; - alias SS = SSSender!(Properties.Sender, InPlaceStopSource*); + alias SS = SSSender!(Properties.Sender); alias Op = OpType!(SS, TakeReceiver!(Receiver, Properties.Sender.Value)); size_t n; Properties.DG dg; - InPlaceStopSource stopSource; + shared StopSource stopSource; Op op; @disable this(ref return scope typeof(this) rhs); diff --git a/source/concurrency/stream/throttling.d b/source/concurrency/stream/throttling.d index c6bad3d..56fef90 100644 --- a/source/concurrency/stream/throttling.d +++ b/source/concurrency/stream/throttling.d @@ -21,14 +21,11 @@ enum ThrottleEmitLogic : uint { last // emit the last item in the window } -; enum ThrottleTimerLogic : uint { noop, // don't reset the timer on new items rearm // reset the timer on new items } -; - /// throttleFirst forwards one item and then enters a cooldown period during which it ignores items auto throttleFirst(Stream)(Stream s, Duration d) { return throttling!(Stream, ThrottleEmitLogic.first, @@ -78,9 +75,9 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, alias InnerReceiver = TimerReceiver!(typeof(this), Properties.ElementType, emitLogic, timerLogic); - StopSource stopSource; - StopSource timerStopSource; - StopCallback cb; + shared StopSource stopSource; + shared StopSource timerStopSource; + shared StopCallback cb; Throwable throwable; alias Op = OpType!(Properties.Sender, SenderReceiver!(typeof(this), Properties.Value)); @@ -97,8 +94,6 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, this.dur = dur; this.dg = dg; this.receiver = receiver; - stopSource = new StopSource(); - timerStopSource = new StopSource(); op = stream.collect(cast(Properties.DG) &onItem).connect( SenderReceiver!(typeof(this), Properties.Value)(&this)); } @@ -266,9 +261,9 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, } void start() @trusted nothrow scope { - cb = receiver.getStopToken().onStop( - cast(void delegate() nothrow @safe shared) &this.stop - ); // butt ugly cast, but it won't take the second overload + auto stopToken = receiver.getStopToken(); + // butt ugly cast, but it won't take the second overload + cb.register(stopToken, cast(void delegate() nothrow @safe shared) &this.stop); op.start(); } } @@ -315,7 +310,7 @@ struct TimerReceiver(Op, ElementType, ThrottleEmitLogic emitLogic, } auto getStopToken() { - return StopToken(state.timerStopSource); + return state.timerStopSource.token(); } auto getScheduler() { @@ -355,7 +350,7 @@ struct SenderReceiver(Op, Value) { } auto getStopToken() { - return StopToken(state.stopSource); + return state.stopSource.token(); } auto getScheduler() { diff --git a/source/concurrency/stream/transform.d b/source/concurrency/stream/transform.d index bfcb342..5b60dde 100644 --- a/source/concurrency/stream/transform.d +++ b/source/concurrency/stream/transform.d @@ -3,7 +3,6 @@ module concurrency.stream.transform; import concurrency.stream.stream; import concurrency.sender : OpType; import concurrency.receiver : ForwardExtensionPoints; -import concurrency.stoptoken : StopSource; import std.traits : ReturnType; import concurrency.utils : isThreadSafeFunction; import concepts; diff --git a/source/concurrency/syncwait.d b/source/concurrency/syncwait.d index 54827df..e0cedd1 100644 --- a/source/concurrency/syncwait.d +++ b/source/concurrency/syncwait.d @@ -18,15 +18,14 @@ package struct SyncWaitReceiver2(Value) { static if (!is(Value == void)) Value result; Throwable throwable; - StopSource stopSource; - this(StopSource stopSource) { - this.stopSource = stopSource; - worker = LocalThreadWorker(getLocalThreadExecutor()); + static auto construct() { + return State(LocalThreadWorker(getLocalThreadExecutor())); } } State* state; + shared StopSource* stopSource; void setDone() nothrow @safe { state.canceled = true; state.worker.stop(); @@ -49,7 +48,7 @@ package struct SyncWaitReceiver2(Value) { } auto getStopToken() nothrow @safe @nogc { - return StopToken(state.stopSource); + return stopSource.token(); } auto getScheduler() nothrow @safe { @@ -114,6 +113,10 @@ struct Result(T) { return std.sumtype.match!((T t) => t, function T(x) { throw new Exception("Unexpected value"); })(result); } + auto trustedGet(T)() { + return std.sumtype.match!((T t) => t, function T(x) { assert(0, "nah"); })(result); + } + auto assumeOk() { return value(); } @@ -128,29 +131,36 @@ template match(Handlers...) { } /// Start the Sender and waits until it completes, cancels, or has an error. -auto syncWait(Sender, StopSource)(auto ref Sender sender, - StopSource stopSource) { - return syncWaitImpl(sender, (() @trusted => cast() stopSource)()); +auto syncWait(Sender)(auto ref Sender sender, + shared ref StopSource stopSource) { + return syncWaitImpl(sender, stopSource); } -auto syncWait(Sender)(auto scope ref Sender sender) { +auto syncWait(Sender)(auto scope ref Sender sender) @trusted { import concurrency.signal : globalStopSource; - auto childStopSource = new shared StopSource(); - auto cb = InPlaceStopCallback(() shared { + shared StopSource childStopSource; + shared parentStopToken = globalStopSource.token(); + shared StopCallback cb; + cb.register(parentStopToken, () shared { childStopSource.stop(); }); - StopToken parentStopToken = StopToken(globalStopSource); - // parentStopToken.onStop(cb); - auto result = - syncWaitImpl(sender, (() @trusted => cast() childStopSource)()); - // detach stopSource - cb.dispose(); - return result; + + try { + auto result = syncWaitImpl(sender, childStopSource); + + version (unittest) childStopSource.assertNoCallbacks; + // detach stopSource + cb.dispose(); + return result; + } catch (Throwable t) { // @suppress(dscanner.suspicious.catch_em_all) + cb.dispose(); + throw t; + } } private Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender, - StopSource stopSource) @safe { + ref shared StopSource stopSource) @safe { static assert(models!(Sender, isSender)); import concurrency.signal; import core.stdc.signal : SIGTERM, SIGINT; @@ -158,8 +168,8 @@ Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender, alias Value = Sender.Value; alias Receiver = SyncWaitReceiver2!(Value); - auto state = Receiver.State(stopSource); - scope receiver = (() @trusted => Receiver(&state))(); + auto state = Receiver.State.construct; + scope receiver = (() @trusted => Receiver(&state, &stopSource))(); auto op = sender.connect(receiver); op.start(); @@ -171,7 +181,8 @@ Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender, if (state.throwable !is null) { if (auto e = cast(Exception) state.throwable) return Result!Value(e); - throw state.throwable; + auto throwable = (() @trusted => state.throwable)(); + throw throwable; } static if (is(Value == void)) diff --git a/source/concurrency/thread.d b/source/concurrency/thread.d index 7ed8881..8482312 100644 --- a/source/concurrency/thread.d +++ b/source/concurrency/thread.d @@ -140,7 +140,7 @@ package struct LocalThreadWorker { return removed; } - void start() @trusted { + void start() @trusted scope { assert(isInContext); // start can only be called on the thread import std.datetime.systime : Clock; import std.array : Appender; diff --git a/source/concurrency/utils.d b/source/concurrency/utils.d index 94ee6a6..977175f 100644 --- a/source/concurrency/utils.d +++ b/source/concurrency/utils.d @@ -44,9 +44,9 @@ auto closure(Fun, Args...)(Fun fun, Args args) @trusted auto cl = cast(shared) new Closure!(Fun, Args)(fun, args); /// need to cast to @safe because a @trusted delegate doesn't fit a @safe one... static if (hasFunctionAttributes!(Fun, "nothrow")) - alias ResultType = void delegate() nothrow shared @safe; + alias ResultType = void delegate() nothrow @safe shared; else - alias ResultType = void delegate() shared @safe; + alias ResultType = void delegate() @safe shared; return cast(ResultType) &cl.apply; } @@ -108,7 +108,7 @@ enum isThreadSafeCallable(alias Fun) = // are all pointing to the same instance. // We do this by exporting accessors functions which a dynamic library can // call to get access to the global. -auto dynamicLoad(alias fun)() nothrow @trusted { +auto dynamicLoad(alias fun)() nothrow @trusted @nogc { alias Fn = typeof(&fun); __gshared Fn fn; @@ -126,7 +126,7 @@ auto dynamicLoad(alias fun)() nothrow @trusted { return fn; } -auto dynamicLoadRaw(alias fun)() nothrow @trusted { +auto dynamicLoadRaw(alias fun)() nothrow @trusted @nogc { alias Fn = typeof(&fun); version(Windows) { import core.sys.windows.windows; diff --git a/tests/ut/concurrency/asyncscope.d b/tests/ut/concurrency/asyncscope.d index 25eab59..4d5b403 100644 --- a/tests/ut/concurrency/asyncscope.d +++ b/tests/ut/concurrency/asyncscope.d @@ -180,7 +180,7 @@ auto waitingTask() { import concurrency.thread : ThreadSender; import concurrency.operations : withStopToken; - return ThreadSender().withStopToken((StopToken token) @trusted { + return ThreadSender().withStopToken((shared StopToken token) @trusted { import core.thread : Thread; while (!token.isStopRequested) { Thread.yield(); diff --git a/tests/ut/concurrency/fork.d b/tests/ut/concurrency/fork.d index 4c01675..f5e0d2c 100644 --- a/tests/ut/concurrency/fork.d +++ b/tests/ut/concurrency/fork.d @@ -18,7 +18,7 @@ unittest { @("sync_wait.fork.exception") @trusted unittest { import core.stdc.stdlib; - ForkSender(getLocalThreadExecutor(), () shared @trusted { + ForkSender(getLocalThreadExecutor(), () @trusted shared { exit(1); }).syncWait.assumeOk.shouldThrow(); } diff --git a/tests/ut/concurrency/mpsc.d b/tests/ut/concurrency/mpsc.d index 153c70a..0f8b221 100644 --- a/tests/ut/concurrency/mpsc.d +++ b/tests/ut/concurrency/mpsc.d @@ -29,7 +29,7 @@ auto intSummer(Q)(Q q) { import concurrency.stoptoken : StopToken; import core.time : msecs; - return just(q).withStopToken((StopToken stopToken, Q q) shared @safe { + return just(q).withStopToken((shared StopToken stopToken, Q q) @safe shared { int sum = 0; while (!stopToken.isStopRequested()) { if (auto node = q.pop()) { @@ -54,8 +54,8 @@ unittest { import core.time : msecs; auto q = new MPSCQueue!Node(); - q.intSummer.stopWhen(intProducer(q, 50000)).syncWait.value.should - == 1250025000; + q.intSummer.stopWhen(intProducer(q, 50_000)).syncWait.value.should + == 1_250_025_000; q.empty.should == true; } @@ -66,10 +66,10 @@ unittest { auto q = new MPSCQueue!Node(); q .intSummer - .stopWhen(whenAll(intProducer(q, 10000), intProducer(q, 10000), - intProducer(q, 10000), intProducer(q, 10000), )) + .stopWhen(whenAll(intProducer(q, 10_000), intProducer(q, 10_000), + intProducer(q, 10_000), intProducer(q, 10_000), )) .syncWait .value - .should == 200020000; + .should == 200_020_000; q.empty.should == true; } diff --git a/tests/ut/concurrency/nursery.d b/tests/ut/concurrency/nursery.d index f681474..5faee93 100644 --- a/tests/ut/concurrency/nursery.d +++ b/tests/ut/concurrency/nursery.d @@ -52,8 +52,8 @@ unittest { unittest { auto nursery = new shared Nursery(); shared(int) global; - nursery.run(ThreadSender().then(() shared @safe { - nursery.run(ValueSender!(int)(5).then((int c) shared @safe { + nursery.run(ThreadSender().then(() @safe shared { + nursery.run(ValueSender!(int)(5).then((int c) @safe shared { global = c; })); })); @@ -66,17 +66,17 @@ unittest { @("run.thread.stop.internal") @safe unittest { auto nursery = new shared Nursery(); - nursery.run(ThreadSender().then(() shared @safe => nursery.stop())); + nursery.run(ThreadSender().then(() @safe shared => nursery.stop())); nursery.syncWait.isCancelled.should == true; nursery.getStopToken().isStopRequested().shouldBeTrue(); } -@("run.thread.stop.external") @trusted +@("run.thread.stop.external") @safe unittest { auto nursery = new shared Nursery(); - auto stopSource = new shared StopSource(); - nursery.run(ThreadSender().then(() shared @safe => stopSource.stop())); - nursery.syncWait(cast(StopSource) stopSource).isCancelled.should == true; + shared stopSource = StopSource(); + nursery.run(ThreadSender().then(() @safe shared => stopSource.stop())); + nursery.syncWait(stopSource).isCancelled.should == true; nursery.getStopToken().isStopRequested().shouldBeTrue(); stopSource.isStopRequested().shouldBeTrue(); } @@ -85,12 +85,12 @@ unittest { unittest { import core.thread : Thread; auto nursery = new shared Nursery(); - auto thread1 = ThreadSender().then(() shared @trusted { + auto thread1 = ThreadSender().then(() @trusted shared { auto token = nursery.getStopToken(); while (!token.isStopRequested()) Thread.yield(); }); - auto thread2 = ThreadSender().then(() shared @safe => nursery.stop()); + auto thread2 = ThreadSender().then(() @safe shared => nursery.stop()); nursery.run(thread1); nursery.run(thread2); nursery.syncWait.isCancelled.should == true; @@ -115,17 +115,17 @@ unittest { unittest { import core.thread : Thread; auto nursery = new shared Nursery(); - auto thread1 = ThreadSender().then(() shared @trusted { + auto thread1 = ThreadSender().then(() @trusted shared { auto token = nursery.getStopToken(); while (!token.isStopRequested()) Thread.yield(); }); auto thread2 = - ThreadSender().withStopToken((StopToken token) shared @trusted { + ThreadSender().withStopToken((shared StopToken token) @trusted shared { while (!token.isStopRequested()) Thread.yield(); }); - auto thread3 = ThreadSender().then(() shared @safe { + auto thread3 = ThreadSender().then(() @safe shared { throw new Exception("Error should stop everyone"); }); nursery.run(thread1); @@ -139,18 +139,17 @@ unittest { @("withStopSource.1") @safe unittest { import core.thread : Thread; - auto stopSource = new StopSource(); + shared StopSource stopSource; auto nursery = new shared Nursery(); auto thread1 = - ThreadSender().withStopToken((StopToken stopToken) shared @trusted { + ThreadSender().withStopToken((shared StopToken stopToken) @trusted shared { while (!stopToken.isStopRequested) Thread.yield(); }).withStopSource(stopSource); // stop via the source - auto stopper = ValueSender!StopSource(stopSource) - .then((StopSource stopSource) shared => stopSource.stop()); + auto stopper = justFrom(() shared => stopSource.stop()); nursery.run(thread1); nursery.run(stopper); @@ -161,18 +160,17 @@ unittest { @("withStopSource.2") @safe unittest { import core.thread : Thread; - auto stopSource = new StopSource(); + shared StopSource stopSource; auto nursery = new shared Nursery(); auto thread1 = - ThreadSender().withStopToken((StopToken stopToken) shared @trusted { + ThreadSender().withStopToken((shared StopToken stopToken) @trusted shared { while (!stopToken.isStopRequested) Thread.yield(); }).withStopSource(stopSource); // stop via the nursery - auto stopper = ValueSender!(shared Nursery)(nursery) - .then((shared Nursery nursery) shared => nursery.stop()); + auto stopper = justFrom(() shared => nursery.stop()); nursery.run(thread1); nursery.run(stopper); diff --git a/tests/ut/concurrency/operations.d b/tests/ut/concurrency/operations.d index e852ea7..9113853 100644 --- a/tests/ut/concurrency/operations.d +++ b/tests/ut/concurrency/operations.d @@ -57,7 +57,7 @@ unittest { unittest { race(ValueSender!int(4), ValueSender!int(5)).syncWait.value.should == 4; auto fastThread = ThreadSender().then(() shared => 1); - auto slowThread = ThreadSender().then(() shared @trusted { + auto slowThread = ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); return 2; }); @@ -80,7 +80,7 @@ unittest { @("race.exception.double") @safe unittest { - auto slow = ThreadSender().then(() shared @trusted { + auto slow = ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); throw new Exception("Slow"); }); @@ -92,7 +92,7 @@ unittest { @("race.cancel-other") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -103,7 +103,7 @@ unittest { @("race.cancel") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -189,7 +189,7 @@ unittest { @("whenAll.cancel") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -197,7 +197,7 @@ unittest { whenAll(waiting, DoneSender()).syncWait.isCancelled.should == true; whenAll(ThrowingSender(), waiting).syncWait.assumeOk.shouldThrow; whenAll(waiting, ThrowingSender()).syncWait.assumeOk.shouldThrow; - auto waitingInt = ThreadSender().withStopToken((StopToken token) @trusted { + auto waitingInt = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -211,14 +211,13 @@ unittest { @("whenAll.stop") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } }); - auto source = new StopSource(); - auto stopper = - just(source).then((StopSource source) shared => source.stop()); + shared source = StopSource(); + auto stopper = justFrom(() shared => source.stop()); whenAll(waiting, stopper).withStopSource(source).syncWait.isCancelled.should == true; } @@ -367,32 +366,26 @@ unittest { @("withStopToken.oob") @safe unittest { auto oob = OutOfBandValueSender!int(44); - oob.withStopToken((StopToken stopToken, int t) => t).syncWait.value.should + oob.withStopToken((shared StopToken stopToken, int t) => t).syncWait.value.should == 44; } @("withStopSource.oob") @safe unittest { auto oob = OutOfBandValueSender!int(45); - oob.withStopSource(new StopSource()).syncWait.value.should == 45; -} - -@("withStopSource.oob") @safe -unittest { - auto oob = OutOfBandValueSender!int(45); - InPlaceStopSource stopSource; - oob.withStopSource(stopSource).syncWait.value.should == 45; + shared source = StopSource(); + oob.withStopSource(source).syncWait.value.should == 45; } @("withStopSource.tuple") @safe unittest { - just(14, 53).withStopToken((StopToken s, Tuple!(int, int) t) => t[0] * t[1]) + just(14, 53).withStopToken((shared StopToken s, Tuple!(int, int) t) => t[0] * t[1]) .syncWait.value.should == 742; } @("value.withstoptoken.via.thread") @safe unittest { - ValueSender!int(4).withStopToken((StopToken s, int i) { + ValueSender!int(4).withStopToken((shared StopToken s, int i) { throw new Exception("Badness"); }).via(ThreadSender()).syncWait.assumeOk.shouldThrowWithMessage("Badness"); } @@ -405,7 +398,7 @@ unittest { @("raceAll") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -438,8 +431,8 @@ unittest { import concurrency.scheduler : ManualTimeWorker; auto worker = new shared ManualTimeWorker(); - auto source = new StopSource(); - auto driver = just(source).then((StopSource source) shared { + shared StopSource source; + auto driver = justFrom(() shared { worker.timeUntilNextEvent().should == 10.msecs.nullable; source.stop(); worker.timeUntilNextEvent().isNull.should == true; @@ -499,13 +492,13 @@ unittest { @("stopOn") @safe unittest { - auto sourceInner = new shared StopSource(); - auto sourceOuter = new shared StopSource(); + shared sourceInner = StopSource(); + shared sourceOuter = StopSource(); shared bool b; whenAll( delay(5.msecs).then(() shared => b = true) - .stopOn(StopToken(sourceInner)), + .stopOn(sourceInner.token()), just(() => sourceOuter.stop()) ).syncWait(sourceOuter).assumeOk; b.should == true; @@ -513,7 +506,7 @@ unittest { shared bool d; whenAll( delay(5.msecs).then(() shared => b = true) - .stopOn(StopToken(sourceInner)), + .stopOn(sourceInner.token()), just(() => sourceInner.stop()) ).syncWait(sourceOuter).assumeOk; d.should == false; @@ -527,41 +520,41 @@ unittest { import core.sync.event : Event; bool parentAfterChild; Event childEvent, parentEvent; - this() shared @trusted { + this() @trusted shared { (cast() childEvent).initialize(false, false); (cast() parentEvent).initialize(false, false); } - void signalChild() shared @trusted { + void signalChild() @trusted shared { (cast() childEvent).set(); } - void waitChild() shared @trusted { + void waitChild() @trusted shared { (cast() childEvent).wait(); } - void signalParent() shared @trusted { + void signalParent() @trusted shared { (cast() parentEvent).set(); } - void waitParent() shared @trusted { + void waitParent() @trusted shared { (cast() parentEvent).wait(); } } auto state = new shared State(); - auto source = new shared StopSource; + shared source = StopSource(); import std.stdio; auto child = just(state) - .withStopToken((StopToken token, shared State state) @trusted { + .withStopToken((shared StopToken token, shared State state) @trusted { while (!token.isStopRequested) {} state.signalParent(); state.waitChild(); }).via(ThreadSender()); auto parent = - just(state).withStopToken((StopToken token, shared State state) { + just(state).withStopToken((shared StopToken token, shared State state) { state.waitParent(); state.parentAfterChild.atomicStore(token.isStopRequested == false); state.signalChild(); @@ -569,7 +562,7 @@ unittest { whenAll( parent.withChild(child).withStopSource(source), - just(source).then((shared StopSource s) => s.stop()) + justFrom(() shared => source.stop()) ).syncWait.isCancelled.should == true; state.parentAfterChild.atomicLoad.should == true; @@ -641,7 +634,7 @@ unittest { @("stopWhen.source.value") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -654,7 +647,7 @@ unittest { @("stopWhen.source.error") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -668,7 +661,7 @@ unittest { @("stopWhen.source.cancelled") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -679,7 +672,7 @@ unittest { @("stopWhen.trigger.error") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } @@ -694,7 +687,7 @@ unittest { @("stopWhen.trigger.cancelled.value") @safe unittest { - auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { + auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted { while (!token.isStopRequested) { Thread.yield(); } diff --git a/tests/ut/concurrency/scheduler.d b/tests/ut/concurrency/scheduler.d index f35ed59..5ffd4a8 100644 --- a/tests/ut/concurrency/scheduler.d +++ b/tests/ut/concurrency/scheduler.d @@ -22,7 +22,7 @@ unittest { @("scheduleAfter.stop-before-add") @safe unittest { import concurrency.sender : delay, justFrom; - auto source = new shared StopSource(); + shared source = StopSource(); whenAll(justFrom(() shared => source.stop), delay(10.msecs)) .syncWait(source); } diff --git a/tests/ut/concurrency/sender.d b/tests/ut/concurrency/sender.d index b94a968..81d0086 100644 --- a/tests/ut/concurrency/sender.d +++ b/tests/ut/concurrency/sender.d @@ -83,7 +83,7 @@ unittest { unittest { ThrowingSender().via(ThreadSender()).syncWait().isError.should == true; } - +/+ @("toShared.basic") @safe unittest { import std.typecons : tuple; @@ -171,17 +171,15 @@ unittest { unittest { import concurrency.stoptoken; import core.atomic : atomicStore, atomicLoad; - shared bool g; + shared bool g;stopSource auto waiting = - ThreadSender().withStopToken((StopToken token) shared @trusted { + ThreadSender().withStopToken((shared StopToken token) @trusted shared { while (!token.isStopRequested) {} g.atomicStore(true); }); - auto source = new StopSource(); - auto stopper = just(source).then((StopSource source) shared { - source.stop(); - }); + shared source = StopSource(); + auto stopper = justFrom(() shared => source.stop()); whenAll(waiting.toShared().withStopSource(source), stopper) .syncWait.isCancelled.should == true; @@ -209,7 +207,7 @@ unittest { auto n = new shared Nursery(); auto s = n.toShared(localThreadScheduler()); } - ++/ @("nvro") @safe unittest { static struct Op(Receiver) { @@ -335,7 +333,7 @@ unittest { import concurrency.stoptoken; just(14, 52).syncWait.value.should == tuple(14, 52); just(14, 53).then((int a, int b) => a * b).syncWait.value.should == 742; - just(14, 54).withStopToken((StopToken s, int a, int b) => a * b).syncWait + just(14, 54).withStopToken((shared StopToken s, int a, int b) => a * b).syncWait .value.should == 756; } diff --git a/tests/ut/concurrency/stoptoken.d b/tests/ut/concurrency/stoptoken.d new file mode 100644 index 0000000..93955e2 --- /dev/null +++ b/tests/ut/concurrency/stoptoken.d @@ -0,0 +1,167 @@ +module ut.concurrency.stoptoken; + +import concurrency.stoptoken; +import unit_threaded; +import std.encoding; + +@("stopsource.isStopRequested.stop") @safe +unittest { + auto source = shared StopSource(); + source.isStopRequested.should == false; + source.stop(); + source.isStopRequested.should == true; +} + +@("stopsource.isStopRequested.reset") @safe +unittest { + auto source = shared StopSource(); + source.stop(); + source.isStopRequested.should == true; + source.reset(); + source.isStopRequested.should == false; +} + +@("stopsource.reset") @safe +unittest { + auto source = shared StopSource(); + auto token = source.token(); + shared StopCallback cb; + cb.register(token, () shared { }); + source.reset(); + source.assertNoCallbacks(); +} + +@("stopsource.no-copy") @safe +unittest { + auto source = shared StopSource(); + /// Disallow copy + static assert(!__traits(compiles, { shared s2 = source; })); +} + +@("stopsource.no-assign") @safe +unittest { + auto source = shared StopSource(); + auto source2 = shared StopSource(); + /// Disallow assign + static assert(!__traits(compiles, { source = shared StopSource(); })); + static assert(!__traits(compiles, { source = source2; })); +} + +@("stoptoken.isStopRequested") @safe +unittest { + auto source = shared StopSource(); + auto token = source.token(); + token.isStopRequested.should == false; + source.stop(); + token.isStopRequested.should == true; +} + +@("stoptoken.lifetime") @safe +unittest { + shared StopToken token; + { + auto source = shared StopSource(); + /// Error: address of variable `source` assigned to `token` with longer lifetime + static assert(!__traits(compiles, () @safe { token = source.token(); })); + } +} + +@("stoptoken.scope.no-escape") @safe +unittest { + shared StopToken t1; + { + shared source = shared StopSource(); + auto token = source.token(); + auto t2 = token; + + // assert that token can't escape the StopSource + static assert(!__traits(compiles, t1 = token)); + // assert that a copy can't escape the StopSource + static assert(!__traits(compiles, t1 = t2)); + } +} + +@("stopcallback.lifetime") @safe +unittest { + shared StopCallback cb; + { + auto source = shared StopSource(); + auto token = source.token(); + /// We don't allow address of variable `token` to be assigned to `cb` with longer lifetime + static assert(!__traits(compiles, () @safe { cb.place(() shared { }, token); })); + } +} + +@("stopcallback.opassign") @safe +unittest { + shared StopCallback cb1; + shared StopCallback cb2; + StopCallback cb3; + /// Disallow assign + static assert(!__traits(compiles, { cb1 = shared StopCallback(); })); + static assert(!__traits(compiles, { cb1 = cb2; })); + static assert(!__traits(compiles, { cb1 = StopCallback(); })); + static assert(!__traits(compiles, { cb1 = cb3; })); +} + +@("stopsource.StopCallback.stop") @safe +unittest { + import core.atomic; + auto source = shared StopSource(); + auto token = source.token(); + shared bool set; + shared StopCallback cb; + cb.register(token, () shared { set.atomicStore(true); }); + set.atomicLoad.should == false; + source.stop(); + set.atomicLoad.should == true; +} + +@("stopsource.StopCallback.reset") @safe +unittest { + import core.atomic; + auto source = shared StopSource(); + auto token = source.token(); + shared bool set; + shared StopCallback cb; + cb.register(token, () shared { set.atomicStore(true); }); + set.atomicLoad.should == false; + source.reset(); + set.atomicLoad.should == true; +} + +@("StopSource.stoptoken.stop") @safe +unittest { + auto source = shared StopSource(); + auto token = source.token(); + token.isStopRequested.should == false; + source.stop(); + token.isStopRequested.should == true; +} + +@("StopSource.StopCallback.stop") @safe +unittest { + import core.atomic; + auto source = shared StopSource(); + auto token = source.token(); + shared bool set; + shared StopCallback cb; + cb.register(token, () shared { set.atomicStore(true); }); + set.atomicLoad.should == false; + source.stop(); + set.atomicLoad.should == true; +} + +@("StopSource.StopCallback.dispose") @safe +unittest { + import core.atomic; + auto source = shared StopSource(); + auto token = source.token(); + shared bool set; + shared StopCallback cb; + cb.register(token, () shared { set.atomicStore(true); }); + set.atomicLoad.should == false; + cb.dispose(); + source.stop(); + set.atomicLoad.should == false; +} diff --git a/tests/ut/concurrency/stream.d b/tests/ut/concurrency/stream.d index 7d81cc7..43ec372 100644 --- a/tests/ut/concurrency/stream.d +++ b/tests/ut/concurrency/stream.d @@ -47,7 +47,7 @@ unittest { unittest { import concurrency.operations : withStopSource; shared int g = 0; - auto source = new shared StopSource(); + auto source = shared StopSource(); infiniteStream(5).collect((int n) shared { if (g < 14) g.atomicOp!"+="(n); @@ -82,7 +82,7 @@ unittest { unittest { struct Loop { size_t b, e; - void loop(DG, StopToken)(DG emit, StopToken stopToken) { + void loop(DG)(DG emit, shared StopToken stopToken) { foreach (i; b .. e) emit(i); } @@ -828,7 +828,7 @@ unittest { unittest { import concurrency.stream.defer; static struct S { - auto opCall() shared @safe { + auto opCall() @safe shared { import concurrency.sender; return just(1); } diff --git a/tests/ut/concurrency/thread.d b/tests/ut/concurrency/thread.d index d7c1f1b..ec2d3a6 100644 --- a/tests/ut/concurrency/thread.d +++ b/tests/ut/concurrency/thread.d @@ -85,20 +85,3 @@ unittest { import concurrency.utils; dynamicLoadRaw!concurrency_getLocalThreadExecutor.should.not == null; } - -@("nested.intervalStream") @safe -unittest { - import core.time : msecs; - import concurrency.stream : intervalStream, take; - - static auto interval() { - return intervalStream(1.msecs).take(90).collect(() shared {}); - } - - auto sender = justFrom(() => interval().syncWait()).via(ThreadSender()); - - auto d = delay(10.msecs); - - whenAll(interval, sender, interval, sender, interval, sender, d, d, d) - .syncWait().assumeOk; -} diff --git a/tests/ut/concurrency/utils.d b/tests/ut/concurrency/utils.d index a7ebfca..63c8e09 100644 --- a/tests/ut/concurrency/utils.d +++ b/tests/ut/concurrency/utils.d @@ -25,7 +25,7 @@ unittest { auto systemSharedClosure = (int i) shared => i * system() * k; static assert(!isThreadSafeFunction!systemSharedClosure); - auto trustedSharedClosure = (int i) shared @trusted => i * system() * k; + auto trustedSharedClosure = (int i) @trusted shared => i * system() * k; static assert(isThreadSafeFunction!trustedSharedClosure); } diff --git a/tests/ut/concurrency/waitable.d b/tests/ut/concurrency/waitable.d index 5e06b95..40eb26d 100644 --- a/tests/ut/concurrency/waitable.d +++ b/tests/ut/concurrency/waitable.d @@ -33,7 +33,7 @@ auto intSummer(Q)(Q q) { import concurrency.stoptoken : StopToken; import core.time : msecs; - return just(q).withStopToken((StopToken stopToken, Q q) shared @safe { + return just(q).withStopToken((shared StopToken stopToken, Q q) @safe shared { int sum = 0; while (!stopToken.isStopRequested()) { if (auto node = q.pop(100.msecs)) { @@ -58,8 +58,8 @@ unittest { import core.time : msecs; auto q = new WaitableQueue!(MPSCQueue!Node)(); - q.intSummer.stopWhen(intProducer(q, 50000)).syncWait.value.should - == 1250025000; + q.intSummer.stopWhen(intProducer(q, 50_000)).syncWait.value.should + == 1_250_025_000; q.empty.should == true; } @@ -70,10 +70,10 @@ unittest { auto q = new WaitableQueue!(MPSCQueue!Node)(); q .intSummer - .stopWhen(whenAll(intProducer(q, 10000), intProducer(q, 10000), - intProducer(q, 10000), intProducer(q, 10000), )) + .stopWhen(whenAll(intProducer(q, 10_000), intProducer(q, 10_000), + intProducer(q, 10_000), intProducer(q, 10_000), )) .syncWait .value - .should == 200020000; + .should == 200_020_000; q.empty.should == true; } diff --git a/tests/ut/ut_runner.d b/tests/ut/ut_runner.d index 6ff970d..8986fd2 100644 --- a/tests/ut/ut_runner.d +++ b/tests/ut/ut_runner.d @@ -17,5 +17,7 @@ int main(string[] args) { "ut.concurrency.waitable", "ut.concurrency.asyncscope", "concurrency.timingwheels", + "concurrency.stoptoken", + "ut.concurrency.stoptoken", ); } From 9f4478db21793ad2b0a5a93b4349d15e44499263 Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Sun, 23 Jun 2024 21:34:16 +0200 Subject: [PATCH 2/2] Update minimal dmd and ldc compilers --- .github/workflows/main.yml | 2 +- README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6cd6955..71dd77a 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -8,7 +8,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, windows-latest] - dc: [dmd-latest, ldc-latest, dmd-2.098.1, ldc-1.28.1] + dc: [dmd-latest, ldc-latest, dmd-2.105.3, ldc-1.35.0] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v3 diff --git a/README.md b/README.md index 688c20f..77026cf 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Structured Concurrency -   +   Provides various primitives useful for structured concurrency and async tasks. ## Senders/Receivers