Skip to content

Commit

Permalink
Merge pull request #454 from rabbitmq/checkpoint-tweaks
Browse files Browse the repository at this point in the history
Checkpoint improvements
  • Loading branch information
kjnilsson authored Jul 11, 2024
2 parents a5d8e3b + 47984cc commit 60782c3
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 90 deletions.
5 changes: 4 additions & 1 deletion src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@
"Total number of checkpoints written"},
{checkpoint_bytes_written, ?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, counter,
"Number of checkpoint bytes written"},
{checkpoints_promoted, ?C_RA_LOG_CHECKPOINTS_PROMOTED, counter,
"Number of checkpoints promoted to snapshots"},
{reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"}
]).
-define(C_RA_LOG_WRITE_OPS, 1).
Expand All @@ -277,7 +279,8 @@
-define(C_RA_LOG_OPEN_SEGMENTS, 12).
-define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13).
-define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14).
-define(C_RA_LOG_RESERVED, 15).
-define(C_RA_LOG_CHECKPOINTS_PROMOTED, 15).
-define(C_RA_LOG_RESERVED, 16).

-define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1).
-define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2).
Expand Down
5 changes: 5 additions & 0 deletions src/ra_aux.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
-export([
machine_state/1,
leader_id/1,
last_applied/1,
members_info/1,
overview/1,
log_last_index_term/1,
Expand All @@ -29,6 +30,10 @@ machine_state(State) ->
leader_id(State) ->
maps:get(?FUNCTION_NAME, State).

%% Returns the value stored under the `last_applied' key of the given
%% internal state. The key is derived from this function's own name via
%% ?FUNCTION_NAME; `maps:get/2' raises `{badkey, last_applied}' when the key
%% is missing.
-spec last_applied(ra_aux:internal_state()) -> ra_index().
last_applied(InternalState) ->
    Key = ?FUNCTION_NAME,
    maps:get(Key, InternalState).

%% Delegates to `ra_server:state_query/2', passing this function's own name
%% (`members_info', via ?FUNCTION_NAME) as the query name and returning
%% whatever the query returns.
-spec members_info(ra_aux:internal_state()) -> ra_cluster().
members_info(InternalState) ->
    Query = ?FUNCTION_NAME,
    ra_server:state_query(Query, InternalState).
Expand Down
8 changes: 4 additions & 4 deletions src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,12 @@ retry(Func, Attempt, Sleep) ->
end.

%% Writes IOData to the file Name by delegating to write_file/3 with
%% Sync set to `true'.
%%
%% Returns `ok' on success or `{error, Reason}' on failure, matching the
%% tagged-tuple result of write_file/3.
-spec write_file(file:name_all(), iodata()) ->
    ok | {error, file_err()}.
write_file(Name, IOData) ->
    write_file(Name, IOData, true).

-spec write_file(file:name_all(), iodata(), Sync :: boolean()) ->
ok | file_err().
ok | {error, file_err()}.
write_file(Name, IOData, Sync) ->
case file:open(Name, [binary, write, raw]) of
{ok, Fd} ->
Expand All @@ -347,7 +347,7 @@ write_file(Name, IOData, Sync) ->
end.

-spec sync_file(file:name_all()) ->
ok | file_err().
ok | {error, file_err()}.
sync_file(Name) ->
case file:open(Name, [binary, read, write, raw]) of
{ok, Fd} ->
Expand All @@ -357,7 +357,7 @@ sync_file(Name) ->
end.

-spec sync_and_close_fd(file:fd()) ->
ok | file_err().
ok | {error, file_err()}.
sync_and_close_fd(Fd) ->
case ra_file:sync(Fd) of
ok ->
Expand Down
22 changes: 18 additions & 4 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@
max_open_segments => non_neg_integer(),
snapshot_module => module(),
counter => counters:counters_ref(),
initial_access_pattern => sequential | random}.
initial_access_pattern => sequential | random,
max_checkpoints => non_neg_integer()}.


-type overview() ::
Expand Down Expand Up @@ -199,6 +200,7 @@ init(#{uid := UId,
undefined -> {-1, -1};
Curr -> Curr
end,

AccessPattern = maps:get(initial_access_pattern, Conf, random),
Reader0 = ra_log_reader:init(UId, Dir, 0, MaxOpen, AccessPattern, [],
Names, Counter),
Expand Down Expand Up @@ -244,6 +246,13 @@ init(#{uid := UId,
LastIdx = State000#?MODULE.last_index,
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastIdx),
case ra_snapshot:latest_checkpoint(SnapshotState) of
undefined ->
ok;
{ChIdx, _ChTerm} ->
put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, ChIdx)
end,

