Skip to content

Commit

Permalink
Add initial ra_machine:handle_aux/5 API
Browse files Browse the repository at this point in the history
the appropriate handle_aux function to use is re-evaluated every
time the effective machine module changes and is stored in the
ra_server configuration state.
  • Loading branch information
kjnilsson committed Feb 13, 2024
1 parent 2b9e4ed commit 8bc3d03
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 9 deletions.
20 changes: 20 additions & 0 deletions src/ra_aux.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
-module(ra_aux).

-export([
machine_state/1
]).

% -include("ra_server.hrl").

-opaque state() :: ra_server:state().

-export_type([state/0]).

-spec machine_state(ra_server:state()) -> term().
machine_state(State) ->
maps:get(?FUNCTION_NAME, State).
35 changes: 33 additions & 2 deletions src/ra_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@
query/3,
module/1,
init_aux/2,
handle_aux/6,
handle_aux/7,
snapshot_module/1,
version/1,
which_module/2,
which_aux_fun/1,
is_versioned/1
]).

Expand Down Expand Up @@ -343,8 +345,37 @@ init_aux(Mod, Name) ->
when AuxState :: term(),
LogState :: ra_log:state().
handle_aux(Mod, RaftState, Type, Cmd, Aux, Log, MacState) ->
?OPT_CALL(Mod:handle_aux(RaftState, Type, Cmd, Aux, Log, MacState),
undefined).
Mod:handle_aux(RaftState, Type, Cmd, Aux, Log, MacState).


-spec handle_aux(module(),
ra_server:ra_state(),
{call, From :: from()} | cast,
Command :: term(),
AuxState,
State) ->
{reply, Reply :: term(), AuxState, State} |
{reply, Reply :: term(), AuxState, State,
[{monitor, process, aux, pid()}]} |
{no_reply, AuxState, State} |
{no_reply, AuxState, State,
[{monitor, process, aux, pid()}]}
when AuxState :: term(),
State :: ra_server:state().
handle_aux(Mod, RaftState, Type, Cmd, Aux, State) ->
Mod:handle_aux(RaftState, Type, Cmd, Aux, State).

-spec which_aux_fun(module()) ->
undefined | {atom(), arity()}.
which_aux_fun(Mod) when is_atom(Mod) ->
case lists:sort([E || {handle_aux, _Arity} = E
<- erlang:apply(Mod,module_info, [exports])]) of
[] ->
undefined;
[AuxFun | _] ->
%% favour {handle_aux, 5} as this is the newer api
AuxFun
end.

-spec query(module(), fun((state()) -> Result), state()) ->
Result when Result :: term().
Expand Down
49 changes: 42 additions & 7 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@
commit_latency => option(non_neg_integer())
}.

-type state() :: ra_server_state().

-type ra_state() :: leader | follower | candidate
| pre_vote | await_condition | delete_and_terminate
| terminating_leader | terminating_follower | recover
Expand Down Expand Up @@ -214,7 +216,8 @@

-type config() :: ra_server_config().

