Skip to content

Commit

Permalink
proxy: internal backend for V2 API
Browse files Browse the repository at this point in the history
Replaces `res = mcp.internal(req)` with the standard V2 API flow.
Create a handle for an fgen using `mcp.internal_backend` as an argument,
then call wait against it like a normal pool.
  • Loading branch information
dormando committed Jan 14, 2025
1 parent 83425f3 commit 4252ef4
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 50 deletions.
12 changes: 7 additions & 5 deletions proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ struct _io_pending_proxy_t {
// original struct ends here

mcp_rcontext_t *rctx; // pointer to request context.
mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
int queue_handle; // queue slot to return this result to
bool ascii_multiget; // passed on from mcp_r_t
union {
Expand All @@ -541,7 +542,6 @@ struct _io_pending_proxy_t {
struct iovec iov[2]; // request string + tail buffer
int iovcnt; // 1 or 2...
unsigned int iovbytes; // total bytes in the iovec
mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
bool flushed; // whether we've fully written this request to a backend.
bool background; // dummy IO for backgrounded awaits
};
Expand Down Expand Up @@ -608,6 +608,7 @@ io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, m
// internal request interface
int mcplib_internal(lua_State *L);
int mcplib_internal_run(mcp_rcontext_t *rctx);
void *mcp_rcontext_internal(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_resp_t *r);

// user stats interface
#define MAX_USTATS_DEFAULT 1024
Expand Down Expand Up @@ -697,10 +698,11 @@ struct mcp_funcgen_router {

#define RQUEUE_TYPE_NONE 0
#define RQUEUE_TYPE_POOL 1
#define RQUEUE_TYPE_FGEN 2
#define RQUEUE_TYPE_UOBJ 3 // user tracked object types past this point
#define RQUEUE_TYPE_UOBJ_REQ 4
#define RQUEUE_TYPE_UOBJ_RES 5
#define RQUEUE_TYPE_INT 2
#define RQUEUE_TYPE_FGEN 3
#define RQUEUE_TYPE_UOBJ 4 // user tracked object types past this point
#define RQUEUE_TYPE_UOBJ_REQ 5
#define RQUEUE_TYPE_UOBJ_RES 6
#define RQUEUE_ASSIGNED (1<<0)
#define RQUEUE_R_RESUME (1<<1)
#define RQUEUE_R_GOOD (1<<3)
Expand Down
69 changes: 46 additions & 23 deletions proxy_internal.c
Original file line number Diff line number Diff line change
Expand Up @@ -1637,30 +1637,8 @@ static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_res

/*** Lua and internal handler ***/

int mcplib_internal(lua_State *L) {
luaL_checkudata(L, 1, "mcp.request");
mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
memset(r, 0, sizeof(mcp_resp_t));
luaL_getmetatable(L, "mcp.response");
lua_setmetatable(L, -2);

lua_pushinteger(L, MCP_YIELD_INTERNAL);
return lua_yield(L, 2);
}

// we're pretending to be p_c_ascii(), but reusing our already tokenized code.
// the text parser should eventually move to the new tokenizer and we can
// merge all of this code together.
int mcplib_internal_run(mcp_rcontext_t *rctx) {
lua_State *L = rctx->Lc;
mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response");
mc_resp *resp = resp_start_unlinked(rctx->c);
LIBEVENT_THREAD *t = rctx->c->thread;
static inline int _mcplib_internal_run(LIBEVENT_THREAD *t, mcp_request_t *rq, mcp_resp_t *r, mc_resp *resp) {
mcp_parser_t *pr = &rq->pr;
if (resp == NULL) {
return -1;
}

// TODO: meta no-op isn't handled here. haven't decided how yet.
switch (rq->pr.command) {
Expand Down Expand Up @@ -1751,6 +1729,51 @@ int mcplib_internal_run(mcp_rcontext_t *rctx) {
// Always return OK from here as this is signalling an internal error.
r->status = MCMC_OK;

return 0;
}

int mcplib_internal(lua_State *L) {
luaL_checkudata(L, 1, "mcp.request");
mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
memset(r, 0, sizeof(mcp_resp_t));
luaL_getmetatable(L, "mcp.response");
lua_setmetatable(L, -2);

lua_pushinteger(L, MCP_YIELD_INTERNAL);
return lua_yield(L, 2);
}

// V2 API internal handling.
void *mcp_rcontext_internal(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_resp_t *r) {
LIBEVENT_THREAD *t = rctx->fgen->thread;
mc_resp *resp = resp_start_unlinked(rctx->c);
if (resp == NULL) {
return NULL;
}

// TODO: release resp here instead on error?
if (_mcplib_internal_run(t, rq, r, resp) != 0) {
return NULL;
}

return resp;
}

// we're pretending to be p_c_ascii(), but reusing our already tokenized code.
// the text parser should eventually move to the new tokenizer and we can
// merge all of this code together.
int mcplib_internal_run(mcp_rcontext_t *rctx) {
lua_State *L = rctx->Lc;
mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response");
mc_resp *resp = resp_start_unlinked(rctx->c);
LIBEVENT_THREAD *t = rctx->c->thread;
if (resp == NULL) {
return -1;
}

_mcplib_internal_run(t, rq, r, resp);

// resp object is associated with the
// response object, which is about a
// kilobyte.
Expand Down
10 changes: 10 additions & 0 deletions proxy_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,10 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
luaL_newmetatable(L, "mcp.funcgen");
lua_pop(L, 1);

// mt for magical null wrapper for using internal cache as backend
luaL_newmetatable(L, "mcp.internal_be");
lua_pop(L, 1);

luaL_newlibtable(L, mcplib_f_routes);
} else {
// Change the extra space override for the configuration VM to just point
Expand Down Expand Up @@ -1834,6 +1838,12 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
luaL_newlibtable(L, mcplib_f_config);
}

// Create magic empty value to pass as an internal backend.
lua_newuserdatauv(L, 1, 0);
luaL_getmetatable(L, "mcp.internal_be");
lua_setmetatable(L, -2);
lua_setfield(L, -2, "internal_backend");

// create main library table.
//luaL_newlib(L, mcplib_f);
// TODO (v2): luaL_newlibtable() just pre-allocs the exact number of things
Expand Down
50 changes: 48 additions & 2 deletions proxy_luafgen.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ static void mcp_rcontext_cleanup(lua_State *L, mcp_funcgen_t *fgen, mcp_rcontext
// cleanup of request queue entries. recurse funcgen cleanup.
for (int x = 0; x < fgen->max_queues; x++) {
struct mcp_rqueue_s *rqu = &rctx->qslots[x];
if (rqu->obj_type == RQUEUE_TYPE_POOL) {
if (rqu->obj_type == RQUEUE_TYPE_POOL || rqu->obj_type == RQUEUE_TYPE_INT) {
// nothing to do.
} else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
// don't need to recurse, just free the subrctx.
Expand Down Expand Up @@ -221,7 +221,7 @@ static int _mcplib_funcgen_gencall(lua_State *L) {
struct mcp_rqueue_s *frqu = &fgen->queue_list[x];
struct mcp_rqueue_s *rqu = &rc->qslots[x];
rqu->obj_type = frqu->obj_type;
if (frqu->obj_type == RQUEUE_TYPE_POOL) {
if (frqu->obj_type == RQUEUE_TYPE_POOL || frqu->obj_type == RQUEUE_TYPE_INT) {
rqu->obj_ref = 0;
rqu->obj = frqu->obj;
mcp_resp_t *r = mcp_prep_bare_resobj(L, fgen->thread);
Expand Down Expand Up @@ -641,6 +641,7 @@ int mcplib_funcgen_new_handle(lua_State *L) {
mcp_funcgen_t *fgen = lua_touserdata(L, 1);
mcp_pool_proxy_t *pp = NULL;
mcp_funcgen_t *fg = NULL;
void *test = NULL;

if (fgen->ready) {
proxy_lua_error(L, "cannot modify function generator after calling ready");
Expand All @@ -649,6 +650,8 @@ int mcplib_funcgen_new_handle(lua_State *L) {

if ((pp = luaL_testudata(L, 2, "mcp.pool_proxy")) != NULL) {
// good.
} else if ((test = luaL_testudata(L, 2, "mcp.internal_be")) != NULL) {
// also good.
} else if ((fg = luaL_testudata(L, 2, "mcp.funcgen")) != NULL) {
if (fg->is_router) {
proxy_lua_error(L, "cannot assign a router to a handle in new_handle");
Expand Down Expand Up @@ -682,6 +685,11 @@ int mcplib_funcgen_new_handle(lua_State *L) {
rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
rqu->obj_type = RQUEUE_TYPE_POOL;
rqu->obj = pp;
} else if (test) {
// pops test from the stack
rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
rqu->obj_type = RQUEUE_TYPE_INT;
rqu->obj = test;
} else {
// pops the fgen from the stack.
mcp_funcgen_reference(L);
Expand Down Expand Up @@ -1151,6 +1159,44 @@ void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle) {
p->return_cb = proxy_return_rqu_cb;
p->queue_handle = handle;
rctx->pending_reqs++;
} else if (rqu->obj_type == RQUEUE_TYPE_INT) {
mcp_request_t *rq = rqu->rq;
mc_resp *resp = mcp_rcontext_internal(rctx, rq, rqu->res_obj);
if (resp == NULL) {
// NOTE: This can be OOM (no resp alloc)
// or bad parse (no such command)
// we _could_ set an ERRMSG here.
mcp_resp_t *r = rqu->res_obj;
r->status = MCMC_ERR;
r->resp.code = MCMC_CODE_SERVER_ERROR;
io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, rqu->res_obj);
p->return_cb = proxy_return_rqu_cb;
p->queue_handle = handle;
p->background = true;
rctx->pending_reqs++;
} else if (resp->io_pending) {
resp->io_pending->return_cb = proxy_return_rqu_cb;
// Add io object to extstore submission queue.
io_queue_t *q = thread_io_queue_get(rctx->fgen->thread, IO_QUEUE_EXTSTORE);
io_pending_proxy_t *io = (io_pending_proxy_t *)resp->io_pending;
io->queue_handle = handle;
io->client_resp = rqu->res_obj;

STAILQ_INSERT_TAIL(&q->stack, (io_pending_t *)io, iop_next);

io->rctx = rctx;
io->c = rctx->c;
io->ascii_multiget = rq->ascii_multiget;
// mark the buffer into the mcp_resp for freeing later.
rqu->res_obj->buf = io->eio.buf;
rctx->pending_reqs++;
} else {
io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, rqu->res_obj);
p->return_cb = proxy_return_rqu_cb;
p->queue_handle = handle;
p->background = true;
rctx->pending_reqs++;
}
} else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
// TODO: NULL the ->c post-return?
mcp_rcontext_t *subrctx = rqu->obj;
Expand Down
13 changes: 9 additions & 4 deletions t/proxyinternal.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ function mcp_config_pools()
return true
end

-- Do specialized testing based on the key prefix.
function mcp_config_routes(zones)
mcp.attach(mcp.CMD_ANY_STORAGE, function(r)
return mcp.internal(r)
end)
local fg = mcp.funcgen_new()
local h = fg:new_handle(mcp.internal_backend)
fg:ready({ n = "internal", f = function(rctx)
return function(r)
return rctx:enqueue_and_wait(r, h)
end
end})

mcp.attach(mcp.CMD_ANY_STORAGE, fg)
end
30 changes: 14 additions & 16 deletions t/proxyinternal2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@ function mcp_config_pools()
return true
end

local result_leak = {}
-- Do specialized testing based on the key prefix.
function mcp_config_routes(zones)
mcp.attach(mcp.CMD_ANY_STORAGE, function(r)
local cmd = r:command()
if cmd == mcp.CMD_GET or cmd == mcp.CMD_MG then
-- marking the object as <close> will clean up its internal
-- references as soon as it drops out of scope.
-- it is an error to try to use this 'res' outside of this 'if'
-- statement!
local res <close> = mcp.internal(r)
local res2 = mcp.internal(r)
res2:close() -- test manual closing.
-- uncomment to test effects of leaking a res obj
table.insert(result_leak, res)
local fg = mcp.funcgen_new()
local h1 = fg:new_handle(mcp.internal_backend)
local h2 = fg:new_handle(mcp.internal_backend)
fg:ready({ n = "internal", f = function(rctx)
return function(r)
-- ensure we can't leak by grabbing a result we then don't use.
local cmd = r:command()
if cmd == mcp.CMD_GET or cmd == mcp.CMD_MG then
local res1 = rctx:enqueue_and_wait(r, h1)
end
return rctx:enqueue_and_wait(r, h2)
end
return mcp.internal(r)
end)
end})

mcp.attach(mcp.CMD_ANY_STORAGE, fg)
end
15 changes: 15 additions & 0 deletions t/proxyinternal3.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
function mcp_config_pools()
return true
end

function mcp_config_routes(p)
local fg = mcp.funcgen_new()
local h = fg:new_handle(mcp.internal_backend)
fg:ready({ n = "internal", f = function(rctx)
return function(r)
return rctx:enqueue_and_wait(r, h)
end
end})

mcp.attach(mcp.CMD_ANY_STORAGE, fg)
end

0 comments on commit 4252ef4

Please sign in to comment.