% recover the last term
{LastTerm0, State00} = case LastIdx of
SnapIdx ->
Expand Down Expand Up @@ -741,9 +750,14 @@ promote_checkpoint(Idx, #?MODULE{cfg = Cfg,
%% checkpoint.
{State, []};
_ ->
ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_WRITTEN, 1),
{SnapState, Effects} = ra_snapshot:promote_checkpoint(Idx,
SnapState0),
{WasPromoted, SnapState, Effects} =
ra_snapshot:promote_checkpoint(Idx, SnapState0),
if WasPromoted ->
ok = incr_counter(Cfg, ?C_RA_LOG_CHECKPOINTS_PROMOTED, 1);
true ->
ok
end,

{State#?MODULE{snapshot_state = SnapState}, Effects}
end.

Expand Down
16 changes: 11 additions & 5 deletions src/ra_log_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ prepare(_Index, State) -> State.
%% @end

-spec write(file:filename(), meta(), term(), Sync :: boolean()) ->
ok | {error, file_err()}.
{ok, non_neg_integer()} | {error, file_err()}.
write(Dir, Meta, MacState, Sync) ->
%% no compression on meta data to make sure reading it is as fast
%% as possible
Expand All @@ -53,10 +53,16 @@ write(Dir, Meta, MacState, Sync) ->
Data = [<<(size(MetaBin)):32/unsigned>>, MetaBin | IOVec],
Checksum = erlang:crc32(Data),
File = filename(Dir),
ra_lib:write_file(File, [<<?MAGIC,
?VERSION:8/unsigned,
Checksum:32/integer>>,
Data], Sync).
Bytes = 9 + iolist_size(Data),
case ra_lib:write_file(File, [<<?MAGIC,
?VERSION:8/unsigned,
Checksum:32/integer>>,
Data], Sync) of
ok ->
{ok, Bytes};
Err ->
Err
end.

-spec sync(file:filename()) ->
ok | {error, file_err()}.
Expand Down
3 changes: 3 additions & 0 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,9 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
log => Log,
commit_index => SnapIndex,
last_applied => SnapIndex,
%% this may not be the actual cluster index
cluster_index_term => {SnapIndex,
SnapTerm},
cluster => make_cluster(Id, ClusterIds),
membership => get_membership(ClusterIds, State0),
machine_state => MacState}),
Expand Down
9 changes: 8 additions & 1 deletion src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1704,12 +1704,19 @@ do_state_query(QueryName, #state{server_state = State}) ->
ra_server:state_query(QueryName, State).

%% Builds the map of default configuration values for ServerId.
%%
%% The counters reference already registered for the server is reused when
%% `ra_counters:fetch/1' finds one; `ra_counters:new/2' is only called when
%% fetch returns `undefined'.
config_defaults(ServerId) ->
    Counter = case ra_counters:fetch(ServerId) of
                  undefined ->
                      ra_counters:new(ServerId,
                                      {persistent_term, ?FIELDSPEC_KEY});
                  C ->
                      C
              end,
    %% `counter' must appear exactly once in this map: every value expression
    %% in a map literal is evaluated, so an extra unconditional
    %% `ra_counters:new/2' entry would run even when a counter already exists.
    #{broadcast_time => ?DEFAULT_BROADCAST_TIME,
      tick_timeout => ?TICK_INTERVAL_MS,
      install_snap_rpc_timeout => ?INSTALL_SNAP_RPC_TIMEOUT,
      await_condition_timeout => ?DEFAULT_AWAIT_CONDITION_TIMEOUT,
      initial_members => [],
      counter => Counter,
      system_config => ra_system:default_config()
     }.

Expand Down
1 change: 1 addition & 0 deletions src/ra_server_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ delete_server_rpc(System, RaName) ->
catch ets:delete(ra_state, RaName),
catch ets:delete(ra_open_file_metrics, Pid),
catch ra_counters:delete({RaName, node()}),
catch ra_leaderboard:clear(RaName),
ok
end.

Expand Down
114 changes: 44 additions & 70 deletions src/ra_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,10 @@ begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, SnapKind,
[{monitor, process, snapshot_writer, Pid}]}.

-spec promote_checkpoint(Idx :: ra_index(), State0 :: state()) ->
{State :: state(), Effects :: [effect()]}.
{boolean(), State :: state(), Effects :: [effect()]}.
promote_checkpoint(PromotionIdx,
#?MODULE{module = Mod,
#?MODULE{uid = UId,
module = Mod,
snapshot_directory = SnapDir,
checkpoint_directory = CheckpointDir,
checkpoints = Checkpoints0} = State0) ->
Expand All @@ -367,16 +368,17 @@ promote_checkpoint(PromotionIdx,
%% sync the checkpoint before promoting it
%% into a snapshot.
ok = Mod:sync(Checkpoint),
ok = file:rename(Checkpoint, Snapshot),
ok = prim_file:rename(Checkpoint, Snapshot),
true = ets:insert(?ETSTBL, {UId, Idx}),
Self ! {ra_log_event,
{snapshot_written,
{Idx, Term}, snapshot}}
end),
State = State0#?MODULE{pending = {Pid, {Idx, Term}, snapshot},
checkpoints = Checkpoints},
{State, [{monitor, process, snapshot_writer, Pid}]};
{true, State, [{monitor, process, snapshot_writer, Pid}]};
undefined ->
{State0, []}
{false, State0, []}
end.

%% Find the first checkpoint smaller than or equal to the promotion index and
Expand Down Expand Up @@ -580,22 +582,49 @@ take_older_checkpoints(Idx, #?MODULE{checkpoints = Checkpoints0} = State0) ->

%% Selects at most one surplus checkpoint for deletion.
%%
%% Returns the state with that checkpoint removed from its `checkpoints'
%% list, together with the list of removed checkpoints (empty when nothing
%% was selected). Nothing is selected unless the number of checkpoints
%% exceeds `max_checkpoints'.
-spec take_extra_checkpoints(state()) ->
    {state(), [checkpoint()]}.
take_extra_checkpoints(State0) ->
    take_extra_checkpoints(State0, []).

%% Checks is the accumulator of checkpoints already selected for deletion;
%% a newly selected checkpoint is prepended to it.
take_extra_checkpoints(#?MODULE{checkpoints = Checkpoints0,
                                max_checkpoints = MaxCheckpoints} = State0,
                       Checks) ->
    Len = erlang:length(Checkpoints0),
    case Len > MaxCheckpoints of
        true ->
            %% As the number of checkpoints grows, widen the allowed index
            %% gap between retained checkpoints to keep the total count on
            %% disk down. The multiplier is capped at 8, i.e. a gap of at
            %% most 8 * ?MAX_DIFF (~500k), to avoid huge differences.
            %% max(1, ...) guards the division: `max_checkpoints' is a
            %% non_neg_integer() and 0 would otherwise raise badarith.
            Mult = min(8, Len div max(1, MaxCheckpoints)),
            %% The candidate search runs over the reversed list.
            %% NOTE(review): presumably this makes it oldest-first - confirm
            %% against how `checkpoints' is ordered when it is built.
            case find_checkpoint_to_delete(Mult, lists:reverse(Checkpoints0)) of
                undefined ->
                    {State0, Checks};
                {_, _} = Check ->
                    Checkpoints = lists:delete(Check, Checkpoints0),
                    {State0#?MODULE{checkpoints = Checkpoints},
                     [Check | Checks]}
            end;
        false ->
            {State0, Checks}
    end.

%% Utility

%% Base index distance used when deciding whether a checkpoint is redundant;
%% callers scale it by a multiplier.
-define(MAX_DIFF, 65_536).

%% Walks the list one position at a time, looking at three adjacent
%% `{Index, Term}' entries. Returns the middle entry of the first triple whose
%% outer indexes are less than `?MAX_DIFF * Mult' apart. Returns `undefined'
%% when no triple qualifies, which includes lists with fewer than three
%% entries.
find_checkpoint_to_delete(Mult,
                          [{LowIdx, _} |
                           [{_, _} = Candidate, {HighIdx, _} | _] = Rest]) ->
    Span = HighIdx - LowIdx,
    Limit = ?MAX_DIFF * Mult,
    if
        Span < Limit ->
            Candidate;
        true ->
            find_checkpoint_to_delete(Mult, Rest)
    end;
find_checkpoint_to_delete(_Mult, _Checks) ->
    undefined.

make_snapshot_dir(Dir, Index, Term) ->
I = ra_lib:zpad_hex(Index),
T = ra_lib:zpad_hex(Term),
Expand All @@ -606,65 +635,10 @@ counters_add(undefined, _, _) ->
counters_add(Counter, Ix, Incr) ->
counters:add(Counter, Ix, Incr).

%% Returns up to N positions drawn from the range 2..Max-1, so positions 1
%% and Max are never part of the result. The candidates are passed through
%% `ra_lib:lists_shuffle/1' (presumably a random permutation) and the first
%% N of them are kept.
random_idxs_to_take(Max, N) ->
    Shuffled = ra_lib:lists_shuffle(lists:seq(2, Max - 1)),
    lists:sublist(Shuffled, N).

%% Splits List into the elements that remain and the elements found at the
%% given 1-based positions. Both result lists keep the relative order the
%% elements had in List. Positions that fall outside the list are ignored.
-spec lists_take_idxs(List, Idxs) -> {List1, Taken} when
      List :: list(Elem),
      Elem :: any(),
      Idxs :: list(pos_integer()),
      List1 :: list(Elem),
      Taken :: list(Elem).
lists_take_idxs(List, Idxs0) ->
    %% A single left-to-right pass carries the current position, the sorted
    %% positions still to be taken, and the two reversed accumulators.
    %% Sorting up front means only the head of the pending positions ever
    %% has to be compared against the current position.
    Step = fun(Elem, {Pos, [Pos | Pending], TakenRev, KeptRev}) ->
                   {Pos + 1, Pending, [Elem | TakenRev], KeptRev};
              (Elem, {Pos, Pending, TakenRev, KeptRev}) ->
                   {Pos + 1, Pending, TakenRev, [Elem | KeptRev]}
           end,
    %% Positions are 1-based, like the `lists' module.
    Init = {1, lists:sort(Idxs0), [], []},
    {_Pos, _Pending, Taken, Kept} = lists:foldl(Step, Init, List),
    {lists:reverse(Kept), lists:reverse(Taken)}.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

%% Asks random_idxs_to_take/2 for 3 positions with Max = 10 and checks that
%% exactly 3 are returned and that all of them lie strictly between 1 and 10.
random_idxs_to_take_test() ->
Idxs = random_idxs_to_take(10, 3),
?assertEqual(3, length(Idxs)),
%% Sorting exposes the smallest and largest returned positions.
[Min, _, Max] = lists:sort(Idxs),
%% The first and last elements are excluded.
?assert(Min > 1),
?assert(Max < 10),
ok.

%% Checks lists_take_idxs/2 on a few fixed inputs: the result is
%% `{Remaining, Taken}', positions are 1-based, and both lists keep the
%% order the elements had in the input.
lists_take_idxs_test() ->
?assertEqual(
{[1, 3, 5, 7, 8], [2, 4, 6]},
lists_take_idxs(lists:seq(1, 8), [2, 4, 6])),

%% Ordering of `Idxs' doesn't matter.
?assertEqual(
{[1, 3, 5, 7, 8], [2, 4, 6]},
lists_take_idxs(lists:seq(1, 8), [4, 6, 2])),

%% Taking a single position from the middle of the list.
?assertEqual(
{[a, c], [b]},
lists_take_idxs([a, b, c], [2])),

%% `List''s order is preserved even when nothing is taken.
?assertEqual(
{[a, b, c], []},
lists_take_idxs([a, b, c], [])),
ok.

-endif.
32 changes: 32 additions & 0 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,26 @@ recover_from_checkpoint(Config) ->
LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso
Follower2Idx =:= 8
end, 20),
CounterKeys = [
checkpoint_bytes_written,
checkpoint_index,
checkpoints,
checkpoints_written,
checkpoints_promoted
],
[begin
?assertMatch(
#{
checkpoint_bytes_written := B,
checkpoint_index := 8,
checkpoints := 1,
checkpoints_written := 1,
checkpoints_promoted := 0
} when B > 0,
ct_rpc:call(N, ra_counters, counters,
[ServerId, CounterKeys]))
end || {_, N} = ServerId <- ServerIds],


%% Restart the servers
[ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds],
Expand Down Expand Up @@ -814,6 +834,18 @@ recover_from_checkpoint(Config) ->
Follower2Idx =:= 8
end, 20),

[begin
?assertMatch(
#{
checkpoint_bytes_written := B,
checkpoint_index := 8,
checkpoints := 1,
checkpoints_written := 1,
checkpoints_promoted := 1
} when B > 0,
ct_rpc:call(N, ra_counters, counters,
[ServerId, CounterKeys]))
end || {_, N} = ServerId <- ServerIds],
%% Restart the servers: the servers should be able to recover from the
%% snapshot which was promoted from a checkpoint.
[ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds],
Expand Down
Loading

0 comments on commit 60782c3

Please sign in to comment.