-export_type([config/0,
-export_type([state/0,
config/0,
ra_server_state/0,
ra_state/0,
ra_server_config/0,
Expand Down Expand Up @@ -326,6 +329,7 @@ init(#{id := Id,
machine_versions = [{SnapshotIdx, MacVer}],
effective_machine_version = MacVer,
effective_machine_module = MacMod,
effective_handle_aux_fun = ra_machine:which_aux_fun(MacMod),
max_pipeline_count = MaxPipelineCount,
max_append_entries_rpc_batch_size = MaxAERBatchSize,
counter = maps:get(counter, Config, undefined),
Expand Down Expand Up @@ -1300,7 +1304,9 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
Cfg0#cfg{effective_machine_version = SnapMacVer,
machine_versions = [{SnapIndex, SnapMacVer}
| MachineVersions],
effective_machine_module = EffMacMod};
effective_machine_module = EffMacMod,
effective_handle_aux_fun =
ra_machine:which_aux_fun(EffMacMod)};
false ->
Cfg0
end,
Expand Down Expand Up @@ -1476,11 +1482,37 @@ is_fully_replicated(#{commit_index := CI} = State) ->
MinMI >= CI andalso MinCI >= CI
end.

handle_aux(RaftState, Type, Cmd, #{cfg := #cfg{effective_machine_module = MacMod},
aux_state := Aux0, log := Log0,
machine_state := MacState0} = State0) ->
handle_aux(RaftState, _Type, _Cmd,
#{cfg := #cfg{effective_handle_aux_fun = undefined}} = State0) ->
%% todo reply with error if Type is a call?
{RaftState, State0, []};
handle_aux(RaftState, Type, Cmd,
#{cfg := #cfg{effective_machine_module = MacMod,
effective_handle_aux_fun = {handle_aux, 5}},
aux_state := Aux0} = State0) ->
%% NEW API
case ra_machine:handle_aux(MacMod, RaftState, Type, Cmd, Aux0,
State0) of
{reply, Reply, Aux, State} ->
{RaftState, State#{aux_state => Aux},
[{reply, Reply}]};
{reply, Reply, Aux, State, Effects} ->
{RaftState, State#{aux_state => Aux},
[{reply, Reply} | Effects]};
{no_reply, Aux, State} ->
{RaftState, State#{aux_state => Aux}, []};
{no_reply, Aux, State, Effects} ->
{RaftState, State#{aux_state => Aux}, Effects}
end;
handle_aux(RaftState, Type, Cmd,
#{cfg := #cfg{effective_machine_module = MacMod,
effective_handle_aux_fun = {handle_aux, 6}},
aux_state := Aux0,
machine_state := MacState,
log := Log0} = State0) ->
%% OLD API
case ra_machine:handle_aux(MacMod, RaftState, Type, Cmd, Aux0,
Log0, MacState0) of
Log0, MacState) of
{reply, Reply, Aux, Log} ->
{RaftState, State0#{log => Log, aux_state => Aux},
[{reply, Reply}]};
Expand Down Expand Up @@ -2494,7 +2526,10 @@ apply_with({Idx, Term, {noop, CmdMeta, NextMacVer}},
Cfg = Cfg0#cfg{effective_machine_version = NextMacVer,
%% record this machine version "term"
machine_versions = [{Idx, NextMacVer} | MacVersions],
effective_machine_module = Module},
effective_machine_module = Module,
effective_handle_aux_fun =
ra_machine:which_aux_fun(Module)
},
State = State0#{cfg => Cfg,
cluster_change_permitted => ClusterChangePerm},
Meta = augment_command_meta(Idx, Term, MacVer, CmdMeta),
Expand Down
1 change: 1 addition & 0 deletions src/ra_server.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
machine_versions :: [{ra_index(), ra_machine:version()}, ...],
effective_machine_version :: ra_machine:version(),
effective_machine_module :: module(),
effective_handle_aux_fun :: undefined | {handle_aux, 5 | 6},
max_pipeline_count = ?DEFAULT_MAX_PIPELINE_COUNT :: non_neg_integer(),
max_append_entries_rpc_batch_size = ?AER_CHUNK_SIZE :: non_neg_integer(),
counter :: undefined | counters:counters_ref(),
Expand Down
97 changes: 97 additions & 0 deletions test/ra_machine_int_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ all_tests() ->
aux_eval,
aux_tick,
aux_command,
aux_command_v2,
aux_command_v1_and_v2,
aux_monitor_effect,
aux_and_machine_monitor_same_process,
aux_and_machine_monitor_same_node,
Expand Down Expand Up @@ -634,6 +636,101 @@ aux_command(Config) ->
ra:delete_cluster(Cluster),
ok.

