From 9a83ae1d9cc5a0b7885ce8b89fea6fe9ee146e31 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 29 Jan 2024 13:55:09 +0000 Subject: [PATCH 1/5] New ra_machine:handle_aux/5 callback That replaces handle_aux/6 Adds ra_aux for interacting safely with the internal state and log. Move state query to ra_server The appropriate handle_aux function to use is re-evaluated every time the effective machine module changes and is stored in the ra_server configuration state. --- README.md | 2 +- src/ra_aux.erl | 62 ++++++++++++++++ src/ra_log.erl | 25 +++++-- src/ra_machine.erl | 50 ++++++++++++- src/ra_server.erl | 129 +++++++++++++++++++++++++++++++--- src/ra_server.hrl | 1 + src/ra_server_proc.erl | 115 +++++------------------------- test/ra_machine_int_SUITE.erl | 100 ++++++++++++++++++++++++++ 8 files changed, 371 insertions(+), 113 deletions(-) create mode 100644 src/ra_aux.erl diff --git a/README.md b/README.md index 48eaf44f..3c464a88 100644 --- a/README.md +++ b/README.md @@ -440,7 +440,7 @@ Ra attempts to follow [Semantic Versioning](https://semver.org/). The modules that form part of the public API are: * `ra` -* `ra_machine` +* `ra_machine` (behaviour callbacks only) * `ra_system` * `ra_counters` * `ra_leaderboard` diff --git a/src/ra_aux.erl b/src/ra_aux.erl new file mode 100644 index 00000000..023ab5c5 --- /dev/null +++ b/src/ra_aux.erl @@ -0,0 +1,62 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. +-module(ra_aux). + +-export([ + machine_state/1, + leader_id/1, + members_info/1, + overview/1, + log_last_index_term/1, + log_fetch/2, + log_stats/1 + ]). + +-include("ra.hrl"). + +-opaque state() :: ra_server:state(). + +-export_type([state/0]). + +-spec machine_state(ra_aux:state()) -> term(). +machine_state(State) -> + maps:get(?FUNCTION_NAME, State). + +-spec leader_id(ra_aux:state()) -> undefined | ra_server_id(). +leader_id(State) -> + maps:get(?FUNCTION_NAME, State). + +-spec members_info(ra_aux:state()) -> ra_cluster(). +members_info(State) -> + ra_server:state_query(?FUNCTION_NAME, State). + +-spec overview(ra_aux:state()) -> map(). +overview(State) -> + ra_server:state_query(?FUNCTION_NAME, State). + +-spec log_last_index_term(ra_aux:state()) -> ra_idxterm(). +log_last_index_term(#{log := Log}) -> + ra_log:last_index_term(Log). + +-spec log_fetch(ra_index(), ra_aux:state()) -> + {undefined | + {ra_term(), + CmdMetadata :: ra_server:command_meta(), + Command :: term()}, ra_aux:state()}. +log_fetch(Idx, #{log := Log0} = State) + when is_integer(Idx) -> + case ra_log:fetch(Idx, Log0) of + {{Idx, Term, {'$usr', Meta, Cmd, _ReplyMode}}, Log} -> + {{Term, Meta, Cmd}, State#{log => Log}}; + {_, Log} -> + %% we only allow usr commands to be read + {undefined, State#{log => Log}} + end. + +-spec log_stats(ra_aux:state()) -> ra_log:overview(). +log_stats(#{log := Log}) -> + ra_log:overview(Log). + diff --git a/src/ra_log.erl b/src/ra_log.erl index ff4cd0a8..001ef9f9 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -127,13 +127,26 @@ counter => counters:counters_ref(), initial_access_pattern => sequential | random}. + +-type overview() :: + #{type := ra_log, + last_index := ra_index(), + first_index := ra_index(), + last_written_index_term := ra_idxterm(), + num_segments := non_neg_integer(), + open_segments => non_neg_integer(), + snapshot_index => undefined | ra_index(), + cache_size => non_neg_integer(), + atom() => term()}. + -export_type([state/0, ra_log_init_args/0, ra_meta_key/0, segment_ref/0, event/0, event_body/0, - effect/0 + effect/0, + overview/0 ]). pre_init(#{uid := UId, @@ -785,7 +798,7 @@ exists({Idx, Term}, Log0) -> {false, Log} end. --spec overview(state()) -> map(). +-spec overview(state()) -> overview(). overview(#?MODULE{last_index = LastIndex, first_index = FirstIndex, last_written_index_term = LWIT, @@ -803,10 +816,10 @@ overview(#?MODULE{last_index = LastIndex, {I, _} -> I end, latest_checkpoint_index => - case ra_snapshot:latest_checkpoint(SnapshotState) of - undefined -> undefined; - {I, _} -> I - end, + case ra_snapshot:latest_checkpoint(SnapshotState) of + undefined -> undefined; + {I, _} -> I + end, cache_size => ra_log_cache:size(Cache) }. diff --git a/src/ra_machine.erl b/src/ra_machine.erl index 4734019a..a2443163 100644 --- a/src/ra_machine.erl +++ b/src/ra_machine.erl @@ -76,10 +76,12 @@ query/3, module/1, init_aux/2, + handle_aux/6, handle_aux/7, snapshot_module/1, version/1, which_module/2, + which_aux_fun/1, is_versioned/1 ]). @@ -209,6 +211,7 @@ -optional_callbacks([tick/2, state_enter/2, init_aux/1, + handle_aux/5, handle_aux/6, overview/1, snapshot_module/0, @@ -239,6 +242,20 @@ -callback init_aux(Name :: atom()) -> term(). +-callback handle_aux(ra_server:ra_state(), + {call, From :: from()} | cast, + Command :: term(), + AuxState, + State) -> + {reply, Reply :: term(), AuxState, State} | + {reply, Reply :: term(), AuxState, State, + [{monitor, process, aux, pid()}]} | + {no_reply, AuxState, State} | + {no_reply, AuxState, State, + [{monitor, process, aux, pid()}]} + when AuxState :: term(), + State :: ra_aux:state(). + -callback handle_aux(ra_server:ra_state(), {call, From :: from()} | cast, Command :: term(), @@ -335,8 +352,37 @@ init_aux(Mod, Name) -> when AuxState :: term(), LogState :: ra_log:state(). handle_aux(Mod, RaftState, Type, Cmd, Aux, Log, MacState) -> - ?OPT_CALL(Mod:handle_aux(RaftState, Type, Cmd, Aux, Log, MacState), - undefined). + Mod:handle_aux(RaftState, Type, Cmd, Aux, Log, MacState). + + +-spec handle_aux(module(), + ra_server:ra_state(), + {call, From :: from()} | cast, + Command :: term(), + AuxState, + State) -> + {reply, Reply :: term(), AuxState, State} | + {reply, Reply :: term(), AuxState, State, + [{monitor, process, aux, pid()}]} | + {no_reply, AuxState, State} | + {no_reply, AuxState, State, + [{monitor, process, aux, pid()}]} + when AuxState :: term(), + State :: ra_server:state(). +handle_aux(Mod, RaftState, Type, Cmd, Aux, State) -> + Mod:handle_aux(RaftState, Type, Cmd, Aux, State). + +-spec which_aux_fun(module()) -> + undefined | {atom(), arity()}. +which_aux_fun(Mod) when is_atom(Mod) -> + case lists:sort([E || {handle_aux, _Arity} = E + <- erlang:apply(Mod,module_info, [exports])]) of + [] -> + undefined; + [AuxFun | _] -> + %% favour {handle_aux, 5} as this is the newer api + AuxFun + end. -spec query(module(), fun((state()) -> Result), state()) -> Result when Result :: term(). diff --git a/src/ra_server.erl b/src/ra_server.erl index e1815116..12d44431 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -58,7 +58,8 @@ log_fold/3, log_read/2, get_membership/1, - recover/1 + recover/1, + state_query/2 ]). -type ra_await_condition_fun() :: @@ -92,6 +93,8 @@ commit_latency => option(non_neg_integer()) }. +-type state() :: ra_server_state(). + -type ra_state() :: leader | follower | candidate | pre_vote | await_condition | delete_and_terminate | terminating_leader | terminating_follower | recover @@ -215,7 +218,8 @@ -type config() :: ra_server_config(). --export_type([config/0, +-export_type([state/0, + config/0, ra_server_state/0, ra_state/0, ra_server_config/0, @@ -327,6 +331,7 @@ init(#{id := Id, machine_versions = [{SnapshotIdx, MacVer}], effective_machine_version = MacVer, effective_machine_module = MacMod, + effective_handle_aux_fun = ra_machine:which_aux_fun(MacMod), max_pipeline_count = MaxPipelineCount, max_append_entries_rpc_batch_size = MaxAERBatchSize, counter = maps:get(counter, Config, undefined), @@ -1301,7 +1306,9 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term, Cfg0#cfg{effective_machine_version = SnapMacVer, machine_versions = [{SnapIndex, SnapMacVer} | MachineVersions], - effective_machine_module = EffMacMod}; + effective_machine_module = EffMacMod, + effective_handle_aux_fun = + ra_machine:which_aux_fun(EffMacMod)}; false -> Cfg0 end, @@ -1477,11 +1484,37 @@ is_fully_replicated(#{commit_index := CI} = State) -> MinMI >= CI andalso MinCI >= CI end. -handle_aux(RaftState, Type, Cmd, #{cfg := #cfg{effective_machine_module = MacMod}, - aux_state := Aux0, log := Log0, - machine_state := MacState0} = State0) -> +handle_aux(RaftState, _Type, _Cmd, + #{cfg := #cfg{effective_handle_aux_fun = undefined}} = State0) -> + %% todo reply with error if Type is a call? + {RaftState, State0, []}; +handle_aux(RaftState, Type, Cmd, + #{cfg := #cfg{effective_machine_module = MacMod, + effective_handle_aux_fun = {handle_aux, 5}}, + aux_state := Aux0} = State0) -> + %% NEW API + case ra_machine:handle_aux(MacMod, RaftState, Type, Cmd, Aux0, + State0) of + {reply, Reply, Aux, State} -> + {RaftState, State#{aux_state => Aux}, + [{reply, Reply}]}; + {reply, Reply, Aux, State, Effects} -> + {RaftState, State#{aux_state => Aux}, + [{reply, Reply} | Effects]}; + {no_reply, Aux, State} -> + {RaftState, State#{aux_state => Aux}, []}; + {no_reply, Aux, State, Effects} -> + {RaftState, State#{aux_state => Aux}, Effects} + end; +handle_aux(RaftState, Type, Cmd, + #{cfg := #cfg{effective_machine_module = MacMod, + effective_handle_aux_fun = {handle_aux, 6}}, + aux_state := Aux0, + machine_state := MacState, + log := Log0} = State0) -> + %% OLD API case ra_machine:handle_aux(MacMod, RaftState, Type, Cmd, Aux0, - Log0, MacState0) of + Log0, MacState) of {reply, Reply, Aux, Log} -> {RaftState, State0#{log => Log, aux_state => Aux}, [{reply, Reply}]}; @@ -2226,6 +2259,83 @@ update_term(_, State) -> last_idx_term(#{log := Log}) -> ra_log:last_index_term(Log). + +state_query(all, State) -> State; +state_query(overview, State) -> + overview(State); +state_query(machine, #{machine_state := MacState}) -> + MacState; +state_query(voters, #{cluster := Cluster}) -> + Vs = maps:fold(fun(K, V, Acc) -> + case maps:get(voter_status, V, undefined) of + undefined -> [K|Acc]; + S -> case maps:get(membership, S, undefined) of + undefined -> [K|Acc]; + voter -> [K|Acc]; + _ -> Acc + end + end + end, [], Cluster), + Vs; +state_query(leader, State) -> + maps:get(leader_id, State, undefined); +state_query(members, #{cluster := Cluster}) -> + maps:keys(Cluster); +state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, + leader_id := Self, query_index := QI, commit_index := CI, + membership := Membership}) -> + maps:map(fun(Id, Peer) -> + case {Id, Peer} of + {Self, Peer = #{voter_status := VoterStatus}} -> + %% For completeness sake, preserve `target` + %% of once promoted leader. + #{next_index => CI+1, + match_index => CI, + query_index => QI, + status => normal, + voter_status => VoterStatus#{membership => Membership}}; + {Self, _} -> + #{next_index => CI+1, + match_index => CI, + query_index => QI, + status => normal, + voter_status => #{membership => Membership}}; + {_, Peer = #{voter_status := _}} -> + Peer; + {_, Peer} -> + %% Initial cluster members have no voter_status. + Peer#{voter_status => #{membership => voter}} + end + end, Cluster); +state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, + query_index := QI, commit_index := CI, + membership := Membership}) -> + %% Followers do not have sufficient information, + %% bail out and send whatever we have. + maps:map(fun(Id, Peer) -> + case {Id, Peer} of + {Self, #{voter_status := VS}} -> + #{match_index => CI, + query_index => QI, + voter_status => VS#{membership => Membership}}; + {Self, _} -> + #{match_index => CI, + query_index => QI, + voter_status => #{membership => Membership}}; + _ -> + #{} + end + end, Cluster); +state_query(initial_members, #{log := Log}) -> + case ra_log:read_config(Log) of + {ok, #{initial_members := InitialMembers}} -> + InitialMembers; + _ -> + error + end; +state_query(Query, _State) -> + {error, {unknown_query, Query}}. + %% ยง 5.4.1 Raft determines which of two logs is more up-to-date by comparing %% the index and term of the last entries in the logs. If the logs have last %% entries with different terms, then the log with the later term is more @@ -2448,7 +2558,10 @@ apply_with({Idx, Term, {noop, CmdMeta, NextMacVer}}, Cfg = Cfg0#cfg{effective_machine_version = NextMacVer, %% record this machine version "term" machine_versions = [{Idx, NextMacVer} | MacVersions], - effective_machine_module = Module}, + effective_machine_module = Module, + effective_handle_aux_fun = + ra_machine:which_aux_fun(Module) + }, State = State0#{cfg => Cfg, cluster_change_permitted => ClusterChangePerm}, Meta = augment_command_meta(Idx, Term, MacVer, CmdMeta), diff --git a/src/ra_server.hrl b/src/ra_server.hrl index d1eebec2..18afd039 100644 --- a/src/ra_server.hrl +++ b/src/ra_server.hrl @@ -20,6 +20,7 @@ machine_versions :: [{ra_index(), ra_machine:version()}, ...], effective_machine_version :: ra_machine:version(), effective_machine_module :: module(), + effective_handle_aux_fun :: undefined | {handle_aux, 5 | 6}, max_pipeline_count = ?DEFAULT_MAX_PIPELINE_COUNT :: non_neg_integer(), max_append_entries_rpc_batch_size = ?AER_CHUNK_SIZE :: non_neg_integer(), counter :: undefined | counters:counters_ref(), diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 7acdbecf..6b6ca9fc 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -379,11 +379,12 @@ recovered(internal, next, #state{server_state = ServerState} = State) -> leader(enter, OldState, State0) -> {State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0), + ok = record_cluster_change(State), %% TODO: reset refs? {keep_state, State#state{leader_last_seen = undefined, pending_notifys = #{}, - low_priority_commands = ra_ets_queue:reset(State0#state.low_priority_commands), + low_priority_commands = ra_ets_queue:reset(State#state.low_priority_commands), election_timeout_set = false}, Actions}; leader(EventType, {leader_call, Msg}, State) -> % no need to redirect @@ -469,9 +470,8 @@ leader({call, From}, {local_query, QueryFun}, server_state = ServerState} = State) -> Reply = perform_local_query(QueryFun, id(State), ServerState, Conf), {keep_state, State, [{reply, From, Reply}]}; -leader({call, From}, {state_query, Spec}, - #state{server_state = ServerState} = State) -> - Reply = {ok, do_state_query(Spec, ServerState), id(State)}, +leader({call, From}, {state_query, Spec}, State) -> + Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; leader({call, From}, {consistent_query, QueryFun}, #state{conf = Conf, @@ -559,9 +559,8 @@ candidate({call, From}, {local_query, QueryFun}, #state{conf = Conf, server_state = ServerState} = State) -> Reply = perform_local_query(QueryFun, not_known, ServerState, Conf), {keep_state, State, [{reply, From, Reply}]}; -candidate({call, From}, {state_query, Spec}, - #state{server_state = ServerState} = State) -> - Reply = {ok, do_state_query(Spec, ServerState), id(State)}, +candidate({call, From}, {state_query, Spec}, State) -> + Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; candidate({call, From}, ping, State) -> {keep_state, State, [{reply, From, {pong, candidate}}]}; @@ -612,9 +611,8 @@ pre_vote({call, From}, {local_query, QueryFun}, #state{conf = Conf, server_state = ServerState} = State) -> Reply = perform_local_query(QueryFun, not_known, ServerState, Conf), {keep_state, State, [{reply, From, Reply}]}; -pre_vote({call, From}, {state_query, Spec}, - #state{server_state = ServerState} = State) -> - Reply = {ok, do_state_query(Spec, ServerState), id(State)}, +pre_vote({call, From}, {state_query, Spec}, State) -> + Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; pre_vote({call, From}, ping, State) -> {keep_state, State, [{reply, From, {pong, pre_vote}}]}; @@ -699,9 +697,8 @@ follower({call, From}, {local_query, QueryFun}, end, Reply = perform_local_query(QueryFun, Leader, ServerState, Conf), {keep_state, State, [{reply, From, Reply}]}; -follower({call, From}, {state_query, Spec}, - #state{server_state = ServerState} = State) -> - Reply = {ok, do_state_query(Spec, ServerState), id(State)}, +follower({call, From}, {state_query, Spec}, State) -> + Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; follower(EventType, {aux_command, Cmd}, State0) -> {_, ServerState, Effects} = ra_server:handle_aux(?FUNCTION_NAME, EventType, Cmd, @@ -891,9 +888,8 @@ await_condition({call, From}, {local_query, QueryFun}, #state{conf = Conf, server_state = ServerState} = State) -> Reply = perform_local_query(QueryFun, follower, ServerState, Conf), {keep_state, State, [{reply, From, Reply}]}; -await_condition({call, From}, {state_query, Spec}, - #state{server_state = ServerState} = State) -> - Reply = {ok, do_state_query(Spec, ServerState), id(State)}, +await_condition({call, From}, {state_query, Spec}, State) -> + Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; await_condition(EventType, {aux_command, Cmd}, State0) -> {_, ServerState, Effects} = ra_server:handle_aux(?FUNCTION_NAME, EventType, @@ -1539,81 +1535,8 @@ gen_statem_safe_call(ServerId, Msg, Timeout) -> {error, shutdown} end. -do_state_query(all, State) -> State; -do_state_query(overview, State) -> - ra_server:overview(State); -do_state_query(machine, #{machine_state := MacState}) -> - MacState; -do_state_query(voters, #{cluster := Cluster}) -> - Vs = maps:fold(fun(K, V, Acc) -> - case maps:get(voter_status, V, undefined) of - undefined -> [K|Acc]; - S -> case maps:get(membership, S, undefined) of - undefined -> [K|Acc]; - voter -> [K|Acc]; - _ -> Acc - end - end - end, [], Cluster), - Vs; -do_state_query(leader, State) -> - maps:get(leader_id, State, undefined); -do_state_query(members, #{cluster := Cluster}) -> - maps:keys(Cluster); -do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, - leader_id := Self, query_index := QI, commit_index := CI, - membership := Membership}) -> - maps:map(fun(Id, Peer) -> - case {Id, Peer} of - {Self, Peer = #{voter_status := VoterStatus}} -> - %% For completeness sake, preserve `target` - %% of once promoted leader. - #{next_index => CI+1, - match_index => CI, - query_index => QI, - status => normal, - voter_status => VoterStatus#{membership => Membership}}; - {Self, _} -> - #{next_index => CI+1, - match_index => CI, - query_index => QI, - status => normal, - voter_status => #{membership => Membership}}; - {_, Peer = #{voter_status := _}} -> - Peer; - {_, Peer} -> - %% Initial cluster members have no voter_status. - Peer#{voter_status => #{membership => voter}} - end - end, Cluster); -do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, - query_index := QI, commit_index := CI, - membership := Membership}) -> - %% Followers do not have sufficient information, - %% bail out and send whatever we have. - maps:map(fun(Id, Peer) -> - case {Id, Peer} of - {Self, #{voter_status := VS}} -> - #{match_index => CI, - query_index => QI, - voter_status => VS#{membership => Membership}}; - {Self, _} -> - #{match_index => CI, - query_index => QI, - voter_status => #{membership => Membership}}; - _ -> - #{} - end - end, Cluster); -do_state_query(initial_members, #{log := Log}) -> - case ra_log:read_config(Log) of - {ok, #{initial_members := InitialMembers}} -> - InitialMembers; - _ -> - error - end; -do_state_query(Query, _State) -> - {error, {unknown_query, Query}}. +do_state_query(QueryName, #state{server_state = State}) -> + ra_server:state_query(QueryName, State). config_defaults(ServerId) -> #{broadcast_time => ?DEFAULT_BROADCAST_TIME, @@ -1819,7 +1742,7 @@ can_execute_locally(RaftState, TargetNode, leader when TargetNode =/= node() -> %% We need to evaluate whether to send the message. %% Only send if there isn't a local node for the target pid. - Members = do_state_query(voters, State#state.server_state), + Members = do_state_query(voters, State), not lists:any(fun ({_, N}) -> N == TargetNode end, Members); leader -> true; @@ -1831,7 +1754,7 @@ can_execute_on_member(_RaftState, Member, #state{server_state = #{cfg := #cfg{id = Member}}}) -> true; can_execute_on_member(leader, Member, State) -> - Members = do_state_query(members, State#state.server_state), + Members = do_state_query(members, State), not lists:member(Member, Members); can_execute_on_member(_RaftState, _Member, _State) -> false. @@ -1883,7 +1806,7 @@ maybe_record_cluster_change(#state{conf = #conf{cluster_name = ClusterName}, if (map_get(cluster_index_term, ServerStateA) =/= map_get(cluster_index_term, ServerStateB) orelse LeaderA =/= LeaderB) -> - MembersB = do_state_query(members, ServerStateB), + MembersB = ra_server:state_query(members, ServerStateB), ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB); true -> ok @@ -1891,8 +1814,8 @@ maybe_record_cluster_change(#state{conf = #conf{cluster_name = ClusterName}, record_cluster_change(#state{conf = #conf{cluster_name = ClusterName}, server_state = ServerState}) -> - Leader = do_state_query(leader, ServerState), - Members = do_state_query(members, ServerState), + Leader = ra_server:state_query(leader, ServerState), + Members = ra_server:state_query(members, ServerState), ok = ra_leaderboard:record(ClusterName, Leader, Members). incr_counter(#conf{counter = Cnt}, Ix, N) when Cnt =/= undefined -> diff --git a/test/ra_machine_int_SUITE.erl b/test/ra_machine_int_SUITE.erl index af4de288..1094f8de 100644 --- a/test/ra_machine_int_SUITE.erl +++ b/test/ra_machine_int_SUITE.erl @@ -47,6 +47,8 @@ all_tests() -> aux_eval, aux_tick, aux_command, + aux_command_v2, + aux_command_v1_and_v2, aux_monitor_effect, aux_and_machine_monitor_same_process, aux_and_machine_monitor_same_node, @@ -634,6 +636,104 @@ aux_command(Config) -> ra:delete_cluster(Cluster), ok. +aux_command_v2(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId1 = ?config(server_id, Config), + Cluster = [ServerId1, + ?config(server_id2, Config), + ?config(server_id3, Config)], + Mod = ?config(modname, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun (_) -> [] end), + meck:expect(Mod, init_aux, fun (_) -> undefined end), + meck:expect(Mod, apply, + fun (_, {monitor_me, Pid}, State) -> + {[Pid | State], ok, [{monitor, process, Pid}]}; + (_, Cmd, State) -> + ct:pal("handling ~p", [Cmd]), + %% handle all + {State, ok} + end), + meck:expect(Mod, handle_aux, + fun + (RaftState, {call, _From}, emit, AuxState, Opaque) -> + %% emits aux state + {reply, {RaftState, AuxState}, AuxState, Opaque}; + (_RaftState, cast, eval, AuxState, Opaque) -> + %% replaces aux state + {no_reply, AuxState, Opaque}; + (_RaftState, cast, NewState, _AuxState, Opaque) -> + %% replaces aux state + {no_reply, NewState, Opaque} + + end), + ok = start_cluster(ClusterName, {module, Mod, #{}}, Cluster), + {ok, _, Leader} = ra:members(ServerId1), + ok = ra:cast_aux_command(Leader, banana), + {leader, banana} = ra:aux_command(Leader, emit), + [ServerId2, ServerId3] = Cluster -- [Leader], + {follower, undefined} = ra:aux_command(ServerId2, emit), + ok = ra:cast_aux_command(ServerId2, apple), + {follower, apple} = ra:aux_command(ServerId2, emit), + {follower, undefined} = ra:aux_command(ServerId3, emit), + ok = ra:cast_aux_command(ServerId3, orange), + {follower, orange} = ra:aux_command(ServerId3, emit), + ra:delete_cluster(Cluster), + ok. + +aux_command_v1_and_v2(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId1 = ?config(server_id, Config), + Cluster = [ServerId1, + ?config(server_id2, Config), + ?config(server_id3, Config)], + Mod = ?config(modname, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun (_) -> [] end), + meck:expect(Mod, init_aux, fun (_) -> undefined end), + meck:expect(Mod, apply, + fun (_, {monitor_me, Pid}, State) -> + {[Pid | State], ok, [{monitor, process, Pid}]}; + (_, Cmd, State) -> + ct:pal("handling ~p", [Cmd]), + %% handle all + {State, ok} + end), + meck:expect(Mod, handle_aux, + fun + (_RaftState, _, _, _AuxState, _Log, _MacState) -> + exit(wrong_callback) + end), + meck:expect(Mod, handle_aux, + fun + (RaftState, {call, _From}, emit, AuxState, Opaque) -> + %% emits aux state + {reply, {RaftState, AuxState}, AuxState, Opaque}; + (_RaftState, cast, eval, AuxState, Opaque) -> + %% replaces aux state + {no_reply, AuxState, Opaque}; + (_RaftState, cast, NewState, _AuxState, Opaque0) -> + {Idx, _} = ra_aux:log_last_index_term(Opaque0), + {{_Term, _Meta, apple}, Opaque} = ra_aux:log_fetch(Idx, Opaque0), + %% replaces aux state + {no_reply, NewState, Opaque} + + end), + ok = start_cluster(ClusterName, {module, Mod, #{}}, Cluster), + {ok, _, Leader} = ra:members(ServerId1), + {ok, _, _} = ra:process_command(Leader, apple), + ok = ra:cast_aux_command(Leader, banana), + {leader, banana} = ra:aux_command(Leader, emit), + [ServerId2, ServerId3] = Cluster -- [Leader], + {follower, undefined} = ra:aux_command(ServerId2, emit), + ok = ra:cast_aux_command(ServerId2, apple), + {follower, apple} = ra:aux_command(ServerId2, emit), + {follower, undefined} = ra:aux_command(ServerId3, emit), + ok = ra:cast_aux_command(ServerId3, orange), + {follower, orange} = ra:aux_command(ServerId3, emit), + ra:delete_cluster(Cluster), + ok. + aux_eval(Config) -> %% aux handle is automatically passed an eval command after new entries %% have been applied From 3e876bb8d8549e8c7f68d5520e81b8bb34dfb787 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 16 Feb 2024 11:48:04 +0000 Subject: [PATCH 2/5] Improve the state machine documentation And remove duplicated information from INTERNALS.md --- README.md | 4 +- docs/internals/INTERNALS.md | 202 ++------------------- docs/internals/STATE_MACHINE_TUTORIAL.md | 222 +++++++++++++++++++++-- src/ra_aux.erl | 2 +- src/ra_machine.erl | 6 +- 5 files changed, 231 insertions(+), 205 deletions(-) diff --git a/README.md b/README.md index 3c464a88..c71dcda7 100644 --- a/README.md +++ b/README.md @@ -298,8 +298,8 @@ See [Ra state machine tutorial](docs/internals/STATE_MACHINE_TUTORIAL.md) for how to write more sophisticated state machines by implementing the `ra_machine` behaviour. -A [Ra-based key/value store example](https://github.com/rabbitmq/ra-kv-store) is available -in a separate repository. +A [Ra-based key/value store example](https://github.com/rabbitmq/ra-kv-store) +is available in a separate repository. ## Documentation diff --git a/docs/internals/INTERNALS.md b/docs/internals/INTERNALS.md index 1ca18607..f8727d7f 100644 --- a/docs/internals/INTERNALS.md +++ b/docs/internals/INTERNALS.md @@ -90,13 +90,15 @@ transitions. ### Effect Application and Failure Handling -Under normal operation only the leader that first applies an entry will attempt the effect. -Followers process the same set of commands but simply throw away any effects returned by -the state machine. +Under normal operation only the leader that first applies an entry will attempt +the effect. +Followers process the same set of commands but simply throw away any effects +returned by the state machine. -To ensure we do not re-issue effects on recovery each `ra` server persists its `last_applied` index. -When the server restarts it replays its log until this point and throws away any resulting effects as they -should already have been issued. +To ensure we do not re-issue effects on recovery each `ra` server persists its +`last_applied` index. +When the server restarts it replays its log until this point and throws away any +resulting effects as they should already have been issued. As the `last_applied` index is only persisted periodically there is a small chance that some effects may be issued multiple times when all the servers in the @@ -105,180 +107,8 @@ never be issued or reach their recipients. Ra makes no allowance for this. It is worth taking this into account when implementing a state machine. -The [Automatic Repeat Query (ARQ)](https://en.wikipedia.org/wiki/Automatic_repeat_request) protocol -can be used to implement reliable communication (Erlang message delivery) given the -above limitations. - -A number of effects are available to the user. - -### Sending a message - -The `{send_msg, pid(), Msg :: term()}` effect asynchronously sends a message -to the specified `pid`. - -`ra` uses `erlang:send/3` with the `no_connect` and `no_suspend` -options which is the least reliable way of doing it. It does this so -that a state machine `send_msg` effect will never block the main `ra` process. - -To ensure message reliability, [Automatic Repeat Query (ARQ)](https://en.wikipedia.org/wiki/Automatic_repeat_request)-like -protocols between the state machine and the receiver should be implemented -if needed. - -### Monitoring - -The `{monitor, process | node, pid() | node()}` effect will ask the `ra` leader to -monitor a process or node. If `ra` receives a `DOWN` for a process it -is monitoring it will commit a `{down, pid(), term()}` command to the log that -the state machine needs to handle. If it detects a monitored node as down or up -it will commit a `{nodeup | nodedown, node()}` command to the log. - -Use `{demonitor, process | node, pid() | node()}` to stop monitoring a process -or a node. - -All monitors are invalidated when the leader changes. State machines should -re-issue monitor effects when becoming leader using the `state_enter/2` -callback. - -### Calling a function - -The `{mod_call, module(), function(), Args :: [term()]}` effect will ask the leader -to call an arbitrary function. Care need to be taken not to block the `ra` process whilst doing so. -It is recommended that expensive operations are done in another process. - -The `mod_call` effect is useful for e.g. updating an ETS table of committed entries -or similar. - -### Setting a timer - -The `{timer, Name :: term(), Time :: non_neg_integer() | infinity}` effects asks the Ra leader -to maintain a timer on behalf of the state machine and commit a `timeout` command -when the timer triggers. If setting the time to `infinity`, the timer will not be started -and any running timer with same name will be cancelled. - -The timer is relative and setting another timer with the same name before the current -timer runs out results in the current timer being reset. - -All timers are invalidated when the leader changes. State machines should -re-issue timer effects when becoming leader using the `state_enter/2` -callback. - -### Reading a log - -Use `{log, Indexes :: [ra_index()], fun(([user_command()]) -> effects()}` to read -commands from the log from the specified indexes and return a list of effects. - -Effectively this effect transforms log entries into effects. - -Potential use cases could be when a command contains large binary data and you -don't want to keep this in memory but load it on demand when needed for a side-effect. - -This is an advanced feature and will only work as long as the command is still -in the log. If a `release_cursor` has been emitted with an index higher than this, -the command may no longer be in the log and the function will not be called. - -There is currently no facility for reading partial data from a snapshot. - -### Updating the Release Cursor (Snapshotting) - -The `{release_cursor, RaftIndex, MachineState}` -effect can be used to give Ra cluster members a hint to trigger a snapshot. -This effect, when emitted, is evaluated on all nodes and not just the leader. - -It is not guaranteed that a snapshot will be taken. A decision to take -a snapshot or to delay it is taken using a number of internal Ra state factors. -The goal is to minimise disk I/O activity when possible. - -### Checkpointing - -Checkpoints are nearly the same concept as snapshots. Snapshotting truncates -the log up to the snapshot's index, which might be undesirable for machines -which read from the log with the `{log, Indexes, Fun}` effect mentioned above. - -The `{checkpoint, RaftIndex, MachineState}` effect can be used as a hint to -trigger a checkpoint. Like snapshotting, this effect is evaluated on all nodes -and when a checkpoint is taken, the machine state is saved to disk and can be -used for recovery when the machine restarts. A checkpoint being written does -not trigger any log truncation though. - -The `{release_cursor, RaftIndex}` effect can then be used to promote any -existing checkpoint older than or equal to `RaftIndex` into a proper snapshot, -and any log entries older than the checkpoint's index are then truncated. - -These two effects are intended for machines that use the `{log, Indexes, Fun}` -effect and can substantially improve machine recovery time compared to -snapshotting alone, especially when the machine needs to keep old log entries -around for a long time. - -## State Machine Versioning - -It is eventually necessary to make changes to the state machine -code. Any changes to a state machine that would result in a different end state when -the state is re-calculated from the log of entries (as is done when restarting a ra server) -should be considered breaking. - -As Ra state machines need to be deterministic any changes to the logic inside the `apply/3` function - _needs to be enabled at the same index on all members of a Ra cluster_. - -### Versioning API - -Ra considers all state machines versioned starting with version 0. State machines -that need to be updated with breaking changes need to implement the optional -versioning parts of the `ra_machine` behaviour: - -``` erlang --type version() :: non_neg_integer(). - --callback version() -> pos_integer(). - --callback which_module(version()) -> module(). - -``` - -`version/0` returns the current version which is an integer that is -higher than any previously used version number. Whenever a breaking change is -made this should be incremented. - -`which_module/1` maps a version to the module implementing it. This allows -developers to optionally keep entire modules for old versions instead of trying -to handle multiple versions in the same module. - -E.g. when moving from version 0 of `my_machine` to version 1: - -1. Copy and rename the `my_machine` module to `my_machine_v0` - -2. Implement the breaking changes in the original module and bump the version. - -``` erlang -version() -> 1. - -which_module(1) -> my_machine; -which_module(0) -> my_machine_v0. - -``` - -This would ensure that any entries added to the log are applied against the active machine version -at the time they were added, leading to a deterministic outcome. - -For smaller (but still breaking) changes that can be handled in the original -module it is also possible to switch based on the `machine_version` key included in the meta -data passed to `apply/3`. - -### Runtime Behaviour - -New versions are enabled whenever there is a quorum of members with a higher version and -one of them is elected leader. The leader will commit the new version to the -log and each follower will move to the new version when this log entry is applied. -Followers that do not yet have the new version available will receive log entries from the leader -and update their logs but will not apply log entries. When they are upgraded and have -the new version, all outstanding log entries will be applied. In practical terms this means -that Ra nodes can be upgraded one by one. - -In order to be upgradeable, the state machine implementation will need to handle the version -bump in the form of a command that is passed to the `apply/3` callback: -`{machine_version, OldVersion, NewVersion}`. This provides an -opportunity to transform the state data into a new form, if needed. Note that the version -bump may be for several versions so it may be necessary to handle multiple -state transformations. +See [State Machine Tutorial](docs/internals/STATE_MACHINE_TUTORIAL.md) for +further information on state machines and the effects available ### Limitations @@ -291,13 +121,15 @@ Ra does not support the Erlang hot code swapping mechanism. There are two approaches to forming a cluster: * All cluster members can be known ahead of time - * All cluster members can be joining existing members dynamically (this implies that one "seed" member is chosen and started first) + * All cluster members can be joining existing members dynamically + (this implies that one "seed" member is chosen and started first) ### Fixed Set of Members Known on Startup Use `ra:start_or_restart_cluster/3` on one of the nodes to set up a cluster. This will either create a new cluster or restart an existing one. -As cluster membership is persisted in Ra logs, newly added nodes will be discovered from the log. +As cluster membership is persisted in Ra logs, newly added nodes will be +discovered from the log. ### Dynamically Joining Nodes @@ -325,11 +157,13 @@ The cluster name is mostly a "human-friendly" name for a Ra cluster. Something that identifies the entity the cluster is meant to represent. The cluster name isn't strictly part of a cluster's identity. -For example, in RabbitMQ's quorum queues case cluster names are derived from queue's identity. +For example, in RabbitMQ's quorum queues case cluster names are derived +from queue's identity. ### Server ID -A Ra server is a Ra cluster member. Server ID is defined as a pair of `{atom(), node()}`. +A Ra server is a Ra cluster member. Server ID is defined as a pair +of `{atom(), node()}`. Server ID combines a locally registered name and the Erlang node it resides on. Since server IDs identify Ra cluster members, they need to be a diff --git a/docs/internals/STATE_MACHINE_TUTORIAL.md b/docs/internals/STATE_MACHINE_TUTORIAL.md index 08bed484..cb9313f8 100644 --- a/docs/internals/STATE_MACHINE_TUTORIAL.md +++ b/docs/internals/STATE_MACHINE_TUTORIAL.md @@ -178,8 +178,8 @@ Effects should be a list sorted by execution order, i.e. the effect to be action first should be at the head of the list. Only the leader that first applies an entry will attempt the effect. -Followers process the same set of commands but simply throw away any effects returned by -the state machine. +Followers process the same set of commands but simply throw away any effects +returned by the state machine unless specific effect provide the `local` option. ### Send a message @@ -193,6 +193,19 @@ To ensure message reliability normal [Automatic Repeat Query (ARQ)](https://en.w like protocols between the state machine and the receiver should be implemented if needed. +The `{send_msg, pid(), Msg :: term(), Options :: list()}` effect can be used to further +control how the effect is executed via options. The options are: + +* `ra_event`: the message will be wrapped in a `ra_event` structure like + `{ra_event, LeaderId, Msg}`. The `ra_event` format can be changed by providing + a `ra_event_formatter` server configuration. +* `cast`: the message will be wrappen in a standard `{$gen_cast, Msg}` wrapper + to conform to OTP `gen` conventions. +* `local`: if the target `pid()` is local to any member of the Ra cluster the delivery +of the message will be done from the local member even if this is a follower. +This way a network hop can be avoided. If the target `pid()` is on a node +without a Ra member it will be sent from the leader. + ### Monitors Use `{monitor, process | node, pid() | node()}` to ask the `ra` leader to @@ -204,25 +217,206 @@ it will commit a `{nodeup | nodedown, node()}` command. Use `{demonitor, process | node, pid() | node()}` to stop monitoring a process or a node. +All monitors are invalidated when the leader changes. State machines should +re-issue monitor effects when becoming leader using the `state_enter/2` +callback. + ### Call a function Use the `{mod_call, module(), function(), Args :: [term()]}` to call an arbitrary function. Care need to be taken not to block the `ra` process whilst doing so. It is recommended that expensive operations are done in another process. -The `mod_call` effect is useful for e.g. updating an ets table of committed entries +The `mod_call` effect is useful for e.g. updating an ETS table of committed entries or similar. -### Update the release cursor (Snapshotting) +### Setting a timer + +The `{timer, Name :: term(), Time :: non_neg_integer() | infinity}` effects asks the Ra leader +to maintain a timer on behalf of the state machine and commit a `timeout` command +when the timer triggers. If setting the time to `infinity`, the timer will not be started +and any running timer with same name will be cancelled. + +The timer is relative and setting another timer with the same name before the current +timer runs out results in the current timer being reset. + +All timers are invalidated when the leader changes. State machines should +re-issue timer effects when becoming leader using the `state_enter/2` +callback. + +### Reading a log + +Use `{log, Indexes :: [ra_index()], fun(([user_command()]) -> effects()}` to read +commands from the log from the specified indexes and return a list of effects. + +Effectively this effect transforms log entries into effects. + +Potential use cases could be when a command contains large binary data and you +don't want to keep this in memory but load it on demand when needed for a side-effect. + +This is an advanced feature and will only work as long as the command is still +in the log. If a `release_cursor` has been emitted with an index higher than this, +the command may no longer be in the log and the function will not be called. + +There is currently no facility for reading partial data from a snapshot. + +### Updating the Release Cursor (Snapshotting) + +The `{release_cursor, RaftIndex, MachineState}` +effect can be used to give Ra cluster members a hint to trigger a snapshot. +This effect, when emitted, is evaluated on all nodes and not just the leader. + +It is not guaranteed that a snapshot will be taken. A decision to take +a snapshot or to delay it is taken using a number of internal Ra state factors. +The goal is to minimise disk I/O activity when possible. + +### Checkpointing + +Checkpoints are nearly the same concept as snapshots. Snapshotting truncates +the log up to the snapshot's index, which might be undesirable for machines +which read from the log with the `{log, Indexes, Fun}` effect mentioned above. + +The `{checkpoint, RaftIndex, MachineState}` effect can be used as a hint to +trigger a checkpoint. Like snapshotting, this effect is evaluated on all nodes +and when a checkpoint is taken, the machine state is saved to disk and can be +used for recovery when the machine restarts. A checkpoint being written does +not trigger any log truncation though. + +The `{release_cursor, RaftIndex}` effect can then be used to promote any +existing checkpoint older than or equal to `RaftIndex` into a proper snapshot, +and any log entries older than the checkpoint's index are then truncated. + +These two effects are intended for machines that use the `{log, Indexes, Fun}` +effect and can substantially improve machine recovery time compared to +snapshotting alone, especially when the machine needs to keep old log entries +around for a long time. + +## Optional `ra_machine` callbacks + +Apart from the mandatory callbacks Ra supports some optional callbacks + +* `state_enter(ra_server:ra_state() | eol, state()) -> effects().` + + This callback allows the ra machine impl to react to state changes in the ra + server. The most common use of this callback is to re-issue monitor effects + when entering the leader state as these are transient and when the leader + changes they need to be issued again. This callback is called on all members + in a cluster whenever they change their local state. +* `tick(TimeMs :: milliseconds(), state()) -> effects().` + + This callback is called periodically the interval of which is controlled + by the `tick_interval` server configuration. This callback can be used to + trigger periodic actions. +* `init_aux/1` +* `handle_aux/5` + + ``` + -callback init_aux(Name :: atom()) -> term(). -To (potentially) trigger a snapshot return the `{release_cursor, RaftIndex, MachineState}` -effect. This is why the raft index is included in the `apply/3` function. Ra will -only create a snapshot if doing so will result in log segments being deleted. + -callback handle_aux(ra_server:ra_state(), + {call, From :: from()} | cast, + Command :: term(), + AuxState, + State) -> + {reply, Reply :: term(), AuxState, State} | + {reply, Reply :: term(), AuxState, State, effects()} | + {no_reply, AuxState, State} | + {no_reply, AuxState, State, effects()} + when AuxState :: term(), + State :: ra_aux:state(). + ``` + + These two callbacks allow each server to maintain a local state machine + in addition to the replicated and consistent state machine. They can be + interacted with two ways: + 1. Using the `ra:aux_*` APIs to cast or call into the aux machine. + 2. By emitting `aux` effects from the state machine. + + Aux machines allow implementors a lot of flexibility that isn't otherwise + offered by the more restrictive and deterministic state machine itself. + + The `ra_aux` module can be used to access internal Ra server state information + such as the current members, the state machine state itself or even reading + from the log. + +* `overview(state()) -> map()` + + Use this to return an overvciew map the state machine which is returned + when using e.g. `sys:get_status/1` +* `version/0`, `which_module/1` + + See the next section on State Machine Versioning + + +## State Machine Versioning + +It is eventually necessary to make changes to the state machine +code. Any changes to a state machine that would result in a different end state +when the state is re-calculated from the log of entries (as is done when +restarting a ra server) should be considered breaking. + +As Ra state machines need to be deterministic any changes to the logic inside +the `apply/3` function + _needs to be enabled at the same index on all members of a Ra cluster_. + +### Versioning API + +Ra considers all state machines versioned starting with version 0. State machines +that need to be updated with breaking changes need to implement the optional +versioning parts of the `ra_machine` behaviour: + +``` erlang +-type version() :: non_neg_integer(). + +-callback version() -> pos_integer(). + +-callback which_module(version()) -> module(). + +``` + +`version/0` returns the current version which is an integer that is +higher than any previously used version number. Whenever a breaking change is +made this should be incremented. + +`which_module/1` maps a version to the module implementing it. This allows +developers to optionally keep entire modules for old versions instead of trying +to handle multiple versions in the same module. + +E.g. when moving from version 0 of `my_machine` to version 1: + +1. Copy and rename the `my_machine` module to `my_machine_v0` + +2. Implement the breaking changes in the original module and bump the version. + +``` erlang +version() -> 1. + +which_module(1) -> my_machine; +which_module(0) -> my_machine_v0. + +``` -For machines that must keep log segments on disk for some time, the -`{checkpoint, RaftIndex, MachineState}` effect can be used. This creates a -snapshot-like view of the machine state on disk but doesn't trigger log -truncation. Checkpoints can later be promoted to snapshots and trigger log -truncation by emitting a `{release_cursor, RaftIndex}` effect. The most -recent checkpoint with an index smaller than or equal to `RaftIndex` will be -promoted. +This would ensure that any entries added to the log are applied against the active machine version +at the time they were added, leading to a deterministic outcome. + +For smaller (but still breaking) changes that can be handled in the original +module it is also possible to switch based on the `machine_version` key included in the meta +data passed to `apply/3`. + +### Runtime Behaviour + +New versions are enabled whenever there is a quorum of members with a higher +version and one of them is elected leader. The leader will commit the new version +to the log and each follower will move to the new version when this log entry +is applied. Followers that do not yet have the new version available will +receive log entries from the leader and update their logs but will not apply +log entries. When they are upgraded and have the new version, all outstanding +log entries will be applied. In practical terms this means that Ra nodes can be +upgraded one by one. + +In order to be upgradeable, the state machine implementation will need to handle +the version bump in the form of a command that is passed to the `apply/3` callback: +`{machine_version, OldVersion, NewVersion}`. This provides an +opportunity to transform the state data into a new form, if needed. Note that the version +bump may be for several versions so it may be necessary to handle multiple +state transformations. diff --git a/src/ra_aux.erl b/src/ra_aux.erl index 023ab5c5..115aeb84 100644 --- a/src/ra_aux.erl +++ b/src/ra_aux.erl @@ -52,7 +52,7 @@ log_fetch(Idx, #{log := Log0} = State) {{Idx, Term, {'$usr', Meta, Cmd, _ReplyMode}}, Log} -> {{Term, Meta, Cmd}, State#{log => Log}}; {_, Log} -> - %% we only allow usr commands to be read + %% we only allow user commands to be read {undefined, State#{log => Log}} end. diff --git a/src/ra_machine.erl b/src/ra_machine.erl index a2443163..68178a51 100644 --- a/src/ra_machine.erl +++ b/src/ra_machine.erl @@ -248,11 +248,9 @@ AuxState, State) -> {reply, Reply :: term(), AuxState, State} | - {reply, Reply :: term(), AuxState, State, - [{monitor, process, aux, pid()}]} | + {reply, Reply :: term(), AuxState, State, effects()} | {no_reply, AuxState, State} | - {no_reply, AuxState, State, - [{monitor, process, aux, pid()}]} + {no_reply, AuxState, State, effects()} when AuxState :: term(), State :: ra_aux:state(). From 3e55ffcf3fdf58624f7ac4d077d03e319b7161f3 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 13 Feb 2024 14:57:01 +0000 Subject: [PATCH 3/5] Test reliability fixes --- test/coordination_SUITE.erl | 16 +++++++--------- test/ra_2_SUITE.erl | 11 ++++++----- test/ra_SUITE.erl | 21 ++++++++++++++------- test/ra_log_wal_SUITE.erl | 1 + 4 files changed, 28 insertions(+), 21 deletions(-) diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 557233ec..aa88d262 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -319,7 +319,7 @@ grow_cluster(Config) -> {ok, _, L1} = ra:members(A), [A, B, C] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), L1 = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]), - %% TODO: handle race conditions + await_condition( fun () -> [A, B, C] == rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]) andalso @@ -332,8 +332,12 @@ grow_cluster(Config) -> end, 20), ok = ra:leave_and_delete_server(?SYS, A, A), - {ok, _, _} = ra:process_command(B, banana), - {ok, _, L2} = ra:members(B), + %% wait for B to process the cluster change + await_condition( + fun () -> + [B, C] == rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]) + end, 20), + {ok, _, L2} = ra:process_command(B, banana), %% check members [B, C] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]), @@ -623,12 +627,6 @@ key_metrics(Config) -> CI > 0, StoppedMetrics), ok = ra:restart_server(?SYS, TestId), - await_condition( - fun () -> - Metrics = ra:key_metrics(TestId), - ct:pal("RecoverMetrics ~p", [Metrics]), - recover == maps:get(state, Metrics) - end, 200), {ok, _, _} = ra:process_command(Leader, {data, Data}), await_condition( fun () -> diff --git a/test/ra_2_SUITE.erl b/test/ra_2_SUITE.erl index 3ad77d01..817dd292 100644 --- a/test/ra_2_SUITE.erl +++ b/test/ra_2_SUITE.erl @@ -299,9 +299,9 @@ cluster_is_deleted_with_server_down(Config) -> % start node again ra:restart_server(?SYS, ServerId3), % validate all nodes have been shut down and terminated - ok = validate_process_down(element(1, ServerId1), 10), - ok = validate_process_down(element(1, ServerId2), 10), - ok = validate_process_down(element(1, ServerId3), 10), + ok = validate_process_down(element(1, ServerId1), 50), + ok = validate_process_down(element(1, ServerId2), 50), + ok = validate_process_down(element(1, ServerId3), 50), % validate there are no data dirs anymore [ begin @@ -814,10 +814,11 @@ init(_) -> {State0, ok} end. -state_enter(eol, State) -> +state_enter(eol = S, State) -> + ct:pal("state_enter ~w ~w", [self(), S]), [{send_msg, P, eol, ra_event} || {P, _} <- queue:to_list(State), is_pid(P)]; state_enter(S, _) -> - ct:pal("state_enter ~w", [S]), + ct:pal("state_enter ~w ~w", [self(), S]), []. flush() -> diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index b3a71584..d6704050 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -551,17 +551,24 @@ members_info(Config) -> {ok, _, _} = ra:add_member(Leader, CSpec), {ok, 9, Leader} = ra:consistent_query(C, fun(S) -> S end), ?assertMatch({ok, - #{Follower := #{status := normal, query_index := QI, - next_index := NI, match_index := MI, + #{Follower := #{status := normal, + query_index := QI, + next_index := NI, + match_index := MI, commit_index_sent := MI, voter_status := #{membership := voter}}, - Leader := #{status := normal, query_index := QI, - next_index := NI, match_index := MI, + Leader := #{status := normal, + query_index := QI, + next_index := NI, + match_index := MI, voter_status := #{membership := voter}}, - C := #{status := normal, query_index := QI, - next_index := NI, match_index := 0, + C := #{status := normal, + query_index := _, + next_index := NI, + match_index := 0, commit_index_sent := MI, - voter_status := #{membership := promotable, target := MI}}}, + voter_status := #{membership := promotable, + target := MI}}}, Leader}, ra:members_info(Follower)), ?assertMatch({ok, #{A := #{}, diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl index fc369976..0101537d 100644 --- a/test/ra_log_wal_SUITE.erl +++ b/test/ra_log_wal_SUITE.erl @@ -863,6 +863,7 @@ checksum_failure_in_middle_of_file_should_fail(Config) -> {error, wal_checksum_validation_failure} = ra_log_wal:start_link(Conf), empty_mailbox(), + meck:unload(), ok. empty_mailbox() -> From 2a36e582341bf0a21c8d983bac0220b834b04579 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 16 Feb 2024 14:04:13 +0000 Subject: [PATCH 4/5] Add ra_aux to semver supported modules. --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c71dcda7..c147751d 100644 --- a/README.md +++ b/README.md @@ -427,13 +427,15 @@ is available in a separate repository. ## Logging -Ra will use default OTP `logger` by default, unless `logger_module` configuration key is used to override. +Ra will use default OTP `logger` by default, unless `logger_module` +configuration key is used to override. To change log level to `debug` for all applications, use ``` erl logger:set_primary_config(level, debug). ``` + ## Ra versioning Ra attempts to follow [Semantic Versioning](https://semver.org/). @@ -441,6 +443,7 @@ Ra attempts to follow [Semantic Versioning](https://semver.org/). The modules that form part of the public API are: * `ra` * `ra_machine` (behaviour callbacks only) +* `ra_aux` * `ra_system` * `ra_counters` * `ra_leaderboard` @@ -449,7 +452,8 @@ The modules that form part of the public API are: ## Copyright and License -(c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +(c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to +Broadcom Inc. and/or its subsidiaries. Dual licensed under the Apache License Version 2.0 and Mozilla Public License Version 2.0. From 2320ccc67f8ad446d39733a224d9db1d51a3662b Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 16 Feb 2024 15:50:19 +0000 Subject: [PATCH 5/5] address feedback --- docs/internals/STATE_MACHINE_TUTORIAL.md | 12 ++++---- src/ra_aux.erl | 38 ++++++++++++------------ src/ra_log.erl | 1 + src/ra_machine.erl | 14 ++++----- src/ra_server.erl | 31 +++++++++++-------- test/ra_machine_int_SUITE.erl | 24 +++++++++++++++ 6 files changed, 75 insertions(+), 45 deletions(-) diff --git a/docs/internals/STATE_MACHINE_TUTORIAL.md b/docs/internals/STATE_MACHINE_TUTORIAL.md index cb9313f8..d64e2643 100644 --- a/docs/internals/STATE_MACHINE_TUTORIAL.md +++ b/docs/internals/STATE_MACHINE_TUTORIAL.md @@ -317,13 +317,13 @@ Apart from the mandatory callbacks Ra supports some optional callbacks {call, From :: from()} | cast, Command :: term(), AuxState, - State) -> - {reply, Reply :: term(), AuxState, State} | - {reply, Reply :: term(), AuxState, State, effects()} | - {no_reply, AuxState, State} | - {no_reply, AuxState, State, effects()} + IntState) -> + {reply, Reply :: term(), AuxState, IntState} | + {reply, Reply :: term(), AuxState, IntState, effects()} | + {no_reply, AuxState, IntState} | + {no_reply, AuxState, IntState, effects()} when AuxState :: term(), - State :: ra_aux:state(). + IntState :: ra_aux:internal_state(). ``` These two callbacks allow each server to maintain a local state machine diff --git a/src/ra_aux.erl b/src/ra_aux.erl index 115aeb84..7da3c6a8 100644 --- a/src/ra_aux.erl +++ b/src/ra_aux.erl @@ -17,46 +17,46 @@ -include("ra.hrl"). --opaque state() :: ra_server:state(). +-opaque internal_state() :: ra_server:state(). --export_type([state/0]). +-export_type([internal_state/0]). --spec machine_state(ra_aux:state()) -> term(). +-spec machine_state(ra_aux:internal_state()) -> term(). machine_state(State) -> maps:get(?FUNCTION_NAME, State). --spec leader_id(ra_aux:state()) -> undefined | ra_server_id(). +-spec leader_id(ra_aux:internal_state()) -> undefined | ra_server_id(). leader_id(State) -> maps:get(?FUNCTION_NAME, State). --spec members_info(ra_aux:state()) -> ra_cluster(). +-spec members_info(ra_aux:internal_state()) -> ra_cluster(). members_info(State) -> ra_server:state_query(?FUNCTION_NAME, State). --spec overview(ra_aux:state()) -> map(). +-spec overview(ra_aux:internal_state()) -> map(). overview(State) -> ra_server:state_query(?FUNCTION_NAME, State). --spec log_last_index_term(ra_aux:state()) -> ra_idxterm(). +-spec log_last_index_term(ra_aux:internal_state()) -> ra_idxterm(). log_last_index_term(#{log := Log}) -> - ra_log:last_index_term(Log). + ra_log:last_index_term(Log). --spec log_fetch(ra_index(), ra_aux:state()) -> +-spec log_fetch(ra_index(), ra_aux:internal_state()) -> {undefined | {ra_term(), CmdMetadata :: ra_server:command_meta(), - Command :: term()}, ra_aux:state()}. + Command :: term()}, ra_aux:internal_state()}. log_fetch(Idx, #{log := Log0} = State) when is_integer(Idx) -> - case ra_log:fetch(Idx, Log0) of - {{Idx, Term, {'$usr', Meta, Cmd, _ReplyMode}}, Log} -> - {{Term, Meta, Cmd}, State#{log => Log}}; - {_, Log} -> - %% we only allow user commands to be read - {undefined, State#{log => Log}} - end. - --spec log_stats(ra_aux:state()) -> ra_log:overview(). + case ra_log:fetch(Idx, Log0) of + {{Idx, Term, {'$usr', Meta, Cmd, _ReplyMode}}, Log} -> + {{Term, Meta, Cmd}, State#{log => Log}}; + {_, Log} -> + %% we only allow user commands to be read + {undefined, State#{log => Log}} + end. + +-spec log_stats(ra_aux:internal_state()) -> ra_log:overview(). log_stats(#{log := Log}) -> ra_log:overview(Log). diff --git a/src/ra_log.erl b/src/ra_log.erl index 001ef9f9..60658262 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -137,6 +137,7 @@ open_segments => non_neg_integer(), snapshot_index => undefined | ra_index(), cache_size => non_neg_integer(), + latest_checkpoint_index => undefined | ra_index(), atom() => term()}. -export_type([state/0, diff --git a/src/ra_machine.erl b/src/ra_machine.erl index 68178a51..5e48912b 100644 --- a/src/ra_machine.erl +++ b/src/ra_machine.erl @@ -240,19 +240,19 @@ -callback tick(TimeMs :: milliseconds(), state()) -> effects(). --callback init_aux(Name :: atom()) -> term(). +-callback init_aux(Name :: atom()) -> AuxState :: term(). -callback handle_aux(ra_server:ra_state(), {call, From :: from()} | cast, Command :: term(), AuxState, - State) -> - {reply, Reply :: term(), AuxState, State} | - {reply, Reply :: term(), AuxState, State, effects()} | - {no_reply, AuxState, State} | - {no_reply, AuxState, State, effects()} + IntState) -> + {reply, Reply :: term(), AuxState, IntState} | + {reply, Reply :: term(), AuxState, IntState, effects()} | + {no_reply, AuxState, IntState} | + {no_reply, AuxState, IntState, effects()} when AuxState :: term(), - State :: ra_aux:state(). + IntState :: ra_aux:internal_state(). -callback handle_aux(ra_server:ra_state(), {call, From :: from()} | cast, diff --git a/src/ra_server.erl b/src/ra_server.erl index 12d44431..f3be30dc 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -1484,10 +1484,16 @@ is_fully_replicated(#{commit_index := CI} = State) -> MinMI >= CI andalso MinCI >= CI end. -handle_aux(RaftState, _Type, _Cmd, +handle_aux(RaftState, Type, _Cmd, #{cfg := #cfg{effective_handle_aux_fun = undefined}} = State0) -> %% todo reply with error if Type is a call? - {RaftState, State0, []}; + Effects = case Type of + cast -> + []; + _From -> + [{reply, {error, aux_handler_not_implemented}}] + end, + {RaftState, State0, Effects}; handle_aux(RaftState, Type, Cmd, #{cfg := #cfg{effective_machine_module = MacMod, effective_handle_aux_fun = {handle_aux, 5}}, @@ -2266,17 +2272,16 @@ state_query(overview, State) -> state_query(machine, #{machine_state := MacState}) -> MacState; state_query(voters, #{cluster := Cluster}) -> - Vs = maps:fold(fun(K, V, Acc) -> - case maps:get(voter_status, V, undefined) of - undefined -> [K|Acc]; - S -> case maps:get(membership, S, undefined) of - undefined -> [K|Acc]; - voter -> [K|Acc]; - _ -> Acc - end - end - end, [], Cluster), - Vs; + maps:fold(fun(K, V, Acc) -> + case maps:get(voter_status, V, undefined) of + undefined -> [K|Acc]; + S -> case maps:get(membership, S, undefined) of + undefined -> [K|Acc]; + voter -> [K|Acc]; + _ -> Acc + end + end + end, [], Cluster); state_query(leader, State) -> maps:get(leader_id, State, undefined); state_query(members, #{cluster := Cluster}) -> diff --git a/test/ra_machine_int_SUITE.erl b/test/ra_machine_int_SUITE.erl index 1094f8de..18353393 100644 --- a/test/ra_machine_int_SUITE.erl +++ b/test/ra_machine_int_SUITE.erl @@ -46,6 +46,7 @@ all_tests() -> log_effect, aux_eval, aux_tick, + aux_handler_not_impl, aux_command, aux_command_v2, aux_command_v1_and_v2, @@ -591,6 +592,29 @@ log_effect(Config) -> end, ok. +aux_handler_not_impl(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId1 = ?config(server_id, Config), + Cluster = [ServerId1, + ?config(server_id2, Config), + ?config(server_id3, Config)], + Mod = ?config(modname, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun (_) -> [] end), + meck:expect(Mod, init_aux, fun (_) -> undefined end), + meck:expect(Mod, apply, + fun (_, {monitor_me, Pid}, State) -> + {[Pid | State], ok, [{monitor, process, Pid}]}; + (_, Cmd, State) -> + ct:pal("handling ~p", [Cmd]), + %% handle all + {State, ok} + end), + ok = start_cluster(ClusterName, {module, Mod, #{}}, Cluster), + {ok, _, Leader} = ra:members(ServerId1), + {error, aux_handler_not_implemented} = ra:aux_command(Leader, emit), + ok. + aux_command(Config) -> ClusterName = ?config(cluster_name, Config), ServerId1 = ?config(server_id, Config),