Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the StopToken/StopCallback and StopSource #89

Merged
merged 2 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
dc: [dmd-latest, ldc-latest, dmd-2.098.1, ldc-1.28.1]
dc: [dmd-latest, ldc-latest, dmd-2.105.3, ldc-1.35.0]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Structured Concurrency

<img src="https://github.com/symmetryinvestments/concurrency/workflows/build/badge.svg"/>&nbsp;<img src="https://img.shields.io/badge/ldc%201.28.1+%20-supported-brightgreen"/>&nbsp;<img src="https://img.shields.io/badge/dmd%202.098.1+%20-supported-brightgreen"/>
<img src="https://github.com/symmetryinvestments/concurrency/workflows/build/badge.svg"/>&nbsp;<img src="https://img.shields.io/badge/ldc%201.35.0+%20-supported-brightgreen"/>&nbsp;<img src="https://img.shields.io/badge/dmd%202.105.3+%20-supported-brightgreen"/>
Provides various primitives useful for structured concurrency and async tasks.

## Senders/Receivers
Expand Down
12 changes: 6 additions & 6 deletions source/concurrency/asyncscope.d
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ private enum Flag {
}

auto asyncScope() @safe {
import concurrency.sender : Promise;
// ensure NRVO
auto as = shared AsyncScope(new shared StopSource());
auto as = shared AsyncScope(new shared Promise!void);
return as;
}

Expand Down Expand Up @@ -66,9 +67,8 @@ public:
cleanup.syncWait();
}

