diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl
index 5970eaa8..b930609d 100644
--- a/src/cets_discovery.erl
+++ b/src/cets_discovery.erl
@@ -104,6 +104,7 @@
 %% Join result information.
 
 -type state() :: #{
+    name => atom(),
     phase := initial | regular,
     results := [join_result()],
     nodes := ordsets:ordset(node()),
@@ -216,7 +217,7 @@ wait_for_get_nodes(Server, Timeout) ->
 %% @private
 -spec init(term()) -> {ok, state()}.
 init(Opts) ->
-    StartTime = erlang:system_time(millisecond),
+    StartTime = get_time(),
     %% Sends nodeup / nodedown
     ok = net_kernel:monitor_nodes(true),
     Mod = maps:get(backend_module, Opts, cets_discovery_file),
@@ -226,6 +227,7 @@ init(Opts) ->
     %% Changes phase from initial to regular (affects the check interval)
     erlang:send_after(timer:minutes(5), self(), enter_regular_phase),
     State = #{
+        name => maps:get(name, Opts, undefined),
         phase => initial,
         results => [],
         nodes => [],
@@ -317,6 +319,8 @@ handle_info({ping_result, Node, Result}, State) ->
     {noreply, handle_ping_result(Node, Result, State)};
 handle_info(enter_regular_phase, State) ->
     {noreply, State#{phase := regular}};
+handle_info({cancel_check, Node}, State) ->
+    {noreply, handle_cancel_check(Node, State)};
 handle_info(Msg, State) ->
     ?LOG_ERROR(#{what => unexpected_info, msg => Msg}),
     {noreply, State}.
@@ -335,6 +339,7 @@ handle_check(State = #{get_nodes_status := running}) ->
 handle_check(State = #{backend_module := Mod, backend_state := BackendState}) ->
     Self = self(),
     spawn_link(fun() ->
+        send_cancel_check_to_other_nodes(State),
         Info = #{task => cets_discovery_get_nodes, backend_module => Mod},
         F = fun() -> Mod:get_nodes(BackendState) end,
         {Result, BackendState2} = cets_long:run_tracked(Info, F),
@@ -342,6 +347,26 @@ handle_check(State = #{backend_module := Mod, backend_state := BackendState}) ->
     end),
     State#{get_nodes_status := running}.
 
+-spec send_cancel_check_to_other_nodes(state()) -> ok.
+send_cancel_check_to_other_nodes(State = #{nodes := Nodes}) ->
+    AliveNodes = ordsets:from_list(nodes()),
+    AliveKnownNodes = ordsets:intersection(Nodes, AliveNodes),
+    broadcast_to_other_nodes(AliveKnownNodes, {cancel_check, node()}, State).
+
+-spec handle_cancel_check(node(), state()) -> state().
+handle_cancel_check(_FromNode, State) ->
+    %% A remote node asked us to skip our check to reduce
+    %% resource usage and to lower the chance of a race condition.
+    %% Start a new timeout for the next check.
+    schedule_check(State).
+
+-spec broadcast_to_other_nodes([node()], term(), state()) -> ok.
+broadcast_to_other_nodes([_ | _] = Nodes, Msg, #{name := Name}) when Name =/= undefined ->
+    _ = [erlang:send({Name, Node}, Msg) || Node <- Nodes],
+    ok;
+broadcast_to_other_nodes(_Nodes, _Msg, _State) ->
+    ok.
+
 -spec handle_get_nodes_result(Result, BackendState, State) -> State when
     Result :: get_nodes_result(), BackendState :: backend_state(), State :: state().
 handle_get_nodes_result(Result, BackendState, State) ->
@@ -480,11 +505,12 @@ last_node_down(#{nodedown_timestamps := Map}) ->
 %% Returns timeout in milliseconds to retry calling the get_nodes function.
 %% get_nodes is called after add_table without waiting.
 %% It is also would be retried without waiting if should_retry_get_nodes set to true.
--spec retry_type_to_timeout(retry_type()) -> non_neg_integer().
-retry_type_to_timeout(initial) -> timer:seconds(5);
-retry_type_to_timeout(after_error) -> timer:seconds(1);
-retry_type_to_timeout(regular) -> timer:minutes(5);
-retry_type_to_timeout(after_nodedown) -> timer:seconds(30).
+%% A random extra delay is added, so that the nodes do not all try to join at the same time.
+-spec retry_type_to_timeout(retry_type()) -> pos_integer().
+retry_type_to_timeout(initial) -> timer:seconds(5) + rand:uniform(timer:seconds(1));
+retry_type_to_timeout(after_error) -> timer:seconds(1) + rand:uniform(timer:seconds(1));
+retry_type_to_timeout(regular) -> timer:minutes(5) + rand:uniform(timer:seconds(5));
+retry_type_to_timeout(after_nodedown) -> timer:seconds(30) + rand:uniform(timer:seconds(5)).
 
 -spec cancel_old_timer(state()) -> ok.
 cancel_old_timer(#{timer_ref := OldRef}) when is_reference(OldRef) ->
@@ -582,7 +608,10 @@ has_join_result_for(Node, Table, #{results := Results}) ->
 
 -spec handle_system_info(state()) -> system_info().
 handle_system_info(State) ->
-    State#{verify_ready => verify_ready(State)}.
+    State#{
+        verify_ready => verify_ready(State),
+        connected_nodes => nodes()
+    }.
 
 -spec handle_nodedown(node(), state()) -> state().
 handle_nodedown(Node, State = #{unavailable_nodes := UnNodes}) ->
@@ -618,13 +647,13 @@ handle_nodeup(Node, State) ->
 
 -spec remember_nodeup_timestamp(node(), state()) -> state().
 remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
-    Time = erlang:system_time(millisecond),
+    Time = get_time(),
     Map2 = Map#{Node => Time},
     State#{nodeup_timestamps := Map2}.
 
 -spec remember_nodedown_timestamp(node(), state()) -> state().
 remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) ->
-    Time = erlang:system_time(millisecond),
+    Time = get_time(),
     Map2 = Map#{Node => Time},
     State#{nodedown_timestamps := Map2}.
 
@@ -640,7 +669,7 @@ calculate_uptime(undefined) ->
 calculate_uptime(StartTime) ->
     time_since(StartTime).
 
--spec get_downtime(node(), state()) -> milliseconds().
+-spec get_downtime(node(), state()) -> milliseconds() | undefined.
 get_downtime(Node, #{nodedown_timestamps := Map}) ->
     case maps:get(Node, Map, undefined) of
         undefined ->
@@ -657,16 +686,21 @@ set_defined(Key, Value, Map) ->
 time_since_startup_in_milliseconds(#{start_time := StartTime}) ->
     time_since(StartTime).
 
+-spec time_since(milliseconds()) -> milliseconds().
 time_since(StartTime) ->
-    erlang:system_time(millisecond) - StartTime.
-
-send_start_time_to(Node, #{start_time := StartTime}) ->
-    case erlang:process_info(self(), registered_name) of
-        {registered_name, Name} ->
-            erlang:send({Name, Node}, {start_time, node(), StartTime});
-        _ ->
-            ok
-    end.
+    %% Dialyzer thinks that integer() - integer() could be float().
+    %% Use floor/1 to ensure the result is an integer().
+    floor(get_time() - StartTime).
+
+-spec get_time() -> milliseconds().
+get_time() ->
+    erlang:system_time(millisecond).
+
+send_start_time_to(Node, #{start_time := StartTime, name := Name}) when Name =/= undefined ->
+    erlang:send({Name, Node}, {start_time, node(), StartTime});
+send_start_time_to(_Node, _State) ->
+    %% No name is configured (absent or undefined), so there is no remote process to address.
+    ok.
 
 handle_receive_start_time(Node, StartTime, State = #{node_start_timestamps := Map}) ->
     case maps:get(Node, Map, undefined) of
diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl
index 8a92f1af..8c3988ca 100644
--- a/test/cets_SUITE.erl
+++ b/test/cets_SUITE.erl
@@ -165,7 +165,10 @@ cases() ->
         unexpected_nodedown_is_ignored_by_disco,
         ignore_send_dump_received_when_unpaused,
         ignore_send_dump_received_when_paused_with_another_pause_ref,
-        pause_on_remote_node_returns_if_monitor_process_dies
+        pause_on_remote_node_returns_if_monitor_process_dies,
+        disco_name_is_in_system_info,
+        cancel_check_is_sent_after_check,
+        cancel_check_resets_check_timeout
     ].
 
 only_for_logger_cases() ->
@@ -1603,20 +1606,7 @@ test_disco_add_two_tables(Config) ->
     receive_message(waited_for_sent_both),
     %% try_joining would be called after set_nodes,
     %% but it is async, so wait until it is done:
-    cets_test_wait:wait_until(
-        fun() ->
-            maps:with(
-                [get_nodes_status, should_retry_get_nodes, join_status, should_retry_join],
-                cets_discovery:system_info(Disco)
-            )
-        end,
-        #{
-            get_nodes_status => not_running,
-            should_retry_get_nodes => false,
-            join_status => not_running,
-            should_retry_join => false
-        }
-    ),
+    wait_for_get_nodes_not_running(Disco),
     [
         #{memory := _, nodes := [Node1, Node2], size := 0, table := Tab1},
         #{memory := _, nodes := [Node1, Node2], size := 0, table := Tab2}
@@ -2822,6 +2812,48 @@ ping_pairs_returns_earlier(Config) ->
     [{Me, Me, pong}, {Me, Node2, pong}, {Me, Bad, pang}, {Me, Node3, skipped}] =
         cets_ping:ping_pairs([{Me, Me}, {Me, Node2}, {Me, Bad}, {Me, Node3}]).
 
+disco_name_is_in_system_info(Config) ->
+    DiscoName = disco_name(Config),
+    F = fun(State) -> {{ok, []}, State} end,
+    Disco = start_disco(node(), #{
+        name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
+    }),
+    #{name := DiscoName} = cets_discovery:system_info(Disco),
+    ok.
+
+cancel_check_is_sent_after_check(Config) ->
+    DiscoName = disco_name(Config),
+    register(DiscoName, self()),
+    #{ct2 := Node2} = proplists:get_value(nodes, Config),
+    Node1 = node(),
+    F = fun(State) -> {{ok, [Node1, Node2]}, State} end,
+    Disco = start_disco(Node2, #{
+        name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
+    }),
+    ok = cets_discovery:wait_for_ready(Disco, 5000),
+    #{nodes := [Node1, Node2]} = cets_discovery:system_info(Disco),
+    %% Force a check, so we do not have to wait long.
+    Disco ! check,
+    receive_message({cancel_check, Node2}).
+
+cancel_check_resets_check_timeout(Config) ->
+    DiscoName = disco_name(Config),
+    #{ct2 := Node2} = proplists:get_value(nodes, Config),
+    Node1 = node(),
+    F = fun(State) -> {{ok, [Node1, Node2]}, State} end,
+    Disco = start_disco(Node2, #{
+        name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
+    }),
+    ok = cets_discovery:wait_for_ready(Disco, 5000),
+    Disco ! enter_regular_phase,
+    Disco ! check,
+    wait_for_get_nodes_not_running(Disco),
+    %% Now the next check will not happen soon.
+    %% So we can be sure nobody changes timer_ref and interferes with our code.
+    #{timer_ref := OldTRef} = cets_discovery:system_info(Disco),
+    Disco ! {cancel_check, Node1},
+    wait_for_different_timer_ref(Disco, OldTRef).
+
 %% Helper functions
 
 receive_all_logs(Id) ->
@@ -3237,6 +3269,30 @@ wait_for_join_ref_to_match(Pid, JoinRef) ->
     end,
     cets_test_wait:wait_until(Cond, JoinRef).
 
+wait_for_get_nodes_not_running(Disco) ->
+    cets_test_wait:wait_until(
+        fun() ->
+            maps:with(
+                [get_nodes_status, should_retry_get_nodes, join_status, should_retry_join],
+                cets_discovery:system_info(Disco)
+            )
+        end,
+        #{
+            get_nodes_status => not_running,
+            should_retry_get_nodes => false,
+            join_status => not_running,
+            should_retry_join => false
+        }
+    ).
+
+wait_for_different_timer_ref(Disco, OldTRef) ->
+    cets_test_wait:wait_until(
+        fun() ->
+            maps:get(timer_ref, cets_discovery:system_info(Disco)) =/= OldTRef
+        end,
+        true
+    ).
+
 get_disco_timestamp(Disco, MapName, NodeKey) ->
     Info = cets_discovery:system_info(Disco),
     #{MapName := #{NodeKey := Timestamp}} = Info,