From b883a228925f9e91b4e56e28544799a2ee76d902 Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Wed, 21 Aug 2024 22:30:14 +0200 Subject: [PATCH] Fiber improvements, scheduler hacks --- source/concurrency/fiber.d | 82 ++++++-------- source/concurrency/io/iouring.d | 130 ++++++++++++++++++++--- source/concurrency/io/package.d | 18 +++- source/concurrency/operations/onresult.d | 4 +- source/concurrency/operations/then.d | 4 +- source/concurrency/scheduler.d | 12 ++- source/concurrency/sender.d | 2 +- source/during/io_uring.d | 14 +++ 8 files changed, 197 insertions(+), 69 deletions(-) diff --git a/source/concurrency/fiber.d b/source/concurrency/fiber.d index 0aa6a57..b31e972 100644 --- a/source/concurrency/fiber.d +++ b/source/concurrency/fiber.d @@ -5,8 +5,6 @@ import concepts; import core.thread.fiber; import core.thread.fiber : Fiber; -alias Continuation = Object; - class CancelledException : Exception { this(string file = __FILE__, size_t line = __LINE__, Throwable next = null) @nogc @safe pure nothrow { super("Cancelled", file, line, next); @@ -14,8 +12,12 @@ class CancelledException : Exception { } package(concurrency) abstract class BaseFiber : Fiber { - private Continuation continuation; + import concurrency.receiver : ReceiverObjectBase; + + private ReceiverObjectBase!void erasedReceiver; + private void delegate() @safe nothrow startSender; private Throwable nextError; + this(void delegate() dg, size_t sz, size_t guardPageSize) @trusted nothrow { super(dg, sz, guardPageSize); } @@ -50,40 +52,41 @@ struct FiberSender { } struct FiberSenderOp(Receiver) { + import concurrency.receiver : ReceiverObjectBase; + Receiver receiver; alias BaseSender = typeof(receiver.getScheduler().schedule()); alias Op = OpType!(BaseSender, FiberContinuationReceiver!Receiver); @disable this(this); @disable this(ref return scope typeof(this) rhs); + // ReceiverObjectBase!void erasedReceiver; void start() @trusted nothrow scope { auto fiber = new OpFiber!Op(cast(void delegate()shared nothrow @safe)&run); + fiber.erasedReceiver = FiberContinuationReceiver!Receiver(fiber, &cycle, receiver).toReceiverObject!void; cycle(fiber, true); } private void schedule(OpFiber!Op fiber) @trusted nothrow { // TODO: why can't we store the Op here? - fiber.op = receiver.getScheduler.schedule().connect(FiberContinuationReceiver!Receiver(fiber, &cycle, receiver)); - fiber.op.start(); + try { + fiber.op = receiver.getScheduler.schedule().connect(FiberContinuationReceiver!Receiver(fiber, &cycle, receiver)); + fiber.op.start(); + } catch (Exception e) { + receiver.setError(e); + } } private void cycle(BaseFiber f, bool inline_) @trusted nothrow { auto fiber = cast(OpFiber!Op)f; - if (!inline_) - return schedule(fiber); if (auto throwable = fiber.call!(Fiber.Rethrow.no)) { receiver.setError(throwable); return; } - if (fiber.continuation !is null) { - auto sender = cast(SenderObjectBase!void)fiber.continuation; - fiber.continuation = null; + if (fiber.startSender !is null) { + auto start = fiber.startSender; + fiber.startSender = null; try { - // TODO: we could try to reuse this space. - // e.g. inline some space in the FiberSenderOp and storing it there - // and/or otherwise (if too big) dynamically allocate and reuse that - // space. - auto op = sender.connectHeap(FiberContinuationReceiver!Receiver(fiber, &cycle, receiver)); - op.start(); + start(); } catch (Throwable t) { receiver.setError(t); return; @@ -111,6 +114,7 @@ struct FiberSenderOp(Receiver) { } // Receiver used to continue the Fiber after yielding on a Sender. +// TODO: this receiver could directly be a ReceiverObjectBase struct FiberContinuationReceiver(Receiver) { import concurrency.receiver : ForwardExtensionPoints; BaseFiber fiber; @@ -136,8 +140,9 @@ void yield() @trusted { auto yield(Sender)(return Sender sender) @trusted { import concurrency : Result; - import concurrency.operations : onResult, then; + import concurrency.operations : onResult, then, ignoreValue; import concurrency.sender : toSenderObject; + import concurrency.receiver : ReceiverObjectBase; auto fiber = BaseFiber.getThis(); @@ -146,32 +151,32 @@ auto yield(Sender)(return Sender sender) @trusted { local = YieldResult!(Sender.Value)(r); } - SenderObjectBase!void object; - auto base = sender - .onResult(cast(void delegate(Result!(Sender.Value)) @safe shared)&store); + .onResult(cast(void delegate(Result!(Sender.Value)) @safe shared)&store) + .ignoreValue(); - static if (is(Sender.Value == void)) { - object = base.toSenderObject(); - } else { - object = base.then((Sender.Value v){}).toSenderObject(); - } + alias Op = OpType!(typeof(base), ReceiverObjectBase!void); + Op op = base.connect(fiber.erasedReceiver); + fiber.startSender = &op.start; - fiber.continuation = cast(Object)object; - yield(); + + // The last remaining allocations are around the SchedulerObject returning SenderObjectBase + + + if (fiber.nextError) { auto error = fiber.nextError; fiber.nextError = null; throw error; } - return local; + return local.value; } import core.attribute : mustuse; -@mustuse struct YieldResult(T) { +struct YieldResult(T) { import concurrency.syncwait : Completed, Cancelled, Result, isA, match; import std.sumtype; @@ -198,21 +203,6 @@ import core.attribute : mustuse; ); } - bool isError() @safe nothrow { - return result.isA!Exception; - } - - bool isCancelled() @safe nothrow { - return std.sumtype.match!( - (Exception e) => (cast(CancelledException)e) !is null, - t => false - )(result); - } - - bool isOk() @safe nothrow { - return result.isA!Value; - } - auto value() @trusted scope { static if (is(T == void)) alias valueHandler = (Completed c) {}; @@ -223,8 +213,4 @@ import core.attribute : mustuse; throw e; })(result); } - - void assumeOk() @safe { - value(); - } } diff --git a/source/concurrency/io/iouring.d b/source/concurrency/io/iouring.d index 739f622..5f9832e 100644 --- a/source/concurrency/io/iouring.d +++ b/source/concurrency/io/iouring.d @@ -35,7 +35,7 @@ private struct Queue(Node) { struct IOUringContext { import concurrency.scheduler : Timer, TimingWheels, TimerCommand; - import core.time : msecs; + import core.time : msecs, Duration; private MPSCQueue!(Item) requests; private MPSCQueue!(Timer) timers; @@ -47,10 +47,22 @@ struct IOUringContext { private int event; private TimingWheels wheels; private enum Duration tickSize = 1.msecs; + // TODO: instead of using timers and timeout on the iouring_enter + // we could also use IORING_OP_TIMEOUT + // or even IORING_OP_LINK_TIMEOUT to link the timeout to the + // wakeup event + private bool dirtyTimers; + private long nextTimer; + private shared bool needsWakeup; private this(uint size) @trusted { // TODO: look into `IORING_SETUP_SQPOLL` for fast submission import core.sys.linux.sys.eventfd; - io.setup(size); + io.setup(size, + SetupFlags.SINGLE_ISSUER + // | SetupFlags.DEFER_TASKRUN + // | SetupFlags.COOP_TASKRUN + // | SetupFlags.SQPOLL + ); event = eventfd(0, EFD_CLOEXEC); requests = new MPSCQueue!(Item); timers = new MPSCQueue!(Timer); @@ -83,9 +95,11 @@ struct IOUringContext { private void wakeup() @trusted nothrow shared { import core.sys.posix.unistd; + import core.atomic : atomicLoad, MemoryOrder, cas; size_t wakeup = 1; - // TODO: check return value - core.sys.posix.unistd.write(event, &wakeup, wakeup.sizeof); + if ((&needsWakeup).cas!(MemoryOrder.raw, MemoryOrder.raw)(true, false)) { + core.sys.posix.unistd.write(event, &wakeup, wakeup.sizeof); + } } import core.time : Duration; @@ -113,14 +127,13 @@ struct IOUringContext { } private int run(scope shared StopToken stopToken) @safe nothrow { + import core.atomic : atomicStore, MemoryOrder; pending.append(requests.popAll()); + scheduleTimers(); putEventFdChannel(); while (!stopToken.isStopRequested() || !pending.empty() || !io.empty()) { putPending(); - // TODO: might have to flip this around. - scheduleTimers(); - int rc = submitAndWait(); // TODO: return without completing all pending or completed requests // will result in blocked request. Instead we need to cancel all requests @@ -128,9 +141,13 @@ struct IOUringContext { // Would it be possible to cancel the whole context in one go? if (rc < 0) return -rc; + atomicStore!(MemoryOrder.raw)(needsWakeup, false); completeTimers(); popCompleted(); + scheduleTimers(); + + atomicStore!(MemoryOrder.raw)(needsWakeup, true); pending.append(requests.popAll()); } @@ -145,6 +162,9 @@ struct IOUringContext { Queue!(Timer) items; items.append(timers.popAll()); + if (!items.empty) + dirtyTimers = true; + while (!items.empty) { auto timer = items.pop(); @@ -169,6 +189,8 @@ struct IOUringContext { if (incr > 0) { Timer* t; wheels.advance(incr, t); + if (t !is null) + dirtyTimers = true; while (t !is null) { auto next = t.next; t.userdata(TimerTrigger.trigger); @@ -177,14 +199,36 @@ struct IOUringContext { } } + import std.typecons : Nullable; + private Nullable!Duration timeUntilNextTimer() @safe nothrow { + import std.datetime.systime : Clock; + import core.time : hnsecs; + + long now = Clock.currStdTime; + if (dirtyTimers) { + dirtyTimers = false; + auto nextTriggerOpt = wheels.timeUntilNextEvent(tickSize, now); + if (nextTriggerOpt.isNull) { + nextTimer = 0; + return typeof(return).init; + } + nextTimer = now + nextTriggerOpt.get.split!"hnsecs".hnsecs; + return nextTriggerOpt; + } else if (nextTimer != 0) { + return typeof(return)((nextTimer - now).hnsecs); + } else { + return typeof(return).init; + } + } + private int submitAndWait() @safe nothrow { import std.datetime.systime : Clock; - auto nextTriggerOpt = wheels.timeUntilNextEvent(tickSize, Clock.currStdTime); + auto nextTriggerOpt = timeUntilNextTimer(); if (!nextTriggerOpt.isNull) { // next timer is in 0 msecs - if (nextTriggerOpt.get == 0.msecs) { + if (nextTriggerOpt.get <= 0.msecs) { // only submit any SubmissionEntries return io.submit(); } @@ -399,10 +443,14 @@ struct IOUringScheduler { return ConnectSender(context, fd, address, port); } - auto write(socket_t fd, ubyte[] buffer, long offset = 0) @safe nothrow @nogc { + auto write(socket_t fd, const(ubyte)[] buffer, long offset = 0) @safe nothrow @nogc { return WriteSender(context, fd, buffer, offset); } + auto close(socket_t fd) @safe nothrow @nogc { + return CloseSender(context, fd); + } + auto schedule() @safe nothrow @nogc { import concurrency.scheduler : ScheduleAfterSender; import core.time : msecs; @@ -551,7 +599,7 @@ struct WriteSender { alias Value = int; shared IOUringContext* context; socket_t fd; - ubyte[] buffer; + const(ubyte)[] buffer; long offset; auto connect(Receiver)(return Receiver receiver) @safe return scope { // ensure NRVO @@ -566,7 +614,7 @@ struct WriteSender { struct WriteOperation(Receiver) { import std.socket : socket_t; socket_t fd; - ubyte[] buffer; + const(ubyte)[] buffer; long offset; Receiver receiver; void submit(ref SubmissionEntry entry) @safe nothrow { @@ -580,3 +628,61 @@ struct WriteOperation(Receiver) { } } } + +struct NopSender { + alias Value = int; + shared IOUringContext* context; + auto connect(Receiver)(return Receiver receiver) @safe return scope { + // ensure NRVO + auto op = CancellableOperation!(NopOperation!Receiver)( + context, + NopOperation!(Receiver)(receiver) + ); + return op; + } +} + +struct NopOperation(Receiver) { + Receiver receiver; + void submit(ref SubmissionEntry entry) @safe nothrow { + entry.prepNop(); + } + void complete(const ref CompletionEntry entry) @safe nothrow { + if (entry.res >= 0) { + receiver.setValueOrError(entry.res); + } else { + receiver.setErrno("Nop failed", -entry.res); + } + } +} + +struct CloseSender { + import std.socket : socket_t; + alias Value = void; + shared IOUringContext* context; + socket_t fd; + auto connect(Receiver)(return Receiver receiver) @safe return scope { + // ensure NRVO + auto op = CancellableOperation!(CloseOperation!Receiver)( + context, + CloseOperation!(Receiver)(fd, receiver) + ); + return op; + } +} + +struct CloseOperation(Receiver) { + import std.socket : socket_t; + socket_t fd; + Receiver receiver; + void submit(ref SubmissionEntry entry) @safe nothrow { + entry.prepClose(fd); + } + void complete(const ref CompletionEntry entry) @safe nothrow { + if (entry.res >= 0) { + receiver.setValueOrError(); + } else { + receiver.setErrno("Close failed", -entry.res); + } + } +} diff --git a/source/concurrency/io/package.d b/source/concurrency/io/package.d index eaaf001..ff8a644 100644 --- a/source/concurrency/io/package.d +++ b/source/concurrency/io/package.d @@ -56,14 +56,14 @@ struct ConnectAsyncSender { } } -auto writeAsync(socket_t fd, ubyte[] buffer, long offset = 0) @safe nothrow @nogc { +auto writeAsync(socket_t fd, const(ubyte)[] buffer, long offset = 0) @safe nothrow @nogc { return WriteAsyncSender(fd, buffer, offset); } struct WriteAsyncSender { alias Value = int; socket_t fd; - ubyte[] buffer; + const(ubyte)[] buffer; long offset; auto connect(Receiver)(return Receiver receiver) @safe return scope { // ensure NRVO @@ -71,3 +71,17 @@ struct WriteAsyncSender { return op; } } + +auto closeAsync(socket_t fd) @safe nothrow @nogc { + return CloseAsyncSender(fd); +} + +struct CloseAsyncSender { + alias Value = void; + socket_t fd; + auto connect(Receiver)(return Receiver receiver) @safe return scope { + // ensure NRVO + auto op = receiver.getScheduler().close(fd).connect(receiver); + return op; + } +} diff --git a/source/concurrency/operations/onresult.d b/source/concurrency/operations/onresult.d index f3dec5f..061bf99 100644 --- a/source/concurrency/operations/onresult.d +++ b/source/concurrency/operations/onresult.d @@ -59,8 +59,8 @@ private struct OnResultReceiver(Value, SideEffect, Receiver) { mixin ForwardExtensionPoints!receiver; } -struct OnResultSender(Sender, SideEffect) if (models!(Sender, isSender)) { - static assert(models!(typeof(this), isSender)); +struct OnResultSender(Sender, SideEffect) { //if (models!(Sender, isSender)) { + //static assert(models!(typeof(this), isSender)); alias Value = Sender.Value; Sender sender; SideEffect effect; diff --git a/source/concurrency/operations/then.d b/source/concurrency/operations/then.d index 4df2b95..b5cbfdb 100644 --- a/source/concurrency/operations/then.d +++ b/source/concurrency/operations/then.d @@ -88,9 +88,9 @@ private struct ThenReceiver(Receiver, Value, Fun) { mixin ForwardExtensionPoints!receiver; } -struct ThenSender(Sender, Fun) if (models!(Sender, isSender)) { +struct ThenSender(Sender, Fun) {//} if (models!(Sender, isSender)) { import std.traits : ReturnType; - static assert(models!(typeof(this), isSender)); + //static assert(models!(typeof(this), isSender)); static if (is(ReturnType!fun == Result!T, T)) alias Value = T; else diff --git a/source/concurrency/scheduler.d b/source/concurrency/scheduler.d index 189a417..62f1512 100644 --- a/source/concurrency/scheduler.d +++ b/source/concurrency/scheduler.d @@ -34,7 +34,8 @@ interface SchedulerObjectBase { SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe; SenderObjectBase!(Client) accept(socket_t fd) @safe; SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @safe; - SenderObjectBase!(int) write(socket_t fd, return ubyte[] buffer, long offset = 0) @safe; + SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe; + SenderObjectBase!(void) close(socket_t fd) @safe; } @@ -101,13 +102,20 @@ class SchedulerObject(S) : SchedulerObjectBase { throw new Exception("`connect` not implemented on "~S.stringof); } } - SenderObjectBase!(int) write(socket_t fd, return ubyte[] buffer, long offset = 0) @safe { + SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe { static if (__traits(hasMember, S, "write")) { return scheduler.write(fd, buffer, offset).toSenderObject(); } else { throw new Exception("`write` not implemented on "~S.stringof); } } + SenderObjectBase!(void) close(socket_t fd) @safe { + static if (__traits(hasMember, S, "close")) { + return scheduler.close(fd).toSenderObject(); + } else { + throw new Exception("`close` not implemented on "~S.stringof); + } + } } SchedulerObjectBase toSchedulerObject(S)(S scheduler) { diff --git a/source/concurrency/sender.d b/source/concurrency/sender.d index f274f86..e4d4819 100644 --- a/source/concurrency/sender.d +++ b/source/concurrency/sender.d @@ -315,7 +315,7 @@ class SenderObjectImpl(Sender) : SenderObjectBase!(Sender.Value) { /// Converts any Sender to a polymorphic SenderObject auto toSenderObject(Sender)(Sender sender) { - static assert(models!(Sender, isSender)); + //static assert(models!(Sender, isSender)); static if (is(Sender : SenderObjectBase!(Sender.Value))) { return sender; } else diff --git a/source/during/io_uring.d b/source/during/io_uring.d index 1fe7548..9ad0188 100644 --- a/source/during/io_uring.d +++ b/source/during/io_uring.d @@ -970,6 +970,20 @@ enum SetupFlags : uint /// `IORING_SETUP_CQE32`: CQEs are 32 byte /// Note: since Linux 5.19 CQE32 = 1U << 11, + + /// Only one task/thread is allowed to submit requests + /// + /// Note: Available since 6.1. + SINGLE_ISSUER = 1U << 12, + + /* + * Defer running task work to get events. + * Rather than running bits of task work whenever the task transitions + * try to do it just before it is needed. + * + * Note: Available since 6.1. + */ + DEFER_TASKRUN = 1U << 13, } /// `io_uring_params->features` flags