Skip to content

Commit

Permalink
Add fiber, add io ops to object scheduler, fix wakeup
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Aug 17, 2024
1 parent 13e96f6 commit ffd57e9
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 56 deletions.
36 changes: 29 additions & 7 deletions source/concurrency/fiber.d
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class CancelledException : Exception {

package(concurrency) abstract class BaseFiber : Fiber {
private Continuation continuation;
private Throwable nextError;
this(void delegate() dg, size_t sz, size_t guardPageSize) @trusted nothrow {
super(dg, sz, guardPageSize);
}
Expand All @@ -34,6 +35,11 @@ class OpFiber(Op) : BaseFiber {
}
}

auto fiber(Fun)(Fun fun) {
import concurrency.operations : then;
return FiberSender().then(fun);
}

struct FiberSender {
static assert (models!(typeof(this), isSender));
alias Value = void;
Expand Down Expand Up @@ -114,6 +120,7 @@ struct FiberContinuationReceiver(Receiver) {
cycle(fiber, true);
}
void setError(Throwable e) nothrow @safe {
fiber.nextError = e;
cycle(fiber, true);
}
void setValue() nothrow @safe {
Expand All @@ -127,9 +134,9 @@ void yield() @trusted {
std.concurrency.yield();
}

auto yield(Sender)(Sender sender) @trusted {
auto yield(Sender)(return Sender sender) @trusted {
import concurrency : Result;
import concurrency.operations : onResult;
import concurrency.operations : onResult, then;
import concurrency.sender : toSenderObject;

auto fiber = BaseFiber.getThis();
Expand All @@ -139,12 +146,27 @@ auto yield(Sender)(Sender sender) @trusted {
local = YieldResult!(Sender.Value)(r);
}

fiber.continuation = cast(Object)sender
.onResult(cast(void delegate(Result!(Sender.Value)) @safe shared)&store)
.toSenderObject;
SenderObjectBase!void object;

auto base = sender
.onResult(cast(void delegate(Result!(Sender.Value)) @safe shared)&store);

static if (is(Sender.Value == void)) {
object = base.toSenderObject();
} else {
object = base.then((Sender.Value v){}).toSenderObject();
}

fiber.continuation = cast(Object)object;

yield();

if (fiber.nextError) {
auto error = fiber.nextError;
fiber.nextError = null;
throw error;
}

return local;
}

Expand Down Expand Up @@ -191,13 +213,13 @@ import core.attribute : mustuse;
return result.isA!Value;
}

auto value() @safe {
auto value() @trusted scope {
static if (is(T == void))
alias valueHandler = (Completed c) {};
else
alias valueHandler = (T t) => t;

return std.sumtype.match!(valueHandler, function T(Exception e) {
return std.sumtype.match!(valueHandler, function T(Exception e) @trusted {
throw e;
})(result);
}
Expand Down
72 changes: 46 additions & 26 deletions source/concurrency/io/iouring.d
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version(linux):

import concurrency.data.queue.mpsc;
import concurrency.stoptoken;
import concurrency.receiver : setErrno;
import concurrency.receiver : setErrno, setValueOrError;
import during;
import core.stdc.errno : ECANCELED;

Expand Down Expand Up @@ -83,7 +83,7 @@ struct IOUringContext {

private void wakeup() @trusted nothrow shared {
import core.sys.posix.unistd;
ubyte wakeup = 1;
size_t wakeup = 1;
// TODO: check return value
core.sys.posix.unistd.write(event, &wakeup, wakeup.sizeof);
}
Expand Down Expand Up @@ -115,8 +115,8 @@ struct IOUringContext {
private int run(scope shared StopToken stopToken) @safe nothrow {
pending.append(requests.popAll());

putEventFdChannel();
while (!stopToken.isStopRequested() || !pending.empty() || !io.empty()) {
putEventFdChannel();
putPending();
// TODO: might have to flip this around.
scheduleTimers();
Expand Down Expand Up @@ -206,17 +206,18 @@ struct IOUringContext {

private void putEventFdChannel() @safe nothrow {
io.putWith!((ref SubmissionEntry e, IOUringContext* context) {
e.prepRead(context.event, context.buffer[], 0);
e.prepRead(context.event, context.buffer[0..8], 0);
})(&this);
}

private void putPending() @safe nothrow {
while (!pending.empty && !io.full()) {
auto item = pending.pop();
SubmissionEntry entry;
item.submit(entry);
entry.setUserDataRaw(item);
io.put(entry);
if (item.submit(entry)) {
entry.setUserDataRaw(item);
io.put(entry);
}
}
}

Expand All @@ -227,6 +228,8 @@ struct IOUringContext {
auto item = entry.userDataAs!(Item*);
if (item !is null)
item.complete(entry);
else
putEventFdChannel();
io.popFront();
}
}
Expand All @@ -252,17 +255,27 @@ struct RunOp(Sender, Receiver) {

alias RunSender = JustFromSender!(void delegate() @trusted shared);
alias SenderWithScheduler = WithSchedulerSender!(Sender, IOUringScheduler);
alias ValueSender = DoFinallySender!(SenderWithScheduler, bool delegate() @safe nothrow shared);
alias ValueSender = DoFinallySender!(SenderWithScheduler, void delegate() @safe nothrow shared);
alias CombinedSender = WhenAllSender!(ValueSender, RunSender);
alias Op = OpType!(CombinedSender, Receiver);

IOUringContext* context;
Op op;
shared StopSource stopSource;
Op op;

@disable
this(ref return scope typeof(this) rhs);
@disable
this(this);

this(IOUringContext* context, Sender sender, return Receiver receiver) @trusted return scope {
this.context = context;
shared IOUringContext* sharedContext = cast(shared)context;
op = whenAll(
sender.withScheduler(IOUringScheduler(cast(shared)context)).doFinally(() @safe shared => stopSource.stop()),
sender.withScheduler(IOUringScheduler(cast(shared)context)).doFinally(() @safe nothrow shared {
stopSource.stop();
sharedContext.wakeup();
}),
justFrom(&(cast(shared)this).run),
).connect(receiver);
}
Expand All @@ -282,7 +295,7 @@ struct RunOp(Sender, Receiver) {

struct Item {
// TODO: we are storing 2 this pointers here
void delegate(ref SubmissionEntry sqe) @safe nothrow submit;
bool delegate(ref SubmissionEntry sqe) @safe nothrow submit;
void delegate(ref const CompletionEntry cqe) @safe nothrow complete;
Item* next;
}
Expand Down Expand Up @@ -312,13 +325,20 @@ struct CancellableOperation(Operation) {
}
}

private void submit(ref SubmissionEntry entry) @trusted nothrow {
import core.atomic;
ops.atomicFetchAdd!(MemoryOrder.raw)(1);
auto stopToken = operation.receiver.getStopToken();
cb.register(stopToken, &(cast(shared)this).onStop);
// TODO: shouldn't submit be shared?
private bool submit(ref SubmissionEntry entry) @trusted nothrow {
try {
import core.atomic;
ops.atomicFetchAdd!(MemoryOrder.raw)(1);
auto stopToken = operation.receiver.getStopToken();
cb.register(stopToken, &(cast(shared)this).onStop);

operation.submit(entry);
operation.submit(entry);
return true;
} catch (Throwable e) {
operation.receiver.setError(e);
return false;
}
}

private void complete(const ref CompletionEntry entry) @safe nothrow {
Expand Down Expand Up @@ -352,8 +372,9 @@ struct CancellableOperation(Operation) {
}
}

private void submitStop(ref SubmissionEntry entry) nothrow @safe {
private bool submitStop(ref SubmissionEntry entry) nothrow @safe {
entry.prepCancel(item);
return true;
}

private ref assumeThreadSafe() nothrow @trusted shared {
Expand Down Expand Up @@ -422,15 +443,15 @@ struct ReadOperation(Receiver) {
}
void complete(const ref CompletionEntry entry) @safe nothrow {
if (entry.res >= 0) {
receiver.setValue(buffer[offset..$]);
receiver.setValueOrError(buffer[offset..entry.res]);
} else {
receiver.setErrno("Read failed", -entry.res);
}
}
}

struct AcceptSender {
import concurrency.io : Client;
import concurrency.scheduler : Client;
import std.socket : socket_t;
alias Value = Client;
shared IOUringContext* context;
Expand All @@ -448,7 +469,7 @@ struct AcceptSender {
struct AcceptOperation(Receiver) {
import core.sys.posix.sys.socket : sockaddr, socklen_t;
import core.sys.posix.netinet.in_;
import concurrency.io : Client;
import concurrency.scheduler : Client;
import std.socket : socket_t;

socket_t fd;
Expand All @@ -461,7 +482,7 @@ struct AcceptOperation(Receiver) {
void complete(const ref CompletionEntry entry) @safe nothrow {
import std.socket : socket_t;
if (entry.res >= 0) {
receiver.setValue(Client(cast(socket_t)entry.res, addr, addrlen));
receiver.setValueOrError(Client(cast(socket_t)entry.res, addr, addrlen));
} else {
receiver.setErrno("Accept failed", -entry.res);
}
Expand All @@ -470,7 +491,7 @@ struct AcceptOperation(Receiver) {

struct ConnectSender {
import std.socket : socket_t;
alias Value = int;
alias Value = socket_t;
shared IOUringContext* context;
socket_t fd;
string address;
Expand All @@ -483,7 +504,6 @@ struct ConnectSender {
);
return op;
}

}

struct ConnectOperation(Receiver) {
Expand Down Expand Up @@ -519,7 +539,7 @@ struct ConnectOperation(Receiver) {
}
void complete(const ref CompletionEntry entry) @safe nothrow {
if (entry.res >= 0) {
receiver.setValue(cast(socket_t)entry.res);
receiver.setValueOrError(cast(socket_t)entry.res);
} else {
receiver.setErrno("Connect failed", -entry.res);
}
Expand Down Expand Up @@ -554,7 +574,7 @@ struct WriteOperation(Receiver) {
}
void complete(const ref CompletionEntry entry) @safe nothrow {
if (entry.res >= 0) {
receiver.setValue(entry.res);
receiver.setValueOrError(entry.res);
} else {
receiver.setErrno("Write failed", -entry.res);
}
Expand Down
9 changes: 1 addition & 8 deletions source/concurrency/io/package.d
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module concurrency.io;

import concurrency.io.iouring;
import concurrency.scheduler : Client;

import std.socket : socket_t;

Expand Down Expand Up @@ -28,14 +29,6 @@ auto acceptAsync(socket_t fd) @safe nothrow @nogc {
return AcceptAsyncSender(fd);
}

struct Client {
import std.socket : socket_t;
import core.sys.posix.sys.socket : sockaddr, socklen_t;
socket_t fd;
sockaddr addr;
socklen_t addrlen;
}

struct AcceptAsyncSender {
alias Value = Client;
socket_t fd;
Expand Down
20 changes: 8 additions & 12 deletions source/concurrency/io/socket.d
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module concurrency.io.socket;
import std.socket : socket_t;

auto getSocket() @trusted {
auto tcpSocket() @trusted {
import std.socket : socket_t;
version(Windows) {
import core.sys.windows.windows;
Expand All @@ -20,15 +20,19 @@ auto getSocket() @trusted {
}

socket_t sock = cast(socket_t) socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
int on = 1;
if (sock == -1)
throw new Exception("socket");

int on = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, on.sizeof);
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &on, on.sizeof);
version(Posix) // on windows REUSEADDR includes REUSEPORT
setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &on, on.sizeof);

return sock;
}

auto listen(string address = "", ushort port = 0, int backlog = 128) @trusted {
auto listenTcp(string address = "", ushort port = 0, int backlog = 128) @trusted {
import core.stdc.stdio : fprintf, stderr;
import std.socket : socket_t;
version(Windows) {
Expand All @@ -48,9 +52,7 @@ auto listen(string address = "", ushort port = 0, int backlog = 128) @trusted {
}
import core.stdc.errno;

socket_t sock = cast(socket_t) socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (sock == -1)
throw new Exception("socket");
socket_t sock = tcpSocket();

sockaddr_in addr;
addr.sin_family = AF_INET;
Expand All @@ -69,12 +71,6 @@ auto listen(string address = "", ushort port = 0, int backlog = 128) @trusted {
} else
addr.sin_addr.s_addr = INADDR_ANY;

int on = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, on.sizeof);
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &on, on.sizeof);
version(Posix) // on windows REUSEADDR includes REUSEPORT
setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &on, on.sizeof);

if (bind(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) {
closeSocket(sock);
throw new Exception("bind");
Expand Down
Loading

0 comments on commit ffd57e9

Please sign in to comment.