diff --git a/src/ra_log.erl b/src/ra_log.erl index 42de14dd..091a4edf 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -326,19 +326,57 @@ commit_tx(#?MODULE{tx = false} = State) -> -spec append(Entry :: log_entry(), State :: state()) -> state() | no_return(). -append({Idx, _, _Cmd} = Entry, - #?MODULE{last_index = LastIdx, +append({Idx, Term, Cmd0} = Entry, + #?MODULE{cfg = #cfg{uid = UId, + wal = Wal} =Cfg, + last_index = LastIdx, tx = false, - snapshot_state = _SnapState} = State0) + mem_table = Mt0} = State) when Idx =:= LastIdx + 1 -> - wal_write(State0, Entry); + case ra_mt:insert(Entry, Mt0) of + {ok, Mt} -> + Cmd = {ttb, term_to_iovec(Cmd0)}, + case ra_log_wal:write(Wal, {UId, self()}, ra_mt:tid(Mt), + Idx, Term, Cmd) of + {ok, Pid} -> + ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), + State#?MODULE{last_index = Idx, + last_term = Term, + last_wal_write = {Pid, now_ms()}, + mem_table = Mt}; + {error, wal_down} -> + error(wal_down) + end; + {error, overwriting} -> + ?DEBUG("~ts: mem table overwrite detected appending index ~b, " + "opening new mem table", + [Cfg#cfg.log_id, Idx]), + %% TODO: do we need error handling here - + %% this function uses the infinity timeout + {ok, M0} = ra_log_ets:new_mem_table_please(Cfg#cfg.names, + Cfg#cfg.uid, Mt0), + append(Entry, State#?MODULE{mem_table = M0}) + end; append({Idx, Term, _Cmd} = Entry, #?MODULE{cfg = Cfg, last_index = LastIdx, tx = true, mem_table = Mt0} = State) when Idx =:= LastIdx + 1 -> - {ok, Mt} = ra_mt:stage(Entry, Mt0), + Mt = case ra_mt:stage(Entry, Mt0) of + {ok, M} -> + M; + {error, overwriting} -> + ?DEBUG("~ts: mem table overwrite detected appending index ~b, " + "opening new mem table", + [Cfg#cfg.log_id, Idx]), + %% TODO: do we need error handling here - + %% this function uses the infinity timeout + {ok, M0} = ra_log_ets:new_mem_table_please(Cfg#cfg.names, + Cfg#cfg.uid, Mt0), + append(Entry, State#?MODULE{mem_table = M0}) + end, put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx), State#?MODULE{last_index = Idx, last_term = Term, @@ -513,8 +551,7 @@ handle_event({written, Term, {FromIdx, ToIdx}}, #?MODULE{cfg = Cfg, last_written_index_term = {LastWrittenIdx0, _LastWrittenTerm0}, - first_index = FirstIdx, - snapshot_state = _SnapState} = State0) + first_index = FirstIdx} = State0) when FromIdx =< LastWrittenIdx0 + 1 -> % We need to ignore any written events for the same index % but in a prior term if we do not we may end up confirming @@ -814,12 +851,9 @@ tick(Now, #?MODULE{cfg = #cfg{wal = Wal}, CurWalPid = whereis(Wal), MtRange = ra_mt:range(Mt), case Now > Ms + ?WAL_RESEND_TIMEOUT andalso - CurWalPid =/= undefined andalso - CurWalPid =/= WalPid andalso - (is_tuple(MtRange) andalso - LastWrittenIdx < element(2, MtRange)) - %% TODO: mt: should this be resend if mt range end is higher than - %% last written? + is_pid(CurWalPid) andalso + CurWalPid =/= WalPid andalso + ra_range:in(LastWrittenIdx, MtRange) of true -> %% the wal has restarted, it has been at least 5s and there are @@ -1071,28 +1105,6 @@ delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId, {State, log_update_effects(Readers, Pid, State)} end. -wal_write(#?MODULE{cfg = #cfg{uid = UId, - wal = Wal} = Cfg, - mem_table = Mt0} = State, - {Idx, Term, Cmd0} = Entry) -> - Cmd = {ttb, term_to_iovec(Cmd0)}, - %% TODO: mt: handle the overwrite case - {ok, Mt} = ra_mt:insert(Entry, Mt0), - case ra_log_wal:write(Wal, {UId, self()}, ra_mt:tid(Mt), - Idx, Term, Cmd) of - {ok, Pid} -> - ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), - put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), - State#?MODULE{last_index = Idx, - last_term = Term, - last_wal_write = {Pid, now_ms()}, - mem_table = Mt}; - {error, wal_down} -> - %% TODO: mt: if we get there the entry has already been inserted - %% into the mem table - error(wal_down) - end. - %% unly used by resend to wal functionality and doesn't update the mem table wal_rewrite(#?MODULE{cfg = #cfg{uid = UId, wal = Wal} = Cfg} = State, @@ -1199,7 +1211,8 @@ stage_entries(Cfg, [Entry | Rem] = Entries, Mt0) -> {error, overwriting} -> ?DEBUG("~ts: mem table overwrite detected, opening new mem table", [Cfg#cfg.log_id]), - %% TODO: mt: error handling + %% TODO: do we need error handling here - this function uses the infinity + %% timeout {ok, Mt} = ra_log_ets:new_mem_table_please(Cfg#cfg.names, Cfg#cfg.uid, Mt0), stage_entries(Cfg, Entries, Mt) diff --git a/src/ra_server.erl b/src/ra_server.erl index d1e985a6..662d0f0a 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -589,6 +589,8 @@ handle_leader({commands, Cmds}, #{cfg := #cfg{id = Self, {not_appended, wal_down, State0, Effects} -> ?WARN("~ts ~b commands NOT appended to Raft log. Reason: wal_down", [LogId, length(Cmds)]), + %% TODO: abort transaction here? + %% TODO: select the peer with the higest match index? CondEffs = case maps:to_list(maps:remove(Self, Cluster)) of [] -> []; [{PeerId, _} | _] -> diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 37f09ca7..a9477bd3 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -21,6 +21,7 @@ all_tests() -> resend_write_lost_in_wal_crash, resend_write_after_tick, handle_overwrite, + handle_overwrite_append, receive_segment, read_one, take_after_overwrite_and_init, @@ -145,6 +146,40 @@ handle_overwrite(Config) -> flush(), ok. +handle_overwrite_append(Config) -> + %% this is a theoretical case where a follower has written some entries + %% then another leader advised to reset last index backwards, _then_ + %% somehow the current follower become leader + Log0 = ra_log_init(Config), + {ok, Log1} = ra_log:write([{1, 1, "value"}, + {2, 1, "value"}], Log0), + receive + {ra_log_event, {written, 1, {1, 2}}} -> ok + after 2000 -> + flush(), + exit(written_timeout) + end, + {ok, Log2} = ra_log:set_last_index(1, Log1), + {0, 0} = ra_log:last_written(Log2), + {1, 1} = ra_log:last_index_term(Log2), + Log3 = ra_log:append({2, 3, "value"}, Log2), + {2, 3} = ra_log:last_index_term(Log3), + % ensure immediate truncation + Log4 = ra_log:append({3, 3, "value"}, Log3), + {3, 3} = ra_log:last_index_term(Log4), + % simulate the first written event coming after index has already + % been written in a new term + {Log, _} = ra_log:handle_event({written, 1, {1, 2}}, Log4), + % ensure last written has not been incremented + {1, 1} = ra_log:last_written(Log), + {3, 3} = ra_log:last_written( + element(1, ra_log:handle_event({written, 3, {2, 3}}, Log))), + ok = ra_log_wal:force_roll_over(ra_log_wal), + _ = deliver_all_log_events(Log, 100), + ra_log:close(Log), + flush(), + ok. + receive_segment(Config) -> Log0 = ra_log_init(Config), % write a few entries