From 12c0ef47b8e002b8f3f03f69e02f201b26ff707a Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Wed, 21 Aug 2024 17:25:38 +0200 Subject: [PATCH] Trying to get connectHeap to play nice Normally the state object, because it is on the stack, is destroyed somewhere _after_ the receiver is called. With `connectHeap` this is not the case, since the state lives on the heap. Instead we call destroy manually to simulate this. Also note that connectHeap can probably be used to escape references to scoped sender/receivers, so its a hodgepot. Anyway, its not working... --- source/concurrency/operations/forwardon.d | 4 +- source/concurrency/sender.d | 77 +++++++++++++++-------- 2 files changed, 53 insertions(+), 28 deletions(-) diff --git a/source/concurrency/operations/forwardon.d b/source/concurrency/operations/forwardon.d index c91ef2f..99fba88 100644 --- a/source/concurrency/operations/forwardon.d +++ b/source/concurrency/operations/forwardon.d @@ -28,11 +28,11 @@ private struct ForwardOnReceiver(Receiver, Value, Scheduler) { } void setDone() @safe nothrow { - DoneSender().via(scheduler.schedule()).connectHeap(receiver).start(); + TypedDoneSender!(Value)().via(scheduler.schedule()).connectHeap(receiver).start(); } void setError(Throwable e) @safe nothrow { - ErrorSender(e).via(scheduler.schedule()).connectHeap(receiver).start(); + TypedErrorSender!(Value)(e).via(scheduler.schedule()).connectHeap(receiver).start(); } mixin ForwardExtensionPoints!receiver; diff --git a/source/concurrency/sender.d b/source/concurrency/sender.d index c2d3861..55f24e3 100644 --- a/source/concurrency/sender.d +++ b/source/concurrency/sender.d @@ -315,27 +315,27 @@ struct ConnectHeapReceiver(Receiver, Value) { static if (is(Value == void)) { void setValue() @safe { Receiver local = receiver; - destroyState(); local.setValue(); + destroyState(); } - } else{ + } else { void setValue(Value v) @safe { Receiver local = receiver; - destroyState(); local.setValue(v); + destroyState(); } } void setDone() nothrow @safe { Receiver local = receiver; - destroyState(); local.setDone(); + destroyState(); } void setError(Throwable e) nothrow @safe { Receiver local = receiver; - destroyState(); local.setError(e); + destroyState(); } import concurrency.receiver : ForwardExtensionPoints; @@ -402,24 +402,36 @@ struct ThrowingSender { struct DoneSender { static assert(models!(typeof(this), isSender)); alias Value = void; - static struct DoneOp(Receiver) { - Receiver receiver; - @disable - this(ref return scope typeof(this) rhs); - @disable - this(this); - void start() nothrow @trusted scope { - receiver.setDone(); - } + + auto connect(Receiver)(return Receiver receiver) @safe return scope { + // ensure NRVO + auto op = TypedDoneSender!(void)().connect(receiver); + return op; } +} + +struct TypedDoneSender(T) { + static assert(models!(typeof(this), isSender)); + alias Value = T; auto connect(Receiver)(return Receiver receiver) @safe return scope { // ensure NRVO - auto op = DoneOp!(Receiver)(receiver); + auto op = TypedDoneOp!(Receiver)(receiver); return op; } } +struct TypedDoneOp(Receiver) { + Receiver receiver; + @disable + this(ref return scope typeof(this) rhs); + @disable + this(this); + void start() nothrow @trusted scope { + receiver.setDone(); + } +} + /// A sender that always calls setValue with no args struct VoidSender { static assert(models!(typeof(this), isSender)); @@ -448,25 +460,38 @@ struct ErrorSender { static assert(models!(typeof(this), isSender)); alias Value = void; Throwable exception; - static struct ErrorOp(Receiver) { - Receiver receiver; - Throwable exception; - @disable - this(ref return scope typeof(this) rhs); - @disable - this(this); - void start() nothrow @trusted scope { - receiver.setError(exception); - } + + auto connect(Receiver)(return Receiver receiver) @safe return scope { + // ensure NRVO + auto op = TypedErrorSender!(void)(exception).connect(receiver); + return op; } +} +/// A sender that always calls setError +struct TypedErrorSender(T) { + static assert(models!(typeof(this), isSender)); + alias Value = T; + Throwable exception; auto connect(Receiver)(return Receiver receiver) @safe return scope { // ensure NRVO - auto op = ErrorOp!(Receiver)(receiver, exception); + auto op = TypedErrorOp!(Receiver)(receiver, exception); return op; } } +struct TypedErrorOp(Receiver) { + Receiver receiver; + Throwable exception; + @disable + this(ref return scope typeof(this) rhs); + @disable + this(this); + void start() nothrow @trusted scope { + receiver.setError(exception); + } +} + template OpType(Sender, Receiver) { static if (is(Sender.Op)) { alias OpType = Sender.Op;