Skip to content

Commit

Permalink
Try a different checkpoint thinning approach
Browse files Browse the repository at this point in the history
Instead of setting a fixed upper limit, the max_checkpoints ra_log
setting now determines the point at which thinning will begin
to be applied. This means there is no upper limit on the number
of checkpoints that can exist, but the difference in indexes between
checkpoints will begin to increase as the number goes up.
  • Loading branch information
kjnilsson committed Jul 11, 2024
1 parent b4f4e93 commit 47984cc
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 87 deletions.
5 changes: 4 additions & 1 deletion src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@
"Total number of checkpoints written"},
{checkpoint_bytes_written, ?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, counter,
"Number of checkpoint bytes written"},
{checkpoints_promoted, ?C_RA_LOG_CHECKPOINTS_PROMOTED, counter,
"Number of checkpoints promoted to snapshots"},
{reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"}
]).
-define(C_RA_LOG_WRITE_OPS, 1).
Expand All @@ -277,7 +279,8 @@
-define(C_RA_LOG_OPEN_SEGMENTS, 12).
-define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13).
-define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14).
-define(C_RA_LOG_RESERVED, 15).
-define(C_RA_LOG_CHECKPOINTS_PROMOTED, 15).
-define(C_RA_LOG_RESERVED, 16).

-define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1).
-define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2).
Expand Down
8 changes: 4 additions & 4 deletions src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,12 @@ retry(Func, Attempt, Sleep) ->
end.

-spec write_file(file:name_all(), iodata()) ->
ok | file_err().
ok | {error, file_err()}.
write_file(Name, IOData) ->
write_file(Name, IOData, true).

-spec write_file(file:name_all(), iodata(), Sync :: boolean()) ->
ok | file_err().
ok | {error, file_err()}.
write_file(Name, IOData, Sync) ->
case file:open(Name, [binary, write, raw]) of
{ok, Fd} ->
Expand All @@ -347,7 +347,7 @@ write_file(Name, IOData, Sync) ->
end.

-spec sync_file(file:name_all()) ->
ok | file_err().
ok | {error, file_err()}.
sync_file(Name) ->
case file:open(Name, [binary, read, write, raw]) of
{ok, Fd} ->
Expand All @@ -357,7 +357,7 @@ sync_file(Name) ->
end.

-spec sync_and_close_fd(file:fd()) ->
ok | file_err().
ok | {error, file_err()}.
sync_and_close_fd(Fd) ->
case ra_file:sync(Fd) of
ok ->
Expand Down
21 changes: 15 additions & 6 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@
max_open_segments => non_neg_integer(),
snapshot_module => module(),
counter => counters:counters_ref(),
initial_access_pattern => sequential | random}.
initial_access_pattern => sequential | random,
max_checkpoints => non_neg_integer()}.


-type overview() ::
Expand Down Expand Up @@ -199,6 +200,7 @@ init(#{uid := UId,
undefined -> {-1, -1};
Curr -> Curr
end,

AccessPattern = maps:get(initial_access_pattern, Conf, random),
Reader0 = ra_log_reader:init(UId, Dir, 0, MaxOpen, AccessPattern, [],
Names, Counter),
Expand Down Expand Up @@ -244,6 +246,13 @@ init(#{uid := UId,
LastIdx = State000#?MODULE.last_index,
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastIdx),
case ra_snapshot:latest_checkpoint(SnapshotState) of
undefined ->
ok;
{ChIdx, _ChTerm} ->
put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, ChIdx)
end,

% recover the last term
{LastTerm0, State00} = case LastIdx of
SnapIdx ->
Expand Down Expand Up @@ -743,11 +752,11 @@ promote_checkpoint(Idx, #?MODULE{cfg = Cfg,
_ ->
{WasPromoted, SnapState, Effects} =
ra_snapshot:promote_checkpoint(Idx, SnapState0),
if WasPromoted ->
ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_WRITTEN, 1);
true ->
ok
end,
if WasPromoted ->
ok = incr_counter(Cfg, ?C_RA_LOG_CHECKPOINTS_PROMOTED, 1);
true ->
ok
end,

