Skip to content

Commit

Permalink
WAL: drop entries for UId that are lower than snapshot index.
Browse files Browse the repository at this point in the history
As noone needs these entries, best not write them at all.
  • Loading branch information
kjnilsson committed Jun 3, 2024
1 parent 3a4dacb commit a2541b1
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 6 deletions.
28 changes: 26 additions & 2 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,10 @@ handle_event({segments, Tid, NewSegs},
end;
handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
#?MODULE{cfg = Cfg,
% cache = Cache,
first_index = FstIdx,
last_index = LstIdx,
last_written_index_term = {LastWrittenIdx, _} = LWIdx,
snapshot_state = SnapState0} = State0)
%% only update snapshot if it is newer than the last snapshot
when SnapIdx >= FstIdx ->
Expand All @@ -587,10 +590,27 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
CPEffects = [{delete_snapshot,
ra_snapshot:directory(SnapState, checkpoint),
Checkpoint} || Checkpoint <- Checkpoints],
Effects = [DeleteCurrentSnap | CPEffects] ++ Effects0,
Effects1 = [DeleteCurrentSnap | CPEffects] ++ Effects0,
%% do not set last written index here as the snapshot may
%% be for a past index
{LWIdxTerm, Effects} =
case LastWrittenIdx > SnapIdx of
true ->
{LWIdx, Effects1};
false ->
{Snap,
[{next_event,
{ra_log_event,
{truncate_cache, LastWrittenIdx, SnapIdx}}} | Effects1]}
end,

% ?DEBUG("snapshot written at ~b LWIdxTerm ~w Effs ~p cache range ~p ",
% [SnapIdx, LWIdxTerm, Effects,
% ra_log_cache:range(Cache)]),

{State#?MODULE{first_index = SnapIdx + 1,
last_index = max(LstIdx, SnapIdx),
last_written_index_term = LWIdxTerm,
snapshot_state = SnapState}, Effects};
checkpoint ->
put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, SnapIdx),
Expand Down Expand Up @@ -870,6 +890,7 @@ overview(#?MODULE{last_index = LastIndex,
{I, _} -> I
end,
cache_size => ra_log_cache:size(Cache),
cache_range => ra_log_cache:range(Cache),
last_wal_write => LastMs
}.

