Skip to content

Commit

Permalink
Trying to get connectHeap to play nice
Browse files Browse the repository at this point in the history
Normally the state object, because it is on the stack, is destroyed
somewhere _after_ the receiver is called. With `connectHeap` this is
not the case, since the state lives on the heap.

Instead we call destroy manually to simulate this.

Also note that connectHeap can probably be used to escape references to
scoped sender/receivers, so its a hodgepot.

Anyway, its not working...
  • Loading branch information
skoppe committed Aug 21, 2024
1 parent 142e544 commit 12c0ef4
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 28 deletions.
4 changes: 2 additions & 2 deletions source/concurrency/operations/forwardon.d
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ private struct ForwardOnReceiver(Receiver, Value, Scheduler) {
}

void setDone() @safe nothrow {
DoneSender().via(scheduler.schedule()).connectHeap(receiver).start();
TypedDoneSender!(Value)().via(scheduler.schedule()).connectHeap(receiver).start();
}

void setError(Throwable e) @safe nothrow {
ErrorSender(e).via(scheduler.schedule()).connectHeap(receiver).start();
TypedErrorSender!(Value)(e).via(scheduler.schedule()).connectHeap(receiver).start();
}

mixin ForwardExtensionPoints!receiver;
Expand Down
77 changes: 51 additions & 26 deletions source/concurrency/sender.d
Original file line number Diff line number Diff line change
Expand Up @@ -315,27 +315,27 @@ struct ConnectHeapReceiver(Receiver, Value) {
static if (is(Value == void)) {
void setValue() @safe {
Receiver local = receiver;
destroyState();
local.setValue();
destroyState();
}
} else{
} else {
void setValue(Value v) @safe {
Receiver local = receiver;
destroyState();
local.setValue(v);
destroyState();
}
}

void setDone() nothrow @safe {
Receiver local = receiver;
destroyState();
local.setDone();
destroyState();
}

void setError(Throwable e) nothrow @safe {
Receiver local = receiver;
destroyState();
local.setError(e);
destroyState();
}

import concurrency.receiver : ForwardExtensionPoints;
Expand Down Expand Up @@ -402,24 +402,36 @@ struct ThrowingSender {
struct DoneSender {
static assert(models!(typeof(this), isSender));
alias Value = void;
static struct DoneOp(Receiver) {
Receiver receiver;
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
void start() nothrow @trusted scope {
receiver.setDone();
}

auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = TypedDoneSender!(void)().connect(receiver);
return op;
}
}

struct TypedDoneSender(T) {
static assert(models!(typeof(this), isSender));
alias Value = T;

auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = DoneOp!(Receiver)(receiver);
auto op = TypedDoneOp!(Receiver)(receiver);
return op;
}
}

struct TypedDoneOp(Receiver) {
Receiver receiver;
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
void start() nothrow @trusted scope {
receiver.setDone();
}
}

/// A sender that always calls setValue with no args
struct VoidSender {
static assert(models!(typeof(this), isSender));
Expand Down Expand Up @@ -448,25 +460,38 @@ struct ErrorSender {
static assert(models!(typeof(this), isSender));
alias Value = void;
Throwable exception;
static struct ErrorOp(Receiver) {
Receiver receiver;
Throwable exception;
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
void start() nothrow @trusted scope {
receiver.setError(exception);
}

auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = TypedErrorSender!(void)(exception).connect(receiver);
return op;
}
}

/// A sender that always calls setError
struct TypedErrorSender(T) {
static assert(models!(typeof(this), isSender));
alias Value = T;
Throwable exception;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = ErrorOp!(Receiver)(receiver, exception);
auto op = TypedErrorOp!(Receiver)(receiver, exception);
return op;
}
}

struct TypedErrorOp(Receiver) {
Receiver receiver;
Throwable exception;
@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);
void start() nothrow @trusted scope {
receiver.setError(exception);
}
}

template OpType(Sender, Receiver) {
static if (is(Sender.Op)) {
alias OpType = Sender.Op;
Expand Down

0 comments on commit 12c0ef4

Please sign in to comment.