Skip to content

Commit

Permalink
Make more using inplacestopcallbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Oct 14, 2023
1 parent 6300393 commit 6886d7e
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 17 deletions.
9 changes: 5 additions & 4 deletions source/concurrency/operations/race.d
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ private struct RaceOp(Receiver, Senders...) {
return;
}

state.cb = receiver.getStopToken().onStop(
cast(void delegate() nothrow @safe shared) &state.stop
); // butt ugly cast, but it won't take the second overload
// butt ugly cast, but it won't take the second overload
state.cb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &state.stop);
receiver.getStopToken().onStop(state.cb);

static if (Senders.length > 1) {
foreach (i, _; Senders) {
ops[i].start();
Expand Down Expand Up @@ -123,7 +124,7 @@ struct RaceSender(Senders...)

private class State(Value) : StopSource {
import concurrency.bitfield;
StopCallback cb;
InPlaceStopCallback cb;
shared SharedBitField!Flags bitfield;
static if (!is(Value == void))
Value value;
Expand Down
9 changes: 5 additions & 4 deletions source/concurrency/operations/stopwhen.d
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ private struct StopWhenOp(Receiver, Sender, Trigger) {
return;
}

state.cb = receiver.getStopToken().onStop(
cast(void delegate() nothrow @safe shared) &state.stop
); // butt ugly cast, but it won't take the second overload
// butt ugly cast, but it won't take the second overload
state.cb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &state.stop);
receiver.getStopToken().onStop(state.cb);

sourceOp.start;
triggerOp.start;
}
Expand All @@ -66,7 +67,7 @@ struct StopWhenSender(Sender, Trigger)

private class State(Value) : StopSource {
import concurrency.bitfield;
StopCallback cb;
InPlaceStopCallback cb;
shared SharedBitField!Flags bitfield;
static if (!is(Value == void))
Value value;
Expand Down
9 changes: 5 additions & 4 deletions source/concurrency/operations/whenall.d
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,10 @@ private struct WhenAllOp(Receiver, Senders...) {
return;
}

state.cb = receiver.getStopToken().onStop(
cast(void delegate() nothrow @safe shared) &state.stop
); // butt ugly cast, but it won't take the second overload
// butt ugly cast, but it won't take the second overload
state.cb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &state.stop);
receiver.getStopToken().onStop(state.cb);

static if (Senders.length > 1) {
foreach (i, _; Senders) {
ops[i].start();
Expand Down Expand Up @@ -179,7 +180,7 @@ struct WhenAllSender(Senders...)

private class WhenAllState(Value) : StopSource {
import concurrency.bitfield;
StopCallback cb;
InPlaceStopCallback cb;
static if (is(typeof(Value.values)))
Value value;
Throwable exception;
Expand Down
13 changes: 8 additions & 5 deletions source/concurrency/operations/withstopsource.d
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private struct SSReceiver(Receiver, Value) {

struct SSState {
shared InPlaceStopSource combinedStopSource;
StopCallback[2] cbs;
InPlaceStopCallback[2] cbs;
}

struct SSOp(Receiver, OuterStopSource, Sender) {
Expand All @@ -77,15 +77,18 @@ struct SSOp(Receiver, OuterStopSource, Sender) {
@disable
this(this);
this(Receiver receiver, OuterStopSource outerStopSource, Sender sender) @trusted {
state.cbs[0] = receiver.getStopToken().onStop(cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop);
state.cbs[0] = InPlaceStopCallback(cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop);
state.cbs[1] = InPlaceStopCallback(cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop);

receiver.getStopToken().onStop(state.cbs[0]);
static if (is(OuterStopSource == shared InPlaceStopSource*)) {
state.cbs[1] = onStop(*outerStopSource, cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop);
onStop(*outerStopSource, state.cbs[1]);
} else {
state.cbs[1] = outerStopSource.onStop(cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop);
outerStopSource.onStop(state.cbs[1]);
}

try {
op = sender.connect(SSReceiver!(Receiver, Sender.Value)(receiver, &state));
op = sender.connect(SSReceiver!(Receiver, Sender.Value)(receiver, &state));
} catch (Exception e) {
state.cbs[0].dispose();
state.cbs[1].dispose();
Expand Down

0 comments on commit 6886d7e

Please sign in to comment.