Skip to content

Commit

Permalink
Try a different checkpoint thinning approach
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Jul 9, 2024
1 parent 2fbb7a3 commit cf5b431
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 87 deletions.
5 changes: 4 additions & 1 deletion src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@
"Total number of checkpoints written"},
{checkpoint_bytes_written, ?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, counter,
"Number of checkpoint bytes written"},
{checkpoints_promoted, ?C_RA_LOG_CHECKPOINTS_PROMOTED, counter,
"Number of checkpoints promoted to snapshots"},
{reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"}
]).
-define(C_RA_LOG_WRITE_OPS, 1).
Expand All @@ -277,7 +279,8 @@
-define(C_RA_LOG_OPEN_SEGMENTS, 12).
-define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13).
-define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14).
-define(C_RA_LOG_RESERVED, 15).
-define(C_RA_LOG_CHECKPOINTS_PROMOTED, 15).
-define(C_RA_LOG_RESERVED, 16).

-define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1).
-define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2).
Expand Down
8 changes: 4 additions & 4 deletions src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,12 @@ retry(Func, Attempt, Sleep) ->
end.

-spec write_file(file:name_all(), iodata()) ->
ok | file_err().
ok | {error, file_err()}.
write_file(Name, IOData) ->
write_file(Name, IOData, true).

-spec write_file(file:name_all(), iodata(), Sync :: boolean()) ->
ok | file_err().
ok | {error, file_err()}.
write_file(Name, IOData, Sync) ->
case file:open(Name, [binary, write, raw]) of
{ok, Fd} ->
Expand All @@ -347,7 +347,7 @@ write_file(Name, IOData, Sync) ->
end.

-spec sync_file(file:name_all()) ->
ok | file_err().
ok | {error, file_err()}.
sync_file(Name) ->
case file:open(Name, [binary, read, write, raw]) of
{ok, Fd} ->
Expand All @@ -357,7 +357,7 @@ sync_file(Name) ->
end.

-spec sync_and_close_fd(file:fd()) ->
ok | file_err().
ok | {error, file_err()}.
sync_and_close_fd(Fd) ->
case ra_file:sync(Fd) of
ok ->
Expand Down
21 changes: 15 additions & 6 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@
max_open_segments => non_neg_integer(),
snapshot_module => module(),
counter => counters:counters_ref(),
initial_access_pattern => sequential | random}.
initial_access_pattern => sequential | random,
max_checkpoints => non_neg_integer()}.


-type overview() ::
Expand Down Expand Up @@ -199,6 +200,7 @@ init(#{uid := UId,
undefined -> {-1, -1};
Curr -> Curr
end,

AccessPattern = maps:get(initial_access_pattern, Conf, random),
Reader0 = ra_log_reader:init(UId, Dir, 0, MaxOpen, AccessPattern, [],
Names, Counter),
Expand Down Expand Up @@ -244,6 +246,13 @@ init(#{uid := UId,
LastIdx = State000#?MODULE.last_index,
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastIdx),
case ra_snapshot:latest_checkpoint(SnapshotState) of
undefined ->
ok;
{ChIdx, _ChTerm} ->
put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, ChIdx)
end,

% recover the last term
{LastTerm0, State00} = case LastIdx of
SnapIdx ->
Expand Down Expand Up @@ -743,11 +752,11 @@ promote_checkpoint(Idx, #?MODULE{cfg = Cfg,
_ ->
{WasPromoted, SnapState, Effects} =
ra_snapshot:promote_checkpoint(Idx, SnapState0),
if WasPromoted ->
ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_WRITTEN, 1);
true ->
ok
end,
if WasPromoted ->
ok = incr_counter(Cfg, ?C_RA_LOG_CHECKPOINTS_PROMOTED, 1);
true ->
ok
end,

