Skip to content

Commit

Permalink
Merge pull request #206 from inaka/gen_server_callbacks
Browse files Browse the repository at this point in the history
Gen server callbacks
  • Loading branch information
elbrujohalcon authored Aug 12, 2024
2 parents 0ec5ee5 + 2de0ac5 commit e7d83ae
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 23 deletions.
94 changes: 73 additions & 21 deletions src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,33 @@

-behaviour(gen_server).

%% Taken from gen_server OTP
-record(callback_cache,
{module :: module(),
handle_call ::
fun((Request :: term(), From :: from(), State :: term()) ->
{reply, Reply :: term(), NewState :: term()} |
{reply,
Reply :: term(),
NewState :: term(),
timeout() | hibernate | {continue, term()}} |
{noreply, NewState :: term()} |
{noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
{stop, Reason :: term(), Reply :: term(), NewState :: term()} |
{stop, Reason :: term(), NewState :: term()}),
handle_cast ::
fun((Request :: term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
{stop, Reason :: term(), NewState :: term()}),
handle_info ::
fun((Info :: timeout | term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
{stop, Reason :: term(), NewState :: term()})}).
-record(state,
{name :: atom(),
mod :: atom(),
mod :: #callback_cache{},
state :: term(),
options ::
#{time_checker := atom(),
Expand Down Expand Up @@ -103,21 +127,22 @@ get_state(#state{state = State}) ->
init({Name, Mod, InitArgs, LOptions}) ->
Options = maps:from_list(LOptions),
wpool_process_callbacks:notify(handle_init_start, Options, [Name]),
CbCache = create_callback_cache(Mod),
case Mod:init(InitArgs) of
{ok, ModState} ->
ok = notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok,
#state{name = Name,
mod = Mod,
mod = CbCache,
state = ModState,
options = Options}};
{ok, ModState, NextStep} ->
ok = notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok,
#state{name = Name,
mod = Mod,
mod = CbCache,
state = ModState,
options = Options},
NextStep};
Expand All @@ -130,37 +155,56 @@ init({Name, Mod, InitArgs, LOptions}) ->
%% @private
-spec terminate(atom(), state()) -> term().
terminate(Reason, State) ->
#state{mod = Mod,
#state{mod = #callback_cache{module = Mod},
state = ModState,
name = Name,
options = Options} =
State,
ok = notify_queue_manager(worker_dead, Name, Options),
wpool_process_callbacks:notify(handle_worker_death, Options, [Name, Reason]),
Mod:terminate(Reason, ModState).
case erlang:function_exported(Mod, terminate, 2) of
true ->
Mod:terminate(Reason, ModState);
_ ->
ok
end.

%% @private
-spec code_change(string(), state(), any()) -> {ok, state()} | {error, term()}.
code_change(OldVsn, State, Extra) ->
case (State#state.mod):code_change(OldVsn, State#state.state, Extra) of
{ok, NewState} ->
{ok, State#state{state = NewState}};
Error ->
{error, Error}
-spec code_change(string() | {down, string()}, state(), any()) ->
{ok, state()} | {error, term()}.
code_change(OldVsn, #state{mod = #callback_cache{module = Mod}} = State, Extra) ->
case erlang:function_exported(Mod, code_change, 3) of
true ->
case Mod:code_change(OldVsn, State#state.state, Extra) of
{ok, NewState} ->
{ok, State#state{state = NewState}};
{error, Error} ->
{error, Error}
end;
_ ->
{ok, State}
end.

%% @private
-spec handle_info(any(), state()) ->
{noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}.
handle_info(Info, State) ->
try (State#state.mod):handle_info(Info, State#state.state) of
handle_info(Info, #state{mod = CbCache} = State) ->
#callback_cache{module = Mod, handle_info = HandleInfo} = CbCache,
try HandleInfo(Info, State#state.state) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
catch
error:undef:Stacktrace ->
case erlang:function_exported(Mod, handle_info, 2) of
false ->
{noreply, State};
true ->
erlang:raise(error, undef, Stacktrace)
end;
_:{noreply, NewState} ->
{noreply, State#state{state = NewState}};
_:{noreply, NewState, NextStep} ->
Expand All @@ -174,8 +218,8 @@ handle_info(Info, State) ->
{noreply, state()} |
{noreply, state(), next_step()} |
{stop, term(), state()}.
handle_continue(Continue, State) ->
try (State#state.mod):handle_continue(Continue, State#state.state) of
handle_continue(Continue, #state{mod = #callback_cache{module = Mod}} = State) ->
try Mod:handle_continue(Continue, State#state.state) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, NextStep} ->
Expand All @@ -193,7 +237,7 @@ handle_continue(Continue, State) ->

%% @private
-spec format_status(gen_server:format_status()) -> gen_server:format_status().
format_status(#{state := #state{mod = Mod}} = Status) ->
format_status(#{state := #state{mod = #callback_cache{module = Mod}}} = Status) ->
case erlang:function_exported(Mod, format_status, 1) of
false ->
Status;
Expand All @@ -207,11 +251,12 @@ format_status(#{state := #state{mod = Mod}} = Status) ->
%% @private
-spec handle_cast(term(), state()) ->
{noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}.
handle_cast(Cast, #state{options = Options} = State) ->
handle_cast(Cast, #state{mod = CbCache, options = Options} = State) ->
#callback_cache{handle_cast = HandleCast} = CbCache,
Task = wpool_utils:task_init({cast, Cast}, Options),
ok = notify_queue_manager(worker_busy, State#state.name, Options),
Reply =
try (State#state.mod):handle_cast(Cast, State#state.state) of
try HandleCast(Cast, State#state.state) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, NextStep} ->
Expand All @@ -238,11 +283,12 @@ handle_cast(Cast, #state{options = Options} = State) ->
{noreply, state(), next_step()} |
{stop, term(), term(), state()} |
{stop, term(), state()}.
handle_call(Call, From, #state{options = Options} = State) ->
handle_call(Call, From, #state{mod = CbCache, options = Options} = State) ->
#callback_cache{handle_call = HandleCall} = CbCache,
Task = wpool_utils:task_init({call, Call}, Options),
ok = notify_queue_manager(worker_busy, State#state.name, Options),
Reply =
try (State#state.mod):handle_call(Call, From, State#state.state) of
try HandleCall(Call, From, State#state.state) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, NextStep} ->
Expand Down Expand Up @@ -277,3 +323,9 @@ notify_queue_manager(Function, Name, #{queue_manager := QueueManager}) ->
wpool_queue_manager:Function(QueueManager, Name);
notify_queue_manager(_, _, _) ->
ok.

create_callback_cache(Mod) ->
#callback_cache{module = Mod,
handle_call = fun Mod:handle_call/3,
handle_cast = fun Mod:handle_cast/2,
handle_info = fun Mod:handle_info/2}.
2 changes: 2 additions & 0 deletions test/crashy_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ code_change(_OldVsn, State, _Extra) ->
-spec handle_info(timeout | Info, term()) -> {noreply, timeout} | Info.
handle_info(timeout, _State) ->
{noreply, timeout};
handle_info(undef, _State) ->
erlang:error(undef);
handle_info(Info, _State) ->
Info.

Expand Down
19 changes: 17 additions & 2 deletions test/wpool_process_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
-export([all/0]).
-export([init_per_suite/1, end_per_suite/1, init_per_testcase/2, end_per_testcase/2]).
-export([init/1, init_timeout/1, info/1, cast/1, send_request/1, call/1, continue/1,
format_status/1, no_format_status/1, stop/1]).
handle_info_missing/1, handle_info_fails/1, format_status/1, no_format_status/1, stop/1]).
-export([pool_restart_crash/1, pool_norestart_crash/1, complete_coverage/1]).

-spec all() -> [atom()].
Expand Down Expand Up @@ -173,6 +173,21 @@ continue(_Config) ->

{comment, []}.

-spec handle_info_missing(config()) -> {comment, []}.
handle_info_missing(_Config) ->
%% sleepy_server does not implement handle_info/2
{ok, Pid} = wpool_process:start_link(?MODULE, sleepy_server, 1, []),
Pid ! test,
{comment, []}.

-spec handle_info_fails(config()) -> {comment, []}.
handle_info_fails(_Config) ->
%% sleepy_server does not implement handle_info/2
{ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, {ok, state}, []),
Pid ! undef,
false = ktn_task:wait_for(fun() -> erlang:is_process_alive(Pid) end, false),
{comment, []}.

-spec format_status(config()) -> {comment, []}.
format_status(_Config) ->
%% echo_server implements format_status/1
Expand Down Expand Up @@ -319,7 +334,7 @@ complete_coverage(_Config) ->
ct:comment("Code Change"),
{ok, State} = wpool_process:init({complete_coverage, echo_server, {ok, state}, []}),
{ok, _} = wpool_process:code_change("oldvsn", State, {ok, state}),
{error, bad} = wpool_process:code_change("oldvsn", State, bad),
{error, bad} = wpool_process:code_change("oldvsn", State, {error, bad}),

{comment, []}.

Expand Down

0 comments on commit e7d83ae

Please sign in to comment.