diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 6cd6955..71dd77a 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -8,7 +8,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
- dc: [dmd-latest, ldc-latest, dmd-2.098.1, ldc-1.28.1]
+ dc: [dmd-latest, ldc-latest, dmd-2.105.3, ldc-1.35.0]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
diff --git a/README.md b/README.md
index 688c20f..77026cf 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
# Structured Concurrency
-
+
Provides various primitives useful for structured concurrency and async tasks.
## Senders/Receivers
diff --git a/source/concurrency/asyncscope.d b/source/concurrency/asyncscope.d
index 6e1e25e..1adcf7b 100644
--- a/source/concurrency/asyncscope.d
+++ b/source/concurrency/asyncscope.d
@@ -10,8 +10,9 @@ private enum Flag {
}
auto asyncScope() @safe {
+ import concurrency.sender : Promise;
// ensure NRVO
- auto as = shared AsyncScope(new shared StopSource());
+ auto as = shared AsyncScope(new shared Promise!void);
return as;
}
@@ -66,9 +67,8 @@ public:
cleanup.syncWait();
}
- this(shared StopSource stopSource) @safe shared {
- completion = new shared Promise!void;
- this.stopSource = stopSource;
+ this(shared Promise!void completion) @safe shared {
+ this.completion = completion;
}
auto cleanup() @safe shared {
@@ -93,7 +93,7 @@ public:
return stopSource.stop();
}
- bool spawn(Sender)(Sender s) shared @trusted {
+ bool spawn(Sender)(Sender s) @trusted shared {
import concurrency.sender : connectHeap;
with (flag.update(0, Flag.tick)) {
if ((oldState & Flag.stopped) == 1) {
@@ -133,7 +133,7 @@ struct AsyncScopeReceiver {
auto getStopToken() nothrow @safe {
import concurrency.stoptoken : StopToken;
- return StopToken(s.stopSource);
+ return s.stopSource.token();
}
auto getScheduler() nothrow @safe {
diff --git a/source/concurrency/data/queue/mpsc.d b/source/concurrency/data/queue/mpsc.d
index c303720..e2caf56 100644
--- a/source/concurrency/data/queue/mpsc.d
+++ b/source/concurrency/data/queue/mpsc.d
@@ -2,7 +2,7 @@ module concurrency.data.queue.mpsc;
struct MPSCQueueProducer(Node) {
private MPSCQueue!(Node) q;
- void push(Node* node) shared @trusted {
+ void push(Node* node) @trusted shared {
(cast() q).push(node);
}
}
diff --git a/source/concurrency/data/queue/waitable.d b/source/concurrency/data/queue/waitable.d
index f2026dd..57f6ad2 100644
--- a/source/concurrency/data/queue/waitable.d
+++ b/source/concurrency/data/queue/waitable.d
@@ -61,7 +61,7 @@ struct WaitableQueueProducer(Q) {
private shared Q q;
private shared Semaphore sema;
- bool push(Q.ElementType t) shared @trusted {
+ bool push(Q.ElementType t) @trusted shared {
bool r = (cast() q).push(t);
if (r)
(cast() sema).notify();
diff --git a/source/concurrency/executor.d b/source/concurrency/executor.d
index 98c66fb..8546e5d 100644
--- a/source/concurrency/executor.d
+++ b/source/concurrency/executor.d
@@ -1,7 +1,7 @@
module concurrency.executor;
alias VoidFunction = void function() @safe;
-alias VoidDelegate = void delegate() shared @safe;
+alias VoidDelegate = void delegate() @safe shared;
interface Executor {
void execute(VoidFunction fn) @safe;
diff --git a/source/concurrency/fork.d b/source/concurrency/fork.d
index 1ea9320..f9a3577 100644
--- a/source/concurrency/fork.d
+++ b/source/concurrency/fork.d
@@ -63,7 +63,8 @@ version(Posix)
if (afterFork)
afterFork(pid);
- auto cb = token.onStop(() @trusted shared nothrow =>
+ shared StopCallback cb;
+ cb.register(token, () @trusted shared nothrow =>
cast(void) kill(pid, SIGINT));
int status;
auto ret = waitpid(pid, &status, 0);
@@ -102,7 +103,7 @@ version(Posix)
import concurrency.utils : closure;
executeInNewThread(
- cast(void delegate() shared @safe) &this.run);
+ cast(void delegate() @safe shared) &this.run);
}
}
diff --git a/source/concurrency/nursery.d b/source/concurrency/nursery.d
index 17ff76b..7b9dadc 100644
--- a/source/concurrency/nursery.d
+++ b/source/concurrency/nursery.d
@@ -1,8 +1,8 @@
module concurrency.nursery;
-import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
+import concurrency.stoptoken : StopSource, StopToken, StopCallback;
import concurrency.thread : LocalThreadExecutor;
-import concurrency.receiver : getStopToken;
+// import concurrency.receiver : getStopToken;
import concurrency.scheduler : SchedulerObjectBase;
import std.typecons : Nullable;
@@ -14,7 +14,7 @@ import std.typecons : Nullable;
/// When cancellation happens all Senders are waited on for completion.
/// Senders can be added to the Nursery at any time.
/// Senders are only started when the Nursery itself is being awaited on.
-class Nursery : StopSource {
+class Nursery {
import concurrency.sender : isSender, OperationalStateBase;
import core.sync.mutex : Mutex;
import concepts;
@@ -22,6 +22,7 @@ class Nursery : StopSource {
alias Value = void;
private {
+ shared StopSource stopSource;
Node[] operations;
struct Node {
OperationalStateBase state;
@@ -38,7 +39,7 @@ class Nursery : StopSource {
shared size_t counter = 0;
Throwable throwable; // first throwable from sender, if any
ReceiverObject receiver;
- StopCallback stopCallback;
+ shared StopCallback stopCallback;
Nursery assumeThreadSafe() @trusted shared nothrow {
return cast(Nursery) this;
}
@@ -50,8 +51,8 @@ class Nursery : StopSource {
with (assumeThreadSafe) mutex = new Mutex();
}
- override bool stop() nothrow @trusted {
- auto result = super.stop();
+ bool stop() nothrow @trusted {
+ auto result = stopSource.stop();
if (result)
(cast(shared) this).done(-1);
@@ -59,12 +60,13 @@ class Nursery : StopSource {
return result;
}
- override bool stop() nothrow @trusted shared {
+ bool stop() nothrow @trusted shared {
return (cast(Nursery) this).stop();
}
- StopToken getStopToken() nothrow @trusted shared {
- return StopToken(cast(Nursery) this);
+ shared(StopToken) getStopToken() nothrow @safe shared {
+ return stopSource.token();
+ // return shared StopToken(stopSource);
}
private auto getScheduler() nothrow @trusted shared {
@@ -94,9 +96,9 @@ class Nursery : StopSource {
if (isDone) {
throwable = null;
receiver = null;
- if (stopCallback)
- stopCallback.dispose();
- stopCallback = null;
+ // if (stopCallback)
+ stopCallback.dispose();
+ // stopCallback = null;
}
mutex.unlock_nothrow();
@@ -104,7 +106,7 @@ class Nursery : StopSource {
if (isDone && localReceiver !is null) {
if (localThrowable !is null) {
localReceiver.setError(localThrowable);
- } else if (isStopRequested()) {
+ } else if (stopSource.isStopRequested()) {
localReceiver.setDone();
} else {
try {
@@ -122,7 +124,7 @@ class Nursery : StopSource {
run(sender.get());
}
- void run(Sender)(Sender sender) shared @trusted {
+ void run(Sender)(Sender sender) @trusted shared {
import concepts;
static assert(models!(Sender, isSender));
import std.typecons : Nullable;
@@ -161,7 +163,7 @@ class Nursery : StopSource {
auto connect(Receiver)(
return Receiver receiver
- ) shared @trusted return scope {
+ ) @trusted shared return scope {
final class ReceiverImpl : ReceiverObject {
Receiver receiver;
SchedulerObjectBase scheduler;
@@ -187,21 +189,24 @@ class Nursery : StopSource {
scheduler = receiver.getScheduler().toSchedulerObject();
return scheduler;
}
+
+ shared(StopToken) getStopToken() nothrow @safe {
+ return receiver.getStopToken();
+ }
}
- auto stopToken = receiver.getStopToken();
- auto cb = (() @trusted => stopToken
- .onStop(() shared nothrow @trusted => cast(void) this.stop()))();
- return NurseryOp(this, cb, new ReceiverImpl(receiver));
+ return NurseryOp(this, new ReceiverImpl(receiver));
}
private
- void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
+ void setReceiver(ReceiverObject r) nothrow @trusted shared {
with (assumeThreadSafe) {
mutex.lock_nothrow();
assert(this.receiver is null, "Cannot await a nursery twice.");
receiver = r;
- stopCallback = cb;
+ auto dg = () nothrow @safe shared => cast(void) this.stop();
+ auto stopToken = r.getStopToken;
+ stopCallback.register(stopToken, dg);
auto ops = operations.dup();
mutex.unlock_nothrow();
@@ -217,6 +222,7 @@ private interface ReceiverObject {
void setDone() nothrow @safe;
void setError(Throwable e) nothrow @safe;
SchedulerObjectBase getScheduler() nothrow @safe;
+ shared(StopToken) getStopToken() nothrow @safe;
}
private struct NurseryReceiver(Value) {
@@ -228,7 +234,7 @@ private struct NurseryReceiver(Value) {
}
static if (is(Value == void)) {
- void setValue() shared @safe {
+ void setValue() @safe shared {
(cast() this).setDone();
}
@@ -236,7 +242,7 @@ private struct NurseryReceiver(Value) {
(cast() this).setDone();
}
} else {
- void setValue(Value val) shared @trusted {
+ void setValue(Value val) @trusted shared {
(cast() this).setDone();
}
@@ -264,24 +270,24 @@ private struct NurseryReceiver(Value) {
private struct NurseryOp {
shared Nursery nursery;
- StopCallback cb;
ReceiverObject receiver;
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
- this(return shared Nursery n, StopCallback cb,
- ReceiverObject r) @safe return scope {
+ this(shared Nursery n,
+ ReceiverObject r) @safe {
nursery = n;
- this.cb = cb;
+ // this.cb = cb;
receiver = r;
}
void start() nothrow scope @trusted {
import core.atomic : atomicLoad;
- if (nursery.busy.atomicLoad == 0)
+ if (nursery.busy.atomicLoad == 0) {
+ // cb.dispose();
receiver.setDone();
- else
- nursery.setReceiver(receiver, cb);
+ } else
+ nursery.setReceiver(receiver);//, cb);
}
}
diff --git a/source/concurrency/operations/race.d b/source/concurrency/operations/race.d
index 6e7f158..00b324b 100644
--- a/source/concurrency/operations/race.d
+++ b/source/concurrency/operations/race.d
@@ -66,32 +66,32 @@ private struct RaceOp(Receiver, Senders...) {
this(ref return scope typeof(this) rhs);
this(Receiver receiver, return Senders senders,
bool noDropouts) @trusted scope {
+ state = State!R(noDropouts);
this.receiver = receiver;
- state = new State!(R)(noDropouts);
static if (Senders.length > 1) {
foreach (i, Sender; Senders) {
ops[i] = senders[i].connect(
- ElementReceiver!(Sender)(receiver, state, Senders.length));
+ ElementReceiver!(Sender)(receiver, &state, Senders.length));
}
} else {
ops.length = senders[0].length;
foreach (i; 0 .. senders[0].length) {
ops[i] = senders[0][i].connect(
- ElementReceiver(receiver, state, senders[0].length));
+ ElementReceiver(receiver, &state, senders[0].length));
}
}
}
void start() @trusted nothrow scope {
- import concurrency.stoptoken : StopSource;
if (receiver.getStopToken().isStopRequested) {
receiver.setDone();
return;
}
- state.cb = receiver.getStopToken().onStop(
- cast(void delegate() nothrow @safe shared) &state.stop
- ); // butt ugly cast, but it won't take the second overload
+ auto token = receiver.getStopToken();
+ // butt ugly cast, but it won't take the second overload
+ state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop);
+
static if (Senders.length > 1) {
foreach (i, _; Senders) {
ops[i].start();
@@ -121,9 +121,10 @@ struct RaceSender(Senders...)
}
}
-private class State(Value) : StopSource {
+private struct State(Value) {
import concurrency.bitfield;
- StopCallback cb;
+ shared StopSource stopSource;
+ shared StopCallback cb;
shared SharedBitField!Flags bitfield;
static if (!is(Value == void))
Value value;
@@ -148,10 +149,10 @@ private enum Counter : size_t {
private struct RaceReceiver(Receiver, InnerValue, Value) {
import core.atomic : atomicOp, atomicLoad, MemoryOrder;
Receiver receiver;
- State!(Value) state;
+ State!(Value)* state;
size_t senderCount;
auto getStopToken() {
- return StopToken(state);
+ return state.stopSource.token();
}
private bool isValueProduced(size_t state) {
@@ -176,7 +177,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
else
state.value = Value(value);
release(); // must release before calling .stop
- state.stop();
+ state.stopSource.stop();
} else
release();
@@ -190,7 +191,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
with (state.bitfield.update(Flags.value_produced, Counter.tick)) {
bool last = isLast(newState);
if (!isValueProduced(oldState)) {
- state.stop();
+ state.stopSource.stop();
}
if (last)
@@ -202,7 +203,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) {
bool last = isLast(newState);
if (state.noDropouts && !isDoneOrErrorProduced(oldState)) {
- state.stop();
+ state.stopSource.stop();
}
if (last)
@@ -217,7 +218,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
state.exception = exception;
if (state.noDropouts) {
release(); // release before stop
- state.stop();
+ state.stopSource.stop();
}
}
diff --git a/source/concurrency/operations/stopon.d b/source/concurrency/operations/stopon.d
index 8d979cd..d03eb6b 100644
--- a/source/concurrency/operations/stopon.d
+++ b/source/concurrency/operations/stopon.d
@@ -7,14 +7,14 @@ import concurrency.stoptoken;
import concepts;
import std.traits;
-auto stopOn(Sender)(Sender sender, StopToken stopToken) {
+auto stopOn(Sender)(Sender sender, shared StopToken stopToken) {
return StopOn!(Sender)(sender, stopToken);
}
private struct StopOnReceiver(Receiver, Value) {
private {
Receiver receiver;
- StopToken stopToken;
+ shared StopToken stopToken;
}
static if (is(Value == void)) {
@@ -35,7 +35,7 @@ private struct StopOnReceiver(Receiver, Value) {
receiver.setError(e);
}
- auto getStopToken() nothrow @trusted {
+ auto getStopToken() nothrow @safe {
return stopToken;
}
@@ -46,7 +46,7 @@ struct StopOn(Sender) if (models!(Sender, isSender)) {
static assert(models!(typeof(this), isSender));
alias Value = Sender.Value;
Sender sender;
- StopToken stopToken;
+ shared StopToken stopToken;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
alias R = StopOnReceiver!(Receiver, Sender.Value);
// ensure NRVO
diff --git a/source/concurrency/operations/stopwhen.d b/source/concurrency/operations/stopwhen.d
index 3eb9ee5..7e738fa 100644
--- a/source/concurrency/operations/stopwhen.d
+++ b/source/concurrency/operations/stopwhen.d
@@ -29,11 +29,10 @@ private struct StopWhenOp(Receiver, Sender, Trigger) {
this(Receiver receiver, return Sender source,
return Trigger trigger) @trusted scope {
this.receiver = receiver;
- state = new State!(Sender.Value)();
sourceOp = source
- .connect(SourceReceiver!(Receiver, Sender.Value)(receiver, state));
+ .connect(SourceReceiver!(Receiver, Sender.Value)(receiver, &state));
triggerOp = trigger
- .connect(TriggerReceiver!(Receiver, Sender.Value)(receiver, state));
+ .connect(TriggerReceiver!(Receiver, Sender.Value)(receiver, &state));
}
void start() @trusted nothrow scope {
@@ -42,9 +41,10 @@ private struct StopWhenOp(Receiver, Sender, Trigger) {
return;
}
- state.cb = receiver.getStopToken().onStop(
- cast(void delegate() nothrow @safe shared) &state.stop
- ); // butt ugly cast, but it won't take the second overload
+ auto token = receiver.getStopToken();
+ // butt ugly cast, but it won't take the second overload
+ state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop);
+
sourceOp.start;
triggerOp.start;
}
@@ -64,9 +64,11 @@ struct StopWhenSender(Sender, Trigger)
}
}
-private class State(Value) : StopSource {
+// refactor to use StopSource
+private struct State(Value) {
import concurrency.bitfield;
- StopCallback cb;
+ shared StopSource stopSource;
+ shared StopCallback cb;
shared SharedBitField!Flags bitfield;
static if (!is(Value == void))
Value value;
@@ -116,15 +118,15 @@ private bool isLast(size_t state) @safe nothrow pure {
private struct TriggerReceiver(Receiver, Value) {
Receiver receiver;
- State!(Value) state;
+ State!(Value)* state;
auto getStopToken() {
- return StopToken(state);
+ return state.stopSource.token();
}
void setValue() @safe nothrow {
with (state.bitfield.update(Flags.tick)) {
if (!isLast(oldState))
- state.stop();
+ state.stopSource.stop();
else
state.process(receiver, newState);
}
@@ -133,7 +135,7 @@ private struct TriggerReceiver(Receiver, Value) {
void setDone() @safe nothrow {
with (state.bitfield.update(Flags.doneOrError_produced, Flags.tick)) {
if (!isLast(oldState))
- state.stop();
+ state.stopSource.stop();
else
state.process(receiver, newState);
}
@@ -145,7 +147,7 @@ private struct TriggerReceiver(Receiver, Value) {
if (!isDoneOrErrorProduced(oldState)) {
state.exception = exception;
release(); // release before stop
- state.stop();
+ state.stopSource.stop();
} else {
release();
if (last)
@@ -160,9 +162,9 @@ private struct TriggerReceiver(Receiver, Value) {
private struct SourceReceiver(Receiver, Value) {
import core.atomic : atomicOp, atomicLoad, MemoryOrder;
Receiver receiver;
- State!(Value) state;
+ State!(Value)* state;
auto getStopToken() {
- return StopToken(state);
+ return state.stopSource.token();
}
static if (!is(Value == void))
@@ -173,7 +175,7 @@ private struct SourceReceiver(Receiver, Value) {
release();
if (!last)
- state.stop();
+ state.stopSource.stop();
else if (isDoneOrErrorProduced(oldState))
state.process(receiver, oldState);
else
@@ -186,7 +188,7 @@ private struct SourceReceiver(Receiver, Value) {
with (state.bitfield.update(Flags.value_produced | Flags.tick)) {
bool last = isLast(oldState);
if (!last)
- state.stop();
+ state.stopSource.stop();
else if (isDoneOrErrorProduced(oldState))
state.process(receiver, oldState);
else
@@ -198,7 +200,7 @@ private struct SourceReceiver(Receiver, Value) {
with (state.bitfield.update(Flags.doneOrError_produced | Flags.tick)) {
bool last = isLast(oldState);
if (!last)
- state.stop();
+ state.stopSource.stop();
else
state.process(receiver, newState);
}
@@ -213,7 +215,7 @@ private struct SourceReceiver(Receiver, Value) {
release();
if (!last)
- state.stop();
+ state.stopSource.stop();
else
state.process(receiver, newState);
}
diff --git a/source/concurrency/operations/toshared.d b/source/concurrency/operations/toshared.d
index 2c56656..ce9141f 100644
--- a/source/concurrency/operations/toshared.d
+++ b/source/concurrency/operations/toshared.d
@@ -82,7 +82,7 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic)
if ((newState >> 2) == 0) {
auto localStopSource = state.inst;
release(); // release early
- localStopSource.stop();
+ localStopSource.stopSource.stop();
return false;
} else {
auto localReceiver = state.inst;
@@ -145,14 +145,15 @@ struct SharedSenderOp(Sender, Scheduler, ResetLogic resetLogic, Receiver) {
alias Props = Properties!(Sender);
SharedSender!(Sender, Scheduler, resetLogic) parent;
Receiver receiver;
- StopCallback cb;
+ shared StopCallback cb;
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
void start() nothrow @trusted scope {
parent.add(&(cast(shared) this).onValue);
- cb = receiver.getStopToken.onStop(&(cast(shared) this).onStop);
+ auto stopToken = receiver.getStopToken;
+ cb.register(stopToken, &(cast(shared) this).onStop);
}
void onStop() nothrow @trusted shared {
@@ -223,8 +224,8 @@ private struct SharedSenderReceiver(Sender, Scheduler, ResetLogic resetLogic) {
state.process!(resetLogic)(v);
}
- StopToken getStopToken() @trusted nothrow {
- return StopToken(state.inst);
+ shared(StopToken) getStopToken() @safe nothrow {
+ return state.inst.stopSource.token();
}
Scheduler getScheduler() @safe nothrow scope {
@@ -255,7 +256,7 @@ private template process(ResetLogic resetLogic) {
auto localState = state.inst;
InternalValue v = localState.value.match!((typeof(null)) => assert(0, "not happening"), v => v);
release(oldState & (~0x3)); // release early and remove all ticks
- if (localState.isStopRequested)
+ if (localState.stopSource.isStopRequested)
(() @trusted => v = Done())();
foreach (dg; localState.dgs[])
dg(v);
@@ -263,12 +264,13 @@ private template process(ResetLogic resetLogic) {
}
}
-private class SharedSenderInstState(Sender) : StopSource {
+private class SharedSenderInstState(Sender) {
import concurrency.slist;
import std.traits : ReturnType;
import std.sumtype;
alias Props = Properties!(Sender);
shared SList!(Props.DG) dgs;
+ shared StopSource stopSource;
SumType!(typeof(null), Props.InternalValue) value;
this() {
this.dgs = new shared SList!(Props.DG);
diff --git a/source/concurrency/operations/whenall.d b/source/concurrency/operations/whenall.d
index f6708ae..90de34b 100644
--- a/source/concurrency/operations/whenall.d
+++ b/source/concurrency/operations/whenall.d
@@ -117,12 +117,11 @@ private struct WhenAllOp(Receiver, Senders...) {
this(return Receiver receiver,
return Senders senders) @trusted scope return {
this.receiver = receiver;
- state = new WhenAllState!R();
static if (Senders.length > 1) {
foreach (i, Sender; Senders) {
ops[i] = senders[i].connect(
WhenAllReceiver!(Receiver, Sender.Value,
- R)(receiver, state, i, Senders.length));
+ R)(receiver, &state, i, Senders.length));
}
} else {
static if (!is(ArrayElement!(Senders).Value : void))
@@ -131,21 +130,21 @@ private struct WhenAllOp(Receiver, Senders...) {
foreach (i; 0 .. senders[0].length) {
ops[i] = senders[0][i].connect(
WhenAllReceiver!(Receiver, ArrayElement!(Senders).Value,
- R)(receiver, state, i, senders[0].length));
+ R)(receiver, &state, i, senders[0].length));
}
}
}
void start() @trusted nothrow scope {
- import concurrency.stoptoken : StopSource;
if (receiver.getStopToken().isStopRequested) {
receiver.setDone();
return;
}
- state.cb = receiver.getStopToken().onStop(
- cast(void delegate() nothrow @safe shared) &state.stop
- ); // butt ugly cast, but it won't take the second overload
+ auto token = receiver.getStopToken();
+ // butt ugly cast, but it won't take the second overload
+ state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop);
+
static if (Senders.length > 1) {
foreach (i, _; Senders) {
ops[i].start();
@@ -177,9 +176,10 @@ struct WhenAllSender(Senders...)
}
}
-private class WhenAllState(Value) : StopSource {
+private struct WhenAllState(Value) {
import concurrency.bitfield;
- StopCallback cb;
+ shared StopSource stopSource;
+ shared StopCallback cb;
static if (is(typeof(Value.values)))
Value value;
Throwable exception;
@@ -189,11 +189,11 @@ private class WhenAllState(Value) : StopSource {
private struct WhenAllReceiver(Receiver, InnerValue, Value) {
import core.atomic : atomicOp, atomicLoad, MemoryOrder;
Receiver receiver;
- WhenAllState!(Value) state;
+ WhenAllState!(Value)* state;
size_t senderIndex;
size_t senderCount;
auto getStopToken() {
- return StopToken(state);
+ return state.stopSource.token();
}
private bool isValueProduced(size_t state) {
@@ -232,7 +232,7 @@ private struct WhenAllReceiver(Receiver, InnerValue, Value) {
with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) {
bool last = isLast(newState);
if (!isDoneOrErrorProduced(oldState))
- state.stop();
+ state.stopSource.stop();
if (last)
process(newState);
}
@@ -244,7 +244,7 @@ private struct WhenAllReceiver(Receiver, InnerValue, Value) {
if (!isDoneOrErrorProduced(oldState)) {
state.exception = exception;
release(); // must release before calling .stop
- state.stop();
+ state.stopSource.stop();
} else
release();
if (last)
diff --git a/source/concurrency/operations/withchild.d b/source/concurrency/operations/withchild.d
index 3ee5d38..01aba01 100644
--- a/source/concurrency/operations/withchild.d
+++ b/source/concurrency/operations/withchild.d
@@ -24,7 +24,7 @@ struct WithChildSender(SenderParent, SenderChild)
import concurrency.operations.stopon;
// ensure NRVO
auto op =
- whenAll(b.stopOn(receiver.getStopToken), a).stopOn(StopToken())
+ whenAll(b.stopOn(receiver.getStopToken), a).stopOn(shared StopToken())
.connect(receiver);
return op;
}
diff --git a/source/concurrency/operations/withstopsource.d b/source/concurrency/operations/withstopsource.d
index 93825f1..beaa7c8 100644
--- a/source/concurrency/operations/withstopsource.d
+++ b/source/concurrency/operations/withstopsource.d
@@ -7,26 +7,15 @@ import concurrency.stoptoken;
import concepts;
import std.traits;
-template withStopSource(Sender) {
- auto withStopSource(Sender sender, StopSource stopSource) {
- return SSSender!(Sender, StopSource)(sender, stopSource);
- }
-
- auto withStopSource(Sender sender, shared StopSource stopSource) @trusted {
- return SSSender!(Sender, StopSource)(sender, cast() stopSource);
- }
-
- auto withStopSource(Sender sender, ref InPlaceStopSource stopSource) @trusted {
- return SSSender!(Sender, InPlaceStopSource*)(sender, &stopSource);
- }
+// TODO: return scope?
+auto withStopSource(Sender)(Sender sender, ref shared StopSource stopSource) @trusted {
+ return SSSender!(Sender)(sender, &stopSource);
}
-private struct SSReceiver(Receiver, Value, OuterStopSource) {
+private struct SSReceiver(Receiver, Value) {
private {
Receiver receiver;
- OuterStopSource stopSource;
- StopSource combinedSource;
- StopCallback[2] cbs;
+ SSState* state;
}
static if (is(Value == void)) {
@@ -52,49 +41,65 @@ private struct SSReceiver(Receiver, Value, OuterStopSource) {
receiver.setError(e);
}
- auto getStopToken() nothrow @trusted scope {
- import core.atomic;
- if (this.combinedSource is null) {
- auto local = new StopSource();
- if (cas(&this.combinedSource, cast(StopSource) null, local)) {
- auto stop =
- cast(void delegate() shared nothrow @safe) &local.stop;
- cbs[0] = receiver.getStopToken().onStop(stop);
- cbs[1] = StopToken(stopSource).onStop(stop);
- if (atomicLoad(this.combinedSource) is null) {
- cbs[0].dispose();
- cbs[1].dispose();
- }
- } else {
- cbs[0].dispose();
- cbs[1].dispose();
- }
- }
-
- return StopToken(combinedSource);
+ auto getStopToken() nothrow @safe scope {
+ return state.combinedStopSource.token();
}
mixin ForwardExtensionPoints!receiver;
+
private void resetStopCallback() {
- import core.atomic;
- if (atomicExchange(&this.combinedSource, cast(StopSource) null)) {
- if (cbs[0])
- cbs[0].dispose();
- if (cbs[1])
- cbs[1].dispose();
+ state.left.dispose();
+ state.right.dispose();
+ }
+}
+
+struct SSState {
+ shared StopSource combinedStopSource;
+ shared StopCallback left;
+ shared StopCallback right;
+ ~this() @safe scope @nogc nothrow {}
+}
+
+struct SSOp(Receiver, Sender) {
+ SSState state;
+ alias Op = OpType!(Sender, SSReceiver!(Receiver, Sender.Value));
+ Op op;
+
+ @disable
+ this(ref return scope typeof(this) rhs);
+ @disable
+ this(this);
+ ~this() @trusted scope @nogc nothrow {}
+ this(Receiver receiver, shared StopSource* outerStopSource, Sender sender) @trusted {
+ auto token = receiver.getStopToken();
+ auto outerToken = outerStopSource.token();
+
+ state.left.register(token, cast(void delegate() @safe shared nothrow)&state.combinedStopSource.stop);
+ state.right.register(outerToken, cast(void delegate() @safe shared nothrow)&state.combinedStopSource.stop);
+
+ try {
+ op = sender.connect(SSReceiver!(Receiver, Sender.Value)(receiver, &state));
+ } catch (Exception e) {
+ state.left.dispose();
+ state.right.dispose();
+ throw e;
}
}
+
+ void start() @safe nothrow {
+ // because we start op only afterwards, we can be sure that the cb's are created beforehand
+ op.start();
+ }
}
-struct SSSender(Sender, StopSource) if (models!(Sender, isSender)) {
+struct SSSender(Sender) if (models!(Sender, isSender)) {
static assert(models!(typeof(this), isSender));
alias Value = Sender.Value;
Sender sender;
- StopSource stopSource;
- auto connect(Receiver)(return Receiver receiver) @safe return scope {
- alias R = SSReceiver!(Receiver, Sender.Value, StopSource);
+ shared StopSource* stopSource;
+ auto connect(Receiver)(return Receiver receiver) @trusted return scope {
// ensure NRVO
- auto op = sender.connect(R(receiver, stopSource));
+ auto op = SSOp!(Receiver, Sender)(receiver, stopSource, sender);
return op;
}
}
diff --git a/source/concurrency/operations/withstoptoken.d b/source/concurrency/operations/withstoptoken.d
index de49c45..09733a6 100644
--- a/source/concurrency/operations/withstoptoken.d
+++ b/source/concurrency/operations/withstoptoken.d
@@ -8,7 +8,7 @@ import concepts;
import std.traits;
import concurrency.utils : isThreadSafeFunction;
-deprecated("function passed is not shared @safe delegate or @safe function.")
+deprecated("function passed is not @safe shared delegate or @safe function.")
auto withStopToken(Sender, Fun)(Sender sender, Fun fun)
if (!isThreadSafeFunction!Fun) {
return STSender!(Sender, Fun)(sender, fun);
@@ -41,7 +41,7 @@ private struct STReceiver(Receiver, Value, Fun) {
} else {
import std.typecons : isTuple;
enum isExpandable = isTuple!Value && __traits(compiles, {
- fun(StopToken.init, Value.init.expand);
+ fun(shared(StopToken).init, Value.init.expand);
});
void setValue(Value value) @safe {
static if (is(ReturnType!Fun == void)) {
diff --git a/source/concurrency/receiver.d b/source/concurrency/receiver.d
index ced0818..4c7df1a 100644
--- a/source/concurrency/receiver.d
+++ b/source/concurrency/receiver.d
@@ -17,11 +17,6 @@ void checkReceiver(T)() {
enum isReceiver(T) = is(typeof(checkReceiver!T));
-auto getStopToken(Receiver)(Receiver r) nothrow @safe if (isReceiver!Receiver) {
- import concurrency.stoptoken : NeverStopToken;
- return NeverStopToken();
-}
-
mixin template ForwardExtensionPoints(alias receiver) {
auto getStopToken() nothrow @safe {
return receiver.getStopToken();
@@ -43,7 +38,7 @@ interface ReceiverObjectBase(T) {
void setValue(T value = T.init) @safe;
void setDone() nothrow @safe;
void setError(Throwable e) nothrow @safe;
- StopToken getStopToken() nothrow @safe;
+ shared(StopToken) getStopToken() nothrow @safe;
SchedulerObjectBase getScheduler() scope nothrow @safe;
}
diff --git a/source/concurrency/scheduler.d b/source/concurrency/scheduler.d
index e754db9..d09d20a 100644
--- a/source/concurrency/scheduler.d
+++ b/source/concurrency/scheduler.d
@@ -23,6 +23,33 @@ interface SchedulerObjectBase {
SenderObjectBase!void scheduleAfter(Duration d) @safe;
}
+
+// We can pull the LocalThreadExecutor (and its schedule/scheduleAfter) out into a specialized context.
+// Just like we did with the iouring context
+
+// The interesting bit is that the syncWait algorithm then might be inferred as @nogc
+
+// The question remains how we would want to integrate these.
+// With iouring we created a runner that would take a sender and would inject the scheduler and allow itself to steal the current thread.
+
+// That last part is important, we don't want to spawn a thread just to run timers, we can do it perfectly fine on the current thread.
+// Same with iouring or other event loops.
+
+// That said, we can, if we want to, move the event loop to another thread.
+
+// The only thing we can't do is cross schedule timers from one thread to another.
+// Well, that is not true, we can create two context objects that expose a Scheduler
+
+
+
+
+
+
+// Guess we just have to write it and see....
+
+// Dietmar Kuhl used a iocontext with a run function that allows running it on the current thread.
+// In rant I had the iocontext's runner return a sender so you could await that.
+
class SchedulerObject(S) : SchedulerObjectBase {
import concurrency.sender : toSenderObject;
S scheduler;
@@ -50,7 +77,7 @@ enum TimerTrigger {
cancel
}
-alias TimerDelegate = void delegate(TimerTrigger) shared @safe;
+alias TimerDelegate = void delegate(TimerTrigger) @safe shared;
import concurrency.timingwheels : ListElement;
alias Timer = ListElement!(TimerDelegate);
@@ -64,6 +91,7 @@ auto localThreadScheduler() {
alias LocalThreadScheduler = typeof(localThreadScheduler());
struct SchedulerAdapter(Worker) {
+ static assert(models!(typeof(this), isScheduler));
import concurrency.receiver : setValueOrError;
import concurrency.executor : VoidDelegate;
import core.time : Duration;
@@ -101,7 +129,7 @@ struct SchedulerAdapter(Worker) {
return ScheduleSender(worker);
}
- auto schedule() shared @trusted {
+ auto schedule() @trusted shared {
return (cast() this).schedule();
}
@@ -109,7 +137,7 @@ struct SchedulerAdapter(Worker) {
return ScheduleAfterSender!(Worker)(worker, dur);
}
- auto scheduleAfter(Duration dur) shared @trusted {
+ auto scheduleAfter(Duration dur) @trusted shared {
return (cast() this).scheduleAfter(dur);
}
}
@@ -117,7 +145,7 @@ struct SchedulerAdapter(Worker) {
struct ScheduleAfterOp(Worker, Receiver) {
import std.traits : ReturnType;
import concurrency.bitfield : SharedBitField;
- import concurrency.stoptoken : StopCallback, onStop;
+ import concurrency.stoptoken : StopCallback;
import concurrency.receiver : setValueOrError;
enum Flags {
@@ -127,26 +155,25 @@ struct ScheduleAfterOp(Worker, Receiver) {
setup = 0x4,
}
- // alias Timer = ReturnType!(Worker.addTimer);
Worker worker;
Duration dur;
Receiver receiver;
Timer timer;
- StopCallback stopCb;
+ shared StopCallback stopCb;
shared SharedBitField!Flags flags;
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
+ // ~this() @safe scope {}
void start() @trusted scope nothrow {
if (receiver.getStopToken().isStopRequested) {
receiver.setDone();
return;
}
- stopCb =
- receiver.getStopToken()
- .onStop(cast(void delegate() nothrow @safe shared) &stop);
+ auto token = receiver.getStopToken();
+ stopCb.register(token, cast(void delegate() nothrow @safe shared) &stop);
try {
timer.userdata = cast(void delegate(TimerTrigger) @safe shared) &trigger;
@@ -312,7 +339,8 @@ auto withBaseScheduler(T, P)(auto ref T t, auto ref P p) {
static assert(
false,
"Neither " ~ T.stringof ~ " nor " ~ P.stringof
- ~ " are full schedulers. Chain the sender with a .withScheduler and ensure the Scheduler passes the isScheduler check."
+ ~ " are full schedulers. Chain the sender with a .withScheduler"
+ ~ " and ensure the Scheduler passes the isScheduler check."
);
}
diff --git a/source/concurrency/sender.d b/source/concurrency/sender.d
index 951ff87..7fea57e 100644
--- a/source/concurrency/sender.d
+++ b/source/concurrency/sender.d
@@ -68,8 +68,8 @@ void checkSender(T)() @safe {
void setError(Throwable e) @safe nothrow {}
- StopToken getStopToken() @safe nothrow {
- return StopToken.init;
+ shared(StopToken) getStopToken() @safe nothrow {
+ return shared(StopToken).init;
}
Scheduler getScheduler() @safe nothrow {
@@ -235,7 +235,7 @@ interface SenderObjectBase(T) {
receiver.setError(e);
}
- StopToken getStopToken() nothrow {
+ shared(StopToken) getStopToken() nothrow {
return receiver.getStopToken();
}
@@ -449,9 +449,9 @@ struct PromiseSenderOp(T, Receiver) {
alias InternalValue = Sender.InternalValue;
shared Sender parent;
Receiver receiver;
- StopCallback cb;
+ shared StopCallback cb;
shared SharedBitField!Flags bitfield;
- @disable
+ @disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
@@ -480,8 +480,10 @@ struct PromiseSenderOp(T, Receiver) {
bool triggeredInline = parent.add(&(cast(shared) this).onValue);
// if triggeredInline there is no point in setting up the stop callback
- if (!triggeredInline)
- cb = receiver.getStopToken.onStop(&(cast(shared) this).onStop);
+ if (!triggeredInline) {
+ auto stopToken = receiver.getStopToken;
+ cb.register(stopToken, &(cast(shared) this).onStop);
+ }
with (bitfield.add(Flags.setup)) {
if (has(Flags.stop)) {
@@ -523,8 +525,7 @@ struct PromiseSenderOp(T, Receiver) {
// being executed.
// This means that when it completes we are the only one
// calling the receiver's termination functions.
- if (cb)
- cb.dispose();
+ cb.dispose();
value.match!((Sender.ValueRep v) {
try {
static if (is(Sender.Value == void))
@@ -639,7 +640,7 @@ class Promise(T) {
this.dgs = new shared SList!DG;
}
- auto sender() shared @safe nothrow {
+ auto sender() @safe shared nothrow {
return shared PromiseSender!T(this);
}
}
diff --git a/source/concurrency/signal.d b/source/concurrency/signal.d
index 97acfb5..689c2f8 100644
--- a/source/concurrency/signal.d
+++ b/source/concurrency/signal.d
@@ -2,47 +2,28 @@ module concurrency.signal;
import concurrency.stoptoken;
-shared(StopSource) globalStopSource() @trusted {
+ref shared(StopSource) globalStopSource() @trusted {
import core.atomic : atomicLoad, cas;
- if (globalSource.atomicLoad is null) {
- import concurrency.utils : dynamicLoad;
+ if (globalSourcePtr.atomicLoad is null) {
auto ptr = getGlobalStopSourcePointer();
- if (auto source = (*ptr).atomicLoad) {
- globalSource = source;
- return globalSource;
- }
-
- auto tmp = new shared StopSource();
- if (ptr.cas(cast(shared StopSource) null, tmp)) {
- setupCtrlCHandler(tmp);
- globalSource = tmp;
+ shared StopSource* empty = null;
+ if (ptr.cas(empty, &globalSource)) {
+ setupCtrlCHandler(globalSource);
+ globalSourcePtr = &globalSource;
} else
- globalSource = (*ptr).atomicLoad;
+ globalSourcePtr = (*ptr).atomicLoad;
}
- return globalSource;
-}
-
-/// Returns true if first to set (otherwise it is ignored)
-bool setGlobalStopSource(shared StopSource stopSource) @safe {
- import core.atomic : cas;
- auto ptr = getGlobalStopSourcePointer();
- if (!ptr.cas(cast(shared StopSource) null, stopSource))
- return false;
- globalSource = stopSource;
- return true;
+ return *globalSourcePtr;
}
/// Sets the stopSource to be called when receiving an interrupt
-void setupCtrlCHandler(shared StopSource stopSource) @trusted {
+void setupCtrlCHandler(ref shared StopSource stopSource) @trusted {
import core.atomic;
- if (stopSource is null)
- return;
-
- auto old = atomicExchange(&SignalHandler.signalStopSource, stopSource);
+ auto old = atomicExchange(&SignalHandler.signalStopSource, &stopSource);
if (old !is null)
return;
@@ -71,14 +52,15 @@ void setupCtrlCHandler(shared StopSource stopSource) @trusted {
}
}
+private static shared StopSource* globalSourcePtr;
private static shared StopSource globalSource;
// a mixin for OS-specific visibility
private mixin template globalStopSourcePointerImpl() {
// should not be called directly by usercode, call `getGlobalStopSourcePointer` instead
pragma(inline, false)
- extern(C) shared(StopSource*) concurrency_globalStopSourcePointer() @safe {
- return &globalSource;
+ extern(C) shared(StopSource*)* concurrency_globalStopSourcePointer() @safe {
+ return &globalSourcePtr;
}
}
@@ -91,7 +73,7 @@ version(Windows) {
// their own definition.
private mixin globalStopSourcePointerImpl;
- private shared(StopSource*) getGlobalStopSourcePointer() @safe {
+ private shared(StopSource*)* getGlobalStopSourcePointer() @safe {
import concurrency.utils : dynamicLoad;
return dynamicLoad!concurrency_globalStopSourcePointer()();
}
@@ -175,7 +157,7 @@ struct SignalHandler {
SignalHandler.notify(ABORT);
}
- private static shared StopSource signalStopSource;
+ private static shared StopSource* signalStopSource;
private static shared Thread handlerThread;
private static void launchHandlerThread() {
if (handlerThread.atomicLoad !is null)
diff --git a/source/concurrency/stoptoken.d b/source/concurrency/stoptoken.d
index a874b13..119587e 100644
--- a/source/concurrency/stoptoken.d
+++ b/source/concurrency/stoptoken.d
@@ -1,236 +1,201 @@
module concurrency.stoptoken;
-// originally this code is from https://github.com/josuttis/jthread by Nicolai Josuttis
-// it is licensed under the Creative Commons Attribution 4.0 Internation License http://creativecommons.org/licenses/by/4.0
-
-struct InPlaceStopSource {
- @disable this(ref return scope typeof(this) rhs);
- private stop_state state;
- bool stop() nothrow @safe {
- return state.request_stop();
- }
+// Cancellation is an important concept when running asynchronous tasks.
+// When an asynchronous operation is no longer needed it is desirable to
+// cancel it to avoid doing unnecessary work, potentionally freeing system
+// resources and maintaining responsiveness in the process.
- bool stop() nothrow @safe shared {
- with (assumeThreadSafe) {
- return stop();
- }
- }
+// There are several reasons why an asynchronous operation might become
+// irrelevant, including:
- bool isStopRequested() nothrow @safe @nogc {
- return state.is_stop_requested();
- }
+// - An error in one part of an application might render the results of
+// other ongoing asynchronous tasks irrelevant.
- bool isStopRequested() nothrow @safe @nogc shared {
- with (assumeThreadSafe) {
- return isStopRequested();
- }
- }
+// - Clients or end-users might want to abort an earlier request they made.
- /// resets the internal state, only do this if you are sure nothing else is looking at this...
- void reset(this t)() @system @nogc {
- this.state = stop_state();
- }
+// - Outstanding work might need to be cancelled due to stale data.
- private ref assumeThreadSafe() @trusted @nogc nothrow shared {
- return cast()this;
- }
-}
+// - Work might become irrelevant due to errors in external systems.
-class StopSource {
- private InPlaceStopSource source;
+// - As part of control flow in an asynchronous algorithm. Similar to how
+// you might have an `break` or early `return` in a synchronous piece of
+// code.
- bool stop() nothrow @safe {
- return source.stop();
- }
+// This module implements cooperative cancellation by providing a StopSource,
+// StopToken and StopCallback.
- bool stop() nothrow @trusted shared {
- return source.stop();
- }
+// - the StopSource represents the source of a cancellation request.
+// - the StopToken is used to determine if a cancellation request has been
+// initiated.
+// - the StopCallback is invoked when a cancellation request has been
+// initiated.
- bool isStopRequested() nothrow @safe @nogc {
- return source.isStopRequested;
- }
+// Implementation details:
- bool isStopRequested() nothrow @trusted @nogc shared {
- return source.isStopRequested;
- }
+// Allocations can have a non-neglible overhead when running many short-lived
+// asynchronous tasks. For this reason these objects are designed to be used
+// in-place on the stack.
- /// resets the internal state, only do this if you are sure nothing else is looking at this...
- void reset(this t)() @system @nogc {
- return source.reset();
- }
-}
+// To ensure program safety and correctness we need to uphold that a
+// `StopSource` outlives any associated `StopToken` or `StopCallback`,
+// otherwise we might open ourselves to use-after-free errors.
-struct StopToken {
- package(concurrency) stop_state* state;
- this(StopSource source) nothrow @safe @nogc {
- if (source !is null) {
- this.state = &source.source.state;
- isStopPossible = true;
- }
- }
+// We achieve that by:
- this(shared StopSource source) nothrow @trusted @nogc {
- this(cast()source);
- }
+// - having a StopCallback deregister itself from a StopSource on destruction
+// - having a StopSource deregister any StopCallbacks still registered on
+// destruction
+// - relying on `scope` StopTokens to ensure they don't outlive the
+// StopSource
+// - disabling copy and assignment constructors for the StopSource and
+// StopToken
+// - relying on dip1000
- this(ref stop_state state) nothrow @trusted @nogc {
- isStopPossible = true;
- this.state = &state;
- }
+/// A StopSource represents the source of a cancellation request. You can think
+/// of it as the entity that initiates the request for cancellation.
+/// Once a stop is requested, it cannot be withdrawn. Additional stop requests
+/// have no effect.
+struct StopSource {
+ @disable this(ref return scope StopSource rhs) @safe;
+ @disable this(ref return scope shared StopSource rhs) shared @safe;
+ public shared StopState state;
- this (ref InPlaceStopSource stopSource) nothrow @trusted @nogc {
- this(stopSource.state);
+ ~this() nothrow @safe @nogc shared scope {
+ assertNoCallbacks();
}
- this (InPlaceStopSource* stopSource) nothrow @trusted @nogc {
- if (stopSource !is null) {
- isStopPossible = true;
- this.state = &stopSource.state;
- }
+ /// Returns an associated StopToken
+ shared(StopToken) token() nothrow @safe @nogc shared return {
+ return shared StopToken(this);
}
- bool isStopRequested() nothrow @safe @nogc {
- return isStopPossible && state.is_stop_requested();
+ /// Trigger a cancellation request. This will trigger any StopCallback that was
+ /// registered through a StopToken associated with this StopSource.
+ /// Additionally any calls to the `isStopRequested` method on an associated
+ /// StopToken will return true.
+ ///
+ /// Returns true if this is the first cancellation request.
+ bool stop() nothrow @safe shared scope {
+ return state.requestStop();
}
- const bool isStopPossible;
-}
-
-struct NeverStopToken {
- enum isStopRequested = false;
- enum isStopPossible = false;
-}
-
-StopCallback onStop(
- StopSource stopSource,
- void delegate() nothrow @safe shared callback
-) nothrow @safe {
- auto cb = new StopCallback(callback);
- return onStop(stopSource, cb);
-}
-
-StopCallback onStop(StopSource stopSource,
- void function() nothrow @safe callback) nothrow @trusted {
- import std.functional : toDelegate;
- return stopSource
- .onStop(cast(void delegate() nothrow @safe shared) callback.toDelegate);
-}
+ /// Returns true if a cancellation request has been triggered.
+ bool isStopRequested() nothrow @safe @nogc shared scope {
+ return state.isStopRequested();
+ }
-StopCallback onStop(StopToken)(
- StopToken stopToken,
- void delegate() nothrow @safe shared callback
-) nothrow @safe {
- auto cb = new StopCallback(callback);
+ /// Resets the internal state. To ensure no dangling state it will first trigger
+ /// any registered StopCallbacks.
+ void reset() nothrow @safe shared scope {
+ stop();
+ this.state = StopState();
+ }
- onStop(stopToken, cb);
+ /// Primarily used in unittests or debug builds to ensure there are no
+ /// dangling callbacks.
+ /// Because both the StopSource and StopCallbacks are to be placed
+ /// on the stack, a programming error can result in hard to debug
+ /// use-after-free errors.
+ /// This method allows to catch those errors before they becomes
+ /// much harder to debug.
+ void assertNoCallbacks() nothrow @safe @nogc shared scope {
+ state.assertEmpty();
+ }
- return cb;
+ @disable void opAssign(shared StopSource rhs) nothrow @safe @nogc shared;
+ @disable void opAssign(ref shared StopSource rhs) nothrow @safe @nogc shared;
}
-StopCallback onStop(StopToken)(
- StopToken stopToken,
- void function() nothrow @safe callback
-) nothrow @trusted {
- import std.functional : toDelegate;
- return stopToken
- .onStop(cast(void delegate() nothrow @safe shared) callback.toDelegate);
-}
+/// A StopToken is associated with a StopSource and is used to determine
+/// whether a cancellation request has been initiated.
+/// It is up to the task to check this token and react accordingly.
+///
+/// Cancellation can be checked either by calling `isStopRequested` or
+/// registering a StopCallback. The StopCallback will be called when the
+/// underlying StopSource is triggered.
+///
+/// A StopToken can be obtained by calling the `token` method on a StopSource.
+struct StopToken {
+ private shared StopState* state;
-StopCallback onStop(StopToken)(StopToken stopToken,
- StopCallback cb) nothrow @safe {
- if (stopToken.isStopPossible) {
- stopToken.onStop(cb.callback);
+ private this(return ref shared StopSource stopSource) nothrow @safe @nogc shared {
+ this.state = &stopSource.state;
}
- return cb;
-}
-void onStop(StopToken)(StopToken stopToken,
- ref InPlaceStopCallback cb) nothrow @safe {
- if (stopToken.isStopPossible) {
- (*stopToken.state).onStop(cb);
+ /// Returns true if a cancellation request has been initiated with the associated StopSource
+ bool isStopRequested() nothrow @safe @nogc scope shared {
+ return state && state.isStopRequested();
}
}
-StopCallback onStop(StopSource stopSource, StopCallback cb) nothrow @safe {
- onStop(stopSource.source.state, cb.callback);
- return cb;
-}
-
-void onStop(StopSource stopSource, ref InPlaceStopCallback cb) nothrow @safe {
- onStop(stopSource.source.state, cb);
-}
-
-void onStop(ref stop_state state, ref InPlaceStopCallback cb) nothrow @trusted { // TODO: @safe
- if (state.try_add_callback(cb, true))
- cb.state = &state;
-}
+/// A StopCallback can be associated with a StopToken, and contains a callback
+/// that will be invoked when a stop request in the underlying StopSource's
+/// is triggered.
+///
+/// There is no guarantee which execution context the callback is invoked on.
+struct StopCallback {
+ @disable this(ref return scope shared StopCallback rhs);
+ @disable this(ref return scope StopCallback rhs);
+
+ ~this() @safe scope @nogc nothrow shared {
+ dispose();
+ }
+
+ /// Register the StopCallback with a StopToken and a specified callback.
+ ///
+ /// The supplied callback will be invoked exactly once when the StopToken
+ /// is triggered. That invocation might happen before this function returns.
+ ///
+ /// A subsequent call to `register` will first ensure any existing registered
+ /// callback is unregistered. Note that the existing callback might still get
+ /// invoked if the existing StopToken is triggered at the same time.
+ ///
+ /// In cases where this is unwanted you can call `dispose` yourself before
+ /// registering a new callback.
+ void register(return ref shared StopToken token, void delegate() nothrow @safe shared callback) nothrow @safe scope return shared {
+ dispose();
+
+ /// TODO: before we continue here we need to be 100% sure the previous callback
+ /// isn't invoked, or ensure we wait until it is done.
+
+ this.callback = callback;
-struct InPlaceStopCallback {
- @disable this(ref return scope typeof(this) rhs);
+ if (token.state && token.state.tryAddCallback(cast(shared)this))
+ this.state = token.state;
+ }
- void dispose() nothrow @trusted @nogc {
+ /// Dispose the stopcallback, deregistering it from an associated StopToken,
+ /// if any.
+ ///
+ /// The registered callback could still be triggered while this function is
+ /// executing, but it will have been ran to completion before it returns.
+ void dispose() shared nothrow @trusted @nogc scope {
import core.atomic : cas;
+ // TODO: might have to reset prev or next if state is null
if (state is null)
return;
auto local = state;
- static if (__traits(compiles, cas(&state, local, null))) {
- if (!cas(&state, local, null)) {
- assert(state is null);
- return;
- }
- } else {
- if (!cas(cast(shared) &state, cast(shared) local, null)) {
- assert(state is null);
- return;
- }
+ if (!cas(&state, local, null)) {
+ assert(state is null);
+ return;
}
-
- local.remove_callback(this);
- }
-
- void dispose() shared nothrow @trusted @nogc {
- (cast() this).dispose();
- }
-
- this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
- this.callback = callback;
+ local.removeCallback(this);
}
+ @disable void opAssign(ref shared StopCallback rhs) nothrow @safe @nogc shared;
+ @disable void opAssign(shared StopCallback rhs) nothrow @safe @nogc shared;
private:
+ void delegate() nothrow @safe shared callback;
+ shared StopState* state;
- void delegate() nothrow shared @safe callback;
- stop_state* state;
-
- InPlaceStopCallback* next_ = null;
- InPlaceStopCallback** prev_ = null;
- bool* isRemoved_ = null;
+ shared StopCallback* next = null;
+ shared StopCallback** prev = null;
+ shared bool* isRemoved = null;
shared bool callbackFinishedExecuting = false;
-
- void execute() nothrow @safe {
- callback();
- }
}
-class StopCallback {
- this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
- this.callback = InPlaceStopCallback(callback);
- }
-
- void dispose() nothrow @trusted @nogc {
- callback.dispose();
- }
-
- void dispose() shared nothrow @trusted @nogc {
- callback.dispose();
- }
-private:
-
- InPlaceStopCallback callback;
-}
-
-private void spin_yield() nothrow @trusted @nogc {
+private void spinYield() nothrow @trusted @nogc {
// TODO: could use the pause asm instruction
// it is available in LDC as intrinsic... but not in DMD
import core.thread : Thread;
@@ -238,71 +203,39 @@ private void spin_yield() nothrow @trusted @nogc {
Thread.yield();
}
-private struct stop_state {
+/// The StopState is the internal state object used in the StopSource.
+/// It contains the linked list of StopCallbacks and the methods to
+/// add, remove and trigger the callbacks in a thread-safe manner.
+///
+/// It is based on the code from https://github.com/josuttis/jthread
+/// by Nicolai Josuttis, licensed under the Creative Commons Attribution
+/// 4.0 Internation License http://creativecommons.org/licenses/by/4.0
+private struct StopState {
import core.thread : Thread;
import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp, atomicFetchAdd, atomicFetchSub;
+ import core.atomic : casWeak;
- static if (__traits(compiles, () {
- import core.atomic : casWeak;
- }) && __traits(compiles, () {
- import core.internal.atomic
- : atomicCompareExchangeWeakNoResult;
- }))
- import core.atomic : casWeak;
- else
- auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)(
- T* here,
- V1 ifThis,
- V2 writeThis
- ) pure nothrow @nogc @safe {
- import core.atomic : cas;
-
- static if (__traits(compiles,
- cas!(M1, M2)(here, ifThis, writeThis)))
- return cas!(M1, M2)(here, ifThis, writeThis);
- else
- return cas(here, ifThis, writeThis);
- }
-
-public:
- void add_token_reference() nothrow @safe @nogc {
- state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment);
- }
-
- void remove_token_reference() nothrow @safe @nogc {
- state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment);
- }
-
- void add_source_reference() nothrow @safe @nogc {
- state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment);
- }
-
- void remove_source_reference() nothrow @safe @nogc {
- state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment);
- }
-
- bool request_stop() nothrow @safe {
- if (!try_lock_and_signal_until_signalled()) {
+ bool requestStop() nothrow @safe shared scope {
+ if (!tryLockAndSignalUntilSignalled()) {
// Stop has already been requested.
return false;
}
// Set the 'stop_requested' signal and acquired the lock.
+ storeSignallingThread();
- signallingThread_ = Thread.getThis();
-
- while (head_ !is null) {
+ while (head !is null) {
// Dequeue the head of the queue
- auto cb = head_;
- head_ = cb.next_;
- const bool anyMore = head_ !is null;
+ auto cb = head;
+ head = cb.next;
+ const bool anyMore = head !is null;
if (anyMore) {
- (() @trusted => head_.prev_ =
- &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime". But this is this, how can it have a longer lifetime...
+ // @trusted due to error "`scope` applies to first indirection only"
+ (() @trusted => head.prev = &head)();
}
// Mark this item as removed from the list.
- cb.prev_ = null;
+ cb.prev = null;
// Don't hold lock while executing callback
// so we don't block other threads from deregistering callbacks.
@@ -311,18 +244,18 @@ public:
// TRICKY: Need to store a flag on the stack here that the callback
// can use to signal that the destructor was executed inline
// during the call. If the destructor was executed inline then
- // it's not safe to dereference cb after execute() returns.
+ // it's not safe to dereference cb after callback() returns.
// If the destructor runs on some other thread then the other
// thread will block waiting for this thread to signal that the
// callback has finished executing.
- bool isRemoved = false;
- (() @trusted => cb.isRemoved_ =
+ shared bool isRemoved = false;
+ (() @trusted => cb.isRemoved =
&isRemoved)(); // the pointer to the stack here is removed 3 lines down.
- cb.execute();
+ cb.callback();
if (!isRemoved) {
- cb.isRemoved_ = null;
+ cb.isRemoved = null;
cb.callbackFinishedExecuting
.atomicStore!(MemoryOrder.rel)(true);
}
@@ -343,67 +276,56 @@ public:
return true;
}
- bool is_stop_requested() nothrow @safe @nogc {
- return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq));
- }
-
- bool is_stop_requestable() nothrow @safe @nogc {
- return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
+ bool isStopRequested() nothrow @safe @nogc shared scope {
+ return isStopRequested(state.atomicLoad!(MemoryOrder.acq));
}
- bool try_add_callback(ref InPlaceStopCallback cb,
- bool incrementRefCountIfSuccessful) nothrow @trusted {
+ bool tryAddCallback(ref return scope shared StopCallback cb) nothrow @trusted shared {
ulong oldState;
do {
goto load_state;
do {
- spin_yield();
+ spinYield();
load_state:
- oldState = state_.atomicLoad!(MemoryOrder.acq);
- if (is_stop_requested(oldState)) {
- cb.execute();
- return false;
- } else if (!is_stop_requestable(oldState)) {
+ oldState = state.atomicLoad!(MemoryOrder.acq);
+ if (isStopRequested(oldState)) {
+ cb.callback();
return false;
}
- } while (is_locked(oldState));
+ } while (isLocked(oldState));
} while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)(
- &state_, oldState, oldState | locked_flag));
+ &state, oldState, oldState | lockedFlag));
// Push callback onto callback list.
- cb.next_ = head_;
- if (cb.next_ !is null) {
- cb.next_.prev_ = &cb.next_;
+ cb.next = head;
+ if (cb.next !is null) {
+ cb.next.prev = &cb.next;
}
() @trusted {
- cb.prev_ = &head_;
+ cb.prev = &head;
}();
- head_ = &cb;
+ head = &cb;
- if (incrementRefCountIfSuccessful) {
- unlock_and_increment_token_ref_count();
- } else {
- unlock();
- }
+ unlock();
// Successfully added the callback.
return true;
}
- void remove_callback(ref InPlaceStopCallback cb) nothrow @safe @nogc {
+ void removeCallback(ref shared StopCallback cb) nothrow @safe @nogc shared {
lock();
- if (cb.prev_ !is null) {
+ if (cb.prev !is null) {
// Still registered, not yet executed
// Just remove from the list.
- *cb.prev_ = cb.next_;
- if (cb.next_ !is null) {
- cb.next_.prev_ = cb.prev_;
+ *cb.prev = cb.next;
+ if (cb.next !is null) {
+ cb.next.prev = cb.prev;
}
- unlock_and_decrement_token_ref_count();
+ unlock();
return;
}
@@ -413,94 +335,119 @@ public:
// Callback has either already executed or is executing
// concurrently on another thread.
- if (signallingThread_ is Thread.getThis()) {
+ if (isSignallingThread()) {
// Callback executed on this thread or is still currently executing
// and is deregistering itself from within the callback.
- if (cb.isRemoved_ !is null) {
- // Currently inside the callback, let the request_stop() method
+ if (cb.isRemoved !is null) {
+ // Currently inside the callback, let the requestStop() method
// know the object is about to be destructed and that it should
// not try to access the object when the callback returns.
- *cb.isRemoved_ = true;
+ *cb.isRemoved = true;
}
} else {
// Callback is currently executing on another thread,
// block until it finishes executing.
while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) {
- spin_yield();
+ spinYield();
}
}
+ }
- remove_token_reference();
+ bool isFinished(ref shared StopCallback cb) nothrow @safe @nogc shared {
+ return cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq);
}
-private:
- static bool is_locked(ulong state) nothrow @safe @nogc {
- return (state & locked_flag) != 0;
+ void assertEmpty() @safe shared @nogc nothrow scope {
+ // Return early if head is zero.
+ if (head.atomicLoad!(MemoryOrder.acq) is null) {
+ return;
+ }
+
+ // Otherwise we have to do a bit of cleanup before asserting.
+ // `assertEmpty` is typically called during tests or debug builds only,
+ // right before destroying the StopSource and its StopState.
+ //
+ // In case it is violated we can't just assert however. Both StopCallbacks
+ // and StopSources sit on the stack and callbacks must be deregistered
+ // before either one is deconstructed, otherwise the callback might try
+ // to deregister itself from a dangling StopSource, potentially from
+ // another thread or fiber.
+ //
+ // This can result in complex to debug use-after-free errors.
+ // In order to reduce that we clean up those callbacks before asserting.
+ //
+ // There is no guarantee other callbacks won't be registered concurrently,
+ // either in between unlock and assert, or even afterwards, but this is
+ // primarily to be used for detecting errors to aid throubleshooting,
+ // not as a failsafe mechanism.
+ lock();
+ shared StopState* blank = null;
+ while (head !is null) {
+ // Go through all pointers and dispose, this is to
+ // avoid a segfault when doing the assert.
+ auto next = head.next;
+ atomicStore(head.state, blank);
+ head = next;
+ }
+ unlock();
+ assert(false, "StopSource has lingering callbacks");
}
- static bool is_stop_requested(ulong state) nothrow @safe @nogc {
- return (state & stop_requested_flag) != 0;
+ static bool isLocked(ulong state) nothrow @safe @nogc {
+ return (state & lockedFlag) != 0;
}
- static bool is_stop_requestable(ulong state) nothrow @safe @nogc {
- // Interruptible if it has already been interrupted or if there are
- // still interrupt_source instances in existence.
- return is_stop_requested(state) || (state >= source_ref_increment);
+ static bool isStopRequested(ulong state) nothrow @safe @nogc {
+ return (state & stopRequestedFlag) != 0;
}
- bool try_lock_and_signal_until_signalled() nothrow @safe @nogc {
+ bool tryLockAndSignalUntilSignalled() nothrow @safe @nogc shared scope {
ulong oldState;
do {
- oldState = state_.atomicLoad!(MemoryOrder.acq);
- if (is_stop_requested(oldState))
+ oldState = state.atomicLoad!(MemoryOrder.acq);
+ if (isStopRequested(oldState))
return false;
- while (is_locked(oldState)) {
- spin_yield();
- oldState = state_.atomicLoad!(MemoryOrder.acq);
- if (is_stop_requested(oldState))
+ while (isLocked(oldState)) {
+ spinYield();
+ oldState = state.atomicLoad!(MemoryOrder.acq);
+ if (isStopRequested(oldState))
return false;
}
} while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)(
- &state_, oldState,
- oldState | stop_requested_flag | locked_flag));
+ &state, oldState,
+ oldState | stopRequestedFlag | lockedFlag));
return true;
}
- void lock() nothrow @safe @nogc {
+ void lock() nothrow @safe @nogc shared scope {
ulong oldState;
do {
- oldState = state_.atomicLoad!(MemoryOrder.raw);
- while (is_locked(oldState)) {
- spin_yield();
- oldState = state_.atomicLoad!(MemoryOrder.raw);
+ oldState = state.atomicLoad!(MemoryOrder.raw);
+ while (isLocked(oldState)) {
+ spinYield();
+ oldState = state.atomicLoad!(MemoryOrder.raw);
}
} while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)(
- (&state_), oldState, oldState | locked_flag));
+ (&state), oldState, oldState | lockedFlag));
}
- void unlock() nothrow @safe @nogc {
- state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag);
+ void unlock() nothrow @safe @nogc shared scope {
+ state.atomicFetchSub!(MemoryOrder.rel)(lockedFlag);
}
- void unlock_and_increment_token_ref_count() nothrow @safe @nogc {
- state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment);
- }
+ enum stopRequestedFlag = 1L;
+ enum lockedFlag = 2L;
- void unlock_and_decrement_token_ref_count() nothrow @safe @nogc {
- state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment);
+ shared ulong state = 0;
+ StopCallback* head = null;
+ Thread signallingThread;
+
+ private void storeSignallingThread() @nogc nothrow @trusted shared scope {
+ (cast()signallingThread) = Thread.getThis();
}
- enum stop_requested_flag = 1L;
- enum locked_flag = 2L;
- enum token_ref_increment = 4L;
- enum source_ref_increment = 1L << 33u;
-
- // bit 0 - stop-requested
- // bit 1 - locked
- // bits 2-32 - token ref count (31 bits)
- // bits 33-63 - source ref count (31 bits)
- shared ulong state_ = source_ref_increment;
- InPlaceStopCallback* head_ = null;
- Thread signallingThread_;
+ private bool isSignallingThread() @nogc nothrow @trusted shared scope {
+ return (cast()signallingThread) is Thread.getThis();
+ }
}
diff --git a/source/concurrency/stream/cycle.d b/source/concurrency/stream/cycle.d
index a9e0d3f..05a21d4 100644
--- a/source/concurrency/stream/cycle.d
+++ b/source/concurrency/stream/cycle.d
@@ -1,13 +1,14 @@
module concurrency.stream.cycle;
import concurrency.stream.stream;
+import concurrency.stoptoken : StopToken;
import std.range : ElementType;
struct Cycle(Range) {
alias T = ElementType!Range;
alias DG = CollectDelegate!(T);
Range range;
- void loop(StopToken)(DG emit, StopToken stopToken) @safe {
+ void loop(DG emit, shared StopToken stopToken) @safe {
for (; !stopToken.isStopRequested;) {
foreach (item; range) {
emit(item);
diff --git a/source/concurrency/stream/flatmapbase.d b/source/concurrency/stream/flatmapbase.d
index dba68a2..1c83fe8 100644
--- a/source/concurrency/stream/flatmapbase.d
+++ b/source/concurrency/stream/flatmapbase.d
@@ -3,7 +3,7 @@ module concurrency.stream.flatmapbase;
import concurrency.stream.stream;
import concurrency.sender : OpType, isSender;
import concurrency.receiver : ForwardExtensionPoints;
-import concurrency.stoptoken : StopSource, StopToken;
+import concurrency.stoptoken : StopToken;
import std.traits : ReturnType;
import concurrency.utils : isThreadSafeFunction;
import concepts;
@@ -42,7 +42,7 @@ template FlatMapBaseStreamOp(Stream, Fun, OnOverlap overlap) {
static if (is(Properties.ElementType == void))
void item() {
- if (state.isStopRequested)
+ if (state.stopSource.isStopRequested)
return;
state.onItem();
with (state.bitfield.lock()) {
@@ -58,7 +58,7 @@ template FlatMapBaseStreamOp(Stream, Fun, OnOverlap overlap) {
else
void item(Properties.ElementType t) {
- if (state.isStopRequested)
+ if (state.stopSource.isStopRequested)
return;
state.onItem();
with (state.bitfield.lock()) {
@@ -104,7 +104,7 @@ private bool isDoneOrErrorProduced(size_t state) @safe @nogc nothrow pure {
}
final class State(TStreamSenderValue, TSenderValue, Receiver,
- OnOverlap overlap) : StopSource {
+ OnOverlap overlap) {
import concurrency.bitfield;
import concurrency.stoptoken;
import std.exception : assumeWontThrow;
@@ -118,32 +118,31 @@ final class State(TStreamSenderValue, TSenderValue, Receiver,
StreamSenderValue value;
Throwable throwable;
Semaphore semaphore;
- StopCallback cb;
+ shared StopSource stopSource;
+ shared StopCallback cb;
static if (overlap == OnOverlap.latest)
- StopSource innerStopSource;
+ shared StopSource innerStopSource;
shared SharedBitField!Flags bitfield;
this(DG dg, Receiver receiver) {
this.dg = dg;
this.receiver = receiver;
semaphore = new Semaphore(1);
- static if (overlap == OnOverlap.latest)
- innerStopSource = new StopSource();
bitfield = SharedBitField!Flags(Counter.tick);
- cb = receiver.getStopToken
- .onStop(cast(void delegate() nothrow @safe shared) &stop);
+ auto stopToken = receiver.getStopToken;
+ cb.register(stopToken, cast(void delegate() nothrow @safe shared) &stop);
}
- override bool stop() nothrow @trusted {
+ bool stop() nothrow @trusted {
return (cast(shared) this).stop();
}
- override bool stop() nothrow @trusted shared {
+ bool stop() nothrow @trusted shared {
static if (overlap == OnOverlap.latest) {
- auto r = super.stop();
+ auto r = stopSource.stop();
innerStopSource.stop();
return r;
} else {
- return super.stop();
+ return stopSource.stop();
}
}
@@ -176,11 +175,11 @@ final class State(TStreamSenderValue, TSenderValue, Receiver,
}
}
- private StopToken getSenderStopToken() @safe nothrow {
+ private shared(StopToken) getSenderStopToken() @safe nothrow {
static if (overlap == OnOverlap.latest) {
- return StopToken(innerStopSource);
+ return innerStopSource.token();
} else {
- return StopToken(this);
+ return stopSource.token();
}
}
}
@@ -212,7 +211,7 @@ struct StreamReceiver(State) {
if (!isDoneOrErrorProduced(oldState)) {
state.throwable = t;
release(); // must release before calling .stop
- state.stop();
+ state.stopSource.stop();
} else
release();
if (last)
@@ -224,14 +223,14 @@ struct StreamReceiver(State) {
with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) {
bool last = isLast(newState);
if (!isDoneOrErrorProduced(oldState))
- state.stop();
+ state.stopSource.stop();
if (last)
state.process(newState);
}
}
- StopToken getStopToken() @safe nothrow {
- return StopToken(state);
+ shared(StopToken) getStopToken() @safe nothrow {
+ return state.stopSource.token();
}
private auto receiver() {
@@ -261,7 +260,7 @@ struct InnerSenderReceiver(State) {
if (!isDoneOrErrorProduced(oldState)) {
state.throwable = t;
release(); // must release before calling .stop
- state.stop();
+ state.stopSource.stop();
} else
release();
if (last)
@@ -273,7 +272,7 @@ struct InnerSenderReceiver(State) {
void setDone() @safe nothrow {
static if (State.onOverlap == OnOverlap.latest) {
- if (!state.isStopRequested) {
+ if (!state.stopSource.isStopRequested) {
state.bitfield.add(Counter.tick);
notify();
return;
@@ -283,7 +282,7 @@ struct InnerSenderReceiver(State) {
with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) {
bool last = isLast(newState);
if (!isDoneOrErrorProduced(oldState))
- state.stop();
+ state.stopSource.stop();
if (last)
state.process(newState);
else
diff --git a/source/concurrency/stream/package.d b/source/concurrency/stream/package.d
index f789c74..dc2402e 100644
--- a/source/concurrency/stream/package.d
+++ b/source/concurrency/stream/package.d
@@ -61,7 +61,7 @@ auto infiniteStream(T)(T t) {
alias DG = CollectDelegate!(T);
struct Loop {
T val;
- void loop(StopToken)(DG emit, StopToken stopToken) {
+ void loop(DG emit, shared StopToken stopToken) {
while (!stopToken.isStopRequested)
emit(val);
}
@@ -75,7 +75,7 @@ auto iotaStream(T)(T start, T end) {
alias DG = CollectDelegate!(T);
struct Loop {
T b, e;
- void loop(StopToken)(DG emit, StopToken stopToken) {
+ void loop(DG emit, shared StopToken stopToken) {
foreach (i; b .. e) {
emit(i);
if (stopToken.isStopRequested)
@@ -92,7 +92,7 @@ auto arrayStream(T)(T[] arr) {
alias DG = CollectDelegate!(T);
struct Loop {
T[] arr;
- void loop(StopToken)(DG emit, StopToken stopToken) @safe {
+ void loop(DG emit, shared StopToken stopToken) @safe {
foreach (item; arr) {
emit(item);
if (stopToken.isStopRequested)
@@ -305,14 +305,14 @@ shared struct SharedStream(T) {
shared SharedStream!T source;
DG dg;
Receiver receiver;
- StopCallback cb;
+ shared StopCallback cb;
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
void start() nothrow @trusted {
auto stopToken = receiver.getStopToken();
- cb = stopToken.onStop(&(cast(shared) this).onStop);
+ cb.register(stopToken, &(cast(shared) this).onStop);
if (stopToken.isStopRequested) {
cb.dispose();
receiver.setDone();
diff --git a/source/concurrency/stream/scan.d b/source/concurrency/stream/scan.d
index b167a27..5a35520 100644
--- a/source/concurrency/stream/scan.d
+++ b/source/concurrency/stream/scan.d
@@ -30,7 +30,7 @@ template ScanStreamOp(Stream, Fun, Seed) {
@disable
this(this);
this(Stream stream, Fun scanFn, Seed seed, DG dg,
- Receiver receiver) @trusted {
+ Receiver receiver) @trusted return scope {
this.scanFn = scanFn;
this.acc = seed;
this.dg = dg;
diff --git a/source/concurrency/stream/take.d b/source/concurrency/stream/take.d
index 5c9e5e4..cfd9369 100644
--- a/source/concurrency/stream/take.d
+++ b/source/concurrency/stream/take.d
@@ -3,7 +3,7 @@ module concurrency.stream.take;
import concurrency.stream.stream;
import concurrency.sender : OpType;
import concurrency.receiver : ForwardExtensionPoints;
-import concurrency.stoptoken : InPlaceStopSource;
+import concurrency.stoptoken : StopSource;
import concepts;
/// takes the first n values from a stream or until cancelled
@@ -17,7 +17,7 @@ auto take(Stream)(Stream stream, size_t n) if (models!(Stream, isStream)) {
struct TakeReceiver(Receiver, Value) {
Receiver receiver;
- InPlaceStopSource* stopSource;
+ shared StopSource* stopSource;
static if (is(Value == void))
void setValue() @safe {
receiver.setValue();
@@ -51,12 +51,12 @@ template TakeOp(Stream) {
struct TakeOp(Receiver) {
import concurrency.operations : withStopSource, SSSender;
import std.traits : ReturnType;
- alias SS = SSSender!(Properties.Sender, InPlaceStopSource*);
+ alias SS = SSSender!(Properties.Sender);
alias Op =
OpType!(SS, TakeReceiver!(Receiver, Properties.Sender.Value));
size_t n;
Properties.DG dg;
- InPlaceStopSource stopSource;
+ shared StopSource stopSource;
Op op;
@disable
this(ref return scope typeof(this) rhs);
diff --git a/source/concurrency/stream/throttling.d b/source/concurrency/stream/throttling.d
index c6bad3d..56fef90 100644
--- a/source/concurrency/stream/throttling.d
+++ b/source/concurrency/stream/throttling.d
@@ -21,14 +21,11 @@ enum ThrottleEmitLogic : uint {
last // emit the last item in the window
}
-;
enum ThrottleTimerLogic : uint {
noop, // don't reset the timer on new items
rearm // reset the timer on new items
}
-;
-
/// throttleFirst forwards one item and then enters a cooldown period during which it ignores items
auto throttleFirst(Stream)(Stream s, Duration d) {
return throttling!(Stream, ThrottleEmitLogic.first,
@@ -78,9 +75,9 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic,
alias InnerReceiver =
TimerReceiver!(typeof(this), Properties.ElementType, emitLogic,
timerLogic);
- StopSource stopSource;
- StopSource timerStopSource;
- StopCallback cb;
+ shared StopSource stopSource;
+ shared StopSource timerStopSource;
+ shared StopCallback cb;
Throwable throwable;
alias Op = OpType!(Properties.Sender,
SenderReceiver!(typeof(this), Properties.Value));
@@ -97,8 +94,6 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic,
this.dur = dur;
this.dg = dg;
this.receiver = receiver;
- stopSource = new StopSource();
- timerStopSource = new StopSource();
op = stream.collect(cast(Properties.DG) &onItem).connect(
SenderReceiver!(typeof(this), Properties.Value)(&this));
}
@@ -266,9 +261,9 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic,
}
void start() @trusted nothrow scope {
- cb = receiver.getStopToken().onStop(
- cast(void delegate() nothrow @safe shared) &this.stop
- ); // butt ugly cast, but it won't take the second overload
+ auto stopToken = receiver.getStopToken();
+ // butt ugly cast, but it won't take the second overload
+ cb.register(stopToken, cast(void delegate() nothrow @safe shared) &this.stop);
op.start();
}
}
@@ -315,7 +310,7 @@ struct TimerReceiver(Op, ElementType, ThrottleEmitLogic emitLogic,
}
auto getStopToken() {
- return StopToken(state.timerStopSource);
+ return state.timerStopSource.token();
}
auto getScheduler() {
@@ -355,7 +350,7 @@ struct SenderReceiver(Op, Value) {
}
auto getStopToken() {
- return StopToken(state.stopSource);
+ return state.stopSource.token();
}
auto getScheduler() {
diff --git a/source/concurrency/stream/transform.d b/source/concurrency/stream/transform.d
index bfcb342..5b60dde 100644
--- a/source/concurrency/stream/transform.d
+++ b/source/concurrency/stream/transform.d
@@ -3,7 +3,6 @@ module concurrency.stream.transform;
import concurrency.stream.stream;
import concurrency.sender : OpType;
import concurrency.receiver : ForwardExtensionPoints;
-import concurrency.stoptoken : StopSource;
import std.traits : ReturnType;
import concurrency.utils : isThreadSafeFunction;
import concepts;
diff --git a/source/concurrency/syncwait.d b/source/concurrency/syncwait.d
index 54827df..e0cedd1 100644
--- a/source/concurrency/syncwait.d
+++ b/source/concurrency/syncwait.d
@@ -18,15 +18,14 @@ package struct SyncWaitReceiver2(Value) {
static if (!is(Value == void))
Value result;
Throwable throwable;
- StopSource stopSource;
- this(StopSource stopSource) {
- this.stopSource = stopSource;
- worker = LocalThreadWorker(getLocalThreadExecutor());
+ static auto construct() {
+ return State(LocalThreadWorker(getLocalThreadExecutor()));
}
}
State* state;
+ shared StopSource* stopSource;
void setDone() nothrow @safe {
state.canceled = true;
state.worker.stop();
@@ -49,7 +48,7 @@ package struct SyncWaitReceiver2(Value) {
}
auto getStopToken() nothrow @safe @nogc {
- return StopToken(state.stopSource);
+ return stopSource.token();
}
auto getScheduler() nothrow @safe {
@@ -114,6 +113,10 @@ struct Result(T) {
return std.sumtype.match!((T t) => t, function T(x) { throw new Exception("Unexpected value"); })(result);
}
+ auto trustedGet(T)() {
+ return std.sumtype.match!((T t) => t, function T(x) { assert(0, "nah"); })(result);
+ }
+
auto assumeOk() {
return value();
}
@@ -128,29 +131,36 @@ template match(Handlers...) {
}
/// Start the Sender and waits until it completes, cancels, or has an error.
-auto syncWait(Sender, StopSource)(auto ref Sender sender,
- StopSource stopSource) {
- return syncWaitImpl(sender, (() @trusted => cast() stopSource)());
+auto syncWait(Sender)(auto ref Sender sender,
+ shared ref StopSource stopSource) {
+ return syncWaitImpl(sender, stopSource);
}
-auto syncWait(Sender)(auto scope ref Sender sender) {
+auto syncWait(Sender)(auto scope ref Sender sender) @trusted {
import concurrency.signal : globalStopSource;
- auto childStopSource = new shared StopSource();
- auto cb = InPlaceStopCallback(() shared {
+ shared StopSource childStopSource;
+ shared parentStopToken = globalStopSource.token();
+ shared StopCallback cb;
+ cb.register(parentStopToken, () shared {
childStopSource.stop();
});
- StopToken parentStopToken = StopToken(globalStopSource);
- // parentStopToken.onStop(cb);
- auto result =
- syncWaitImpl(sender, (() @trusted => cast() childStopSource)());
- // detach stopSource
- cb.dispose();
- return result;
+
+ try {
+ auto result = syncWaitImpl(sender, childStopSource);
+
+ version (unittest) childStopSource.assertNoCallbacks;
+ // detach stopSource
+ cb.dispose();
+ return result;
+ } catch (Throwable t) { // @suppress(dscanner.suspicious.catch_em_all)
+ cb.dispose();
+ throw t;
+ }
}
private
Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender,
- StopSource stopSource) @safe {
+ ref shared StopSource stopSource) @safe {
static assert(models!(Sender, isSender));
import concurrency.signal;
import core.stdc.signal : SIGTERM, SIGINT;
@@ -158,8 +168,8 @@ Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender,
alias Value = Sender.Value;
alias Receiver = SyncWaitReceiver2!(Value);
- auto state = Receiver.State(stopSource);
- scope receiver = (() @trusted => Receiver(&state))();
+ auto state = Receiver.State.construct;
+ scope receiver = (() @trusted => Receiver(&state, &stopSource))();
auto op = sender.connect(receiver);
op.start();
@@ -171,7 +181,8 @@ Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender,
if (state.throwable !is null) {
if (auto e = cast(Exception) state.throwable)
return Result!Value(e);
- throw state.throwable;
+ auto throwable = (() @trusted => state.throwable)();
+ throw throwable;
}
static if (is(Value == void))
diff --git a/source/concurrency/thread.d b/source/concurrency/thread.d
index 7ed8881..8482312 100644
--- a/source/concurrency/thread.d
+++ b/source/concurrency/thread.d
@@ -140,7 +140,7 @@ package struct LocalThreadWorker {
return removed;
}
- void start() @trusted {
+ void start() @trusted scope {
assert(isInContext); // start can only be called on the thread
import std.datetime.systime : Clock;
import std.array : Appender;
diff --git a/source/concurrency/utils.d b/source/concurrency/utils.d
index 94ee6a6..977175f 100644
--- a/source/concurrency/utils.d
+++ b/source/concurrency/utils.d
@@ -44,9 +44,9 @@ auto closure(Fun, Args...)(Fun fun, Args args) @trusted
auto cl = cast(shared) new Closure!(Fun, Args)(fun, args);
/// need to cast to @safe because a @trusted delegate doesn't fit a @safe one...
static if (hasFunctionAttributes!(Fun, "nothrow"))
- alias ResultType = void delegate() nothrow shared @safe;
+ alias ResultType = void delegate() nothrow @safe shared;
else
- alias ResultType = void delegate() shared @safe;
+ alias ResultType = void delegate() @safe shared;
return cast(ResultType) &cl.apply;
}
@@ -108,7 +108,7 @@ enum isThreadSafeCallable(alias Fun) =
// are all pointing to the same instance.
// We do this by exporting accessors functions which a dynamic library can
// call to get access to the global.
-auto dynamicLoad(alias fun)() nothrow @trusted {
+auto dynamicLoad(alias fun)() nothrow @trusted @nogc {
alias Fn = typeof(&fun);
__gshared Fn fn;
@@ -126,7 +126,7 @@ auto dynamicLoad(alias fun)() nothrow @trusted {
return fn;
}
-auto dynamicLoadRaw(alias fun)() nothrow @trusted {
+auto dynamicLoadRaw(alias fun)() nothrow @trusted @nogc {
alias Fn = typeof(&fun);
version(Windows) {
import core.sys.windows.windows;
diff --git a/tests/ut/concurrency/asyncscope.d b/tests/ut/concurrency/asyncscope.d
index 25eab59..4d5b403 100644
--- a/tests/ut/concurrency/asyncscope.d
+++ b/tests/ut/concurrency/asyncscope.d
@@ -180,7 +180,7 @@ auto waitingTask() {
import concurrency.thread : ThreadSender;
import concurrency.operations : withStopToken;
- return ThreadSender().withStopToken((StopToken token) @trusted {
+ return ThreadSender().withStopToken((shared StopToken token) @trusted {
import core.thread : Thread;
while (!token.isStopRequested) {
Thread.yield();
diff --git a/tests/ut/concurrency/fork.d b/tests/ut/concurrency/fork.d
index 4c01675..f5e0d2c 100644
--- a/tests/ut/concurrency/fork.d
+++ b/tests/ut/concurrency/fork.d
@@ -18,7 +18,7 @@ unittest {
@("sync_wait.fork.exception") @trusted
unittest {
import core.stdc.stdlib;
- ForkSender(getLocalThreadExecutor(), () shared @trusted {
+ ForkSender(getLocalThreadExecutor(), () @trusted shared {
exit(1);
}).syncWait.assumeOk.shouldThrow();
}
diff --git a/tests/ut/concurrency/mpsc.d b/tests/ut/concurrency/mpsc.d
index 153c70a..0f8b221 100644
--- a/tests/ut/concurrency/mpsc.d
+++ b/tests/ut/concurrency/mpsc.d
@@ -29,7 +29,7 @@ auto intSummer(Q)(Q q) {
import concurrency.stoptoken : StopToken;
import core.time : msecs;
- return just(q).withStopToken((StopToken stopToken, Q q) shared @safe {
+ return just(q).withStopToken((shared StopToken stopToken, Q q) @safe shared {
int sum = 0;
while (!stopToken.isStopRequested()) {
if (auto node = q.pop()) {
@@ -54,8 +54,8 @@ unittest {
import core.time : msecs;
auto q = new MPSCQueue!Node();
- q.intSummer.stopWhen(intProducer(q, 50000)).syncWait.value.should
- == 1250025000;
+ q.intSummer.stopWhen(intProducer(q, 50_000)).syncWait.value.should
+ == 1_250_025_000;
q.empty.should == true;
}
@@ -66,10 +66,10 @@ unittest {
auto q = new MPSCQueue!Node();
q
.intSummer
- .stopWhen(whenAll(intProducer(q, 10000), intProducer(q, 10000),
- intProducer(q, 10000), intProducer(q, 10000), ))
+ .stopWhen(whenAll(intProducer(q, 10_000), intProducer(q, 10_000),
+ intProducer(q, 10_000), intProducer(q, 10_000), ))
.syncWait
.value
- .should == 200020000;
+ .should == 200_020_000;
q.empty.should == true;
}
diff --git a/tests/ut/concurrency/nursery.d b/tests/ut/concurrency/nursery.d
index f681474..5faee93 100644
--- a/tests/ut/concurrency/nursery.d
+++ b/tests/ut/concurrency/nursery.d
@@ -52,8 +52,8 @@ unittest {
unittest {
auto nursery = new shared Nursery();
shared(int) global;
- nursery.run(ThreadSender().then(() shared @safe {
- nursery.run(ValueSender!(int)(5).then((int c) shared @safe {
+ nursery.run(ThreadSender().then(() @safe shared {
+ nursery.run(ValueSender!(int)(5).then((int c) @safe shared {
global = c;
}));
}));
@@ -66,17 +66,17 @@ unittest {
@("run.thread.stop.internal") @safe
unittest {
auto nursery = new shared Nursery();
- nursery.run(ThreadSender().then(() shared @safe => nursery.stop()));
+ nursery.run(ThreadSender().then(() @safe shared => nursery.stop()));
nursery.syncWait.isCancelled.should == true;
nursery.getStopToken().isStopRequested().shouldBeTrue();
}
-@("run.thread.stop.external") @trusted
+@("run.thread.stop.external") @safe
unittest {
auto nursery = new shared Nursery();
- auto stopSource = new shared StopSource();
- nursery.run(ThreadSender().then(() shared @safe => stopSource.stop()));
- nursery.syncWait(cast(StopSource) stopSource).isCancelled.should == true;
+ shared stopSource = StopSource();
+ nursery.run(ThreadSender().then(() @safe shared => stopSource.stop()));
+ nursery.syncWait(stopSource).isCancelled.should == true;
nursery.getStopToken().isStopRequested().shouldBeTrue();
stopSource.isStopRequested().shouldBeTrue();
}
@@ -85,12 +85,12 @@ unittest {
unittest {
import core.thread : Thread;
auto nursery = new shared Nursery();
- auto thread1 = ThreadSender().then(() shared @trusted {
+ auto thread1 = ThreadSender().then(() @trusted shared {
auto token = nursery.getStopToken();
while (!token.isStopRequested())
Thread.yield();
});
- auto thread2 = ThreadSender().then(() shared @safe => nursery.stop());
+ auto thread2 = ThreadSender().then(() @safe shared => nursery.stop());
nursery.run(thread1);
nursery.run(thread2);
nursery.syncWait.isCancelled.should == true;
@@ -115,17 +115,17 @@ unittest {
unittest {
import core.thread : Thread;
auto nursery = new shared Nursery();
- auto thread1 = ThreadSender().then(() shared @trusted {
+ auto thread1 = ThreadSender().then(() @trusted shared {
auto token = nursery.getStopToken();
while (!token.isStopRequested())
Thread.yield();
});
auto thread2 =
- ThreadSender().withStopToken((StopToken token) shared @trusted {
+ ThreadSender().withStopToken((shared StopToken token) @trusted shared {
while (!token.isStopRequested())
Thread.yield();
});
- auto thread3 = ThreadSender().then(() shared @safe {
+ auto thread3 = ThreadSender().then(() @safe shared {
throw new Exception("Error should stop everyone");
});
nursery.run(thread1);
@@ -139,18 +139,17 @@ unittest {
@("withStopSource.1") @safe
unittest {
import core.thread : Thread;
- auto stopSource = new StopSource();
+ shared StopSource stopSource;
auto nursery = new shared Nursery();
auto thread1 =
- ThreadSender().withStopToken((StopToken stopToken) shared @trusted {
+ ThreadSender().withStopToken((shared StopToken stopToken) @trusted shared {
while (!stopToken.isStopRequested)
Thread.yield();
}).withStopSource(stopSource);
// stop via the source
- auto stopper = ValueSender!StopSource(stopSource)
- .then((StopSource stopSource) shared => stopSource.stop());
+ auto stopper = justFrom(() shared => stopSource.stop());
nursery.run(thread1);
nursery.run(stopper);
@@ -161,18 +160,17 @@ unittest {
@("withStopSource.2") @safe
unittest {
import core.thread : Thread;
- auto stopSource = new StopSource();
+ shared StopSource stopSource;
auto nursery = new shared Nursery();
auto thread1 =
- ThreadSender().withStopToken((StopToken stopToken) shared @trusted {
+ ThreadSender().withStopToken((shared StopToken stopToken) @trusted shared {
while (!stopToken.isStopRequested)
Thread.yield();
}).withStopSource(stopSource);
// stop via the nursery
- auto stopper = ValueSender!(shared Nursery)(nursery)
- .then((shared Nursery nursery) shared => nursery.stop());
+ auto stopper = justFrom(() shared => nursery.stop());
nursery.run(thread1);
nursery.run(stopper);
diff --git a/tests/ut/concurrency/operations.d b/tests/ut/concurrency/operations.d
index e852ea7..9113853 100644
--- a/tests/ut/concurrency/operations.d
+++ b/tests/ut/concurrency/operations.d
@@ -57,7 +57,7 @@ unittest {
unittest {
race(ValueSender!int(4), ValueSender!int(5)).syncWait.value.should == 4;
auto fastThread = ThreadSender().then(() shared => 1);
- auto slowThread = ThreadSender().then(() shared @trusted {
+ auto slowThread = ThreadSender().then(() @trusted shared {
Thread.sleep(50.msecs);
return 2;
});
@@ -80,7 +80,7 @@ unittest {
@("race.exception.double") @safe
unittest {
- auto slow = ThreadSender().then(() shared @trusted {
+ auto slow = ThreadSender().then(() @trusted shared {
Thread.sleep(50.msecs);
throw new Exception("Slow");
});
@@ -92,7 +92,7 @@ unittest {
@("race.cancel-other") @safe
unittest {
- auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
@@ -103,7 +103,7 @@ unittest {
@("race.cancel") @safe
unittest {
- auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
@@ -189,7 +189,7 @@ unittest {
@("whenAll.cancel") @safe
unittest {
- auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
@@ -197,7 +197,7 @@ unittest {
whenAll(waiting, DoneSender()).syncWait.isCancelled.should == true;
whenAll(ThrowingSender(), waiting).syncWait.assumeOk.shouldThrow;
whenAll(waiting, ThrowingSender()).syncWait.assumeOk.shouldThrow;
- auto waitingInt = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waitingInt = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
@@ -211,14 +211,13 @@ unittest {
@("whenAll.stop") @safe
unittest {
- auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
});
- auto source = new StopSource();
- auto stopper =
- just(source).then((StopSource source) shared => source.stop());
+ shared source = StopSource();
+ auto stopper = justFrom(() shared => source.stop());
whenAll(waiting, stopper).withStopSource(source).syncWait.isCancelled.should
== true;
}
@@ -367,32 +366,26 @@ unittest {
@("withStopToken.oob") @safe
unittest {
auto oob = OutOfBandValueSender!int(44);
- oob.withStopToken((StopToken stopToken, int t) => t).syncWait.value.should
+ oob.withStopToken((shared StopToken stopToken, int t) => t).syncWait.value.should
== 44;
}
@("withStopSource.oob") @safe
unittest {
auto oob = OutOfBandValueSender!int(45);
- oob.withStopSource(new StopSource()).syncWait.value.should == 45;
-}
-
-@("withStopSource.oob") @safe
-unittest {
- auto oob = OutOfBandValueSender!int(45);
- InPlaceStopSource stopSource;
- oob.withStopSource(stopSource).syncWait.value.should == 45;
+ shared source = StopSource();
+ oob.withStopSource(source).syncWait.value.should == 45;
}
@("withStopSource.tuple") @safe
unittest {
- just(14, 53).withStopToken((StopToken s, Tuple!(int, int) t) => t[0] * t[1])
+ just(14, 53).withStopToken((shared StopToken s, Tuple!(int, int) t) => t[0] * t[1])
.syncWait.value.should == 742;
}
@("value.withstoptoken.via.thread") @safe
unittest {
- ValueSender!int(4).withStopToken((StopToken s, int i) {
+ ValueSender!int(4).withStopToken((shared StopToken s, int i) {
throw new Exception("Badness");
}).via(ThreadSender()).syncWait.assumeOk.shouldThrowWithMessage("Badness");
}
@@ -405,7 +398,7 @@ unittest {
@("raceAll") @safe
unittest {
- auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
@@ -438,8 +431,8 @@ unittest {
import concurrency.scheduler : ManualTimeWorker;
auto worker = new shared ManualTimeWorker();
- auto source = new StopSource();
- auto driver = just(source).then((StopSource source) shared {
+ shared StopSource source;
+ auto driver = justFrom(() shared {
worker.timeUntilNextEvent().should == 10.msecs.nullable;
source.stop();
worker.timeUntilNextEvent().isNull.should == true;
@@ -499,13 +492,13 @@ unittest {
@("stopOn") @safe
unittest {
- auto sourceInner = new shared StopSource();
- auto sourceOuter = new shared StopSource();
+ shared sourceInner = StopSource();
+ shared sourceOuter = StopSource();
shared bool b;
whenAll(
delay(5.msecs).then(() shared => b = true)
- .stopOn(StopToken(sourceInner)),
+ .stopOn(sourceInner.token()),
just(() => sourceOuter.stop())
).syncWait(sourceOuter).assumeOk;
b.should == true;
@@ -513,7 +506,7 @@ unittest {
shared bool d;
whenAll(
delay(5.msecs).then(() shared => b = true)
- .stopOn(StopToken(sourceInner)),
+ .stopOn(sourceInner.token()),
just(() => sourceInner.stop())
).syncWait(sourceOuter).assumeOk;
d.should == false;
@@ -527,41 +520,41 @@ unittest {
import core.sync.event : Event;
bool parentAfterChild;
Event childEvent, parentEvent;
- this() shared @trusted {
+ this() @trusted shared {
(cast() childEvent).initialize(false, false);
(cast() parentEvent).initialize(false, false);
}
- void signalChild() shared @trusted {
+ void signalChild() @trusted shared {
(cast() childEvent).set();
}
- void waitChild() shared @trusted {
+ void waitChild() @trusted shared {
(cast() childEvent).wait();
}
- void signalParent() shared @trusted {
+ void signalParent() @trusted shared {
(cast() parentEvent).set();
}
- void waitParent() shared @trusted {
+ void waitParent() @trusted shared {
(cast() parentEvent).wait();
}
}
auto state = new shared State();
- auto source = new shared StopSource;
+ shared source = StopSource();
import std.stdio;
auto child = just(state)
- .withStopToken((StopToken token, shared State state) @trusted {
+ .withStopToken((shared StopToken token, shared State state) @trusted {
while (!token.isStopRequested) {}
state.signalParent();
state.waitChild();
}).via(ThreadSender());
auto parent =
- just(state).withStopToken((StopToken token, shared State state) {
+ just(state).withStopToken((shared StopToken token, shared State state) {
state.waitParent();
state.parentAfterChild.atomicStore(token.isStopRequested == false);
state.signalChild();
@@ -569,7 +562,7 @@ unittest {
whenAll(
parent.withChild(child).withStopSource(source),
- just(source).then((shared StopSource s) => s.stop())
+ justFrom(() shared => source.stop())
).syncWait.isCancelled.should == true;
state.parentAfterChild.atomicLoad.should == true;
@@ -641,7 +634,7 @@ unittest {
@("stopWhen.source.value") @safe
unittest {
- auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
@@ -654,7 +647,7 @@ unittest {
@("stopWhen.source.error") @safe
unittest {
- auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
@@ -668,7 +661,7 @@ unittest {
@("stopWhen.source.cancelled") @safe
unittest {
- auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
@@ -679,7 +672,7 @@ unittest {
@("stopWhen.trigger.error") @safe
unittest {
- auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
@@ -694,7 +687,7 @@ unittest {
@("stopWhen.trigger.cancelled.value") @safe
unittest {
- auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
+ auto waiting = ThreadSender().withStopToken((shared StopToken token) @trusted {
while (!token.isStopRequested) {
Thread.yield();
}
diff --git a/tests/ut/concurrency/scheduler.d b/tests/ut/concurrency/scheduler.d
index f35ed59..5ffd4a8 100644
--- a/tests/ut/concurrency/scheduler.d
+++ b/tests/ut/concurrency/scheduler.d
@@ -22,7 +22,7 @@ unittest {
@("scheduleAfter.stop-before-add") @safe
unittest {
import concurrency.sender : delay, justFrom;
- auto source = new shared StopSource();
+ shared source = StopSource();
whenAll(justFrom(() shared => source.stop), delay(10.msecs))
.syncWait(source);
}
diff --git a/tests/ut/concurrency/sender.d b/tests/ut/concurrency/sender.d
index b94a968..81d0086 100644
--- a/tests/ut/concurrency/sender.d
+++ b/tests/ut/concurrency/sender.d
@@ -83,7 +83,7 @@ unittest {
unittest {
ThrowingSender().via(ThreadSender()).syncWait().isError.should == true;
}
-
+/+
@("toShared.basic") @safe
unittest {
import std.typecons : tuple;
@@ -171,17 +171,15 @@ unittest {
unittest {
import concurrency.stoptoken;
import core.atomic : atomicStore, atomicLoad;
- shared bool g;
+ shared bool g;stopSource
auto waiting =
- ThreadSender().withStopToken((StopToken token) shared @trusted {
+ ThreadSender().withStopToken((shared StopToken token) @trusted shared {
while (!token.isStopRequested) {}
g.atomicStore(true);
});
- auto source = new StopSource();
- auto stopper = just(source).then((StopSource source) shared {
- source.stop();
- });
+ shared source = StopSource();
+ auto stopper = justFrom(() shared => source.stop());
whenAll(waiting.toShared().withStopSource(source), stopper)
.syncWait.isCancelled.should == true;
@@ -209,7 +207,7 @@ unittest {
auto n = new shared Nursery();
auto s = n.toShared(localThreadScheduler());
}
-
++/
@("nvro") @safe
unittest {
static struct Op(Receiver) {
@@ -335,7 +333,7 @@ unittest {
import concurrency.stoptoken;
just(14, 52).syncWait.value.should == tuple(14, 52);
just(14, 53).then((int a, int b) => a * b).syncWait.value.should == 742;
- just(14, 54).withStopToken((StopToken s, int a, int b) => a * b).syncWait
+ just(14, 54).withStopToken((shared StopToken s, int a, int b) => a * b).syncWait
.value.should == 756;
}
diff --git a/tests/ut/concurrency/stoptoken.d b/tests/ut/concurrency/stoptoken.d
new file mode 100644
index 0000000..93955e2
--- /dev/null
+++ b/tests/ut/concurrency/stoptoken.d
@@ -0,0 +1,167 @@
+module ut.concurrency.stoptoken;
+
+import concurrency.stoptoken;
+import unit_threaded;
+import std.encoding;
+
+@("stopsource.isStopRequested.stop") @safe
+unittest {
+ auto source = shared StopSource();
+ source.isStopRequested.should == false;
+ source.stop();
+ source.isStopRequested.should == true;
+}
+
+@("stopsource.isStopRequested.reset") @safe
+unittest {
+ auto source = shared StopSource();
+ source.stop();
+ source.isStopRequested.should == true;
+ source.reset();
+ source.isStopRequested.should == false;
+}
+
+@("stopsource.reset") @safe
+unittest {
+ auto source = shared StopSource();
+ auto token = source.token();
+ shared StopCallback cb;
+ cb.register(token, () shared { });
+ source.reset();
+ source.assertNoCallbacks();
+}
+
+@("stopsource.no-copy") @safe
+unittest {
+ auto source = shared StopSource();
+ /// Disallow copy
+ static assert(!__traits(compiles, { shared s2 = source; }));
+}
+
+@("stopsource.no-assign") @safe
+unittest {
+ auto source = shared StopSource();
+ auto source2 = shared StopSource();
+ /// Disallow assign
+ static assert(!__traits(compiles, { source = shared StopSource(); }));
+ static assert(!__traits(compiles, { source = source2; }));
+}
+
+@("stoptoken.isStopRequested") @safe
+unittest {
+ auto source = shared StopSource();
+ auto token = source.token();
+ token.isStopRequested.should == false;
+ source.stop();
+ token.isStopRequested.should == true;
+}
+
+@("stoptoken.lifetime") @safe
+unittest {
+ shared StopToken token;
+ {
+ auto source = shared StopSource();
+ /// Error: address of variable `source` assigned to `token` with longer lifetime
+ static assert(!__traits(compiles, () @safe { token = source.token(); }));
+ }
+}
+
+@("stoptoken.scope.no-escape") @safe
+unittest {
+ shared StopToken t1;
+ {
+ shared source = shared StopSource();
+ auto token = source.token();
+ auto t2 = token;
+
+ // assert that token can't escape the StopSource
+ static assert(!__traits(compiles, t1 = token));
+ // assert that a copy can't escape the StopSource
+ static assert(!__traits(compiles, t1 = t2));
+ }
+}
+
+@("stopcallback.lifetime") @safe
+unittest {
+ shared StopCallback cb;
+ {
+ auto source = shared StopSource();
+ auto token = source.token();
+ /// We don't allow address of variable `token` to be assigned to `cb` with longer lifetime
+ static assert(!__traits(compiles, () @safe { cb.place(() shared { }, token); }));
+ }
+}
+
+@("stopcallback.opassign") @safe
+unittest {
+ shared StopCallback cb1;
+ shared StopCallback cb2;
+ StopCallback cb3;
+ /// Disallow assign
+ static assert(!__traits(compiles, { cb1 = shared StopCallback(); }));
+ static assert(!__traits(compiles, { cb1 = cb2; }));
+ static assert(!__traits(compiles, { cb1 = StopCallback(); }));
+ static assert(!__traits(compiles, { cb1 = cb3; }));
+}
+
+@("stopsource.StopCallback.stop") @safe
+unittest {
+ import core.atomic;
+ auto source = shared StopSource();
+ auto token = source.token();
+ shared bool set;
+ shared StopCallback cb;
+ cb.register(token, () shared { set.atomicStore(true); });
+ set.atomicLoad.should == false;
+ source.stop();
+ set.atomicLoad.should == true;
+}
+
+@("stopsource.StopCallback.reset") @safe
+unittest {
+ import core.atomic;
+ auto source = shared StopSource();
+ auto token = source.token();
+ shared bool set;
+ shared StopCallback cb;
+ cb.register(token, () shared { set.atomicStore(true); });
+ set.atomicLoad.should == false;
+ source.reset();
+ set.atomicLoad.should == true;
+}
+
+@("StopSource.stoptoken.stop") @safe
+unittest {
+ auto source = shared StopSource();
+ auto token = source.token();
+ token.isStopRequested.should == false;
+ source.stop();
+ token.isStopRequested.should == true;
+}
+
+@("StopSource.StopCallback.stop") @safe
+unittest {
+ import core.atomic;
+ auto source = shared StopSource();
+ auto token = source.token();
+ shared bool set;
+ shared StopCallback cb;
+ cb.register(token, () shared { set.atomicStore(true); });
+ set.atomicLoad.should == false;
+ source.stop();
+ set.atomicLoad.should == true;
+}
+
+@("StopSource.StopCallback.dispose") @safe
+unittest {
+ import core.atomic;
+ auto source = shared StopSource();
+ auto token = source.token();
+ shared bool set;
+ shared StopCallback cb;
+ cb.register(token, () shared { set.atomicStore(true); });
+ set.atomicLoad.should == false;
+ cb.dispose();
+ source.stop();
+ set.atomicLoad.should == false;
+}
diff --git a/tests/ut/concurrency/stream.d b/tests/ut/concurrency/stream.d
index 7d81cc7..43ec372 100644
--- a/tests/ut/concurrency/stream.d
+++ b/tests/ut/concurrency/stream.d
@@ -47,7 +47,7 @@ unittest {
unittest {
import concurrency.operations : withStopSource;
shared int g = 0;
- auto source = new shared StopSource();
+ auto source = shared StopSource();
infiniteStream(5).collect((int n) shared {
if (g < 14)
g.atomicOp!"+="(n);
@@ -82,7 +82,7 @@ unittest {
unittest {
struct Loop {
size_t b, e;
- void loop(DG, StopToken)(DG emit, StopToken stopToken) {
+ void loop(DG)(DG emit, shared StopToken stopToken) {
foreach (i; b .. e)
emit(i);
}
@@ -828,7 +828,7 @@ unittest {
unittest {
import concurrency.stream.defer;
static struct S {
- auto opCall() shared @safe {
+ auto opCall() @safe shared {
import concurrency.sender;
return just(1);
}
diff --git a/tests/ut/concurrency/thread.d b/tests/ut/concurrency/thread.d
index d7c1f1b..ec2d3a6 100644
--- a/tests/ut/concurrency/thread.d
+++ b/tests/ut/concurrency/thread.d
@@ -85,20 +85,3 @@ unittest {
import concurrency.utils;
dynamicLoadRaw!concurrency_getLocalThreadExecutor.should.not == null;
}
-
-@("nested.intervalStream") @safe
-unittest {
- import core.time : msecs;
- import concurrency.stream : intervalStream, take;
-
- static auto interval() {
- return intervalStream(1.msecs).take(90).collect(() shared {});
- }
-
- auto sender = justFrom(() => interval().syncWait()).via(ThreadSender());
-
- auto d = delay(10.msecs);
-
- whenAll(interval, sender, interval, sender, interval, sender, d, d, d)
- .syncWait().assumeOk;
-}
diff --git a/tests/ut/concurrency/utils.d b/tests/ut/concurrency/utils.d
index a7ebfca..63c8e09 100644
--- a/tests/ut/concurrency/utils.d
+++ b/tests/ut/concurrency/utils.d
@@ -25,7 +25,7 @@ unittest {
auto systemSharedClosure = (int i) shared => i * system() * k;
static assert(!isThreadSafeFunction!systemSharedClosure);
- auto trustedSharedClosure = (int i) shared @trusted => i * system() * k;
+ auto trustedSharedClosure = (int i) @trusted shared => i * system() * k;
static assert(isThreadSafeFunction!trustedSharedClosure);
}
diff --git a/tests/ut/concurrency/waitable.d b/tests/ut/concurrency/waitable.d
index 5e06b95..40eb26d 100644
--- a/tests/ut/concurrency/waitable.d
+++ b/tests/ut/concurrency/waitable.d
@@ -33,7 +33,7 @@ auto intSummer(Q)(Q q) {
import concurrency.stoptoken : StopToken;
import core.time : msecs;
- return just(q).withStopToken((StopToken stopToken, Q q) shared @safe {
+ return just(q).withStopToken((shared StopToken stopToken, Q q) @safe shared {
int sum = 0;
while (!stopToken.isStopRequested()) {
if (auto node = q.pop(100.msecs)) {
@@ -58,8 +58,8 @@ unittest {
import core.time : msecs;
auto q = new WaitableQueue!(MPSCQueue!Node)();
- q.intSummer.stopWhen(intProducer(q, 50000)).syncWait.value.should
- == 1250025000;
+ q.intSummer.stopWhen(intProducer(q, 50_000)).syncWait.value.should
+ == 1_250_025_000;
q.empty.should == true;
}
@@ -70,10 +70,10 @@ unittest {
auto q = new WaitableQueue!(MPSCQueue!Node)();
q
.intSummer
- .stopWhen(whenAll(intProducer(q, 10000), intProducer(q, 10000),
- intProducer(q, 10000), intProducer(q, 10000), ))
+ .stopWhen(whenAll(intProducer(q, 10_000), intProducer(q, 10_000),
+ intProducer(q, 10_000), intProducer(q, 10_000), ))
.syncWait
.value
- .should == 200020000;
+ .should == 200_020_000;
q.empty.should == true;
}
diff --git a/tests/ut/ut_runner.d b/tests/ut/ut_runner.d
index 6ff970d..8986fd2 100644
--- a/tests/ut/ut_runner.d
+++ b/tests/ut/ut_runner.d
@@ -17,5 +17,7 @@ int main(string[] args) {
"ut.concurrency.waitable",
"ut.concurrency.asyncscope",
"concurrency.timingwheels",
+ "concurrency.stoptoken",
+ "ut.concurrency.stoptoken",
);
}