Skip to content

Commit

Permalink
Introduce inplacestopcallback
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Sep 25, 2023
1 parent f80ac7e commit 4cdff17
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 18 deletions.
62 changes: 46 additions & 16 deletions source/concurrency/stoptoken.d
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ StopCallback onStop(StopToken)(
StopToken stopToken,
void delegate() nothrow @safe shared callback
) nothrow @safe {
if (stopToken.isStopPossible) {
return (*stopToken.state).onStop(new StopCallback(callback));
}
auto cb = new StopCallback(callback);

onStop(stopToken, cb);

return new StopCallback(callback);
return cb;
}

StopCallback onStop(StopToken)(
Expand All @@ -140,22 +140,35 @@ StopCallback onStop(StopToken)(
StopCallback onStop(StopToken)(StopToken stopToken,
StopCallback cb) nothrow @safe {
if (stopToken.isStopPossible) {
return stopToken.state.onStop(cb);
stopToken.onStop(cb.callback);
}
return cb;
}

void onStop(StopToken)(StopToken stopToken,
ref InPlaceStopCallback cb) nothrow @safe {
if (stopToken.isStopPossible) {
(*stopToken.state).onStop(cb);
}
}

StopCallback onStop(StopSource stopSource, StopCallback cb) nothrow @safe {
return onStop(stopSource.source.state, cb);
onStop(stopSource.source.state, cb.callback);
return cb;
}

void onStop(StopSource stopSource, ref InPlaceStopCallback cb) nothrow @safe {
onStop(stopSource.source.state, cb);
}

StopCallback onStop(ref stop_state state, StopCallback cb) nothrow @trusted { // TODO: @safe
void onStop(ref stop_state state, ref InPlaceStopCallback cb) nothrow @trusted { // TODO: @safe
if (state.try_add_callback(cb, true))
cb.state = &state;
return cb;
}

class StopCallback {
struct InPlaceStopCallback {
@disable this(ref return scope typeof(this) rhs);

void dispose() nothrow @trusted @nogc {
import core.atomic : cas;

Expand Down Expand Up @@ -190,8 +203,8 @@ private:
void delegate() nothrow shared @safe callback;
stop_state* state;

StopCallback next_ = null;
StopCallback* prev_ = null;
InPlaceStopCallback* next_ = null;
InPlaceStopCallback** prev_ = null;
bool* isRemoved_ = null;
shared bool callbackFinishedExecuting = false;

Expand All @@ -200,6 +213,23 @@ private:
}
}

class StopCallback {
this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
this.callback = InPlaceStopCallback(callback);
}

void dispose() nothrow @trusted @nogc {
callback.dispose();
}

void dispose() shared nothrow @trusted @nogc {
callback.dispose();
}
private:

InPlaceStopCallback callback;
}

private void spin_yield() nothrow @trusted @nogc {
// TODO: could use the pause asm instruction
// it is available in LDC as intrinsic... but not in DMD
Expand Down Expand Up @@ -321,8 +351,8 @@ public:
return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
}

bool try_add_callback(StopCallback cb,
bool incrementRefCountIfSuccessful) nothrow @safe {
bool try_add_callback(ref InPlaceStopCallback cb,
bool incrementRefCountIfSuccessful) nothrow @trusted {
ulong oldState;
do {
goto load_state;
Expand Down Expand Up @@ -350,7 +380,7 @@ public:
() @trusted {
cb.prev_ = &head_;
}();
head_ = cb;
head_ = &cb;

if (incrementRefCountIfSuccessful) {
unlock_and_increment_token_ref_count();
Expand All @@ -362,7 +392,7 @@ public:
return true;
}

void remove_callback(StopCallback cb) nothrow @safe @nogc {
void remove_callback(ref InPlaceStopCallback cb) nothrow @safe @nogc {
lock();

if (cb.prev_ !is null) {
Expand Down Expand Up @@ -471,6 +501,6 @@ private:
// bits 2-32 - token ref count (31 bits)
// bits 33-63 - source ref count (31 bits)
shared ulong state_ = source_ref_increment;
StopCallback head_ = null;
InPlaceStopCallback* head_ = null;
Thread signallingThread_;
}
5 changes: 3 additions & 2 deletions source/concurrency/syncwait.d
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,11 @@ auto syncWait(Sender, StopSource)(auto ref Sender sender,
auto syncWait(Sender)(auto scope ref Sender sender) {
import concurrency.signal : globalStopSource;
auto childStopSource = new shared StopSource();
StopToken parentStopToken = StopToken(globalStopSource);
StopCallback cb = parentStopToken.onStop(() shared {
auto cb = InPlaceStopCallback(() shared {
childStopSource.stop();
});
StopToken parentStopToken = StopToken(globalStopSource);
// parentStopToken.onStop(cb);
auto result =
syncWaitImpl(sender, (() @trusted => cast() childStopSource)());
// detach stopSource
Expand Down

0 comments on commit 4cdff17

Please sign in to comment.