Skip to content

Commit

Permalink
Add periodic check for commands stuck in cache
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Apr 19, 2024
1 parent 90e7cf0 commit 1e0dede
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 60 deletions.
47 changes: 39 additions & 8 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@

% external reader
register_reader/2,
readers/1
readers/1,
tick/2
]).

-include("ra.hrl").
Expand Down Expand Up @@ -106,6 +107,7 @@
% index specified
cache = ra_log_cache:init() :: ra_log_cache:state(),
last_resend_time :: option({integer(), WalPid :: pid() | undefined}),
last_wal_write :: {pid(), Ms :: ra_machine:milliseconds()},
reader :: ra_log_reader:state(),
readers = [] :: [pid()]
}).
Expand Down Expand Up @@ -234,7 +236,8 @@ init(#{uid := UId,
first_index = max(SnapIdx + 1, FirstIdx),
last_index = max(SnapIdx, LastIdx0),
reader = Reader,
snapshot_state = SnapshotState
snapshot_state = SnapshotState,
last_wal_write = {whereis(Wal), now_ms()}
},
put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
LastIdx = State000#?MODULE.last_index,
Expand Down Expand Up @@ -736,6 +739,24 @@ flush_cache(#?MODULE{cache = Cache} = State) ->
%% Delegates to ra_log_cache:needs_flush/1 for the cache held in the log state.
needs_cache_flush(#?MODULE{cache = LogCache}) ->
    ra_log_cache:needs_flush(LogCache).

%% Periodic check for entries stuck in the cache.
%% When more than 5000ms have passed since the last recorded WAL write, the
%% WAL name currently resolves to a process that is not the one the last
%% write was sent to, and the cache is non-empty, the entries following the
%% last written index are resent. Otherwise the state is returned unchanged.
-spec tick(Now :: integer(), state()) -> state().
tick(Now, #?MODULE{cfg = #cfg{wal = Wal},
                   cache = Cache,
                   last_written_index_term = {LastWrittenIdx, _},
                   last_wal_write = {LastWalPid, LastWriteMs}} = State) ->
    WalPidNow = whereis(Wal),
    Overdue = Now > LastWriteMs + 5000,
    WalReplaced = WalPidNow =/= undefined andalso WalPidNow =/= LastWalPid,
    %% the cache size is only consulted once the cheaper checks have passed
    case Overdue andalso WalReplaced andalso ra_log_cache:size(Cache) > 0 of
        false ->
            State;
        true ->
            %% the wal has restarted, it has been at least 5s and there are
            %% cached items, resend them
            resend_from(LastWrittenIdx + 1, State)
    end.

suggest_snapshot0(SnapKind, Idx, Cluster, MacVersion, MacState, State0) ->
ClusterServerIds = maps:map(fun (_, V) ->
maps:with([voter_status], V)
Expand Down Expand Up @@ -830,6 +851,7 @@ overview(#?MODULE{last_index = LastIndex,
last_written_index_term = LWIT,
snapshot_state = SnapshotState,
reader = Reader,
last_wal_write = {_LastPid, LastMs},
cache = Cache}) ->
#{type => ?MODULE,
last_index => LastIndex,
Expand All @@ -846,7 +868,8 @@ overview(#?MODULE{last_index = LastIndex,
undefined -> undefined;
{I, _} -> I
end,
cache_size => ra_log_cache:size(Cache)
cache_size => ra_log_cache:size(Cache),
last_wal_write => LastMs
}.

-spec write_config(ra_server:config(), state()) -> ok.
Expand Down Expand Up @@ -964,10 +987,11 @@ wal_truncate_write(#?MODULE{cfg = #cfg{uid = UId,
% we need to indicate to the WAL that this may be a non-contiguous write
% and that prior entries should be considered stale
case ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd) of
ok ->
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx, last_term = Term,
last_wal_write = {Pid, now_ms()},
cache = ra_log_cache:add(Entry, Cache)};
{error, wal_down} ->
error(wal_down)
Expand All @@ -978,10 +1002,11 @@ wal_write(#?MODULE{cfg = #cfg{uid = UId,
cache = Cache} = State,
{Idx, Term, Cmd} = Entry) ->
case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of
ok ->
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx, last_term = Term,
last_wal_write = {Pid, now_ms()},
cache = ra_log_cache:add(Entry, Cache)};
{error, wal_down} ->
error(wal_down)
Expand All @@ -993,11 +1018,13 @@ wal_rewrite(#?MODULE{cfg = #cfg{uid = UId,
wal = Wal} = Cfg} = State,
{Idx, Term, Cmd}) ->
case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of
ok ->
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx,
last_term = Term};
last_term = Term,
last_wal_write = {Pid, now_ms()}
};
{error, wal_down} ->
error(wal_down)
end.
Expand All @@ -1016,11 +1043,12 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,