aux_command_v2(Config) ->
ClusterName = ?config(cluster_name, Config),
ServerId1 = ?config(server_id, Config),
Cluster = [ServerId1,
?config(server_id2, Config),
?config(server_id3, Config)],
Mod = ?config(modname, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun (_) -> [] end),
meck:expect(Mod, init_aux, fun (_) -> undefined end),
meck:expect(Mod, apply,
fun (_, {monitor_me, Pid}, State) ->
{[Pid | State], ok, [{monitor, process, Pid}]};
(_, Cmd, State) ->
ct:pal("handling ~p", [Cmd]),
%% handle all
{State, ok}
end),
meck:expect(Mod, handle_aux,
fun
(RaftState, {call, _From}, emit, AuxState, Opaque) ->
%% emits aux state
{reply, {RaftState, AuxState}, AuxState, Opaque};
(_RaftState, cast, eval, AuxState, Opaque) ->
%% replaces aux state
{no_reply, AuxState, Opaque};
(_RaftState, cast, NewState, _AuxState, Opaque) ->
%% replaces aux state
{no_reply, NewState, Opaque}

end),
ok = start_cluster(ClusterName, {module, Mod, #{}}, Cluster),
{ok, _, Leader} = ra:members(ServerId1),
ok = ra:cast_aux_command(Leader, banana),
{leader, banana} = ra:aux_command(Leader, emit),
[ServerId2, ServerId3] = Cluster -- [Leader],
{follower, undefined} = ra:aux_command(ServerId2, emit),
ok = ra:cast_aux_command(ServerId2, apple),
{follower, apple} = ra:aux_command(ServerId2, emit),
{follower, undefined} = ra:aux_command(ServerId3, emit),
ok = ra:cast_aux_command(ServerId3, orange),
{follower, orange} = ra:aux_command(ServerId3, emit),
ra:delete_cluster(Cluster),
ok.

aux_command_v1_and_v2(Config) ->
ClusterName = ?config(cluster_name, Config),
ServerId1 = ?config(server_id, Config),
Cluster = [ServerId1,
?config(server_id2, Config),
?config(server_id3, Config)],
Mod = ?config(modname, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun (_) -> [] end),
meck:expect(Mod, init_aux, fun (_) -> undefined end),
meck:expect(Mod, apply,
fun (_, {monitor_me, Pid}, State) ->
{[Pid | State], ok, [{monitor, process, Pid}]};
(_, Cmd, State) ->
ct:pal("handling ~p", [Cmd]),
%% handle all
{State, ok}
end),
meck:expect(Mod, handle_aux,
fun
(_RaftState, _, _, _AuxState, _Log, _MacState) ->
exit(wrong_callback)
end),
meck:expect(Mod, handle_aux,
fun
(RaftState, {call, _From}, emit, AuxState, Opaque) ->
%% emits aux state
{reply, {RaftState, AuxState}, AuxState, Opaque};
(_RaftState, cast, eval, AuxState, Opaque) ->
%% replaces aux state
{no_reply, AuxState, Opaque};
(_RaftState, cast, NewState, _AuxState, Opaque) ->
%% replaces aux state
{no_reply, NewState, Opaque}

end),
ok = start_cluster(ClusterName, {module, Mod, #{}}, Cluster),
{ok, _, Leader} = ra:members(ServerId1),
ok = ra:cast_aux_command(Leader, banana),
{leader, banana} = ra:aux_command(Leader, emit),
[ServerId2, ServerId3] = Cluster -- [Leader],
{follower, undefined} = ra:aux_command(ServerId2, emit),
ok = ra:cast_aux_command(ServerId2, apple),
{follower, apple} = ra:aux_command(ServerId2, emit),
{follower, undefined} = ra:aux_command(ServerId3, emit),
ok = ra:cast_aux_command(ServerId3, orange),
{follower, orange} = ra:aux_command(ServerId3, emit),
ra:delete_cluster(Cluster),
ok.

aux_eval(Config) ->
%% aux handle is automatically passed an eval command after new entries
%% have been applied
Expand Down

0 comments on commit 8bc3d03

Please sign in to comment.