From 4a9fc3ac02f10bc7f4595b1052033087be4d6eec Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Wed, 21 Aug 2024 16:42:10 +0200 Subject: [PATCH] Fiber improvements, scheduler hacks, ignorevalue and fix for connectHeap --- source/concurrency/fiber.d | 82 +++++------- source/concurrency/io/iouring.d | 130 ++++++++++++++++++-- source/concurrency/operations/ignorevalue.d | 48 ++++++++ source/concurrency/operations/package.d | 2 + source/concurrency/scheduler.d | 12 +- source/during/io_uring.d | 14 +++ 6 files changed, 226 insertions(+), 62 deletions(-) create mode 100644 source/concurrency/operations/ignorevalue.d diff --git a/source/concurrency/fiber.d b/source/concurrency/fiber.d index 0aa6a57..b31e972 100644 --- a/source/concurrency/fiber.d +++ b/source/concurrency/fiber.d @@ -5,8 +5,6 @@ import concepts; import core.thread.fiber; import core.thread.fiber : Fiber; -alias Continuation = Object; - class CancelledException : Exception { this(string file = __FILE__, size_t line = __LINE__, Throwable next = null) @nogc @safe pure nothrow { super("Cancelled", file, line, next); @@ -14,8 +12,12 @@ class CancelledException : Exception { } package(concurrency) abstract class BaseFiber : Fiber { - private Continuation continuation; + import concurrency.receiver : ReceiverObjectBase; + + private ReceiverObjectBase!void erasedReceiver; + private void delegate() @safe nothrow startSender; private Throwable nextError; + this(void delegate() dg, size_t sz, size_t guardPageSize) @trusted nothrow { super(dg, sz, guardPageSize); } @@ -50,40 +52,41 @@ struct FiberSender { } struct FiberSenderOp(Receiver) { + import concurrency.receiver : ReceiverObjectBase; + Receiver receiver; alias BaseSender = typeof(receiver.getScheduler().schedule()); alias Op = OpType!(BaseSender, FiberContinuationReceiver!Receiver); @disable this(this); @disable this(ref return scope typeof(this) rhs); + // ReceiverObjectBase!void erasedReceiver; void start() @trusted nothrow scope { auto fiber = new OpFiber!Op(cast(void delegate()shared nothrow @safe)&run); + fiber.erasedReceiver = FiberContinuationReceiver!Receiver(fiber, &cycle, receiver).toReceiverObject!void; cycle(fiber, true); } private void schedule(OpFiber!Op fiber) @trusted nothrow { // TODO: why can't we store the Op here? - fiber.op = receiver.getScheduler.schedule().connect(FiberContinuationReceiver!Receiver(fiber, &cycle, receiver)); - fiber.op.start(); + try { + fiber.op = receiver.getScheduler.schedule().connect(FiberContinuationReceiver!Receiver(fiber, &cycle, receiver)); + fiber.op.start(); + } catch (Exception e) { + receiver.setError(e); + } } private void cycle(BaseFiber f, bool inline_) @trusted nothrow { auto fiber = cast(OpFiber!Op)f; - if (!inline_) - return schedule(fiber); if (auto throwable = fiber.call!(Fiber.Rethrow.no)) { receiver.setError(throwable); return; } - if (fiber.continuation !is null) { - auto sender = cast(SenderObjectBase!void)fiber.continuation; - fiber.continuation = null; + if (fiber.startSender !is null) { + auto start = fiber.startSender; + fiber.startSender = null; try { - // TODO: we could try to reuse this space. - // e.g. inline some space in the FiberSenderOp and storing it there - // and/or otherwise (if too big) dynamically allocate and reuse that - // space. - auto op = sender.connectHeap(FiberContinuationReceiver!Receiver(fiber, &cycle, receiver)); - op.start(); + start(); } catch (Throwable t) { receiver.setError(t); return; @@ -111,6 +114,7 @@ struct FiberSenderOp(Receiver) { } // Receiver used to continue the Fiber after yielding on a Sender. +// TODO: this receiver could directly be a ReceiverObjectBase struct FiberContinuationReceiver(Receiver) { import concurrency.receiver : ForwardExtensionPoints; BaseFiber fiber; @@ -136,8 +140,9 @@ void yield() @trusted { auto yield(Sender)(return Sender sender) @trusted { import concurrency : Result; - import concurrency.operations : onResult, then; + import concurrency.operations : onResult, then, ignoreValue; import concurrency.sender : toSenderObject; + import concurrency.receiver : ReceiverObjectBase; auto fiber = BaseFiber.getThis(); @@ -146,32 +151,32 @@ auto yield(Sender)(return Sender sender) @trusted { local = YieldResult!(Sender.Value)(r); } - SenderObjectBase!void object; - auto base = sender - .onResult(cast(void delegate(Result!(Sender.Value)) @safe shared)&store); + .onResult(cast(void delegate(Result!(Sender.Value)) @safe shared)&store) + .ignoreValue(); - static if (is(Sender.Value == void)) { - object = base.toSenderObject(); - } else { - object = base.then((Sender.Value v){}).toSenderObject(); - } + alias Op = OpType!(typeof(base), ReceiverObjectBase!void); + Op op = base.connect(fiber.erasedReceiver); + fiber.startSender = &op.start; - fiber.continuation = cast(Object)object; - yield(); + + // The last remaining allocations are around the SchedulerObject returning SenderObjectBase + + + if (fiber.nextError) { auto error = fiber.nextError; fiber.nextError = null; throw error; } - return local; + return local.value; } import core.attribute : mustuse; -@mustuse struct YieldResult(T) { +struct YieldResult(T) { import concurrency.syncwait : Completed, Cancelled, Result, isA, match; import std.sumtype; @@ -198,21 +203,6 @@ import core.attribute : mustuse; ); } - bool isError() @safe nothrow { - return result.isA!Exception; - } - - bool isCancelled() @safe nothrow { - return std.sumtype.match!( - (Exception e) => (cast(CancelledException)e) !is null, - t => false - )(result); - } - - bool isOk() @safe nothrow { - return result.isA!Value; - } - auto value() @trusted scope { static if (is(T == void)) alias valueHandler = (Completed c) {}; @@ -223,8 +213,4 @@ import core.attribute : mustuse; throw e; })(result); } - - void assumeOk() @safe { - value(); - } } diff --git a/source/concurrency/io/iouring.d b/source/concurrency/io/iouring.d index 739f622..5f9832e 100644 --- a/source/concurrency/io/iouring.d +++ b/source/concurrency/io/iouring.d @@ -35,7 +35,7 @@ private struct Queue(Node) { struct IOUringContext { import concurrency.scheduler : Timer, TimingWheels, TimerCommand; - import core.time : msecs; + import core.time : msecs, Duration; private MPSCQueue!(Item) requests; private MPSCQueue!(Timer) timers; @@ -47,10 +47,22 @@ struct IOUringContext { private int event; private TimingWheels wheels; private enum Duration tickSize = 1.msecs; + // TODO: instead of using timers and timeout on the iouring_enter + // we could also use IORING_OP_TIMEOUT + // or even IORING_OP_LINK_TIMEOUT to link the timeout to the + // wakeup event + private bool dirtyTimers; + private long nextTimer; + private shared bool needsWakeup; private this(uint size) @trusted { // TODO: look into `IORING_SETUP_SQPOLL` for fast submission import core.sys.linux.sys.eventfd; - io.setup(size); + io.setup(size, + SetupFlags.SINGLE_ISSUER + // | SetupFlags.DEFER_TASKRUN + // | SetupFlags.COOP_TASKRUN + // | SetupFlags.SQPOLL + ); event = eventfd(0, EFD_CLOEXEC); requests = new MPSCQueue!(Item); timers = new MPSCQueue!(Timer); @@ -83,9 +95,11 @@ struct IOUringContext { private void wakeup() @trusted nothrow shared { import core.sys.posix.unistd; + import core.atomic : atomicLoad, MemoryOrder, cas; size_t wakeup = 1; - // TODO: check return value - core.sys.posix.unistd.write(event, &wakeup, wakeup.sizeof); + if ((&needsWakeup).cas!(MemoryOrder.raw, MemoryOrder.raw)(true, false)) { + core.sys.posix.unistd.write(event, &wakeup, wakeup.sizeof); + } } import core.time : Duration; @@ -113,14 +127,13 @@ struct IOUringContext { } private int run(scope shared StopToken stopToken) @safe nothrow { + import core.atomic : atomicStore, MemoryOrder; pending.append(requests.popAll()); + scheduleTimers(); putEventFdChannel(); while (!stopToken.isStopRequested() || !pending.empty() || !io.empty()) { putPending(); - // TODO: might have to flip this around. - scheduleTimers(); - int rc = submitAndWait(); // TODO: return without completing all pending or completed requests // will result in blocked request. Instead we need to cancel all requests @@ -128,9 +141,13 @@ struct IOUringContext { // Would it be possible to cancel the whole context in one go? if (rc < 0) return -rc; + atomicStore!(MemoryOrder.raw)(needsWakeup, false); completeTimers(); popCompleted(); + scheduleTimers(); + + atomicStore!(MemoryOrder.raw)(needsWakeup, true); pending.append(requests.popAll()); } @@ -145,6 +162,9 @@ struct IOUringContext { Queue!(Timer) items; items.append(timers.popAll()); + if (!items.empty) + dirtyTimers = true; + while (!items.empty) { auto timer = items.pop(); @@ -169,6 +189,8 @@ struct IOUringContext { if (incr > 0) { Timer* t; wheels.advance(incr, t); + if (t !is null) + dirtyTimers = true; while (t !is null) { auto next = t.next; t.userdata(TimerTrigger.trigger); @@ -177,14 +199,36 @@ struct IOUringContext { } } + import std.typecons : Nullable; + private Nullable!Duration timeUntilNextTimer() @safe nothrow { + import std.datetime.systime : Clock; + import core.time : hnsecs; + + long now = Clock.currStdTime; + if (dirtyTimers) { + dirtyTimers = false; + auto nextTriggerOpt = wheels.timeUntilNextEvent(tickSize, now); + if (nextTriggerOpt.isNull) { + nextTimer = 0; + return typeof(return).init; + } + nextTimer = now + nextTriggerOpt.get.split!"hnsecs".hnsecs; + return nextTriggerOpt; + } else if (nextTimer != 0) { + return typeof(return)((nextTimer - now).hnsecs); + } else { + return typeof(return).init; + } + } + private int submitAndWait() @safe nothrow { import std.datetime.systime : Clock; - auto nextTriggerOpt = wheels.timeUntilNextEvent(tickSize, Clock.currStdTime); + auto nextTriggerOpt = timeUntilNextTimer(); if (!nextTriggerOpt.isNull) { // next timer is in 0 msecs - if (nextTriggerOpt.get == 0.msecs) { + if (nextTriggerOpt.get <= 0.msecs) { // only submit any SubmissionEntries return io.submit(); } @@ -399,10 +443,14 @@ struct IOUringScheduler { return ConnectSender(context, fd, address, port); } - auto write(socket_t fd, ubyte[] buffer, long offset = 0) @safe nothrow @nogc { + auto write(socket_t fd, const(ubyte)[] buffer, long offset = 0) @safe nothrow @nogc { return WriteSender(context, fd, buffer, offset); } + auto close(socket_t fd) @safe nothrow @nogc { + return CloseSender(context, fd); + } + auto schedule() @safe nothrow @nogc { import concurrency.scheduler : ScheduleAfterSender; import core.time : msecs; @@ -551,7 +599,7 @@ struct WriteSender { alias Value = int; shared IOUringContext* context; socket_t fd; - ubyte[] buffer; + const(ubyte)[] buffer; long offset; auto connect(Receiver)(return Receiver receiver) @safe return scope { // ensure NRVO @@ -566,7 +614,7 @@ struct WriteSender { struct WriteOperation(Receiver) { import std.socket : socket_t; socket_t fd; - ubyte[] buffer; + const(ubyte)[] buffer; long offset; Receiver receiver; void submit(ref SubmissionEntry entry) @safe nothrow { @@ -580,3 +628,61 @@ struct WriteOperation(Receiver) { } } } + +struct NopSender { + alias Value = int; + shared IOUringContext* context; + auto connect(Receiver)(return Receiver receiver) @safe return scope { + // ensure NRVO + auto op = CancellableOperation!(NopOperation!Receiver)( + context, + NopOperation!(Receiver)(receiver) + ); + return op; + } +} + +struct NopOperation(Receiver) { + Receiver receiver; + void submit(ref SubmissionEntry entry) @safe nothrow { + entry.prepNop(); + } + void complete(const ref CompletionEntry entry) @safe nothrow { + if (entry.res >= 0) { + receiver.setValueOrError(entry.res); + } else { + receiver.setErrno("Nop failed", -entry.res); + } + } +} + +struct CloseSender { + import std.socket : socket_t; + alias Value = void; + shared IOUringContext* context; + socket_t fd; + auto connect(Receiver)(return Receiver receiver) @safe return scope { + // ensure NRVO + auto op = CancellableOperation!(CloseOperation!Receiver)( + context, + CloseOperation!(Receiver)(fd, receiver) + ); + return op; + } +} + +struct CloseOperation(Receiver) { + import std.socket : socket_t; + socket_t fd; + Receiver receiver; + void submit(ref SubmissionEntry entry) @safe nothrow { + entry.prepClose(fd); + } + void complete(const ref CompletionEntry entry) @safe nothrow { + if (entry.res >= 0) { + receiver.setValueOrError(); + } else { + receiver.setErrno("Close failed", -entry.res); + } + } +} diff --git a/source/concurrency/operations/ignorevalue.d b/source/concurrency/operations/ignorevalue.d new file mode 100644 index 0000000..0218945 --- /dev/null +++ b/source/concurrency/operations/ignorevalue.d @@ -0,0 +1,48 @@ +module concurrency.operations.ignorevalue; + +import concurrency; +import concurrency.receiver; +import concurrency.sender; +import concurrency.stoptoken; +import concepts; + +/// runs a side-effect whenever the underlying sender completes +auto ignoreValue(Sender)(Sender sender) { + return IgnoreValueSender!(Sender)(sender); +} + +private struct IgnoreValueReceiver(Value, Receiver) { + Receiver receiver; + static if (is(Value == void)) + // TODO: mustn't this be nothrow? + void setValue() @safe { + receiver.setValue(); + } + + else + void setValue(Value value) @safe { + receiver.setValue(); + } + + void setDone() @safe nothrow { + receiver.setDone(); + } + + void setError(Throwable t) @safe nothrow { + receiver.setError(t); + } + + mixin ForwardExtensionPoints!receiver; +} + +struct IgnoreValueSender(Sender) { //if (models!(Sender, isSender)) { + // static assert(models!(typeof(this), isSender)); + alias Value = void; + Sender sender; + auto connect(Receiver)(return Receiver receiver) @safe return scope { + // ensure NRVO + auto op = sender.connect( + IgnoreValueReceiver!(Sender.Value, Receiver)(receiver)); + return op; + } +} diff --git a/source/concurrency/operations/package.d b/source/concurrency/operations/package.d index 9c3a0cd..af4b929 100644 --- a/source/concurrency/operations/package.d +++ b/source/concurrency/operations/package.d @@ -25,3 +25,5 @@ public import concurrency.operations.completewitherror; public import concurrency.operations.oncompletion; public import concurrency.operations.onresult; public import concurrency.operations.repeat; +public import concurrency.operations.ignorevalue; +public import concurrency.operations.dofinally; diff --git a/source/concurrency/scheduler.d b/source/concurrency/scheduler.d index 189a417..62f1512 100644 --- a/source/concurrency/scheduler.d +++ b/source/concurrency/scheduler.d @@ -34,7 +34,8 @@ interface SchedulerObjectBase { SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe; SenderObjectBase!(Client) accept(socket_t fd) @safe; SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @safe; - SenderObjectBase!(int) write(socket_t fd, return ubyte[] buffer, long offset = 0) @safe; + SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe; + SenderObjectBase!(void) close(socket_t fd) @safe; } @@ -101,13 +102,20 @@ class SchedulerObject(S) : SchedulerObjectBase { throw new Exception("`connect` not implemented on "~S.stringof); } } - SenderObjectBase!(int) write(socket_t fd, return ubyte[] buffer, long offset = 0) @safe { + SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe { static if (__traits(hasMember, S, "write")) { return scheduler.write(fd, buffer, offset).toSenderObject(); } else { throw new Exception("`write` not implemented on "~S.stringof); } } + SenderObjectBase!(void) close(socket_t fd) @safe { + static if (__traits(hasMember, S, "close")) { + return scheduler.close(fd).toSenderObject(); + } else { + throw new Exception("`close` not implemented on "~S.stringof); + } + } } SchedulerObjectBase toSchedulerObject(S)(S scheduler) { diff --git a/source/during/io_uring.d b/source/during/io_uring.d index 1fe7548..9ad0188 100644 --- a/source/during/io_uring.d +++ b/source/during/io_uring.d @@ -970,6 +970,20 @@ enum SetupFlags : uint /// `IORING_SETUP_CQE32`: CQEs are 32 byte /// Note: since Linux 5.19 CQE32 = 1U << 11, + + /// Only one task/thread is allowed to submit requests + /// + /// Note: Available since 6.1. + SINGLE_ISSUER = 1U << 12, + + /* + * Defer running task work to get events. + * Rather than running bits of task work whenever the task transitions + * try to do it just before it is needed. + * + * Note: Available since 6.1. + */ + DEFER_TASKRUN = 1U << 13, } /// `io_uring_params->features` flags