Skip to content

Commit

Permalink
More fixes for kqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
trikko committed Dec 25, 2024
1 parent e82869a commit ebed90a
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 35 deletions.
74 changes: 60 additions & 14 deletions source/serverino/communicator.d
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import serverino.common;
import serverino.databuffer;
import serverino.daemon : WorkerInfo, now;
import serverino.config : DaemonConfigPtr;
import std.socket : Socket, SocketOption, SocketOptionLevel, lastSocketError, wouldHaveBlocked, SocketShutdown;
import std.socket : Socket, SocketOption, SocketOptionLevel, lastSocketError, wouldHaveBlocked, SocketShutdown, socket_t;
import std.string: join;
import std.algorithm : strip;
import std.conv : text, to;
Expand Down Expand Up @@ -138,13 +138,20 @@ package class Communicator
static if (serverino.common.Backend == BackendType.EPOLL)
{
import serverino.daemon : Daemon;
Daemon.epollRemoveSocket(clientSkt);
Daemon.epollRemoveSocket(clientSktHandle);
}
else static if (serverino.common.Backend == BackendType.KQUEUE)
{
import serverino.daemon : Daemon;
auto kv = kevent(clientSkt.handle, EVFILT_READ | EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) this);
kevent_f(Daemon.kq, &kv, 1, null, 0, null);

auto change = &Daemon.changeList[Daemon.changes];
change.ident = clientSktHandle;
change.filter = EVFILT_READ | EVFILT_WRITE;
change.flags = EV_DELETE | EV_DISABLE;
change.fflags = 0;
change.data = 0;
change.udata = null;
Daemon.changes++;
}

// Remove the communicator from the list of alives
Expand All @@ -160,6 +167,7 @@ package class Communicator
deads = this;

this.clientSkt = null;
clientSktHandle = socket_t.max;
}
}

Expand All @@ -185,17 +193,25 @@ package class Communicator
s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
s.blocking = false;
this.clientSkt = s;
this.clientSktHandle = s.handle;

static if (serverino.common.Backend == BackendType.EPOLL)
{
import serverino.daemon : Daemon;
import core.sys.linux.epoll : EPOLLIN;
Daemon.epollAddSocket(s, EPOLLIN, cast(void*) this);
Daemon.epollAddSocket(clientSktHandle, EPOLLIN, cast(void*) this);
}
else static if (serverino.common.Backend == BackendType.KQUEUE)
{
import serverino.daemon : Daemon;
Daemon.changeList.append(kevent(s.handle, EVFILT_READ, EV_ADD, 0, 0, cast(void*) this));
auto change = &Daemon.changeList[Daemon.changes];
change.ident = clientSktHandle;
change.filter = EVFILT_READ;
change.flags = EV_ADD | EV_ENABLE;
change.fflags = 0;
change.data = 0;
change.udata = cast(void*) this;
Daemon.changes++;
}
}
else assert(false);
Expand Down Expand Up @@ -268,7 +284,7 @@ package class Communicator
{
import serverino.daemon : Daemon;
import core.sys.linux.epoll : EPOLLIN;
Daemon.epollEditSocket(clientSkt, EPOLLIN, cast(void*) this);
Daemon.epollEditSocket(clientSktHandle, EPOLLIN, cast(void*) this);
}

hasBuffer = false;
Expand All @@ -278,7 +294,14 @@ package class Communicator
if (clientSkt !is null && hasBuffer)
{
import serverino.daemon : Daemon;
Daemon.changeList.append(kevent(clientSkt.handle, EVFILT_READ, EV_ADD, 0, 0, cast(void*) this));
auto change = &Daemon.changeList[Daemon.changes];
change.ident = clientSktHandle;
change.filter = EVFILT_READ;
change.flags = EV_ADD | EV_ENABLE;
change.fflags = 0;
change.data = 0;
change.udata = cast(void*) this;
Daemon.changes++;
}

hasBuffer = false;
Expand Down Expand Up @@ -424,15 +447,22 @@ package class Communicator
hasBuffer = false;
import serverino.daemon : Daemon;
import core.sys.linux.epoll : EPOLLIN;
Daemon.epollEditSocket(clientSkt, EPOLLIN, cast(void*) this);
Daemon.epollEditSocket(clientSktHandle, EPOLLIN, cast(void*) this);
}

static if (serverino.common.Backend == BackendType.KQUEUE)
if(hasBuffer)
{
hasBuffer = false;
import serverino.daemon : Daemon;
Daemon.changeList.append(kevent(clientSkt.handle, EVFILT_READ, EV_ADD, 0, 0, cast(void*) this));
auto change = &Daemon.changeList[Daemon.changes];
change.ident = clientSktHandle;
change.filter = EVFILT_READ;
change.flags = EV_ADD | EV_ENABLE;
change.fflags = 0;
change.data = 0;
change.udata = cast(void*) this;
Daemon.changes++;
}