this(shared StopSource stopSource) @safe shared {
completion = new shared Promise!void;
this.stopSource = stopSource;
this(shared Promise!void completion) @safe shared {
this.completion = completion;
}

auto cleanup() @safe shared {
Expand All @@ -93,7 +93,7 @@ public:
return stopSource.stop();
}

bool spawn(Sender)(Sender s) shared @trusted {
bool spawn(Sender)(Sender s) @trusted shared {
import concurrency.sender : connectHeap;
with (flag.update(0, Flag.tick)) {
if ((oldState & Flag.stopped) == 1) {
Expand Down Expand Up @@ -133,7 +133,7 @@ struct AsyncScopeReceiver {

auto getStopToken() nothrow @safe {
import concurrency.stoptoken : StopToken;
return StopToken(s.stopSource);
return s.stopSource.token();
}

auto getScheduler() nothrow @safe {
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/data/queue/mpsc.d
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module concurrency.data.queue.mpsc;

struct MPSCQueueProducer(Node) {
private MPSCQueue!(Node) q;
void push(Node* node) shared @trusted {
void push(Node* node) @trusted shared {
(cast() q).push(node);
}
}
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/data/queue/waitable.d
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct WaitableQueueProducer(Q) {
private shared Q q;
private shared Semaphore sema;

bool push(Q.ElementType t) shared @trusted {
bool push(Q.ElementType t) @trusted shared {
bool r = (cast() q).push(t);
if (r)
(cast() sema).notify();
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/executor.d
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module concurrency.executor;

alias VoidFunction = void function() @safe;
alias VoidDelegate = void delegate() shared @safe;
alias VoidDelegate = void delegate() @safe shared;

interface Executor {
void execute(VoidFunction fn) @safe;
Expand Down
5 changes: 3 additions & 2 deletions source/concurrency/fork.d
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ version(Posix)

if (afterFork)
afterFork(pid);
auto cb = token.onStop(() @trusted shared nothrow =>
shared StopCallback cb;
cb.register(token, () @trusted shared nothrow =>
cast(void) kill(pid, SIGINT));
int status;
auto ret = waitpid(pid, &status, 0);
Expand Down Expand Up @@ -102,7 +103,7 @@ version(Posix)
import concurrency.utils : closure;

executeInNewThread(
cast(void delegate() shared @safe) &this.run);
cast(void delegate() @safe shared) &this.run);
}
}

Expand Down
66 changes: 36 additions & 30 deletions source/concurrency/nursery.d
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module concurrency.nursery;

import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
import concurrency.stoptoken : StopSource, StopToken, StopCallback;
import concurrency.thread : LocalThreadExecutor;
import concurrency.receiver : getStopToken;
// import concurrency.receiver : getStopToken;
import concurrency.scheduler : SchedulerObjectBase;
import std.typecons : Nullable;

Expand All @@ -14,14 +14,15 @@ import std.typecons : Nullable;
/// When cancellation happens all Senders are waited on for completion.
/// Senders can be added to the Nursery at any time.
/// Senders are only started when the Nursery itself is being awaited on.
class Nursery : StopSource {
class Nursery {
import concurrency.sender : isSender, OperationalStateBase;
import core.sync.mutex : Mutex;
import concepts;
static assert(models!(typeof(this), isSender));

alias Value = void;
private {
shared StopSource stopSource;
Node[] operations;
struct Node {
OperationalStateBase state;
Expand All @@ -38,7 +39,7 @@ class Nursery : StopSource {
shared size_t counter = 0;
Throwable throwable; // first throwable from sender, if any
ReceiverObject receiver;
StopCallback stopCallback;
shared StopCallback stopCallback;
Nursery assumeThreadSafe() @trusted shared nothrow {
return cast(Nursery) this;
}
Expand All @@ -50,21 +51,22 @@ class Nursery : StopSource {
with (assumeThreadSafe) mutex = new Mutex();
}

override bool stop() nothrow @trusted {
auto result = super.stop();
bool stop() nothrow @trusted {
auto result = stopSource.stop();

if (result)
(cast(shared) this).done(-1);

return result;
}

override bool stop() nothrow @trusted shared {
bool stop() nothrow @trusted shared {
return (cast(Nursery) this).stop();
}

StopToken getStopToken() nothrow @trusted shared {
return StopToken(cast(Nursery) this);
shared(StopToken) getStopToken() nothrow @safe shared {
return stopSource.token();
// return shared StopToken(stopSource);
}

private auto getScheduler() nothrow @trusted shared {
Expand Down Expand Up @@ -94,17 +96,17 @@ class Nursery : StopSource {
if (isDone) {
throwable = null;
receiver = null;
if (stopCallback)
stopCallback.dispose();
stopCallback = null;
// if (stopCallback)
stopCallback.dispose();
// stopCallback = null;
}

mutex.unlock_nothrow();

if (isDone && localReceiver !is null) {
if (localThrowable !is null) {
localReceiver.setError(localThrowable);
} else if (isStopRequested()) {
} else if (stopSource.isStopRequested()) {
localReceiver.setDone();
} else {
try {
Expand All @@ -122,7 +124,7 @@ class Nursery : StopSource {
run(sender.get());
}

void run(Sender)(Sender sender) shared @trusted {
void run(Sender)(Sender sender) @trusted shared {
import concepts;
static assert(models!(Sender, isSender));
import std.typecons : Nullable;
Expand Down Expand Up @@ -161,7 +163,7 @@ class Nursery : StopSource {

auto connect(Receiver)(
return Receiver receiver
) shared @trusted return scope {
) @trusted shared return scope {
final class ReceiverImpl : ReceiverObject {
Receiver receiver;
SchedulerObjectBase scheduler;
Expand All @@ -187,21 +189,24 @@ class Nursery : StopSource {
scheduler = receiver.getScheduler().toSchedulerObject();
return scheduler;
}

shared(StopToken) getStopToken() nothrow @safe {
return receiver.getStopToken();
}
}

auto stopToken = receiver.getStopToken();
auto cb = (() @trusted => stopToken
.onStop(() shared nothrow @trusted => cast(void) this.stop()))();
return NurseryOp(this, cb, new ReceiverImpl(receiver));
return NurseryOp(this, new ReceiverImpl(receiver));
}

private
void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
void setReceiver(ReceiverObject r) nothrow @trusted shared {
with (assumeThreadSafe) {
mutex.lock_nothrow();
assert(this.receiver is null, "Cannot await a nursery twice.");
receiver = r;
stopCallback = cb;
auto dg = () nothrow @safe shared => cast(void) this.stop();
auto stopToken = r.getStopToken;
stopCallback.register(stopToken, dg);
auto ops = operations.dup();
mutex.unlock_nothrow();

Expand All @@ -217,6 +222,7 @@ private interface ReceiverObject {
void setDone() nothrow @safe;
void setError(Throwable e) nothrow @safe;
SchedulerObjectBase getScheduler() nothrow @safe;
shared(StopToken) getStopToken() nothrow @safe;
}

private struct NurseryReceiver(Value) {
Expand All @@ -228,15 +234,15 @@ private struct NurseryReceiver(Value) {
}

static if (is(Value == void)) {
void setValue() shared @safe {
void setValue() @safe shared {
(cast() this).setDone();
}

void setValue() @safe {
(cast() this).setDone();
}
} else {
void setValue(Value val) shared @trusted {
void setValue(Value val) @trusted shared {
(cast() this).setDone();
}

Expand Down Expand Up @@ -264,24 +270,24 @@ private struct NurseryReceiver(Value) {

private struct NurseryOp {
shared Nursery nursery;
StopCallback cb;
ReceiverObject receiver;
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
this(return shared Nursery n, StopCallback cb,
ReceiverObject r) @safe return scope {
this(shared Nursery n,
ReceiverObject r) @safe {
nursery = n;
this.cb = cb;
// this.cb = cb;
receiver = r;
}

void start() nothrow scope @trusted {
import core.atomic : atomicLoad;
if (nursery.busy.atomicLoad == 0)
if (nursery.busy.atomicLoad == 0) {
// cb.dispose();
receiver.setDone();
else
nursery.setReceiver(receiver, cb);
} else
nursery.setReceiver(receiver);//, cb);
}
}
31 changes: 16 additions & 15 deletions source/concurrency/operations/race.d
Original file line number Diff line number Diff line change
Expand Up @@ -66,32 +66,32 @@ private struct RaceOp(Receiver, Senders...) {
this(ref return scope typeof(this) rhs);
this(Receiver receiver, return Senders senders,
bool noDropouts) @trusted scope {
state = State!R(noDropouts);
this.receiver = receiver;
state = new State!(R)(noDropouts);
static if (Senders.length > 1) {
foreach (i, Sender; Senders) {
ops[i] = senders[i].connect(
ElementReceiver!(Sender)(receiver, state, Senders.length));
ElementReceiver!(Sender)(receiver, &state, Senders.length));
}
} else {
ops.length = senders[0].length;
foreach (i; 0 .. senders[0].length) {
ops[i] = senders[0][i].connect(
ElementReceiver(receiver, state, senders[0].length));
ElementReceiver(receiver, &state, senders[0].length));
}
}
}

void start() @trusted nothrow scope {
import concurrency.stoptoken : StopSource;
if (receiver.getStopToken().isStopRequested) {
receiver.setDone();
return;
}

state.cb = receiver.getStopToken().onStop(
cast(void delegate() nothrow @safe shared) &state.stop
); // butt ugly cast, but it won't take the second overload
auto token = receiver.getStopToken();
// butt ugly cast, but it won't take the second overload
state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop);

static if (Senders.length > 1) {
foreach (i, _; Senders) {
ops[i].start();
Expand Down Expand Up @@ -121,9 +121,10 @@ struct RaceSender(Senders...)
}
}

private class State(Value) : StopSource {
private struct State(Value) {
import concurrency.bitfield;
StopCallback cb;
shared StopSource stopSource;
shared StopCallback cb;
shared SharedBitField!Flags bitfield;
static if (!is(Value == void))
Value value;
Expand All @@ -148,10 +149,10 @@ private enum Counter : size_t {
private struct RaceReceiver(Receiver, InnerValue, Value) {
import core.atomic : atomicOp, atomicLoad, MemoryOrder;
Receiver receiver;
State!(Value) state;
State!(Value)* state;
size_t senderCount;
auto getStopToken() {
return StopToken(state);
return state.stopSource.token();
}

private bool isValueProduced(size_t state) {
Expand All @@ -176,7 +177,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
else
state.value = Value(value);
release(); // must release before calling .stop
state.stop();
state.stopSource.stop();
} else
release();

Expand All @@ -190,7 +191,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
with (state.bitfield.update(Flags.value_produced, Counter.tick)) {
bool last = isLast(newState);
if (!isValueProduced(oldState)) {
state.stop();
state.stopSource.stop();
}

if (last)
Expand All @@ -202,7 +203,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) {
bool last = isLast(newState);
if (state.noDropouts && !isDoneOrErrorProduced(oldState)) {
state.stop();
state.stopSource.stop();
}

if (last)
Expand All @@ -217,7 +218,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
state.exception = exception;
if (state.noDropouts) {
release(); // release before stop
state.stop();
state.stopSource.stop();
}
}

Expand Down
Loading
Loading