Skip to content

Commit

Permalink
todos
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Oct 18, 2024
1 parent decba1b commit cab911b
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 36 deletions.
85 changes: 49 additions & 36 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -326,19 +326,57 @@ commit_tx(#?MODULE{tx = false} = State) ->

-spec append(Entry :: log_entry(), State :: state()) ->
state() | no_return().
append({Idx, _, _Cmd} = Entry,
#?MODULE{last_index = LastIdx,
append({Idx, Term, Cmd0} = Entry,
#?MODULE{cfg = #cfg{uid = UId,
wal = Wal} =Cfg,
last_index = LastIdx,
tx = false,
snapshot_state = _SnapState} = State0)
mem_table = Mt0} = State)
when Idx =:= LastIdx + 1 ->
wal_write(State0, Entry);
case ra_mt:insert(Entry, Mt0) of
{ok, Mt} ->
Cmd = {ttb, term_to_iovec(Cmd0)},
case ra_log_wal:write(Wal, {UId, self()}, ra_mt:tid(Mt),
Idx, Term, Cmd) of
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx,
last_term = Term,
last_wal_write = {Pid, now_ms()},
mem_table = Mt};
{error, wal_down} ->
error(wal_down)
end;
{error, overwriting} ->
?DEBUG("~ts: mem table overwrite detected appending index ~b, "
"opening new mem table",
[Cfg#cfg.log_id, Idx]),
%% TODO: do we need error handling here -
%% this function uses the infinity timeout
{ok, M0} = ra_log_ets:new_mem_table_please(Cfg#cfg.names,
Cfg#cfg.uid, Mt0),
append(Entry, State#?MODULE{mem_table = M0})
end;
append({Idx, Term, _Cmd} = Entry,
#?MODULE{cfg = Cfg,
last_index = LastIdx,
tx = true,
mem_table = Mt0} = State)
when Idx =:= LastIdx + 1 ->
{ok, Mt} = ra_mt:stage(Entry, Mt0),
Mt = case ra_mt:stage(Entry, Mt0) of
{ok, M} ->
M;
{error, overwriting} ->
?DEBUG("~ts: mem table overwrite detected appending index ~b, "
"opening new mem table",
[Cfg#cfg.log_id, Idx]),
%% TODO: do we need error handling here -
%% this function uses the infinity timeout
{ok, M0} = ra_log_ets:new_mem_table_please(Cfg#cfg.names,
Cfg#cfg.uid, Mt0),
append(Entry, State#?MODULE{mem_table = M0})
end,
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
State#?MODULE{last_index = Idx,
last_term = Term,
Expand Down Expand Up @@ -513,8 +551,7 @@ handle_event({written, Term, {FromIdx, ToIdx}},
#?MODULE{cfg = Cfg,
last_written_index_term = {LastWrittenIdx0,
_LastWrittenTerm0},
first_index = FirstIdx,
snapshot_state = _SnapState} = State0)
first_index = FirstIdx} = State0)
when FromIdx =< LastWrittenIdx0 + 1 ->
% We need to ignore any written events for the same index
% but in a prior term if we do not we may end up confirming
Expand Down Expand Up @@ -814,12 +851,9 @@ tick(Now, #?MODULE{cfg = #cfg{wal = Wal},
CurWalPid = whereis(Wal),
MtRange = ra_mt:range(Mt),
case Now > Ms + ?WAL_RESEND_TIMEOUT andalso
CurWalPid =/= undefined andalso
CurWalPid =/= WalPid andalso
(is_tuple(MtRange) andalso
LastWrittenIdx < element(2, MtRange))
%% TODO: mt: should this be resend if mt range end is higher than
%% last written?
is_pid(CurWalPid) andalso
CurWalPid =/= WalPid andalso
ra_range:in(LastWrittenIdx, MtRange)
of
true ->
%% the wal has restarted, it has been at least 5s and there are
Expand Down Expand Up @@ -1071,28 +1105,6 @@ delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId,
{State, log_update_effects(Readers, Pid, State)}
end.

wal_write(#?MODULE{cfg = #cfg{uid = UId,
wal = Wal} = Cfg,
mem_table = Mt0} = State,
{Idx, Term, Cmd0} = Entry) ->
Cmd = {ttb, term_to_iovec(Cmd0)},
%% TODO: mt: handle the overwrite case
{ok, Mt} = ra_mt:insert(Entry, Mt0),
case ra_log_wal:write(Wal, {UId, self()}, ra_mt:tid(Mt),
Idx, Term, Cmd) of
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx,
last_term = Term,
last_wal_write = {Pid, now_ms()},
mem_table = Mt};
{error, wal_down} ->
%% TODO: mt: if we get there the entry has already been inserted
%% into the mem table
error(wal_down)
end.

%% unly used by resend to wal functionality and doesn't update the mem table
wal_rewrite(#?MODULE{cfg = #cfg{uid = UId,
wal = Wal} = Cfg} = State,
Expand Down Expand Up @@ -1199,7 +1211,8 @@ stage_entries(Cfg, [Entry | Rem] = Entries, Mt0) ->
{error, overwriting} ->
?DEBUG("~ts: mem table overwrite detected, opening new mem table",
[Cfg#cfg.log_id]),
%% TODO: mt: error handling
%% TODO: do we need error handling here - this function uses the infinity
%% timeout
{ok, Mt} = ra_log_ets:new_mem_table_please(Cfg#cfg.names,
Cfg#cfg.uid, Mt0),
stage_entries(Cfg, Entries, Mt)
Expand Down
2 changes: 2 additions & 0 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ handle_leader({commands, Cmds}, #{cfg := #cfg{id = Self,
{not_appended, wal_down, State0, Effects} ->
?WARN("~ts ~b commands NOT appended to Raft log. Reason: wal_down",
[LogId, length(Cmds)]),
%% TODO: abort transaction here?
%% TODO: select the peer with the higest match index?
CondEffs = case maps:to_list(maps:remove(Self, Cluster)) of
[] -> [];
[{PeerId, _} | _] ->
Expand Down
35 changes: 35 additions & 0 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ all_tests() ->
resend_write_lost_in_wal_crash,
resend_write_after_tick,
handle_overwrite,
handle_overwrite_append,
receive_segment,
read_one,
take_after_overwrite_and_init,
Expand Down Expand Up @@ -145,6 +146,40 @@ handle_overwrite(Config) ->
flush(),
ok.

handle_overwrite_append(Config) ->
%% this is a theoretical case where a follower has written some entries
%% then another leader advised to reset last index backwards, _then_
%% somehow the current follower become leader
Log0 = ra_log_init(Config),
{ok, Log1} = ra_log:write([{1, 1, "value"},
{2, 1, "value"}], Log0),
receive
{ra_log_event, {written, 1, {1, 2}}} -> ok
after 2000 ->
flush(),
exit(written_timeout)
end,
{ok, Log2} = ra_log:set_last_index(1, Log1),
{0, 0} = ra_log:last_written(Log2),
{1, 1} = ra_log:last_index_term(Log2),
Log3 = ra_log:append({2, 3, "value"}, Log2),
{2, 3} = ra_log:last_index_term(Log3),
% ensure immediate truncation
Log4 = ra_log:append({3, 3, "value"}, Log3),
{3, 3} = ra_log:last_index_term(Log4),
% simulate the first written event coming after index has already
% been written in a new term
{Log, _} = ra_log:handle_event({written, 1, {1, 2}}, Log4),
% ensure last written has not been incremented
{1, 1} = ra_log:last_written(Log),
{3, 3} = ra_log:last_written(
element(1, ra_log:handle_event({written, 3, {2, 3}}, Log))),
ok = ra_log_wal:force_roll_over(ra_log_wal),
_ = deliver_all_log_events(Log, 100),
ra_log:close(Log),
flush(),
ok.

receive_segment(Config) ->
Log0 = ra_log_init(Config),
% write a few entries
Expand Down

0 comments on commit cab911b

Please sign in to comment.