Skip to content

Commit

Permalink
Add doFinally sender operator
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Aug 21, 2024
1 parent d54e17a commit 35902c0
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 0 deletions.
57 changes: 57 additions & 0 deletions source/concurrency/operations/dofinally.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
module concurrency.operations.dofinally;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;

/// runs a side-effect whenever the underlying sender completes
auto doFinally(Sender, SideEffect)(Sender sender, SideEffect effect) {
import concurrency.utils : isThreadSafeFunction;
static assert(isThreadSafeFunction!SideEffect);
return DoFinallySender!(Sender, SideEffect)(sender, effect);
}

private struct DoFinallyReceiver(Value, SideEffect, Receiver) {
Receiver receiver;
SideEffect sideEffect;
static if (is(Value == void))
// TODO: mustn't this be nothrow?
void setValue() @safe {
sideEffect();
receiver.setValue();
}

else
void setValue(Value value) @safe {
sideEffect();
receiver.setValue(value);
}

void setDone() @safe nothrow {
sideEffect();
receiver.setDone();
}

void setError(Throwable t) @safe nothrow {
sideEffect();
receiver.setError(t);
}

mixin ForwardExtensionPoints!receiver;
}

struct DoFinallySender(Sender, SideEffect) if (models!(Sender, isSender)) {
static assert(models!(typeof(this), isSender));
alias Value = Sender.Value;
Sender sender;
SideEffect effect;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = sender.connect(
DoFinallyReceiver!(Sender.Value, SideEffect, Receiver)(receiver,
effect));
return op;
}
}
1 change: 1 addition & 0 deletions source/concurrency/operations/package.d
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ public import concurrency.operations.oncompletion;
public import concurrency.operations.onresult;
public import concurrency.operations.repeat;
public import concurrency.operations.ignorevalue;
public import concurrency.operations.dofinally;
27 changes: 27 additions & 0 deletions tests/ut/concurrency/operations.d
Original file line number Diff line number Diff line change
Expand Up @@ -844,3 +844,30 @@ unittest {
unittest {
just(42).ignoreValue().syncWait.get!Completed.should == Completed();
}

@("doFinally.value") @safe
unittest {
import core.atomic : atomicOp;
shared int g = 0;
just(42).doFinally(() @safe shared => g.atomicOp!"+="(1)).syncWait
.assumeOk;
g.should == 1;
}

@("doFinally.done") @safe
unittest {
import core.atomic : atomicOp;
shared int g = 0;
DoneSender().doFinally(() @safe shared => g.atomicOp!"+="(1)).syncWait
.isCancelled.should == true;
g.should == 1;
}

@("doFinally.error") @safe
unittest {
import core.atomic : atomicOp;
shared int g = 0;
ThrowingSender().doFinally(() @safe shared => g.atomicOp!"+="(1))
.syncWait.isError.should == true;
g.should == 1;
}

0 comments on commit 35902c0

Please sign in to comment.