From c7c71c5b8b10ad6ffe2838464f5f1acc0bd2b6be Mon Sep 17 00:00:00 2001 From: dormando Date: Fri, 23 Aug 2024 14:17:14 -0700 Subject: [PATCH] proxy: remove V1 API res = pool(r) rtable = mcp.await(r, pools) ... are no longer supported. this paves way for lots of internal cleanups and optimizations. This is because V2 holds long term references to pools in higher level objects, so the memory management for pool objects can be done more exactly. --- Makefile.am | 2 +- memcached.c | 1 - memcached.h | 3 +- proto_proxy.c | 49 +-- proxy.h | 30 +- proxy_await.c | 382 ----------------- proxy_lua.c | 33 -- proxy_luafgen.c | 8 +- t/proxyantiflap.lua | 11 +- t/proxybestats.lua | 3 +- t/proxyconfig.lua | 14 +- t/proxyconfigmulti2.lua | 11 +- t/proxydepthlim.t | 1 - t/proxyins.t | 2 +- t/proxylimits.lua | 17 +- t/proxytags.lua | 11 +- t/proxyunits.lua | 881 +++++++++++++++++----------------------- t/proxyunits.t | 317 +-------------- 18 files changed, 454 insertions(+), 1322 deletions(-) delete mode 100644 proxy_await.c diff --git a/Makefile.am b/Makefile.am index ad3c5870fb..5891b896e7 100644 --- a/Makefile.am +++ b/Makefile.am @@ -58,7 +58,7 @@ endif if ENABLE_PROXY memcached_SOURCES += proto_proxy.c proto_proxy.h vendor/mcmc/mcmc.h \ proxy_xxhash.c proxy.h \ - proxy_await.c proxy_ustats.c \ + proxy_ustats.c \ proxy_ratelim.c \ proxy_jump_hash.c proxy_request.c \ proxy_result.c proxy_inspector.c \ diff --git a/memcached.c b/memcached.c index 90d876e9d2..23b2588e89 100644 --- a/memcached.c +++ b/memcached.c @@ -1819,7 +1819,6 @@ void server_stats(ADD_STAT add_stats, void *c) { APPEND_STAT("proxy_conn_errors", "%llu", (unsigned long long)thread_stats.proxy_conn_errors); APPEND_STAT("proxy_conn_oom", "%llu", (unsigned long long)thread_stats.proxy_conn_oom); APPEND_STAT("proxy_req_active", "%llu", (unsigned long long)thread_stats.proxy_req_active); - APPEND_STAT("proxy_await_active", "%llu", (unsigned long long)thread_stats.proxy_await_active); } #endif APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds); diff --git a/memcached.h b/memcached.h index 0c250e1c99..5cd74924ed 100644 --- a/memcached.h +++ b/memcached.h @@ -359,8 +359,7 @@ struct slab_stats { X(proxy_conn_requests) \ X(proxy_conn_errors) \ X(proxy_conn_oom) \ - X(proxy_req_active) \ - X(proxy_await_active) + X(proxy_req_active) #endif /** diff --git a/proto_proxy.c b/proto_proxy.c index 11f77bd0f0..cc8f3b9a3b 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -503,15 +503,15 @@ void proxy_submit_cb(io_queue_t *q) { mcp_backend_t *be; P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p); if (p->qcount_incr) { - // funny workaround: awaiting IOP's don't count toward - // resuming a connection, only the completion of the await + // funny workaround: async IOP's don't count toward + // resuming a connection, only the completion of the async // condition. q->count++; } - if (p->await_background) { - P_DEBUG("%s: fast-returning await_background object: %p\n", __func__, (void *)p); - // intercept await backgrounds + if (p->background) { + P_DEBUG("%s: fast-returning background object: %p\n", __func__, (void *)p); + // intercept background requests // this call cannot recurse if we're on the worker thread, // since the worker thread has to finish executing this // function in order to pick up the returned IO. @@ -577,8 +577,8 @@ void proxy_submit_cb(io_queue_t *q) { return; } -// This function handles return processing for the "old style" API: direct -// pool calls and mcp.await() +// This function handles return processing for the "old style" API: +// currently just `mcp.internal()` void proxy_return_rctx_cb(io_pending_t *pending) { io_pending_proxy_t *p = (io_pending_proxy_t *)pending; if (p->client_resp && p->client_resp->blen) { @@ -588,17 +588,6 @@ void proxy_return_rctx_cb(io_pending_t *pending) { p->thread->proxy_vm_extra_kb += kb > 0 ? kb : 1; } - if (p->is_await) { - p->rctx->async_pending--; - mcplib_await_return(p); - // need to directly attempt to return the context, - // we may or may not be hitting proxy_run_rcontext from await_return. - if (p->rctx->async_pending == 0) { - mcp_funcgen_return_rctx(p->rctx); - } - return; - } - mcp_rcontext_t *rctx = p->rctx; lua_rotate(rctx->Lc, 1, 1); lua_settop(rctx->Lc, 1); @@ -879,10 +868,6 @@ static void _proxy_run_tresp_to_resp(mc_resp *tresp, mc_resp *resp) { // - need to only increment q->count once per stack of requests coming from a // resp. // -// There are workarounds for this all over. In the await code, we test for -// "the first await object" or "is an await background object", for -// incrementing the q->count -// For pool-backed requests we always increment in submit // For RQU backed requests (new API) there isn't an easy place to test for // "the first request", because: // - The connection queue is a stack of _all_ requests pending on this @@ -967,25 +952,7 @@ int proxy_run_rcontext(mcp_rcontext_t *rctx) { lua_pop(Lc, 1); int res = 0; - mcp_request_t *rq = NULL; - mcp_backend_t *be = NULL; - mcp_resp_t *r = NULL; switch (yield_type) { - case MCP_YIELD_AWAIT: - // called with await context on the stack. - rctx->first_queue = false; // HACK: ensure awaits are counted. - mcplib_await_run_rctx(rctx); - break; - case MCP_YIELD_POOL: - // TODO (v2): c only used for cache alloc? - // pool_call checks the argument already. - be = lua_touserdata(Lc, -1); - rq = lua_touserdata(Lc, -2); - // not using a pre-made res object from this yield type. - r = mcp_prep_resobj(Lc, rq, be, c->thread); - rctx->first_queue = false; // HACK: ensure poolreqs are counted. - mcp_queue_rctx_io(rctx, rq, be, r); - break; case MCP_YIELD_INTERNAL: // stack should be: rq, res if (rctx->parent) { @@ -1367,7 +1334,7 @@ io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, m p->c = c; p->client_resp = r; p->flushed = false; - p->return_cb = proxy_return_rctx_cb; + p->return_cb = NULL; p->finalize_cb = proxy_finalize_rctx_cb; // pass along the request context for resumption. diff --git a/proxy.h b/proxy.h index 107af981be..b19baee051 100644 --- a/proxy.h +++ b/proxy.h @@ -100,12 +100,10 @@ struct mcp_memprofile { #define MCP_BACKEND_UPVALUE 1 -#define MCP_YIELD_POOL 1 -#define MCP_YIELD_AWAIT 2 -#define MCP_YIELD_INTERNAL 3 -#define MCP_YIELD_WAITCOND 4 -#define MCP_YIELD_WAITHANDLE 5 -#define MCP_YIELD_SLEEP 6 +#define MCP_YIELD_INTERNAL 1 +#define MCP_YIELD_WAITCOND 2 +#define MCP_YIELD_WAITHANDLE 3 +#define MCP_YIELD_SLEEP 4 #define SHAREDVM_FGEN_IDX 1 #define SHAREDVM_FGENSLOT_IDX 2 @@ -547,13 +545,9 @@ struct _io_pending_proxy_t { struct iovec iov[2]; // request string + tail buffer int iovcnt; // 1 or 2... unsigned int iovbytes; // total bytes in the iovec - int mcpres_ref; // mcp.res reference used for await() - int await_ref; // lua reference if we were an await object mcp_resp_t *client_resp; // reference (currently pointing to a lua object) bool flushed; // whether we've fully written this request to a backend. - bool is_await; // are we an await object? - bool await_first; // are we the main route for an await object? - bool await_background; // dummy IO for backgrounded awaits + bool background; // dummy IO for backgrounded awaits bool qcount_incr; // HACK. }; }; @@ -616,20 +610,6 @@ mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t); void mcp_resp_set_elapsed(mcp_resp_t *r); io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_backend_t *be, mcp_resp_t *r); -// await interface -enum mcp_await_e { - AWAIT_GOOD = 0, // looks for OK + NOT MISS - AWAIT_ANY, // any response, including errors, - AWAIT_OK, // any non-error response - AWAIT_FIRST, // return the result from the first pool - AWAIT_FASTGOOD, // returns on first hit or majority non-error - AWAIT_BACKGROUND, // returns as soon as background jobs are dispatched -}; -int mcplib_await(lua_State *L); -int mcplib_await_logerrors(lua_State *L); -int mcplib_await_run_rctx(mcp_rcontext_t *rctx); -int mcplib_await_return(io_pending_proxy_t *p); - // internal request interface int mcplib_internal(lua_State *L); int mcplib_internal_run(mcp_rcontext_t *rctx); diff --git a/proxy_await.c b/proxy_await.c deleted file mode 100644 index abfe080a33..0000000000 --- a/proxy_await.c +++ /dev/null @@ -1,382 +0,0 @@ -/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ - -#include "proxy.h" - -typedef struct mcp_await_s { - int pending; - int wait_for; - int req_ref; - int argtable_ref; // need to hold refs to any potential hash selectors - int restable_ref; // table of result objects - int detail_ref; // reference to detail string. - enum mcp_await_e type; - bool completed; // have we completed the parent coroutine or not - bool logerr; // create log_req entries for error responses - mcp_request_t *rq; - mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop) - mcp_rcontext_t *rctx; // request context -} mcp_await_t; - -// TODO (v2): mcplib_await_gc() -// - needs to handle cases where an await is created, but a rare error happens -// before it completes and the coroutine is killed. must check and free its -// references. - -// local restable = mcp.await(request, pools, num_wait) -// NOTE: need to hold onto the pool objects since those hold backend -// references. Here we just keep a reference to the argument table. -static int _mcplib_await(lua_State *L, bool logerr) { - mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); - luaL_checktype(L, 2, LUA_TTABLE); - int n = 0; // length of table of pools - int wait_for = 0; // 0 means wait for all responses - enum mcp_await_e type = AWAIT_GOOD; - int detail_ref = 0; - - lua_pushnil(L); // init table key - while (lua_next(L, 2) != 0) { - luaL_checkudata(L, -1, "mcp.pool_proxy"); - lua_pop(L, 1); // remove value, keep key. - n++; - } - - if (n <= 0) { - proxy_lua_error(L, "mcp.await arguments must have at least one pool"); - } - - if (lua_isstring(L, 5)) { - // pops the detail string. - detail_ref = luaL_ref(L, LUA_REGISTRYINDEX); - } - - if (lua_isnumber(L, 4)) { - type = lua_tointeger(L, 4); - lua_pop(L, 1); - switch (type) { - case AWAIT_GOOD: - case AWAIT_ANY: - case AWAIT_OK: - case AWAIT_FIRST: - case AWAIT_FASTGOOD: - case AWAIT_BACKGROUND: - break; - default: - proxy_lua_error(L, "invalid type argument tp mcp.await"); - } - } - - if (lua_isnumber(L, 3)) { - wait_for = lua_tointeger(L, 3); - lua_pop(L, 1); - if (wait_for > n) { - wait_for = n; - } - } - - // FIRST is only looking for one valid request. - if (type == AWAIT_FIRST) { - wait_for = 1; - } - - // TODO (v2): quickly loop table once and ensure they're all pools? - // TODO (v2) in case of newuserdatauv throwing an error, we need to grab - // these references after allocating *aw else can leak memory. - int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table - int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object. - - // stack will be only the await object now - // allocate before grabbing references so an error won't cause leaks. - mcp_await_t *aw = lua_newuserdatauv(L, sizeof(mcp_await_t), 0); - memset(aw, 0, sizeof(mcp_await_t)); - - // create result table - lua_newtable(L); // -> 2 - aw->restable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pop the result table - - aw->wait_for = wait_for; - aw->pending = n; - aw->argtable_ref = argtable_ref; - aw->rq = rq; - aw->req_ref = req_ref; - aw->detail_ref = detail_ref; - aw->type = type; - aw->logerr = logerr; - P_DEBUG("%s: about to yield [len: %d]\n", __func__, n); - - lua_pushinteger(L, MCP_YIELD_AWAIT); - return lua_yield(L, 2); -} - -// default await, no logging. -int mcplib_await(lua_State *L) { - return _mcplib_await(L, false); -} - -int mcplib_await_logerrors(lua_State *L) { - return _mcplib_await(L, true); -} - -// TODO (v2): need to get this code running under pcall(). -// It looks like a bulk of this code can move into mcplib_await(), -// and then here post-yield we can add the rcontext to the right -// places. Else these errors currently crash the daemon. -int mcplib_await_run_rctx(mcp_rcontext_t *rctx) { - P_DEBUG("%s: start\n", __func__); - conn *c = rctx->c; - lua_State *L = rctx->Lc; - WSTAT_INCR(c->thread, proxy_await_active, 1); - mcp_await_t *aw = lua_touserdata(L, -1); - assert(aw != NULL); - int await_ref = luaL_ref(L, LUA_REGISTRYINDEX); // await is popped. - lua_rawgeti(L, LUA_REGISTRYINDEX, aw->argtable_ref); // -> 1 - mcp_request_t *rq = aw->rq; - aw->rctx = rctx; - - // prepare the request key - const char *key = MCP_PARSER_KEY(rq->pr); - size_t len = rq->pr.klen; - // TODO (v3) await_first is used as a marker for upping the "wait for - // IO's" queue count, which means we need to force it off if we're in - // background mode, else we would accidentally wait for a response anyway. - // This note is for finding a less convoluted method for this. - bool await_first = (aw->type == AWAIT_BACKGROUND) ? false : true; - // loop arg table and run each pool backend selector - lua_pushnil(L); // -> 3 - while (lua_next(L, 1) != 0) { - P_DEBUG("%s: top of loop\n", __func__); - // (key, -2), (val, -1) - // skip the metatable checking here as we already check this in - // mcp.await()'s top level call. - mcp_pool_proxy_t *pp = lua_touserdata(L, -1); - if (pp == NULL) { - proxy_lua_error(L, "mcp.await must be supplied with a pool"); - } - - // NOTE: rq->be is only held to help pass the backend into the IOP in - // mcp_queue call. Could be a local variable and an argument too. - mcp_backend_t *be = mcplib_pool_proxy_call_helper(pp, key, len); - if (be == NULL) { - proxy_lua_error(L, "key dist hasher tried to use out of bounds index"); - } - - mcp_resp_t *res = mcp_prep_resobj(L, rq, be, rctx->c->thread); - io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, rq, be, res); - if (p == NULL) { - // TODO: need to unroll this. _gc func? - } - rctx->async_pending++; - p->is_await = true; - p->await_ref = await_ref; - p->await_first = await_first; - // io_p needs to hold onto its own response reference, because we may or - // may not include it in the final await() result. - p->mcpres_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops mcp.response - - await_first = false; - - // pop value, keep key. - lua_pop(L, 1); - } - - if (aw->type == AWAIT_BACKGROUND) { - io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, NULL); - p->is_await = true; - p->await_ref = await_ref; - p->await_background = true; - - rctx->async_pending++; - aw->pending++; - aw->wait_for = 0; - } - - lua_pop(L, 1); // remove table key. - - P_DEBUG("%s: end\n", __func__); - - return 0; -} - -// NOTE: This is unprotected lua/C code. There are no lua-style errors thrown -// purposefully as of this writing, but it's still not safe. Either the code -// can be restructured to use less lua (which I think is better long term -// anyway) or it can be pushed behind a cfunc pcall so we don't crash the -// daemon if something bad happens. -int mcplib_await_return(io_pending_proxy_t *p) { - mcp_await_t *aw; - lua_State *L = p->thread->L; // use the main VM coroutine for work - bool cleanup = false; - bool valid = false; // is response valid to add to the result table. - bool completing = false; - - // TODO (v2): just push the await ptr into *p? - lua_rawgeti(L, LUA_REGISTRYINDEX, p->await_ref); - aw = lua_touserdata(L, -1); - lua_pop(L, 1); // remove AW object from stack - assert(aw != NULL); - P_DEBUG("%s: start [pending: %d]\n", __func__, aw->pending); - //dump_stack(L); - - aw->pending--; - assert(aw->pending >= 0); - // Await not yet satisfied. - // If wait_for != 0 check for response success - // if success and wait_for is *now* 0, we complete. - // add successful response to response table - // Also, if no wait_for, add response to response table - // TODO (v2): for GOOD or OK cases, it might be better to return the - // last object as valid if there are otherwise zero valids? - // Think we just have to count valids... - if (aw->type == AWAIT_BACKGROUND) { - // in the background case, we never want to collect responses. - if (p->await_background) { - // found the dummy IO, complete and return conn to worker. - completing = true; - } - } else if (!aw->completed) { - valid = true; // always collect results unless we are completed. - if (aw->wait_for > 0) { - bool is_good = false; - switch (aw->type) { - case AWAIT_GOOD: - if (p->client_resp->status == MCMC_OK && p->client_resp->resp.code != MCMC_CODE_END) { - is_good = true; - } - break; - case AWAIT_ANY: - is_good = true; - break; - case AWAIT_OK: - if (p->client_resp->status == MCMC_OK) { - is_good = true; - } - break; - case AWAIT_FIRST: - if (p->await_first) { - is_good = true; - } else { - // user only wants the first pool's result. - valid = false; - } - break; - case AWAIT_FASTGOOD: - if (p->client_resp->status == MCMC_OK) { - // End early on a hit. - if (p->client_resp->resp.code != MCMC_CODE_END) { - aw->wait_for = 0; - } else { - is_good = true; - } - } - break; - case AWAIT_BACKGROUND: - // In background mode we don't wait for any response. - break; - } - - if (is_good) { - aw->wait_for--; - } - - if (aw->wait_for == 0) { - completing = true; - } - } - } - - // note that post-completion, we stop gathering responses into the - // response table... because it's already been returned. - // So "valid" can only be true if also !completed - if (aw->pending == 0) { - if (!aw->completed) { - // were waiting for all responses. - completing = true; - } - cleanup = true; - P_DEBUG("%s: pending == 0\n", __func__); - } - - // a valid response to add to the result table. - if (valid) { - P_DEBUG("%s: valid\n", __func__); - lua_rawgeti(L, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1 - lua_rawgeti(L, LUA_REGISTRYINDEX, p->mcpres_ref); // -> 2 - // couldn't find a table.insert() equivalent; so this is - // inserting into the length + 1 position manually. - //dump_stack(L); - lua_rawseti(L, -2, lua_rawlen(L, 1) + 1); // pops mcpres - lua_pop(L, 1); // pops restable - } - - // lose our internal mcpres reference regardless. - // also tag the elapsed time into the response. - if (p->mcpres_ref) { - // NOTE: this is redundant but the code is going away soon. not worth - // testing changing it. - mcp_resp_set_elapsed(p->client_resp); - - // instructed to generate log_req entries for each failed request, - // this is useful to do here as these can be asynchronous. - // NOTE: this may be a temporary feature. - if (aw->logerr && p->client_resp->status != MCMC_OK && aw->completed) { - size_t dlen = 0; - const char *detail = NULL; - logger *l = p->thread->l; - // only process logs if someone is listening. - if (l->eflags & LOG_PROXYREQS) { - lua_rawgeti(L, LUA_REGISTRYINDEX, aw->req_ref); - mcp_request_t *rq = lua_touserdata(L, -1); - lua_pop(L, 1); // references still held, just clearing stack. - mcp_resp_t *rs = p->client_resp; - - if (aw->detail_ref) { - lua_rawgeti(L, LUA_REGISTRYINDEX, aw->detail_ref); - detail = luaL_tolstring(L, -1, &dlen); - lua_pop(L, 1); - } - - logger_log(l, LOGGER_PROXY_REQ, NULL, rq->pr.request, rq->pr.reqlen, rs->elapsed, rs->resp.type, rs->resp.code, rs->status, aw->rctx->conn_fd, detail, dlen, rs->be_name, rs->be_port); - } - } - - luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref); - } - // our await_ref is shared, so we don't need to release it. - - if (completing) { - P_DEBUG("%s: completing\n", __func__); - assert(p->c->thread == p->thread); - aw->completed = true; - lua_State *Lc = p->rctx->Lc; - lua_rawgeti(Lc, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1 - luaL_unref(L, LUA_REGISTRYINDEX, aw->restable_ref); - proxy_run_rcontext(p->rctx); - - io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type); - q->count--; - if (q->count == 0) { - // call re-add directly since we're already in the worker thread. - conn_worker_readd(p->c); - } - - } - - if (cleanup) { - P_DEBUG("%s: cleanup [completed: %d]\n", __func__, aw->completed); - luaL_unref(L, LUA_REGISTRYINDEX, aw->argtable_ref); - luaL_unref(L, LUA_REGISTRYINDEX, aw->req_ref); - luaL_unref(L, LUA_REGISTRYINDEX, p->await_ref); - if (aw->detail_ref) { - luaL_unref(L, LUA_REGISTRYINDEX, aw->detail_ref); - } - WSTAT_DECR(p->thread, proxy_await_active, 1); - } - - // Just remove anything we could have left on the primary VM stack - lua_settop(L, 0); - - // always return free this sub-IO object. - do_cache_free(p->thread->io_cache, p); - - return 0; -} - diff --git a/proxy_lua.c b/proxy_lua.c index ca7b262026..6cbaf73cc4 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -985,30 +985,6 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(mcp_pool_proxy_t *pp, const char *k return pp->pool[lookup].be; } -// pool(request) -> yields the pool/request for further processing -static int mcplib_pool_proxy_call(lua_State *L) { - mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy"); - mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); - - // we have a fast path to the key/length. - if (!rq->pr.keytoken) { - proxy_lua_error(L, "cannot route commands without key"); - return 0; - } - const char *key = MCP_PARSER_KEY(rq->pr); - size_t len = rq->pr.klen; - mcp_backend_t *be = mcplib_pool_proxy_call_helper(pp, key, len); - if (be == NULL) { - proxy_lua_error(L, "key dist hasher tried to use out of bounds index"); - return 0; - } - lua_pushlightuserdata(L, be); - - // now yield request, pool, backend, mode up. - lua_pushinteger(L, MCP_YIELD_POOL); - return lua_yield(L, 4); -} - static int mcplib_backend_use_iothread(lua_State *L) { luaL_checktype(L, -1, LUA_TBOOLEAN); int state = lua_toboolean(L, -1); @@ -1527,12 +1503,6 @@ static void proxy_register_defines(lua_State *L) { X(P_OK); X(CMD_ANY); X(CMD_ANY_STORAGE); - X(AWAIT_GOOD); - X(AWAIT_ANY); - X(AWAIT_OK); - X(AWAIT_FIRST); - X(AWAIT_FASTGOOD); - X(AWAIT_BACKGROUND); Y(QWAIT_ANY, "WAIT_ANY"); Y(QWAIT_OK, "WAIT_OK"); Y(QWAIT_GOOD, "WAIT_GOOD"); @@ -1641,7 +1611,6 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { }; const struct luaL_Reg mcplib_pool_proxy_m[] = { - {"__call", mcplib_pool_proxy_call}, {"__gc", mcplib_pool_proxy_gc}, {NULL, NULL} }; @@ -1732,8 +1701,6 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { {"attach", mcplib_attach}, {"funcgen_new", mcplib_funcgen_new}, {"router_new", mcplib_router_new}, - {"await", mcplib_await}, - {"await_logerrors", mcplib_await_logerrors}, {"log", mcplib_log}, {"log_req", mcplib_log_req}, {"log_reqsample", mcplib_log_reqsample}, diff --git a/proxy_luafgen.c b/proxy_luafgen.c index a6d1516cbd..f1696e1ade 100644 --- a/proxy_luafgen.c +++ b/proxy_luafgen.c @@ -836,7 +836,7 @@ static void _mcp_start_rctx_process_error(mcp_rcontext_t *rctx, struct mcp_rqueu io_pending_proxy_t *p = mcp_queue_rctx_io(rctx->parent, NULL, NULL, r); p->return_cb = proxy_return_rqu_cb; p->queue_handle = rctx->parent_handle; - p->await_background = true; + p->background = true; } static void mcp_start_subrctx(mcp_rcontext_t *rctx) { @@ -855,7 +855,7 @@ static void mcp_start_subrctx(mcp_rcontext_t *rctx) { p->queue_handle = rctx->parent_handle; // TODO: change name of property to fast-return once mcp.await is // retired. - p->await_background = true; + p->background = true; } else if (type == LUA_TSTRING) { // TODO: wrap with a resobj and parse it. // for now we bypass the rqueue process handling @@ -867,7 +867,7 @@ static void mcp_start_subrctx(mcp_rcontext_t *rctx) { io_pending_proxy_t *p = mcp_queue_rctx_io(rctx->parent, NULL, NULL, NULL); p->return_cb = proxy_return_rqu_cb; p->queue_handle = rctx->parent_handle; - p->await_background = true; + p->background = true; } else { // generate a generic object with an error. _mcp_start_rctx_process_error(rctx, rqu); @@ -1218,7 +1218,7 @@ int mcplib_rcontext_wait_cond(lua_State *L) { if (rctx->wait_count == 0) { io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, NULL); p->return_cb = proxy_return_rqu_dummy_cb; - p->await_background = true; + p->background = true; rctx->pending_reqs++; rctx->wait_mode = QWAIT_IDLE; // not actually waiting. } else if (argc > 3) { diff --git a/t/proxyantiflap.lua b/t/proxyantiflap.lua index a564a7dc2b..b18352c4d4 100644 --- a/t/proxyantiflap.lua +++ b/t/proxyantiflap.lua @@ -22,5 +22,14 @@ function mcp_config_pools() end function mcp_config_routes(pool) - mcp.attach(mcp.CMD_MG, function(r) return pool(r) end) + local fg = mcp.funcgen_new() + local handle = fg:new_handle(pool) + fg:ready({ + f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, handle) + end + end + }) + mcp.attach(mcp.CMD_MG, fg) end diff --git a/t/proxybestats.lua b/t/proxybestats.lua index b35bc54105..369458bdb8 100644 --- a/t/proxybestats.lua +++ b/t/proxybestats.lua @@ -11,7 +11,8 @@ end -- not making requests, just opening/closing backends function mcp_config_routes(p) - pool = p -- stash this in a global. + fg = mcp.funcgen_new() -- stash in global + fg:new_handle(p) -- lock the pool from collection mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR nothing\r\n" end) diff --git a/t/proxyconfig.lua b/t/proxyconfig.lua index 04902fa215..a073215e31 100644 --- a/t/proxyconfig.lua +++ b/t/proxyconfig.lua @@ -79,7 +79,17 @@ function mcp_config_routes(zones) mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR no mg route\r\n" end) mcp.attach(mcp.CMD_MS, function(r) return "SERVER_ERROR no ms route\r\n" end) else - mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end) - mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end) + local fg = mcp.funcgen_new() + local h = fg:new_handle(zones["test"]) + fg:ready({ + f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, h) + end + end + }) + + mcp.attach(mcp.CMD_MG, fg) + mcp.attach(mcp.CMD_MS, fg) end end diff --git a/t/proxyconfigmulti2.lua b/t/proxyconfigmulti2.lua index b2eb5ec1e7..3ede2227ed 100644 --- a/t/proxyconfigmulti2.lua +++ b/t/proxyconfigmulti2.lua @@ -1,4 +1,13 @@ function mcp_config_routes(p) - mcp.attach(mcp.CMD_MG, function(r) return p(r) end) + local fg = mcp.funcgen_new() + local h = fg:new_handle(p) + fg:ready({ + f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, h) + end + end + }) + mcp.attach(mcp.CMD_MG, fg) end diff --git a/t/proxydepthlim.t b/t/proxydepthlim.t index 9ad9df3a2b..118d4abb02 100644 --- a/t/proxydepthlim.t +++ b/t/proxydepthlim.t @@ -35,7 +35,6 @@ subtest 'over-depth' => sub { } print $holder $todo; sleep 0.25; # ensure holder client is seen first - diag "foo"; # We never look at the backend in this test. $t->c_send($cmd); $t->c_recv("SERVER_ERROR backend failure\r\n", 'depth limit reached'); diff --git a/t/proxyins.t b/t/proxyins.t index 8356d87d36..66e8bd1e48 100644 --- a/t/proxyins.t +++ b/t/proxyins.t @@ -36,7 +36,7 @@ is(<$w>, "OK\r\n"); } sub test_mgintres { - 'note testing mcp.internal()'; + note 'testing mcp.internal()'; $t->c_send("ms intres/tokenint 5 F5\r\n"); $t->c_send("hello\r\n"); $t->c_recv("HD\r\n"); diff --git a/t/proxylimits.lua b/t/proxylimits.lua index 658e56fe34..c125154c36 100644 --- a/t/proxylimits.lua +++ b/t/proxylimits.lua @@ -73,8 +73,17 @@ end -- some tests against the two broad types of commands (gets vs sets with -- payloads) function mcp_config_routes(zones) - mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end) - mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end) - mcp.attach(mcp.CMD_SET, function(r) return zones["test"](r) end) - mcp.attach(mcp.CMD_GET, function(r) return zones["test"](r) end) + local fg = mcp.funcgen_new() + local h = fg:new_handle(zones["test"]) + fg:ready({ + f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, h) + end + end + }) + mcp.attach(mcp.CMD_MG, fg) + mcp.attach(mcp.CMD_MS, fg) + mcp.attach(mcp.CMD_GET, fg) + mcp.attach(mcp.CMD_SET, fg) end diff --git a/t/proxytags.lua b/t/proxytags.lua index 33fe24c6ca..51bd0cec61 100644 --- a/t/proxytags.lua +++ b/t/proxytags.lua @@ -13,8 +13,17 @@ end function mcp_config_routes(p) if mode == "start" then + local fg = mcp.funcgen_new() + local h = fg:new_handle(p) + fg:ready({ + f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, h) + end + end + }) -- one without tag - mcp.attach(mcp.CMD_MG, function(r) return p(r) end) + mcp.attach(mcp.CMD_MG, fg) -- no listener on a mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR tag A\r\n" end, "a") -- listener on b diff --git a/t/proxyunits.lua b/t/proxyunits.lua index a66e89f417..ca5c0acbb6 100644 --- a/t/proxyunits.lua +++ b/t/proxyunits.lua @@ -32,572 +32,435 @@ function mcp_config_pools(oldss) end -- WORKER CODE: - --- Using a very simple route handler only to allow testing the three --- workarounds in the same configuration file. -function prefix_factory(pattern, list, default) - local p = pattern - local l = list - local d = default - return function(r) - local route = l[string.match(r:key(), p)] - if route == nil then - return d(r) - end - return route(r) - end -end - --- just for golfing the code in mcp_config_routes() -function toproute_factory(pfx, label) - local err = "SERVER_ERROR no " .. label .. " route\r\n" - return prefix_factory("^/(%a+)/", pfx, function(r) return err end) +function new_basic(zones, func) + local fgen = mcp.funcgen_new() + local o = { t = {}, c = 0 } + + o.t.z1 = fgen:new_handle(zones.z1) + o.t.z2 = fgen:new_handle(zones.z2) + o.t.z3 = fgen:new_handle(zones.z3) + o.t.dead = fgen:new_handle(zones.dead) + o.t.no_label = fgen:new_handle(zones.no_label) + + fgen:ready({ f = func, a = o}) + return fgen end -- Do specialized testing based on the key prefix. function mcp_config_routes(zones) - local pfx_get = {} - local pfx_set = {} - local pfx_touch = {} - local pfx_gets = {} - local pfx_gat = {} - local pfx_gats = {} - local pfx_cas = {} - local pfx_add = {} - local pfx_delete = {} - local pfx_incr = {} - local pfx_decr = {} - local pfx_append = {} - local pfx_prepend = {} - local pfx_mg = {} - local pfx_ms = {} - local pfx_md = {} - local pfx_ma = {} - - local basic = function(r) - return zones.z1(r) - end - - pfx_get["b"] = basic - pfx_set["b"] = basic - pfx_touch["b"] = basic - pfx_gets["b"] = basic - pfx_gat["b"] = basic - pfx_gats["b"] = basic - pfx_cas["b"] = basic - pfx_add["b"] = basic - pfx_delete["b"] = basic - pfx_incr["b"] = basic - pfx_decr["b"] = basic - pfx_append["b"] = basic - pfx_prepend["b"] = basic - pfx_mg["b"] = basic - pfx_ms["b"] = basic - pfx_md["b"] = basic - pfx_ma["b"] = basic - - pfx_get["errcheck"] = function(r) - local res = zones.z1(r) - -- expect an error - if res:ok() then - return "FAIL\r\n" - end - if res:code() == mcp.MCMC_CODE_ERROR then - return "ERROR\r\n" - elseif res:code() == mcp.MCMC_CODE_CLIENT_ERROR then - return "CLIENT_ERROR\r\n" - elseif res:code() == mcp.MCMC_CODE_SERVER_ERROR then - return "SERVER_ERROR\r\n" - end - return "FAIL" - end - - -- show that we fetched the key by generating our own response string. - pfx_get["getkey"] = function(r) - return "VALUE |" .. r:key() .. " 0 2\r\nts\r\nEND\r\n" - end - - pfx_get["rtrimkey"] = function(r) - r:rtrimkey(4) - return zones.z1(r) - end - - pfx_get["ltrimkey"] = function(r) - r:ltrimkey(10) - return zones.z1(r) - end - - pfx_get["nolabel"] = function(r) - return zones.no_label(r) - end - - pfx_mg["ntokens"] = function(r) - return "VA 1 C123 v\r\n" .. r:ntokens() .. "\r\n" - end - - pfx_mg["hasflag"] = function(r) - if r:has_flag("c") then - return "HD C123\r\n" - elseif r:has_flag("O") then - return "HD Oabc\r\n" - end - return "NF\r\n" - end + local map = {} - -- Input flags: N10 k c R10 - -- Output flags: N100 k R100 - pfx_mg["flagtoken"] = function(r) - -- flag_token on non-existing flags: no effect - local Ttoken = r:flag_token("T", "T100") - local Otoken = r:flag_token("O", nil) - local vtoken = r:flag_token("v", "") - if vtoken or Otoken or Ttoken then - return "ERROR found non-existing flag.\r\n" + map.b = new_basic(zones, function(rctx, a) + return function(r) + return rctx:enqueue_and_wait(r, a.t.z1) end + end) - -- flag_token to replace: N10 -> N100 - local found, Ntoken = r:flag_token("N", "N100") - if not found or Ntoken ~= "10" then - return "ERROR unexpected N token.\r\n" + map.errcheck = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + -- expect an error + if res:ok() then + return "FAIL\r\n" + end + if res:code() == mcp.MCMC_CODE_ERROR then + return "ERROR\r\n" + elseif res:code() == mcp.MCMC_CODE_CLIENT_ERROR then + return "CLIENT_ERROR\r\n" + elseif res:code() == mcp.MCMC_CODE_SERVER_ERROR then + return "SERVER_ERROR\r\n" + end + return "FAIL" end + end) - -- flag_token with nil 2nd arg: equvalent to fetch - r:flag_token("k", nil) - if not r:has_flag("k") then - return "ERROR unexpected k token.\r\n" + -- show that we fetched the key by generating our own response string. + map.getkey = new_basic(zones, function(rctx, a) + return function(r) + return "VALUE |" .. r:key() .. " 0 2\r\nts\r\nEND\r\n" end + end) - -- flag_token with self 2nd arg: no effect - r:flag_token("c", "c") - if not r:has_flag("c") then - return "ERROR unexpected c token 1.\r\n" + map.rtrimkey = new_basic(zones, function(rctx, a) + return function(r) + r:rtrimkey(4) + return rctx:enqueue_and_wait(r, a.t.z1) end + end) - -- flag_token with "" 2nd arg: remove - r:flag_token("c", "") - if r:has_flag("c") then - return "ERROR unexpected c token 2.\r\n" + map.ltrimkey = new_basic(zones, function(rctx, a) + return function(r) + r:ltrimkey(10) + return rctx:enqueue_and_wait(r, a.t.z1) end + end) - -- repeated flag_token calls: new value is returned. - local _, Rtoken = r:flag_token("R", "R100") - if Rtoken ~= '10' then - return "ERROR unexpected R token 1.\r\n" - end - _, Rtoken = r:flag_token("R", "R100") - if Rtoken ~= '100' then - return "ERROR unexpected R token 2.\r\n" + map.nolabel = new_basic(zones, function(rctx, a) + return function(r) + return rctx:enqueue_and_wait(r, a.t.no_label) end + end) - return "HD\r\n" - end - - pfx_ms["request"] = function(r) - local key = r:key() - local newReq = mcp.request("ms /request/edit 2\r\n", "ab\r\n") - return zones.z1(newReq) - end - - pfx_mg["request"] = function(r) - local key = r:key() - if key == "/request/old" then - local newReq = mcp.request("mg /request/new c\r\n") - return zones.z1(newReq) - else - local res = zones.z1(r) - local newReq = mcp.request("ms /request/a " .. res:vlen() .. "\r\n", res) - return zones.z1(newReq) + map.ntokens = new_basic(zones, function(rctx, a) + return function(r) + return "VA 1 C123 v\r\n" .. r:ntokens() .. "\r\n" end - end - - pfx_get["response"] = function(r) - local res = zones.z1(r) - local key = r:key() - if key == "/response/hit" then - local hit = res:hit() - if hit then - return res + end) + + map.hasflag = { + [mcp.CMD_MG] = new_basic(zones, function(rctx, a) + return function(r) + if r:has_flag("c") then + return "HD C123\r\n" + elseif r:has_flag("O") then + return "HD Oabc\r\n" + end + return "NF\r\n" end - return "ERROR hit is false\r\n" - elseif key == "/response/not_hit" then - local hit = res:hit() - if not hit then - return "SERVER_ERROR\r\n" + end), + [mcp.CMD_GET] = new_basic(zones, function(rctx, a) + return function(r) + if r:has_flag("F") then + return "ERROR flag found\r\n" + end + return "END\r\n" end - return res - end - return "ERROR unhandled key\r\n" - end - - pfx_mg["response"] = function(r) - local res = zones.z1(r) - local key = r:key() - if key == "/response/elapsed" then - local elapsed = res:elapsed() - if elapsed > 100000 then - return res + end) + } + + -- Input flags: N10 k c R10 + -- Output flags: N100 k R100 + map.flagtoken = new_basic(zones, function(rctx, a) + return function(r) + -- flag_token on non-existing flags: no effect + local Ttoken = r:flag_token("T", "T100") + local Otoken = r:flag_token("O", nil) + local vtoken = r:flag_token("v", "") + if vtoken or Otoken or Ttoken then + return "ERROR found non-existing flag.\r\n" end - return "ERROR elapsed is invalid.\r\n" - elseif key == "/response/ok" then - local ok = res:ok() - if ok then - return res + + -- flag_token to replace: N10 -> N100 + local found, Ntoken = r:flag_token("N", "N100") + if not found or Ntoken ~= "10" then + return "ERROR unexpected N token.\r\n" end - return "ERROR ok is false\r\n" - elseif key == "/response/not_ok" then - local ok = res:ok() - if not ok then - return "SERVER_ERROR\r\n" + + -- flag_token with nil 2nd arg: equvalent to fetch + r:flag_token("k", nil) + if not r:has_flag("k") then + return "ERROR unexpected k token.\r\n" end - return "HD\r\n" - elseif key == "/response/hit" then - local hit = res:hit() - if hit then - return res + + -- flag_token with self 2nd arg: no effect + r:flag_token("c", "c") + if not r:has_flag("c") then + return "ERROR unexpected c token 1.\r\n" end - return "ERROR hit is false\r\n" - elseif key == "/response/not_hit" then - local hit = res:hit() - if not hit then - return "SERVER_ERROR\r\n" + + -- flag_token with "" 2nd arg: remove + r:flag_token("c", "") + if r:has_flag("c") then + return "ERROR unexpected c token 2.\r\n" end - return "HD\r\n" - elseif key == "/response/vlen" then - local vlen = res:vlen() - if vlen == 1 then - return res + + -- repeated flag_token calls: new value is returned. + local _, Rtoken = r:flag_token("R", "R100") + if Rtoken ~= '10' then + return "ERROR unexpected R token 1.\r\n" end - return "ERROR vlen is not 1\r\n" - elseif key == "/response/code_ok" then - local code = res:code() - if code == mcp.MCMC_CODE_OK then - return res + _, Rtoken = r:flag_token("R", "R100") + if Rtoken ~= '100' then + return "ERROR unexpected R token 2.\r\n" end - return "ERROR expect MCMC_CODE_OK, but got " .. code .. "\r\n" - elseif key == "/response/code_miss" then - local code = res:code() - if code == mcp.MCMC_CODE_END then - return res + + return "HD\r\n" + end + end) + + map.request = { + [mcp.CMD_MS] = new_basic(zones, function(rctx, a) + return function(r) + local key = r:key() + local newReq = mcp.request("ms /request/edit 2\r\n", "ab\r\n") + return rctx:enqueue_and_wait(newReq, a.t.z1) end - return "ERROR expect MCMC_CODE_END, but got " .. code .. "\r\n" - elseif key == "/response/line" then - local line = res:line() - if line == "v c123" then - return res + end), + [mcp.CMD_MG] = new_basic(zones, function(rctx, a) + return function(r) + local key = r:key() + if key == "/request/old" then + local newReq = mcp.request("mg /request/new c\r\n") + return rctx:enqueue_and_wait(newReq, a.t.z1) + else + local res = rctx:enqueue_and_wait(r, a.t.z1) + local newReq = mcp.request("ms /request/a " .. res:vlen() .. "\r\n", res) + return rctx:enqueue_and_wait(newReq, a.t.z2) + end end - return "ERROR unexpected line, got [" .. line .. "]\r\n" - elseif key == "/response/blank" then - res:flag_blank("O") - return res - end - return "ERROR unhandled key\r\n" - end + end) + } - pfx_ms["response"] = function(r) - local key = r:key() - local res = zones.z1(r) - local code = res:code() + map.response = { + [mcp.CMD_GET] = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + local key = r:key() + if key == "/response/hit" then + local hit = res:hit() + if hit then + return res + end + return "ERROR hit is false\r\n" + elseif key == "/response/not_hit" then + local hit = res:hit() + if not hit then + return "SERVER_ERROR\r\n" + end + return res + end + return "ERROR unhandled key\r\n" - if key == "/response/code_ok" then - if code == mcp.MCMC_CODE_OK then - return res end - return "ERROR expect MCMC_CODE_OK, but got " .. code .. "\r\n" - elseif key == "/response/line" then - local line = res:line() - if line == "O123 C123" then - return res + end), + [mcp.CMD_MG] = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + local key = r:key() + if key == "/response/elapsed" then + local elapsed = res:elapsed() + if elapsed > 100000 then + return res + end + return "ERROR elapsed is invalid.\r\n" + elseif key == "/response/ok" then + local ok = res:ok() + if ok then + return res + end + return "ERROR ok is false\r\n" + elseif key == "/response/not_ok" then + local ok = res:ok() + if not ok then + return "SERVER_ERROR\r\n" + end + return "HD\r\n" + elseif key == "/response/hit" then + local hit = res:hit() + if hit then + return res + end + return "ERROR hit is false\r\n" + elseif key == "/response/not_hit" then + local hit = res:hit() + if not hit then + return "SERVER_ERROR\r\n" + end + return "HD\r\n" + elseif key == "/response/vlen" then + local vlen = res:vlen() + if vlen == 1 then + return res + end + return "ERROR vlen is not 1\r\n" + elseif key == "/response/code_ok" then + local code = res:code() + if code == mcp.MCMC_CODE_OK then + return res + end + return "ERROR expect MCMC_CODE_OK, but got " .. code .. "\r\n" + elseif key == "/response/code_miss" then + local code = res:code() + if code == mcp.MCMC_CODE_END then + return res + end + return "ERROR expect MCMC_CODE_END, but got " .. code .. "\r\n" + elseif key == "/response/line" then + local line = res:line() + if line == "v c123" then + return res + end + return "ERROR unexpected line, got [" .. line .. "]\r\n" + elseif key == "/response/blank" then + res:flag_blank("O") + return res + end + return "ERROR unhandled key\r\n" + end - return "ERROR unexpected line, got [" .. line .. "]\r\n" - end - return "ERROR unhandled key\r\n" - end - - pfx_set["response"] = function(r) - local res = zones.z1(r) - local key = r:key() - if key == "/response/code_stored" then - local code = res:code() - if code == mcp.MCMC_CODE_STORED then - return res + end), + [mcp.CMD_MS] = new_basic(zones, function(rctx, a) + return function(r) + local key = r:key() + local res = rctx:enqueue_and_wait(r, a.t.z1) + local code = res:code() + + if key == "/response/code_ok" then + if code == mcp.MCMC_CODE_OK then + return res + end + return "ERROR expect MCMC_CODE_OK, but got " .. code .. "\r\n" + elseif key == "/response/line" then + local line = res:line() + if line == "O123 C123" then + return res + end + return "ERROR unexpected line, got [" .. line .. "]\r\n" + end + return "ERROR unhandled key\r\n" end - return "ERROR expect MCMC_CODE_STORED, but got " .. code .. "\r\n" - elseif key == "/response/code_exists" then - local code = res:code() - if code == mcp.MCMC_CODE_EXISTS then - return res + end), + [mcp.CMD_SET] = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + local key = r:key() + if key == "/response/code_stored" then + local code = res:code() + if code == mcp.MCMC_CODE_STORED then + return res + end + return "ERROR expect MCMC_CODE_STORED, but got " .. code .. "\r\n" + elseif key == "/response/code_exists" then + local code = res:code() + if code == mcp.MCMC_CODE_EXISTS then + return res + end + return "ERROR expect MCMC_CODE_EXISTS, but got " .. code .. "\r\n" + elseif key == "/response/code_not_stored" then + local code = res:code() + if code == mcp.MCMC_CODE_NOT_STORED then + return res + end + return "ERROR expect MCMC_CODE_NOT_STORED, but got " .. code .. "\r\n" + elseif key == "/response/code_not_found" then + local code = res:code() + if code == mcp.MCMC_CODE_NOT_FOUND then + return res + end + return "ERROR expect MCMC_CODE_NOT_FOUND, but got " .. code .. "\r\n" + end + return "ERROR unhandled key\r\n" end - return "ERROR expect MCMC_CODE_EXISTS, but got " .. code .. "\r\n" - elseif key == "/response/code_not_stored" then - local code = res:code() - if code == mcp.MCMC_CODE_NOT_STORED then - return res + end), + [mcp.CMD_TOUCH] = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + local key = r:key() + local code = res:code() + if code == mcp.MCMC_CODE_TOUCHED then + return res + end + return "ERROR expect MCMC_CODE_TOUCHED, but got " .. code .. "\r\n" end - return "ERROR expect MCMC_CODE_NOT_STORED, but got " .. code .. "\r\n" - elseif key == "/response/code_not_found" then - local code = res:code() - if code == mcp.MCMC_CODE_NOT_FOUND then - return res + end), + [mcp.CMD_DELETE] = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + local key = r:key() + local code = res:code() + if code == mcp.MCMC_CODE_DELETED then + return res + end + return "ERROR expect MCMC_CODE_DELETED, but got " .. code .. "\r\n" end - return "ERROR expect MCMC_CODE_NOT_FOUND, but got " .. code .. "\r\n" - end - return "ERROR unhandled key\r\n" - end - - pfx_touch["response"] = function(r) - local res = zones.z1(r) - local key = r:key() - local code = res:code() - if code == mcp.MCMC_CODE_TOUCHED then - return res - end - return "ERROR expect MCMC_CODE_TOUCHED, but got " .. code .. "\r\n" - end - - pfx_delete["response"] = function(r) - local res = zones.z1(r) - local key = r:key() - local code = res:code() - if code == mcp.MCMC_CODE_DELETED then - return res - end - return "ERROR expect MCMC_CODE_DELETED, but got " .. code .. "\r\n" - end + end), + } - pfx_get["hasflag"] = function(r) - if r:has_flag("F") then - return "ERROR flag found\r\n" - end - return "END\r\n" - end - - pfx_ms["token"] = function(r) - local key = r:key() - if key == "/token/replacement" then - r:token(4, "C456") - return zones.z1(r) - elseif key == "/token/removal" then - r:token(4, "") - return zones.z1(r) - else - local token = r:token(2) - r:flag_token("P", "P" .. token) - return zones.z1(r) - end - end - - -- Basic test for routing requests to specific pools. - -- Not sure how this could possibly break but testing for completeness. - pfx_get["zonetest"] = function(r) - local key = r:key() - if key == "/zonetest/a" then - return zones.z1(r) - elseif key == "/zonetest/b" then - return zones.z2(r) - elseif key == "/zonetest/c" then - return zones.z3(r) - else - return "END\r\n" + map.token = new_basic(zones, function(rctx, a) + return function(r) + local key = r:key() + if key == "/token/replacement" then + r:token(4, "C456") + elseif key == "/token/removal" then + r:token(4, "") + else + local token = r:token(2) + r:flag_token("P", "P" .. token) + end + return rctx:enqueue_and_wait(r, a.t.z1) end - end - - pfx_get["logtest"] = function(r) - mcp.log("testing manual log messages") - return "END\r\n" - end - - pfx_get["logreqtest"] = function(r) - local res = zones.z1(r) - mcp.log_req(r, res, "logreqtest") - return res - end - - pfx_get["logreqstest"] = function(r) - local res = zones.z1(r) - mcp.log_reqsample(150, 0, true, r, res, "logsampletest") - return res - end - - -- tell caller what we got back via a fake response - pfx_get["awaitbasic"] = function(r) - local vals = {} - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }) - - for i, res in pairs(rtable) do - if res:hit() == true then - vals[i] = "hit" - elseif res:ok() == true then - vals[i] = "ok" + end) + + map.zonetest = new_basic(zones, function(rctx, a) + return function(r) + local key = r:key() + if key == "/zonetest/a" then + return rctx:enqueue_and_wait(r, a.t.z1) + elseif key == "/zonetest/b" then + return rctx:enqueue_and_wait(r, a.t.z2) + elseif key == "/zonetest/c" then + return rctx:enqueue_and_wait(r, a.t.z3) else - vals[i] = "err" + return "END\r\n" end end + end) - local val = table.concat(vals, " ") - local vlen = string.len(val) - -- convenience functions for creating responses would be nice :) - return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. val .. "\r\nEND\r\n" - end - - pfx_get["awaitone"] = function(r) - local mode = string.sub(r:key(), -1, -1) - local num = 0 - if mode == "a" then - num = 1 - elseif mode == "b" then - num = 2 + map.logtest = new_basic(zones, function(rctx, a) + return function(r) + mcp.log("testing manual log messages") + return "END\r\n" end - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }, num) + end) - local ok_cnt = 0 - local hit_cnt = 0 - local count = 0 - for i, res in pairs(rtable) do - if res:ok() then - ok_cnt = ok_cnt + 1 - end - if res:hit() then - hit_cnt = hit_cnt + 1 - end - count = count + 1 - end - local resp = count .. ":" .. ok_cnt .. ":" .. hit_cnt - - local vlen = string.len(resp) - return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. resp .. "\r\nEND\r\n" - end - - -- should be the same as awaitone - pfx_get["awaitgood"] = function(r) - local mode = string.sub(r:key(), -1, -1) - local num = 0 - if mode == "a" then - num = 1 - elseif mode == "b" then - num = 2 + map.logreqtest = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + mcp.log_req(r, res, "logreqtest") + return res end - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }, num, mcp.AWAIT_GOOD) + end) - local ok_cnt = 0 - local hit_cnt = 0 - local count = 0 - for i, res in pairs(rtable) do - if res:ok() then - ok_cnt = ok_cnt + 1 - end - if res:hit() then - hit_cnt = hit_cnt + 1 - end - count = count + 1 + map.logreqstest = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + mcp.log_reqsample(150, 0, true, r, res, "logsampletest") + return res end - local resp = count .. ":" .. ok_cnt .. ":" .. hit_cnt - - local vlen = string.len(resp) - return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. resp .. "\r\nEND\r\n" - end - - -- not sure if anything else should be checked here? if err or not? - pfx_get["awaitany"] = function(r) - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }, 2, mcp.AWAIT_ANY) - local count = 0 - for i, res in pairs(rtable) do - count = count + 1 + end) + + map.sanity = new_basic(zones, function(rctx, a) + local z = {a.t.z1, a.t.z2, a.t.z3} + return function(r) + rctx:enqueue(r, z) + rctx:wait_cond(3) + return rctx:result(a.t.z3) end + end) - local vlen = string.len(count) - return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. count .. "\r\nEND\r\n" - end - - pfx_get["awaitbg"] = function(r) - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }, 1, mcp.AWAIT_BACKGROUND) - local count = 0 - for i, res in pairs(rtable) do - count = count + 1 + map.dead = new_basic(zones, function(rctx, a) + return function(r) + return rctx:enqueue_and_wait(r, a.t.dead) end + end) - local vlen = string.len(count) - return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. count .. "\r\nEND\r\n" - end - - pfx_set["awaitlogerr"] = function(r) - local rtable = mcp.await_logerrors(r, { zones.z1, zones.z2, zones.z3 }, 1, mcp.AWAIT_FASTGOOD, "write_failed") - return rtable[1] - end - - -- testing different styles of building the table argument for mcp.await() - pfx_get["awaitfastgood"] = function(r) - local all_zones = { zones.z1, zones.z2, zones.z3 } - - local restable = mcp.await(r, all_zones, 2, mcp.AWAIT_FASTGOOD) - - local final_res = restable[1] - local count = 0 - for _, res in pairs(restable) do - if res:hit() then - final_res = res + map.deadrespcode = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.dead) + if res:code() == mcp.MCMC_CODE_SERVER_ERROR then + return "ERROR code_correct\r\n" end - count = count + 1 + return "ERROR code_incorrect: " .. res:code() .. "\r\n" end + end) - return final_res - end - - pfx_set["awaitfastgood"] = function(r) - local all_zones = { zones.z1, zones.z2, zones.z3 } + map.millis = new_basic(zones, function(rctx, a) + return function(r) + local time = mcp.time_real_millis() + return "HD t" .. time .. "\r\n" + end + end) - local restable = mcp.await(r, all_zones, 2) - local count = 0 - local good_res = restable[1] - for _, res in pairs(restable) do - if res:ok() then - good_res = res + local def_fg = mcp.funcgen_new() + def_fg:ready({ + f = function(rctx) + return function(r) + return "SERVER_ERROR no set route\r\n" end - count = count + 1 end + }) - print("Set Response count: " .. count) - return good_res - end - - pfx_touch["sanity"] = function(r) - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }) - return rtable[3] - end - - pfx_get["dead"] = function(r) - return zones.dead(r) - end - - pfx_get["deadrespcode"] = function(r) - local res = zones.dead(r) - - if res:code() == mcp.MCMC_CODE_SERVER_ERROR then - return "ERROR code_correct\r\n" - end - return "ERROR code_incorrect: " .. res:code() .. "\r\n" - end - - pfx_mg["millis"] = function(r) - local time = mcp.time_real_millis() - return "HD t" .. time .. "\r\n" - end - - mcp.attach(mcp.CMD_GET, toproute_factory(pfx_get, "get")) - mcp.attach(mcp.CMD_SET, toproute_factory(pfx_set, "set")) - mcp.attach(mcp.CMD_TOUCH, toproute_factory(pfx_touch, "touch")) - mcp.attach(mcp.CMD_GETS, toproute_factory(pfx_gets, "gets")) - mcp.attach(mcp.CMD_GAT, toproute_factory(pfx_gat, "gat")) - mcp.attach(mcp.CMD_GATS, toproute_factory(pfx_gats, "gats")) - mcp.attach(mcp.CMD_CAS, toproute_factory(pfx_cas, "cas")) - mcp.attach(mcp.CMD_ADD, toproute_factory(pfx_add, "add")) - mcp.attach(mcp.CMD_DELETE, toproute_factory(pfx_delete, "delete")) - mcp.attach(mcp.CMD_INCR, toproute_factory(pfx_incr, "incr")) - mcp.attach(mcp.CMD_DECR, toproute_factory(pfx_decr, "decr")) - mcp.attach(mcp.CMD_APPEND, toproute_factory(pfx_append, "append")) - mcp.attach(mcp.CMD_PREPEND, toproute_factory(pfx_prepend, "prepend")) - mcp.attach(mcp.CMD_MG, toproute_factory(pfx_mg, "mg")) - mcp.attach(mcp.CMD_MS, toproute_factory(pfx_ms, "ms")) - mcp.attach(mcp.CMD_MD, toproute_factory(pfx_md, "md")) - mcp.attach(mcp.CMD_MA, toproute_factory(pfx_ma, "ma")) - + mcp.attach(mcp.CMD_ANY_STORAGE, mcp.router_new({ + map = map, mode = "anchor", start = "/", stop = "/", default = def_fg + })) end diff --git a/t/proxyunits.t b/t/proxyunits.t index 8eba2bf5bb..a61914fe4c 100644 --- a/t/proxyunits.t +++ b/t/proxyunits.t @@ -273,13 +273,12 @@ sub proxy_test { like(<$w>, qr/ts=(\S+) gid=\d+ type=proxy_backend error=trailingdata name=127.0.0.1 port=11414 label= depth=0 rbuf=garbage/, "got backend error log line"); } -note("Test bugfix for missingend:" . __LINE__); - # This is an example of a test which will only pass before a bugfix is issued. # It's good practice where possible to write a failing test, then check it # against a code fix. We then leave the test in the file for reference. # Though noting when it was fixed is probably better than what I did here :) SKIP: { + note("Test bugfix for missingend:" . __LINE__); skip "Remove this skip line to demonstrate pre-patch bug", 1; # Test issue with finding response complete when read lands between value # size and value + response line in size. @@ -820,12 +819,13 @@ check_sanity($ps); subtest 'request clone response' => sub { # be must receive cloned meta-set from the previous meta-get. my $be = $mbe[0]; + my $be2 = $mbe[1]; print $ps "mg /request/clone v\r\n"; is(scalar <$be>, "mg /request/clone v\r\n", "get passthrough"); print $be "VA 1 v\r\n4\r\n"; - is(scalar <$be>, "ms /request/a 1\r\n", "received cloned meta-set"); - is(scalar <$be>, "4\r\n", "received cloned meta-set value"); - print $be "HD\r\n"; + is(scalar <$be2>, "ms /request/a 1\r\n", "received cloned meta-set"); + is(scalar <$be2>, "4\r\n", "received cloned meta-set value"); + print $be2 "HD\r\n"; is(scalar <$ps>, "HD\r\n", "received HD"); }; } @@ -1148,313 +1148,6 @@ check_sanity($ps); like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=105 code=17 status=0 cfd=\d+ be=127.0.0.1:11411 detail=logsampletest req=get \/logreqstest\/b/, "only got b request from log sample"); } -# Basic proxy stats validation - -# Test user stats - -check_sanity($ps); -# Test await arguments (may move to own file?) -# TODO: the results table from mcp.await() contains all of the results so far, -# regardless of the mode. -# need some tests that show this. -{ - note("Test await argument:" . __LINE__); - - # DEFAULT MODE, i.e. AWAIT_GOOD - - subtest 'await(r, p): send [h, h, h], recv 3 hits' => sub { - # be_recv must receive hit from all three backends - my $key = "/awaitbasic/a"; - my $ps_send = "get $key\r\n"; - my @be_send = ["VALUE $key 0 2\r\nok\r\nEND\r\n"]; - proxy_test( - ps_send => $ps_send, - be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]}, - be_send => {0 => @be_send, 1 => @be_send, 2 => @be_send}, - ps_recv => ["VALUE $key 0 11\r\n", "hit hit hit\r\n", "END\r\n"], - ); - }; - - subtest 'await(r, p, 1): send [h], recv 1 response and 2 reconn' => sub { - my $key = "/awaitone/a"; - my $ps_send = "get $key\r\n"; - proxy_test( - ps_send => $ps_send, - be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]}, - be_send => {0 => ["VALUE $key 0 2\r\nok\r\nEND\r\n"]}, - ps_recv => ["VALUE $key 0 5\r\n", "1:1:1\r\n", "END\r\n"], - ); - # reconnect due to timeout - $mbe[1] = accept_backend($mocksrvs[1]); - $mbe[2] = accept_backend($mocksrvs[2]); - }; - - subtest 'await(r, p, 1): send [h, m, m], recv 1 response' => sub { - my $key = "/awaitone/a"; - my $ps_send = "get $key\r\n"; - my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n"; - my $be_miss = "END\r\n"; - proxy_test( - ps_send => $ps_send, - be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]}, - be_send => {0 => [$be_hit]}, - ps_recv => ["VALUE $key 0 5\r\n", "1:1:1\r\n", "END\r\n"], - ); - # send response to not timeout - $mbe[1]->send($be_miss); - $mbe[2]->send($be_miss); - }; - - subtest 'await(r, p, 1): send [m, m, h], recv 3 responses' => sub { - my $key = "/awaitone/a"; - my $ps_send = "get $key\r\n"; - my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n"; - my $be_miss = "END\r\n"; - proxy_test( - ps_send => $ps_send, - be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]}, - be_send => {0 => [$be_miss], 1 => [$be_miss], 2 => [$be_hit]}, - ps_recv => ["VALUE $key 0 5\r\n", "3:3:1\r\n", "END\r\n"], - ); - }; - - subtest 'await(r, p, 1): sent [m, h, m], recv 2 responses' => sub { - my $key = "/awaitone/a"; - my $ps_send = "get $key\r\n"; - my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n"; - my $be_miss = "END\r\n"; - proxy_test( - ps_send => $ps_send, - be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]}, - be_send => {0 => [$be_miss], 1 => [$be_hit]}, - ps_recv => ["VALUE $key 0 5\r\n", "2:2:1\r\n", "END\r\n"], - ); - $mbe[2]->send($be_miss); - }; - - subtest 'await(r, p, 1): sent [h, h, h], recv 1 response' => sub { - my $key = "/awaitone/a"; - my $ps_send = "get $key\r\n"; - my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n"; - my $be_miss = "END\r\n"; - proxy_test( - ps_send => $ps_send, - be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]}, - be_send => {0 => [$be_hit], 1 => [$be_hit], 2 => [$be_hit]}, - ps_recv => ["VALUE $key 0 5\r\n", "1:1:1\r\n", "END\r\n"], - ); - }; - - subtest 'await(r, p, 2): sent [h, h, h], recv 2 responses' => sub { - my $key = "/awaitone/b"; - my $ps_send = "get $key\r\n"; - my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n"; - my $be_miss = "END\r\n"; - proxy_test( - ps_send => $ps_send, - be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]}, - be_send => {0 => [$be_hit], 1 => [$be_hit], 2 => [$be_hit]}, - ps_recv => ["VALUE $key 0 5\r\n", "2:2:2\r\n", "END\r\n"], - ); - }; - - subtest 'await(r, p, 1): send [e, m, h], recv 3 responses and 1 reconn' => sub { - my $key = "/awaitone/a"; - my $ps_send = "get $key\r\n"; - my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n"; - my $be_miss = "END\r\n"; - my $be_error = "ERROR backend failure\r\n"; - proxy_test( - ps_send => $ps_send, - be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]}, - be_send => {0 => [$be_error], 1 => [$be_miss], 2 => [$be_hit]}, - ps_recv => ["VALUE $key 0 5\r\n", "3:2:1\r\n", "END\r\n"], - ); - # reconnect due to ERROR - $mbe[0] = accept_backend($mocksrvs[0]); - }; - - subtest 'await(r, p, 1): send [e, m, e], recv 3 responses and 1 reconn' => sub { - my $key = "/awaitone/a"; - my $ps_send = "get $key\r\n"; - my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n"; - my $be_miss = "END\r\n"; - my $be_error = "ERROR failure\r\n"; - my $be_server_error = "SERVER_ERROR backend failure\r\n"; - proxy_test( - ps_send => $ps_send, - be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]}, - be_send => {0 => [$be_error], 1 => [$be_miss], 2 => [$be_server_error]}, - ps_recv => ["VALUE $key 0 5\r\n", "3:1:0\r\n", "END\r\n"], - ); - # reconnect due to ERROR - $mbe[0] = accept_backend($mocksrvs[0]); - }; - - subtest 'await(r, p, 1, mcp.AWAIT_GOOD): sent [h, h, h], recv 1 response' => sub { - # ps_recv must receive hit when one backend sent good response. - my $key = "/awaitgood/a"; - my $ps_send = "get $key\r\n"; - my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n"; - my $be_miss = "END\r\n"; - proxy_test( - ps_send => $ps_send, - be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]}, - be_send => {0 => [$be_hit], 1 => [$be_hit], 2 => [$be_hit]}, - ps_recv => ["VALUE $key 0 5\r\n", "1:1:1\r\n", "END\r\n"], - ); - }; - - my $cmd; - my $key; - - # await(r, p, 2, mcp.AWAIT_ANY) - $key = "/awaitany/a"; - $cmd = "get $key\r\n"; - print $ps $cmd; - for my $be (@mbe) { - is(scalar <$be>, $cmd, "awaitany backend req"); - print $be "VALUE $key 0 2\r\nok\r\nEND\r\n"; - } - is(scalar <$ps>, "VALUE $key 0 1\r\n", "response from await"); - is(scalar <$ps>, "2\r\n", "looking for a two responses"); - is(scalar <$ps>, "END\r\n", "end from await"); - - # await(r, p, 2, mcp.AWAIT_OK) - # await(r, p, 1, mcp.AWAIT_FIRST) - # more AWAIT_FIRST tests? to see how much it waits on/etc. - # await(r, p, 2, mcp.AWAIT_FASTGOOD) - # - should return 1 res on good, else wait for N non-error responses - $key = "/awaitfastgood/a"; - $cmd = "get $key\r\n"; - print $ps $cmd; - my $fbe = $mbe[0]; - is(scalar <$fbe>, $cmd, "awaitfastgood backend req"); - print $fbe "VALUE $key 0 2\r\nok\r\nEND\r\n"; - # Should have response after the first hit. - is(scalar <$ps>, "VALUE $key 0 2\r\n", "response from await"); - is(scalar <$ps>, "ok\r\n", "await value"); - is(scalar <$ps>, "END\r\n", "end from await"); - for my $be ($mbe[1], $mbe[2]) { - is(scalar <$be>, $cmd, "awaitfastgood backend req"); - print $be "VALUE $key 0 2\r\nok\r\nEND\r\n"; - } - - # test three pools, second response returns good. should have a hit. - print $ps $cmd; - for my $be (@mbe) { - is(scalar <$be>, $cmd, "awaitfastgood backend req"); - } - $fbe = $mbe[0]; - print $fbe "END\r\n"; - $fbe = $mbe[1]; - print $fbe "VALUE $key 0 2\r\nun\r\nEND\r\n"; - is(scalar <$ps>, "VALUE $key 0 2\r\n", "response from await"); - is(scalar <$ps>, "un\r\n", "await value"); - is(scalar <$ps>, "END\r\n", "end from await"); - $fbe = $mbe[2]; - print $fbe "END\r\n"; - - # test three pools, but third returns good. should have returned already - print $ps $cmd; - for my $be ($mbe[0], $mbe[1]) { - is(scalar <$be>, $cmd, "awaitfastgood backend req"); - print $be "END\r\n"; - } - $fbe = $mbe[2]; - is(scalar <$fbe>, $cmd, "awaitfastgood backend req"); - print $fbe "VALUE $key 0 2\r\nnu\r\nEND\r\n"; - is(scalar <$ps>, "END\r\n", "miss from awaitfastgood"); - - # Testing a set related to fastgood. waiting for two responses. - $cmd = "set $key 0 0 2\r\nmo\r\n"; - print $ps $cmd; - for my $be ($mbe[0], $mbe[1]) { - is(scalar <$be>, "set $key 0 0 2\r\n", "set backend req"); - is(scalar <$be>, "mo\r\n", "set backend data"); - print $be "STORED\r\n"; - } - is(scalar <$ps>, "STORED\r\n", "got stored from await"); - $fbe = $mbe[2]; - is(scalar <$fbe>, "set $key 0 0 2\r\n", "set backend req"); - is(scalar <$fbe>, "mo\r\n", "set backend data"); - print $fbe "STORED\r\n"; - - # Testing another set; ensure it isn't returning early. - my $s = IO::Select->new(); - $s->add($ps); - print $ps $cmd; - for my $be (@mbe) { - is(scalar <$be>, "set $key 0 0 2\r\n", "set backend req"); - is(scalar <$be>, "mo\r\n", "set backend data"); - } - $fbe = $mbe[0]; - print $fbe "STORED\r\n"; - my @readable = $s->can_read(0.25); - is(scalar @readable, 0, "set doesn't return early"); - for my $be ($mbe[1], $mbe[2]) { - print $be "STORED\r\n"; - } - is(scalar <$ps>, "STORED\r\n", "set completed normally"); - - # await(r, p, 1, mcp.AWAIT_BACKGROUND) - ensure res without waiting - $key = "/awaitbg/a"; - $cmd = "get $key\r\n"; - print $ps $cmd; - # check we can get a response _before_ the backends are consulted. - is(scalar <$ps>, "VALUE $key 0 1\r\n", "response from await"); - is(scalar <$ps>, "0\r\n", "looking for zero responses"); - is(scalar <$ps>, "END\r\n", "end from await"); - for my $be (@mbe) { - is(scalar <$be>, $cmd, "awaitbg backend req"); - print $be "VALUE $key 0 2\r\nok\r\nEND\r\n"; - } - - # test hitting a pool normally then hit mcp.await() - # test hitting mcp.await() then a pool normally -} - -check_sanity($ps); - -{ - note("Test await_logerrors:" . __LINE__); - - my $watcher = $p_srv->new_sock; - print $watcher "watch proxyreqs\n"; - is(<$watcher>, "OK\r\n", "watcher enabled"); - - # test logging errors from special await. - my $key = "/awaitlogerr/a"; - my $cmd = "set $key 0 0 5\r\n"; - print $ps $cmd . "hello\r\n"; - # respond from the first backend normally, then other two with errors. - my $be = $mbe[0]; - is(scalar <$be>, $cmd, "await_logerrors backend req"); - is(scalar <$be>, "hello\r\n", "await_logerrors set payload"); - print $be "STORED\r\n"; - - is(scalar <$ps>, "STORED\r\n", "block until await responded"); - # now ship some errors. - for my $be ($mbe[1], $mbe[2]) { - is(scalar <$be>, $cmd, "await_logerrors backend req"); - is(scalar <$be>, "hello\r\n", "await_logerrors set payload"); - print $be "SERVER_ERROR out of memory\r\n"; - } - - like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=\d+ code=\d+ status=-1 cfd=\d+ be=(\S+) detail=write_failed req=set \/awaitlogerr\/a/, "await_logerrors log entry 1"); - like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=\d+ code=\d+ status=-1 cfd=\d+ be=(\S+) detail=write_failed req=set \/awaitlogerr\/a/, "await_logerrors log entry 2"); - - # Repeat the logreqtest to ensure we only got the log lines we expected. - $cmd = "get /logreqtest/a\r\n"; - print $ps $cmd; - is(scalar <$be>, $cmd, "got passthru for log"); - print $be "END\r\n"; - is(scalar <$ps>, "END\r\n", "got END from log test"); - like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=105 code=17 status=0 cfd=\d+ be=127.0.0.1:11411 detail=logreqtest req=get \/logreqtest\/a/, "found request log entry"); -} - -check_sanity($ps); - # Test out of spec commands from client # - wrong # of tokens # - bad key size