diff --git a/src/ra_directory.erl b/src/ra_directory.erl index bf96c71d..8f1242f6 100644 --- a/src/ra_directory.erl +++ b/src/ra_directory.erl @@ -19,7 +19,8 @@ pid_of/2, uid_of/2, overview/1, - list_registered/1 + list_registered/1, + is_registered_uid/2 ]). -export_type([ @@ -199,6 +200,13 @@ list_registered(System) when is_atom(System) -> Tbl = get_reverse(System), dets:select(Tbl, [{'_', [], ['$_']}]). +-spec is_registered_uid(atom(), ra_uid()) -> boolean(). +is_registered_uid(System, UId) + when is_atom(System) andalso + is_binary(UId) -> + Tbl = get_reverse(System), + [] =/= dets:select(Tbl, [{{'_', UId}, [], ['$_']}]). + get_name(#{directory := Tbl}) -> Tbl; get_name(System) when is_atom(System) -> diff --git a/src/ra_log_pre_init.erl b/src/ra_log_pre_init.erl index 0831aff0..dc04fcc2 100644 --- a/src/ra_log_pre_init.erl +++ b/src/ra_log_pre_init.erl @@ -22,6 +22,7 @@ -record(state, {}). +-define(ETSTBL, ra_log_snapshot_state). %%%=================================================================== %%% API functions %%%=================================================================== @@ -40,7 +41,16 @@ init([System]) -> Regd = ra_directory:list_registered(System), ?INFO("ra system '~ts' running pre init for ~b registered servers", [System, length(Regd)]), - _ = [catch(pre_init(System, Name)) || {Name, _U} <- Regd], + _ = [begin + try pre_init(System, Name, UId) of + ok -> ok + catch _:Err -> + ?ERROR("pre_init failed in system ~s for UId ~ts with name ~ts" + " This error may need manual intervention", + [System, UId, Name]), + throw({stop, {error, Err}}) + end + end|| {Name, UId} <- Regd], {ok, #state{} , hibernate}. handle_call(_Request, _From, State) -> @@ -63,8 +73,20 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== -pre_init(System, Name) -> - {ok, #{log_init_args := Log}} = ra_server_sup_sup:recover_config(System, Name), - _ = ra_log:pre_init(Log), - ok. +pre_init(System, Name, UId) -> + case ets:lookup(?ETSTBL, UId) of + [{_, _}] -> + %% already initialised + ok; + [] -> + case ra_system:fetch(System) of + undefined -> + {error, system_not_started}; + SysCfg -> + {ok, #{log_init_args := Log}} = + ra_server_sup_sup:recover_config(System, Name), + ok = ra_log:pre_init(Log#{system_config => SysCfg}), + ok + end + end. diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl index 83791f9d..e555801f 100644 --- a/src/ra_log_segment_writer.erl +++ b/src/ra_log_segment_writer.erl @@ -308,40 +308,57 @@ append_to_segment(_, _, StartIdx, EndIdx, Seg, Closed, _State) when StartIdx >= EndIdx -> {Seg, Closed}; append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) -> - [{_, Term, Data0}] = ets:lookup(Tid, Idx), - Data = term_to_iovec(Data0), - DataSize = iolist_size(Data), - case ra_log_segment:append(Seg0, Idx, Term, {DataSize, Data}) of - {ok, Seg} -> - ok = counters:add(State#state.counter, ?C_ENTRIES, 1), - %% this isn't completely accurate as firstly the segment may not - %% have written it to disk and it doesn't include data written to - %% the segment index but is probably good enough to get comparative - %% data rates for different Ra components - ok = counters:add(State#state.counter, ?C_BYTES_WRITTEN, DataSize), - append_to_segment(UId, Tid, Idx+1, EndIdx, Seg, Closed, State); - {error, full} -> - % close and open a new segment - case open_successor_segment(Seg0, State#state.segment_conf) of - undefined -> - %% a successor cannot be opened - this is most likely due - %% to the directory having been deleted. - %% clear close mem tables here - _ = ets:delete(Tid), - _ = clean_closed_mem_tables(State#state.system, UId, Tid), - undefined; - Seg -> - ok = counters:add(State#state.counter, ?C_SEGMENTS, 1), - %% re-evaluate snapshot state for the server in case - %% a snapshot has completed during segment flush - StartIdx = start_index(UId, Idx), - % recurse - append_to_segment(UId, Tid, StartIdx, EndIdx, Seg, - [Seg0 | Closed], State) + case ets:lookup(Tid, Idx) of + [] -> + %% oh dear, an expected index was not found in the mem table. + ?WARN("segment_writer: missing index ~b in mem table ~s for uid ~s" + "checking to see if UId has been unregistered", + [Idx, Tid, UId]), + case ra_directory:is_registered_uid(State#state.system, UId) of + true -> + ?ERROR("segment_writer: uid ~s is registered, exiting...", + [UId]), + exit({missing_index, UId, Idx}); + false -> + ?INFO("segment_writer: UId ~s was not registered, skipping", + [UId]), + undefined end; - {error, Posix} -> - FileName = ra_log_segment:filename(Seg0), - exit({segment_writer_append_error, FileName, Posix}) + [{_, Term, Data0}] -> + Data = term_to_iovec(Data0), + DataSize = iolist_size(Data), + case ra_log_segment:append(Seg0, Idx, Term, {DataSize, Data}) of + {ok, Seg} -> + ok = counters:add(State#state.counter, ?C_ENTRIES, 1), + %% this isn't completely accurate as firstly the segment may not + %% have written it to disk and it doesn't include data written to + %% the segment index but is probably good enough to get comparative + %% data rates for different Ra components + ok = counters:add(State#state.counter, ?C_BYTES_WRITTEN, DataSize), + append_to_segment(UId, Tid, Idx+1, EndIdx, Seg, Closed, State); + {error, full} -> + % close and open a new segment + case open_successor_segment(Seg0, State#state.segment_conf) of + undefined -> + %% a successor cannot be opened - this is most likely due + %% to the directory having been deleted. + %% clear close mem tables here + _ = ets:delete(Tid), + _ = clean_closed_mem_tables(State#state.system, UId, Tid), + undefined; + Seg -> + ok = counters:add(State#state.counter, ?C_SEGMENTS, 1), + %% re-evaluate snapshot state for the server in case + %% a snapshot has completed during segment flush + StartIdx = start_index(UId, Idx), + % recurse + append_to_segment(UId, Tid, StartIdx, EndIdx, Seg, + [Seg0 | Closed], State) + end; + {error, Posix} -> + FileName = ra_log_segment:filename(Seg0), + exit({segment_writer_append_error, FileName, Posix}) + end end. find_segment_files(Dir) -> diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 9218fe6c..6d4216de 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1022,11 +1022,17 @@ terminate(Reason, StateName, ra_server:system_config(ServerState), UId = uid(State), Id = id(State), - _ = ra_server:terminate(ServerState, Reason), case Reason of {shutdown, delete} -> Parent = ra_directory:where_is_parent(Names, UId), + %% we need to unregister _before_ the log closes + %% in the ra_server:terminate/2 function + %% as we want the directory to be deleted + %% after the server is removed from the ra directory. + %% This is so that the segment writer can avoid + %% crashing if it detects a missing key catch ra_directory:unregister_name(Names, UId), + _ = ra_server:terminate(ServerState, Reason), catch ra_log_meta:delete_sync(MetaName, UId), catch ra_counters:delete(Id), Self = self(), @@ -1044,9 +1050,9 @@ terminate(Reason, StateName, end end), ok; - - - _ -> ok + _ -> + _ = ra_server:terminate(ServerState, Reason), + ok end, catch ra_leaderboard:clear(ClusterName), _ = ets:delete(ra_metrics, MetricsKey), diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 6c209502..59f606a4 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -977,11 +977,12 @@ snapshot_installation_with_call_crash(Config) -> {ok, _, _} = ra:process_command(Leader, deq), meck:new(ra_server, [passthrough]), - meck:expect(ra_server, handle_follower, fun (#install_snapshot_rpc{}, _) -> - exit(timeout); - (A, B) -> - meck:passthrough([A, B]) - end), + meck:expect(ra_server, handle_follower, + fun (#install_snapshot_rpc{}, _) -> + exit(timeout); + (A, B) -> + meck:passthrough([A, B]) + end), %% start the down node again, catchup should involve sending a snapshot ok = ra:restart_server(?SYS, Down), @@ -994,7 +995,7 @@ snapshot_installation_with_call_crash(Config) -> {ok, {N2Idx, _}, _} = ra:local_query(N2, fun ra_lib:id/1), {ok, {N3Idx, _}, _} = ra:local_query(N3, fun ra_lib:id/1), (N1Idx == N2Idx) and (N1Idx == N3Idx) - end, 20)), + end, 200)), ok. diff --git a/test/ra_dbg_SUITE.erl b/test/ra_dbg_SUITE.erl index cfeed7af..0f2745be 100644 --- a/test/ra_dbg_SUITE.erl +++ b/test/ra_dbg_SUITE.erl @@ -9,7 +9,6 @@ -compile(nowarn_export_all). -compile(export_all). --include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). all() -> diff --git a/test/ra_directory_SUITE.erl b/test/ra_directory_SUITE.erl index 0d9b5d71..22369115 100644 --- a/test/ra_directory_SUITE.erl +++ b/test/ra_directory_SUITE.erl @@ -72,6 +72,7 @@ basics(_Config) -> % registrations should always succeed - no negative test Self = ra_directory:where_is(?SYS, UId), UId = ra_directory:uid_of(?SYS, test1), + ?assert(ra_directory:is_registered_uid(?SYS, UId)), % ensure it can be read from another process _ = spawn_link( fun () -> @@ -86,6 +87,7 @@ basics(_Config) -> undefined = ra_directory:name_of(?SYS, UId), undefined = ra_directory:cluster_name_of(?SYS, UId), undefined = ra_directory:uid_of(?SYS, test1), + ?assertNot(ra_directory:is_registered_uid(?SYS, UId)), ok. persistence(_Config) -> diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 28622b66..19bf6d1e 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -67,31 +67,38 @@ groups() -> ]. init_per_suite(Config) -> - {ok, _} = ra:start([{data_dir, ?config(priv_dir, Config)}, - {segment_max_entries, 128}]), Config. end_per_suite(Config) -> - application:stop(ra), Config. init_per_group(G, Config) -> - [{access_pattern, G} | Config]. + DataDir = filename:join(?config(priv_dir, Config), G), + [{access_pattern, G}, + {work_dir, DataDir} + | Config]. end_per_group(_, Config) -> Config. init_per_testcase(TestCase, Config) -> + ok = start_ra(Config), ra_env:configure_logger(logger), - PrivDir = ?config(priv_dir, Config), + DataDir = ?config(work_dir, Config), UId = <<(atom_to_binary(TestCase, utf8))/binary, (atom_to_binary(?config(access_pattern, Config)))/binary>>, - ra:start(), ok = ra_directory:register_name(default, UId, self(), undefined, TestCase, TestCase), - [{uid, UId}, {test_case, TestCase}, {wal_dir, PrivDir} | Config]. + ServerConf = #{log_init_args => #{uid => UId}}, + + ok = ra_lib:make_dir(filename:join([DataDir, node(), UId])), + ok = ra_lib:write_file(filename:join([DataDir, node(), UId, "config"]), + list_to_binary(io_lib:format("~p.", [ServerConf]))), + + [{uid, UId}, {test_case, TestCase}, {wal_dir, DataDir} | Config]. end_per_testcase(_, _Config) -> + application:stop(ra), ok. -define(N1, {n1, node()}). @@ -429,13 +436,13 @@ written_event_after_snapshot(Config) -> ok. writes_lower_than_snapshot_index_are_dropped(Config) -> + logger:set_primary_config(level, debug), Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}), Log1 = ra_log:append({1, 1, <<"one">>}, Log0), Log1b = deliver_all_log_events(ra_log:append({2, 1, <<"two">>}, Log1), 500), true = erlang:suspend_process(whereis(ra_log_wal)), Log2 = write_n(3, 500, 1, Log1b), - {Log3, _} = ra_log:update_release_cursor(100, #{}, 1, - <<"100">>, Log2), + {Log3, _} = ra_log:update_release_cursor(100, #{}, 1, <<"100">>, Log2), Log4 = deliver_all_log_events(Log3, 500), Overview = ra_log:overview(Log4), @@ -473,8 +480,10 @@ writes_lower_than_snapshot_index_are_dropped(Config) -> cache_size := 0, cache_range := undefined, last_written_index_term := {499, 1}}, OverviewAfter), + %% restart the app to test recovery with a "gappy" wal + application:stop(ra), + start_ra(Config), erlang:monitor(process, whereis(ra_log_segment_writer)), - exit(whereis(ra_log_wal), kill), receive {'DOWN', _, _, _, _} = D -> ct:fail("DOWN received ~p", [D]) @@ -589,7 +598,7 @@ recovery(Config) -> Log4 = assert_log_events(Log3, Pred, 2000), ra_log:close(Log4), application:stop(ra), - ra:start(), + start_ra(Config), Log5 = ra_log_init(Config), {20, 3} = ra_log:last_index_term(Log5), @@ -610,7 +619,7 @@ recover_bigly(Config) -> Log2 = assert_log_events(Log1, Pred, 2000), ra_log:close(Log2), application:stop(ra), - ra:start(), + start_ra(Config), Log = ra_log_init(Config), {9999, 1} = ra_log:last_written(Log), {9999, 1} = ra_log:last_index_term(Log), @@ -1150,7 +1159,7 @@ transient_writer_is_handled(Config) -> Self = self(), UId2 = <<(?config(uid, Config))/binary, "sub_proc">>, _Pid = spawn(fun () -> - ra_directory:register_name(default, <<"sub_proc">>, + ra_directory:register_name(default, UId2, self(), undefined, sub_proc, sub_proc), Log0 = ra_log_init(Config, #{uid => UId2}), @@ -1158,13 +1167,15 @@ transient_writer_is_handled(Config) -> % ignore events Log2 = deliver_all_log_events(Log1, 500), ra_log:close(Log2), - Self ! done + Self ! done, + ok end), receive done -> ok after 2000 -> exit(timeout) end, - ra:start(), + UId2 = ra_directory:unregister_name(default, UId2), _ = ra_log_init(Config), + ct:pal("~p", [ra_directory:list_registered(default)]), ok. open_segments_limit(Config) -> @@ -1499,8 +1510,8 @@ meta(Idx, Term, Cluster) -> machine_version => 1}. create_snapshot_chunk(Config, #{index := Idx} = Meta, Context) -> - OthDir = filename:join(?config(priv_dir, Config), "snapshot_installation"), - CPDir = filename:join(?config(priv_dir, Config), "checkpoints"), + OthDir = filename:join(?config(work_dir, Config), "snapshot_installation"), + CPDir = filename:join(?config(work_dir, Config), "checkpoints"), ok = ra_lib:make_dir(OthDir), ok = ra_lib:make_dir(CPDir), Sn0 = ra_snapshot:init(<<"someotheruid_adsfasdf">>, ra_log_snapshot, @@ -1538,3 +1549,8 @@ restart_wal() -> ok = supervisor:terminate_child(SupPid, ra_log_wal), {ok, _} = supervisor:restart_child(SupPid, ra_log_wal), ok. + +start_ra(Config) -> + {ok, _} = ra:start([{data_dir, ?config(work_dir, Config)}, + {segment_max_entries, 128}]), + ok. diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 40299ab1..b8c150fe 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -29,6 +29,7 @@ all_tests() -> accept_mem_tables_rollover, accept_mem_tables_for_down_server, accept_mem_tables_with_corrupt_segment, + accept_mem_tables_with_gap, accept_mem_tables_with_delete_server, truncate_segments, truncate_segments_with_pending_update, @@ -521,6 +522,37 @@ accept_mem_tables_with_corrupt_segment(Config) -> ok = gen_server:stop(TblWriterPid), ok. +accept_mem_tables_with_gap(Config) -> + ets:new(ra_log_closed_mem_tables, [named_table, bag, public]), + Dir = ?config(wal_dir, Config), + UId = ?config(uid, Config), + application:start(sasl), + {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default, + name => ?SEGWR, + data_dir => Dir}), + % fake up a mem segment with a gap due to snapshot index + Tid = make_mem_table(UId, [{1, 42, a}, {3, 42, c}, {4, 42, d}]), + MemTables = [{UId, 1, 4, Tid}], + % ets:insert(ra_log_closed_mem_tables, {FakeUId, 1, 1, 3, Tid}), + WalFile = filename:join(Dir, "00001.wal"), + ok = file:write_file(WalFile, <<"waldata">>), + %% however the server is being deleted and is at the stage where it's + %% diretory still exists but it has been unregistered + UId = ra_directory:unregister_name(default, UId), + ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, MemTables, WalFile), + receive + {ra_log_event, {segments, Tid, [{1, 3, _Fn}]}} -> + ct:fail("unexpected segments received") + after 200 -> + ok + end, + ok = ra_lib:retry(fun() -> + false = filelib:is_file(WalFile), + ok + end, 5, 100), + ok = gen_server:stop(TblWriterPid), + ok. + accept_mem_tables_with_delete_server(Config) -> ets:new(ra_log_closed_mem_tables, [named_table, bag, public]), Dir = ?config(wal_dir, Config), @@ -573,6 +605,7 @@ accept_mem_tables_with_delete_server(Config) -> end, 5, 100), ok = gen_server:stop(TblWriterPid), ok. + %%% Internal fake_mem_table(UId, Dir, Entries) -> diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl index c00c14e1..d885c7f5 100644 --- a/test/ra_log_wal_SUITE.erl +++ b/test/ra_log_wal_SUITE.erl @@ -886,13 +886,34 @@ drop_writes_if_snapshot_has_higher_index(Config) -> ets:insert(ra_log_snapshot_state, {UId, 20}), {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 14, 1, "value2"), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 21, 1, "value2"), + ra_lib:dump(ets:tab2list(ra_log_open_mem_tables)), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 22, 1, "value2"), + ets:insert(ra_log_snapshot_state, {UId, 21}), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 23, 1, "value2"), timer:sleep(500), + Self = self(), + meck:new(ra_log_segment_writer, [passthrough]), + meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), + meck:expect(ra_log_segment_writer, accept_mem_tables, + fun(_, M, _) -> Self ! M, ok end), undefined = mem_tbl_read(UId, 14), ra_lib:dump(ets:tab2list(ra_log_open_mem_tables)), + true = ets:delete(ra_log_snapshot_state, UId), proc_lib:stop(Pid), [{_, _, _, Tid}] = ets:lookup(ra_log_open_mem_tables, UId), ?assert(not ets:info(Tid, compressed)), + flush(), + {ok, _Pid} = ra_log_wal:start_link(Conf), + receive + [{UId, _, _, Tbl} = MT] -> + ct:pal("MT ~p Tbl: ~p", [MT, ets:tab2list(Tbl)]) + after 5000 -> + flush(), + ct:fail("bah") + end, + meck:unload(), ok. empty_mailbox() -> @@ -962,6 +983,7 @@ tbl_lookup([{_, _First, Last, Tid} | Tail], Idx) when Last >= Idx -> end; tbl_lookup([_ | Tail], Idx) -> tbl_lookup(Tail, Idx). + flush() -> receive Msg -> ct:pal("flush: ~p", [Msg]),