bufferSent = 0;
Expand Down Expand Up @@ -509,13 +539,20 @@ package class Communicator
hasBuffer = true;
import serverino.daemon : Daemon;
import core.sys.linux.epoll : EPOLLIN, EPOLLOUT;
Daemon.epollEditSocket(clientSkt, EPOLLIN | EPOLLOUT, cast(void*) this);
Daemon.epollEditSocket(clientSktHandle, EPOLLIN | EPOLLOUT, cast(void*) this);
}
else static if(serverino.common.Backend == BackendType.KQUEUE)
{
hasBuffer = true;
import serverino.daemon : Daemon;
Daemon.changeList.append(kevent(clientSkt.handle, EVFILT_READ | EVFILT_WRITE, EV_ADD, 0, 0, cast(void*) this));
auto change = &Daemon.changeList[Daemon.changes];
change.ident = clientSktHandle;
change.filter = EVFILT_READ | EVFILT_WRITE;
change.flags = EV_ADD | EV_ENABLE;
change.fflags = 0;
change.data = 0;
change.udata = cast(void*) this;
Daemon.changes++;
}

}
Expand Down Expand Up @@ -545,13 +582,20 @@ package class Communicator
hasBuffer = true;
import serverino.daemon : Daemon;
import core.sys.linux.epoll : EPOLLIN, EPOLLOUT;
Daemon.epollEditSocket(clientSkt, EPOLLIN | EPOLLOUT, cast(void*) this);
Daemon.epollEditSocket(clientSktHandle, EPOLLIN | EPOLLOUT, cast(void*) this);
}
else static if(serverino.common.Backend == BackendType.KQUEUE)
{
hasBuffer = true;
import serverino.daemon : Daemon;
Daemon.changeList.append(kevent(clientSkt.handle, EVFILT_READ | EVFILT_WRITE, EV_ADD, 0, 0, cast(void*) this));
auto change = &Daemon.changeList[Daemon.changes];
change.ident = clientSktHandle;
change.filter = EVFILT_READ | EVFILT_WRITE;
change.flags = EV_ADD | EV_ENABLE;
change.fflags = 0;
change.data = 0;
change.udata = cast(void*) this;
Daemon.changes++;
}
}

Expand Down Expand Up @@ -1017,7 +1061,9 @@ package class Communicator
size_t responseSent;
size_t responseLength;
size_t id;

Socket clientSkt;
socket_t clientSktHandle = socket_t.max; // For KQUEUE and EPOLL, we need to keep the socket handle

ProtoRequest requestToProcess;
WorkerInfo worker;
Expand Down
68 changes: 47 additions & 21 deletions source/serverino/daemon.d
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import std.experimental.logger : log, info, warning;
import std.process : ProcessPipes;

import std.format : format;
import std.socket : Socket, SocketSet, SocketType, AddressFamily, SocketShutdown, TcpSocket, SocketOption, SocketOptionLevel, SocketException;
import std.socket : Socket, SocketSet, SocketType, AddressFamily, SocketShutdown, TcpSocket, SocketOption, SocketOptionLevel, SocketException, socket_t;
import std.algorithm : filter;
import std.datetime : SysTime, Clock, seconds;

Expand Down Expand Up @@ -127,6 +127,7 @@ package class WorkerInfo
Socket accepted = s.accept();
this.pi = new ProcessInfo(pipes.pid.processID);
this.unixSocket = accepted;
this.unixSocketHandle = accepted.handle;

// Wait for the worker to wake up.
ubyte[1] data;
Expand All @@ -136,11 +137,18 @@ package class WorkerInfo
{
import serverino.daemon : Daemon;
import core.sys.linux.epoll : EPOLLIN;
Daemon.epollAddSocket(accepted, EPOLLIN, cast(void*) this);
Daemon.epollAddSocket(unixSocketHandle, EPOLLIN, cast(void*) this);
}
else static if (serverino.common.Backend == BackendType.KQUEUE) {
import serverino.daemon : Daemon;
Daemon.changeList.append(kevent(accepted.handle, EVFILT_READ, EV_ADD, 0, 0, cast(void*) this));
auto change = &Daemon.changeList[Daemon.changes];
change.ident = unixSocketHandle;
change.filter = EVFILT_READ;
change.flags = EV_ADD | EV_ENABLE;
change.fflags = 0;
change.data = 0;
change.udata = cast(void*) this;
Daemon.changes++;
}