{State#?MODULE{snapshot_state = SnapState}, Effects}
end.
Expand Down
16 changes: 11 additions & 5 deletions src/ra_log_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ prepare(_Index, State) -> State.
%% @end

-spec write(file:filename(), meta(), term(), Sync :: boolean()) ->
ok | {error, file_err()}.
{ok, non_neg_integer()} | {error, file_err()}.
write(Dir, Meta, MacState, Sync) ->
%% no compression on meta data to make sure reading it is as fast
%% as possible
Expand All @@ -53,10 +53,16 @@ write(Dir, Meta, MacState, Sync) ->
Data = [<<(size(MetaBin)):32/unsigned>>, MetaBin | IOVec],
Checksum = erlang:crc32(Data),
File = filename(Dir),
ra_lib:write_file(File, [<<?MAGIC,
?VERSION:8/unsigned,
Checksum:32/integer>>,
Data], Sync).
Bytes = 9 + iolist_size(Data),
case ra_lib:write_file(File, [<<?MAGIC,
?VERSION:8/unsigned,
Checksum:32/integer>>,
Data], Sync) of
ok ->
{ok, Bytes};
Err ->
Err
end.

-spec sync(file:filename()) ->
ok | {error, file_err()}.
Expand Down
9 changes: 8 additions & 1 deletion src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1698,12 +1698,19 @@ do_state_query(QueryName, #state{server_state = State}) ->
ra_server:state_query(QueryName, State).

config_defaults(ServerId) ->
Counter = case ra_counters:fetch(ServerId) of
undefined ->
ra_counters:new(ServerId,
{persistent_term, ?FIELDSPEC_KEY});
C ->
C
end,
#{broadcast_time => ?DEFAULT_BROADCAST_TIME,
tick_timeout => ?TICK_INTERVAL_MS,
install_snap_rpc_timeout => ?INSTALL_SNAP_RPC_TIMEOUT,
await_condition_timeout => ?DEFAULT_AWAIT_CONDITION_TIMEOUT,
initial_members => [],
counter => ra_counters:new(ServerId, {persistent_term, ?FIELDSPEC_KEY}),
counter => Counter,
system_config => ra_system:default_config()
}.

Expand Down
102 changes: 37 additions & 65 deletions src/ra_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -582,22 +582,49 @@ take_older_checkpoints(Idx, #?MODULE{checkpoints = Checkpoints0} = State0) ->

-spec take_extra_checkpoints(state()) ->
{state(), [checkpoint()]}.
take_extra_checkpoints(State0) ->
take_extra_checkpoints(State0, []).


take_extra_checkpoints(#?MODULE{checkpoints = Checkpoints0,
max_checkpoints = MaxCheckpoints} = State0) ->
max_checkpoints = MaxCheckpoints} = State0,
Checks) ->
Len = erlang:length(Checkpoints0),
case Len - MaxCheckpoints of
ToDelete when ToDelete > 0 ->
%% Take `ToDelete' checkpoints from the list randomly without
%% ever taking the first or last checkpoint.
IdxsToTake = random_idxs_to_take(MaxCheckpoints, ToDelete),
{Checkpoints, Extras} = lists_take_idxs(Checkpoints0, IdxsToTake),
{State0#?MODULE{checkpoints = Checkpoints}, Extras};
_ ->
{State0, []}
case Len > MaxCheckpoints of
true ->
%% when the number of checkpoints grows we increase the difference
%% between checkpoints in order to keep the total count kept on disk
%% down but keep some upper limit (~500k) to avoid huge differences
Mult = min(8, Len div MaxCheckpoints),
case find_checkpoint_to_delete(Mult, lists:reverse(Checkpoints0)) of
undefined ->
{State0, Checks};
{_, _} = Check ->
Checkpoints = lists:delete(Check, Checkpoints0),
{State0#?MODULE{checkpoints = Checkpoints},
[Check | Checks]}
end;
false ->
{State0, Checks}
end.

%% Utility

%% Base index distance used when thinning checkpoints. It is multiplied by
%% the caller-supplied `Mult' to obtain the threshold under which a middle
%% checkpoint is considered close enough to its neighbours to be deleted.
-define(MAX_DIFF, 65_536).

