diff --git a/src/ra_aux.erl b/src/ra_aux.erl new file mode 100644 index 00000000..ef3fe61c --- /dev/null +++ b/src/ra_aux.erl @@ -0,0 +1,20 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. +-module(ra_aux). + +-export([ + machine_state/1 + ]). + +% -include("ra_server.hrl"). + +-opaque state() :: ra_server:state(). + +-export_type([state/0]). + +-spec machine_state(ra_server:state()) -> term(). +machine_state(State) -> + maps:get(?FUNCTION_NAME, State). diff --git a/src/ra_machine.erl b/src/ra_machine.erl index db5e87ff..34d72eb4 100644 --- a/src/ra_machine.erl +++ b/src/ra_machine.erl @@ -76,10 +76,12 @@ query/3, module/1, init_aux/2, + handle_aux/6, handle_aux/7, snapshot_module/1, version/1, which_module/2, + which_aux_fun/1, is_versioned/1 ]). @@ -343,8 +345,37 @@ init_aux(Mod, Name) -> when AuxState :: term(), LogState :: ra_log:state(). handle_aux(Mod, RaftState, Type, Cmd, Aux, Log, MacState) -> - ?OPT_CALL(Mod:handle_aux(RaftState, Type, Cmd, Aux, Log, MacState), - undefined). + Mod:handle_aux(RaftState, Type, Cmd, Aux, Log, MacState). + + +-spec handle_aux(module(), + ra_server:ra_state(), + {call, From :: from()} | cast, + Command :: term(), + AuxState, + State) -> + {reply, Reply :: term(), AuxState, State} | + {reply, Reply :: term(), AuxState, State, + [{monitor, process, aux, pid()}]} | + {no_reply, AuxState, State} | + {no_reply, AuxState, State, + [{monitor, process, aux, pid()}]} + when AuxState :: term(), + State :: ra_server:state(). +handle_aux(Mod, RaftState, Type, Cmd, Aux, State) -> + Mod:handle_aux(RaftState, Type, Cmd, Aux, State). + +-spec which_aux_fun(module()) -> + undefined | {atom(), arity()}. +which_aux_fun(Mod) when is_atom(Mod) -> + case lists:sort([E || {handle_aux, _Arity} = E + <- erlang:apply(Mod,module_info, [exports])]) of + [] -> + undefined; + [AuxFun | _] -> + %% favour {handle_aux, 5} as this is the newer api + AuxFun + end. -spec query(module(), fun((state()) -> Result), state()) -> Result when Result :: term(). diff --git a/src/ra_server.erl b/src/ra_server.erl index dc57d016..932009b9 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -91,6 +91,8 @@ commit_latency => option(non_neg_integer()) }. +-type state() :: ra_server_state(). + -type ra_state() :: leader | follower | candidate | pre_vote | await_condition | delete_and_terminate | terminating_leader | terminating_follower | recover @@ -214,7 +216,8 @@ -type config() :: ra_server_config(). --export_type([config/0, +-export_type([state/0, + config/0, ra_server_state/0, ra_state/0, ra_server_config/0, @@ -326,6 +329,7 @@ init(#{id := Id, machine_versions = [{SnapshotIdx, MacVer}], effective_machine_version = MacVer, effective_machine_module = MacMod, + effective_handle_aux_fun = ra_machine:which_aux_fun(MacMod), max_pipeline_count = MaxPipelineCount, max_append_entries_rpc_batch_size = MaxAERBatchSize, counter = maps:get(counter, Config, undefined), @@ -1300,7 +1304,9 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term, Cfg0#cfg{effective_machine_version = SnapMacVer, machine_versions = [{SnapIndex, SnapMacVer} | MachineVersions], - effective_machine_module = EffMacMod}; + effective_machine_module = EffMacMod, + effective_handle_aux_fun = + ra_machine:which_aux_fun(EffMacMod)}; false -> Cfg0 end, @@ -1476,11 +1482,37 @@ is_fully_replicated(#{commit_index := CI} = State) -> MinMI >= CI andalso MinCI >= CI end. -handle_aux(RaftState, Type, Cmd, #{cfg := #cfg{effective_machine_module = MacMod}, - aux_state := Aux0, log := Log0, - machine_state := MacState0} = State0) -> +handle_aux(RaftState, _Type, _Cmd, + #{cfg := #cfg{effective_handle_aux_fun = undefined}} = State0) -> + %% todo reply with error if Type is a call? + {RaftState, State0, []}; +handle_aux(RaftState, Type, Cmd, + #{cfg := #cfg{effective_machine_module = MacMod, + effective_handle_aux_fun = {handle_aux, 5}}, + aux_state := Aux0} = State0) -> + %% NEW API + case ra_machine:handle_aux(MacMod, RaftState, Type, Cmd, Aux0, + State0) of + {reply, Reply, Aux, State} -> + {RaftState, State#{aux_state => Aux}, + [{reply, Reply}]}; + {reply, Reply, Aux, State, Effects} -> + {RaftState, State#{aux_state => Aux}, + [{reply, Reply} | Effects]}; + {no_reply, Aux, State} -> + {RaftState, State#{aux_state => Aux}, []}; + {no_reply, Aux, State, Effects} -> + {RaftState, State#{aux_state => Aux}, Effects} + end; +handle_aux(RaftState, Type, Cmd, + #{cfg := #cfg{effective_machine_module = MacMod, + effective_handle_aux_fun = {handle_aux, 6}}, + aux_state := Aux0, + machine_state := MacState, + log := Log0} = State0) -> + %% OLD API case ra_machine:handle_aux(MacMod, RaftState, Type, Cmd, Aux0, - Log0, MacState0) of + Log0, MacState) of {reply, Reply, Aux, Log} -> {RaftState, State0#{log => Log, aux_state => Aux}, [{reply, Reply}]}; @@ -2494,7 +2526,10 @@ apply_with({Idx, Term, {noop, CmdMeta, NextMacVer}}, Cfg = Cfg0#cfg{effective_machine_version = NextMacVer, %% record this machine version "term" machine_versions = [{Idx, NextMacVer} | MacVersions], - effective_machine_module = Module}, + effective_machine_module = Module, + effective_handle_aux_fun = + ra_machine:which_aux_fun(Module) + }, State = State0#{cfg => Cfg, cluster_change_permitted => ClusterChangePerm}, Meta = augment_command_meta(Idx, Term, MacVer, CmdMeta), diff --git a/src/ra_server.hrl b/src/ra_server.hrl index d1eebec2..18afd039 100644 --- a/src/ra_server.hrl +++ b/src/ra_server.hrl @@ -20,6 +20,7 @@ machine_versions :: [{ra_index(), ra_machine:version()}, ...], effective_machine_version :: ra_machine:version(), effective_machine_module :: module(), + effective_handle_aux_fun :: undefined | {handle_aux, 5 | 6}, max_pipeline_count = ?DEFAULT_MAX_PIPELINE_COUNT :: non_neg_integer(), max_append_entries_rpc_batch_size = ?AER_CHUNK_SIZE :: non_neg_integer(), counter :: undefined | counters:counters_ref(), diff --git a/test/ra_machine_int_SUITE.erl b/test/ra_machine_int_SUITE.erl index af4de288..526c2c5a 100644 --- a/test/ra_machine_int_SUITE.erl +++ b/test/ra_machine_int_SUITE.erl @@ -47,6 +47,8 @@ all_tests() -> aux_eval, aux_tick, aux_command, + aux_command_v2, + aux_command_v1_and_v2, aux_monitor_effect, aux_and_machine_monitor_same_process, aux_and_machine_monitor_same_node, @@ -634,6 +636,101 @@ aux_command(Config) -> ra:delete_cluster(Cluster), ok. +aux_command_v2(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId1 = ?config(server_id, Config), + Cluster = [ServerId1, + ?config(server_id2, Config), + ?config(server_id3, Config)], + Mod = ?config(modname, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun (_) -> [] end), + meck:expect(Mod, init_aux, fun (_) -> undefined end), + meck:expect(Mod, apply, + fun (_, {monitor_me, Pid}, State) -> + {[Pid | State], ok, [{monitor, process, Pid}]}; + (_, Cmd, State) -> + ct:pal("handling ~p", [Cmd]), + %% handle all + {State, ok} + end), + meck:expect(Mod, handle_aux, + fun + (RaftState, {call, _From}, emit, AuxState, Opaque) -> + %% emits aux state + {reply, {RaftState, AuxState}, AuxState, Opaque}; + (_RaftState, cast, eval, AuxState, Opaque) -> + %% replaces aux state + {no_reply, AuxState, Opaque}; + (_RaftState, cast, NewState, _AuxState, Opaque) -> + %% replaces aux state + {no_reply, NewState, Opaque} + + end), + ok = start_cluster(ClusterName, {module, Mod, #{}}, Cluster), + {ok, _, Leader} = ra:members(ServerId1), + ok = ra:cast_aux_command(Leader, banana), + {leader, banana} = ra:aux_command(Leader, emit), + [ServerId2, ServerId3] = Cluster -- [Leader], + {follower, undefined} = ra:aux_command(ServerId2, emit), + ok = ra:cast_aux_command(ServerId2, apple), + {follower, apple} = ra:aux_command(ServerId2, emit), + {follower, undefined} = ra:aux_command(ServerId3, emit), + ok = ra:cast_aux_command(ServerId3, orange), + {follower, orange} = ra:aux_command(ServerId3, emit), + ra:delete_cluster(Cluster), + ok. + +aux_command_v1_and_v2(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId1 = ?config(server_id, Config), + Cluster = [ServerId1, + ?config(server_id2, Config), + ?config(server_id3, Config)], + Mod = ?config(modname, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun (_) -> [] end), + meck:expect(Mod, init_aux, fun (_) -> undefined end), + meck:expect(Mod, apply, + fun (_, {monitor_me, Pid}, State) -> + {[Pid | State], ok, [{monitor, process, Pid}]}; + (_, Cmd, State) -> + ct:pal("handling ~p", [Cmd]), + %% handle all + {State, ok} + end), + meck:expect(Mod, handle_aux, + fun + (_RaftState, _, _, _AuxState, _Log, _MacState) -> + exit(wrong_callback) + end), + meck:expect(Mod, handle_aux, + fun + (RaftState, {call, _From}, emit, AuxState, Opaque) -> + %% emits aux state + {reply, {RaftState, AuxState}, AuxState, Opaque}; + (_RaftState, cast, eval, AuxState, Opaque) -> + %% replaces aux state + {no_reply, AuxState, Opaque}; + (_RaftState, cast, NewState, _AuxState, Opaque) -> + %% replaces aux state + {no_reply, NewState, Opaque} + + end), + ok = start_cluster(ClusterName, {module, Mod, #{}}, Cluster), + {ok, _, Leader} = ra:members(ServerId1), + ok = ra:cast_aux_command(Leader, banana), + {leader, banana} = ra:aux_command(Leader, emit), + [ServerId2, ServerId3] = Cluster -- [Leader], + {follower, undefined} = ra:aux_command(ServerId2, emit), + ok = ra:cast_aux_command(ServerId2, apple), + {follower, apple} = ra:aux_command(ServerId2, emit), + {follower, undefined} = ra:aux_command(ServerId3, emit), + ok = ra:cast_aux_command(ServerId3, orange), + {follower, orange} = ra:aux_command(ServerId3, emit), + ra:delete_cluster(Cluster), + ok. + aux_eval(Config) -> %% aux handle is automatically passed an eval command after new entries %% have been applied