Skip to content

Commit

Permalink
Format everything using sdfmt (#78)
Browse files Browse the repository at this point in the history
* Reformat with sdfmt

* Support reggae

Unfortunately it has to be using `dub run reggae -- --all-at-once`

* Format all tests using sdfmt
  • Loading branch information
skoppe authored Sep 4, 2023
1 parent bc84d98 commit fc18035
Show file tree
Hide file tree
Showing 76 changed files with 10,007 additions and 8,952 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,10 @@ concurrency-test-*
*.obj
*.lst
.dsemver
/.reggae
/.ninja_*
/build.ninja
/compile_commands.json
/reggaefile.d
/rules.ninja
/ut
4 changes: 2 additions & 2 deletions dub.sdl
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ configuration "unittest" {
targetType "executable"
mainSourceFile "tests/ut/ut_runner.d"
dflags "-dip1000"
sourcePaths "source" "tests/ut"
importPaths "source" "tests/ut"
sourcePaths "source" "tests"
importPaths "source" "tests"
}
configuration "unittest-release" {
dependency "unit-threaded" version="*"
Expand Down
237 changes: 124 additions & 113 deletions source/concurrency/asyncscope.d
Original file line number Diff line number Diff line change
Expand Up @@ -4,128 +4,139 @@ import concurrency.stoptoken;
import concurrency.scheduler : NullScheduler;

private enum Flag {
locked = 0,
stopped = 1,
tick = 2
locked = 0,
stopped = 1,
tick = 2
}

auto asyncScope() @safe {
// ensure NRVO
auto as = shared AsyncScope(new shared StopSource());
return as;
// ensure NRVO
auto as = shared AsyncScope(new shared StopSource());
return as;
}

struct AsyncScope {
private:
import concurrency.bitfield : SharedBitField;
import concurrency.sender : Promise;

shared SharedBitField!Flag flag;
shared Promise!void completion;
shared StopSource stopSource;
Throwable throwable;

void forward() @trusted nothrow shared {
import core.atomic : atomicLoad;
auto t = throwable.atomicLoad();
if (t !is null)
completion.error(cast(Throwable)t);
else
completion.fulfill();
}

void complete() @safe nothrow shared {
auto newState = flag.sub(Flag.tick);
if (newState == 1) {
forward();
}
}

void setError(Throwable t) @trusted nothrow shared {
import core.atomic : cas;
cas(&throwable, cast(shared Throwable)null, cast(shared)t);
stop();
complete();
}
import concurrency.bitfield : SharedBitField;
import concurrency.sender : Promise;

shared SharedBitField!Flag flag;
shared Promise!void completion;
shared StopSource stopSource;
Throwable throwable;

void forward() @trusted nothrow shared {
import core.atomic : atomicLoad;
auto t = throwable.atomicLoad();
if (t !is null)
completion.error(cast(Throwable) t);
else
completion.fulfill();
}

void complete() @safe nothrow shared {
auto newState = flag.sub(Flag.tick);
if (newState == 1) {
forward();
}
}

void setError(Throwable t) @trusted nothrow shared {
import core.atomic : cas;
cas(&throwable, cast(shared Throwable) null, cast(shared) t);
stop();
complete();
}

public:
@disable this(ref return scope typeof(this) rhs);
@disable this(this);
@disable this();

~this() @safe shared {
import concurrency : syncWait;
import core.atomic : atomicLoad;
auto t = throwable.atomicLoad();
if (t !is null && (cast(shared(Exception))t) is null)
return;
if (!completion.isCompleted)
cleanup.syncWait();
}

this(shared StopSource stopSource) @safe shared {
completion = new shared Promise!void;
this.stopSource = stopSource;
}

auto cleanup() @safe shared {
stop();
return completion.sender();
}

bool stop() nothrow @trusted {
return (cast(shared)this).stop();
}

bool stop() nothrow @trusted shared {
import core.atomic : MemoryOrder;
if ((flag.load!(MemoryOrder.acq) & Flag.stopped) > 0)
return false;

auto newState = flag.add(Flag.stopped);
if (newState == 1) {
forward();
}
return stopSource.stop();
}

bool spawn(Sender)(Sender s) shared @trusted {
import concurrency.sender : connectHeap;
with (flag.update(0, Flag.tick)) {
if ((oldState & Flag.stopped) == 1) {
complete();
return false;
}
try {
s.connectHeap(AsyncScopeReceiver(&this)).start();
} catch (Throwable t) {
// we are required to catch the throwable here, otherwise
// the destructor will wait infinitely for something that
// no longer runs
// by calling setError we ensure the internal state is correct
setError(t);
throw t;
}
return true;
}
}
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
@disable
this();

~this() @safe shared {
import concurrency : syncWait;
import core.atomic : atomicLoad;
auto t = throwable.atomicLoad();
if (t !is null && (cast(shared(Exception)) t) is null)
return;
if (!completion.isCompleted)
cleanup.syncWait();
}

this(shared StopSource stopSource) @safe shared {
completion = new shared Promise!void;
this.stopSource = stopSource;
}

auto cleanup() @safe shared {
stop();
return completion.sender();
}

bool stop() nothrow @trusted {
return (cast(shared) this).stop();
}

bool stop() nothrow @trusted shared {
import core.atomic : MemoryOrder;
if ((flag.load!(MemoryOrder.acq) & Flag.stopped) > 0)
return false;

auto newState = flag.add(Flag.stopped);
if (newState == 1) {
forward();
}

return stopSource.stop();
}

bool spawn(Sender)(Sender s) shared @trusted {
import concurrency.sender : connectHeap;
with (flag.update(0, Flag.tick)) {
if ((oldState & Flag.stopped) == 1) {
complete();
return false;
}

try {
s.connectHeap(AsyncScopeReceiver(&this)).start();
} catch (Throwable t) {
// we are required to catch the throwable here, otherwise
// the destructor will wait infinitely for something that
// no longer runs
// by calling setError we ensure the internal state is correct
setError(t);
throw t;
}

return true;
}
}
}

struct AsyncScopeReceiver {
private shared AsyncScope* s;
void setValue() nothrow @safe {
s.complete();
}
void setDone() nothrow @safe {
s.complete();
}
void setError(Throwable t) nothrow @safe {
s.setError(t);
}
auto getStopToken() nothrow @safe {
import concurrency.stoptoken : StopToken;
return StopToken(s.stopSource);
}
auto getScheduler() nothrow @safe {
return NullScheduler();
}
private shared AsyncScope* s;
void setValue() nothrow @safe {
s.complete();
}

void setDone() nothrow @safe {
s.complete();
}

void setError(Throwable t) nothrow @safe {
s.setError(t);
}

auto getStopToken() nothrow @safe {
import concurrency.stoptoken : StopToken;
return StopToken(s.stopSource);
}

auto getScheduler() nothrow @safe {
return NullScheduler();
}
}
Loading

0 comments on commit fc18035

Please sign in to comment.