Skip to content

Commit

Permalink
WIP for pushing to phobos
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Mar 12, 2024
1 parent 6886d7e commit c83e52c
Show file tree
Hide file tree
Showing 41 changed files with 1,099 additions and 682 deletions.
12 changes: 6 additions & 6 deletions source/concurrency/asyncscope.d
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ private enum Flag {
}

auto asyncScope() @safe {
import concurrency.sender : Promise;
// ensure NRVO
auto as = shared AsyncScope(new shared StopSource());
auto as = shared AsyncScope(new shared Promise!void);
return as;
}

Expand Down Expand Up @@ -66,9 +67,8 @@ public:
cleanup.syncWait();
}

this(shared StopSource stopSource) @safe shared {
completion = new shared Promise!void;
this.stopSource = stopSource;
this(shared Promise!void completion) @safe shared {
this.completion = completion;
}

auto cleanup() @safe shared {
Expand All @@ -93,7 +93,7 @@ public:
return stopSource.stop();
}

bool spawn(Sender)(Sender s) shared @trusted {
bool spawn(Sender)(Sender s) @trusted shared {
import concurrency.sender : connectHeap;
with (flag.update(0, Flag.tick)) {
if ((oldState & Flag.stopped) == 1) {
Expand Down Expand Up @@ -133,7 +133,7 @@ struct AsyncScopeReceiver {

auto getStopToken() nothrow @safe {
import concurrency.stoptoken : StopToken;
return StopToken(s.stopSource);
return s.stopSource.token();
}

auto getScheduler() nothrow @safe {
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/data/queue/mpsc.d
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module concurrency.data.queue.mpsc;

struct MPSCQueueProducer(Node) {
private MPSCQueue!(Node) q;
void push(Node* node) shared @trusted {
void push(Node* node) @trusted shared {
(cast() q).push(node);
}
}
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/data/queue/waitable.d
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct WaitableQueueProducer(Q) {
private shared Q q;
private shared Semaphore sema;

bool push(Q.ElementType t) shared @trusted {
bool push(Q.ElementType t) @trusted shared {
bool r = (cast() q).push(t);
if (r)
(cast() sema).notify();
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/executor.d
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module concurrency.executor;

alias VoidFunction = void function() @safe;
alias VoidDelegate = void delegate() shared @safe;
alias VoidDelegate = void delegate() @safe shared;

interface Executor {
void execute(VoidFunction fn) @safe;
Expand Down
5 changes: 3 additions & 2 deletions source/concurrency/fork.d
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ version(Posix)

if (afterFork)
afterFork(pid);
auto cb = token.onStop(() @trusted shared nothrow =>
shared StopCallback cb;
cb.register(token, () @trusted shared nothrow =>
cast(void) kill(pid, SIGINT));
int status;
auto ret = waitpid(pid, &status, 0);
Expand Down Expand Up @@ -102,7 +103,7 @@ version(Posix)
import concurrency.utils : closure;

executeInNewThread(
cast(void delegate() shared @safe) &this.run);
cast(void delegate() @safe shared) &this.run);
}
}

Expand Down
63 changes: 34 additions & 29 deletions source/concurrency/nursery.d
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module concurrency.nursery;

import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
import concurrency.stoptoken : StopSource, StopToken, StopCallback;
import concurrency.thread : LocalThreadExecutor;
import concurrency.receiver : getStopToken;
// import concurrency.receiver : getStopToken;
import concurrency.scheduler : SchedulerObjectBase;
import std.typecons : Nullable;

Expand All @@ -14,14 +14,15 @@ import std.typecons : Nullable;
/// When cancellation happens all Senders are waited on for completion.
/// Senders can be added to the Nursery at any time.
/// Senders are only started when the Nursery itself is being awaited on.
class Nursery : StopSource {
class Nursery {
import concurrency.sender : isSender, OperationalStateBase;
import core.sync.mutex : Mutex;
import concepts;
static assert(models!(typeof(this), isSender));

alias Value = void;
private {
shared StopSource stopSource;
Node[] operations;
struct Node {
OperationalStateBase state;
Expand All @@ -38,7 +39,7 @@ class Nursery : StopSource {
shared size_t counter = 0;
Throwable throwable; // first throwable from sender, if any
ReceiverObject receiver;
StopCallback stopCallback;
shared StopCallback stopCallback;
Nursery assumeThreadSafe() @trusted shared nothrow {
return cast(Nursery) this;
}
Expand All @@ -50,21 +51,22 @@ class Nursery : StopSource {
with (assumeThreadSafe) mutex = new Mutex();
}

override bool stop() nothrow @trusted {
auto result = super.stop();
bool stop() nothrow @trusted {
auto result = stopSource.stop();

if (result)
(cast(shared) this).done(-1);

return result;
}

override bool stop() nothrow @trusted shared {
bool stop() nothrow @trusted shared {
return (cast(Nursery) this).stop();
}

StopToken getStopToken() nothrow @trusted shared {
return StopToken(cast(Nursery) this);
shared(StopToken) getStopToken() nothrow @safe shared {
return stopSource.token();
// return shared StopToken(stopSource);
}

private auto getScheduler() nothrow @trusted shared {
Expand Down Expand Up @@ -94,17 +96,17 @@ class Nursery : StopSource {
if (isDone) {
throwable = null;
receiver = null;
if (stopCallback)
stopCallback.dispose();
stopCallback = null;
// if (stopCallback)
stopCallback.dispose();
// stopCallback = null;
}

mutex.unlock_nothrow();

if (isDone && localReceiver !is null) {
if (localThrowable !is null) {
localReceiver.setError(localThrowable);
} else if (isStopRequested()) {
} else if (stopSource.isStopRequested()) {
localReceiver.setDone();
} else {
try {
Expand All @@ -122,7 +124,7 @@ class Nursery : StopSource {
run(sender.get());
}

void run(Sender)(Sender sender) shared @trusted {
void run(Sender)(Sender sender) @trusted shared {
import concepts;
static assert(models!(Sender, isSender));
import std.typecons : Nullable;
Expand Down Expand Up @@ -161,7 +163,7 @@ class Nursery : StopSource {

auto connect(Receiver)(
return Receiver receiver
) shared @trusted return scope {
) @trusted shared return scope {
final class ReceiverImpl : ReceiverObject {
Receiver receiver;
SchedulerObjectBase scheduler;
Expand All @@ -187,21 +189,24 @@ class Nursery : StopSource {
scheduler = receiver.getScheduler().toSchedulerObject();
return scheduler;
}

shared(StopToken) getStopToken() nothrow @safe {
return receiver.getStopToken();
}
}

auto stopToken = receiver.getStopToken();
auto cb = (() @trusted => stopToken
.onStop(() shared nothrow @trusted => cast(void) this.stop()))();
return NurseryOp(this, cb, new ReceiverImpl(receiver));
return NurseryOp(this, new ReceiverImpl(receiver));
}

private
void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
void setReceiver(ReceiverObject r) nothrow @trusted shared {
with (assumeThreadSafe) {
mutex.lock_nothrow();
assert(this.receiver is null, "Cannot await a nursery twice.");
receiver = r;
stopCallback = cb;
auto dg = () nothrow @safe shared => cast(void) this.stop();
auto stopToken = r.getStopToken;
stopCallback.register(stopToken, dg);
auto ops = operations.dup();
mutex.unlock_nothrow();

Expand All @@ -217,6 +222,7 @@ private interface ReceiverObject {
void setDone() nothrow @safe;
void setError(Throwable e) nothrow @safe;
SchedulerObjectBase getScheduler() nothrow @safe;
shared(StopToken) getStopToken() nothrow @safe;
}

private struct NurseryReceiver(Value) {
Expand All @@ -228,15 +234,15 @@ private struct NurseryReceiver(Value) {
}

static if (is(Value == void)) {
void setValue() shared @safe {
void setValue() @safe shared {
(cast() this).setDone();
}

void setValue() @safe {
(cast() this).setDone();
}
} else {
void setValue(Value val) shared @trusted {
void setValue(Value val) @trusted shared {
(cast() this).setDone();
}

Expand Down Expand Up @@ -264,25 +270,24 @@ private struct NurseryReceiver(Value) {

private struct NurseryOp {
shared Nursery nursery;
StopCallback cb;
ReceiverObject receiver;
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
this(return shared Nursery n, StopCallback cb,
ReceiverObject r) @safe return scope {
this(shared Nursery n,
ReceiverObject r) @safe {
nursery = n;
this.cb = cb;
// this.cb = cb;
receiver = r;
}

void start() nothrow scope @trusted {
import core.atomic : atomicLoad;
if (nursery.busy.atomicLoad == 0) {
cb.dispose();
// cb.dispose();
receiver.setDone();
} else
nursery.setReceiver(receiver, cb);
nursery.setReceiver(receiver);//, cb);
}
}
28 changes: 14 additions & 14 deletions source/concurrency/operations/race.d
Original file line number Diff line number Diff line change
Expand Up @@ -66,32 +66,31 @@ private struct RaceOp(Receiver, Senders...) {
this(ref return scope typeof(this) rhs);
this(Receiver receiver, return Senders senders,
bool noDropouts) @trusted scope {
state = State!R(noDropouts);
this.receiver = receiver;
state = new State!(R)(noDropouts);
static if (Senders.length > 1) {
foreach (i, Sender; Senders) {
ops[i] = senders[i].connect(
ElementReceiver!(Sender)(receiver, state, Senders.length));
ElementReceiver!(Sender)(receiver, &state, Senders.length));
}
} else {
ops.length = senders[0].length;
foreach (i; 0 .. senders[0].length) {
ops[i] = senders[0][i].connect(
ElementReceiver(receiver, state, senders[0].length));
ElementReceiver(receiver, &state, senders[0].length));
}
}
}

void start() @trusted nothrow scope {
import concurrency.stoptoken : StopSource;
if (receiver.getStopToken().isStopRequested) {
receiver.setDone();
return;
}

auto token = receiver.getStopToken();
// butt ugly cast, but it won't take the second overload
state.cb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &state.stop);
receiver.getStopToken().onStop(state.cb);
state.cb.register(token, cast(void delegate() nothrow @safe shared) &state.stopSource.stop);

static if (Senders.length > 1) {
foreach (i, _; Senders) {
Expand Down Expand Up @@ -122,9 +121,10 @@ struct RaceSender(Senders...)
}
}

private class State(Value) : StopSource {
private struct State(Value) {
import concurrency.bitfield;
InPlaceStopCallback cb;
shared StopSource stopSource;
shared StopCallback cb;
shared SharedBitField!Flags bitfield;
static if (!is(Value == void))
Value value;
Expand All @@ -149,10 +149,10 @@ private enum Counter : size_t {
private struct RaceReceiver(Receiver, InnerValue, Value) {
import core.atomic : atomicOp, atomicLoad, MemoryOrder;
Receiver receiver;
State!(Value) state;
State!(Value)* state;
size_t senderCount;
auto getStopToken() {
return StopToken(state);
return state.stopSource.token();
}

private bool isValueProduced(size_t state) {
Expand All @@ -177,7 +177,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
else
state.value = Value(value);
release(); // must release before calling .stop
state.stop();
state.stopSource.stop();
} else
release();

Expand All @@ -191,7 +191,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
with (state.bitfield.update(Flags.value_produced, Counter.tick)) {
bool last = isLast(newState);
if (!isValueProduced(oldState)) {
state.stop();
state.stopSource.stop();
}

if (last)
Expand All @@ -203,7 +203,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
with (state.bitfield.update(Flags.doneOrError_produced, Counter.tick)) {
bool last = isLast(newState);
if (state.noDropouts && !isDoneOrErrorProduced(oldState)) {
state.stop();
state.stopSource.stop();
}

if (last)
Expand All @@ -218,7 +218,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
state.exception = exception;
if (state.noDropouts) {
release(); // release before stop
state.stop();
state.stopSource.stop();
}
}

Expand Down
Loading

0 comments on commit c83e52c

Please sign in to comment.