From bd075581a23a35e8f881ee3c116873736ff788aa Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 5 Jul 2024 15:33:37 +0100 Subject: [PATCH] Try a different checkpoint thinning approach --- src/ra.hrl | 5 +- src/ra_lib.erl | 8 +-- src/ra_log.erl | 21 +++++-- src/ra_log_snapshot.erl | 16 ++++-- src/ra_server_proc.erl | 9 ++- src/ra_snapshot.erl | 102 ++++++++++++--------------------- test/coordination_SUITE.erl | 32 +++++++++++ test/ra_log_snapshot_SUITE.erl | 10 ++-- 8 files changed, 116 insertions(+), 87 deletions(-) diff --git a/src/ra.hrl b/src/ra.hrl index 9d46ec21..06a63629 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -261,6 +261,8 @@ "Total number of checkpoints written"}, {checkpoint_bytes_written, ?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, counter, "Number of checkpoint bytes written"}, + {checkpoints_promoted, ?C_RA_LOG_CHECKPOINTS_PROMOTED, counter, + "Number of checkpoints promoted to snapshots"}, {reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"} ]). -define(C_RA_LOG_WRITE_OPS, 1). @@ -277,7 +279,8 @@ -define(C_RA_LOG_OPEN_SEGMENTS, 12). -define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13). -define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14). --define(C_RA_LOG_RESERVED, 15). +-define(C_RA_LOG_CHECKPOINTS_PROMOTED, 15). +-define(C_RA_LOG_RESERVED, 16). -define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1). -define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2). diff --git a/src/ra_lib.erl b/src/ra_lib.erl index 7133b25e..77edf82c 100644 --- a/src/ra_lib.erl +++ b/src/ra_lib.erl @@ -321,12 +321,12 @@ retry(Func, Attempt, Sleep) -> end. -spec write_file(file:name_all(), iodata()) -> - ok | file_err(). + ok | {error, file_err()}. write_file(Name, IOData) -> write_file(Name, IOData, true). -spec write_file(file:name_all(), iodata(), Sync :: boolean()) -> - ok | file_err(). + ok | {error, file_err()}. write_file(Name, IOData, Sync) -> case file:open(Name, [binary, write, raw]) of {ok, Fd} -> @@ -347,7 +347,7 @@ write_file(Name, IOData, Sync) -> end. -spec sync_file(file:name_all()) -> - ok | file_err(). + ok | {error, file_err()}. sync_file(Name) -> case file:open(Name, [binary, read, write, raw]) of {ok, Fd} -> @@ -357,7 +357,7 @@ sync_file(Name) -> end. -spec sync_and_close_fd(file:fd()) -> - ok | file_err(). + ok | {error, file_err()}. sync_and_close_fd(Fd) -> case ra_file:sync(Fd) of ok -> diff --git a/src/ra_log.erl b/src/ra_log.erl index 34294350..767a7362 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -129,7 +129,8 @@ max_open_segments => non_neg_integer(), snapshot_module => module(), counter => counters:counters_ref(), - initial_access_pattern => sequential | random}. + initial_access_pattern => sequential | random, + max_checkpoints => non_neg_integer()}. -type overview() :: @@ -199,6 +200,7 @@ init(#{uid := UId, undefined -> {-1, -1}; Curr -> Curr end, + AccessPattern = maps:get(initial_access_pattern, Conf, random), Reader0 = ra_log_reader:init(UId, Dir, 0, MaxOpen, AccessPattern, [], Names, Counter), @@ -244,6 +246,13 @@ init(#{uid := UId, LastIdx = State000#?MODULE.last_index, put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastIdx), + case ra_snapshot:latest_checkpoint(SnapshotState) of + undefined -> + ok; + {ChIdx, _ChTerm} -> + put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, ChIdx) + end, + % recover the last term {LastTerm0, State00} = case LastIdx of SnapIdx -> @@ -743,11 +752,11 @@ promote_checkpoint(Idx, #?MODULE{cfg = Cfg, _ -> {WasPromoted, SnapState, Effects} = ra_snapshot:promote_checkpoint(Idx, SnapState0), - if WasPromoted -> - ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_WRITTEN, 1); - true -> - ok - end, + if WasPromoted -> + ok = incr_counter(Cfg, ?C_RA_LOG_CHECKPOINTS_PROMOTED, 1); + true -> + ok + end, {State#?MODULE{snapshot_state = SnapState}, Effects} end. diff --git a/src/ra_log_snapshot.erl b/src/ra_log_snapshot.erl index 43bb7bcb..21acee4c 100644 --- a/src/ra_log_snapshot.erl +++ b/src/ra_log_snapshot.erl @@ -44,7 +44,7 @@ prepare(_Index, State) -> State. %% @end -spec write(file:filename(), meta(), term(), Sync :: boolean()) -> - ok | {error, file_err()}. + {ok, non_neg_integer()} | {error, file_err()}. write(Dir, Meta, MacState, Sync) -> %% no compression on meta data to make sure reading it is as fast %% as possible @@ -53,10 +53,16 @@ write(Dir, Meta, MacState, Sync) -> Data = [<<(size(MetaBin)):32/unsigned>>, MetaBin | IOVec], Checksum = erlang:crc32(Data), File = filename(Dir), - ra_lib:write_file(File, [<>, - Data], Sync). + Bytes = 9 + iolist_size(Data), + case ra_lib:write_file(File, [<>, + Data], Sync) of + ok -> + {ok, Bytes}; + Err -> + Err + end. -spec sync(file:filename()) -> ok | {error, file_err()}. diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index d039310c..92517816 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1698,12 +1698,19 @@ do_state_query(QueryName, #state{server_state = State}) -> ra_server:state_query(QueryName, State). config_defaults(ServerId) -> + Counter = case ra_counters:fetch(ServerId) of + undefined -> + ra_counters:new(ServerId, + {persistent_term, ?FIELDSPEC_KEY}); + C -> + C + end, #{broadcast_time => ?DEFAULT_BROADCAST_TIME, tick_timeout => ?TICK_INTERVAL_MS, install_snap_rpc_timeout => ?INSTALL_SNAP_RPC_TIMEOUT, await_condition_timeout => ?DEFAULT_AWAIT_CONDITION_TIMEOUT, initial_members => [], - counter => ra_counters:new(ServerId, {persistent_term, ?FIELDSPEC_KEY}), + counter => Counter, system_config => ra_system:default_config() }. diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index efc5734b..f3d77869 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -582,22 +582,49 @@ take_older_checkpoints(Idx, #?MODULE{checkpoints = Checkpoints0} = State0) -> -spec take_extra_checkpoints(state()) -> {state(), [checkpoint()]}. +take_extra_checkpoints(State0) -> + take_extra_checkpoints(State0, []). + + take_extra_checkpoints(#?MODULE{checkpoints = Checkpoints0, - max_checkpoints = MaxCheckpoints} = State0) -> + max_checkpoints = MaxCheckpoints} = State0, + Checks) -> Len = erlang:length(Checkpoints0), - case Len - MaxCheckpoints of - ToDelete when ToDelete > 0 -> - %% Take `ToDelete' checkpoints from the list randomly without - %% ever taking the first or last checkpoint. - IdxsToTake = random_idxs_to_take(MaxCheckpoints, ToDelete), - {Checkpoints, Extras} = lists_take_idxs(Checkpoints0, IdxsToTake), - {State0#?MODULE{checkpoints = Checkpoints}, Extras}; - _ -> - {State0, []} + case Len > MaxCheckpoints of + true -> + %% when the number of checkpoints grow we increase the difference + %% between checkpoints in order to keep the total count kept on disk + %% down but keep some upper limit (~500k) to avoid huge differences + Mult = min(8, Len div MaxCheckpoints), + case find_checkpoint_to_delete(Mult, lists:reverse(Checkpoints0)) of + undefined -> + {State0, Checks}; + {_, _} = Check -> + Checkpoints = lists:delete(Check, Checkpoints0), + State0#?MODULE{checkpoints = Checkpoints}, + [Check | Checks] + end; + false -> + {State0, Checks} end. %% Utility +-define(MAX_DIFF, 65_536). + +find_checkpoint_to_delete(Mult, + [{FstIdx, _}, + {_, _} = Pot, + {ThrdIdx, _} | _] = Checks) -> + case ThrdIdx - FstIdx < (?MAX_DIFF * Mult) of + true -> + Pot; + false -> + find_checkpoint_to_delete(Mult, tl(Checks)) + end; +find_checkpoint_to_delete(_, _) -> + undefined. + make_snapshot_dir(Dir, Index, Term) -> I = ra_lib:zpad_hex(Index), T = ra_lib:zpad_hex(Term), @@ -608,65 +635,10 @@ counters_add(undefined, _, _) -> counters_add(Counter, Ix, Incr) -> counters:add(Counter, Ix, Incr). -random_idxs_to_take(Max, N) -> - %% Always retain the first and last elements. - AllIdxs = lists:seq(2, Max - 1), - %% Take a random subset of those indices of length N. - lists:sublist(ra_lib:lists_shuffle(AllIdxs), N). - -%% Take items from the given list by the given indices without disturbing the -%% order of the list. --spec lists_take_idxs(List, Idxs) -> {List1, Taken} when - List :: list(Elem), - Elem :: any(), - Idxs :: list(pos_integer()), - List1 :: list(Elem), - Taken :: list(Elem). -lists_take_idxs(List, Idxs0) -> - %% Sort the indices so `lists_take_idxs/5' may run linearly on the two lists - Idxs = lists:sort(Idxs0), - %% 1-indexing like the `lists' module. - lists_take_idxs(List, Idxs, 1, [], []). - -lists_take_idxs([Elem | Elems], [Idx | Idxs], Idx, TakeAcc, ElemAcc) -> - lists_take_idxs(Elems, Idxs, Idx + 1, [Elem | TakeAcc], ElemAcc); -lists_take_idxs([Elem | Elems], Idxs, Idx, TakeAcc, ElemAcc) -> - lists_take_idxs(Elems, Idxs, Idx + 1, TakeAcc, [Elem | ElemAcc]); -lists_take_idxs(Elems, _Idxs = [], _Idx, TakeAcc, ElemAcc) -> - {lists:reverse(ElemAcc, Elems), lists:reverse(TakeAcc)}; -lists_take_idxs(_Elems = [], _Idxs, _Idx, TakeAcc, ElemAcc) -> - {lists:reverse(ElemAcc), lists:reverse(TakeAcc)}. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -random_idxs_to_take_test() -> - Idxs = random_idxs_to_take(10, 3), - ?assertEqual(3, length(Idxs)), - [Min, _, Max] = lists:sort(Idxs), - %% The first and last elements are excluded. - ?assert(Min > 1), - ?assert(Max < 10), - ok. - -lists_take_idxs_test() -> - ?assertEqual( - {[1, 3, 5, 7, 8], [2, 4, 6]}, - lists_take_idxs(lists:seq(1, 8), [2, 4, 6])), - - %% Ordering of `Idxs' doesn't matter. - ?assertEqual( - {[1, 3, 5, 7, 8], [2, 4, 6]}, - lists_take_idxs(lists:seq(1, 8), [4, 6, 2])), - ?assertEqual( - {[a, c], [b]}, - lists_take_idxs([a, b, c], [2])), - - %% `List''s order is preserved even when nothing is taken. - ?assertEqual( - {[a, b, c], []}, - lists_take_idxs([a, b, c], [])), - ok. -endif. diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index f5182579..a0ab7be8 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -787,6 +787,26 @@ recover_from_checkpoint(Config) -> LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso Follower2Idx =:= 8 end, 20), + CounterKeys = [ + checkpoint_bytes_written, + checkpoint_index, + checkpoints, + checkpoints_written, + checkpoints_promoted + ], + [begin + ?assertMatch( + #{ + checkpoint_bytes_written := B, + checkpoint_index := 8, + checkpoints := 1, + checkpoints_written := 1, + checkpoints_promoted := 0 + } when B > 0, + ct_rpc:call(N, ra_counters, counters, + [ServerId, CounterKeys])) + end || {_, N} = ServerId <- ServerIds], + %% Restart the servers [ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds], @@ -814,6 +834,18 @@ recover_from_checkpoint(Config) -> Follower2Idx =:= 8 end, 20), + [begin + ?assertMatch( + #{ + checkpoint_bytes_written := B, + checkpoint_index := 8, + checkpoints := 1, + checkpoints_written := 1, + checkpoints_promoted := 1 + } when B > 0, + ct_rpc:call(N, ra_counters, counters, + [ServerId, CounterKeys])) + end || {_, N} = ServerId <- ServerIds], %% Restart the servers: the servers should be able to recover from the %% snapshot which was promoted from a checkpoint. [ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds], diff --git a/test/ra_log_snapshot_SUITE.erl b/test/ra_log_snapshot_SUITE.erl index adfd560c..a1210cff 100644 --- a/test/ra_log_snapshot_SUITE.erl +++ b/test/ra_log_snapshot_SUITE.erl @@ -73,7 +73,7 @@ roundtrip(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), + {ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), Context = #{can_accept_full_file => true}, ?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir, Context)), ok. @@ -82,7 +82,7 @@ roundtrip_compat(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), + {ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), ?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir)), ok. @@ -107,7 +107,7 @@ test_accept(Config, Name, DataSize, FullFile, ChunkSize) -> ct:pal("test_accept ~w ~b ~w ~b", [Name, DataSize, FullFile, ChunkSize]), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = crypto:strong_rand_bytes(DataSize), - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), + {ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), Context = #{can_accept_full_file => FullFile}, {ok, Meta, St} = ra_log_snapshot:begin_read(Dir, Context), %% how to ensure @@ -180,7 +180,7 @@ read_meta_data(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), + {ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), {ok, SnapshotMeta} = ra_log_snapshot:read_meta(Dir), ok. @@ -188,7 +188,7 @@ recover_same_as_read(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotData = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData, true), + {ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData, true), {ok, SnapshotMeta, SnapshotData} = ra_log_snapshot:recover(Dir), ok.