Skip to content

Commit

Permalink
move state query to ra_server
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Jan 29, 2024
1 parent 8a24cdd commit 985dfe4
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 96 deletions.
78 changes: 77 additions & 1 deletion src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
log_fold/3,
log_read/2,
get_membership/1,
recover/1
recover/1,
state_query/2
]).

-type ra_await_condition_fun() ::
Expand Down Expand Up @@ -2207,6 +2208,81 @@ update_term(_, State) ->
last_idx_term(#{log := Log}) ->
ra_log:last_index_term(Log).


state_query(all, State) -> State;
state_query(overview, State) ->
overview(State);
state_query(machine, #{machine_state := MacState}) ->
MacState;
state_query(voters, #{cluster := Cluster}) ->
Vs = maps:fold(fun(K, V, Acc) ->
case maps:get(voter_status, V, undefined) of
undefined -> [K|Acc];
S -> case maps:get(membership, S, undefined) of
undefined -> [K|Acc];
voter -> [K|Acc];
_ -> Acc
end
end
end, [], Cluster),
Vs;
state_query(members, #{cluster := Cluster}) ->
maps:keys(Cluster);
state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
leader_id := Self, query_index := QI, commit_index := CI,
membership := Membership}) ->
maps:map(fun(Id, Peer) ->
case {Id, Peer} of
{Self, Peer = #{voter_status := VoterStatus}} ->
%% For completeness sake, preserve `target`
%% of once promoted leader.
#{next_index => CI+1,
match_index => CI,
query_index => QI,
status => normal,
voter_status => VoterStatus#{membership => Membership}};
{Self, _} ->
#{next_index => CI+1,
match_index => CI,
query_index => QI,
status => normal,
voter_status => #{membership => Membership}};
{_, Peer = #{voter_status := _}} ->
Peer;
{_, Peer} ->
%% Initial cluster members have no voter_status.
Peer#{voter_status => #{membership => voter}}
end
end, Cluster);
state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
query_index := QI, commit_index := CI,
membership := Membership}) ->
%% Followers do not have sufficient information,
%% bail out and send whatever we have.
maps:map(fun(Id, Peer) ->
case {Id, Peer} of
{Self, #{voter_status := VS}} ->
#{match_index => CI,
query_index => QI,
voter_status => VS#{membership => Membership}};
{Self, _} ->
#{match_index => CI,
query_index => QI,
voter_status => #{membership => Membership}};
_ ->
#{}
end
end, Cluster);
state_query(initial_members, #{log := Log}) ->
case ra_log:read_config(Log) of
{ok, #{initial_members := InitialMembers}} ->
InitialMembers;
_ ->
error
end;
state_query(Query, _State) ->
{error, {unknown_query, Query}}.

%% § 5.4.1 Raft determines which of two logs is more up-to-date by comparing
%% the index and term of the last entries in the logs. If the logs have last
%% entries with different terms, then the log with the later term is more
Expand Down
113 changes: 18 additions & 95 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,11 @@ recovered(internal, next, #state{server_state = ServerState} = State) ->

leader(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
ok = record_leader_change(id(State0), State0),
ok = record_leader_change(id(State), State),
%% TODO: reset refs?
{keep_state, State#state{leader_last_seen = undefined,
pending_notifys = #{},
low_priority_commands = ra_ets_queue:reset(State0#state.low_priority_commands),
low_priority_commands = ra_ets_queue:reset(State#state.low_priority_commands),
election_timeout_set = false}, Actions};
leader(EventType, {leader_call, Msg}, State) ->
% no need to redirect
Expand Down Expand Up @@ -476,9 +476,8 @@ leader({call, From}, {local_query, QueryFun},
server_state = ServerState} = State) ->
Reply = perform_local_query(QueryFun, id(State), ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
leader({call, From}, {state_query, Spec},
#state{server_state = ServerState} = State) ->
Reply = {ok, do_state_query(Spec, ServerState), id(State)},
leader({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
leader({call, From}, {consistent_query, QueryFun},
#state{conf = Conf,
Expand Down Expand Up @@ -571,9 +570,8 @@ candidate({call, From}, {local_query, QueryFun},
#state{conf = Conf, server_state = ServerState} = State) ->
Reply = perform_local_query(QueryFun, not_known, ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
candidate({call, From}, {state_query, Spec},
#state{server_state = ServerState} = State) ->
Reply = {ok, do_state_query(Spec, ServerState), id(State)},
candidate({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
candidate({call, From}, ping, State) ->
{keep_state, State, [{reply, From, {pong, candidate}}]};
Expand Down Expand Up @@ -624,9 +622,8 @@ pre_vote({call, From}, {local_query, QueryFun},
#state{conf = Conf, server_state = ServerState} = State) ->
Reply = perform_local_query(QueryFun, not_known, ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
pre_vote({call, From}, {state_query, Spec},
#state{server_state = ServerState} = State) ->
Reply = {ok, do_state_query(Spec, ServerState), id(State)},
pre_vote({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
pre_vote({call, From}, ping, State) ->
{keep_state, State, [{reply, From, {pong, pre_vote}}]};
Expand Down Expand Up @@ -711,9 +708,8 @@ follower({call, From}, {local_query, QueryFun},
end,
Reply = perform_local_query(QueryFun, Leader, ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
follower({call, From}, {state_query, Spec},
#state{server_state = ServerState} = State) ->
Reply = {ok, do_state_query(Spec, ServerState), id(State)},
follower({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
follower(EventType, {aux_command, Cmd}, State0) ->
{_, ServerState, Effects} = ra_server:handle_aux(?FUNCTION_NAME, EventType, Cmd,
Expand Down Expand Up @@ -903,9 +899,8 @@ await_condition({call, From}, {local_query, QueryFun},
#state{conf = Conf, server_state = ServerState} = State) ->
Reply = perform_local_query(QueryFun, follower, ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
await_condition({call, From}, {state_query, Spec},
#state{server_state = ServerState} = State) ->
Reply = {ok, do_state_query(Spec, ServerState), id(State)},
await_condition({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
await_condition(EventType, {aux_command, Cmd}, State0) ->
{_, ServerState, Effects} = ra_server:handle_aux(?FUNCTION_NAME, EventType,
Expand Down Expand Up @@ -1538,79 +1533,8 @@ gen_statem_safe_call(ServerId, Msg, Timeout) ->
{error, shutdown}
end.

do_state_query(all, State) -> State;
do_state_query(overview, State) ->
ra_server:overview(State);
do_state_query(machine, #{machine_state := MacState}) ->
MacState;
do_state_query(voters, #{cluster := Cluster}) ->
Vs = maps:fold(fun(K, V, Acc) ->
case maps:get(voter_status, V, undefined) of
undefined -> [K|Acc];
S -> case maps:get(membership, S, undefined) of
undefined -> [K|Acc];
voter -> [K|Acc];
_ -> Acc
end
end
end, [], Cluster),
Vs;
do_state_query(members, #{cluster := Cluster}) ->
maps:keys(Cluster);
do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
leader_id := Self, query_index := QI, commit_index := CI,
membership := Membership}) ->
maps:map(fun(Id, Peer) ->
case {Id, Peer} of
{Self, Peer = #{voter_status := VoterStatus}} ->
%% For completeness sake, preserve `target`
%% of once promoted leader.
#{next_index => CI+1,
match_index => CI,
query_index => QI,
status => normal,
voter_status => VoterStatus#{membership => Membership}};
{Self, _} ->
#{next_index => CI+1,
match_index => CI,
query_index => QI,
status => normal,
voter_status => #{membership => Membership}};
{_, Peer = #{voter_status := _}} ->
Peer;
{_, Peer} ->
%% Initial cluster members have no voter_status.
Peer#{voter_status => #{membership => voter}}
end
end, Cluster);
do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
query_index := QI, commit_index := CI,
membership := Membership}) ->
%% Followers do not have sufficient information,
%% bail out and send whatever we have.
maps:map(fun(Id, Peer) ->
case {Id, Peer} of
{Self, #{voter_status := VS}} ->
#{match_index => CI,
query_index => QI,
voter_status => VS#{membership => Membership}};
{Self, _} ->
#{match_index => CI,
query_index => QI,
voter_status => #{membership => Membership}};
_ ->
#{}
end
end, Cluster);
do_state_query(initial_members, #{log := Log}) ->
case ra_log:read_config(Log) of
{ok, #{initial_members := InitialMembers}} ->
InitialMembers;
_ ->
error
end;
do_state_query(Query, _State) ->
{error, {unknown_query, Query}}.
do_state_query(QueryName, #state{server_state = State}) ->
ra_server:state_query(QueryName, State).

config_defaults(ServerId) ->
#{broadcast_time => ?DEFAULT_BROADCAST_TIME,
Expand Down Expand Up @@ -1816,7 +1740,7 @@ can_execute_locally(RaftState, TargetNode,
leader when TargetNode =/= node() ->
%% We need to evaluate whether to send the message.
%% Only send if there isn't a local node for the target pid.
Members = do_state_query(voters, State#state.server_state),
Members = do_state_query(voters, State),
not lists:any(fun ({_, N}) -> N == TargetNode end, Members);
leader ->
true;
Expand All @@ -1828,7 +1752,7 @@ can_execute_on_member(_RaftState, Member,
#state{server_state = #{cfg := #cfg{id = Member}}}) ->
true;
can_execute_on_member(leader, Member, State) ->
Members = do_state_query(members, State#state.server_state),
Members = do_state_query(members, State),
not lists:member(Member, Members);
can_execute_on_member(_RaftState, _Member, _State) ->
false.
Expand Down Expand Up @@ -1872,9 +1796,8 @@ handle_process_down(Pid, Info, RaftState,
monitors = Monitors}),
{keep_state, State, Actions}.

record_leader_change(Leader, #state{conf = #conf{cluster_name = ClusterName},
server_state = ServerState}) ->
Members = do_state_query(members, ServerState),
record_leader_change(Leader, #state{conf = #conf{cluster_name = ClusterName}} = State) ->
Members = do_state_query(members, State),
ok = ra_leaderboard:record(ClusterName, Leader, Members),
ok.

Expand Down

0 comments on commit 985dfe4

Please sign in to comment.