Skip to content

Commit

Permalink
Add letDone sender operator
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Sep 1, 2024
1 parent c5a2c45 commit ab680d3
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 0 deletions.
116 changes: 116 additions & 0 deletions source/concurrency/operations/letdone.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
module concurrency.operations.letdone;

auto letDone(Sender, Fun)(Sender sender, Fun fun) {
import concurrency.utils;
// static assert(isThreadSafeCallable!Fun);

return LetDone!(Sender, Fun)(sender, fun);
}

struct LetDone(Sender, Fun) {
import std.traits : ReturnType;
alias FinalSender = ReturnType!(Fun);
alias Value = FinalSender.Value;

Sender sender;
Fun fun;

auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = LetDoneOp!(Sender, Fun, Receiver)(sender, fun, receiver);
return op;
}
}

struct LetDoneOp(Sender, Fun, Receiver) {
import std.traits : ReturnType;
import concurrency.sender : OpType;

alias OpA = OpType!(Sender, LetDoneReceiver!(Sender.Value, Receiver));
alias FinalSender = ReturnType!(Fun);
alias OpB = OpType!(FinalSender, Receiver);

static assert(is(Sender.Value == FinalSender.Value), "Both value types must be the same.");

Fun fun;

// LetDoneOp essentially has 2 states:
// 1) executing the input Sender
// 2) executing the Sender returned by Fun.
//
// The Senders each have an OperationalState, but only
// one is used at a time.
// Therefore we can put them inside a union and use a
// discriminator to tell which is active. It is important
// to manually destroy the correct OperationalState when this
// object itself goes out of scope.
//
// To avoid storing yet another member we use the `fun` as
// a discriminator. If it is `null` it means we are executing the
// second Sender.
// union {
OpA opA;
OpB opB;
// }

@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);


@disable void opAssign(typeof(this) rhs) nothrow @safe @nogc;
@disable void opAssign(ref typeof(this) rhs) nothrow @safe @nogc;

this(return Sender sender, Fun fun, return Receiver receiver) @trusted return scope {
this.fun = fun;
opA = sender.connect(LetDoneReceiver!(Sender.Value, Receiver)(receiver, &next));
}

void start() @trusted nothrow scope {
opA.start();
}

void next(Receiver receiver) @trusted nothrow {
import concurrency.sender : emplaceOperationalState;
try {
auto sender = nextSender();
opB.emplaceOperationalState(sender, receiver);
} catch (Exception e) {
receiver.setError(e);
return;
}
opB.start();
}
private auto nextSender() @trusted {
auto localFun = fun;
fun = null;
return localFun();
}
}

struct LetDoneReceiver(Value, Receiver) {
Receiver receiver;
void delegate(Receiver) @trusted nothrow next;

static if (is(Value == void)) {
void setValue() @safe {
receiver.setValue();
}
} else {
void setValue(Value value) @safe {
receiver.setValue(value);
}
}

void setDone() @safe nothrow {
next(receiver);
}

void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}

import concurrency.receiver;
mixin ForwardExtensionPoints!receiver;
}
1 change: 1 addition & 0 deletions source/concurrency/operations/package.d
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ public import concurrency.operations.repeat;
public import concurrency.operations.ignorevalue;
public import concurrency.operations.dofinally;
public import concurrency.operations.letvalue;
public import concurrency.operations.letdone;
62 changes: 62 additions & 0 deletions tests/ut/concurrency/operations.d
Original file line number Diff line number Diff line change
Expand Up @@ -978,3 +978,65 @@ unittest {
.isError
.should == true;
}

@("letDone.outer.void") @safe
unittest {
just(41)
.letDone(() => just(42))
.syncWait
.value
.should == 41;
}

@("letDone.outer.error") @safe
unittest {
shared int g = 0;

ThrowingSender()
.letDone(() @safe shared {
g = 1;
return VoidSender();
})
.syncWait
.isError
.should == true;

g.should == 0;
}

@("letDone.outer.done") @safe
unittest {
shared int g = 0;

DoneSender()
.letDone(() @safe shared {
g = 1;
return VoidSender();
})
.syncWait
.isCancelled
.should == false;

g.should == 1;
}

@("letDone.inner.error") @safe
unittest {
DoneSender()
.letDone(() => ThrowingSender())
.syncWait
.isError
.should == true;
}

@("letDone.inner.exception") @safe
unittest {
DoneSender()
.letDone(() {
throw new Exception("foobar");
return VoidSender();
})
.syncWait
.isError
.should == true;
}

0 comments on commit ab680d3

Please sign in to comment.