Skip to content

Commit

Permalink
Destroy connectHeap's state when completed
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Aug 21, 2024
1 parent 958a158 commit 142e544
Showing 1 changed file with 57 additions and 3 deletions.
60 changes: 57 additions & 3 deletions source/concurrency/sender.d
Original file line number Diff line number Diff line change
Expand Up @@ -277,19 +277,71 @@ interface OperationalStateBase {
/// calls connect on the Sender but stores the OperationState on the heap
OperationalStateBase connectHeap(Sender, Receiver)(Sender sender,
Receiver receiver) @safe {
alias State = typeof(sender.connect(receiver));
alias State = typeof(sender.connect(ConnectHeapReceiver!(Receiver, Sender.Value).init));
return new class(sender, receiver) OperationalStateBase {
State state;
// TODO: this is unsafe and allows returning an object with
// unlimited lifetime containing scoped senders and receivers.
// This object can consequently be returned and it would violate
// all kinds of guarantees by dip1000 and @safe
this(return Sender sender, return Receiver receiver) @trusted {
state = sender.connect(receiver);
state = sender.connect(ConnectHeapReceiver!(Receiver, Sender.Value)(&kill, receiver));
}

void start() @safe nothrow {
state.start();
}

void kill() @safe nothrow {
state.destroy();
}
};
}

// NOTE the destroy state is necessary in order to clean up
// any potential resources therein

// Although the question does pose itself
// how was I able to circumvent the typesystem
// The Sender that I am connecting with in connectHeap
// was able to produce an operational state that held
// on to the stopstate longer than said stopstate

// !!! Its because of the @trusted on the this of the connectHeap !!!
struct ConnectHeapReceiver(Receiver, Value) {
void delegate() @safe nothrow destroyState;
Receiver receiver;

static if (is(Value == void)) {
void setValue() @safe {
Receiver local = receiver;
destroyState();
local.setValue();
}
} else{
void setValue(Value v) @safe {
Receiver local = receiver;
destroyState();
local.setValue(v);
}
}

void setDone() nothrow @safe {
Receiver local = receiver;
destroyState();
local.setDone();
}

void setError(Throwable e) nothrow @safe {
Receiver local = receiver;
destroyState();
local.setError(e);
}

import concurrency.receiver : ForwardExtensionPoints;
mixin ForwardExtensionPoints!receiver;
}

/// A class extending from SenderObjectBase that wraps any Sender
class SenderObjectImpl(Sender) : SenderObjectBase!(Sender.Value) {
import concurrency.receiver : ReceiverObjectBase;
Expand All @@ -304,7 +356,9 @@ class SenderObjectImpl(Sender) : SenderObjectBase!(Sender.Value) {
) @trusted scope {
auto state = sender.connectHeap(receiver);
return
OperationObject(cast(typeof(OperationObject._start)) &state.start);
OperationObject(
cast(typeof(OperationObject._start)) &state.start,
);
}

OperationObject connect(Receiver)(return Receiver receiver) @safe scope {
Expand Down

0 comments on commit 142e544

Please sign in to comment.