From 7e0aa310ecf5ec7b9fd03df61b996c2823591950 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Tue, 9 Nov 2021 14:02:24 +0000
Subject: [PATCH] Mas i1801 monitorworkerq (#979)

The worker pools are now monitored to collate:

- queue_time - how much time a piece of work spends queueing before
  being allocated a worker;
- work_time - how much time is spent between the worker being checked
  out and checked in again (which presumably equates to the time spent
  doing the work, less any time spent on the worker_pool server message
  queue).

A minimal sketch of this timing pattern, and of the stat names it
produces, is appended after the diff.

As part of this change, some erroneous tabs were removed from the
worker_pool code; these had been skewing the code layout on GitHub.
---
 src/riak_core_node_worker_pool.erl     |  33 ++--
 src/riak_core_node_worker_pool_sup.erl |  16 +-
 src/riak_core_stat.erl                 |  42 ++++-
 src/riak_core_vnode_worker_pool.erl    |  32 ++--
 src/riak_core_worker_pool.erl          | 220 ++++++++++++++++++-------
 5 files changed, 229 insertions(+), 114 deletions(-)

diff --git a/src/riak_core_node_worker_pool.erl b/src/riak_core_node_worker_pool.erl
index ef2e6357d..33b92be85 100644
--- a/src/riak_core_node_worker_pool.erl
+++ b/src/riak_core_node_worker_pool.erl
@@ -32,7 +32,7 @@
 	% Allows you to set up a DSCP-style set of pools (assuming the
 	% vnode_worker_pool counts as ef. Otherwise can just have a
 	% single node_worker_pool
-	:: be_pool|af1_pool|af2_pool|af3_pool|af4_pool|node_worker_pool.
+    :: be_pool|af1_pool|af2_pool|af3_pool|af4_pool|node_worker_pool.
 
 -export_type([worker_pool/0]).
 
@@ -63,26 +63,23 @@ dscp_pools() -> [af1(), af2(), af3(), af4(), be()].
 
 -spec start_link(atom(), pos_integer(), list(), list(), worker_pool())
-	-> {ok, pid()}.
+        -> {ok, pid()}.
 %% @doc
 %% Start a worker pool, and register under the name PoolType, which should be
 %% a recognised name from type worker_pool()
 start_link(WorkerMod, PoolSize, WorkerArgs, WorkerProps, PoolType)
-	when PoolType == be_pool;
-		PoolType == af1_pool;
-		PoolType == af2_pool;
-		PoolType == af3_pool;
-		PoolType == af4_pool;
-		PoolType == node_worker_pool ->
+        when PoolType == be_pool;
+            PoolType == af1_pool;
+            PoolType == af2_pool;
+            PoolType == af3_pool;
+            PoolType == af4_pool;
+            PoolType == node_worker_pool ->
     {ok, Pid} =
-	riak_core_worker_pool:start_link([WorkerMod,
-						PoolSize,
-						WorkerArgs,
-						WorkerProps],
-					?MODULE),
+        riak_core_worker_pool:start_link(
+            [WorkerMod, PoolSize, WorkerArgs, WorkerProps],
+            ?MODULE,
+            PoolType),
     register(PoolType, Pid),
-    lager:info("Registered worker pool of type ~w and size ~w",
-		[PoolType, PoolSize]),
     {ok, Pid}.
 
 do_init([WorkerMod, PoolSize, WorkerArgs, WorkerProps]) ->
@@ -108,10 +105,10 @@ stop(Pid, Reason) ->
 
 %% wait for all the workers to finish any current work
 shutdown_pool(Pid, Wait) ->
-	riak_core_worker_pool:shutdown_pool(Pid, Wait).
+    riak_core_worker_pool:shutdown_pool(Pid, Wait).
 
 reply(From, Msg) ->
-	riak_core_vnode:reply(From, Msg).
+    riak_core_vnode:reply(From, Msg).
 
 do_work(Pid, Work, From) ->
-	riak_core_vnode_worker:handle_work(Pid, Work, From).
+    riak_core_vnode_worker:handle_work(Pid, Work, From).
 
diff --git a/src/riak_core_node_worker_pool_sup.erl b/src/riak_core_node_worker_pool_sup.erl
index 6db9cb199..46e0abbe9 100644
--- a/src/riak_core_node_worker_pool_sup.erl
+++ b/src/riak_core_node_worker_pool_sup.erl
@@ -22,11 +22,11 @@
 %% Helper macro for declaring children of supervisor
 -define(CHILD(I, PoolType, Args, Type, Timeout),
-	{PoolType,
-		{I, start_link, Args},
-		permanent, Timeout, Type, [I]}).
+        {PoolType,
+         {I, start_link, Args},
+         permanent, Timeout, Type, [I]}).
 -define(CHILD(I, PoolType, Args, Type),
-	?CHILD(I, PoolType, Args, Type, 5000)).
+        ?CHILD(I, PoolType, Args, Type, 5000)).
 
 -type worker_pool() :: riak_core_node_worker_pool:worker_pool().
 
@@ -53,8 +53,8 @@ start_pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType) ->
     end.
 
 pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType) ->
-	?CHILD(riak_core_node_worker_pool,
-		QueueType,
-		[WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType],
-		worker).
+    ?CHILD(riak_core_node_worker_pool,
+           QueueType,
+           [WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType],
+           worker).
 
diff --git a/src/riak_core_stat.erl b/src/riak_core_stat.erl
index 7506cd17b..a264420bf 100644
--- a/src/riak_core_stat.erl
+++ b/src/riak_core_stat.erl
@@ -138,6 +138,12 @@ handle_call(_Req, _From, State) ->
 handle_cast({update, {worker_pool, vnode_pool}}, State) ->
     exometer_update([prefix(), ?APP, vnode, worker_pool], 1),
     {noreply, State};
+handle_cast({update, {worker_pool, queue_time, Pool, QueueTime}}, State) ->
+    exometer_update([prefix(), ?APP, worker_pool_queuetime, Pool], QueueTime),
+    {noreply, State};
+handle_cast({update, {worker_pool, work_time, Pool, WorkTime}}, State) ->
+    exometer_update([prefix(), ?APP, worker_pool_worktime, Pool], WorkTime),
+    {noreply, State};
 handle_cast({update, {worker_pool, Pool}}, State) ->
     exometer_update([prefix(), ?APP, node, worker_pool, Pool], 1),
     {noreply, State};
@@ -200,12 +206,27 @@ stats() ->
        {last, rebalance_delay_last}]}
     | nwp_stats()].
 
 nwp_stats() ->
-    [ {[vnode, worker_pool], counter, [], [{value, vnode_worker_pool_total}]},
-      {[node, worker_pool, unregistered], counter, [], [{value, node_worker_pool_unregistered_total}]} |
-      [nwp_stat(Pool) || Pool <- riak_core_node_worker_pool:pools()]].
+    PoolNames = [vnode_pool, unregistered] ++ riak_core_node_worker_pool:pools(),
+
+    [nwp_stat(Pool) || Pool <- PoolNames] ++
+
+    [nwpqt_stat(Pool) || Pool <- PoolNames] ++
+
+    [nwpwt_stat(Pool) || Pool <- PoolNames].
 
 nwp_stat(Pool) ->
-    {[node, worker_pool, Pool], counter, [], [{value, nwp_name_atom(Pool)}]}.
+    {[node, worker_pool, Pool], counter, [],
+        [{value, nwp_name_atom(Pool, <<"_total">>)}]}.
+
+nwpqt_stat(Pool) ->
+    {[worker_pool_queuetime, Pool], histogram, [],
+        [{mean , nwp_name_atom(Pool, <<"_queuetime_mean">>)},
+         {max , nwp_name_atom(Pool, <<"_queuetime_100">>)}]}.
+
+nwpwt_stat(Pool) ->
+    {[worker_pool_worktime, Pool], histogram, [],
+        [{mean , nwp_name_atom(Pool, <<"_worktime_mean">>)},
+         {max , nwp_name_atom(Pool, <<"_worktime_100">>)}]}.
 
 system_stats() ->
     [
@@ -273,16 +294,19 @@ vnodeq_aggregate(Service, MQLs0) ->
 vnodeq_atom(Service, Desc) ->
     binary_to_atom(<<(atom_to_binary(Service, latin1))/binary, Desc/binary>>, latin1).
 
-nwp_name_atom(Atom) ->
-    binary_to_atom(<< <<"node_worker_pool_">>/binary,
-                        (atom_to_binary(Atom, latin1))/binary,
-                        <<"_total">>/binary>>, latin1).
+-spec nwp_name_atom(atom(), binary()) -> atom().
+nwp_name_atom(QueueName, StatName) ->
+    binary_to_atom(<< <<"worker_">>/binary,
+                        (atom_to_binary(QueueName, latin1))/binary,
+                        StatName/binary >>,
+                    latin1).
 
 -ifdef(TEST).
 
 nwp_name_to_atom_test() ->
-    ?assertEqual(node_worker_pool_af1_pool_total, nwp_name_atom(af1_pool)).
+    ?assertEqual(worker_af1_pool_total, nwp_name_atom(af1_pool, <<"_total">>)).
+
 
 %% Check vnodeq aggregation function
 vnodeq_aggregate_empty_test() ->
 
diff --git a/src/riak_core_vnode_worker_pool.erl b/src/riak_core_vnode_worker_pool.erl
index 813e9216f..9376a094c 100644
--- a/src/riak_core_vnode_worker_pool.erl
+++ b/src/riak_core_vnode_worker_pool.erl
@@ -46,36 +46,32 @@
 %% API
 -export([start_link/5, stop/2, shutdown_pool/2, handle_work/3]).
 
-start_link(WorkerMod,
-	PoolSize, VNodeIndex,
-	WorkerArgs, WorkerProps) ->
-	riak_core_worker_pool:start_link([WorkerMod,
-						PoolSize,
-						VNodeIndex,
-						WorkerArgs,
-						WorkerProps],
-					?MODULE).
+start_link(WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps) ->
+    riak_core_worker_pool:start_link(
+        [WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps],
+        ?MODULE,
+        vnode_pool).
 
 handle_work(Pid, Work, From) ->
     riak_core_stat:update({worker_pool, vnode_pool}),
-	riak_core_worker_pool:handle_work(Pid, Work, From).
+    riak_core_worker_pool:handle_work(Pid, Work, From).
 
 stop(Pid, Reason) ->
     riak_core_worker_pool:stop(Pid, Reason).
 
 %% wait for all the workers to finish any current work
 shutdown_pool(Pid, Wait) ->
-	riak_core_worker_pool:shutdown_pool(Pid, Wait).
+    riak_core_worker_pool:shutdown_pool(Pid, Wait).
 
 reply(From, Msg) ->
-	riak_core_vnode:reply(From, Msg).
+    riak_core_vnode:reply(From, Msg).
 
 do_init([WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps]) ->
-	poolboy:start_link([{worker_module, riak_core_vnode_worker},
-		{worker_args,
-			[VNodeIndex, WorkerArgs, WorkerProps, self()]},
-		{worker_callback_mod, WorkerMod},
-		{size, PoolSize}, {max_overflow, 0}]).
+    poolboy:start_link([{worker_module, riak_core_vnode_worker},
+                        {worker_args,
+                            [VNodeIndex, WorkerArgs, WorkerProps, self()]},
+                        {worker_callback_mod, WorkerMod},
+                        {size, PoolSize}, {max_overflow, 0}]).
 
 do_work(Pid, Work, From) ->
-	riak_core_vnode_worker:handle_work(Pid, Work, From).
+    riak_core_vnode_worker:handle_work(Pid, Work, From).
 
diff --git a/src/riak_core_worker_pool.erl b/src/riak_core_worker_pool.erl
index 543a03c39..01a7817d9 100644
--- a/src/riak_core_worker_pool.erl
+++ b/src/riak_core_worker_pool.erl
@@ -51,7 +51,7 @@
 -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
 	terminate/3, code_change/4]).
 
--export([start_link/2, handle_work/3, stop/2, shutdown_pool/2]).
+-export([start_link/3, handle_work/3, stop/2, shutdown_pool/2]).
 
 %% gen_fsm states
 -export([queueing/2, ready/2, ready/3, queueing/3, shutdown/2, shutdown/3]).
@@ -66,14 +66,16 @@
 
 -define(SHUTDOWN_WAIT, 60000).
 
--record(state, {
-		queue = queue:new(),
-		pool :: pid(),
-		monitors = [] :: list(),
-		shutdown :: undefined | {pid(), reference()},
-		callback_mod :: atom()
+-record(state, {queue = queue:new(),
+                pool :: pid(),
+                monitors = [] :: list(),
+                shutdown :: undefined | {pid(), reference()},
+                callback_mod :: atom(),
+                pool_name :: atom(),
+                checkouts = [] :: list(checkout())
 }).
 
+-type checkout() :: {pid(), erlang:timestamp()}.
+
 -callback handle_work(Pid::pid(), Work::term(), From::term()) -> any().
 
@@ -88,8 +90,8 @@
 
 -callback do_work(Pid::pid(), Work::term(), From::term()) -> any().
 
-start_link(PoolBoyArgs, CallbackMod) ->
-	gen_fsm:start_link(?MODULE, [PoolBoyArgs, CallbackMod], []).
+start_link(PoolBoyArgs, CallbackMod, PoolName) ->
+    gen_fsm:start_link(?MODULE, [PoolBoyArgs, CallbackMod, PoolName], []).
 
 handle_work(Pid, Work, From) ->
     gen_fsm:send_event(Pid, {work, Work, From}).
@@ -101,9 +103,13 @@ stop(Pid, Reason) ->
 shutdown_pool(Pid, Wait) ->
     gen_fsm:sync_send_all_state_event(Pid, {shutdown, Wait}, infinity).
 
-init([PoolBoyArgs, CallbackMod]) ->
-	{ok, Pid} = CallbackMod:do_init(PoolBoyArgs),
-	{ok, ready, #state{pool=Pid, callback_mod=CallbackMod}}.
+init([PoolBoyArgs, CallbackMod, PoolName]) ->
+    {ok, Pid} = CallbackMod:do_init(PoolBoyArgs),
+    {ok,
+     ready,
+     #state{pool=Pid,
+            callback_mod = CallbackMod,
+            pool_name = PoolName}}.
 
 ready(_Event, _From, State) ->
     {reply, ok, ready, State}.
@@ -114,79 +120,116 @@ queueing(_Event, _From, State) ->
 shutdown(_Event, _From, State) ->
     {reply, ok, shutdown, State}.
 
-ready({work, Work, From} = Msg, #state{pool=Pool, queue=Q, monitors=Monitors} = State) ->
-    case poolboy:checkout(Pool, false) of
+ready({work, Work, From} = Msg,
+      #state{pool=Pool,
+             queue=Q,
+             pool_name=PoolName,
+             checkouts=Checkouts0,
+             monitors=Monitors0} = State) ->
+    case poolboy_checkout(Pool, PoolName, Checkouts0) of
         full ->
-            {next_state, queueing, State#state{queue=queue:in(Msg, Q)}};
-        Pid when is_pid(Pid) ->
-            NewMonitors =
-                riak_core_worker_pool:monitor_worker(Pid, From, Work, Monitors),
-            Mod = State#state.callback_mod,
-            Mod:do_work(Pid, Work, From),
-            {next_state, ready, State#state{monitors=NewMonitors}}
+            {next_state, queueing, State#state{queue=push_to_queue(Msg, Q)}};
+        {Pid, Checkouts} when is_pid(Pid) ->
+            Monitors =
+                riak_core_worker_pool:monitor_worker(Pid,
+                                                     From,
+                                                     Work,
+                                                     Monitors0),
+            do_work(Pid, Work, From, State#state.callback_mod),
+            {next_state,
+             ready,
+             State#state{monitors=Monitors, checkouts = Checkouts}}
    end;
 ready(_Event, State) ->
     {next_state, ready, State}.
 
 queueing({work, _Work, _From} = Msg, #state{queue=Q} = State) ->
-    {next_state, queueing, State#state{queue=queue:in(Msg, Q)}};
+    {next_state, queueing, State#state{queue=push_to_queue(Msg, Q)}};
 queueing(_Event, State) ->
     {next_state, queueing, State}.
 
 shutdown({work, _Work, From}, State) ->
     %% tell the process requesting work that we're shutting down
     Mod = State#state.callback_mod,
-	Mod:reply(From, {error, vnode_shutdown}),
+    Mod:reply(From, {error, vnode_shutdown}),
     {next_state, shutdown, State};
 shutdown(_Event, State) ->
     {next_state, shutdown, State}.
 
-handle_event({checkin, Pid}, shutdown, #state{pool=Pool, monitors=Monitors0} = State) ->
+handle_event({checkin, Pid},
+             shutdown,
+             #state{pool=Pool,
+                    pool_name=PoolName,
+                    monitors=Monitors0,
+                    checkouts=Checkouts0} = State) ->
     Monitors = demonitor_worker(Pid, Monitors0),
-    poolboy:checkin(Pool, Pid),
+    {ok, Checkouts} =
+        poolboy_checkin(Pool, Pid, PoolName, Checkouts0),
     case Monitors of
         [] ->
             %% work all done, time to exit!
             {stop, shutdown, State};
         _ ->
-            {next_state, shutdown, State#state{monitors=Monitors}}
+            {next_state,
+             shutdown,
+             State#state{monitors=Monitors, checkouts=Checkouts}}
     end;
-handle_event({checkin, Worker}, _, #state{pool = Pool, queue=Q, monitors=Monitors} = State) ->
-    case queue:out(Q) of
+handle_event({checkin, Worker},
+             _,
+             #state{pool=Pool,
+                    queue=Q,
+                    pool_name=PoolName,
+                    checkouts=Checkouts0,
+                    monitors=Monitors0} = State) ->
+    case consume_from_queue(Q, State#state.pool_name) of
         {{value, {work, Work, From}}, Rem} ->
             %% there is outstanding work to do - instead of checking
             %% the worker back in, just hand it more work to do
-            NewMonitors = monitor_worker(Worker, From, Work, Monitors),
-            Mod = State#state.callback_mod,
-            Mod:do_work(Worker, Work, From),
-            {next_state, queueing, State#state{queue=Rem,
-                monitors=NewMonitors}};
+            Monitors = monitor_worker(Worker, From, Work, Monitors0),
+            do_work(Worker, Work, From, State#state.callback_mod),
+            {next_state,
+             queueing, State#state{queue=Rem, monitors=Monitors}};
         {empty, Empty} ->
-            NewMonitors = demonitor_worker(Worker, Monitors),
-            poolboy:checkin(Pool, Worker),
-            {next_state, ready, State#state{queue=Empty, monitors=NewMonitors}}
+            Monitors = demonitor_worker(Worker, Monitors0),
+            {ok, Checkouts} =
+                poolboy_checkin(Pool, Worker, PoolName, Checkouts0),
+            {next_state,
+             ready,
+             State#state{queue=Empty,
+                         monitors=Monitors,
+                         checkouts=Checkouts}}
    end;
-handle_event(worker_start, StateName, #state{pool=Pool, queue=Q, monitors=Monitors}=State) ->
+handle_event(worker_start, StateName,
+             #state{pool=Pool,
+                    queue=Q,
+                    pool_name=PoolName,
+                    checkouts=Checkouts0,
+                    monitors=Monitors0}=State) ->
     %% a new worker just started - if we have work pending, try to do it
-    case queue:out(Q) of
+    case consume_from_queue(Q, State#state.pool_name) of
         {{value, {work, Work, From}}, Rem} ->
-            case poolboy:checkout(Pool, false) of
+            case poolboy_checkout(Pool, PoolName, Checkouts0) of
                 full ->
                     {next_state, queueing, State};
-                Pid when is_pid(Pid) ->
-                    NewMonitors = monitor_worker(Pid, From, Work, Monitors),
-                    Mod = State#state.callback_mod,
-                    Mod:do_work(Pid, Work, From),
-                    {next_state, queueing, State#state{queue=Rem, monitors=NewMonitors}}
+                {Pid, Checkouts} when is_pid(Pid) ->
+                    Monitors = monitor_worker(Pid, From, Work, Monitors0),
+                    do_work(Pid, Work, From, State#state.callback_mod),
+                    {next_state,
+                     queueing,
+                     State#state{queue=Rem,
+                                 monitors=Monitors,
+                                 checkouts=Checkouts}}
             end;
         {empty, _} ->
-            {next_state,
-                %% If we are in state queueing with nothing in the queue,
-                %% move to the ready state so that the next incoming job
-                %% checks out the new worker from poolboy.
-                if StateName==queueing -> ready;
-                    true -> StateName
-                end,
-                State}
+            {next_state,
+             %% If we are in state queueing with nothing in the queue,
+             %% move to the ready state so that the next incoming job
+             %% checks out the new worker from poolboy.
+             if StateName==queueing ->
+                     ready;
+                true ->
+                     StateName
+             end,
+             State}
    end;
 handle_event(_Event, StateName, State) ->
     {next_state, StateName, State}.
@@ -212,25 +255,27 @@ handle_sync_event({shutdown, Time}, From, _StateName, #state{queue=Q,
 handle_sync_event(_Event, _From, StateName, State) ->
     {reply, {error, unknown_message}, StateName, State}.
 
-handle_info({'DOWN', _Ref, _, Pid, Info}, StateName, #state{monitors=Monitors} = State) ->
+handle_info({'DOWN', _Ref, _, Pid, Info},
+            StateName,
+            #state{monitors=Monitors0} = State) ->
     %% remove the listing for the dead worker
-    case lists:keyfind(Pid, 1, Monitors) of
+    case lists:keyfind(Pid, 1, Monitors0) of
         {Pid, _, From, Work} ->
-            Mod = State#state.callback_mod,
-            Mod:reply(From, {error, {worker_crash, Info, Work}}),
-            NewMonitors = lists:keydelete(Pid, 1, Monitors),
+            Mod = State#state.callback_mod,
+            Mod:reply(From, {error, {worker_crash, Info, Work}}),
+            Monitors = lists:keydelete(Pid, 1, Monitors0),
             %% trigger to do more work will be 'worker_start' message
             %% when poolboy replaces this worker (if not a 'checkin'
             %% or 'handle_work')
-            {next_state, StateName, State#state{monitors=NewMonitors}};
+            {next_state, StateName, State#state{monitors=Monitors}};
         false ->
             {next_state, StateName, State}
     end;
 handle_info(shutdown, shutdown, #state{monitors=Monitors} = State) ->
     %% we've waited too long to shutdown, time to force the issue
-    Mod = State#state.callback_mod,
-    _ = [Mod:reply(From, {error, vnode_shutdown}) ||
-            {_, _, From, _} <- Monitors],
+    Mod = State#state.callback_mod,
+    _ = [Mod:reply(From, {error, vnode_shutdown})
+            || {_, _, From, _} <- Monitors],
     {stop, shutdown, State};
 handle_info(_Info, StateName, State) ->
     {next_state, StateName, State}.
@@ -278,3 +323,56 @@ discard_queued_work(Q, Mod) ->
             ok
     end.
 
+-spec poolboy_checkin(pid(), pid(), atom(), list(checkout()))
+        -> {ok, list(checkout())}.
+poolboy_checkin(Pool, Worker, PoolName, Checkouts) ->
+    R = poolboy:checkin(Pool, Worker),
+    case lists:keytake(Worker, 1, Checkouts) of
+        {value, {Worker, WT}, Checkouts0} ->
+            riak_core_stat:update({worker_pool,
+                                   work_time,
+                                   PoolName,
+                                   timer:now_diff(os:timestamp(), WT)}),
+            {R, Checkouts0};
+        _ ->
+            lager:warning(
+                "Unexplained poolboy behaviour - failure to track checkouts"),
+            {R, Checkouts}
+    end.
+
+-spec poolboy_checkout(pid(), atom(), list(checkout()))
+        -> full | {pid(), list(checkout())}.
+poolboy_checkout(Pool, PoolName, Checkouts) ->
+    case poolboy:checkout(Pool, false) of
+        full ->
+            full;
+        P when is_pid(P) ->
+            riak_core_stat:update({worker_pool,
+                                   queue_time,
+                                   PoolName,
+                                   0}),
+            {P, [{P, os:timestamp()}|Checkouts]}
+    end.
+
+do_work(Pid, Work, From, Mod) ->
+    Mod:do_work(Pid, Work, From).
+
+-spec push_to_queue({work, term(), term()}, queue:queue()) -> queue:queue().
+push_to_queue(Msg, Q) ->
+    QT = os:timestamp(),
+    queue:in({QT, Msg}, Q).
+
+-spec consume_from_queue(queue:queue(), atom()) ->
+        {empty | {value, {work, term(), term()}}, queue:queue()}.
+consume_from_queue(Q, PoolName) ->
+    case queue:out(Q) of
+        {empty, Empty} ->
+            {empty, Empty};
+        {{value, {QT, {work, Work, From}}}, Rem} ->
+            riak_core_stat:update({worker_pool,
+                                   queue_time,
+                                   PoolName,
+                                   timer:now_diff(os:timestamp(), QT)}),
+            {{value, {work, Work, From}}, Rem}
+    end.
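
Reviewer note, not part of the patch: the timing pattern above in a
minimal, standalone sketch. Work is stamped with os:timestamp() when it
is queued (or checked out), and timer:now_diff/2 against a fresh
timestamp gives the elapsed microseconds when it is dequeued (or
checked in). The module and function names below are illustrative only,
and io:format/2 stands in for riak_core_stat:update/1.

-module(pool_timing_sketch).
-export([demo/0]).

%% Stamp each work message with the time it entered the queue.
push_to_queue(Msg, Q) ->
    queue:in({os:timestamp(), Msg}, Q).

%% On dequeue, the difference between now and the stored stamp is the
%% queue_time, in microseconds.
consume_from_queue(Q, PoolName) ->
    case queue:out(Q) of
        {empty, Empty} ->
            {empty, Empty};
        {{value, {QT, Msg}}, Rem} ->
            io:format("~w queue_time: ~w microseconds~n",
                      [PoolName, timer:now_diff(os:timestamp(), QT)]),
            {{value, Msg}, Rem}
    end.

demo() ->
    Q0 = push_to_queue({work, fun() -> ok end, self()}, queue:new()),
    timer:sleep(10),
    {{value, _Msg}, _Rem} = consume_from_queue(Q0, be_pool),
    ok.

The same stamp-and-diff approach measures work_time: poolboy_checkout/3
records {Pid, os:timestamp()} in the checkouts list, and
poolboy_checkin/4 takes the entry back out and reports the difference.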
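A second note on where the measurements surface: nwpqt_stat/1 and
nwpwt_stat/1 register exometer histograms, and nwp_name_atom/2 flattens
each exported datapoint to an atom. The sketch below (hypothetical
module and pool_stat_names/1 helper; nwp_name_atom/2 is copied from the
patch) lists the flat stat names a single pool exports.

-module(pool_stat_names_sketch).
-export([pool_stat_names/1]).

%% As in the patch: builds names such as worker_be_pool_queuetime_mean.
nwp_name_atom(QueueName, StatName) ->
    binary_to_atom(<< <<"worker_">>/binary,
                      (atom_to_binary(QueueName, latin1))/binary,
                      StatName/binary >>,
                   latin1).

%% pool_stat_names(be_pool) ->
%%     [worker_be_pool_total,
%%      worker_be_pool_queuetime_mean, worker_be_pool_queuetime_100,
%%      worker_be_pool_worktime_mean, worker_be_pool_worktime_100]
pool_stat_names(Pool) ->
    [nwp_name_atom(Pool, Suffix) ||
        Suffix <- [<<"_total">>,
                   <<"_queuetime_mean">>, <<"_queuetime_100">>,
                   <<"_worktime_mean">>, <<"_worktime_100">>]].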