Skip to content

Commit

Permalink
Add dedicated IOScheduler
Browse files Browse the repository at this point in the history
For Sender objects throws at runtime if base receiver doesn't have
IOScheduler.
  • Loading branch information
skoppe committed Sep 1, 2024
1 parent 0e69f48 commit 3b8cb3d
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 95 deletions.
11 changes: 7 additions & 4 deletions source/concurrency/io/iouring.d
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,12 @@ struct RunOp(Sender, Receiver) {
import concurrency.operations.dofinally;
import concurrency.operations.whenall;
import concurrency.operations.withscheduler;
import concurrency.operations.withioscheduler;

alias RunSender = JustFromSender!(void delegate() @trusted shared);
alias SenderWithScheduler = WithSchedulerSender!(Sender, IOUringScheduler);
alias ValueSender = DoFinallySender!(SenderWithScheduler, void delegate() @safe nothrow shared);
alias SenderWithIOScheduler = WithIOSchedulerSender!(SenderWithScheduler, IOUringScheduler);
alias ValueSender = DoFinallySender!(SenderWithIOScheduler, void delegate() @safe nothrow shared);
alias CombinedSender = WhenAllSender!(ValueSender, RunSender);
alias Op = OpType!(CombinedSender, Receiver);

Expand All @@ -333,8 +335,9 @@ struct RunOp(Sender, Receiver) {
this(IOUringContext* context, Sender sender, return Receiver receiver) @trusted return scope {
this.context = context;
shared IOUringContext* sharedContext = cast(shared)context;
auto scheduler = IOUringScheduler(sharedContext);
op = whenAll(
sender.withScheduler(IOUringScheduler(cast(shared)context)).doFinally(() @safe nothrow shared {
sender.withScheduler(scheduler).withIOScheduler(scheduler).doFinally(() @safe nothrow shared {
stopSource.stop();
sharedContext.wakeup();
}),
Expand Down Expand Up @@ -522,7 +525,7 @@ struct ReadOperation(Receiver) {
}

struct AcceptSender {
import concurrency.scheduler : Client;
import concurrency.ioscheduler : Client;
import std.socket : socket_t;
alias Value = Client;
shared IOUringContext* context;
Expand All @@ -540,7 +543,7 @@ struct AcceptSender {
struct AcceptOperation(Receiver) {
import core.sys.posix.sys.socket : sockaddr, socklen_t;
import core.sys.posix.netinet.in_;
import concurrency.scheduler : Client;
import concurrency.ioscheduler : Client;
import std.socket : socket_t;

socket_t fd;
Expand Down
12 changes: 6 additions & 6 deletions source/concurrency/io/package.d
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module concurrency.io;

import concurrency.io.iouring;
import concurrency.scheduler : Client;
import concurrency.ioscheduler : Client;

import std.socket : socket_t;

Expand All @@ -19,7 +19,7 @@ struct ReadAsyncSender {
long offset;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = receiver.getScheduler().read(fd, buffer, offset).connect(receiver);
auto op = receiver.getIOScheduler().read(fd, buffer, offset).connect(receiver);
return op;
}
}
Expand All @@ -33,7 +33,7 @@ struct AcceptAsyncSender {
socket_t fd;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = receiver.getScheduler().accept(fd).connect(receiver);
auto op = receiver.getIOScheduler().accept(fd).connect(receiver);
return op;
}
}
Expand All @@ -50,7 +50,7 @@ struct ConnectAsyncSender {
ushort port;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = receiver.getScheduler().connect(fd, address, port).connect(receiver);
auto op = receiver.getIOScheduler().connect(fd, address, port).connect(receiver);
return op;
}
}
Expand All @@ -66,7 +66,7 @@ struct WriteAsyncSender {
long offset;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = receiver.getScheduler().write(fd, buffer, offset).connect(receiver);
auto op = receiver.getIOScheduler().write(fd, buffer, offset).connect(receiver);
return op;
}
}
Expand All @@ -80,7 +80,7 @@ struct CloseAsyncSender {
socket_t fd;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
// ensure NRVO
auto op = receiver.getScheduler().close(fd).connect(receiver);
auto op = receiver.getIOScheduler().close(fd).connect(receiver);
return op;
}
}
94 changes: 94 additions & 0 deletions source/concurrency/ioscheduler.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
module concurrency.ioscheduler;

import concurrency.sender : SenderObjectBase, isSender;
import core.time : Duration;
import concepts;
import std.typecons : Nullable, nullable;

void checkIOScheduler(T)() {
import concurrency.sender : checkSender;
import core.time : msecs;
import std.traits : ReturnType;
alias ReadSender = ReturnType!(T.read);
checkSender!ReadSender();
// TODO: add other function checks
}

enum isIOScheduler(T) = is(typeof(checkIOScheduler!T));

struct Client {
import std.socket : socket_t;
version(Windows) {
import core.sys.windows.windows : sockaddr, socklen_t;
} else version(Posix) {
import core.sys.posix.sys.socket : sockaddr, socklen_t;
}

socket_t fd;
sockaddr addr;
socklen_t addrlen;
}

/// polymorphic IOScheduler
interface IOSchedulerObjectBase {
import std.socket : socket_t;
// TODO: read/write/close aren't just for sockets really
SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe;
SenderObjectBase!(Client) accept(socket_t fd) @safe;
SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @safe;
SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe;
SenderObjectBase!(void) close(socket_t fd) @safe;
}

struct NullIOScheduler {
import std.socket : socket_t;
import concurrency.sender : ValueSender;

string errorMsg;

ValueSender!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe {
throw new Exception(errorMsg);
}
ValueSender!(Client) accept(socket_t fd) @safe {
throw new Exception(errorMsg);
}
ValueSender!(socket_t) connect(socket_t fd, return string address, ushort port) @safe {
throw new Exception(errorMsg);
}
ValueSender!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe {
throw new Exception(errorMsg);
}
ValueSender!(void) close(socket_t fd) @safe {
throw new Exception(errorMsg);
}
}

class IOSchedulerObject(S) : IOSchedulerObjectBase {
import concurrency.sender : toSenderObject;
S scheduler;
this(S scheduler) {
this.scheduler = scheduler;
}

SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe {
return scheduler.read(fd, buffer, offset).toSenderObject();
}
SenderObjectBase!(Client) accept(socket_t fd) @safe {
return scheduler.accept(fd).toSenderObject();
}
// TODO: is trusted because of scope string address
SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @trusted {
string adr = address;
return scheduler.connect(fd, adr, port).toSenderObject();
}
SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe {
return scheduler.write(fd, buffer, offset).toSenderObject();
}
SenderObjectBase!(void) close(socket_t fd) @safe {
return scheduler.close(fd).toSenderObject();
}
}

IOSchedulerObjectBase toIOSchedulerObject(S)(S scheduler) {
return new IOSchedulerObject!(S)(scheduler);
}
52 changes: 52 additions & 0 deletions source/concurrency/operations/withioscheduler.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
module concurrency.operations.withioscheduler;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;
import std.traits;

auto withIOScheduler(Sender, IOScheduler)(Sender sender, IOScheduler ioScheduler) {
return WithIOSchedulerSender!(Sender, IOScheduler)(sender, ioScheduler);
}

private struct WithIOSchedulerReceiver(Receiver, Value, IOScheduler) {
Receiver receiver;
IOScheduler ioScheduler;
static if (is(Value == void)) {
void setValue() @safe {
receiver.setValue();
}
} else {
void setValue(Value value) @safe {
receiver.setValue(value);
}
}

void setDone() @safe nothrow {
receiver.setDone();
}

void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}

auto getIOScheduler() @safe nothrow {
return ioScheduler;
}

mixin ForwardExtensionPoints!receiver;
}

struct WithIOSchedulerSender(Sender, IOScheduler) if (models!(Sender, isSender)) {
alias Value = Sender.Value;
Sender sender;
IOScheduler ioScheduler;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
alias R = WithIOSchedulerReceiver!(Receiver, Sender.Value, IOScheduler);
// ensure NRVO
auto op = sender.connect(R(receiver, ioScheduler));
return op;
}
}
8 changes: 8 additions & 0 deletions source/concurrency/receiver.d
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,19 @@ mixin template ForwardExtensionPoints(alias receiver) {
auto getScheduler() nothrow @safe {
return receiver.getScheduler();
}

static if (__traits(hasMember, receiver, "getIOScheduler")) {
auto getIOScheduler() nothrow @safe {
return receiver.getIOScheduler();
}
}
}

/// A polymorphic receiver of type T
interface ReceiverObjectBase(T) {
import concurrency.stoptoken : StopToken;
import concurrency.scheduler : SchedulerObjectBase;
import concurrency.ioscheduler : IOSchedulerObjectBase;
static assert(models!(ReceiverObjectBase!T, isReceiver));
static if (is(T == void))
void setValue() @safe;
Expand All @@ -40,6 +47,7 @@ interface ReceiverObjectBase(T) {
void setError(Throwable e) nothrow @safe;
shared(StopToken) getStopToken() nothrow @safe;
SchedulerObjectBase getScheduler() scope nothrow @safe;
IOSchedulerObjectBase getIOScheduler() scope nothrow @safe;
}

struct NullReceiver(T) {
Expand Down
84 changes: 0 additions & 84 deletions source/concurrency/scheduler.d
Original file line number Diff line number Diff line change
Expand Up @@ -17,59 +17,12 @@ void checkScheduler(T)() {

enum isScheduler(T) = is(typeof(checkScheduler!T));

struct Client {
import std.socket : socket_t;
version(Windows) {
import core.sys.windows.windows : sockaddr, socklen_t;
} else version(Posix) {
import core.sys.posix.sys.socket : sockaddr, socklen_t;
}

socket_t fd;
sockaddr addr;
socklen_t addrlen;
}

/// polymorphic Scheduler
interface SchedulerObjectBase {
import std.socket : socket_t;
SenderObjectBase!void schedule() @safe;
SenderObjectBase!void scheduleAfter(Duration d) @safe;
// TODO: do these belong here?
SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe;
SenderObjectBase!(Client) accept(socket_t fd) @safe;
SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @safe;
SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe;
SenderObjectBase!(void) close(socket_t fd) @safe;
}


// We can pull the LocalThreadExecutor (and its schedule/scheduleAfter) out into a specialized context.
// Just like we did with the iouring context

// The interesting bit is that the syncWait algorithm then might be inferred as @nogc

// The question remains how we would want to integrate these.
// With iouring we created a runner that would take a sender and would inject the scheduler and allow itself to steal the current thread.

// That last part is important, we don't want to spawn a thread just to run timers, we can do it perfectly fine on the current thread.
// Same with iouring or other event loops.

// That said, we can, if we want to, move the event loop to another thread.

// The only thing we can't do is cross schedule timers from one thread to another.
// Well, that is not true, we can create two context objects that expose a Scheduler






// Guess we just have to write it and see....

// Dietmar Kuhl used a iocontext with a run function that allows running it on the current thread.
// In rant I had the iocontext's runner return a sender so you could await that.

class SchedulerObject(S) : SchedulerObjectBase {
import concurrency.sender : toSenderObject;
S scheduler;
Expand All @@ -84,43 +37,6 @@ class SchedulerObject(S) : SchedulerObjectBase {
SenderObjectBase!void scheduleAfter(Duration d) @safe {
return scheduler.scheduleAfter(d).toSenderObject();
}
SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe {
static if (__traits(hasMember, S, "read")) {
return scheduler.read(fd, buffer, offset).toSenderObject();
} else {
throw new Exception("`read` not implemented on "~S.stringof);
}
}
SenderObjectBase!(Client) accept(socket_t fd) @safe {
static if (__traits(hasMember, S, "accept")) {
return scheduler.accept(fd).toSenderObject();
} else {
throw new Exception("`accept` not implemented on "~S.stringof);
}
}
// TODO: is trusted because of scope string address
SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @trusted {
static if (__traits(hasMember, S, "connect")) {
string adr = address;
return scheduler.connect(fd, adr, port).toSenderObject();
} else {
throw new Exception("`connect` not implemented on "~S.stringof);
}
}
SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe {
static if (__traits(hasMember, S, "write")) {
return scheduler.write(fd, buffer, offset).toSenderObject();
} else {
throw new Exception("`write` not implemented on "~S.stringof);
}
}
SenderObjectBase!(void) close(socket_t fd) @safe {
static if (__traits(hasMember, S, "close")) {
return scheduler.close(fd).toSenderObject();
} else {
throw new Exception("`close` not implemented on "~S.stringof);
}
}
}

SchedulerObjectBase toSchedulerObject(S)(S scheduler) {
Expand Down
Loading

0 comments on commit 3b8cb3d

Please sign in to comment.