Expand Down Expand Up @@ -1058,7 +1079,10 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
truncate_cache(_FromIdx, ToIdx,
#?MODULE{cache = Cache} = State,
Effects) ->
{State#?MODULE{cache = ra_log_cache:trim(ToIdx, Cache)}, Effects}.
CacheAfter = ra_log_cache:trim(ToIdx, Cache),
% ?DEBUG("truncate cache at ~b range after trim ~p",
% [ToIdx, ra_log_cache:range(CacheAfter)]),
{State#?MODULE{cache = CacheAfter}, Effects}.

maybe_append_first_entry(State0 = #?MODULE{last_index = -1}) ->
State = append({0, 0, undefined}, State0),
Expand Down
7 changes: 5 additions & 2 deletions src/ra_log_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ init() ->
-spec reset(state()) -> state().
reset(#?MODULE{range = undefined} = State) ->
State;
reset(#?MODULE{tbl = Tid,
cache = _Cache} = State) ->
reset(#?MODULE{tbl = Tid} = State) ->
% ?DEBUG("RA LOG CACHE RESET ~p", [Range]),
true = ets:delete_all_objects(Tid),
State#?MODULE{cache = #{},
range = undefined}.
Expand Down Expand Up @@ -133,6 +133,9 @@ trim(To, #?MODULE{tbl = Tid,
NewRange = {To + 1, RangeTo},
State#?MODULE{range = NewRange,
cache = cache_without(From, To, Cache, Tid)};
trim(To, #?MODULE{range = {From, _RangeTo}} = State)
when To < From ->
State;
trim(_To, State) ->
reset(State).

Expand Down
12 changes: 12 additions & 0 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,11 @@ write_data({UId, _} = Id, Idx, Term, Data0, Trunc,

handle_msg({append, {UId, Pid} = Id, Idx, Term, Entry},
#state{writers = Writers} = State0) ->
SnapIdx = snap_idx(UId),
case maps:find(UId, Writers) of
_ when Idx =< SnapIdx ->
%% a snapshot already exists that is higher - just drop the write
State0#state{writers = Writers#{UId => {in_seq, SnapIdx}}};
{ok, {_, PrevIdx}} when Idx =< PrevIdx + 1 ->
write_data(Id, Idx, Term, Entry, false, State0);
error ->
Expand Down Expand Up @@ -1004,3 +1008,11 @@ table_start(false, Idx, TblStart) ->
min(TblStart, Idx);
table_start(true, Idx, _TblStart) ->
Idx.

snap_idx(ServerUId) ->
case ets:lookup(ra_log_snapshot_state, ServerUId) of
[{_, SnapIdx}] ->
SnapIdx;
[] ->
-1
end.
5 changes: 4 additions & 1 deletion src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1899,7 +1899,7 @@ make_rpcs_for(Peers, State) ->
make_rpc_effect(PeerId, Peer, MaxBatchSize, State) ->
make_rpc_effect(PeerId, Peer, MaxBatchSize, State, []).

make_rpc_effect(PeerId, #{next_index := Next}, MaxBatchSize,
make_rpc_effect(PeerId, #{next_index := Next} = Peer, MaxBatchSize,
#{cfg := #cfg{id = Id}, log := Log0,
current_term := Term} = State, EntryCache) ->
PrevIdx = Next - 1,
Expand All @@ -1912,6 +1912,8 @@ make_rpc_effect(PeerId, #{next_index := Next}, MaxBatchSize,
{undefined, Log} ->
% The assumption here is that a missing entry means we need
% to send a snapshot.
?DEBUG("need to send snapshot idx ~p not found in ~p log ov ~p",
[PrevIdx, Peer, ra_log:overview(Log)]),
case ra_log:snapshot_index_term(Log) of
{PrevIdx, PrevTerm} ->
% Previous index is the same as snapshot index
Expand All @@ -1920,6 +1922,7 @@ make_rpc_effect(PeerId, #{next_index := Next}, MaxBatchSize,
State#{log => Log},
EntryCache);
{LastIdx, _} ->
?DEBUG("snapshot idx ~b", [LastIdx]),
SnapState = ra_log:snapshot_state(Log),
%% don't increment the next index here as we will do
%% that once the snapshot is fully replicated
Expand Down
25 changes: 24 additions & 1 deletion test/ra_log_wal_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ all_tests() ->
recover_with_last_entry_corruption_pre_allocate,
checksum_failure_in_middle_of_file_should_fail,
recover_with_partial_last_entry,
sys_get_status
sys_get_status,
drop_writes_if_snapshot_has_higher_index
].

groups() ->
Expand Down Expand Up @@ -108,6 +109,7 @@ init_per_testcase(TestCase, Config) ->
max_size_bytes => ?MAX_SIZE_BYTES},
_ = ets:new(ra_open_file_metrics, [named_table, public, {write_concurrency, true}]),
_ = ets:new(ra_io_metrics, [named_table, public, {write_concurrency, true}]),
_ = ets:new(ra_log_snapshot_state, [named_table, public, {write_concurrency, true}]),
[{ra_log_ets, Ets},
{writer_id, {UId, self()}},
{test_case, TestCase},
Expand Down Expand Up @@ -872,6 +874,27 @@ checksum_failure_in_middle_of_file_should_fail(Config) ->
meck:unload(),
ok.

drop_writes_if_snapshot_has_higher_index(Config) ->
ok = logger:set_primary_config(level, all),
Conf = ?config(wal_conf, Config),
{UId, _} = WriterId = ?config(writer_id, Config),
{ok, Pid} = ra_log_wal:start_link(Conf),
{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"),
{12, 1, "value"} = await_written(WriterId, {12, 12, 1}),
{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 13, 1, "value2"),
{13, 1, "value2"} = await_written(WriterId, {13, 13, 1}),

ets:insert(ra_log_snapshot_state, {UId, 20}),
{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 14, 1, "value2"),
timer:sleep(500),

undefined = mem_tbl_read(UId, 14),
ra_lib:dump(ets:tab2list(ra_log_open_mem_tables)),
proc_lib:stop(Pid),
[{_, _, _, Tid}] = ets:lookup(ra_log_open_mem_tables, UId),
?assert(not ets:info(Tid, compressed)),
ok.

empty_mailbox() ->
receive
_ ->
Expand Down

0 comments on commit a2541b1

Please sign in to comment.