%% Scans the list in a sliding window of three consecutive entries and
%% returns the middle entry (`Pot') of the first window whose outer entries
%% are less than `?MAX_DIFF * Mult' indexes apart. Returns `undefined' when
%% no such window exists (including when fewer than three entries remain).
%%
%% `Mult' scales the allowed distance; the caller in take_extra_checkpoints
%% passes `min(8, Len div MaxCheckpoints)'.
%% The list elements are two-tuples whose first element is the index.
%% NOTE(review): the caller passes `lists:reverse(Checkpoints0)', so the
%% list is presumably in ascending index order here -- confirm against how
%% the checkpoints list is maintained.
find_checkpoint_to_delete(Mult,
[{FstIdx, _},
{_, _} = Pot,
{ThrdIdx, _} | _] = Checks) ->
%% compare the index distance between the first and third entries of the
%% window against the scaled threshold
case ThrdIdx - FstIdx < (?MAX_DIFF * Mult) of
true ->
%% neighbours are close together: the middle entry is the candidate
Pot;
false ->
%% slide the window forward by one entry and try again
find_checkpoint_to_delete(Mult, tl(Checks))
end;
%% fewer than three entries left (or entries not shaped as two-tuples)
find_checkpoint_to_delete(_, _) ->
undefined.

make_snapshot_dir(Dir, Index, Term) ->
I = ra_lib:zpad_hex(Index),
T = ra_lib:zpad_hex(Term),
Expand All @@ -608,65 +635,10 @@ counters_add(undefined, _, _) ->
counters_add(Counter, Ix, Incr) ->
counters:add(Counter, Ix, Incr).

random_idxs_to_take(Max, N) ->
%% Always retain the first and last elements.
AllIdxs = lists:seq(2, Max - 1),
%% Take a random subset of those indices of length N.
lists:sublist(ra_lib:lists_shuffle(AllIdxs), N).

%% Take items from the given list by the given indices without disturbing the
%% order of the list.
-spec lists_take_idxs(List, Idxs) -> {List1, Taken} when
List :: list(Elem),
Elem :: any(),
Idxs :: list(pos_integer()),
List1 :: list(Elem),
Taken :: list(Elem).
lists_take_idxs(List, Idxs0) ->
%% Sort the indices so `lists_take_idxs/5' may run linearly on the two lists
Idxs = lists:sort(Idxs0),
%% 1-indexing like the `lists' module.
lists_take_idxs(List, Idxs, 1, [], []).

lists_take_idxs([Elem | Elems], [Idx | Idxs], Idx, TakeAcc, ElemAcc) ->
lists_take_idxs(Elems, Idxs, Idx + 1, [Elem | TakeAcc], ElemAcc);
lists_take_idxs([Elem | Elems], Idxs, Idx, TakeAcc, ElemAcc) ->
lists_take_idxs(Elems, Idxs, Idx + 1, TakeAcc, [Elem | ElemAcc]);
lists_take_idxs(Elems, _Idxs = [], _Idx, TakeAcc, ElemAcc) ->
{lists:reverse(ElemAcc, Elems), lists:reverse(TakeAcc)};
lists_take_idxs(_Elems = [], _Idxs, _Idx, TakeAcc, ElemAcc) ->
{lists:reverse(ElemAcc), lists:reverse(TakeAcc)}.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

random_idxs_to_take_test() ->
Idxs = random_idxs_to_take(10, 3),
?assertEqual(3, length(Idxs)),
[Min, _, Max] = lists:sort(Idxs),
%% The first and last elements are excluded.
?assert(Min > 1),
?assert(Max < 10),
ok.

lists_take_idxs_test() ->
?assertEqual(
{[1, 3, 5, 7, 8], [2, 4, 6]},
lists_take_idxs(lists:seq(1, 8), [2, 4, 6])),

%% Ordering of `Idxs' doesn't matter.
?assertEqual(
{[1, 3, 5, 7, 8], [2, 4, 6]},
lists_take_idxs(lists:seq(1, 8), [4, 6, 2])),

?assertEqual(
{[a, c], [b]},
lists_take_idxs([a, b, c], [2])),

%% `List''s order is preserved even when nothing is taken.
?assertEqual(
{[a, b, c], []},
lists_take_idxs([a, b, c], [])),
ok.

-endif.
32 changes: 32 additions & 0 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,26 @@ recover_from_checkpoint(Config) ->
LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso
Follower2Idx =:= 8
end, 20),
CounterKeys = [
checkpoint_bytes_written,
checkpoint_index,
checkpoints,
checkpoints_written,
checkpoints_promoted
],
[begin
?assertMatch(
#{
checkpoint_bytes_written := B,
checkpoint_index := 8,
checkpoints := 1,
checkpoints_written := 1,
checkpoints_promoted := 0
} when B > 0,
ct_rpc:call(N, ra_counters, counters,
[ServerId, CounterKeys]))
end || {_, N} = ServerId <- ServerIds],


%% Restart the servers
[ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds],
Expand Down Expand Up @@ -814,6 +834,18 @@ recover_from_checkpoint(Config) ->
Follower2Idx =:= 8
end, 20),

[begin
?assertMatch(
#{
checkpoint_bytes_written := B,
checkpoint_index := 8,
checkpoints := 1,
checkpoints_written := 1,
checkpoints_promoted := 1
} when B > 0,
ct_rpc:call(N, ra_counters, counters,
[ServerId, CounterKeys]))
end || {_, N} = ServerId <- ServerIds],
%% Restart the servers: the servers should be able to recover from the
%% snapshot which was promoted from a checkpoint.
[ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds],
Expand Down
10 changes: 5 additions & 5 deletions test/ra_log_snapshot_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ roundtrip(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = my_state,
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
Context = #{can_accept_full_file => true},
?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir, Context)),
ok.
Expand All @@ -82,7 +82,7 @@ roundtrip_compat(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = my_state,
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir)),
ok.

Expand All @@ -107,7 +107,7 @@ test_accept(Config, Name, DataSize, FullFile, ChunkSize) ->
ct:pal("test_accept ~w ~b ~w ~b", [Name, DataSize, FullFile, ChunkSize]),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = crypto:strong_rand_bytes(DataSize),
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
Context = #{can_accept_full_file => FullFile},
{ok, Meta, St} = ra_log_snapshot:begin_read(Dir, Context),
%% how to ensure
Expand Down Expand Up @@ -180,15 +180,15 @@ read_meta_data(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = my_state,
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, SnapshotMeta} = ra_log_snapshot:read_meta(Dir),
ok.

recover_same_as_read(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotData = my_state,
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData, true),
{ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData, true),
{ok, SnapshotMeta, SnapshotData} = ra_log_snapshot:recover(Dir),
ok.

Expand Down

0 comments on commit 47984cc

Please sign in to comment.