proxy: speed up per-worker backend IO
Uses a shorter return path for completed IO objects, resulting in a 5%+
performance bump.

Also cleans up some of the remaining hack code from before the IO
subsystem fix.
dormando committed Jan 14, 2025
1 parent ec1fb56 commit 83425f3
Showing 5 changed files with 14 additions and 75 deletions.
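
In short: when a backend connection is serviced by the same worker thread that owns the client connection, a finished IO object can be handed straight back to that connection's IO queue, skipping the cross-thread notification round-trip. A minimal sketch of that decision, using stand-in types and a hypothetical complete_io() helper; the real conn_io_queue_return()/return_io_pending() plumbing lives in memcached.c:

/* Sketch only: simplified stand-ins for the real io_pending_t plumbing. */
#include <stdbool.h>
#include <stdio.h>

typedef struct io_pending { int id; } io_pending_t;

/* Short path (assumed behavior): the worker thread already owns the
 * connection, so the IO is linked onto the conn's return queue and
 * picked up when drive_machine() next runs. */
static void conn_io_queue_return(io_pending_t *p) {
    printf("IO %d: returned directly on the worker thread\n", p->id);
}

/* Long path (assumed behavior): completion happened on the dedicated
 * IO thread, so the IO is handed back via a cross-thread notify. */
static void return_io_pending(io_pending_t *p) {
    printf("IO %d: queued for cross-thread return\n", p->id);
}

/* Hypothetical helper showing where the new branch lives. */
static void complete_io(io_pending_t *p, bool on_worker_thread) {
    if (on_worker_thread) {
        conn_io_queue_return(p); /* the shorter return path */
    } else {
        return_io_pending(p);    /* the pre-existing path */
    }
}

int main(void) {
    io_pending_t a = { .id = 1 }, b = { .id = 2 };
    complete_io(&a, true);   /* per-worker backend: short path */
    complete_io(&b, false);  /* shared IO thread backend: long path */
    return 0;
}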
7 changes: 4 additions & 3 deletions memcached.c
@@ -547,8 +547,9 @@ void conn_worker_readd(conn *c) {
         // Explicit fall-through.
     case conn_io_queue:
         conn_set_state(c, conn_io_resume);
-        // machine will know how to return based on secondary state.
-        drive_machine(c);
+        // schedule the event, which just runs drive_machine outside of
+        // any recursion here.
+        event_active(&c->event, 0, 0);
         break;
     case conn_write:
     case conn_mwrite:
@@ -3277,8 +3278,8 @@ static void drive_machine(conn *c) {
             // NOTE: Considering moving this to outside of the while loop.
             // anything that sets a resp to suspended could also set the
             // state to conn_io_queue and remove this inline check.
-            thread_io_queue_submit(c->thread);
             conn_set_state(c, conn_io_queue);
+            thread_io_queue_submit(c->thread);

             stop = true;
             break;
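The first hunk replaces a direct drive_machine(c) call with event_active(), so the connection resumes from the event loop rather than recursing into the state machine from the IO-return path; the second hunk moves thread_io_queue_submit() after conn_set_state(), so submission happens with the connection already in the conn_io_queue state. A self-contained libevent sketch of the event_active() deferral pattern (drive_cb is a hypothetical stand-in for drive_machine; build with -levent):

/* Manually-activated event: never added with a real fd, only fired
 * via event_active(), which queues the callback for the next pass
 * of the event loop instead of invoking it inline. */
#include <event2/event.h>
#include <stdio.h>

static void drive_cb(evutil_socket_t fd, short what, void *arg) {
    (void)fd; (void)what;
    printf("state machine resumed from the event loop, not recursively\n");
    event_base_loopbreak((struct event_base *)arg);
}

int main(void) {
    struct event_base *base = event_base_new();
    struct event ev;

    /* fd of -1: this event exists only to be activated by hand. */
    event_assign(&ev, base, -1, 0, drive_cb, base);

    /* Analogous in spirit to the patch's event_active(&c->event, 0, 0):
     * no recursion here; drive_cb fires once the loop runs. */
    event_active(&ev, 0, 0);

    event_base_dispatch(base);
    event_base_free(base);
    return 0;
}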
70 changes: 4 additions & 66 deletions proto_proxy.c
@@ -482,27 +482,13 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
     proxy_init_event_thread(t, ctx, thr->base);
 }

-// ctx_stack is a stack of io_pending_proxy_t's.
-// head of q->s_ctx is the "newest" request so we must push into the head
-// of the next queue, as requests are dequeued from the head
 void proxy_submit_cb(io_queue_t *q) {
     proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_io_thread;
     iop_head_t head;
     be_head_t w_head; // worker local stack.
     STAILQ_INIT(&head);
     STAILQ_INIT(&w_head);

-    // NOTE: responses get returned in the correct order no matter what, since
-    // mc_resp's are linked.
-    // we just need to ensure stuff is parsed off the backend in the correct
-    // order.
-    // So we can do with a single list here, but we need to repair the list as
-    // responses are parsed. (in the req_remaining-- section)
-    // TODO (v2):
-    //   - except we can't do that because the deferred IO stack isn't
-    //   compatible with queue.h.
-    //   So for now we build the secondary list with an STAILQ, which
-    //   can be transplanted/etc.
     while (!STAILQ_EMPTY(&q->stack)) {
         mcp_backend_t *be;
         io_pending_proxy_t *p = (io_pending_proxy_t *)STAILQ_FIRST(&q->stack);
@@ -511,11 +497,10 @@ void proxy_submit_cb(io_queue_t *q) {

         if (p->background) {
             P_DEBUG("%s: fast-returning background object: %p\n", __func__, (void *)p);
-            // intercept background requests
-            // this call cannot recurse if we're on the worker thread,
-            // since the worker thread has to finish executing this
-            // function in order to pick up the returned IO.
-            return_io_pending((io_pending_t *)p);
+            assert(p->backend == NULL);
+            // must not resume requests inline here but they can be scheduled
+            // to run drive_machine() later.
+            conn_io_queue_return((io_pending_t *)p);
             continue;
         }
         be = p->backend;
@@ -844,41 +829,6 @@ static void _proxy_run_tresp_to_resp(mc_resp *tresp, mc_resp *resp) {
     resp->skip = tresp->skip;
 }

-// HACK NOTES:
-// These are self-notes for dormando mostly.
-// The IO queue system does not work well with the proxy, as we need to:
-// - only increment q->count during the submit phase
-//   - .. because a resumed coroutine can queue more data.
-//   - and we will never hit q->count == 0
-//   - .. and then never resume the main connection. (conn_worker_readd)
-//     - which will never submit the new sub-requests
-// - need to only increment q->count once per stack of requests coming from a
-//   resp.
-//
-// For RQU backed requests (new API) there isn't an easy place to test for
-// "the first request", because:
-// - The connection queue is a stack of _all_ requests pending on this
-//   connection, and many requests can arrive in one batch.
-// - Thus we cannot simply check if there are items in the queue
-// - RQU's can be recursive, so we have to loop back to the parent to check to
-//   see if we're the first queue or not.
-//
-// This hack workaround exists so I can fix the IO queue subsystem as a change
-// independent of the RCTX change, as the IO queue touches everything and
-// scares the shit out of me. It's much easier to make changes to it in
-// isolation, when all existing systems are currently working and testable.
-//
-// Description of the hack:
-// - in mcp_queue_io: roll up rctx to parent, and if we are the first IO to queue
-//   since the rcontext started, set p->qcount_incr = true
-// Later in submit_cb:
-// - q->count++ if p->qcount_incr.
-//
-// Finally, in proxy_return_rqu_cb:
-// - If parent completed non-yielded work, q->count-- to allow conn
-//   resumption.
-// - At bottom of rqu_cb(), flush any IO queues for the connection in case we
-//   re-queued work.
 int proxy_run_rcontext(mcp_rcontext_t *rctx) {
     int nresults = 0;
     lua_State *Lc = rctx->Lc;
@@ -1337,18 +1287,6 @@ io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, m
         mcp_request_attach(rq, p);
     }

-    // HACK
-    // find the parent rctx
-    while (rctx->parent) {
-        rctx = rctx->parent;
-    }
-    // Hack to enforce the first iop increments client IO queue counter.
-    if (!rctx->first_queue) {
-        rctx->first_queue = true;
-        p->qcount_incr = true;
-    }
-    // END HACK
-
     // link into the batch chain.
     STAILQ_INSERT_TAIL(&q->stack, (io_pending_t *)p, iop_next);
     P_DEBUG("%s: queued\n", __func__);
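proxy_submit_cb() drains the connection's submission stack, now fast-returning background IOs through conn_io_queue_return() and batching everything else toward its backend. A stripped-down sketch of that drain-and-partition loop using sys/queue.h (iop_t and the printf "returns" are simplified stand-ins for io_pending_proxy_t and the real return/batch calls):

/* Sketch of an STAILQ drain-and-partition loop in the style of
 * proxy_submit_cb(): background items are returned immediately,
 * everything else is batched for a backend write. */
#include <stdbool.h>
#include <stdio.h>
#include <sys/queue.h>

typedef struct iop {
    int id;
    bool background;
    STAILQ_ENTRY(iop) next;
} iop_t;

STAILQ_HEAD(iop_head, iop);

int main(void) {
    struct iop_head stack, batch;
    STAILQ_INIT(&stack);
    STAILQ_INIT(&batch);

    iop_t iops[3] = {
        { .id = 0, .background = false },
        { .id = 1, .background = true  },
        { .id = 2, .background = false },
    };
    for (int i = 0; i < 3; i++)
        STAILQ_INSERT_TAIL(&stack, &iops[i], next);

    /* Drain the submission stack, as proxy_submit_cb() does. */
    while (!STAILQ_EMPTY(&stack)) {
        iop_t *p = STAILQ_FIRST(&stack);
        STAILQ_REMOVE_HEAD(&stack, next);
        if (p->background) {
            /* fast-return: picked up by a later drive_machine() pass */
            printf("iop %d: background, returned to the conn queue\n", p->id);
            continue;
        }
        STAILQ_INSERT_TAIL(&batch, p, next);
    }

    iop_t *p;
    STAILQ_FOREACH(p, &batch, next)
        printf("iop %d: batched for backend write\n", p->id);
    return 0;
}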
2 changes: 0 additions & 2 deletions proxy.h
@@ -544,7 +544,6 @@ struct _io_pending_proxy_t {
             mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
             bool flushed; // whether we've fully written this request to a backend.
             bool background; // dummy IO for backgrounded awaits
-            bool qcount_incr; // HACK.
         };
     };
 };
@@ -745,7 +744,6 @@ struct mcp_rcontext_s {
     enum mcp_rqueue_e wait_mode;
     uint8_t lua_narg; // number of responses to push when yield resuming.
     uint8_t uobj_count; // number of extra tracked req/res objects.
-    bool first_queue; // HACK
     lua_State *Lc; // coroutine thread pointer.
     mcp_request_t *request; // ptr to the above reference.
     mcp_rcontext_t *parent; // parent rctx in the call graph
1 change: 0 additions & 1 deletion proxy_luafgen.c
@@ -332,7 +332,6 @@ static void _mcp_funcgen_return_rctx(mcp_rcontext_t *rctx) {
     }
     rctx->wait_mode = QWAIT_IDLE;
     rctx->resp = NULL;
-    rctx->first_queue = false; // HACK
     if (rctx->request) {
         mcp_request_cleanup(fgen->thread, rctx->request);
     }
9 changes: 6 additions & 3 deletions proxy_network.c
Expand Up @@ -463,10 +463,13 @@ static void _drive_machine_next(struct mcp_backendconn_s *be, io_pending_proxy_t
assert(be->pending_read > -1);

mcp_resp_set_elapsed(p->client_resp);
// have to do the q->count-- and == 0 and redispatch_conn()
// stuff here. The moment we call return_io here we
// The moment we call return_io here we
// don't own *p anymore.
return_io_pending((io_pending_t *)p);
if (!be->be_parent->use_io_thread) {
conn_io_queue_return((io_pending_t *)p);
} else {
return_io_pending((io_pending_t *)p);
}
be->state = mcp_backend_read;
}

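Design note: both arms of the new branch give up ownership of *p, which is why the surviving comment stresses that the backend must not touch the IO afterward. When the backend is pinned to the worker thread (!use_io_thread), conn_io_queue_return() appears to queue the object for a deferred drive_machine() pass on that worker, while return_io_pending() remains the cross-thread return path for backends serviced by the dedicated IO thread.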