{State#?MODULE{snapshot_state = SnapState}, Effects}
end.
Expand Down
16 changes: 11 additions & 5 deletions src/ra_log_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ prepare(_Index, State) -> State.
%% @end

-spec write(file:filename(), meta(), term(), Sync :: boolean()) ->
ok | {error, file_err()}.
{ok, non_neg_integer()} | {error, file_err()}.
write(Dir, Meta, MacState, Sync) ->
%% no compression on meta data to make sure reading it is as fast
%% as possible
Expand All @@ -53,10 +53,16 @@ write(Dir, Meta, MacState, Sync) ->
Data = [<<(size(MetaBin)):32/unsigned>>, MetaBin | IOVec],
Checksum = erlang:crc32(Data),
File = filename(Dir),
ra_lib:write_file(File, [<<?MAGIC,
?VERSION:8/unsigned,
Checksum:32/integer>>,
Data], Sync).
Bytes = 9 + iolist_size(Data),
case ra_lib:write_file(File, [<<?MAGIC,
?VERSION:8/unsigned,
Checksum:32/integer>>,
Data], Sync) of
ok ->
{ok, Bytes};
Err ->
Err
end.

-spec sync(file:filename()) ->
ok | {error, file_err()}.
Expand Down
9 changes: 8 additions & 1 deletion src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1698,12 +1698,19 @@ do_state_query(QueryName, #state{server_state = State}) ->
ra_server:state_query(QueryName, State).

config_defaults(ServerId) ->
Counter = case ra_counters:fetch(ServerId) of
undefined ->
ra_counters:new(ServerId,
{persistent_term, ?FIELDSPEC_KEY});
C ->
C
end,
#{broadcast_time => ?DEFAULT_BROADCAST_TIME,
tick_timeout => ?TICK_INTERVAL_MS,
install_snap_rpc_timeout => ?INSTALL_SNAP_RPC_TIMEOUT,
await_condition_timeout => ?DEFAULT_AWAIT_CONDITION_TIMEOUT,
initial_members => [],
counter => ra_counters:new(ServerId, {persistent_term, ?FIELDSPEC_KEY}),
counter => Counter,
system_config => ra_system:default_config()
}.

Expand Down
102 changes: 37 additions & 65 deletions src/ra_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -582,22 +582,49 @@ take_older_checkpoints(Idx, #?MODULE{checkpoints = Checkpoints0} = State0) ->

-spec take_extra_checkpoints(state()) ->
{state(), [checkpoint()]}.
take_extra_checkpoints(State0) ->
take_extra_checkpoints(State0, []).


take_extra_checkpoints(#?MODULE{checkpoints = Checkpoints0,
max_checkpoints = MaxCheckpoints} = State0) ->
max_checkpoints = MaxCheckpoints} = State0,
Checks) ->
Len = erlang:length(Checkpoints0),
case Len - MaxCheckpoints of
ToDelete when ToDelete > 0 ->
%% Take `ToDelete' checkpoints from the list randomly without
%% ever taking the first or last checkpoint.
IdxsToTake = random_idxs_to_take(MaxCheckpoints, ToDelete),
{Checkpoints, Extras} = lists_take_idxs(Checkpoints0, IdxsToTake),
{State0#?MODULE{checkpoints = Checkpoints}, Extras};
_ ->
{State0, []}
case Len > MaxCheckpoints of
true ->
%% when the number of checkpoints grow we increase the difference
%% between checkpoints in order to keep the total count kept on disk
%% down but keep some upper limit to avoid huge differences
Mult = min(8, Len div MaxCheckpoints),
case find_checkpoint_to_delete(Mult, lists:reverse(Checkpoints0)) of
undefined ->
{State0, Checks};
{_, _} = Check ->
Checkpoints = lists:delete(Check, Checkpoints0),
State0#?MODULE{checkpoints = Checkpoints},
[Check | Checks]
end;
false ->
{State0, Checks}
end.

%% Utility

-define(MAX_DIFF, 65_536).

find_checkpoint_to_delete(Mult,
[{FstIdx, _},
{_, _} = Pot,
{ThrdIdx, _} | _] = Checks) ->
case ThrdIdx - FstIdx < (?MAX_DIFF * Mult) of
true ->
Pot;
false ->
find_checkpoint_to_delete(Mult, tl(Checks))
end;
find_checkpoint_to_delete(_, _) ->
undefined.

make_snapshot_dir(Dir, Index, Term) ->
I = ra_lib:zpad_hex(Index),
T = ra_lib:zpad_hex(Term),
Expand All @@ -608,65 +635,10 @@ counters_add(undefined, _, _) ->
counters_add(Counter, Ix, Incr) ->
counters:add(Counter, Ix, Incr).

random_idxs_to_take(Max, N) ->
%% Always retain the first and last elements.
AllIdxs = lists:seq(2, Max - 1),
%% Take a random subset of those indices of length N.
lists:sublist(ra_lib:lists_shuffle(AllIdxs), N).

%% Take items from the given list by the given indices without disturbing the
%% order of the list.
-spec lists_take_idxs(List, Idxs) -> {List1, Taken} when
List :: list(Elem),
Elem :: any(),
Idxs :: list(pos_integer()),
List1 :: list(Elem),
Taken :: list(Elem).
lists_take_idxs(List, Idxs0) ->
%% Sort the indices so `lists_take_idxs/5' may run linearly on the two lists
Idxs = lists:sort(Idxs0),
%% 1-indexing like the `lists' module.
lists_take_idxs(List, Idxs, 1, [], []).

lists_take_idxs([Elem | Elems], [Idx | Idxs], Idx, TakeAcc, ElemAcc) ->
lists_take_idxs(Elems, Idxs, Idx + 1, [Elem | TakeAcc], ElemAcc);
lists_take_idxs([Elem | Elems], Idxs, Idx, TakeAcc, ElemAcc) ->
lists_take_idxs(Elems, Idxs, Idx + 1, TakeAcc, [Elem | ElemAcc]);
lists_take_idxs(Elems, _Idxs = [], _Idx, TakeAcc, ElemAcc) ->
{lists:reverse(ElemAcc, Elems), lists:reverse(TakeAcc)};
lists_take_idxs(_Elems = [], _Idxs, _Idx, TakeAcc, ElemAcc) ->
{lists:reverse(ElemAcc), lists:reverse(TakeAcc)}.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

random_idxs_to_take_test() ->
Idxs = random_idxs_to_take(10, 3),
?assertEqual(3, length(Idxs)),
[Min, _, Max] = lists:sort(Idxs),
%% The first and last elements are excluded.
?assert(Min > 1),
?assert(Max < 10),
ok.

lists_take_idxs_test() ->
?assertEqual(
{[1, 3, 5, 7, 8], [2, 4, 6]},
lists_take_idxs(lists:seq(1, 8), [2, 4, 6])),

%% Ordering of `Idxs' doesn't matter.
?assertEqual(
{[1, 3, 5, 7, 8], [2, 4, 6]},
lists_take_idxs(lists:seq(1, 8), [4, 6, 2])),

