Skip to content

Commit

Permalink
proxy: per-backend automatic logging
Browse files Browse the repository at this point in the history
Options similar to mcp.log_reqsample() are applied to responses as
they're read off the network.

Can be a lot faster than relying on callbacks to do logging, if you're
not doing logging inline with a route handler. Also simpler in some
cases (ie routelib).
  • Loading branch information
dormando committed Jan 21, 2025
1 parent 5fe01d0 commit 3365189
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 49 deletions.
19 changes: 16 additions & 3 deletions logger.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <ctype.h>
#include <stdarg.h>

#if defined(__sun)
Expand Down Expand Up @@ -347,6 +346,7 @@ static void _logger_log_proxy_req(logentry *e, const entry_details *d, const voi
unsigned short type = va_arg(ap, int);
unsigned short code = va_arg(ap, int);
int status = va_arg(ap, int);
int flag = va_arg(ap, int);
int conn_fd = va_arg(ap, int);
char *detail = va_arg(ap, char *);
int dlen = va_arg(ap, int);
Expand All @@ -357,6 +357,7 @@ static void _logger_log_proxy_req(logentry *e, const entry_details *d, const voi
le->type = type;
le->code = code;
le->status = status;
le->flag = flag;
le->conn_fd = conn_fd;
le->dlen = dlen;
le->elapsed = elapsed;
Expand Down Expand Up @@ -391,14 +392,26 @@ static void _logger_log_proxy_req(logentry *e, const entry_details *d, const voi
e->size = sizeof(struct logentry_proxy_req) + reqlen + dlen + le->be_namelen + le->be_portlen;
}

// FIXME: if I ever get better discipline around headers we can include the
// proxy header. Since proxy.h includes memcached.h we need to redefine things
// here.
#define RQUEUE_R_GOOD (1<<3)
#define RQUEUE_R_OK (1<<4)
#define RQUEUE_R_ANY (1<<5)
static int _logger_parse_prx_req(logentry *e, char *scratch) {
int total;
struct logentry_proxy_req *le = (void *)e->data;
const char *rqu_res = "any";
if (le->flag == RQUEUE_R_GOOD) {
rqu_res = "good";
} else if (le->flag == RQUEUE_R_OK) {
rqu_res = "ok";
}

total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
"ts=%lld.%d gid=%llu type=proxy_req elapsed=%lu type=%d code=%d status=%d cfd=%d be=%.*s:%.*s detail=%.*s req=%.*s\n",
"ts=%lld.%d gid=%llu type=proxy_req elapsed=%lu type=%d code=%d status=%d res=%s cfd=%d be=%.*s:%.*s detail=%.*s req=%.*s\n",
(long long int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
le->elapsed, le->type, le->code, le->status, le->conn_fd,
le->elapsed, le->type, le->code, le->status, rqu_res, le->conn_fd,
(int)le->be_namelen, le->data+le->reqlen+le->dlen,
(int)le->be_portlen, le->data+le->reqlen+le->dlen+le->be_namelen, // fml.
(int)le->dlen, le->data+le->reqlen, (int)le->reqlen, le->data
Expand Down
1 change: 1 addition & 0 deletions logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ struct logentry_conn_event {
struct logentry_proxy_req {
unsigned short type;
unsigned short code;
uint8_t flag;
int status;
int conn_fd;
uint32_t reqlen;
Expand Down
17 changes: 1 addition & 16 deletions proto_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1196,8 +1196,6 @@ mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t) {

void mcp_set_resobj(mcp_resp_t *r, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t) {
memset(r, 0, sizeof(mcp_resp_t));
r->buf = NULL;
r->blen = 0;
r->thread = t;
assert(r->thread != NULL);
gettimeofday(&r->start, NULL);
Expand All @@ -1224,20 +1222,7 @@ void mcp_set_resobj(mcp_resp_t *r, mcp_request_t *rq, mcp_backend_t *be, LIBEVEN
}

r->cmd = rq->pr.command;

strncpy(r->be_name, be->name, MAX_NAMELEN+1);
strncpy(r->be_port, be->port, MAX_PORTLEN+1);

}

mcp_resp_t *mcp_prep_resobj(lua_State *L, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t) {
mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
mcp_set_resobj(r, rq, be, t);

luaL_getmetatable(L, "mcp.response");
lua_setmetatable(L, -2);

return r;
r->be = be;
}

void mcp_resp_set_elapsed(mcp_resp_t *r) {
Expand Down
19 changes: 14 additions & 5 deletions proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ struct proxy_tunables {
bool down; // backend is forced into a down/bad state.
};

struct proxy_logging {
unsigned int deadline; // log if slower than N us (user specifies ms)
unsigned int rate; // sampling
bool all_errors; // always log on an error case
char *detail;
};

typedef STAILQ_HEAD(globalobj_head_s, mcp_globalobj_s) globalobj_head_t;
typedef struct {
lua_State *proxy_state; // main configuration vm
Expand Down Expand Up @@ -382,7 +389,9 @@ struct mcp_backend_label_s {
char label[MAX_LABELLEN+1];
size_t llen; // cache label length for small speedup in pool creation.
int conncount; // number of sockets to make.
bool use_logging;
struct proxy_tunables tunables;
struct proxy_logging logging;
};

// lua object wrapper meant to own a malloc'ed conn structure
Expand Down Expand Up @@ -435,13 +444,15 @@ struct mcp_backend_s {
bool transferred; // if beconn has been shipped to owner thread.
bool use_io_thread; // note if this backend is worker-local or not.
bool stacked; // if backend already queued for syscalls.
bool use_logging; // if automatic logging is enabled.
STAILQ_ENTRY(mcp_backend_s) beconn_next; // stack for connecting conns
STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
iop_head_t iop_head; // stack of inbound requests.
struct proxy_logging logging;
struct proxy_tunables tunables; // this gets copied a few times for speed.
char name[MAX_NAMELEN+1];
char port[MAX_PORTLEN+1];
char label[MAX_LABELLEN+1];
struct proxy_tunables tunables; // this gets copied a few times for speed.
struct mcp_backendconn_s be[];
};
typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
Expand Down Expand Up @@ -494,6 +505,7 @@ typedef struct {
char *buf; // response line + potentially value.
mc_resp *cresp; // client mc_resp object during extstore fetches.
LIBEVENT_THREAD *thread; // cresp's owner thread needed for extstore cleanup.
mcp_backend_t *be; // backend that generated this response
unsigned int blen; // total size of the value to read.
struct timeval start; // time this object was created.
long elapsed; // time elapsed once handled.
Expand All @@ -502,8 +514,6 @@ typedef struct {
uint8_t cmd; // from parser (pr.command)
uint8_t extra; // ascii multiget hack for memory accounting. extra blen.
enum mcp_resp_mode mode; // reply mode (for noreply fixing)
char be_name[MAX_NAMELEN+1];
char be_port[MAX_PORTLEN+1];
} mcp_resp_t;

// re-cast an io_pending_t into this more descriptive structure.
Expand Down Expand Up @@ -600,7 +610,6 @@ void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct e
void *proxy_event_thread(void *arg);
void proxy_run_backend_queue(be_head_t *head);
struct mcp_backendconn_s *proxy_choose_beconn(mcp_backend_t *be);
mcp_resp_t *mcp_prep_resobj(lua_State *L, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t);
mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t);
void mcp_resp_set_elapsed(mcp_resp_t *r);
io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_backend_t *be, mcp_resp_t *r);
Expand Down Expand Up @@ -787,7 +796,7 @@ int mcplib_funcgen_gc(lua_State *L);
void mcp_funcgen_reference(lua_State *L);
void mcp_funcgen_dereference(lua_State *L, mcp_funcgen_t *fgen);
void mcp_rcontext_push_rqu_res(lua_State *L, mcp_rcontext_t *rctx, int handle);

void mcplib_rqu_log(mcp_request_t *rq, mcp_resp_t *rs, int flag, int cfd);

int mcplib_factory_command_new(lua_State *L);

Expand Down
4 changes: 0 additions & 4 deletions proxy_internal.c
Original file line number Diff line number Diff line change
Expand Up @@ -1718,10 +1718,6 @@ static inline int _mcplib_internal_run(LIBEVENT_THREAD *t, mcp_request_t *rq, mc
}
}

// in case someone logs this response it should make sense.
memcpy(r->be_name, "internal", strlen("internal"));
memcpy(r->be_port, "0", 1);

// TODO: r-> will need status/code/mode copied from resp.
r->cresp = resp;
r->thread = t;
Expand Down
Loading

0 comments on commit 3365189

Please sign in to comment.