[{_, _, LastIdx, LastTerm, _} | _] = WalCommands,
case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of
ok ->
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
State#?MODULE{last_index = LastIdx,
last_term = LastTerm,
last_wal_write = {Pid, now_ms()},
cache = Cache};
{error, wal_down} ->
error(wal_down)
Expand Down Expand Up @@ -1221,6 +1249,9 @@ last_index_term_in_wal(Idx, #?MODULE{reader = Reader0} = State) ->
{Idx, Term, State#?MODULE{reader = Reader}}
end.

%% Current Erlang system time in milliseconds; used to timestamp WAL writes.
-spec now_ms() -> integer().
now_ms() ->
    erlang:system_time(millisecond).

%%%% TESTS

-ifdef(TEST).
Expand Down
12 changes: 7 additions & 5 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,20 @@
{call, from(), wal_command()}.

%% Sends an append command for {Idx, Term, Cmd} on behalf of writer From to
%% the WAL, which may be given as a registered name or a pid.
%% Returns the result of named_cast/2, which per the spec is {ok, WalPid}
%% for the process the command was cast to, or {error, wal_down}.
-spec write(writer_id(), atom() | pid(), ra_index(), ra_term(), term()) ->
    {ok, pid()} | {error, wal_down}.
write(From, Wal, Idx, Term, Cmd) ->
    named_cast(Wal, {append, From, Idx, Term, Cmd}).

%% Sends a truncate command for {Idx, Term, Cmd} on behalf of writer From to
%% the WAL. Like write/5 the WAL may be given as a registered name or a pid,
%% since named_cast/2 has a clause for pids.
%% Returns the result of named_cast/2, which per the spec is {ok, WalPid}
%% for the process the command was cast to, or {error, wal_down}.
-spec truncate_write(writer_id(), atom() | pid(), ra_index(), ra_term(), term()) ->
    {ok, pid()} | {error, wal_down}.
truncate_write(From, Wal, Idx, Term, Cmd) ->
    named_cast(Wal, {truncate, From, Idx, Term, Cmd}).

-spec write_batch(Wal :: atom() | pid(), [wal_command()]) ->
ok | {error, wal_down}.
{ok, pid()} | {error, wal_down}.
write_batch(Wal, WalCommands) when is_pid(Wal) ->
gen_batch_server:cast_batch(Wal, WalCommands);
gen_batch_server:cast_batch(Wal, WalCommands),
{ok, Wal};
write_batch(Wal, WalCommands) when is_atom(Wal) ->
case whereis(Wal) of
undefined ->
Expand All @@ -171,7 +172,8 @@ write_batch(Wal, WalCommands) when is_atom(Wal) ->
end.

named_cast(To, Msg) when is_pid(To) ->
gen_batch_server:cast(To, Msg);
gen_batch_server:cast(To, Msg),
{ok, To};
named_cast(Wal, Msg) ->
case whereis(Wal) of
undefined ->
Expand Down
9 changes: 9 additions & 0 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
handle_aux/4,
handle_state_enter/2,
tick/1,
log_tick/1,
overview/1,
metrics/1,
is_new/1,
Expand Down Expand Up @@ -1464,6 +1465,14 @@ tick(#{cfg := #cfg{effective_machine_module = MacMod},
Now = erlang:system_time(millisecond),
ra_machine:tick(MacMod, Now, MacState).

%% Runs the periodic log check: passes the current system time (in
%% milliseconds) to ra_log:tick/2 and stores the log state it returns back
%% into the server state. Returns the updated server state, not effects.
-spec log_tick(ra_server_state()) -> ra_server_state().
log_tick(#{cfg := #cfg{},
           log := Log0} = State) ->
    Now = erlang:system_time(millisecond),
    Log = ra_log:tick(Now, Log0),
    %% the function head already matched the log key so an exact update is safe
    State#{log := Log}.


-spec handle_state_enter(ra_state() | eol, ra_server_state()) ->
{ra_server_state() | eol, effects()}.
handle_state_enter(RaftState, #{cfg := #cfg{effective_machine_module = MacMod},
Expand Down
46 changes: 40 additions & 6 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ all() ->
all_tests() ->
[
resend_write,
resend_write_after_tick,
handle_overwrite,
receive_segment,
read_one,
Expand Down Expand Up @@ -564,7 +565,8 @@ resend_write(Config) ->
% simulate lost messages requiring the ra server to resend in flight
% writes
meck:new(ra_log_wal, [passthrough]),
meck:expect(ra_log_wal, write, fun (_, _, 10, _, _) -> ok;
meck:expect(ra_log_wal, write, fun (_, _, 10, _, _) ->
{ok, self()};
(A, B, C, D, E) ->
meck:passthrough([A, B, C, D, E])
end),
Expand Down Expand Up @@ -598,6 +600,34 @@ resend_write(Config) ->

ok.

%% Appends an entry while ra_log_wal:write/5 is stubbed out, restarts the WAL
%% and then expects ra_log:tick/2 (called with a time more than 5s ahead) to
%% lead to {1, 2} being reported as last written.
resend_write_after_tick(Config) ->
    meck:new(ra_log_wal, [passthrough]),
    OldWalPid = whereis(ra_log_wal),
    timer:sleep(100),
    ct:pal("ra_log_init"),
    InitLog = ra_log_init(Config),
    {0, 0} = ra_log:last_index_term(InitLog),
    ct:pal("appending"),
    %% the stub replies with the old wal pid without passing the write on,
    %% so this append should be lost
    FakeWrite = fun (_, _, _, _, _) -> {ok, OldWalPid} end,
    meck:expect(ra_log_wal, write, FakeWrite),
    AppendedLog = ra_log:append({1, 2, banana}, InitLog),
    meck:unload(ra_log_wal),
    %% restart wal to get a new wal pid so that the ra_log detects on tick
    %% that the wal has changed
    ct:pal("restart wal"),
    restart_wal(),

    %% just past the 5000ms threshold checked by ra_log:tick/2
    TickTime = erlang:system_time(millisecond) + 5001,
    TickedLog = ra_log:tick(TickTime, AppendedLog),
    IsWritten = fun (L) -> {1, 2} == ra_log:last_written(L) end,
    FinalLog = assert_log_events(TickedLog, IsWritten),
    ct:pal("overvew ~p", [ra_log:overview(FinalLog)]),
    ra_log:close(FinalLog),
    ok.

wal_crash_recover(Config) ->
Log0 = ra_log_init(Config, #{resend_window => 1}),
Log1 = write_n(1, 50, 2, Log0),
Expand Down Expand Up @@ -660,17 +690,14 @@ detect_lost_written_range(Config) ->
end),
% WAL rolls over and WAL file is deleted
% simulate wal outage
meck:expect(ra_log_wal, write, fun (_, _, _, _, _) -> ok end),
meck:expect(ra_log_wal, write, fun (_, _, _, _, _) -> {ok, self()} end),

% append some messages that will be lost
Log3 = append_n(10, 15, 2, Log2),

% restart the WAL to ensure we lose the transient state keeping track of
% each writer's last written index
[SupPid] = [P || {ra_log_wal_sup, P, _, _}
<- supervisor:which_children(ra_log_sup)],
ok = supervisor:terminate_child(SupPid, ra_log_wal),
{ok, _} = supervisor:restart_child(SupPid, ra_log_wal),
restart_wal(),

% WAL recovers
meck:unload(ra_log_wal),
Expand Down Expand Up @@ -1397,3 +1424,10 @@ ra_log_init(Config, Cfg0) ->
%% Collects the entries that ra_log:fold/5 visits between From and To, in
%% visit order, and returns them together with the log state from the fold.
ra_log_take(From, To, Log0) ->
    Prepend = fun (Entry, Entries) -> [Entry | Entries] end,
    {Reversed, Log1} = ra_log:fold(From, To, Prepend, [], Log0),
    {lists:reverse(Reversed), Log1}.

%% Terminates and restarts the ra_log_wal child of the ra_log_wal_sup
%% supervisor, which is itself looked up among the children of ra_log_sup.
restart_wal() ->
    Children = supervisor:which_children(ra_log_sup),
    %% exactly one ra_log_wal_sup child is expected, anything else crashes
    [WalSup] = [Pid || {ra_log_wal_sup, Pid, _, _} <- Children],
    ok = supervisor:terminate_child(WalSup, ra_log_wal),
    {ok, _} = supervisor:restart_child(WalSup, ra_log_wal),
    ok.
Loading

0 comments on commit 1e0dede

Please sign in to comment.