From 6886d7e516c81fdc446501fd4a44fdbd19274f84 Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Sat, 14 Oct 2023 21:46:58 +0200 Subject: [PATCH] Make more using inplacestopcallbacks --- source/concurrency/operations/race.d | 9 +++++---- source/concurrency/operations/stopwhen.d | 9 +++++---- source/concurrency/operations/whenall.d | 9 +++++---- source/concurrency/operations/withstopsource.d | 13 ++++++++----- 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/source/concurrency/operations/race.d b/source/concurrency/operations/race.d index 6e7f158..830963b 100644 --- a/source/concurrency/operations/race.d +++ b/source/concurrency/operations/race.d @@ -89,9 +89,10 @@ private struct RaceOp(Receiver, Senders...) { return; } - state.cb = receiver.getStopToken().onStop( - cast(void delegate() nothrow @safe shared) &state.stop - ); // butt ugly cast, but it won't take the second overload + // butt ugly cast, but it won't take the second overload + state.cb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &state.stop); + receiver.getStopToken().onStop(state.cb); + static if (Senders.length > 1) { foreach (i, _; Senders) { ops[i].start(); @@ -123,7 +124,7 @@ struct RaceSender(Senders...) private class State(Value) : StopSource { import concurrency.bitfield; - StopCallback cb; + InPlaceStopCallback cb; shared SharedBitField!Flags bitfield; static if (!is(Value == void)) Value value; diff --git a/source/concurrency/operations/stopwhen.d b/source/concurrency/operations/stopwhen.d index 3eb9ee5..85fca0c 100644 --- a/source/concurrency/operations/stopwhen.d +++ b/source/concurrency/operations/stopwhen.d @@ -42,9 +42,10 @@ private struct StopWhenOp(Receiver, Sender, Trigger) { return; } - state.cb = receiver.getStopToken().onStop( - cast(void delegate() nothrow @safe shared) &state.stop - ); // butt ugly cast, but it won't take the second overload + // butt ugly cast, but it won't take the second overload + state.cb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &state.stop); + receiver.getStopToken().onStop(state.cb); + sourceOp.start; triggerOp.start; } @@ -66,7 +67,7 @@ struct StopWhenSender(Sender, Trigger) private class State(Value) : StopSource { import concurrency.bitfield; - StopCallback cb; + InPlaceStopCallback cb; shared SharedBitField!Flags bitfield; static if (!is(Value == void)) Value value; diff --git a/source/concurrency/operations/whenall.d b/source/concurrency/operations/whenall.d index f6708ae..c8c60ec 100644 --- a/source/concurrency/operations/whenall.d +++ b/source/concurrency/operations/whenall.d @@ -143,9 +143,10 @@ private struct WhenAllOp(Receiver, Senders...) { return; } - state.cb = receiver.getStopToken().onStop( - cast(void delegate() nothrow @safe shared) &state.stop - ); // butt ugly cast, but it won't take the second overload + // butt ugly cast, but it won't take the second overload + state.cb = InPlaceStopCallback(cast(void delegate() nothrow @safe shared) &state.stop); + receiver.getStopToken().onStop(state.cb); + static if (Senders.length > 1) { foreach (i, _; Senders) { ops[i].start(); @@ -179,7 +180,7 @@ struct WhenAllSender(Senders...) private class WhenAllState(Value) : StopSource { import concurrency.bitfield; - StopCallback cb; + InPlaceStopCallback cb; static if (is(typeof(Value.values))) Value value; Throwable exception; diff --git a/source/concurrency/operations/withstopsource.d b/source/concurrency/operations/withstopsource.d index f08c0bf..0bb8554 100644 --- a/source/concurrency/operations/withstopsource.d +++ b/source/concurrency/operations/withstopsource.d @@ -64,7 +64,7 @@ private struct SSReceiver(Receiver, Value) { struct SSState { shared InPlaceStopSource combinedStopSource; - StopCallback[2] cbs; + InPlaceStopCallback[2] cbs; } struct SSOp(Receiver, OuterStopSource, Sender) { @@ -77,15 +77,18 @@ struct SSOp(Receiver, OuterStopSource, Sender) { @disable this(this); this(Receiver receiver, OuterStopSource outerStopSource, Sender sender) @trusted { - state.cbs[0] = receiver.getStopToken().onStop(cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop); + state.cbs[0] = InPlaceStopCallback(cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop); + state.cbs[1] = InPlaceStopCallback(cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop); + + receiver.getStopToken().onStop(state.cbs[0]); static if (is(OuterStopSource == shared InPlaceStopSource*)) { - state.cbs[1] = onStop(*outerStopSource, cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop); + onStop(*outerStopSource, state.cbs[1]); } else { - state.cbs[1] = outerStopSource.onStop(cast(void delegate() shared @safe nothrow)&state.combinedStopSource.stop); + outerStopSource.onStop(state.cbs[1]); } try { - op = sender.connect(SSReceiver!(Receiver, Sender.Value)(receiver, &state)); + op = sender.connect(SSReceiver!(Receiver, Sender.Value)(receiver, &state)); } catch (Exception e) { state.cbs[0].dispose(); state.cbs[1].dispose();