proxy: remove V1 API
res = pool(r)
rtable = mcp.await(r, pools)

... are no longer supported. This paves the way for a lot of internal
cleanup and optimization: the V2 API holds long-term references to
pools in higher-level objects, so memory management for pool objects
can be done more exactly.
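
For context, a rough sketch of one way a direct pool call is expressed under the V2 request-context API (names such as mcp.funcgen_new, fgen:new_handle, and rctx:enqueue_and_wait are taken from the proxy route documentation; the pool and route names here are illustrative and not part of this diff):

-- mcp_config_routes() runs per worker thread; `pools` is whatever
-- mcp_config_pools() returned.
function mcp_config_routes(pools)
    local fgen = mcp.funcgen_new()
    -- the funcgen holds the long-term reference to the pool via a handle,
    -- rather than each in-flight request pinning the pool object itself.
    local h = fgen:new_handle(pools.backend)
    fgen:ready({ n = "example", f = function(rctx)
        return function(r)
            -- was: res = pool(r) under the removed V1 API
            return rctx:enqueue_and_wait(r, h)
        end
    end })
    mcp.attach(mcp.CMD_ANY_STORAGE, fgen)
end

The rationale above follows from this shape: the pool reference lives in the function generator, so per-request IO objects no longer need to keep pools alive.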
dormando committed Aug 27, 2024
1 parent 9c81709 commit c7c71c5
Showing 18 changed files with 454 additions and 1,322 deletions.
2 changes: 1 addition & 1 deletion Makefile.am
@@ -58,7 +58,7 @@ endif
if ENABLE_PROXY
memcached_SOURCES += proto_proxy.c proto_proxy.h vendor/mcmc/mcmc.h \
proxy_xxhash.c proxy.h \
proxy_await.c proxy_ustats.c \
proxy_ustats.c \
proxy_ratelim.c \
proxy_jump_hash.c proxy_request.c \
proxy_result.c proxy_inspector.c \
1 change: 0 additions & 1 deletion memcached.c
@@ -1819,7 +1819,6 @@ void server_stats(ADD_STAT add_stats, void *c) {
APPEND_STAT("proxy_conn_errors", "%llu", (unsigned long long)thread_stats.proxy_conn_errors);
APPEND_STAT("proxy_conn_oom", "%llu", (unsigned long long)thread_stats.proxy_conn_oom);
APPEND_STAT("proxy_req_active", "%llu", (unsigned long long)thread_stats.proxy_req_active);
APPEND_STAT("proxy_await_active", "%llu", (unsigned long long)thread_stats.proxy_await_active);
}
#endif
APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
3 changes: 1 addition & 2 deletions memcached.h
@@ -359,8 +359,7 @@ struct slab_stats {
X(proxy_conn_requests) \
X(proxy_conn_errors) \
X(proxy_conn_oom) \
X(proxy_req_active) \
X(proxy_await_active)
X(proxy_req_active)
#endif

/**
49 changes: 8 additions & 41 deletions proto_proxy.c
@@ -503,15 +503,15 @@ void proxy_submit_cb(io_queue_t *q) {
mcp_backend_t *be;
P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p);
if (p->qcount_incr) {
// funny workaround: awaiting IOP's don't count toward
// resuming a connection, only the completion of the await
// funny workaround: async IOP's don't count toward
// resuming a connection, only the completion of the async
// condition.
q->count++;
}

if (p->await_background) {
P_DEBUG("%s: fast-returning await_background object: %p\n", __func__, (void *)p);
// intercept await backgrounds
if (p->background) {
P_DEBUG("%s: fast-returning background object: %p\n", __func__, (void *)p);
// intercept background requests
// this call cannot recurse if we're on the worker thread,
// since the worker thread has to finish executing this
// function in order to pick up the returned IO.
@@ -577,8 +577,8 @@
return;
}

// This function handles return processing for the "old style" API: direct
// pool calls and mcp.await()
// This function handles return processing for the "old style" API:
// currently just `mcp.internal()`
void proxy_return_rctx_cb(io_pending_t *pending) {
io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
if (p->client_resp && p->client_resp->blen) {
@@ -588,17 +588,6 @@ void proxy_return_rctx_cb(io_pending_t *pending) {
p->thread->proxy_vm_extra_kb += kb > 0 ? kb : 1;
}

if (p->is_await) {
p->rctx->async_pending--;
mcplib_await_return(p);
// need to directly attempt to return the context,
// we may or may not be hitting proxy_run_rcontext from await_return.
if (p->rctx->async_pending == 0) {
mcp_funcgen_return_rctx(p->rctx);
}
return;
}

mcp_rcontext_t *rctx = p->rctx;
lua_rotate(rctx->Lc, 1, 1);
lua_settop(rctx->Lc, 1);
@@ -879,10 +868,6 @@ static void _proxy_run_tresp_to_resp(mc_resp *tresp, mc_resp *resp) {
// - need to only increment q->count once per stack of requests coming from a
// resp.
//
// There are workarounds for this all over. In the await code, we test for
// "the first await object" or "is an await background object", for
// incrementing the q->count
// For pool-backed requests we always increment in submit
// For RQU backed requests (new API) there isn't an easy place to test for
// "the first request", because:
// - The connection queue is a stack of _all_ requests pending on this
@@ -967,25 +952,7 @@ int proxy_run_rcontext(mcp_rcontext_t *rctx) {
lua_pop(Lc, 1);

int res = 0;
mcp_request_t *rq = NULL;
mcp_backend_t *be = NULL;
mcp_resp_t *r = NULL;
switch (yield_type) {
case MCP_YIELD_AWAIT:
// called with await context on the stack.
rctx->first_queue = false; // HACK: ensure awaits are counted.
mcplib_await_run_rctx(rctx);
break;
case MCP_YIELD_POOL:
// TODO (v2): c only used for cache alloc?
// pool_call checks the argument already.
be = lua_touserdata(Lc, -1);
rq = lua_touserdata(Lc, -2);
// not using a pre-made res object from this yield type.
r = mcp_prep_resobj(Lc, rq, be, c->thread);
rctx->first_queue = false; // HACK: ensure poolreqs are counted.
mcp_queue_rctx_io(rctx, rq, be, r);
break;
case MCP_YIELD_INTERNAL:
// stack should be: rq, res
if (rctx->parent) {
@@ -1367,7 +1334,7 @@ io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, m
p->c = c;
p->client_resp = r;
p->flushed = false;
p->return_cb = proxy_return_rctx_cb;
p->return_cb = NULL;
p->finalize_cb = proxy_finalize_rctx_cb;

// pass along the request context for resumption.
30 changes: 5 additions & 25 deletions proxy.h
@@ -100,12 +100,10 @@ struct mcp_memprofile {

#define MCP_BACKEND_UPVALUE 1

#define MCP_YIELD_POOL 1
#define MCP_YIELD_AWAIT 2
#define MCP_YIELD_INTERNAL 3
#define MCP_YIELD_WAITCOND 4
#define MCP_YIELD_WAITHANDLE 5
#define MCP_YIELD_SLEEP 6
#define MCP_YIELD_INTERNAL 1
#define MCP_YIELD_WAITCOND 2
#define MCP_YIELD_WAITHANDLE 3
#define MCP_YIELD_SLEEP 4

#define SHAREDVM_FGEN_IDX 1
#define SHAREDVM_FGENSLOT_IDX 2
@@ -547,13 +545,9 @@ struct _io_pending_proxy_t {
struct iovec iov[2]; // request string + tail buffer
int iovcnt; // 1 or 2...
unsigned int iovbytes; // total bytes in the iovec
int mcpres_ref; // mcp.res reference used for await()
int await_ref; // lua reference if we were an await object
mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
bool flushed; // whether we've fully written this request to a backend.
bool is_await; // are we an await object?
bool await_first; // are we the main route for an await object?
bool await_background; // dummy IO for backgrounded awaits
bool background; // dummy IO for backgrounded awaits
bool qcount_incr; // HACK.
};
};
@@ -616,20 +610,6 @@ mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t);
void mcp_resp_set_elapsed(mcp_resp_t *r);
io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_backend_t *be, mcp_resp_t *r);

// await interface
enum mcp_await_e {
AWAIT_GOOD = 0, // looks for OK + NOT MISS
AWAIT_ANY, // any response, including errors,
AWAIT_OK, // any non-error response
AWAIT_FIRST, // return the result from the first pool
AWAIT_FASTGOOD, // returns on first hit or majority non-error
AWAIT_BACKGROUND, // returns as soon as background jobs are dispatched
};
int mcplib_await(lua_State *L);
int mcplib_await_logerrors(lua_State *L);
int mcplib_await_run_rctx(mcp_rcontext_t *rctx);
int mcplib_await_return(io_pending_proxy_t *p);

// internal request interface
int mcplib_internal(lua_State *L);
int mcplib_internal_run(mcp_rcontext_t *rctx);
(diffs for the remaining 13 changed files not shown)