Skip to content

Commit

Permalink
Fiber improvements, scheduler hacks, ignorevalue and fix for connectHeap
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Aug 21, 2024
1 parent ffd57e9 commit 4a9fc3a
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 62 deletions.
82 changes: 34 additions & 48 deletions source/concurrency/fiber.d
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import concepts;
import core.thread.fiber;
import core.thread.fiber : Fiber;

alias Continuation = Object;

class CancelledException : Exception {
this(string file = __FILE__, size_t line = __LINE__, Throwable next = null) @nogc @safe pure nothrow {
super("Cancelled", file, line, next);
}
}

package(concurrency) abstract class BaseFiber : Fiber {
private Continuation continuation;
import concurrency.receiver : ReceiverObjectBase;

private ReceiverObjectBase!void erasedReceiver;
private void delegate() @safe nothrow startSender;
private Throwable nextError;

this(void delegate() dg, size_t sz, size_t guardPageSize) @trusted nothrow {
super(dg, sz, guardPageSize);
}
Expand Down Expand Up @@ -50,40 +52,41 @@ struct FiberSender {
}

struct FiberSenderOp(Receiver) {
import concurrency.receiver : ReceiverObjectBase;

Receiver receiver;
alias BaseSender = typeof(receiver.getScheduler().schedule());
alias Op = OpType!(BaseSender, FiberContinuationReceiver!Receiver);
@disable this(this);
@disable this(ref return scope typeof(this) rhs);
// ReceiverObjectBase!void erasedReceiver;
void start() @trusted nothrow scope {
auto fiber = new OpFiber!Op(cast(void delegate()shared nothrow @safe)&run);
fiber.erasedReceiver = FiberContinuationReceiver!Receiver(fiber, &cycle, receiver).toReceiverObject!void;
cycle(fiber, true);
}
private void schedule(OpFiber!Op fiber) @trusted nothrow {
// TODO: why can't we store the Op here?
fiber.op = receiver.getScheduler.schedule().connect(FiberContinuationReceiver!Receiver(fiber, &cycle, receiver));
fiber.op.start();
try {
fiber.op = receiver.getScheduler.schedule().connect(FiberContinuationReceiver!Receiver(fiber, &cycle, receiver));
fiber.op.start();
} catch (Exception e) {
receiver.setError(e);
}
}
private void cycle(BaseFiber f, bool inline_) @trusted nothrow {
auto fiber = cast(OpFiber!Op)f;
if (!inline_)
return schedule(fiber);

if (auto throwable = fiber.call!(Fiber.Rethrow.no)) {
receiver.setError(throwable);
return;
}

if (fiber.continuation !is null) {
auto sender = cast(SenderObjectBase!void)fiber.continuation;
fiber.continuation = null;
if (fiber.startSender !is null) {
auto start = fiber.startSender;
fiber.startSender = null;
try {
// TODO: we could try to reuse this space.
// e.g. inline some space in the FiberSenderOp and storing it there
// and/or otherwise (if too big) dynamically allocate and reuse that
// space.
auto op = sender.connectHeap(FiberContinuationReceiver!Receiver(fiber, &cycle, receiver));
op.start();
start();
} catch (Throwable t) {
receiver.setError(t);
return;
Expand Down Expand Up @@ -111,6 +114,7 @@ struct FiberSenderOp(Receiver) {
}

// Receiver used to continue the Fiber after yielding on a Sender.
// TODO: this receiver could directly be a ReceiverObjectBase
struct FiberContinuationReceiver(Receiver) {
import concurrency.receiver : ForwardExtensionPoints;
BaseFiber fiber;
Expand All @@ -136,8 +140,9 @@ void yield() @trusted {

auto yield(Sender)(return Sender sender) @trusted {
import concurrency : Result;
import concurrency.operations : onResult, then;
import concurrency.operations : onResult, then, ignoreValue;
import concurrency.sender : toSenderObject;
import concurrency.receiver : ReceiverObjectBase;

auto fiber = BaseFiber.getThis();

Expand All @@ -146,32 +151,32 @@ auto yield(Sender)(return Sender sender) @trusted {
local = YieldResult!(Sender.Value)(r);
}

SenderObjectBase!void object;

auto base = sender
.onResult(cast(void delegate(Result!(Sender.Value)) @safe shared)&store);
.onResult(cast(void delegate(Result!(Sender.Value)) @safe shared)&store)
.ignoreValue();

static if (is(Sender.Value == void)) {
object = base.toSenderObject();
} else {
object = base.then((Sender.Value v){}).toSenderObject();
}
alias Op = OpType!(typeof(base), ReceiverObjectBase!void);
Op op = base.connect(fiber.erasedReceiver);
fiber.startSender = &op.start;

fiber.continuation = cast(Object)object;

yield();


// The last remaining allocations are around the SchedulerObject returning SenderObjectBase



if (fiber.nextError) {
auto error = fiber.nextError;
fiber.nextError = null;
throw error;
}

return local;
return local.value;
}

import core.attribute : mustuse;
@mustuse struct YieldResult(T) {
struct YieldResult(T) {
import concurrency.syncwait : Completed, Cancelled, Result, isA, match;
import std.sumtype;

Expand All @@ -198,21 +203,6 @@ import core.attribute : mustuse;
);
}

bool isError() @safe nothrow {
return result.isA!Exception;
}

bool isCancelled() @safe nothrow {
return std.sumtype.match!(
(Exception e) => (cast(CancelledException)e) !is null,
t => false
)(result);
}

bool isOk() @safe nothrow {
return result.isA!Value;
}

auto value() @trusted scope {
static if (is(T == void))
alias valueHandler = (Completed c) {};
Expand All @@ -223,8 +213,4 @@ import core.attribute : mustuse;
throw e;
})(result);
}

void assumeOk() @safe {
value();
}
}
Loading

0 comments on commit 4a9fc3a

Please sign in to comment.