diff --git a/src/ra_log.erl b/src/ra_log.erl index 6bc94161..fc7909e2 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -22,7 +22,6 @@ sparse_read/2, last_index_term/1, set_last_index/2, - reset_to_last_known_written/1, handle_event/2, last_written/1, fetch/2, @@ -36,7 +35,6 @@ update_release_cursor/5, checkpoint/5, promote_checkpoint/2, - needs_cache_flush/1, can_write/1, exists/2, @@ -65,10 +63,8 @@ -type ra_meta_key() :: atom(). -type segment_ref() :: {From :: ra_index(), To :: ra_index(), File :: string()}. --type event_body() :: {written, {From :: ra_index(), - To :: ra_index(), - ToTerm :: ra_term()}} | - {segments, ets:tid(), [segment_ref()]} | +-type event_body() :: {written, ra_term(), ra:range()} | + {segments, [{ets:tid(), ra:range()}], [segment_ref()]} | {resend_write, ra_index()} | {snapshot_written, ra_idxterm(), ra_snapshot:kind()} | {down, pid(), term()}. @@ -106,9 +102,6 @@ last_term = 0 :: ra_term(), last_written_index_term = {0, 0} :: ra_idxterm(), snapshot_state :: ra_snapshot:state(), - % if this is set a snapshot write is in progress for the - % index specified - cache = ra_log_cache:init() :: ra_log_cache:state(), last_resend_time :: option({integer(), WalPid :: pid() | undefined}), last_wal_write :: {pid(), Ms :: integer()}, reader :: ra_log_reader:state(), @@ -145,7 +138,7 @@ num_segments := non_neg_integer(), open_segments => non_neg_integer(), snapshot_index => undefined | ra_index(), - cache_size => non_neg_integer(), + mem_table_size => non_neg_integer(), latest_checkpoint_index => undefined | ra_index(), atom() => term()}. @@ -339,17 +332,18 @@ commit_tx(#?MODULE{tx = false} = State) -> append({Idx, _, _Cmd} = Entry, #?MODULE{last_index = LastIdx, tx = false, - snapshot_state = SnapState} = State0) + snapshot_state = _SnapState} = State0) when Idx =:= LastIdx + 1 -> - case ra_snapshot:current(SnapState) of - {SnapIdx, _} when Idx =:= SnapIdx + 1 -> - % it is the next entry after a snapshot - % we need to tell the wal to truncate as we - % are not going to receive any entries prior to the snapshot - wal_truncate_write(State0, Entry); - _ -> - wal_write(State0, Entry) - end; + wal_write(State0, Entry); + % case ra_snapshot:current(SnapState) of + % {SnapIdx, _} when Idx =:= SnapIdx + 1 -> + % % it is the next entry after a snapshot + % % we need to tell the wal to truncate as we + % % are not going to receive any entries prior to the snapshot + % wal_truncate_write(State0, Entry); + % _ -> + % wal_write(State0, Entry) + % end; append({Idx, Term, _Cmd} = Entry, #?MODULE{cfg = Cfg, last_index = LastIdx, @@ -371,25 +365,27 @@ append({Idx, _, _}, #?MODULE{last_index = LastIdx}) -> -spec write(Entries :: [log_entry()], State :: state()) -> {ok, state()} | {error, {integrity_error, term()} | wal_down}. -write([{FstIdx, _, _} = First | Rest] = Entries, +write([{FstIdx, _, _} = _First | _Rest] = Entries, #?MODULE{last_index = LastIdx, - snapshot_state = SnapState} = State00) - when FstIdx =< LastIdx + 1 andalso FstIdx >= 0 -> - case ra_snapshot:current(SnapState) of - {SnapIdx, _} when FstIdx =:= SnapIdx + 1 -> - % it is the next entry after a snapshot - % we need to tell the wal to truncate as we - % are not going to receive any entries prior to the snapshot - try wal_truncate_write(State00, First) of - State0 -> - % write the rest normally - write_entries(Rest, State0) - catch error:wal_down -> - {error, wal_down} - end; - _ -> - write_entries(Entries, State00) - end; + snapshot_state = _SnapState} = State00) + when FstIdx =< LastIdx + 1 andalso + FstIdx >= 0 -> + write_entries(Entries, State00); + % case ra_snapshot:current(SnapState) of + % {SnapIdx, _} when FstIdx =:= SnapIdx + 1 -> + % % it is the next entry after a snapshot + % % we need to tell the wal to truncate as we + % % are not going to receive any entries prior to the snapshot + % try wal_truncate_write(State00, First) of + % State0 -> + % % write the rest normally + % write_entries(Rest, State0) + % catch error:wal_down -> + % {error, wal_down} + % end; + % _ -> + % write_entries(Entries, State00) + % end; write([], State) -> {ok, State}; write([{Idx, _, _} | _], #?MODULE{cfg = #cfg{uid = UId}, @@ -404,7 +400,6 @@ write([{Idx, _, _} | _], #?MODULE{cfg = #cfg{uid = UId}, {Acc, state()} when Acc :: term(). fold(From0, To0, Fun, Acc0, #?MODULE{cfg = Cfg, - % cache = Cache, mem_table = Mt, first_index = FirstIdx, last_index = LastIdx, @@ -490,7 +485,6 @@ last_written(#?MODULE{last_written_index_term = LWTI}) -> -spec set_last_index(ra_index(), state()) -> {ok, state()} | {not_found, state()}. set_last_index(Idx, #?MODULE{cfg = Cfg, - cache = Cache0, last_written_index_term = {LWIdx0, _}} = State0) -> case fetch_term(Idx, State0) of {undefined, State} -> @@ -499,13 +493,15 @@ set_last_index(Idx, #?MODULE{cfg = Cfg, LWIdx = min(Idx, LWIdx0), {LWTerm, State2} = fetch_term(LWIdx, State1), %% this should always be found but still assert just in case + %% TODO: mt: if the index genuinely goes backwards here we could + %% open a successor memtable here already + %% but we'd need to initiate it with the last index somehow something + %% memtbl doesn't currently support true = LWTerm =/= undefined, - Cache = ra_log_cache:set_last(Idx, Cache0), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx), {ok, State2#?MODULE{last_index = Idx, last_term = Term, - cache = Cache, last_written_index_term = {LWIdx, LWTerm}}} end. @@ -513,32 +509,32 @@ set_last_index(Idx, #?MODULE{cfg = Cfg, %% the last know index to be written to the wal. %% This is only used after the wal has been detected down %% to try to avoid ever having to resend data to the wal --spec reset_to_last_known_written(state()) -> state(). -reset_to_last_known_written(#?MODULE{cfg = Cfg, - cache = Cache0, - last_index = LastIdx, - last_written_index_term = LW} = State0) -> - {Idx, Term, State} = last_index_term_in_wal(LastIdx, State0), - ?DEBUG("~ts ~s: index: ~b term: ~b: previous ~w", - [Cfg#cfg.log_id, ?FUNCTION_NAME, Idx, Term, LW]), - Cache = ra_log_cache:set_last(Idx, Cache0), - put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), - put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, Idx), - State#?MODULE{last_index = Idx, - last_term = Term, - cache = Cache, - last_written_index_term = {Idx, Term}}. +% -spec reset_to_last_known_written(state()) -> state(). +% reset_to_last_known_written(#?MODULE{cfg = Cfg, +% cache = Cache0, +% last_index = LastIdx, +% last_written_index_term = LW} = State0) -> +% {Idx, Term, State} = last_index_term_in_wal(LastIdx, State0), +% ?DEBUG("~ts ~s: index: ~b term: ~b: previous ~w", +% [Cfg#cfg.log_id, ?FUNCTION_NAME, Idx, Term, LW]), +% Cache = ra_log_cache:set_last(Idx, Cache0), +% put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), +% put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, Idx), +% State#?MODULE{last_index = Idx, +% last_term = Term, +% cache = Cache, +% last_written_index_term = {Idx, Term}}. -spec handle_event(event_body(), state()) -> {state(), [effect()]}. -handle_event({written, {FromIdx, _ToIdx, _Term}}, +handle_event({written, _Term, {FromIdx, _ToIdx}}, #?MODULE{last_index = LastIdx} = State) when FromIdx > LastIdx -> %% we must have reverted back, either by explicit reset or by a snapshot %% installation taking place whilst the WAL was processing the write %% Just drop the event in this case as it is stale {State, []}; -handle_event({written, {FromIdx, ToIdx0, Term}}, +handle_event({written, Term, {FromIdx, ToIdx0}}, #?MODULE{cfg = Cfg, last_written_index_term = {LastWrittenIdx0, LastWrittenTerm0}, @@ -585,7 +581,7 @@ handle_event({written, {FromIdx, ToIdx0, Term}}, [State#?MODULE.cfg#cfg.log_id, Term, ToIdx, OtherTerm]), {State, []} end; -handle_event({written, {FromIdx, _, _Term}}, +handle_event({written, _Term, {FromIdx, _}}, #?MODULE{cfg = #cfg{log_id = LogId}, last_written_index_term = {LastWrittenIdx, _}} = State) when FromIdx > LastWrittenIdx + 1 -> @@ -594,32 +590,21 @@ handle_event({written, {FromIdx, _, _Term}}, ?INFO("~ts: ra_log: written gap detected at ~b expected ~b!", [LogId, FromIdx, Expected]), {resend_from(Expected, State), []}; -% handle_event({truncate_cache, FromIdx, ToIdx}, State) -> -% truncate_cache(FromIdx, ToIdx, State, []); -handle_event(flush_cache, State) -> - {flush_cache(State), []}; -handle_event({segments, Tid, NewSegs}, +handle_event({segments, TidRanges, NewSegs}, #?MODULE{cfg = #cfg{log_id = _LogId, names = Names}, reader = Reader0, mem_table = Mt0, %% TODO: re-enable external log readers or drop unused feature? readers = _Readers } = State0) -> - % ClosedTables = ra_log_reader:closed_mem_tables(Reader0), - % Active = lists:takewhile(fun ({_, _, _, _, T}) -> T =/= Tid end, - % ClosedTables), Reader = ra_log_reader:update_segments(NewSegs, Reader0), - Mt = case NewSegs of - [] -> - Mt0; - [{_, LastSegIdx, _} | _] -> - % ?INFO("~ts: ra_log: setting first memtbl index ~b, segments ~p", - % [LogId, LastSegIdx+1, NewSegs]), - {Spec, Mt1} = ra_log_memtbl:set_first(LastSegIdx + 1, Tid, Mt0), - ra_log_ets:execute_delete(Names, Spec, Mt1), - % _ = ra_log_memtbl:delete(Spec, Mt1), - Mt1 - end, + Mt = lists:foldl( + fun ({Tid, Range}, Acc0) -> + {Spec, Acc} = ra_log_memtbl:record_flushed(Tid, Range, Acc0), + ct:pal("Spec ~p ~p", [Spec, ra_log_memtbl:info(Acc)]), + ok = ra_log_ets:execute_delete(Names, Spec, Acc), + Acc + end, Mt0, TidRanges), State = State0#?MODULE{reader = Reader, mem_table = Mt}, {State, []}; @@ -711,8 +696,7 @@ handle_event({snapshot_written, {Idx, Term} = Snap, SnapKind}, Snap}], {State0, Effects}; handle_event({resend_write, Idx}, State) -> - % resend missing entries from cache. - % The assumption is they are available in the cache + % resend missing entries from mem tables. {resend_from(Idx, State), []}; handle_event({down, Pid, _Info}, #?MODULE{readers = Readers} = @@ -776,14 +760,11 @@ install_snapshot({SnapIdx, _} = IdxTerm, SnapState0, %% is writing at the same time? {Spec, Mt} = ra_log_memtbl:set_first(SnapIdx, Mt0), delete_mem_table(Names, Spec, Mt), - % _ = ra_log_memtbl:delete(Spec, Mt), {State#?MODULE{snapshot_state = SnapState, first_index = SnapIdx + 1, last_index = SnapIdx, mem_table = Mt, %% TODO: update last_term too? - %% cache can be reset - % cache = ra_log_cache:reset(Cache), last_written_index_term = IdxTerm}, Effs ++ CPEffects}. @@ -845,18 +826,8 @@ promote_checkpoint(Idx, #?MODULE{cfg = Cfg, {State#?MODULE{snapshot_state = SnapState}, Effects} end. --spec flush_cache(state()) -> state(). -flush_cache(#?MODULE{cache = Cache} = State) -> - State#?MODULE{cache = ra_log_cache:flush(Cache)}. - --spec needs_cache_flush(state()) -> boolean(). -needs_cache_flush(#?MODULE{}) -> - false. - % ra_log_cache:needs_flush(Cache). - -spec tick(Now :: integer(), state()) -> state(). tick(Now, #?MODULE{cfg = #cfg{wal = Wal}, - % cache = Cache, mem_table = Mt, last_written_index_term = {LastWrittenIdx, _}, last_wal_write = {WalPid, Ms}} = State) -> @@ -869,7 +840,6 @@ tick(Now, #?MODULE{cfg = #cfg{wal = Wal}, LastWrittenIdx < element(2, MtRange)) %% TODO: mt: should this be resend if mt range end is higher than %% last written? - % ra_log_cache:size(Cache) > 0 of true -> %% the wal has restarted, it has been at least 5s and there are @@ -991,8 +961,6 @@ overview(#?MODULE{last_index = LastIndex, undefined -> undefined; {I, _} -> I end, - cache_size => 0, - cache_range => undefined, mem_table_range => ra_log_memtbl:range(Mt), last_wal_write => LastMs }. @@ -1122,47 +1090,51 @@ delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId, {State, log_update_effects(Readers, Pid, State)} end. -wal_truncate_write(#?MODULE{cfg = #cfg{uid = UId, - wal = Wal} = Cfg, - mem_table = Mt0} = State, - {Idx, Term, Cmd0} = Entry) -> - % this is the next write after a snapshot was taken or received - % we need to indicate to the WAL that this may be a non-contiguous write - % and that prior entries should be considered stale - Cmd = {ttb, term_to_iovec(Cmd0)}, - case ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd) of - {ok, Pid} -> - ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), - put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), - State#?MODULE{last_index = Idx, last_term = Term, - last_wal_write = {Pid, now_ms()}, - mem_table = ra_log_memtbl:insert(Entry, Mt0)}; - {error, wal_down} -> - error(wal_down) - end. +% wal_truncate_write(#?MODULE{cfg = #cfg{uid = UId, +% wal = Wal} = Cfg, +% mem_table = Mt0} = State, +% {Idx, Term, Cmd0} = Entry) -> +% % this is the next write after a snapshot was taken or received +% % we need to indicate to the WAL that this may be a non-contiguous write +% % and that prior entries should be considered stale +% Cmd = {ttb, term_to_iovec(Cmd0)}, +% case ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd) of +% {ok, Pid} -> +% ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), +% put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), +% State#?MODULE{last_index = Idx, last_term = Term, +% last_wal_write = {Pid, now_ms()}, +% mem_table = ra_log_memtbl:insert(Entry, Mt0)}; +% {error, wal_down} -> +% error(wal_down) +% end. wal_write(#?MODULE{cfg = #cfg{uid = UId, wal = Wal} = Cfg, - mem_table = Mt} = State, + mem_table = Mt0} = State, {Idx, Term, Cmd0} = Entry) -> Cmd = {ttb, term_to_iovec(Cmd0)}, - case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of + %% TODO: mt: handle the overwrite case + {ok, Mt} = ra_log_memtbl:insert(Entry, Mt0), + case ra_log_wal:write(Wal, {UId, self()}, ra_log_memtbl:tid(Mt), + Idx, Term, Cmd) of {ok, Pid} -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), State#?MODULE{last_index = Idx, last_term = Term, last_wal_write = {Pid, now_ms()}, - mem_table = ra_log_memtbl:insert(Entry, Mt)}; + mem_table = Mt}; {error, wal_down} -> + %% TODO: mt: if we get there the entry has already been inserted + %% into the mem table error(wal_down) end. -%% unly used by resend to wal functionality and doesn't set the cache as it -%% is already set +%% unly used by resend to wal functionality and doesn't update the mem table wal_rewrite(#?MODULE{cfg = #cfg{uid = UId, wal = Wal} = Cfg} = State, - {Idx, Term, Cmd}) -> - case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of + Tid, {Idx, Term, Cmd}) -> + case ra_log_wal:write(Wal, {UId, self()}, Tid, Idx, Term, Cmd) of {ok, Pid} -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), @@ -1179,38 +1151,36 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId, mem_table = Mt0} = State, Entries) -> WriterId = {UId, self()}, - {WalCommands, Num, Mt1} = - lists:foldl(fun ({Idx, Term, Cmd0} = Entry, {WC, N, M0}) -> + Tid = ra_log_memtbl:tid(Mt0), + {WalCommands, Num} = + lists:foldl(fun ({Idx, Term, Cmd0}, {WC, N}) -> Cmd = {ttb, term_to_iovec(Cmd0)}, - WalC = {append, WriterId, Idx, Term, Cmd}, - M = ra_log_memtbl:stage(Entry, M0), - {[WalC | WC], N+1, M} - end, {[], 0, Mt0}, Entries), + WalC = {append, WriterId, Tid, Idx, Term, Cmd}, + % M = ra_log_memtbl:stage(Entry, M0), + {[WalC | WC], N+1} + end, {[], 0}, Entries), - [{_, _, LastIdx, LastTerm, _} | _] = WalCommands, - {_, Mt} = ra_log_memtbl:commit(Mt1), + [{_, _, _, LastIdx, LastTerm, _} | _] = WalCommands, + {_, Mt} = ra_log_memtbl:commit(Mt0), case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of {ok, Pid} -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx), - State#?MODULE{last_index = LastIdx, - last_term = LastTerm, - last_wal_write = {Pid, now_ms()}, - mem_table = Mt}; - {error, wal_down} -> - error(wal_down) + {ok, State#?MODULE{last_index = LastIdx, + last_term = LastTerm, + last_wal_write = {Pid, now_ms()}, + mem_table = Mt}}; + {error, wal_down} = Err -> + %% TODO: mt: if we get there the entry has already been inserted + %% into the mem table but never reached the wal + %% consider ra_log_memtbl:abort(Entries, Mt) + Err end. -% truncate_cache(_FromIdx, ToIdx, -% #?MODULE{cache = Cache} = State, -% Effects) -> -% CacheAfter = ra_log_cache:trim(ToIdx, Cache), -% {State#?MODULE{cache = CacheAfter}, Effects}. - maybe_append_first_entry(State0 = #?MODULE{last_index = -1}) -> State = append({0, 0, undefined}, State0), receive - {ra_log_event, {written, {0, 0, 0}}} -> ok + {ra_log_event, {written, 0, {0, 0}}} -> ok end, State#?MODULE{first_index = 0, last_written_index_term = {0, 0}}; @@ -1236,7 +1206,8 @@ resend_from0(Idx, #?MODULE{cfg = Cfg, ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_RESENDS, LastIdx - Idx + 1), lists:foldl(fun (I, Acc) -> {I, T, C} = ra_log_memtbl:lookup(I, Mt), - wal_rewrite(Acc, {I, T, C}) + Tid = ra_log_memtbl:tid_for(I, T, Mt), + wal_rewrite(Acc, Tid, {I, T, C}) end, State#?MODULE{last_resend_time = {erlang:system_time(seconds), whereis(Cfg#cfg.wal)}}, @@ -1254,32 +1225,54 @@ resend_from0(Idx, #?MODULE{last_resend_time = {LastResend, WalPid}, State end. -verify_entries(_, []) -> - ok; -verify_entries(Idx, [{NextIdx, _, _} | Tail]) when Idx + 1 == NextIdx -> - verify_entries(NextIdx, Tail); -verify_entries(Idx, Tail) -> - Msg = io_lib:format("ra_log:verify_entries/2 " - "tried writing ~p - expected ~b", - [Tail, Idx+1]), - {error, {integrity_error, lists:flatten(Msg)}}. +% verify_entries(_, []) -> +% ok; +% verify_entries(Idx, [{NextIdx, _, _} | Tail]) when Idx + 1 == NextIdx -> +% verify_entries(NextIdx, Tail); +% verify_entries(Idx, Tail) -> +% Msg = io_lib:format("ra_log:verify_entries/2 " +% "tried writing ~p - expected ~b", +% [Tail, Idx+1]), +% %% TODO mt: ra_log_memtbl:abort/1 +% {error, {integrity_error, lists:flatten(Msg)}}. write_entries([], State) -> {ok, State}; -write_entries([{FstIdx, _, _} | Rest] = Entries, State0) -> - %% TODO: verify and build up wal commands in one iteration - case verify_entries(FstIdx, Rest) of - ok -> - try - {ok, wal_write_batch(State0, Entries)} - catch - error:wal_down -> - {error, wal_down} - end; +write_entries(Entries, #?MODULE{cfg = Cfg, mem_table = Mt0} = State0) -> + case stage_entries(Cfg, Entries, Mt0) of + {ok, Mt} -> + wal_write_batch(State0#?MODULE{mem_table = Mt}, Entries); Error -> Error end. +stage_entries(Cfg, [Entry | Rem] = Entries, Mt0) -> + case ra_log_memtbl:stage(Entry, Mt0) of + {ok, Mt} -> + stage_entries0(Rem, Mt); + {error, overwriting} -> + %% TODO: mt: error handling + {ok, Mt} = ra_log_ets:new_mem_table_please(Cfg#cfg.names, + Cfg#cfg.uid, Mt0), + stage_entries(Cfg, Entries, Mt) + end. + +stage_entries0([], Mt) -> + {ok, Mt}; +stage_entries0([Entry | Rem], Mt0) -> + case ra_log_memtbl:stage(Entry, Mt0) of + {ok, Mt} -> + stage_entries0(Rem, Mt); + {error, overwriting} -> + Range = ra_log_memtbl:range(Mt0), + Msg = io_lib:format("ra_log:verify_entries/2 " + "tried writing ~p - mem table range ~w", + [Rem, Range]), + {error, {integrity_error, lists:flatten(Msg)}} + end. + + + write_snapshot(Meta, MacRef, SnapKind, #?MODULE{cfg = Cfg, snapshot_state = SnapState0} = State) -> @@ -1336,8 +1329,9 @@ pick_range([{Fst, _Lst} | Tail], {CurFst, CurLst}) -> %% TODO: implement synchronous writes using gen_batch_server:call/3 await_written_idx(Idx, Term, Log0) -> + IDX = Idx, receive - {ra_log_event, {written, {_, Idx, Term}} = Evt} -> + {ra_log_event, {written, Term, {_, IDX}} = Evt} -> {Log, _} = handle_event(Evt, Log0), Log; {ra_log_event, {written, _} = Evt} -> @@ -1383,18 +1377,18 @@ maps_with_values(Keys, Map) -> end end, [], Keys). -last_index_term_in_wal(Idx, #?MODULE{last_written_index_term = {Idx, Term}} = State) -> - % we reached the lower limit which is the last known written index - {Idx, Term, State}; -last_index_term_in_wal(Idx, #?MODULE{reader = Reader0} = State) -> - case ra_log_reader:fetch_term(Idx, Reader0) of - {undefined, Reader} -> - last_index_term_in_wal(Idx-1, State#?MODULE{reader = Reader}); - {Term, Reader} -> - %% if it can be read when bypassing the local cache it is in the - %% wal - {Idx, Term, State#?MODULE{reader = Reader}} - end. +% last_index_term_in_wal(Idx, #?MODULE{last_written_index_term = {Idx, Term}} = State) -> +% % we reached the lower limit which is the last known written index +% {Idx, Term, State}; +% last_index_term_in_wal(Idx, #?MODULE{reader = Reader0} = State) -> +% case ra_log_reader:fetch_term(Idx, Reader0) of +% {undefined, Reader} -> +% last_index_term_in_wal(Idx-1, State#?MODULE{reader = Reader}); +% {Term, Reader} -> +% %% if it can be read when bypassing the local cache it is in the +% %% wal +% {Idx, Term, State#?MODULE{reader = Reader}} +% end. now_ms() -> erlang:system_time(millisecond). diff --git a/src/ra_log_ets.erl b/src/ra_log_ets.erl index 108e9336..5ba63aa5 100644 --- a/src/ra_log_ets.erl +++ b/src/ra_log_ets.erl @@ -94,6 +94,8 @@ delete_mem_table(#{log_ets := Name}, UId) -> ra_log_memtbl:delete_spec(), ra_log_memtbl:state()) -> ok. +execute_delete(#{}, undefined, _Mt) -> + ok; execute_delete(#{log_ets := Name}, Spec, Mt) -> gen_server:cast(Name, {exec_delete, Spec, Mt}). @@ -150,9 +152,9 @@ handle_call(_Request, _From, State) -> % end; handle_cast({exec_delete, Spec, Mt}, State) -> try timer:tc(fun () -> ra_log_memtbl:delete(Spec, Mt) end) of - {Time, _} -> - ?DEBUG("ra_log_ets: ets:delete/1 took ~bms to delete ~w", - [Time div 1000, Spec]), + {Time, Num} -> + ct:pal("ra_log_ets: ets:delete/1 took ~bms to delete ~w ~b entries", + [Time div 1000, Spec, Num]), ok catch _:Err -> diff --git a/src/ra_log_memtbl.erl b/src/ra_log_memtbl.erl index 63f23909..78799102 100644 --- a/src/ra_log_memtbl.erl +++ b/src/ra_log_memtbl.erl @@ -16,9 +16,9 @@ get_items/2, record_flushed/3, set_first/2, - set_last/2, delete/2, tid/1, + is_active/2, prev/1, info/1, range/1, @@ -36,9 +36,9 @@ (is_tuple(Range) andalso Idx < element(1, Range))). --define(IS_AFTER_RANGE(Idx, Range), - (is_tuple(Range) andalso - Idx > element(2, Range))). +% -define(IS_AFTER_RANGE(Idx, Range), +% (is_tuple(Range) andalso +% Idx > element(2, Range))). % -define(RANGE_OUT(Idx, Range), % (Idx < element(1, Range) orelse @@ -57,7 +57,8 @@ -opaque state() :: #?MODULE{}. --type delete_spec() :: ra:index() | +-type delete_spec() :: undefined | + ra:index() | {'<', ra:index()} | {all, ets:tid()} | {delete, ets:tid()} | @@ -115,25 +116,26 @@ insert({Idx, _, _} = _Entry, ?IS_BEFORE_RANGE(Idx, Range) -> {error, overwriting}. --spec stage(log_entry(), state()) -> state(). +-spec stage(log_entry(), state()) -> + {ok, state()} | {error, overwriting}. stage({Idx, _, _} = Entry, - #?MODULE{ staged = {FstIdx, Staged}, - range = Range} = State) + #?MODULE{staged = {FstIdx, Staged}, + range = Range} = State) when ?IS_NEXT_IDX(Idx, Range) -> - State#?MODULE{staged = {FstIdx, [Entry | Staged]}, - range = update_range_end(Idx, Range)}; + {ok, State#?MODULE{staged = {FstIdx, [Entry | Staged]}, + range = update_range_end(Idx, Range)}}; stage({Idx, _, _} = Entry, - #?MODULE{tid = _Tid, - staged = undefined, - range = Range} = State) + #?MODULE{tid = _Tid, + staged = undefined, + range = Range} = State) when ?IS_NEXT_IDX(Idx, Range) -> - State#?MODULE{staged = {Idx, [Entry]}, - range = update_range_end(Idx, Range)}; + {ok, State#?MODULE{staged = {Idx, [Entry]}, + range = update_range_end(Idx, Range)}}; stage({Idx, _, _} = _Entry, - #?MODULE{range = Range} = _State0) + #?MODULE{range = Range} = _State0) when ?IN_RANGE(Idx, Range) orelse ?IS_BEFORE_RANGE(Idx, Range) -> - exit({error, overwriting}). + {error, overwriting}. -spec commit(state()) -> state(). commit(#?MODULE{staged = undefined} = State) -> @@ -178,12 +180,12 @@ lookup_term(Idx, #?MODULE{staged = {FstStagedIdx, Staged}}) lookup_term(Idx, #?MODULE{tid = Tid, range = Range}) when ?IN_RANGE(Idx, Range) -> - ets:lookup_element(Tid, Idx, 2); + ets:lookup_element(Tid, Idx, 2, undefined); lookup_term(_Idx, _State) -> undefined. -spec tid_for(ra:index(), ra_term(), state()) -> - undefine | ets:tid(). + undefined | ets:tid(). tid_for(_Idx, _Term, undefined) -> undefined; tid_for(Idx, Term, State) -> @@ -225,15 +227,18 @@ delete({range, Tid, {Start, End}}, #?MODULE{tid = Tid} = State) -> Limit = ets:info(Tid, size) div 2, case NumToDelete > Limit of true -> + ct:pal("delete < ~b ~b", [Start, End]), %% more than half the table is to be deleted delete({'<', Tid, End + 1}, State); false -> + ct:pal("delete ~b ~b", [Start, End]), delete(Start, End, Tid), End - Start + 1 end; delete({Op, Tid, Idx}, #?MODULE{tid = Tid}) when is_integer(Idx) and is_atom(Op) -> DelSpec = [{{'$1', '_', '_'}, [{'<', '$1', Idx}], [true]}], + ct:pal("DelSpec ~p", [DelSpec]), ets:select_delete(Tid, DelSpec); delete({all, Tid}, #?MODULE{tid = Tid}) -> Size = ets:info(Tid, size), @@ -259,6 +264,10 @@ delete(#?MODULE{tid = Tid}) -> tid(#?MODULE{tid = Tid}) -> Tid. +-spec is_active(ets:tid(), state()) -> boolean(). +is_active(Tid, State) -> + Tid =:= tid(State). + -spec prev(state()) -> undefined | state(). prev(#?MODULE{prev = Prev}) -> Prev. @@ -273,50 +282,70 @@ info(#?MODULE{tid = Tid, has_previous => Prev =/= undefined }. --spec record_flushed(ets:tid(), ra:range(), state()) -> {delete_spec(), state()}. -record_flushed(Tid, {_, End}, #?MODULE{tid = Tid} = State0) -> - case set_first(End+1, State0) of - {[], State} -> - {undefined, State}; - {[Spec], State} -> - {Spec, State} - end; +-spec record_flushed(ets:tid(), ra:range(), state()) -> + {delete_spec(), state()}. +record_flushed(TID = Tid, {Start, End}, + #?MODULE{tid = TID, + range = Range} = State) -> + case ?IN_RANGE(End, Range) of + true -> + {{range, Tid, {Start, End}}, + State#?MODULE{range = ra_range:truncate(End, Range)}}; + false -> + {undefined, State} + end; record_flushed(_Tid, _Range, #?MODULE{prev = undefined} = State) -> {undefined, State}; record_flushed(Tid, Range, #?MODULE{prev = Prev0} = State) -> - case record_flushed(Tid, Range, Prev0) of - {{all, Tid}, _Prev} -> - %% upgrade all spec to delete + {Spec, Prev} = record_flushed(Tid, Range, Prev0), + case range(Prev) of + undefined -> + %% the prev table is now empty and can be deleted, {{delete, Tid}, State#?MODULE{prev = undefined}}; - {Spec, Prev} -> + _ -> {Spec, State#?MODULE{prev = Prev}} end. -spec set_first(ra:index(), state()) -> {[delete_spec()], state()}. set_first(Idx, #?MODULE{tid = Tid, - range = {Start, _} = Range} = State) - when ?IN_RANGE(Idx, Range) -> - {[{range, Tid, {Start, Idx - 1}}], - State#?MODULE{range = update_range_start(Idx, Range)}}; -set_first(Idx, #?MODULE{tid = Tid, - range = Range} = State) - when ?IS_AFTER_RANGE(Idx, Range) -> - {[{all, Tid}], State#?MODULE{range = undefined}}; + range = Range, + prev = Prev0} = State) + when (is_tuple(Range) andalso Idx > element(1, Range)) orelse + Range == undefined -> + {PrevSpecs, Prev} = case Prev0 of + undefined -> + {[], undefined}; + _ -> + case set_first(Idx, Prev0) of + {[{range, PTID, _} | Rem], + #?MODULE{tid = PTID} = P} = Res -> + %% set_first/2 returned a range spec for + %% prev and prev is now empty, + %% upgrade to delete spec of whole tid + case range(P) of + undefined -> + {[{delete, tid(P)} | Rem], + prev(P)}; + _ -> + Res + end; + Res -> + Res + end + end, + Specs = case Range of + {Start, End} -> + [{range, Tid, {Start, min(Idx - 1, End)}} | PrevSpecs]; + undefined -> + PrevSpecs + end, + {Specs, + State#?MODULE{range = ra_range:truncate(Idx - 1, Range), + prev = Prev}}; set_first(_Idx, State) -> - %% TODO fall back to prev {[], State}. --spec set_last(ra:index(), state()) -> - {undefined | delete_spec(), state()}. -set_last(Idx, #?MODULE{range = Range} = State) - when ?IN_RANGE(Idx, Range) -> - {{'>', Idx}, State#?MODULE{range = update_range_end(Idx, Range)}}; -set_last(Idx, #?MODULE{range = {Start, _End}} = State) - when Idx < Start -> - {all, State#?MODULE{range = undefined}}; -set_last(_Idx, State) -> - {undefined, State}. %% internal @@ -327,13 +356,6 @@ update_range_end(Idx, {Start, End}) update_range_end(Idx, undefined) -> {Idx, Idx}. -update_range_start(Idx, {Start, End}) - when Idx >= Start andalso - Idx =< End -> - {Idx, End}; -update_range_start(_Idx, Range) -> - Range. - delete(End, End, Tid) -> ets:delete(Tid, End); delete(Start, End, Tid) -> diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index c6c93cd9..ec0bfb83 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -109,8 +109,9 @@ -record(recovery, {mode :: clean | dirty, ranges = #{} :: #{ra_uid() => - [{ets:tid(), {ra:index(), ra:index()}}]}, - tables = #{} :: #{ra_uid() => ra_log_memtbl:state()} + [{ets:tid(), {ra:index(), ra:index()}}]}, + tables = #{} :: #{ra_uid() => ra_log_memtbl:state()}, + writers = #{} :: #{ra_uid() => {in_seq, ra:index()}} }). -record(state, {conf = #conf{}, file_num = 0 :: non_neg_integer(), @@ -345,9 +346,7 @@ handle_op({info,{'EXIT', _, Reason}}, _State) -> throw({stop, Reason}). recover_wal(Dir, #conf{segment_writer = SegWriter, - open_mem_tbls_tid = OpenTbl, - % closed_mem_tbls_tid = ClosedTbl, - recovery_chunk_size = _RecoveryChunkSize} = Conf) -> + open_mem_tbls_tid = OpenTbl} = Conf) -> % ensure configured directory exists ok = ra_lib:make_dir(Dir), WalFiles = lists:sort(filelib:wildcard(filename:join(Dir, "*.wal"))), @@ -359,25 +358,37 @@ recover_wal(Dir, #conf{segment_writer = SegWriter, _ -> dirty end, + AllWriters = [begin FBase = filename:basename(F), ?DEBUG("wal: recovering ~ts", [FBase]), Fd = open_at_first_record(F), - {Time, #recovery{ranges = Ranges}} = + {Time, #recovery{ranges = Ranges, + writers = Writers + % tables = Mts + }} = timer:tc(fun () -> recover_wal_chunks(Conf, Fd, Mode) end), ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, FBase), - ?DEBUG("wal: recovered ~ts time taken ~bms", - [FBase, Time div 1000]), - close_existing(Fd) + ?DEBUG("wal: recovered ~ts time taken ~bms - Writer state recovered ~p", + [FBase, Time div 1000, Writers]), + close_existing(Fd), + Writers end || F <- WalFiles], + FinalWriters = lists:foldl(fun (New, Acc) -> + maps:merge(Acc, New) + end, #{}, AllWriters), + + ?DEBUG("wal: final writer state recovered ~p", [FinalWriters]), + FileNum = extract_file_num(lists:reverse(WalFiles)), - State = roll_over(undefined, #state{conf = Conf, - file_num = FileNum}), + State = roll_over(#state{conf = Conf, + writers = FinalWriters, + file_num = FileNum}), true = erlang:garbage_collect(), State. @@ -424,7 +435,7 @@ write_data({UId, Pid} = Id, MtTid, Idx, Term, Data0, Trunc, SnapIdx, % we roll over to a new wal. case should_roll_wal(State0) of true -> - State = roll_over(State0), + State = complete_batch_and_roll(State0), write_data(Id, MtTid, Idx, Term, Data0, Trunc, SnapIdx, State); false -> EntryData = case Data0 of @@ -498,46 +509,31 @@ handle_msg({query, Fun}, State) -> _ = catch Fun(State), State; handle_msg(rollover, State) -> - roll_over(State). + complete_batch_and_roll(State). incr_batch(#batch{num_writes = Writes, waiting = Waiting0, pending = Pend} = Batch, - UId, Pid, MtTid, Idx, Term, Data, SnapIdx) -> + UId, Pid, MT_TID = MtTid, + Idx, TERM = Term, Data, SnapIdx) -> Waiting = case Waiting0 of - #{Pid := #batch_writer{term = Term, %% Pattern Match - tid = MtTid, %% PM + #{Pid := #batch_writer{term = TERM, + tid = MT_TID, range = Range0 - % from = From } = W} -> - %% TODO: overwrites will be signalled by a different tid - % TblStart = max(SnapIdx, table_start(Truncate, Idx, TblStart0)), - %% it is totally possible that a different tid will be the source - %% within the scope of same batch. how to handle this? - %% IDEA: perhaps if the tid is different we start a new - %% batch writer and put the old batch writers in an - %% old batch writer field? + %% The Tid and term is the same so add to current batch_writer Range = ra_range:extend(Idx, ra_range:truncate(SnapIdx, Range0)), Waiting0#{Pid => W#batch_writer{range = Range, - % from = min(Idx, From), snap_idx = SnapIdx, - % to = Idx, term = Term }}; _ -> + %% The tid is different, open a new batch writer for the + %% new tid and term PrevBatchWriter = maps:get(Pid, Waiting0, undefined), - % TblStart = case Ranges of - % #{UId := [{_MtTid, {Start0, _}} | _]} -> - % max(SnapIdx, table_start(Truncate, Idx, - % Start0)); - % _ -> - % max(SnapIdx, Idx) - % end, Writer = #batch_writer{snap_idx = SnapIdx, tid = MtTid, range = ra_range:new(Idx), - % from = Idx, - % to = Idx, uid = UId, term = Term, old = PrevBatchWriter @@ -549,47 +545,18 @@ incr_batch(#batch{num_writes = Writes, waiting = Waiting, pending = [Pend | Data]}. -% update_mem_table(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg, -% UId, Idx, Term, Entry, Truncate) -> -% % TODO: if Idx =< First we could truncate the entire table and save -% % some disk space when it later is flushed to disk -% SnapIdx = snap_idx(Cfg, UId), -% case ets:lookup(OpnMemTbl, UId) of -% [{_UId, From0, _To, Tid}] -> -% case Idx > SnapIdx of -% true -> -% true = ets:insert(Tid, {Idx, Term, Entry}), -% From = table_start(Truncate, Idx, From0), -% % update Last idx for current tbl -% % this is how followers overwrite previously seen entries -% _ = ets:update_element(OpnMemTbl, UId, [{2, From}, {3, Idx}]); -% false -> -% From = max(SnapIdx, table_start(Truncate, Idx, From0)), -% _ = ets:update_element(OpnMemTbl, UId, [{2, From}]) -% end; -% [] when Idx > SnapIdx -> -% % open new ets table -% Tid = open_mem_table(Cfg, UId), -% true = ets:insert_new(OpnMemTbl, {UId, Idx, Idx, Tid}), -% true = ets:insert(Tid, {Idx, Term, Entry}); -% _ -> -% true -% end. - -roll_over(#state{conf = #conf{open_mem_tbls_tid = Tbl}} = State0) -> +complete_batch_and_roll(#state{} = State0) -> State = complete_batch(State0), - roll_over(Tbl, start_batch(State)). + roll_over(start_batch(State)). -roll_over(_OpnMemTbls, #state{wal = Wal0, file_num = Num0, - conf = #conf{dir = Dir, - segment_writer = SegWriter, - max_size_bytes = MaxBytes - } = Conf0} = State0) -> +roll_over(#state{wal = Wal0, file_num = Num0, + conf = #conf{dir = Dir, + segment_writer = SegWriter, + max_size_bytes = MaxBytes} = Conf0} = State0) -> counters:add(Conf0#conf.counter, ?C_WAL_FILES, 1), Num = Num0 + 1, Fn = ra_lib:zpad_filename("", "wal", Num), NextFile = filename:join(Dir, Fn), - % ?DEBUG("wal: opening new file ~ts open mem tables: ~w", [Fn, OpnMemTbls]), ?DEBUG("wal: opening new file ~ts", [Fn]), %% if this is the first wal since restart randomise the first %% max wal size to reduce the likelihood that each erlang node will @@ -1020,6 +987,7 @@ update_ranges(Ranges, UId, MtTid, SnapIdx, {_Start, _} = AddRange) -> recover_entry(Names, UId, {Idx, _, _} = Entry, SnapIdx, #recovery{mode = clean, ranges = Ranges0, + writers = Writers, tables = Tables} = State) -> Mt0 = case Tables of #{UId := M} -> M; @@ -1032,6 +1000,7 @@ recover_entry(Names, UId, {Idx, _, _} = Entry, SnapIdx, Ranges = update_ranges(Ranges0, UId, ra_log_memtbl:tid(Mt1), SnapIdx, ra_range:new(Idx)), {ok, State#recovery{ranges = Ranges, + writers = Writers#{UId => {in_seq, Idx}}, tables = Tables#{UId => Mt1}}}; {error, overwriting} -> %% create successor memtable @@ -1041,6 +1010,7 @@ recover_entry(Names, UId, {Idx, _, _} = Entry, SnapIdx, recover_entry(Names, UId, {Idx, Term, _}, SnapIdx, #recovery{mode = dirty, ranges = Ranges0, + writers = Writers, tables = Tables} = State) -> Mt0 = case Tables of #{UId := M} -> M; @@ -1057,6 +1027,7 @@ recover_entry(Names, UId, {Idx, Term, _}, SnapIdx, Ranges = update_ranges(Ranges0, UId, Tid, SnapIdx, ra_range:new(Idx)), {ok, State#recovery{ranges = Ranges, + writers = Writers#{UId => {in_seq, Idx}}, tables = Tables#{UId => Mt0}}} end. diff --git a/src/ra_metrics_ets.erl b/src/ra_metrics_ets.erl index 370ad313..df2ca7b2 100644 --- a/src/ra_metrics_ets.erl +++ b/src/ra_metrics_ets.erl @@ -38,8 +38,8 @@ init([]) -> {write_concurrency, true}, public], _ = ets:new(ra_log_metrics, [set | TableFlags]), - _ = ra_counters:init(), - _ = ra_leaderboard:init(), + ok = ra_counters:init(), + ok = ra_leaderboard:init(), %% Table for ra processes to record their current snapshot index so that %% other processes such as the segment writer can use this value to skip diff --git a/src/ra_range.erl b/src/ra_range.erl index ba49878e..9d43344a 100644 --- a/src/ra_range.erl +++ b/src/ra_range.erl @@ -32,9 +32,9 @@ new(Start, End) Start =< End -> {Start, End}. --spec add(range(), range()) -> range(). -add(Range, undefined) -> - Range; +-spec add(AddRange :: range(), CurRange :: range()) -> range(). +add(_Range, undefined) -> + undefined; add({Start1, End1}, {Start2, End2}) when Start2 =< End1 + 1 andalso End2 >= Start1 -> @@ -89,7 +89,7 @@ extend(Idx, undefined) when is_integer(Idx) -> add_test() -> ?assertEqual(undefined, add(undefined, undefined)), ?assertEqual({1, 10}, add(undefined, {1, 10})), - ?assertEqual({1, 10}, add({1, 10}, undefined)), + ?assertEqual(undefined, add({1, 10}, undefined)), ?assertEqual({1, 20}, add({1, 10}, {11, 20})), ?assertEqual({1, 20}, add({1, 10}, {5, 20})), @@ -100,6 +100,9 @@ add_test() -> %% when the new range is smaller than the prior range ?assertEqual({1, 3}, add({6, 10}, {1, 3})), ?assertEqual({1, 7}, add({6, 10}, {1, 7})), + %% when the old range is smaller than the add range + ?assertEqual({1, 3}, add({6, 10}, {1, 3})), + ?assertEqual({1, 7}, add({6, 10}, {1, 7})), ok. -endif. diff --git a/src/ra_server.erl b/src/ra_server.erl index 8306e2d1..88dfd164 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -1106,7 +1106,13 @@ handle_follower(#append_entries_rpc{term = Term, %% application as applied index could be higher %% than written (if consensus has already been acheived from %% other members) - Log = ra_log:reset_to_last_known_written(Log1), + %% TODO: mt: we cannot do this anymore as the WAL does + %% not write the mem tables. We could implement something + %% alternative where the WAL writes the last index, term + %% it wrote for each UID into an ETS table and query + %% this. + % Log = ra_log:reset_to_last_known_written(Log1), + Log = Log1, {await_condition, State1#{log => Log, condition => diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 6b8ba277..616eb52a 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -18,13 +18,13 @@ all() -> all_tests() -> [ - % resend_write, - % resend_write_after_tick, + resend_write_lost_in_wal_crash, + resend_write_after_tick, handle_overwrite, receive_segment, read_one, %% TODO: - % take_after_overwrite_and_init, + take_after_overwrite_and_init, validate_sequential_fold, validate_reads_for_overlapped_writes, cache_overwrite_then_take, @@ -42,7 +42,7 @@ all_tests() -> wal_down_write_returns_error_wal_down, detect_lost_written_range, - % snapshot_recovery, + snapshot_recovery, snapshot_installation, snapshot_written_after_installation, oldcheckpoints_deleted_after_snapshot_install, @@ -106,6 +106,9 @@ init_per_testcase(TestCase, Config) -> ok = ra_lib:write_file(filename:join([DataDir, node(), UId, "config"]), list_to_binary(io_lib:format("~p.", [ServerConf]))), + [SupPid] = [P || {ra_log_wal_sup, P, _, _} + <- supervisor:which_children(ra_log_sup)], + _ = supervisor:restart_child(SupPid, ra_log_wal), [{uid, UId}, {test_case, TestCase}, {wal_dir, DataDir} | Config]. end_per_testcase(_, Config) -> @@ -121,7 +124,7 @@ handle_overwrite(Config) -> {ok, Log1} = ra_log:write([{1, 1, "value"}, {2, 1, "value"}], Log0), receive - {ra_log_event, {written, {1, 2, 1}}} -> ok + {ra_log_event, {written, 1, {1, 2}}} -> ok after 2000 -> exit(written_timeout) end, @@ -131,11 +134,11 @@ handle_overwrite(Config) -> {ok, Log4} = ra_log:write([{2, 2, "value"}], Log3), % simulate the first written event coming after index 20 has already % been written in a new term - {Log, _} = ra_log:handle_event({written, {1, 2, 1}}, Log4), + {Log, _} = ra_log:handle_event({written, 1, {1, 2}}, Log4), % ensure last written has not been incremented {0, 0} = ra_log:last_written(Log), {2, 2} = ra_log:last_written( - element(1, ra_log:handle_event({written, {1, 2, 2}}, Log))), + element(1, ra_log:handle_event({written, 2, {1, 2}}, Log))), ok = ra_log_wal:force_roll_over(ra_log_wal), _ = deliver_all_log_events(Log, 100), ra_log:close(Log), @@ -151,17 +154,12 @@ receive_segment(Config) -> Log1 = lists:foldl(fun(E, Acc0) -> ra_log:append(E, Acc0) end, Log0, Entries), - % Log2 = deliver_all_log_events(Log1, 500), Log2 = deliver_log_events_cond( Log1, fun (L) -> {PostWritten, _} = ra_log:last_written(L), PostWritten >= (PreWritten + 3) end, 100), {3, 1} = ra_log:last_written(Log2), - UId = ?config(uid, Config), - [MemTblTid] = [Tid || {Key, _, _, Tid} - <- ets:tab2list(ra_log_open_mem_tables), Key == UId], - ?assert(ets:info(MemTblTid) =/= undefined), % force wal roll over ok = ra_log_wal:force_roll_over(ra_log_wal), % Log3 = deliver_all_log_events(Log2, 1500), @@ -169,9 +167,6 @@ receive_segment(Config) -> Log2, fun (L) -> #{mem_table_range := MtRange} = ra_log:overview(L), MtRange == undefined - % ets:info(MemTblTid) == undefined andalso - % [] =:= ets:tab2list(ra_log_open_mem_tables) andalso - % [] =:= ets:tab2list(ra_log_closed_mem_tables) end, 100), % validate reads {Entries, FinalLog} = ra_log_take(1, 3, Log3), @@ -469,7 +464,7 @@ writes_lower_than_snapshot_index_are_dropped(Config) -> %% no written notifications for anything lower than the snapshot should %% be received Log5 = receive - {ra_log_event, {written, {From, _To, _Term}} = E} + {ra_log_event, {written, _Term, {From, _To}} = E} when From == 101 -> {Log4b, Effs} = ra_log:handle_event(E, Log4), Log4c = lists:foldl( @@ -572,16 +567,15 @@ last_index_reset(Config) -> last_index_reset_before_written(Config) -> Log0 = ra_log_init(Config), - #{cache_size := 0} = ra_log:overview(Log0), Log1 = write_n(1, 5, 1, Log0), - #{cache_size := 4} = ra_log:overview(Log1), + #{mem_table_range := {0, 4}} = ra_log:overview(Log1), {0, 0} = ra_log:last_written(Log1), 5 = ra_log:next_index(Log1), {4, 1} = ra_log:last_index_term(Log1), % reverts last index to a previous index % needs to be done if a new leader sends an empty AER {ok, Log2} = ra_log:set_last_index(3, Log1), - #{cache_size := 3} = ra_log:overview(Log2), + % #{cache_size := 3} = ra_log:overview(Log2), {0, 0} = ra_log:last_written(Log2), 4 = ra_log:next_index(Log2), {3, 1} = ra_log:last_index_term(Log2), @@ -592,7 +586,7 @@ last_index_reset_before_written(Config) -> end), 4 = ra_log:next_index(Log3), {3, 1} = ra_log:last_index_term(Log3), - #{cache_size := 0} = ra_log:overview(Log3), + % #{cache_size := 0} = ra_log:overview(Log3), ok. recovery(Config) -> @@ -710,17 +704,7 @@ recovery_with_missing_config_file(Config) -> ok. -resend_write(Config) -> - % logger:set_primary_config(level, debug), - % simulate lost messages requiring the ra server to resend in flight - % writes - meck:new(ra_log_wal, [passthrough]), - meck:expect(ra_log_wal, write, fun (_, _, 10, _, _) -> - {ok, self()}; - (A, B, C, D, E) -> - meck:passthrough([A, B, C, D, E]) - end), - timer:sleep(100), +resend_write_lost_in_wal_crash(Config) -> Log0 = ra_log_init(Config), {0, 0} = ra_log:last_index_term(Log0), %% write 1..9 @@ -728,18 +712,22 @@ resend_write(Config) -> Log2 = assert_log_events(Log1, fun (L) -> {9, 2} == ra_log:last_written(L) end), - % fake missing entry - %% write 10 which will be dropped by meck interception + WalPid = whereis(ra_log_wal), + %% suspend wal, write an entry then kill it + erlang:suspend_process(WalPid), Log2b = append_n(10, 11, 2, Log2), + exit(WalPid, kill), + ra_lib:retry(fun () -> is_pid(whereis(ra_log_wal)) end, 100), %% write 11..12 which should trigger resend Log3 = append_n(11, 13, 2, Log2b), Log4 = receive {ra_log_event, {resend_write, 10} = Evt} -> - %% unload mock so that write of index 10 can go through - meck:unload(ra_log_wal), - element(1, ra_log:handle_event(Evt, Log3)) + element(1, ra_log:handle_event(Evt, Log3)); + {ra_log_event, {written, 2, {11, 12}}} -> + ct:fail("unexpected gappy write!!") after 500 -> - throw(resend_write_timeout) + flush(), + ct:fail(resend_write_timeout) end, Log5 = ra_log:append({13, 2, banana}, Log4), Log6 = assert_log_events(Log5, fun (L) -> @@ -753,12 +741,9 @@ resend_write(Config) -> resend_write_after_tick(Config) -> meck:new(ra_log_wal, [passthrough]), WalPid = whereis(ra_log_wal), - timer:sleep(100), - ct:pal("ra_log_init"), Log0 = ra_log_init(Config), {0, 0} = ra_log:last_index_term(Log0), - %% ct:pal("appending"), - meck:expect(ra_log_wal, write, fun (_, _, _, _, _) -> + meck:expect(ra_log_wal, write, fun (_, _, _, _, _, _) -> {ok, WalPid} end), Log1 = ra_log:append({1, 2, banana}, Log0), @@ -766,7 +751,6 @@ resend_write_after_tick(Config) -> meck:unload(ra_log_wal), %% restart wal to get a new wal pid so that the ra_log detects on tick %% that the wal process has changed - %% ct:pal("restart wal"), restart_wal(), Ms = erlang:system_time(millisecond) + 5001, @@ -774,7 +758,7 @@ resend_write_after_tick(Config) -> Log = assert_log_events(Log2, fun (L) -> {1, 2} == ra_log:last_written(L) end), - ct:pal("overvew ~p", [ra_log:overview(Log)]), + % ct:pal("overvew ~p", [ra_log:overview(Log)]), ra_log:close(Log), ok. @@ -790,10 +774,19 @@ wal_crash_recover(Config) -> Log3 = write_n(75, 100, 2, Log2), % wait long enough for the resend window to pass timer:sleep(1000), - Log = assert_log_events(write_n(100, 101, 2, Log3), + % debugger:start(), + % int:i(ra_log_memtbl), + % int:i(?MODULE), + % % int:break(ra_log_memtbl, 228), + % int:break(?MODULE, 788), + Log4 = write_n(100, 101, 2, Log3), + {true, _} = ra_log:exists({100, 2}, Log4), + Log = assert_log_events(Log4, fun (L) -> + {Exists, _} = ra_log:exists({100, 2}, L), + ct:pal("Exists ~w", [Exists]), {100, 2} == ra_log:last_written(L) - end), + end, 2000000), {100, 2} = ra_log:last_written(Log), validate_fold(1, 99, 2, Log), ok. @@ -818,7 +811,7 @@ wal_down_append_throws(Config) -> <- supervisor:which_children(ra_log_sup)], ok = supervisor:terminate_child(SupPid, ra_log_wal), ?assert(not ra_log:can_write(Log0)), - ?assertError(wal_down, ra_log:append({1,1,hi}, Log0)), + ?assertError(wal_down, ra_log:append({1, 1, hi}, Log0)), ok. wal_down_write_returns_error_wal_down(Config) -> @@ -826,31 +819,29 @@ wal_down_write_returns_error_wal_down(Config) -> [SupPid] = [P || {ra_log_wal_sup, P, _, _} <- supervisor:which_children(ra_log_sup)], ok = supervisor:terminate_child(SupPid, ra_log_wal), - {error, wal_down} = ra_log:write([{1,1,hi}], Log0), + {error, wal_down} = ra_log:write([{1, 1, hi}], Log0), ok. detect_lost_written_range(Config) -> Log0 = ra_log_init(Config, #{wal => ra_log_wal}), - meck:new(ra_log_wal, [passthrough]), {0, 0} = ra_log:last_index_term(Log0), % write some entries Log1 = append_and_roll_no_deliver(1, 10, 2, Log0), Log2 = assert_log_events(Log1, fun (L) -> {9, 2} == ra_log:last_written(L) end), - % WAL rolls over and WAL file is deleted % simulate wal outage - meck:expect(ra_log_wal, write, fun (_, _, _, _, _) -> {ok, self()} end), + WalPid = whereis(ra_log_wal), + true = ra_log_wal_SUITE:suspend_process(WalPid), % append some messages that will be lost Log3 = append_n(10, 15, 2, Log2), - % restart WAL to ensure lose the transient state keeping track of + % kill WAL to ensure lose the transient state keeping track of % each writer's last written index - restart_wal(), + exit(WalPid, kill), - % WAL recovers - meck:unload(ra_log_wal), + ra_lib:retry(fun () -> whereis(ra_log_wal) /= undefined end, 1000), % append some more stuff Log4 = append_n(15, 20, 2, Log3), @@ -861,6 +852,8 @@ detect_lost_written_range(Config) -> {Entries, _} = ra_log_take(0, 20, Log5), ?assertEqual(20, length(Entries)), ra_log:close(Log5), + ra_log_wal:force_roll_over(ra_log_wal), + timer:sleep(1000), Log = ra_log_init(Config), {19, 2} = ra_log:last_written(Log5), {RecoveredEntries, _} = ra_log_take(0, 20, Log), @@ -1490,6 +1483,7 @@ assert_log_events(Log0, AssertPred, Timeout) -> %% handle any next events Log = lists:foldl( fun ({next_event, {ra_log_event, E}}, Acc0) -> + ct:pal("eff log evt: ~p", [E]), {Acc, _Effs} = ra_log:handle_event(E, Acc0), Acc; (_, Acc) -> @@ -1539,7 +1533,7 @@ deliver_one_log_events(Log0, Timeout) -> deliver_written_log_events(Log0, Timeout) -> receive - {ra_log_event, {written, _} = Evt} -> + {ra_log_event, {written, _, _} = Evt} -> ct:pal("log evt: ~p", [Evt]), {Log, _} = ra_log:handle_event(Evt, Log0), deliver_written_log_events(Log, 100) diff --git a/test/ra_log_SUITE.erl b/test/ra_log_SUITE.erl index 8bad042a..46c1a0cc 100644 --- a/test/ra_log_SUITE.erl +++ b/test/ra_log_SUITE.erl @@ -33,9 +33,9 @@ all_tests() -> groups() -> [ {tests, [], [ - init_close_init - % write_recover_then_overwrite, - % write_overwrite_then_recover + init_close_init, + write_recover_then_overwrite, + write_overwrite_then_recover | all_tests()]} ]. @@ -114,7 +114,7 @@ append_then_fetch_no_wait(Config) -> % if we get async written notification check that handling that % results in the last written being updated receive - {ra_log_event, {written, _} = Evt} -> + {ra_log_event, {written, _, _} = Evt} -> {Log, _} = ra_log:handle_event(Evt, Log3), {Idx, Term} = ra_log:last_written(Log) after 0 -> @@ -129,9 +129,10 @@ write_then_overwrite(Config) -> Idx = ra_log:next_index(Log0), Log1 = write_two(Idx, Term, Log0), % overwrite Idx - Entry2 = {Idx, Term, "entry0_2"}, + Term2 = Term+1, + Entry2 = {Idx, Term2, "entry0_2"}, {ok, Log2} = ra_log:write_sync([Entry2], Log1), - {{Idx, Term, "entry0_2"}, Log} = ra_log:fetch(Idx, Log2), + {{Idx, Term2, "entry0_2"}, Log} = ra_log:fetch(Idx, Log2), ExpectedNextIndex = Idx + 1, % ensure last index is updated after overwrite ExpectedNextIndex = ra_log:next_index(Log), diff --git a/test/ra_log_memtbl_SUITE.erl b/test/ra_log_memtbl_SUITE.erl index 827aab2e..46082a9f 100644 --- a/test/ra_log_memtbl_SUITE.erl +++ b/test/ra_log_memtbl_SUITE.erl @@ -21,6 +21,13 @@ all() -> all_tests() -> [ basics, + record_flushed, + record_flushed_prev, + set_first, + set_first_with_multi_prev, + set_first_with_middle_small_range, + set_first_with_old_larger_range, + set_first_with_old_smaller_range, successor, successor_below, perf @@ -56,12 +63,208 @@ basics(_Config) -> Mt0 = ra_log_memtbl:init(Tid), Mt1 = lists:foldl( fun (I, Acc) -> - ra_log_memtbl:insert({I, I, <<"banana">>}, Acc) + element(2, ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc)) end, Mt0, lists:seq(1, 1000)), - {[Spec], Mt} = ra_log_memtbl:set_first(500, Mt1), - 499 = ra_log_memtbl:delete(Spec, Mt), - ?assertEqual({500, 1000}, ra_log_memtbl:range(Mt)), + {[Spec], Mt2} = ra_log_memtbl:set_first(500, Mt1), + 499 = ra_log_memtbl:delete(Spec, Mt2), + ?assertEqual({500, 1000}, ra_log_memtbl:range(Mt2)), ?assertEqual(501, ets:info(Tid, size)), + {Spec2, Mt3} = ra_log_memtbl:record_flushed(Tid, {1, 999}, Mt2), + 500 = ra_log_memtbl:delete(Spec2, Mt3), + ?assertEqual(1, ra_log_memtbl:lookup_term(1000, Mt3)), + ok. + +record_flushed(_Config) -> + %%TODO: test that deletes the same spec twice + Tid = ets:new(t1, [set, public]), + Mt0 = ra_log_memtbl:init(Tid), + Mt1 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc)) + end, Mt0, lists:seq(1, 100)), + {Spec, Mt2} = ra_log_memtbl:record_flushed(Tid, {1, 49}, Mt1), + ?assertMatch({range, _, {1, 49}}, Spec), + ?assertMatch({50, 100}, ra_log_memtbl:range(Mt2)), + {Spec2, Mt3} = ra_log_memtbl:record_flushed(Tid, {1, 49}, Mt2), + ?assertMatch(undefined, Spec2), + {Spec3, Mt4} = ra_log_memtbl:record_flushed(Tid, {50, 100}, Mt3), + ?assertMatch({range, _, {50, 100}}, Spec3), + ?assertEqual(undefined, ra_log_memtbl:range(Mt4)), + ok. + +record_flushed_prev(_Config) -> + %%TODO: test that deletes the same spec twice + Tid = ets:new(t1, [set, public]), + Mt0 = ra_log_memtbl:init(Tid), + Mt1 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc)) + end, Mt0, lists:seq(1, 100)), + + Tid2 = ets:new(t2, [set, public]), + Mt2 = ra_log_memtbl:init_successor(Tid2, read_write, Mt1), + Mt3 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 2, <<"banana">>}, Acc)) + end, Mt2, lists:seq(50, 80)), + ?assertMatch({1, 100}, ra_log_memtbl:range(ra_log_memtbl:prev(Mt3))), + %% + {Spec, Mt4} = ra_log_memtbl:record_flushed(Tid, {1, 49}, Mt3), + ?assertMatch({range, Tid, {1, 49}}, Spec), + ?assertMatch({50, 80}, ra_log_memtbl:range(Mt4)), + ?assertMatch({50, 100}, ra_log_memtbl:range(ra_log_memtbl:prev(Mt4))), + + %% delete the remainder of the old mt + {Spec2, Mt5} = ra_log_memtbl:record_flushed(Tid, {50, 100}, Mt4), + ?assertMatch({delete, Tid}, Spec2), + ?assertEqual(undefined, ra_log_memtbl:prev(Mt5)), + ?assertMatch({50, 80}, ra_log_memtbl:range(Mt5)), + ok. + +set_first(_Config) -> + %% test with prev + Tid = ets:new(t1, [set, public]), + Mt0 = ra_log_memtbl:init(Tid), + Mt1 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc)) + end, Mt0, lists:seq(1, 100)), + Tid2 = ets:new(t2, [set, public]), + Mt2 = ra_log_memtbl:init_successor(Tid2, read_write, Mt1), + Mt3 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 2, <<"banana">>}, Acc)) + end, Mt2, lists:seq(50, 120)), + {[Spec1, Spec2], Mt4} = ra_log_memtbl:set_first(75, Mt3), + ?assertMatch({range, Tid2, {50, 74}}, Spec1), + ?assertMatch({range, Tid, {1, 74}}, Spec2), + ?assertMatch({75, 120}, ra_log_memtbl:range(Mt4)), + + {[Spec3, Spec4], Mt5} = ra_log_memtbl:set_first(105, Mt4), + ?assertMatch({range, Tid2, {75, 104}}, Spec3), + ?assertMatch({delete, Tid}, Spec4), + ?assertMatch({105, 120}, ra_log_memtbl:range(Mt5)), + ?assertMatch(undefined, ra_log_memtbl:prev(Mt5)), + ok. + +set_first_with_multi_prev(_Config) -> + Tid1 = ets:new(t1, []), + Mt0 = ra_log_memtbl:init(Tid1), + Mt1 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc)) + end, Mt0, lists:seq(1, 100)), + + Tid2 = ets:new(t2, []), + Mt2 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 2, <<"banana">>}, Acc)) + end, ra_log_memtbl:init_successor(Tid2, read_write, Mt1), + lists:seq(50, 150)), + + Tid3 = ets:new(t2, []), + Mt3 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 3, <<"banana">>}, Acc)) + end, ra_log_memtbl:init_successor(Tid3, read_write, Mt2), + lists:seq(75, 200)), + + ?assertEqual({1, 200}, ra_log_memtbl:range(Mt3)), + + {[{range, Tid3, {75, 79}}, + {range, Tid2, {50, 79}}, + {range, Tid1, {1, 79}}], Mt4} = ra_log_memtbl:set_first(80, Mt3), + + {[{range, Tid3, {80, 159}}, + {delete, Tid2}, + {delete, Tid1}], _Mt5} = ra_log_memtbl:set_first(160, Mt4), + ok. + +set_first_with_middle_small_range(_Config) -> + %% {1, 200}, {50, 120}, set_first(105) should delete prev completely as it + %% will never be needed?? (what about wal recovery?) + Tid1 = ets:new(t1, []), + Mt0 = ra_log_memtbl:init(Tid1), + Mt1 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc)) + end, Mt0, lists:seq(1, 100)), + + Tid2 = ets:new(t2, []), + Mt2 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 2, <<"banana">>}, Acc)) + end, ra_log_memtbl:init_successor(Tid2, read_write, Mt1), + lists:seq(50, 75)), + + Tid3 = ets:new(t2, []), + Mt3 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 3, <<"banana">>}, Acc)) + end, ra_log_memtbl:init_successor(Tid3, read_write, Mt2), + lists:seq(75, 200)), + + ?assertEqual({1, 200}, ra_log_memtbl:range(Mt3)), + + % debugger:start(), + % int:i(ra_log_memtbl), + % int:break(ra_log_memtbl, 319), + {[{range, Tid3, {75, 84}}, + {delete, Tid2}, + {range, Tid1, {1, 84}}], Mt4} = ra_log_memtbl:set_first(85, Mt3), + ?assertEqual({85, 200}, ra_log_memtbl:range(Mt4)), + + {[{range, Tid3, {85, 100}}, + {delete, Tid1}], Mt5} = ra_log_memtbl:set_first(101, Mt4), + ?assertEqual({101, 200}, ra_log_memtbl:range(Mt5)), + ?assertEqual(undefined, ra_log_memtbl:prev(Mt5)), + + ok. + +set_first_with_old_larger_range(_Config) -> + %% {1, 200}, {50, 120}, set_first(105) should delete prev completely as it + %% will never be needed?? (what about wal recovery?) + Tid1 = ets:new(t1, []), + Mt0 = ra_log_memtbl:init(Tid1), + Mt1 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc)) + end, Mt0, lists:seq(1, 100)), + + Tid2 = ets:new(t2, []), + Mt2 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 2, <<"banana">>}, Acc)) + end, ra_log_memtbl:init_successor(Tid2, read_write, Mt1), + lists:seq(50, 75)), + {[{range, Tid2, {50, 75}}, + {range, Tid1, {1, 84}}], Mt3} = ra_log_memtbl:set_first(85, Mt2), + ?assertEqual(undefined, ra_log_memtbl:range(Mt3)), + %% eventually when set_first passes the end of the old range it gets + %% deleted + {[{delete, Tid1}], Mt4} = ra_log_memtbl:set_first(101, Mt3), + ?assertEqual(undefined, ra_log_memtbl:prev(Mt4)), + ok. + +set_first_with_old_smaller_range(_Config) -> + Tid1 = ets:new(t1, []), + Mt0 = ra_log_memtbl:init(Tid1), + Mt1 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc)) + end, Mt0, lists:seq(50, 75)), + + Tid2 = ets:new(t2, []), + Mt2 = lists:foldl( + fun (I, Acc) -> + element(2, ra_log_memtbl:insert({I, 2, <<"banana">>}, Acc)) + end, ra_log_memtbl:init_successor(Tid2, read_write, Mt1), + lists:seq(1, 100)), + + ?assertEqual({1, 100}, ra_log_memtbl:range(Mt2)), + {[{range, Tid2, {1, 84}}, + {delete, Tid1}], Mt3} = ra_log_memtbl:set_first(85, Mt2), + ?assertEqual({85, 100}, ra_log_memtbl:range(Mt3)), ok. successor(_Config) -> @@ -69,14 +272,14 @@ successor(_Config) -> Mt0 = ra_log_memtbl:init(Tid), Mt1 = lists:foldl( fun (I, Acc) -> - ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc) + element(2, ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc)) end, Mt0, lists:seq(1, 100)), ?assertMatch({1, 100}, ra_log_memtbl:range(Mt1)), Tid2 = ets:new(t2, [set, public]), Mt2 = ra_log_memtbl:init_successor(Tid2, read_write, Mt1), Mt3 = lists:foldl( fun (I, Acc) -> - ra_log_memtbl:insert({I, 2, <<"banana">>}, Acc) + element(2, ra_log_memtbl:insert({I, 2, <<"banana">>}, Acc)) end, Mt2, lists:seq(50, 120)), ?assertMatch({1, 120}, ra_log_memtbl:range(Mt3)), @@ -93,14 +296,14 @@ successor_below(_Config) -> Mt0 = ra_log_memtbl:init(Tid), Mt1 = lists:foldl( fun (I, Acc) -> - ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc) + element(2, ra_log_memtbl:insert({I, 1, <<"banana">>}, Acc)) end, Mt0, lists:seq(100, 200)), ?assertMatch({100, 200}, ra_log_memtbl:range(Mt1)), Tid2 = ets:new(t2, [set, public]), Mt2 = ra_log_memtbl:init_successor(Tid2, read_write, Mt1), Mt3 = lists:foldl( fun (I, Acc) -> - ra_log_memtbl:insert({I, 2, <<"banana">>}, Acc) + element(2, ra_log_memtbl:insert({I, 2, <<"banana">>}, Acc)) end, Mt2, lists:seq(50, 75)), ?assertMatch({50, 75}, ra_log_memtbl:range(Mt3)), @@ -189,7 +392,7 @@ perf(_Config) -> DelTo = (trunc(Num * 0.9)), % DelSpec = [{{'$1', '_'}, [], [{'<', '$1', DelTo}]}], [begin - {Spec, _} = ra_log_memtbl:set_first(DelTo-1, Mt), + {[Spec], _} = ra_log_memtbl:set_first(DelTo-1, Mt), {Taken, Deleted} = timer:tc(ra_log_memtbl, delete, [Spec, Mt]), #{name := Name, size := Size} = ra_log_memtbl:info(Mt), ct:pal("~s size ~b select_delete ~b entries took ~bms Spec ~p", @@ -219,7 +422,7 @@ insert_n(N, N, _Data, Mt) -> Mt; insert_n(K, N, Data, Mt) -> insert_n(K+1, N, Data, - ra_log_memtbl:insert({K, 42, Data}, Mt)). + element(2, ra_log_memtbl:insert({K, 42, Data}, Mt))). delete_n(N, N, Mt) -> Mt; diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl index 5c66438f..884c1a62 100644 --- a/test/ra_log_wal_SUITE.erl +++ b/test/ra_log_wal_SUITE.erl @@ -581,6 +581,7 @@ out_of_seq_writes(Config) -> receive {ra_log_event, {resend_write, 4}} -> ok after 500 -> + flush(), throw(reset_write_timeout) end, % try writing 6 @@ -597,6 +598,7 @@ out_of_seq_writes(Config) -> receive {ra_log_event, {resend_write, 6}} -> ok after 500 -> + flush(), throw(written_timeout) end, % force a roll over @@ -606,7 +608,7 @@ out_of_seq_writes(Config) -> % ensure a written event is _NOT_ received % when a roll-over happens after out of sync write receive - {ra_log_event, {written, {8, 8, 1}}} -> + {ra_log_event, {written, 1, {8, 8}}} -> throw(unexpected_written_event) after 500 -> ok end, @@ -771,6 +773,16 @@ recover(Config) -> ct:fail("new_mem_tables_timeout") end, + %% try an out of sequence write to check the tracking state was recovered + % ensure an out of sync notification is received + {ok, _} = ra_log_wal:write(ra_log_wal, WriterId, Tid, 220, 2, Data), + receive + {ra_log_event, {resend_write, 201}} -> ok + after 500 -> + flush(), + throw(reset_write_timeout) + end, + meck:unload(), proc_lib:stop(Pid2), ok. @@ -939,6 +951,15 @@ recover_existing_mem_table(Config) -> ct:fail("new_mem_tables_timeout") end, + %% try an out of sequence write to check the tracking state was recovered + % ensure an out of sync notification is received + {ok, _} = ra_log_wal:write(ra_log_wal, WriterId, Tid, 220, 2, Data), + receive + {ra_log_event, {resend_write, 101}} -> ok + after 500 -> + flush(), + throw(reset_write_timeout) + end, meck:unload(), proc_lib:stop(Pid2), ok.