From 985dfe4cd962bba7ae366d6c00252f91d42d7b37 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 29 Jan 2024 13:55:09 +0000 Subject: [PATCH] move state query to ra_server --- src/ra_server.erl | 78 +++++++++++++++++++++++++++- src/ra_server_proc.erl | 113 +++++++---------------------------------- 2 files changed, 95 insertions(+), 96 deletions(-) diff --git a/src/ra_server.erl b/src/ra_server.erl index 62e16d0a..dc57d016 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -56,7 +56,8 @@ log_fold/3, log_read/2, get_membership/1, - recover/1 + recover/1, + state_query/2 ]). -type ra_await_condition_fun() :: @@ -2207,6 +2208,81 @@ update_term(_, State) -> last_idx_term(#{log := Log}) -> ra_log:last_index_term(Log). + +state_query(all, State) -> State; +state_query(overview, State) -> + overview(State); +state_query(machine, #{machine_state := MacState}) -> + MacState; +state_query(voters, #{cluster := Cluster}) -> + Vs = maps:fold(fun(K, V, Acc) -> + case maps:get(voter_status, V, undefined) of + undefined -> [K|Acc]; + S -> case maps:get(membership, S, undefined) of + undefined -> [K|Acc]; + voter -> [K|Acc]; + _ -> Acc + end + end + end, [], Cluster), + Vs; +state_query(members, #{cluster := Cluster}) -> + maps:keys(Cluster); +state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, + leader_id := Self, query_index := QI, commit_index := CI, + membership := Membership}) -> + maps:map(fun(Id, Peer) -> + case {Id, Peer} of + {Self, Peer = #{voter_status := VoterStatus}} -> + %% For completeness sake, preserve `target` + %% of once promoted leader. + #{next_index => CI+1, + match_index => CI, + query_index => QI, + status => normal, + voter_status => VoterStatus#{membership => Membership}}; + {Self, _} -> + #{next_index => CI+1, + match_index => CI, + query_index => QI, + status => normal, + voter_status => #{membership => Membership}}; + {_, Peer = #{voter_status := _}} -> + Peer; + {_, Peer} -> + %% Initial cluster members have no voter_status. + Peer#{voter_status => #{membership => voter}} + end + end, Cluster); +state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, + query_index := QI, commit_index := CI, + membership := Membership}) -> + %% Followers do not have sufficient information, + %% bail out and send whatever we have. + maps:map(fun(Id, Peer) -> + case {Id, Peer} of + {Self, #{voter_status := VS}} -> + #{match_index => CI, + query_index => QI, + voter_status => VS#{membership => Membership}}; + {Self, _} -> + #{match_index => CI, + query_index => QI, + voter_status => #{membership => Membership}}; + _ -> + #{} + end + end, Cluster); +state_query(initial_members, #{log := Log}) -> + case ra_log:read_config(Log) of + {ok, #{initial_members := InitialMembers}} -> + InitialMembers; + _ -> + error + end; +state_query(Query, _State) -> + {error, {unknown_query, Query}}. + %% ยง 5.4.1 Raft determines which of two logs is more up-to-date by comparing %% the index and term of the last entries in the logs. If the logs have last %% entries with different terms, then the log with the later term is more diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 2f5189b8..872c161d 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -378,11 +378,11 @@ recovered(internal, next, #state{server_state = ServerState} = State) -> leader(enter, OldState, State0) -> {State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0), - ok = record_leader_change(id(State0), State0), + ok = record_leader_change(id(State), State), %% TODO: reset refs? {keep_state, State#state{leader_last_seen = undefined, pending_notifys = #{}, - low_priority_commands = ra_ets_queue:reset(State0#state.low_priority_commands), + low_priority_commands = ra_ets_queue:reset(State#state.low_priority_commands), election_timeout_set = false}, Actions}; leader(EventType, {leader_call, Msg}, State) -> % no need to redirect @@ -476,9 +476,8 @@ leader({call, From}, {local_query, QueryFun}, server_state = ServerState} = State) -> Reply = perform_local_query(QueryFun, id(State), ServerState, Conf), {keep_state, State, [{reply, From, Reply}]}; -leader({call, From}, {state_query, Spec}, - #state{server_state = ServerState} = State) -> - Reply = {ok, do_state_query(Spec, ServerState), id(State)}, +leader({call, From}, {state_query, Spec}, State) -> + Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; leader({call, From}, {consistent_query, QueryFun}, #state{conf = Conf, @@ -571,9 +570,8 @@ candidate({call, From}, {local_query, QueryFun}, #state{conf = Conf, server_state = ServerState} = State) -> Reply = perform_local_query(QueryFun, not_known, ServerState, Conf), {keep_state, State, [{reply, From, Reply}]}; -candidate({call, From}, {state_query, Spec}, - #state{server_state = ServerState} = State) -> - Reply = {ok, do_state_query(Spec, ServerState), id(State)}, +candidate({call, From}, {state_query, Spec}, State) -> + Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; candidate({call, From}, ping, State) -> {keep_state, State, [{reply, From, {pong, candidate}}]}; @@ -624,9 +622,8 @@ pre_vote({call, From}, {local_query, QueryFun}, #state{conf = Conf, server_state = ServerState} = State) -> Reply = perform_local_query(QueryFun, not_known, ServerState, Conf), {keep_state, State, [{reply, From, Reply}]}; -pre_vote({call, From}, {state_query, Spec}, - #state{server_state = ServerState} = State) -> - Reply = {ok, do_state_query(Spec, ServerState), id(State)}, +pre_vote({call, From}, {state_query, Spec}, State) -> + Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; pre_vote({call, From}, ping, State) -> {keep_state, State, [{reply, From, {pong, pre_vote}}]}; @@ -711,9 +708,8 @@ follower({call, From}, {local_query, QueryFun}, end, Reply = perform_local_query(QueryFun, Leader, ServerState, Conf), {keep_state, State, [{reply, From, Reply}]}; -follower({call, From}, {state_query, Spec}, - #state{server_state = ServerState} = State) -> - Reply = {ok, do_state_query(Spec, ServerState), id(State)}, +follower({call, From}, {state_query, Spec}, State) -> + Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; follower(EventType, {aux_command, Cmd}, State0) -> {_, ServerState, Effects} = ra_server:handle_aux(?FUNCTION_NAME, EventType, Cmd, @@ -903,9 +899,8 @@ await_condition({call, From}, {local_query, QueryFun}, #state{conf = Conf, server_state = ServerState} = State) -> Reply = perform_local_query(QueryFun, follower, ServerState, Conf), {keep_state, State, [{reply, From, Reply}]}; -await_condition({call, From}, {state_query, Spec}, - #state{server_state = ServerState} = State) -> - Reply = {ok, do_state_query(Spec, ServerState), id(State)}, +await_condition({call, From}, {state_query, Spec}, State) -> + Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; await_condition(EventType, {aux_command, Cmd}, State0) -> {_, ServerState, Effects} = ra_server:handle_aux(?FUNCTION_NAME, EventType, @@ -1538,79 +1533,8 @@ gen_statem_safe_call(ServerId, Msg, Timeout) -> {error, shutdown} end. -do_state_query(all, State) -> State; -do_state_query(overview, State) -> - ra_server:overview(State); -do_state_query(machine, #{machine_state := MacState}) -> - MacState; -do_state_query(voters, #{cluster := Cluster}) -> - Vs = maps:fold(fun(K, V, Acc) -> - case maps:get(voter_status, V, undefined) of - undefined -> [K|Acc]; - S -> case maps:get(membership, S, undefined) of - undefined -> [K|Acc]; - voter -> [K|Acc]; - _ -> Acc - end - end - end, [], Cluster), - Vs; -do_state_query(members, #{cluster := Cluster}) -> - maps:keys(Cluster); -do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, - leader_id := Self, query_index := QI, commit_index := CI, - membership := Membership}) -> - maps:map(fun(Id, Peer) -> - case {Id, Peer} of - {Self, Peer = #{voter_status := VoterStatus}} -> - %% For completeness sake, preserve `target` - %% of once promoted leader. - #{next_index => CI+1, - match_index => CI, - query_index => QI, - status => normal, - voter_status => VoterStatus#{membership => Membership}}; - {Self, _} -> - #{next_index => CI+1, - match_index => CI, - query_index => QI, - status => normal, - voter_status => #{membership => Membership}}; - {_, Peer = #{voter_status := _}} -> - Peer; - {_, Peer} -> - %% Initial cluster members have no voter_status. - Peer#{voter_status => #{membership => voter}} - end - end, Cluster); -do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, - query_index := QI, commit_index := CI, - membership := Membership}) -> - %% Followers do not have sufficient information, - %% bail out and send whatever we have. - maps:map(fun(Id, Peer) -> - case {Id, Peer} of - {Self, #{voter_status := VS}} -> - #{match_index => CI, - query_index => QI, - voter_status => VS#{membership => Membership}}; - {Self, _} -> - #{match_index => CI, - query_index => QI, - voter_status => #{membership => Membership}}; - _ -> - #{} - end - end, Cluster); -do_state_query(initial_members, #{log := Log}) -> - case ra_log:read_config(Log) of - {ok, #{initial_members := InitialMembers}} -> - InitialMembers; - _ -> - error - end; -do_state_query(Query, _State) -> - {error, {unknown_query, Query}}. +do_state_query(QueryName, #state{server_state = State}) -> + ra_server:state_query(QueryName, State). config_defaults(ServerId) -> #{broadcast_time => ?DEFAULT_BROADCAST_TIME, @@ -1816,7 +1740,7 @@ can_execute_locally(RaftState, TargetNode, leader when TargetNode =/= node() -> %% We need to evaluate whether to send the message. %% Only send if there isn't a local node for the target pid. - Members = do_state_query(voters, State#state.server_state), + Members = do_state_query(voters, State), not lists:any(fun ({_, N}) -> N == TargetNode end, Members); leader -> true; @@ -1828,7 +1752,7 @@ can_execute_on_member(_RaftState, Member, #state{server_state = #{cfg := #cfg{id = Member}}}) -> true; can_execute_on_member(leader, Member, State) -> - Members = do_state_query(members, State#state.server_state), + Members = do_state_query(members, State), not lists:member(Member, Members); can_execute_on_member(_RaftState, _Member, _State) -> false. @@ -1872,9 +1796,8 @@ handle_process_down(Pid, Info, RaftState, monitors = Monitors}), {keep_state, State, Actions}. -record_leader_change(Leader, #state{conf = #conf{cluster_name = ClusterName}, - server_state = ServerState}) -> - Members = do_state_query(members, ServerState), +record_leader_change(Leader, #state{conf = #conf{cluster_name = ClusterName}} = State) -> + Members = do_state_query(members, State), ok = ra_leaderboard:record(ClusterName, Leader, Members), ok.