Skip to content

Commit

Permalink
Cancel checks on other nodes in the segment when check is executed
Browse files Browse the repository at this point in the history
Add random delays to the check intervals
  • Loading branch information
arcusfelis committed Jan 31, 2024
1 parent 7cb7316 commit 9724c76
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 34 deletions.
72 changes: 53 additions & 19 deletions src/cets_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
%% Join result information.

-type state() :: #{
name => atom(),
phase := initial | regular,
results := [join_result()],
nodes := ordsets:ordset(node()),
Expand Down Expand Up @@ -216,7 +217,7 @@ wait_for_get_nodes(Server, Timeout) ->
%% @private
-spec init(term()) -> {ok, state()}.
init(Opts) ->
StartTime = erlang:system_time(millisecond),
StartTime = get_time(),
%% Sends nodeup / nodedown
ok = net_kernel:monitor_nodes(true),
Mod = maps:get(backend_module, Opts, cets_discovery_file),
Expand All @@ -226,6 +227,7 @@ init(Opts) ->
%% Changes phase from initial to regular (affects the check interval)
erlang:send_after(timer:minutes(5), self(), enter_regular_phase),
State = #{
name => maps:get(name, Opts, undefined),
phase => initial,
results => [],
nodes => [],
Expand Down Expand Up @@ -317,6 +319,8 @@ handle_info({ping_result, Node, Result}, State) ->
{noreply, handle_ping_result(Node, Result, State)};
handle_info(enter_regular_phase, State) ->
{noreply, State#{phase := regular}};
handle_info({cancel_check, Node}, State) ->
{noreply, handle_cancel_check(Node, State)};
handle_info(Msg, State) ->
?LOG_ERROR(#{what => unexpected_info, msg => Msg}),
{noreply, State}.
Expand All @@ -335,13 +339,34 @@ handle_check(State = #{get_nodes_status := running}) ->
handle_check(State = #{backend_module := Mod, backend_state := BackendState}) ->
Self = self(),
spawn_link(fun() ->
send_cancel_check_to_other_nodes(State),
Info = #{task => cets_discovery_get_nodes, backend_module => Mod},
F = fun() -> Mod:get_nodes(BackendState) end,
{Result, BackendState2} = cets_long:run_tracked(Info, F),
Self ! {handle_check_result, Result, BackendState2}
end),
State#{get_nodes_status := running}.

%% Tells the other discovery processes that this node is running a check.
%% Only nodes that are both known to discovery (the `nodes' field of the state)
%% and currently connected to us receive the `{cancel_check, node()}' message.
-spec send_cancel_check_to_other_nodes(state()) -> ok.
send_cancel_check_to_other_nodes(#{nodes := KnownNodes} = State) ->
    %% Sort the connected nodes, so the list can be intersected
    %% with the ordset of known nodes.
    Connected = lists:sort(nodes()),
    Targets = ordsets:intersection(KnownNodes, Connected),
    CancelMsg = {cancel_check, node()},
    broadcast_to_other_nodes(Targets, CancelMsg, State).

%% Handles a `cancel_check' request received from another node.
%% The remote node is running the check itself, so our own check is postponed:
%% this reduces resource usage and lowers the chance of a race condition.
%% The requesting node is not used; the check is simply scheduled again.
-spec handle_cancel_check(node(), state()) -> state().
handle_cancel_check(_RequesterNode, State0) ->
    %% Start a new timeout for the next check.
    State1 = schedule_check(State0),
    State1.

%% Sends Msg to the process registered as Name on each node from Nodes.
%% Name is taken from the `name' field of the state.
%% Nothing is sent (and ok is returned) when:
%% - the node list is empty;
%% - the state has no name;
%% - the name is `undefined' (init/1 stores `undefined' when the name option
%%   is not provided, and no process could be registered under that name,
%%   so sending would only produce useless traffic between nodes).
-spec broadcast_to_other_nodes([node()], term(), state()) -> ok.
broadcast_to_other_nodes([_ | _] = Nodes, Msg, #{name := Name}) when Name =/= undefined ->
    %% Sending to {Name, Node} never fails, even if the remote process is missing.
    lists:foreach(fun(Node) -> erlang:send({Name, Node}, Msg) end, Nodes);
broadcast_to_other_nodes(_Nodes, _Msg, _State) ->
    ok.

-spec handle_get_nodes_result(Result, BackendState, State) -> State when
Result :: get_nodes_result(), BackendState :: backend_state(), State :: state().
handle_get_nodes_result(Result, BackendState, State) ->
Expand Down Expand Up @@ -480,11 +505,12 @@ last_node_down(#{nodedown_timestamps := Map}) ->
%% Returns timeout in milliseconds to retry calling the get_nodes function.
%% get_nodes is called after add_table without waiting.
%% It would also be retried without waiting if should_retry_get_nodes is set to true.
-spec retry_type_to_timeout(retry_type()) -> non_neg_integer().
retry_type_to_timeout(initial) -> timer:seconds(5);
retry_type_to_timeout(after_error) -> timer:seconds(1);
retry_type_to_timeout(regular) -> timer:minutes(5);
retry_type_to_timeout(after_nodedown) -> timer:seconds(30).
%% Add a random extra delay, so that the nodes do not all try to join at the same time.
-spec retry_type_to_timeout(retry_type()) -> pos_integer().
retry_type_to_timeout(initial) -> timer:seconds(5) + rand:uniform(timer:seconds(1));
retry_type_to_timeout(after_error) -> timer:seconds(1) + rand:uniform(timer:seconds(1));
retry_type_to_timeout(regular) -> timer:minutes(5) + rand:uniform(timer:seconds(5));
retry_type_to_timeout(after_nodedown) -> timer:seconds(30) + rand:uniform(timer:seconds(5)).

-spec cancel_old_timer(state()) -> ok.
cancel_old_timer(#{timer_ref := OldRef}) when is_reference(OldRef) ->
Expand Down Expand Up @@ -582,7 +608,10 @@ has_join_result_for(Node, Table, #{results := Results}) ->

-spec handle_system_info(state()) -> system_info().
handle_system_info(State) ->
State#{verify_ready => verify_ready(State)}.
State#{
verify_ready => verify_ready(State),
connected_nodes => nodes()
}.

-spec handle_nodedown(node(), state()) -> state().
handle_nodedown(Node, State = #{unavailable_nodes := UnNodes}) ->
Expand Down Expand Up @@ -618,13 +647,13 @@ handle_nodeup(Node, State) ->

-spec remember_nodeup_timestamp(node(), state()) -> state().
remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
Time = erlang:system_time(millisecond),
Time = get_time(),
Map2 = Map#{Node => Time},
State#{nodeup_timestamps := Map2}.

-spec remember_nodedown_timestamp(node(), state()) -> state().
remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) ->
Time = erlang:system_time(millisecond),
Time = get_time(),
Map2 = Map#{Node => Time},
State#{nodedown_timestamps := Map2}.

Expand All @@ -640,7 +669,7 @@ calculate_uptime(undefined) ->
calculate_uptime(StartTime) ->
time_since(StartTime).

-spec get_downtime(node(), state()) -> milliseconds().
-spec get_downtime(node(), state()) -> milliseconds() | undefined.
get_downtime(Node, #{nodedown_timestamps := Map}) ->
case maps:get(Node, Map, undefined) of
undefined ->
Expand All @@ -657,16 +686,21 @@ set_defined(Key, Value, Map) ->
time_since_startup_in_milliseconds(#{start_time := StartTime}) ->
time_since(StartTime).

-spec time_since(milliseconds()) -> milliseconds().
time_since(StartTime) ->
erlang:system_time(millisecond) - StartTime.

send_start_time_to(Node, #{start_time := StartTime}) ->
case erlang:process_info(self(), registered_name) of
{registered_name, Name} ->
erlang:send({Name, Node}, {start_time, node(), StartTime});
_ ->
ok
end.
%% dialyzer thinks that integer() - integer() could be float().
%% use floor to ensure it is integer().
floor(get_time() - StartTime).

%% Returns the current Erlang system time in milliseconds.
%% Single place to read the clock for the timestamps kept by this module.
-spec get_time() -> milliseconds().
get_time() ->
erlang:system_time(millisecond).

%% Sends our start time to the process registered under our own name on Node.
%% The message has the form {start_time, node(), StartTime}.
%% Nothing is sent when the discovery process has no name: init/1 stores
%% `undefined' in the `name' field when the name option is not provided,
%% so `undefined' has to be treated the same way as a missing key.
send_start_time_to(Node, #{start_time := StartTime, name := Name}) when Name =/= undefined ->
    erlang:send({Name, Node}, {start_time, node(), StartTime});
send_start_time_to(_Node, _State) ->
    %% No name
    ok.

handle_receive_start_time(Node, StartTime, State = #{node_start_timestamps := Map}) ->
case maps:get(Node, Map, undefined) of
Expand Down
86 changes: 71 additions & 15 deletions test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ cases() ->
unexpected_nodedown_is_ignored_by_disco,
ignore_send_dump_received_when_unpaused,
ignore_send_dump_received_when_paused_with_another_pause_ref,
pause_on_remote_node_returns_if_monitor_process_dies
pause_on_remote_node_returns_if_monitor_process_dies,
disco_name_is_in_system_info,
cancel_check_is_sent_after_check,
cancel_check_resets_check_timeout
].

only_for_logger_cases() ->
Expand Down Expand Up @@ -1603,20 +1606,7 @@ test_disco_add_two_tables(Config) ->
receive_message(waited_for_sent_both),
%% try_joining would be called after set_nodes,
%% but it is async, so wait until it is done:
cets_test_wait:wait_until(
fun() ->
maps:with(
[get_nodes_status, should_retry_get_nodes, join_status, should_retry_join],
cets_discovery:system_info(Disco)
)
end,
#{
get_nodes_status => not_running,
should_retry_get_nodes => false,
join_status => not_running,
should_retry_join => false
}
),
wait_for_get_nodes_not_running(Disco),
[
#{memory := _, nodes := [Node1, Node2], size := 0, table := Tab1},
#{memory := _, nodes := [Node1, Node2], size := 0, table := Tab2}
Expand Down Expand Up @@ -2822,6 +2812,48 @@ ping_pairs_returns_earlier(Config) ->
[{Me, Me, pong}, {Me, Node2, pong}, {Me, Bad, pang}, {Me, Node3, skipped}] =
cets_ping:ping_pairs([{Me, Me}, {Me, Node2}, {Me, Bad}, {Me, Node3}]).

%% Checks that the name passed in the start options is reported by system_info.
disco_name_is_in_system_info(Config) ->
    Name = disco_name(Config),
    %% Backend callback which reports no nodes and keeps its state unchanged.
    GetNodesFn = fun(BackendState) -> {{ok, []}, BackendState} end,
    DiscoOpts = #{
        name => Name,
        backend_module => cets_discovery_fun,
        get_nodes_fn => GetNodesFn
    },
    Disco = start_disco(node(), DiscoOpts),
    Info = cets_discovery:system_info(Disco),
    #{name := Name} = Info,
    ok.

%% Checks that a disco process sends {cancel_check, Node} to the other nodes
%% when it runs a check.
cancel_check_is_sent_after_check(Config) ->
    Name = disco_name(Config),
    %% The test process takes the disco name on the local node,
    %% so messages addressed to {Name, node()} arrive here.
    register(Name, self()),
    #{ct2 := Peer} = proplists:get_value(nodes, Config),
    Local = node(),
    GetNodesFn = fun(BackendState) -> {{ok, [Local, Peer]}, BackendState} end,
    DiscoOpts = #{
        name => Name,
        backend_module => cets_discovery_fun,
        get_nodes_fn => GetNodesFn
    },
    Disco = start_disco(Peer, DiscoOpts),
    ok = cets_discovery:wait_for_ready(Disco, 5000),
    #{nodes := [Local, Peer]} = cets_discovery:system_info(Disco),
    %% Trigger the check manually instead of waiting for the timer.
    Disco ! check,
    receive_message({cancel_check, Peer}).

%% Checks that receiving cancel_check makes the disco process replace its check timer.
cancel_check_resets_check_timeout(Config) ->
    Name = disco_name(Config),
    #{ct2 := Peer} = proplists:get_value(nodes, Config),
    Local = node(),
    GetNodesFn = fun(BackendState) -> {{ok, [Local, Peer]}, BackendState} end,
    DiscoOpts = #{
        name => Name,
        backend_module => cets_discovery_fun,
        get_nodes_fn => GetNodesFn
    },
    Disco = start_disco(Peer, DiscoOpts),
    ok = cets_discovery:wait_for_ready(Disco, 5000),
    Disco ! enter_regular_phase,
    Disco ! check,
    wait_for_get_nodes_not_running(Disco),
    %% The next check is far away now, so nobody else is expected
    %% to change timer_ref while we are looking at it.
    Info = cets_discovery:system_info(Disco),
    #{timer_ref := OldTRef} = Info,
    Disco ! {cancel_check, Local},
    wait_for_different_timer_ref(Disco, OldTRef).

%% Helper functions

receive_all_logs(Id) ->
Expand Down Expand Up @@ -3237,6 +3269,30 @@ wait_for_join_ref_to_match(Pid, JoinRef) ->
end,
cets_test_wait:wait_until(Cond, JoinRef).

%% Waits until the disco process reports that neither get_nodes nor join
%% is running and that no retry is pending for either of them.
wait_for_get_nodes_not_running(Disco) ->
    StatusKeys = [get_nodes_status, should_retry_get_nodes, join_status, should_retry_join],
    IdleStatus = #{
        get_nodes_status => not_running,
        should_retry_get_nodes => false,
        join_status => not_running,
        should_retry_join => false
    },
    %% Only the status fields are compared, the rest of system_info is dropped.
    ReadStatus = fun() ->
        Info = cets_discovery:system_info(Disco),
        maps:with(StatusKeys, Info)
    end,
    cets_test_wait:wait_until(ReadStatus, IdleStatus).

%% Waits until timer_ref reported by the disco process differs from OldTRef.
wait_for_different_timer_ref(Disco, OldTRef) ->
    TimerChanged = fun() ->
        Info = cets_discovery:system_info(Disco),
        CurrentTRef = maps:get(timer_ref, Info),
        CurrentTRef =/= OldTRef
    end,
    cets_test_wait:wait_until(TimerChanged, true).

get_disco_timestamp(Disco, MapName, NodeKey) ->
Info = cets_discovery:system_info(Disco),
#{MapName := #{NodeKey := Timestamp}} = Info,
Expand Down

0 comments on commit 9724c76

Please sign in to comment.