diff --git a/Makefile.am b/Makefile.am index ad3c5870f..5891b896e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -58,7 +58,7 @@ endif if ENABLE_PROXY memcached_SOURCES += proto_proxy.c proto_proxy.h vendor/mcmc/mcmc.h \ proxy_xxhash.c proxy.h \ - proxy_await.c proxy_ustats.c \ + proxy_ustats.c \ proxy_ratelim.c \ proxy_jump_hash.c proxy_request.c \ proxy_result.c proxy_inspector.c \ diff --git a/memcached.c b/memcached.c index 90d876e9d..23b2588e8 100644 --- a/memcached.c +++ b/memcached.c @@ -1819,7 +1819,6 @@ void server_stats(ADD_STAT add_stats, void *c) { APPEND_STAT("proxy_conn_errors", "%llu", (unsigned long long)thread_stats.proxy_conn_errors); APPEND_STAT("proxy_conn_oom", "%llu", (unsigned long long)thread_stats.proxy_conn_oom); APPEND_STAT("proxy_req_active", "%llu", (unsigned long long)thread_stats.proxy_req_active); - APPEND_STAT("proxy_await_active", "%llu", (unsigned long long)thread_stats.proxy_await_active); } #endif APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds); diff --git a/memcached.h b/memcached.h index 0c250e1c9..5cd74924e 100644 --- a/memcached.h +++ b/memcached.h @@ -359,8 +359,7 @@ struct slab_stats { X(proxy_conn_requests) \ X(proxy_conn_errors) \ X(proxy_conn_oom) \ - X(proxy_req_active) \ - X(proxy_await_active) + X(proxy_req_active) #endif /** diff --git a/proto_proxy.c b/proto_proxy.c index 11f77bd0f..cc8f3b9a3 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -503,15 +503,15 @@ void proxy_submit_cb(io_queue_t *q) { mcp_backend_t *be; P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p); if (p->qcount_incr) { - // funny workaround: awaiting IOP's don't count toward - // resuming a connection, only the completion of the await + // funny workaround: async IOP's don't count toward + // resuming a connection, only the completion of the async // condition. q->count++; } - if (p->await_background) { - P_DEBUG("%s: fast-returning await_background object: %p\n", __func__, (void *)p); - // intercept await backgrounds + if (p->background) { + P_DEBUG("%s: fast-returning background object: %p\n", __func__, (void *)p); + // intercept background requests // this call cannot recurse if we're on the worker thread, // since the worker thread has to finish executing this // function in order to pick up the returned IO. @@ -577,8 +577,8 @@ void proxy_submit_cb(io_queue_t *q) { return; } -// This function handles return processing for the "old style" API: direct -// pool calls and mcp.await() +// This function handles return processing for the "old style" API: +// currently just `mcp.internal()` void proxy_return_rctx_cb(io_pending_t *pending) { io_pending_proxy_t *p = (io_pending_proxy_t *)pending; if (p->client_resp && p->client_resp->blen) { @@ -588,17 +588,6 @@ void proxy_return_rctx_cb(io_pending_t *pending) { p->thread->proxy_vm_extra_kb += kb > 0 ? kb : 1; } - if (p->is_await) { - p->rctx->async_pending--; - mcplib_await_return(p); - // need to directly attempt to return the context, - // we may or may not be hitting proxy_run_rcontext from await_return. - if (p->rctx->async_pending == 0) { - mcp_funcgen_return_rctx(p->rctx); - } - return; - } - mcp_rcontext_t *rctx = p->rctx; lua_rotate(rctx->Lc, 1, 1); lua_settop(rctx->Lc, 1); @@ -879,10 +868,6 @@ static void _proxy_run_tresp_to_resp(mc_resp *tresp, mc_resp *resp) { // - need to only increment q->count once per stack of requests coming from a // resp. // -// There are workarounds for this all over. 
In the await code, we test for -// "the first await object" or "is an await background object", for -// incrementing the q->count -// For pool-backed requests we always increment in submit // For RQU backed requests (new API) there isn't an easy place to test for // "the first request", because: // - The connection queue is a stack of _all_ requests pending on this @@ -967,25 +952,7 @@ int proxy_run_rcontext(mcp_rcontext_t *rctx) { lua_pop(Lc, 1); int res = 0; - mcp_request_t *rq = NULL; - mcp_backend_t *be = NULL; - mcp_resp_t *r = NULL; switch (yield_type) { - case MCP_YIELD_AWAIT: - // called with await context on the stack. - rctx->first_queue = false; // HACK: ensure awaits are counted. - mcplib_await_run_rctx(rctx); - break; - case MCP_YIELD_POOL: - // TODO (v2): c only used for cache alloc? - // pool_call checks the argument already. - be = lua_touserdata(Lc, -1); - rq = lua_touserdata(Lc, -2); - // not using a pre-made res object from this yield type. - r = mcp_prep_resobj(Lc, rq, be, c->thread); - rctx->first_queue = false; // HACK: ensure poolreqs are counted. - mcp_queue_rctx_io(rctx, rq, be, r); - break; case MCP_YIELD_INTERNAL: // stack should be: rq, res if (rctx->parent) { @@ -1367,7 +1334,7 @@ io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, m p->c = c; p->client_resp = r; p->flushed = false; - p->return_cb = proxy_return_rctx_cb; + p->return_cb = NULL; p->finalize_cb = proxy_finalize_rctx_cb; // pass along the request context for resumption. diff --git a/proxy.h b/proxy.h index 107af981b..b19baee05 100644 --- a/proxy.h +++ b/proxy.h @@ -100,12 +100,10 @@ struct mcp_memprofile { #define MCP_BACKEND_UPVALUE 1 -#define MCP_YIELD_POOL 1 -#define MCP_YIELD_AWAIT 2 -#define MCP_YIELD_INTERNAL 3 -#define MCP_YIELD_WAITCOND 4 -#define MCP_YIELD_WAITHANDLE 5 -#define MCP_YIELD_SLEEP 6 +#define MCP_YIELD_INTERNAL 1 +#define MCP_YIELD_WAITCOND 2 +#define MCP_YIELD_WAITHANDLE 3 +#define MCP_YIELD_SLEEP 4 #define SHAREDVM_FGEN_IDX 1 #define SHAREDVM_FGENSLOT_IDX 2 @@ -547,13 +545,9 @@ struct _io_pending_proxy_t { struct iovec iov[2]; // request string + tail buffer int iovcnt; // 1 or 2... unsigned int iovbytes; // total bytes in the iovec - int mcpres_ref; // mcp.res reference used for await() - int await_ref; // lua reference if we were an await object mcp_resp_t *client_resp; // reference (currently pointing to a lua object) bool flushed; // whether we've fully written this request to a backend. - bool is_await; // are we an await object? - bool await_first; // are we the main route for an await object? - bool await_background; // dummy IO for backgrounded awaits + bool background; // dummy IO for backgrounded awaits bool qcount_incr; // HACK. 
}; }; @@ -616,20 +610,6 @@ mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t); void mcp_resp_set_elapsed(mcp_resp_t *r); io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_backend_t *be, mcp_resp_t *r); -// await interface -enum mcp_await_e { - AWAIT_GOOD = 0, // looks for OK + NOT MISS - AWAIT_ANY, // any response, including errors, - AWAIT_OK, // any non-error response - AWAIT_FIRST, // return the result from the first pool - AWAIT_FASTGOOD, // returns on first hit or majority non-error - AWAIT_BACKGROUND, // returns as soon as background jobs are dispatched -}; -int mcplib_await(lua_State *L); -int mcplib_await_logerrors(lua_State *L); -int mcplib_await_run_rctx(mcp_rcontext_t *rctx); -int mcplib_await_return(io_pending_proxy_t *p); - // internal request interface int mcplib_internal(lua_State *L); int mcplib_internal_run(mcp_rcontext_t *rctx); diff --git a/proxy_await.c b/proxy_await.c deleted file mode 100644 index abfe080a3..000000000 --- a/proxy_await.c +++ /dev/null @@ -1,382 +0,0 @@ -/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ - -#include "proxy.h" - -typedef struct mcp_await_s { - int pending; - int wait_for; - int req_ref; - int argtable_ref; // need to hold refs to any potential hash selectors - int restable_ref; // table of result objects - int detail_ref; // reference to detail string. - enum mcp_await_e type; - bool completed; // have we completed the parent coroutine or not - bool logerr; // create log_req entries for error responses - mcp_request_t *rq; - mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop) - mcp_rcontext_t *rctx; // request context -} mcp_await_t; - -// TODO (v2): mcplib_await_gc() -// - needs to handle cases where an await is created, but a rare error happens -// before it completes and the coroutine is killed. must check and free its -// references. - -// local restable = mcp.await(request, pools, num_wait) -// NOTE: need to hold onto the pool objects since those hold backend -// references. Here we just keep a reference to the argument table. -static int _mcplib_await(lua_State *L, bool logerr) { - mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); - luaL_checktype(L, 2, LUA_TTABLE); - int n = 0; // length of table of pools - int wait_for = 0; // 0 means wait for all responses - enum mcp_await_e type = AWAIT_GOOD; - int detail_ref = 0; - - lua_pushnil(L); // init table key - while (lua_next(L, 2) != 0) { - luaL_checkudata(L, -1, "mcp.pool_proxy"); - lua_pop(L, 1); // remove value, keep key. - n++; - } - - if (n <= 0) { - proxy_lua_error(L, "mcp.await arguments must have at least one pool"); - } - - if (lua_isstring(L, 5)) { - // pops the detail string. - detail_ref = luaL_ref(L, LUA_REGISTRYINDEX); - } - - if (lua_isnumber(L, 4)) { - type = lua_tointeger(L, 4); - lua_pop(L, 1); - switch (type) { - case AWAIT_GOOD: - case AWAIT_ANY: - case AWAIT_OK: - case AWAIT_FIRST: - case AWAIT_FASTGOOD: - case AWAIT_BACKGROUND: - break; - default: - proxy_lua_error(L, "invalid type argument tp mcp.await"); - } - } - - if (lua_isnumber(L, 3)) { - wait_for = lua_tointeger(L, 3); - lua_pop(L, 1); - if (wait_for > n) { - wait_for = n; - } - } - - // FIRST is only looking for one valid request. - if (type == AWAIT_FIRST) { - wait_for = 1; - } - - // TODO (v2): quickly loop table once and ensure they're all pools? - // TODO (v2) in case of newuserdatauv throwing an error, we need to grab - // these references after allocating *aw else can leak memory. 
- int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table - int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object. - - // stack will be only the await object now - // allocate before grabbing references so an error won't cause leaks. - mcp_await_t *aw = lua_newuserdatauv(L, sizeof(mcp_await_t), 0); - memset(aw, 0, sizeof(mcp_await_t)); - - // create result table - lua_newtable(L); // -> 2 - aw->restable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pop the result table - - aw->wait_for = wait_for; - aw->pending = n; - aw->argtable_ref = argtable_ref; - aw->rq = rq; - aw->req_ref = req_ref; - aw->detail_ref = detail_ref; - aw->type = type; - aw->logerr = logerr; - P_DEBUG("%s: about to yield [len: %d]\n", __func__, n); - - lua_pushinteger(L, MCP_YIELD_AWAIT); - return lua_yield(L, 2); -} - -// default await, no logging. -int mcplib_await(lua_State *L) { - return _mcplib_await(L, false); -} - -int mcplib_await_logerrors(lua_State *L) { - return _mcplib_await(L, true); -} - -// TODO (v2): need to get this code running under pcall(). -// It looks like a bulk of this code can move into mcplib_await(), -// and then here post-yield we can add the rcontext to the right -// places. Else these errors currently crash the daemon. -int mcplib_await_run_rctx(mcp_rcontext_t *rctx) { - P_DEBUG("%s: start\n", __func__); - conn *c = rctx->c; - lua_State *L = rctx->Lc; - WSTAT_INCR(c->thread, proxy_await_active, 1); - mcp_await_t *aw = lua_touserdata(L, -1); - assert(aw != NULL); - int await_ref = luaL_ref(L, LUA_REGISTRYINDEX); // await is popped. - lua_rawgeti(L, LUA_REGISTRYINDEX, aw->argtable_ref); // -> 1 - mcp_request_t *rq = aw->rq; - aw->rctx = rctx; - - // prepare the request key - const char *key = MCP_PARSER_KEY(rq->pr); - size_t len = rq->pr.klen; - // TODO (v3) await_first is used as a marker for upping the "wait for - // IO's" queue count, which means we need to force it off if we're in - // background mode, else we would accidentally wait for a response anyway. - // This note is for finding a less convoluted method for this. - bool await_first = (aw->type == AWAIT_BACKGROUND) ? false : true; - // loop arg table and run each pool backend selector - lua_pushnil(L); // -> 3 - while (lua_next(L, 1) != 0) { - P_DEBUG("%s: top of loop\n", __func__); - // (key, -2), (val, -1) - // skip the metatable checking here as we already check this in - // mcp.await()'s top level call. - mcp_pool_proxy_t *pp = lua_touserdata(L, -1); - if (pp == NULL) { - proxy_lua_error(L, "mcp.await must be supplied with a pool"); - } - - // NOTE: rq->be is only held to help pass the backend into the IOP in - // mcp_queue call. Could be a local variable and an argument too. - mcp_backend_t *be = mcplib_pool_proxy_call_helper(pp, key, len); - if (be == NULL) { - proxy_lua_error(L, "key dist hasher tried to use out of bounds index"); - } - - mcp_resp_t *res = mcp_prep_resobj(L, rq, be, rctx->c->thread); - io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, rq, be, res); - if (p == NULL) { - // TODO: need to unroll this. _gc func? - } - rctx->async_pending++; - p->is_await = true; - p->await_ref = await_ref; - p->await_first = await_first; - // io_p needs to hold onto its own response reference, because we may or - // may not include it in the final await() result. - p->mcpres_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops mcp.response - - await_first = false; - - // pop value, keep key. 
- lua_pop(L, 1); - } - - if (aw->type == AWAIT_BACKGROUND) { - io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, NULL); - p->is_await = true; - p->await_ref = await_ref; - p->await_background = true; - - rctx->async_pending++; - aw->pending++; - aw->wait_for = 0; - } - - lua_pop(L, 1); // remove table key. - - P_DEBUG("%s: end\n", __func__); - - return 0; -} - -// NOTE: This is unprotected lua/C code. There are no lua-style errors thrown -// purposefully as of this writing, but it's still not safe. Either the code -// can be restructured to use less lua (which I think is better long term -// anyway) or it can be pushed behind a cfunc pcall so we don't crash the -// daemon if something bad happens. -int mcplib_await_return(io_pending_proxy_t *p) { - mcp_await_t *aw; - lua_State *L = p->thread->L; // use the main VM coroutine for work - bool cleanup = false; - bool valid = false; // is response valid to add to the result table. - bool completing = false; - - // TODO (v2): just push the await ptr into *p? - lua_rawgeti(L, LUA_REGISTRYINDEX, p->await_ref); - aw = lua_touserdata(L, -1); - lua_pop(L, 1); // remove AW object from stack - assert(aw != NULL); - P_DEBUG("%s: start [pending: %d]\n", __func__, aw->pending); - //dump_stack(L); - - aw->pending--; - assert(aw->pending >= 0); - // Await not yet satisfied. - // If wait_for != 0 check for response success - // if success and wait_for is *now* 0, we complete. - // add successful response to response table - // Also, if no wait_for, add response to response table - // TODO (v2): for GOOD or OK cases, it might be better to return the - // last object as valid if there are otherwise zero valids? - // Think we just have to count valids... - if (aw->type == AWAIT_BACKGROUND) { - // in the background case, we never want to collect responses. - if (p->await_background) { - // found the dummy IO, complete and return conn to worker. - completing = true; - } - } else if (!aw->completed) { - valid = true; // always collect results unless we are completed. - if (aw->wait_for > 0) { - bool is_good = false; - switch (aw->type) { - case AWAIT_GOOD: - if (p->client_resp->status == MCMC_OK && p->client_resp->resp.code != MCMC_CODE_END) { - is_good = true; - } - break; - case AWAIT_ANY: - is_good = true; - break; - case AWAIT_OK: - if (p->client_resp->status == MCMC_OK) { - is_good = true; - } - break; - case AWAIT_FIRST: - if (p->await_first) { - is_good = true; - } else { - // user only wants the first pool's result. - valid = false; - } - break; - case AWAIT_FASTGOOD: - if (p->client_resp->status == MCMC_OK) { - // End early on a hit. - if (p->client_resp->resp.code != MCMC_CODE_END) { - aw->wait_for = 0; - } else { - is_good = true; - } - } - break; - case AWAIT_BACKGROUND: - // In background mode we don't wait for any response. - break; - } - - if (is_good) { - aw->wait_for--; - } - - if (aw->wait_for == 0) { - completing = true; - } - } - } - - // note that post-completion, we stop gathering responses into the - // response table... because it's already been returned. - // So "valid" can only be true if also !completed - if (aw->pending == 0) { - if (!aw->completed) { - // were waiting for all responses. - completing = true; - } - cleanup = true; - P_DEBUG("%s: pending == 0\n", __func__); - } - - // a valid response to add to the result table. 
- if (valid) { - P_DEBUG("%s: valid\n", __func__); - lua_rawgeti(L, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1 - lua_rawgeti(L, LUA_REGISTRYINDEX, p->mcpres_ref); // -> 2 - // couldn't find a table.insert() equivalent; so this is - // inserting into the length + 1 position manually. - //dump_stack(L); - lua_rawseti(L, -2, lua_rawlen(L, 1) + 1); // pops mcpres - lua_pop(L, 1); // pops restable - } - - // lose our internal mcpres reference regardless. - // also tag the elapsed time into the response. - if (p->mcpres_ref) { - // NOTE: this is redundant but the code is going away soon. not worth - // testing changing it. - mcp_resp_set_elapsed(p->client_resp); - - // instructed to generate log_req entries for each failed request, - // this is useful to do here as these can be asynchronous. - // NOTE: this may be a temporary feature. - if (aw->logerr && p->client_resp->status != MCMC_OK && aw->completed) { - size_t dlen = 0; - const char *detail = NULL; - logger *l = p->thread->l; - // only process logs if someone is listening. - if (l->eflags & LOG_PROXYREQS) { - lua_rawgeti(L, LUA_REGISTRYINDEX, aw->req_ref); - mcp_request_t *rq = lua_touserdata(L, -1); - lua_pop(L, 1); // references still held, just clearing stack. - mcp_resp_t *rs = p->client_resp; - - if (aw->detail_ref) { - lua_rawgeti(L, LUA_REGISTRYINDEX, aw->detail_ref); - detail = luaL_tolstring(L, -1, &dlen); - lua_pop(L, 1); - } - - logger_log(l, LOGGER_PROXY_REQ, NULL, rq->pr.request, rq->pr.reqlen, rs->elapsed, rs->resp.type, rs->resp.code, rs->status, aw->rctx->conn_fd, detail, dlen, rs->be_name, rs->be_port); - } - } - - luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref); - } - // our await_ref is shared, so we don't need to release it. - - if (completing) { - P_DEBUG("%s: completing\n", __func__); - assert(p->c->thread == p->thread); - aw->completed = true; - lua_State *Lc = p->rctx->Lc; - lua_rawgeti(Lc, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1 - luaL_unref(L, LUA_REGISTRYINDEX, aw->restable_ref); - proxy_run_rcontext(p->rctx); - - io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type); - q->count--; - if (q->count == 0) { - // call re-add directly since we're already in the worker thread. - conn_worker_readd(p->c); - } - - } - - if (cleanup) { - P_DEBUG("%s: cleanup [completed: %d]\n", __func__, aw->completed); - luaL_unref(L, LUA_REGISTRYINDEX, aw->argtable_ref); - luaL_unref(L, LUA_REGISTRYINDEX, aw->req_ref); - luaL_unref(L, LUA_REGISTRYINDEX, p->await_ref); - if (aw->detail_ref) { - luaL_unref(L, LUA_REGISTRYINDEX, aw->detail_ref); - } - WSTAT_DECR(p->thread, proxy_await_active, 1); - } - - // Just remove anything we could have left on the primary VM stack - lua_settop(L, 0); - - // always return free this sub-IO object. - do_cache_free(p->thread->io_cache, p); - - return 0; -} - diff --git a/proxy_lua.c b/proxy_lua.c index ca7b26202..6cbaf73cc 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -985,30 +985,6 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(mcp_pool_proxy_t *pp, const char *k return pp->pool[lookup].be; } -// pool(request) -> yields the pool/request for further processing -static int mcplib_pool_proxy_call(lua_State *L) { - mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy"); - mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); - - // we have a fast path to the key/length. 
- if (!rq->pr.keytoken) { - proxy_lua_error(L, "cannot route commands without key"); - return 0; - } - const char *key = MCP_PARSER_KEY(rq->pr); - size_t len = rq->pr.klen; - mcp_backend_t *be = mcplib_pool_proxy_call_helper(pp, key, len); - if (be == NULL) { - proxy_lua_error(L, "key dist hasher tried to use out of bounds index"); - return 0; - } - lua_pushlightuserdata(L, be); - - // now yield request, pool, backend, mode up. - lua_pushinteger(L, MCP_YIELD_POOL); - return lua_yield(L, 4); -} - static int mcplib_backend_use_iothread(lua_State *L) { luaL_checktype(L, -1, LUA_TBOOLEAN); int state = lua_toboolean(L, -1); @@ -1527,12 +1503,6 @@ static void proxy_register_defines(lua_State *L) { X(P_OK); X(CMD_ANY); X(CMD_ANY_STORAGE); - X(AWAIT_GOOD); - X(AWAIT_ANY); - X(AWAIT_OK); - X(AWAIT_FIRST); - X(AWAIT_FASTGOOD); - X(AWAIT_BACKGROUND); Y(QWAIT_ANY, "WAIT_ANY"); Y(QWAIT_OK, "WAIT_OK"); Y(QWAIT_GOOD, "WAIT_GOOD"); @@ -1641,7 +1611,6 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { }; const struct luaL_Reg mcplib_pool_proxy_m[] = { - {"__call", mcplib_pool_proxy_call}, {"__gc", mcplib_pool_proxy_gc}, {NULL, NULL} }; @@ -1732,8 +1701,6 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { {"attach", mcplib_attach}, {"funcgen_new", mcplib_funcgen_new}, {"router_new", mcplib_router_new}, - {"await", mcplib_await}, - {"await_logerrors", mcplib_await_logerrors}, {"log", mcplib_log}, {"log_req", mcplib_log_req}, {"log_reqsample", mcplib_log_reqsample}, diff --git a/proxy_luafgen.c b/proxy_luafgen.c index a6d1516cb..f1696e1ad 100644 --- a/proxy_luafgen.c +++ b/proxy_luafgen.c @@ -836,7 +836,7 @@ static void _mcp_start_rctx_process_error(mcp_rcontext_t *rctx, struct mcp_rqueu io_pending_proxy_t *p = mcp_queue_rctx_io(rctx->parent, NULL, NULL, r); p->return_cb = proxy_return_rqu_cb; p->queue_handle = rctx->parent_handle; - p->await_background = true; + p->background = true; } static void mcp_start_subrctx(mcp_rcontext_t *rctx) { @@ -855,7 +855,7 @@ static void mcp_start_subrctx(mcp_rcontext_t *rctx) { p->queue_handle = rctx->parent_handle; // TODO: change name of property to fast-return once mcp.await is // retired. - p->await_background = true; + p->background = true; } else if (type == LUA_TSTRING) { // TODO: wrap with a resobj and parse it. // for now we bypass the rqueue process handling @@ -867,7 +867,7 @@ static void mcp_start_subrctx(mcp_rcontext_t *rctx) { io_pending_proxy_t *p = mcp_queue_rctx_io(rctx->parent, NULL, NULL, NULL); p->return_cb = proxy_return_rqu_cb; p->queue_handle = rctx->parent_handle; - p->await_background = true; + p->background = true; } else { // generate a generic object with an error. _mcp_start_rctx_process_error(rctx, rqu); @@ -1218,7 +1218,7 @@ int mcplib_rcontext_wait_cond(lua_State *L) { if (rctx->wait_count == 0) { io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, NULL); p->return_cb = proxy_return_rqu_dummy_cb; - p->await_background = true; + p->background = true; rctx->pending_reqs++; rctx->wait_mode = QWAIT_IDLE; // not actually waiting. 
} else if (argc > 3) { diff --git a/t/proxyantiflap.lua b/t/proxyantiflap.lua index a564a7dc2..b18352c4d 100644 --- a/t/proxyantiflap.lua +++ b/t/proxyantiflap.lua @@ -22,5 +22,14 @@ function mcp_config_pools() end function mcp_config_routes(pool) - mcp.attach(mcp.CMD_MG, function(r) return pool(r) end) + local fg = mcp.funcgen_new() + local handle = fg:new_handle(pool) + fg:ready({ + f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, handle) + end + end + }) + mcp.attach(mcp.CMD_MG, fg) end diff --git a/t/proxybestats.lua b/t/proxybestats.lua index b35bc5410..369458bdb 100644 --- a/t/proxybestats.lua +++ b/t/proxybestats.lua @@ -11,7 +11,8 @@ end -- not making requests, just opening/closing backends function mcp_config_routes(p) - pool = p -- stash this in a global. + fg = mcp.funcgen_new() -- stash in global + fg:new_handle(p) -- lock the pool from collection mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR nothing\r\n" end) diff --git a/t/proxyconfig.lua b/t/proxyconfig.lua index 04902fa21..a073215e3 100644 --- a/t/proxyconfig.lua +++ b/t/proxyconfig.lua @@ -79,7 +79,17 @@ function mcp_config_routes(zones) mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR no mg route\r\n" end) mcp.attach(mcp.CMD_MS, function(r) return "SERVER_ERROR no ms route\r\n" end) else - mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end) - mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end) + local fg = mcp.funcgen_new() + local h = fg:new_handle(zones["test"]) + fg:ready({ + f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, h) + end + end + }) + + mcp.attach(mcp.CMD_MG, fg) + mcp.attach(mcp.CMD_MS, fg) end end diff --git a/t/proxyconfigmulti2.lua b/t/proxyconfigmulti2.lua index b2eb5ec1e..3ede2227e 100644 --- a/t/proxyconfigmulti2.lua +++ b/t/proxyconfigmulti2.lua @@ -1,4 +1,13 @@ function mcp_config_routes(p) - mcp.attach(mcp.CMD_MG, function(r) return p(r) end) + local fg = mcp.funcgen_new() + local h = fg:new_handle(p) + fg:ready({ + f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, h) + end + end + }) + mcp.attach(mcp.CMD_MG, fg) end diff --git a/t/proxydepthlim.t b/t/proxydepthlim.t index 9ad9df3a2..118d4abb0 100644 --- a/t/proxydepthlim.t +++ b/t/proxydepthlim.t @@ -35,7 +35,6 @@ subtest 'over-depth' => sub { } print $holder $todo; sleep 0.25; # ensure holder client is seen first - diag "foo"; # We never look at the backend in this test. 
$t->c_send($cmd); $t->c_recv("SERVER_ERROR backend failure\r\n", 'depth limit reached'); diff --git a/t/proxyins.t b/t/proxyins.t index 8356d87d3..66e8bd1e4 100644 --- a/t/proxyins.t +++ b/t/proxyins.t @@ -36,7 +36,7 @@ is(<$w>, "OK\r\n"); } sub test_mgintres { - 'note testing mcp.internal()'; + note 'testing mcp.internal()'; $t->c_send("ms intres/tokenint 5 F5\r\n"); $t->c_send("hello\r\n"); $t->c_recv("HD\r\n"); diff --git a/t/proxylimits.lua b/t/proxylimits.lua index 658e56fe3..c125154c3 100644 --- a/t/proxylimits.lua +++ b/t/proxylimits.lua @@ -73,8 +73,17 @@ end -- some tests against the two broad types of commands (gets vs sets with -- payloads) function mcp_config_routes(zones) - mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end) - mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end) - mcp.attach(mcp.CMD_SET, function(r) return zones["test"](r) end) - mcp.attach(mcp.CMD_GET, function(r) return zones["test"](r) end) + local fg = mcp.funcgen_new() + local h = fg:new_handle(zones["test"]) + fg:ready({ + f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, h) + end + end + }) + mcp.attach(mcp.CMD_MG, fg) + mcp.attach(mcp.CMD_MS, fg) + mcp.attach(mcp.CMD_GET, fg) + mcp.attach(mcp.CMD_SET, fg) end diff --git a/t/proxytags.lua b/t/proxytags.lua index 33fe24c6c..51bd0cec6 100644 --- a/t/proxytags.lua +++ b/t/proxytags.lua @@ -13,8 +13,17 @@ end function mcp_config_routes(p) if mode == "start" then + local fg = mcp.funcgen_new() + local h = fg:new_handle(p) + fg:ready({ + f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, h) + end + end + }) -- one without tag - mcp.attach(mcp.CMD_MG, function(r) return p(r) end) + mcp.attach(mcp.CMD_MG, fg) -- no listener on a mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR tag A\r\n" end, "a") -- listener on b diff --git a/t/proxyunits.lua b/t/proxyunits.lua index a66e89f41..ca5c0acbb 100644 --- a/t/proxyunits.lua +++ b/t/proxyunits.lua @@ -32,572 +32,435 @@ function mcp_config_pools(oldss) end -- WORKER CODE: - --- Using a very simple route handler only to allow testing the three --- workarounds in the same configuration file. -function prefix_factory(pattern, list, default) - local p = pattern - local l = list - local d = default - return function(r) - local route = l[string.match(r:key(), p)] - if route == nil then - return d(r) - end - return route(r) - end -end - --- just for golfing the code in mcp_config_routes() -function toproute_factory(pfx, label) - local err = "SERVER_ERROR no " .. label .. " route\r\n" - return prefix_factory("^/(%a+)/", pfx, function(r) return err end) +function new_basic(zones, func) + local fgen = mcp.funcgen_new() + local o = { t = {}, c = 0 } + + o.t.z1 = fgen:new_handle(zones.z1) + o.t.z2 = fgen:new_handle(zones.z2) + o.t.z3 = fgen:new_handle(zones.z3) + o.t.dead = fgen:new_handle(zones.dead) + o.t.no_label = fgen:new_handle(zones.no_label) + + fgen:ready({ f = func, a = o}) + return fgen end -- Do specialized testing based on the key prefix. 
function mcp_config_routes(zones) - local pfx_get = {} - local pfx_set = {} - local pfx_touch = {} - local pfx_gets = {} - local pfx_gat = {} - local pfx_gats = {} - local pfx_cas = {} - local pfx_add = {} - local pfx_delete = {} - local pfx_incr = {} - local pfx_decr = {} - local pfx_append = {} - local pfx_prepend = {} - local pfx_mg = {} - local pfx_ms = {} - local pfx_md = {} - local pfx_ma = {} - - local basic = function(r) - return zones.z1(r) - end - - pfx_get["b"] = basic - pfx_set["b"] = basic - pfx_touch["b"] = basic - pfx_gets["b"] = basic - pfx_gat["b"] = basic - pfx_gats["b"] = basic - pfx_cas["b"] = basic - pfx_add["b"] = basic - pfx_delete["b"] = basic - pfx_incr["b"] = basic - pfx_decr["b"] = basic - pfx_append["b"] = basic - pfx_prepend["b"] = basic - pfx_mg["b"] = basic - pfx_ms["b"] = basic - pfx_md["b"] = basic - pfx_ma["b"] = basic - - pfx_get["errcheck"] = function(r) - local res = zones.z1(r) - -- expect an error - if res:ok() then - return "FAIL\r\n" - end - if res:code() == mcp.MCMC_CODE_ERROR then - return "ERROR\r\n" - elseif res:code() == mcp.MCMC_CODE_CLIENT_ERROR then - return "CLIENT_ERROR\r\n" - elseif res:code() == mcp.MCMC_CODE_SERVER_ERROR then - return "SERVER_ERROR\r\n" - end - return "FAIL" - end - - -- show that we fetched the key by generating our own response string. - pfx_get["getkey"] = function(r) - return "VALUE |" .. r:key() .. " 0 2\r\nts\r\nEND\r\n" - end - - pfx_get["rtrimkey"] = function(r) - r:rtrimkey(4) - return zones.z1(r) - end - - pfx_get["ltrimkey"] = function(r) - r:ltrimkey(10) - return zones.z1(r) - end - - pfx_get["nolabel"] = function(r) - return zones.no_label(r) - end - - pfx_mg["ntokens"] = function(r) - return "VA 1 C123 v\r\n" .. r:ntokens() .. "\r\n" - end - - pfx_mg["hasflag"] = function(r) - if r:has_flag("c") then - return "HD C123\r\n" - elseif r:has_flag("O") then - return "HD Oabc\r\n" - end - return "NF\r\n" - end + local map = {} - -- Input flags: N10 k c R10 - -- Output flags: N100 k R100 - pfx_mg["flagtoken"] = function(r) - -- flag_token on non-existing flags: no effect - local Ttoken = r:flag_token("T", "T100") - local Otoken = r:flag_token("O", nil) - local vtoken = r:flag_token("v", "") - if vtoken or Otoken or Ttoken then - return "ERROR found non-existing flag.\r\n" + map.b = new_basic(zones, function(rctx, a) + return function(r) + return rctx:enqueue_and_wait(r, a.t.z1) end + end) - -- flag_token to replace: N10 -> N100 - local found, Ntoken = r:flag_token("N", "N100") - if not found or Ntoken ~= "10" then - return "ERROR unexpected N token.\r\n" + map.errcheck = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + -- expect an error + if res:ok() then + return "FAIL\r\n" + end + if res:code() == mcp.MCMC_CODE_ERROR then + return "ERROR\r\n" + elseif res:code() == mcp.MCMC_CODE_CLIENT_ERROR then + return "CLIENT_ERROR\r\n" + elseif res:code() == mcp.MCMC_CODE_SERVER_ERROR then + return "SERVER_ERROR\r\n" + end + return "FAIL" end + end) - -- flag_token with nil 2nd arg: equvalent to fetch - r:flag_token("k", nil) - if not r:has_flag("k") then - return "ERROR unexpected k token.\r\n" + -- show that we fetched the key by generating our own response string. + map.getkey = new_basic(zones, function(rctx, a) + return function(r) + return "VALUE |" .. r:key() .. 
" 0 2\r\nts\r\nEND\r\n" end + end) - -- flag_token with self 2nd arg: no effect - r:flag_token("c", "c") - if not r:has_flag("c") then - return "ERROR unexpected c token 1.\r\n" + map.rtrimkey = new_basic(zones, function(rctx, a) + return function(r) + r:rtrimkey(4) + return rctx:enqueue_and_wait(r, a.t.z1) end + end) - -- flag_token with "" 2nd arg: remove - r:flag_token("c", "") - if r:has_flag("c") then - return "ERROR unexpected c token 2.\r\n" + map.ltrimkey = new_basic(zones, function(rctx, a) + return function(r) + r:ltrimkey(10) + return rctx:enqueue_and_wait(r, a.t.z1) end + end) - -- repeated flag_token calls: new value is returned. - local _, Rtoken = r:flag_token("R", "R100") - if Rtoken ~= '10' then - return "ERROR unexpected R token 1.\r\n" - end - _, Rtoken = r:flag_token("R", "R100") - if Rtoken ~= '100' then - return "ERROR unexpected R token 2.\r\n" + map.nolabel = new_basic(zones, function(rctx, a) + return function(r) + return rctx:enqueue_and_wait(r, a.t.no_label) end + end) - return "HD\r\n" - end - - pfx_ms["request"] = function(r) - local key = r:key() - local newReq = mcp.request("ms /request/edit 2\r\n", "ab\r\n") - return zones.z1(newReq) - end - - pfx_mg["request"] = function(r) - local key = r:key() - if key == "/request/old" then - local newReq = mcp.request("mg /request/new c\r\n") - return zones.z1(newReq) - else - local res = zones.z1(r) - local newReq = mcp.request("ms /request/a " .. res:vlen() .. "\r\n", res) - return zones.z1(newReq) + map.ntokens = new_basic(zones, function(rctx, a) + return function(r) + return "VA 1 C123 v\r\n" .. r:ntokens() .. "\r\n" end - end - - pfx_get["response"] = function(r) - local res = zones.z1(r) - local key = r:key() - if key == "/response/hit" then - local hit = res:hit() - if hit then - return res + end) + + map.hasflag = { + [mcp.CMD_MG] = new_basic(zones, function(rctx, a) + return function(r) + if r:has_flag("c") then + return "HD C123\r\n" + elseif r:has_flag("O") then + return "HD Oabc\r\n" + end + return "NF\r\n" end - return "ERROR hit is false\r\n" - elseif key == "/response/not_hit" then - local hit = res:hit() - if not hit then - return "SERVER_ERROR\r\n" + end), + [mcp.CMD_GET] = new_basic(zones, function(rctx, a) + return function(r) + if r:has_flag("F") then + return "ERROR flag found\r\n" + end + return "END\r\n" end - return res - end - return "ERROR unhandled key\r\n" - end - - pfx_mg["response"] = function(r) - local res = zones.z1(r) - local key = r:key() - if key == "/response/elapsed" then - local elapsed = res:elapsed() - if elapsed > 100000 then - return res + end) + } + + -- Input flags: N10 k c R10 + -- Output flags: N100 k R100 + map.flagtoken = new_basic(zones, function(rctx, a) + return function(r) + -- flag_token on non-existing flags: no effect + local Ttoken = r:flag_token("T", "T100") + local Otoken = r:flag_token("O", nil) + local vtoken = r:flag_token("v", "") + if vtoken or Otoken or Ttoken then + return "ERROR found non-existing flag.\r\n" end - return "ERROR elapsed is invalid.\r\n" - elseif key == "/response/ok" then - local ok = res:ok() - if ok then - return res + + -- flag_token to replace: N10 -> N100 + local found, Ntoken = r:flag_token("N", "N100") + if not found or Ntoken ~= "10" then + return "ERROR unexpected N token.\r\n" end - return "ERROR ok is false\r\n" - elseif key == "/response/not_ok" then - local ok = res:ok() - if not ok then - return "SERVER_ERROR\r\n" + + -- flag_token with nil 2nd arg: equvalent to fetch + r:flag_token("k", nil) + if not r:has_flag("k") then + 
return "ERROR unexpected k token.\r\n" end - return "HD\r\n" - elseif key == "/response/hit" then - local hit = res:hit() - if hit then - return res + + -- flag_token with self 2nd arg: no effect + r:flag_token("c", "c") + if not r:has_flag("c") then + return "ERROR unexpected c token 1.\r\n" end - return "ERROR hit is false\r\n" - elseif key == "/response/not_hit" then - local hit = res:hit() - if not hit then - return "SERVER_ERROR\r\n" + + -- flag_token with "" 2nd arg: remove + r:flag_token("c", "") + if r:has_flag("c") then + return "ERROR unexpected c token 2.\r\n" end - return "HD\r\n" - elseif key == "/response/vlen" then - local vlen = res:vlen() - if vlen == 1 then - return res + + -- repeated flag_token calls: new value is returned. + local _, Rtoken = r:flag_token("R", "R100") + if Rtoken ~= '10' then + return "ERROR unexpected R token 1.\r\n" end - return "ERROR vlen is not 1\r\n" - elseif key == "/response/code_ok" then - local code = res:code() - if code == mcp.MCMC_CODE_OK then - return res + _, Rtoken = r:flag_token("R", "R100") + if Rtoken ~= '100' then + return "ERROR unexpected R token 2.\r\n" end - return "ERROR expect MCMC_CODE_OK, but got " .. code .. "\r\n" - elseif key == "/response/code_miss" then - local code = res:code() - if code == mcp.MCMC_CODE_END then - return res + + return "HD\r\n" + end + end) + + map.request = { + [mcp.CMD_MS] = new_basic(zones, function(rctx, a) + return function(r) + local key = r:key() + local newReq = mcp.request("ms /request/edit 2\r\n", "ab\r\n") + return rctx:enqueue_and_wait(newReq, a.t.z1) end - return "ERROR expect MCMC_CODE_END, but got " .. code .. "\r\n" - elseif key == "/response/line" then - local line = res:line() - if line == "v c123" then - return res + end), + [mcp.CMD_MG] = new_basic(zones, function(rctx, a) + return function(r) + local key = r:key() + if key == "/request/old" then + local newReq = mcp.request("mg /request/new c\r\n") + return rctx:enqueue_and_wait(newReq, a.t.z1) + else + local res = rctx:enqueue_and_wait(r, a.t.z1) + local newReq = mcp.request("ms /request/a " .. res:vlen() .. "\r\n", res) + return rctx:enqueue_and_wait(newReq, a.t.z2) + end end - return "ERROR unexpected line, got [" .. line .. "]\r\n" - elseif key == "/response/blank" then - res:flag_blank("O") - return res - end - return "ERROR unhandled key\r\n" - end + end) + } - pfx_ms["response"] = function(r) - local key = r:key() - local res = zones.z1(r) - local code = res:code() + map.response = { + [mcp.CMD_GET] = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + local key = r:key() + if key == "/response/hit" then + local hit = res:hit() + if hit then + return res + end + return "ERROR hit is false\r\n" + elseif key == "/response/not_hit" then + local hit = res:hit() + if not hit then + return "SERVER_ERROR\r\n" + end + return res + end + return "ERROR unhandled key\r\n" - if key == "/response/code_ok" then - if code == mcp.MCMC_CODE_OK then - return res end - return "ERROR expect MCMC_CODE_OK, but got " .. code .. 
"\r\n" - elseif key == "/response/line" then - local line = res:line() - if line == "O123 C123" then - return res + end), + [mcp.CMD_MG] = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + local key = r:key() + if key == "/response/elapsed" then + local elapsed = res:elapsed() + if elapsed > 100000 then + return res + end + return "ERROR elapsed is invalid.\r\n" + elseif key == "/response/ok" then + local ok = res:ok() + if ok then + return res + end + return "ERROR ok is false\r\n" + elseif key == "/response/not_ok" then + local ok = res:ok() + if not ok then + return "SERVER_ERROR\r\n" + end + return "HD\r\n" + elseif key == "/response/hit" then + local hit = res:hit() + if hit then + return res + end + return "ERROR hit is false\r\n" + elseif key == "/response/not_hit" then + local hit = res:hit() + if not hit then + return "SERVER_ERROR\r\n" + end + return "HD\r\n" + elseif key == "/response/vlen" then + local vlen = res:vlen() + if vlen == 1 then + return res + end + return "ERROR vlen is not 1\r\n" + elseif key == "/response/code_ok" then + local code = res:code() + if code == mcp.MCMC_CODE_OK then + return res + end + return "ERROR expect MCMC_CODE_OK, but got " .. code .. "\r\n" + elseif key == "/response/code_miss" then + local code = res:code() + if code == mcp.MCMC_CODE_END then + return res + end + return "ERROR expect MCMC_CODE_END, but got " .. code .. "\r\n" + elseif key == "/response/line" then + local line = res:line() + if line == "v c123" then + return res + end + return "ERROR unexpected line, got [" .. line .. "]\r\n" + elseif key == "/response/blank" then + res:flag_blank("O") + return res + end + return "ERROR unhandled key\r\n" + end - return "ERROR unexpected line, got [" .. line .. "]\r\n" - end - return "ERROR unhandled key\r\n" - end - - pfx_set["response"] = function(r) - local res = zones.z1(r) - local key = r:key() - if key == "/response/code_stored" then - local code = res:code() - if code == mcp.MCMC_CODE_STORED then - return res + end), + [mcp.CMD_MS] = new_basic(zones, function(rctx, a) + return function(r) + local key = r:key() + local res = rctx:enqueue_and_wait(r, a.t.z1) + local code = res:code() + + if key == "/response/code_ok" then + if code == mcp.MCMC_CODE_OK then + return res + end + return "ERROR expect MCMC_CODE_OK, but got " .. code .. "\r\n" + elseif key == "/response/line" then + local line = res:line() + if line == "O123 C123" then + return res + end + return "ERROR unexpected line, got [" .. line .. "]\r\n" + end + return "ERROR unhandled key\r\n" end - return "ERROR expect MCMC_CODE_STORED, but got " .. code .. "\r\n" - elseif key == "/response/code_exists" then - local code = res:code() - if code == mcp.MCMC_CODE_EXISTS then - return res + end), + [mcp.CMD_SET] = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + local key = r:key() + if key == "/response/code_stored" then + local code = res:code() + if code == mcp.MCMC_CODE_STORED then + return res + end + return "ERROR expect MCMC_CODE_STORED, but got " .. code .. "\r\n" + elseif key == "/response/code_exists" then + local code = res:code() + if code == mcp.MCMC_CODE_EXISTS then + return res + end + return "ERROR expect MCMC_CODE_EXISTS, but got " .. code .. "\r\n" + elseif key == "/response/code_not_stored" then + local code = res:code() + if code == mcp.MCMC_CODE_NOT_STORED then + return res + end + return "ERROR expect MCMC_CODE_NOT_STORED, but got " .. code .. 
"\r\n" + elseif key == "/response/code_not_found" then + local code = res:code() + if code == mcp.MCMC_CODE_NOT_FOUND then + return res + end + return "ERROR expect MCMC_CODE_NOT_FOUND, but got " .. code .. "\r\n" + end + return "ERROR unhandled key\r\n" end - return "ERROR expect MCMC_CODE_EXISTS, but got " .. code .. "\r\n" - elseif key == "/response/code_not_stored" then - local code = res:code() - if code == mcp.MCMC_CODE_NOT_STORED then - return res + end), + [mcp.CMD_TOUCH] = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + local key = r:key() + local code = res:code() + if code == mcp.MCMC_CODE_TOUCHED then + return res + end + return "ERROR expect MCMC_CODE_TOUCHED, but got " .. code .. "\r\n" end - return "ERROR expect MCMC_CODE_NOT_STORED, but got " .. code .. "\r\n" - elseif key == "/response/code_not_found" then - local code = res:code() - if code == mcp.MCMC_CODE_NOT_FOUND then - return res + end), + [mcp.CMD_DELETE] = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + local key = r:key() + local code = res:code() + if code == mcp.MCMC_CODE_DELETED then + return res + end + return "ERROR expect MCMC_CODE_DELETED, but got " .. code .. "\r\n" end - return "ERROR expect MCMC_CODE_NOT_FOUND, but got " .. code .. "\r\n" - end - return "ERROR unhandled key\r\n" - end - - pfx_touch["response"] = function(r) - local res = zones.z1(r) - local key = r:key() - local code = res:code() - if code == mcp.MCMC_CODE_TOUCHED then - return res - end - return "ERROR expect MCMC_CODE_TOUCHED, but got " .. code .. "\r\n" - end - - pfx_delete["response"] = function(r) - local res = zones.z1(r) - local key = r:key() - local code = res:code() - if code == mcp.MCMC_CODE_DELETED then - return res - end - return "ERROR expect MCMC_CODE_DELETED, but got " .. code .. "\r\n" - end + end), + } - pfx_get["hasflag"] = function(r) - if r:has_flag("F") then - return "ERROR flag found\r\n" - end - return "END\r\n" - end - - pfx_ms["token"] = function(r) - local key = r:key() - if key == "/token/replacement" then - r:token(4, "C456") - return zones.z1(r) - elseif key == "/token/removal" then - r:token(4, "") - return zones.z1(r) - else - local token = r:token(2) - r:flag_token("P", "P" .. token) - return zones.z1(r) - end - end - - -- Basic test for routing requests to specific pools. - -- Not sure how this could possibly break but testing for completeness. - pfx_get["zonetest"] = function(r) - local key = r:key() - if key == "/zonetest/a" then - return zones.z1(r) - elseif key == "/zonetest/b" then - return zones.z2(r) - elseif key == "/zonetest/c" then - return zones.z3(r) - else - return "END\r\n" + map.token = new_basic(zones, function(rctx, a) + return function(r) + local key = r:key() + if key == "/token/replacement" then + r:token(4, "C456") + elseif key == "/token/removal" then + r:token(4, "") + else + local token = r:token(2) + r:flag_token("P", "P" .. 
token) + end + return rctx:enqueue_and_wait(r, a.t.z1) end - end - - pfx_get["logtest"] = function(r) - mcp.log("testing manual log messages") - return "END\r\n" - end - - pfx_get["logreqtest"] = function(r) - local res = zones.z1(r) - mcp.log_req(r, res, "logreqtest") - return res - end - - pfx_get["logreqstest"] = function(r) - local res = zones.z1(r) - mcp.log_reqsample(150, 0, true, r, res, "logsampletest") - return res - end - - -- tell caller what we got back via a fake response - pfx_get["awaitbasic"] = function(r) - local vals = {} - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }) - - for i, res in pairs(rtable) do - if res:hit() == true then - vals[i] = "hit" - elseif res:ok() == true then - vals[i] = "ok" + end) + + map.zonetest = new_basic(zones, function(rctx, a) + return function(r) + local key = r:key() + if key == "/zonetest/a" then + return rctx:enqueue_and_wait(r, a.t.z1) + elseif key == "/zonetest/b" then + return rctx:enqueue_and_wait(r, a.t.z2) + elseif key == "/zonetest/c" then + return rctx:enqueue_and_wait(r, a.t.z3) else - vals[i] = "err" + return "END\r\n" end end + end) - local val = table.concat(vals, " ") - local vlen = string.len(val) - -- convenience functions for creating responses would be nice :) - return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. val .. "\r\nEND\r\n" - end - - pfx_get["awaitone"] = function(r) - local mode = string.sub(r:key(), -1, -1) - local num = 0 - if mode == "a" then - num = 1 - elseif mode == "b" then - num = 2 + map.logtest = new_basic(zones, function(rctx, a) + return function(r) + mcp.log("testing manual log messages") + return "END\r\n" end - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }, num) + end) - local ok_cnt = 0 - local hit_cnt = 0 - local count = 0 - for i, res in pairs(rtable) do - if res:ok() then - ok_cnt = ok_cnt + 1 - end - if res:hit() then - hit_cnt = hit_cnt + 1 - end - count = count + 1 - end - local resp = count .. ":" .. ok_cnt .. ":" .. hit_cnt - - local vlen = string.len(resp) - return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. resp .. "\r\nEND\r\n" - end - - -- should be the same as awaitone - pfx_get["awaitgood"] = function(r) - local mode = string.sub(r:key(), -1, -1) - local num = 0 - if mode == "a" then - num = 1 - elseif mode == "b" then - num = 2 + map.logreqtest = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + mcp.log_req(r, res, "logreqtest") + return res end - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }, num, mcp.AWAIT_GOOD) + end) - local ok_cnt = 0 - local hit_cnt = 0 - local count = 0 - for i, res in pairs(rtable) do - if res:ok() then - ok_cnt = ok_cnt + 1 - end - if res:hit() then - hit_cnt = hit_cnt + 1 - end - count = count + 1 + map.logreqstest = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.z1) + mcp.log_reqsample(150, 0, true, r, res, "logsampletest") + return res end - local resp = count .. ":" .. ok_cnt .. ":" .. hit_cnt - - local vlen = string.len(resp) - return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. resp .. "\r\nEND\r\n" - end - - -- not sure if anything else should be checked here? if err or not? 
- pfx_get["awaitany"] = function(r) - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }, 2, mcp.AWAIT_ANY) - local count = 0 - for i, res in pairs(rtable) do - count = count + 1 + end) + + map.sanity = new_basic(zones, function(rctx, a) + local z = {a.t.z1, a.t.z2, a.t.z3} + return function(r) + rctx:enqueue(r, z) + rctx:wait_cond(3) + return rctx:result(a.t.z3) end + end) - local vlen = string.len(count) - return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. count .. "\r\nEND\r\n" - end - - pfx_get["awaitbg"] = function(r) - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }, 1, mcp.AWAIT_BACKGROUND) - local count = 0 - for i, res in pairs(rtable) do - count = count + 1 + map.dead = new_basic(zones, function(rctx, a) + return function(r) + return rctx:enqueue_and_wait(r, a.t.dead) end + end) - local vlen = string.len(count) - return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. count .. "\r\nEND\r\n" - end - - pfx_set["awaitlogerr"] = function(r) - local rtable = mcp.await_logerrors(r, { zones.z1, zones.z2, zones.z3 }, 1, mcp.AWAIT_FASTGOOD, "write_failed") - return rtable[1] - end - - -- testing different styles of building the table argument for mcp.await() - pfx_get["awaitfastgood"] = function(r) - local all_zones = { zones.z1, zones.z2, zones.z3 } - - local restable = mcp.await(r, all_zones, 2, mcp.AWAIT_FASTGOOD) - - local final_res = restable[1] - local count = 0 - for _, res in pairs(restable) do - if res:hit() then - final_res = res + map.deadrespcode = new_basic(zones, function(rctx, a) + return function(r) + local res = rctx:enqueue_and_wait(r, a.t.dead) + if res:code() == mcp.MCMC_CODE_SERVER_ERROR then + return "ERROR code_correct\r\n" end - count = count + 1 + return "ERROR code_incorrect: " .. res:code() .. "\r\n" end + end) - return final_res - end - - pfx_set["awaitfastgood"] = function(r) - local all_zones = { zones.z1, zones.z2, zones.z3 } + map.millis = new_basic(zones, function(rctx, a) + return function(r) + local time = mcp.time_real_millis() + return "HD t" .. time .. "\r\n" + end + end) - local restable = mcp.await(r, all_zones, 2) - local count = 0 - local good_res = restable[1] - for _, res in pairs(restable) do - if res:ok() then - good_res = res + local def_fg = mcp.funcgen_new() + def_fg:ready({ + f = function(rctx) + return function(r) + return "SERVER_ERROR no set route\r\n" end - count = count + 1 end + }) - print("Set Response count: " .. count) - return good_res - end - - pfx_touch["sanity"] = function(r) - local rtable = mcp.await(r, { zones.z1, zones.z2, zones.z3 }) - return rtable[3] - end - - pfx_get["dead"] = function(r) - return zones.dead(r) - end - - pfx_get["deadrespcode"] = function(r) - local res = zones.dead(r) - - if res:code() == mcp.MCMC_CODE_SERVER_ERROR then - return "ERROR code_correct\r\n" - end - return "ERROR code_incorrect: " .. res:code() .. "\r\n" - end - - pfx_mg["millis"] = function(r) - local time = mcp.time_real_millis() - return "HD t" .. time .. 
"\r\n" - end - - mcp.attach(mcp.CMD_GET, toproute_factory(pfx_get, "get")) - mcp.attach(mcp.CMD_SET, toproute_factory(pfx_set, "set")) - mcp.attach(mcp.CMD_TOUCH, toproute_factory(pfx_touch, "touch")) - mcp.attach(mcp.CMD_GETS, toproute_factory(pfx_gets, "gets")) - mcp.attach(mcp.CMD_GAT, toproute_factory(pfx_gat, "gat")) - mcp.attach(mcp.CMD_GATS, toproute_factory(pfx_gats, "gats")) - mcp.attach(mcp.CMD_CAS, toproute_factory(pfx_cas, "cas")) - mcp.attach(mcp.CMD_ADD, toproute_factory(pfx_add, "add")) - mcp.attach(mcp.CMD_DELETE, toproute_factory(pfx_delete, "delete")) - mcp.attach(mcp.CMD_INCR, toproute_factory(pfx_incr, "incr")) - mcp.attach(mcp.CMD_DECR, toproute_factory(pfx_decr, "decr")) - mcp.attach(mcp.CMD_APPEND, toproute_factory(pfx_append, "append")) - mcp.attach(mcp.CMD_PREPEND, toproute_factory(pfx_prepend, "prepend")) - mcp.attach(mcp.CMD_MG, toproute_factory(pfx_mg, "mg")) - mcp.attach(mcp.CMD_MS, toproute_factory(pfx_ms, "ms")) - mcp.attach(mcp.CMD_MD, toproute_factory(pfx_md, "md")) - mcp.attach(mcp.CMD_MA, toproute_factory(pfx_ma, "ma")) - + mcp.attach(mcp.CMD_ANY_STORAGE, mcp.router_new({ + map = map, mode = "anchor", start = "/", stop = "/", default = def_fg + })) end diff --git a/t/proxyunits.t b/t/proxyunits.t index 8eba2bf5b..a61914fe4 100644 --- a/t/proxyunits.t +++ b/t/proxyunits.t @@ -273,13 +273,12 @@ sub proxy_test { like(<$w>, qr/ts=(\S+) gid=\d+ type=proxy_backend error=trailingdata name=127.0.0.1 port=11414 label= depth=0 rbuf=garbage/, "got backend error log line"); } -note("Test bugfix for missingend:" . __LINE__); - # This is an example of a test which will only pass before a bugfix is issued. # It's good practice where possible to write a failing test, then check it # against a code fix. We then leave the test in the file for reference. # Though noting when it was fixed is probably better than what I did here :) SKIP: { + note("Test bugfix for missingend:" . __LINE__); skip "Remove this skip line to demonstrate pre-patch bug", 1; # Test issue with finding response complete when read lands between value # size and value + response line in size. @@ -820,12 +819,13 @@ check_sanity($ps); subtest 'request clone response' => sub { # be must receive cloned meta-set from the previous meta-get. my $be = $mbe[0]; + my $be2 = $mbe[1]; print $ps "mg /request/clone v\r\n"; is(scalar <$be>, "mg /request/clone v\r\n", "get passthrough"); print $be "VA 1 v\r\n4\r\n"; - is(scalar <$be>, "ms /request/a 1\r\n", "received cloned meta-set"); - is(scalar <$be>, "4\r\n", "received cloned meta-set value"); - print $be "HD\r\n"; + is(scalar <$be2>, "ms /request/a 1\r\n", "received cloned meta-set"); + is(scalar <$be2>, "4\r\n", "received cloned meta-set value"); + print $be2 "HD\r\n"; is(scalar <$ps>, "HD\r\n", "received HD"); }; } @@ -1148,313 +1148,6 @@ check_sanity($ps); like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=105 code=17 status=0 cfd=\d+ be=127.0.0.1:11411 detail=logsampletest req=get \/logreqstest\/b/, "only got b request from log sample"); } -# Basic proxy stats validation - -# Test user stats - -check_sanity($ps); -# Test await arguments (may move to own file?) -# TODO: the results table from mcp.await() contains all of the results so far, -# regardless of the mode. -# need some tests that show this. -{ - note("Test await argument:" . __LINE__); - - # DEFAULT MODE, i.e. 
-    # DEFAULT MODE, i.e. AWAIT_GOOD
-
-    subtest 'await(r, p): send [h, h, h], recv 3 hits' => sub {
-        # be_recv must receive hit from all three backends
-        my $key = "/awaitbasic/a";
-        my $ps_send = "get $key\r\n";
-        my @be_send = ["VALUE $key 0 2\r\nok\r\nEND\r\n"];
-        proxy_test(
-            ps_send => $ps_send,
-            be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]},
-            be_send => {0 => @be_send, 1 => @be_send, 2 => @be_send},
-            ps_recv => ["VALUE $key 0 11\r\n", "hit hit hit\r\n", "END\r\n"],
-        );
-    };
-
-    subtest 'await(r, p, 1): send [h], recv 1 response and 2 reconn' => sub {
-        my $key = "/awaitone/a";
-        my $ps_send = "get $key\r\n";
-        proxy_test(
-            ps_send => $ps_send,
-            be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]},
-            be_send => {0 => ["VALUE $key 0 2\r\nok\r\nEND\r\n"]},
-            ps_recv => ["VALUE $key 0 5\r\n", "1:1:1\r\n", "END\r\n"],
-        );
-        # reconnect due to timeout
-        $mbe[1] = accept_backend($mocksrvs[1]);
-        $mbe[2] = accept_backend($mocksrvs[2]);
-    };
-
-    subtest 'await(r, p, 1): send [h, m, m], recv 1 response' => sub {
-        my $key = "/awaitone/a";
-        my $ps_send = "get $key\r\n";
-        my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n";
-        my $be_miss = "END\r\n";
-        proxy_test(
-            ps_send => $ps_send,
-            be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]},
-            be_send => {0 => [$be_hit]},
-            ps_recv => ["VALUE $key 0 5\r\n", "1:1:1\r\n", "END\r\n"],
-        );
-        # send response to not timeout
-        $mbe[1]->send($be_miss);
-        $mbe[2]->send($be_miss);
-    };
-
-    subtest 'await(r, p, 1): send [m, m, h], recv 3 responses' => sub {
-        my $key = "/awaitone/a";
-        my $ps_send = "get $key\r\n";
-        my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n";
-        my $be_miss = "END\r\n";
-        proxy_test(
-            ps_send => $ps_send,
-            be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]},
-            be_send => {0 => [$be_miss], 1 => [$be_miss], 2 => [$be_hit]},
-            ps_recv => ["VALUE $key 0 5\r\n", "3:3:1\r\n", "END\r\n"],
-        );
-    };
-
-    subtest 'await(r, p, 1): sent [m, h, m], recv 2 responses' => sub {
-        my $key = "/awaitone/a";
-        my $ps_send = "get $key\r\n";
-        my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n";
-        my $be_miss = "END\r\n";
-        proxy_test(
-            ps_send => $ps_send,
-            be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]},
-            be_send => {0 => [$be_miss], 1 => [$be_hit]},
-            ps_recv => ["VALUE $key 0 5\r\n", "2:2:1\r\n", "END\r\n"],
-        );
-        $mbe[2]->send($be_miss);
-    };
-
-    subtest 'await(r, p, 1): sent [h, h, h], recv 1 response' => sub {
-        my $key = "/awaitone/a";
-        my $ps_send = "get $key\r\n";
-        my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n";
-        my $be_miss = "END\r\n";
-        proxy_test(
-            ps_send => $ps_send,
-            be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]},
-            be_send => {0 => [$be_hit], 1 => [$be_hit], 2 => [$be_hit]},
-            ps_recv => ["VALUE $key 0 5\r\n", "1:1:1\r\n", "END\r\n"],
-        );
-    };
-
-    subtest 'await(r, p, 2): sent [h, h, h], recv 2 responses' => sub {
-        my $key = "/awaitone/b";
-        my $ps_send = "get $key\r\n";
-        my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n";
-        my $be_miss = "END\r\n";
-        proxy_test(
-            ps_send => $ps_send,
-            be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]},
-            be_send => {0 => [$be_hit], 1 => [$be_hit], 2 => [$be_hit]},
-            ps_recv => ["VALUE $key 0 5\r\n", "2:2:2\r\n", "END\r\n"],
-        );
-    };
-
-    subtest 'await(r, p, 1): send [e, m, h], recv 3 responses and 1 reconn' => sub {
-        my $key = "/awaitone/a";
-        my $ps_send = "get $key\r\n";
-        my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n";
-        my $be_miss = "END\r\n";
-        my $be_error = "ERROR backend failure\r\n";
-        proxy_test(
-            ps_send => $ps_send,
-            be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]},
-            be_send => {0 => [$be_error], 1 => [$be_miss], 2 => [$be_hit]},
-            ps_recv => ["VALUE $key 0 5\r\n", "3:2:1\r\n", "END\r\n"],
-        );
-        # reconnect due to ERROR
-        $mbe[0] = accept_backend($mocksrvs[0]);
-    };
-
-    subtest 'await(r, p, 1): send [e, m, e], recv 3 responses and 1 reconn' => sub {
-        my $key = "/awaitone/a";
-        my $ps_send = "get $key\r\n";
-        my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n";
-        my $be_miss = "END\r\n";
-        my $be_error = "ERROR failure\r\n";
-        my $be_server_error = "SERVER_ERROR backend failure\r\n";
-        proxy_test(
-            ps_send => $ps_send,
-            be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]},
-            be_send => {0 => [$be_error], 1 => [$be_miss], 2 => [$be_server_error]},
-            ps_recv => ["VALUE $key 0 5\r\n", "3:1:0\r\n", "END\r\n"],
-        );
-        # reconnect due to ERROR
-        $mbe[0] = accept_backend($mocksrvs[0]);
-    };
-
-    subtest 'await(r, p, 1, mcp.AWAIT_GOOD): sent [h, h, h], recv 1 response' => sub {
-        # ps_recv must receive hit when one backend sent good response.
-        my $key = "/awaitgood/a";
-        my $ps_send = "get $key\r\n";
-        my $be_hit = "VALUE $key 0 2\r\nok\r\nEND\r\n";
-        my $be_miss = "END\r\n";
-        proxy_test(
-            ps_send => $ps_send,
-            be_recv => {0 => [$ps_send], 1 => [$ps_send], 2 => [$ps_send]},
-            be_send => {0 => [$be_hit], 1 => [$be_hit], 2 => [$be_hit]},
-            ps_recv => ["VALUE $key 0 5\r\n", "1:1:1\r\n", "END\r\n"],
-        );
-    };
-
-    my $cmd;
-    my $key;
-
-    # await(r, p, 2, mcp.AWAIT_ANY)
-    $key = "/awaitany/a";
-    $cmd = "get $key\r\n";
-    print $ps $cmd;
-    for my $be (@mbe) {
-        is(scalar <$be>, $cmd, "awaitany backend req");
-        print $be "VALUE $key 0 2\r\nok\r\nEND\r\n";
-    }
-    is(scalar <$ps>, "VALUE $key 0 1\r\n", "response from await");
-    is(scalar <$ps>, "2\r\n", "looking for a two responses");
-    is(scalar <$ps>, "END\r\n", "end from await");
-
-    # await(r, p, 2, mcp.AWAIT_OK)
-    # await(r, p, 1, mcp.AWAIT_FIRST)
-    # more AWAIT_FIRST tests? to see how much it waits on/etc.
-    # await(r, p, 2, mcp.AWAIT_FASTGOOD)
-    # - should return 1 res on good, else wait for N non-error responses
-    $key = "/awaitfastgood/a";
-    $cmd = "get $key\r\n";
-    print $ps $cmd;
-    my $fbe = $mbe[0];
-    is(scalar <$fbe>, $cmd, "awaitfastgood backend req");
-    print $fbe "VALUE $key 0 2\r\nok\r\nEND\r\n";
-    # Should have response after the first hit.
-    is(scalar <$ps>, "VALUE $key 0 2\r\n", "response from await");
-    is(scalar <$ps>, "ok\r\n", "await value");
-    is(scalar <$ps>, "END\r\n", "end from await");
-    for my $be ($mbe[1], $mbe[2]) {
-        is(scalar <$be>, $cmd, "awaitfastgood backend req");
-        print $be "VALUE $key 0 2\r\nok\r\nEND\r\n";
-    }
-
-    # test three pools, second response returns good. should have a hit.
-    print $ps $cmd;
-    for my $be (@mbe) {
-        is(scalar <$be>, $cmd, "awaitfastgood backend req");
-    }
-    $fbe = $mbe[0];
-    print $fbe "END\r\n";
-    $fbe = $mbe[1];
-    print $fbe "VALUE $key 0 2\r\nun\r\nEND\r\n";
-    is(scalar <$ps>, "VALUE $key 0 2\r\n", "response from await");
-    is(scalar <$ps>, "un\r\n", "await value");
-    is(scalar <$ps>, "END\r\n", "end from await");
-    $fbe = $mbe[2];
-    print $fbe "END\r\n";
-
-    # test three pools, but third returns good. should have returned already
-    print $ps $cmd;
-    for my $be ($mbe[0], $mbe[1]) {
-        is(scalar <$be>, $cmd, "awaitfastgood backend req");
-        print $be "END\r\n";
-    }
-    $fbe = $mbe[2];
-    is(scalar <$fbe>, $cmd, "awaitfastgood backend req");
-    print $fbe "VALUE $key 0 2\r\nnu\r\nEND\r\n";
-    is(scalar <$ps>, "END\r\n", "miss from awaitfastgood");
-
-    # Testing a set related to fastgood. waiting for two responses.
-    $cmd = "set $key 0 0 2\r\nmo\r\n";
-    print $ps $cmd;
-    for my $be ($mbe[0], $mbe[1]) {
-        is(scalar <$be>, "set $key 0 0 2\r\n", "set backend req");
-        is(scalar <$be>, "mo\r\n", "set backend data");
-        print $be "STORED\r\n";
-    }
-    is(scalar <$ps>, "STORED\r\n", "got stored from await");
-    $fbe = $mbe[2];
-    is(scalar <$fbe>, "set $key 0 0 2\r\n", "set backend req");
-    is(scalar <$fbe>, "mo\r\n", "set backend data");
-    print $fbe "STORED\r\n";
-
-    # Testing another set; ensure it isn't returning early.
-    my $s = IO::Select->new();
-    $s->add($ps);
-    print $ps $cmd;
-    for my $be (@mbe) {
-        is(scalar <$be>, "set $key 0 0 2\r\n", "set backend req");
-        is(scalar <$be>, "mo\r\n", "set backend data");
-    }
-    $fbe = $mbe[0];
-    print $fbe "STORED\r\n";
-    my @readable = $s->can_read(0.25);
-    is(scalar @readable, 0, "set doesn't return early");
-    for my $be ($mbe[1], $mbe[2]) {
-        print $be "STORED\r\n";
-    }
-    is(scalar <$ps>, "STORED\r\n", "set completed normally");
-
-    # await(r, p, 1, mcp.AWAIT_BACKGROUND) - ensure res without waiting
-    $key = "/awaitbg/a";
-    $cmd = "get $key\r\n";
-    print $ps $cmd;
-    # check we can get a response _before_ the backends are consulted.
-    is(scalar <$ps>, "VALUE $key 0 1\r\n", "response from await");
-    is(scalar <$ps>, "0\r\n", "looking for zero responses");
-    is(scalar <$ps>, "END\r\n", "end from await");
-    for my $be (@mbe) {
-        is(scalar <$be>, $cmd, "awaitbg backend req");
-        print $be "VALUE $key 0 2\r\nok\r\nEND\r\n";
-    }
-
-    # test hitting a pool normally then hit mcp.await()
-    # test hitting mcp.await() then a pool normally
-}
-
-check_sanity($ps);
-
-{
-    note("Test await_logerrors:" . __LINE__);
-
-    my $watcher = $p_srv->new_sock;
-    print $watcher "watch proxyreqs\n";
-    is(<$watcher>, "OK\r\n", "watcher enabled");
-
-    # test logging errors from special await.
-    my $key = "/awaitlogerr/a";
-    my $cmd = "set $key 0 0 5\r\n";
-    print $ps $cmd . "hello\r\n";
-    # respond from the first backend normally, then other two with errors.
-    my $be = $mbe[0];
-    is(scalar <$be>, $cmd, "await_logerrors backend req");
-    is(scalar <$be>, "hello\r\n", "await_logerrors set payload");
-    print $be "STORED\r\n";
-
-    is(scalar <$ps>, "STORED\r\n", "block until await responded");
-    # now ship some errors.
-    for my $be ($mbe[1], $mbe[2]) {
-        is(scalar <$be>, $cmd, "await_logerrors backend req");
-        is(scalar <$be>, "hello\r\n", "await_logerrors set payload");
-        print $be "SERVER_ERROR out of memory\r\n";
-    }
-
-    like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=\d+ code=\d+ status=-1 cfd=\d+ be=(\S+) detail=write_failed req=set \/awaitlogerr\/a/, "await_logerrors log entry 1");
-    like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=\d+ code=\d+ status=-1 cfd=\d+ be=(\S+) detail=write_failed req=set \/awaitlogerr\/a/, "await_logerrors log entry 2");
-
-    # Repeat the logreqtest to ensure we only got the log lines we expected.
-    $cmd = "get /logreqtest/a\r\n";
-    print $ps $cmd;
-    is(scalar <$be>, $cmd, "got passthru for log");
-    print $be "END\r\n";
-    is(scalar <$ps>, "END\r\n", "got END from log test");
-    like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=105 code=17 status=0 cfd=\d+ be=127.0.0.1:11411 detail=logreqtest req=get \/logreqtest\/a/, "found request log entry");
-}
-
-check_sanity($ps);
-
 # Test out of spec commands from client
 # - wrong # of tokens
 # - bad key size