setStatus(WorkerInfo.State.IDLING);
Expand All @@ -165,13 +173,18 @@ package class WorkerInfo
static if (serverino.common.Backend == BackendType.EPOLL)
{
import serverino.daemon : Daemon;
Daemon.epollRemoveSocket(unixSocket);
Daemon.epollRemoveSocket(unixSocketHandle);
}
else static if (serverino.common.Backend == BackendType.KQUEUE)
{
import serverino.daemon : Daemon;
auto kv = kevent(unixSocket.handle, EVFILT_READ | EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) this);
kevent_f(Daemon.kq, &kv, 1, null, 0, null);
auto change = &Daemon.changeList[Daemon.changes];
change.ident = unixSocketHandle;
change.filter = EVFILT_READ | EVFILT_WRITE;
change.flags = EV_DELETE | EV_DISABLE;
change.data = 0;
change.udata = null;
Daemon.changes++;
}

unixSocket.shutdown(SocketShutdown.BOTH);
Expand Down Expand Up @@ -361,6 +374,8 @@ package:

State status = State.STOPPED;
Socket unixSocket = null;
socket_t unixSocketHandle = socket_t.max;

Communicator communicator = null;
bool reloadRequested = false;
bool isDynamic = false;
Expand Down Expand Up @@ -497,7 +512,8 @@ package:
kq = kqueue();
if (kq == -1) throw new Exception("Failed to create kqueue");
eventList.length = 1024;
changeList.reserve(1024);
changeList.length = 1024;
changes = 0;
}

// Starting all the listeners.
Expand Down Expand Up @@ -563,9 +579,17 @@ package:
}

static if (serverino.common.Backend == BackendType.EPOLL)
epollAddSocket(listener.socket, EPOLLIN, cast(void*)listener);
epollAddSocket(listener.socket.handle, EPOLLIN, cast(void*)listener);
else static if (serverino.common.Backend == BackendType.KQUEUE)
changeList.append(kevent(listener.socket.handle, EVFILT_READ, EV_ADD, 0, 0, cast(void*)listener));
{
auto change = &Daemon.changeList[Daemon.changes];
change.ident = listener.socket.handle;
change.filter = EVFILT_READ;
change.flags = EV_ADD | EV_ENABLE;
change.data = 0;
change.udata = cast(void*)listener;
Daemon.changes++;
}
}

ThreadBase mainThread;
Expand Down Expand Up @@ -650,9 +674,11 @@ package:
import core.stdc.stdlib : exit;
debug import std.stdio : writeln;



auto timeout = timespec(1, 0);
int updates = kevent_f(kq, changeList.array.ptr, cast(int)changeList.length, eventList.ptr, cast(int)eventList.length, &timeout);
changeList.length = 0;
int updates = kevent_f(kq, changeList.ptr, cast(int)changes, eventList.ptr, cast(int)eventList.length, &timeout);
changes = 0;
}

now = CoarseTime.currTime;
Expand Down Expand Up @@ -852,8 +878,6 @@ package:

foreach(ref kevent e; eventList[0..updates])
{
if (e.udata is null)
continue;

Object o = cast(Object)(cast(void*) e.udata);

Expand Down Expand Up @@ -981,29 +1005,30 @@ package:

static if (serverino.common.Backend == BackendType.EPOLL)
{
import std.socket : socket_t;

void epollAddSocket(Socket s, int events, void* ptr)
void epollAddSocket(socket_t s, int events, void* ptr)
{
epoll_event evt;
evt.events = events;
evt.data.ptr = ptr;

auto res = epoll_ctl(epoll, EPOLL_CTL_ADD, s.handle, &evt);
auto res = epoll_ctl(epoll, EPOLL_CTL_ADD, s, &evt);
assert(res == 0);
}

void epollRemoveSocket(Socket s)
void epollRemoveSocket(socket_t s)
{
epoll_ctl(epoll, EPOLL_CTL_DEL, s.handle, null);
epoll_ctl(epoll, EPOLL_CTL_DEL, s, null);
}

void epollEditSocket(Socket s, int events, void* ptr)
void epollEditSocket(socket_t s, int events, void* ptr)
{
epoll_event evt;
evt.events = events;
evt.data.ptr = ptr;

auto res = epoll_ctl(epoll, EPOLL_CTL_MOD, s.handle, &evt);
auto res = epoll_ctl(epoll, EPOLL_CTL_MOD, s, &evt);
assert(res == 0);
}

Expand All @@ -1014,8 +1039,9 @@ package:
import serverino.databuffer;

int kq;
DataBuffer!kevent changeList;
kevent[] eventList;
size_t changes = 0;
kevent[] changeList;
kevent[] eventList;
}

private __gshared:
Expand Down

0 comments on commit ebed90a

Please sign in to comment.