?assertEqual(
{[a, c], [b]},
lists_take_idxs([a, b, c], [2])),

%% `List''s order is preserved even when nothing is taken.
?assertEqual(
{[a, b, c], []},
lists_take_idxs([a, b, c], [])),
ok.

-endif.
32 changes: 32 additions & 0 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,26 @@ recover_from_checkpoint(Config) ->
LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso
Follower2Idx =:= 8
end, 20),
CounterKeys = [
checkpoint_bytes_written,
checkpoint_index,
checkpoints,
checkpoints_written,
checkpoints_promoted
],
[begin
?assertMatch(
#{
checkpoint_bytes_written := B,
checkpoint_index := 8,
checkpoints := 1,
checkpoints_written := 1,
checkpoints_promoted := 0
} when B > 0,
ct_rpc:call(N, ra_counters, counters,
[ServerId, CounterKeys]))
end || {_, N} = ServerId <- ServerIds],


%% Restart the servers
[ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds],
Expand Down Expand Up @@ -814,6 +834,18 @@ recover_from_checkpoint(Config) ->
Follower2Idx =:= 8
end, 20),

[begin
?assertMatch(
#{
checkpoint_bytes_written := B,
checkpoint_index := 8,
checkpoints := 1,
checkpoints_written := 1,
checkpoints_promoted := 1
} when B > 0,
ct_rpc:call(N, ra_counters, counters,
[ServerId, CounterKeys]))
end || {_, N} = ServerId <- ServerIds],
%% Restart the servers: the servers should be able to recover from the
%% snapshot which was promoted from a checkpoint.
[ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds],
Expand Down
10 changes: 5 additions & 5 deletions test/ra_log_snapshot_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ roundtrip(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = my_state,
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
Context = #{can_accept_full_file => true},
?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir, Context)),
ok.
Expand All @@ -82,7 +82,7 @@ roundtrip_compat(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = my_state,
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir)),
ok.

Expand All @@ -107,7 +107,7 @@ test_accept(Config, Name, DataSize, FullFile, ChunkSize) ->
ct:pal("test_accept ~w ~b ~w ~b", [Name, DataSize, FullFile, ChunkSize]),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = crypto:strong_rand_bytes(DataSize),
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
Context = #{can_accept_full_file => FullFile},
{ok, Meta, St} = ra_log_snapshot:begin_read(Dir, Context),
%% how to ensure
Expand Down Expand Up @@ -180,15 +180,15 @@ read_meta_data(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = my_state,
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, SnapshotMeta} = ra_log_snapshot:read_meta(Dir),
ok.

recover_same_as_read(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotData = my_state,
ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData, true),
{ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData, true),
{ok, SnapshotMeta, SnapshotData} = ra_log_snapshot:recover(Dir),
ok.

Expand Down

0 comments on commit cf5b431

Please sign in to comment.