Skip to content

Commit

Permalink
Merge pull request #45 from esl/disco_pang_result_arrives_after_nodeup
Browse files Browse the repository at this point in the history
Fix unavailableNodes list having joined nodes
  • Loading branch information
chrzaszcz authored Dec 6, 2023
2 parents 9c5b2c9 + c0c4fb8 commit 0e3f83e
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 6 deletions.
12 changes: 10 additions & 2 deletions src/cets_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,22 @@ ping_not_connected_nodes(Nodes) ->
Self = self(),
NotConNodes = Nodes -- [node() | nodes()],
[
spawn(fun() -> Self ! {ping_result, Node, cets_ping:ping(Node)} end)
spawn_link(fun() -> cets_ping:send_ping_result(Self, Node, cets_ping:ping(Node)) end)
|| Node <- lists:sort(NotConNodes)
],
ok.

-spec handle_ping_result(node(), pong | pang, state()) -> state().
handle_ping_result(Node, pang, State = #{unavailable_nodes := UnNodes}) ->
trigger_verify_ready(State#{unavailable_nodes := ordsets:add_element(Node, UnNodes)});
State2 =
case lists:member(Node, nodes()) of
true ->
%% Race condition between nodeup and ping results
State;
false ->
State#{unavailable_nodes := ordsets:add_element(Node, UnNodes)}
end,
trigger_verify_ready(State2);
handle_ping_result(_Node, pong, State) ->
State.

Expand Down
8 changes: 7 additions & 1 deletion src/cets_ping.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-module(cets_ping).
-export([ping/1, ping_pairs/1, pre_connect/1]).
-export([ping/1, ping_pairs/1, pre_connect/1, send_ping_result/3]).

-ifdef(TEST).
-export([net_family/1]).
Expand Down Expand Up @@ -150,3 +150,9 @@ get_epmd_port() ->
error ->
4369
end.

%% Send ping result back to cets_discovery
-spec send_ping_result(pid(), node(), pong | pang) -> ok.
send_ping_result(SendTo, Node, PingResult) ->
SendTo ! {ping_result, Node, PingResult},
ok.
59 changes: 56 additions & 3 deletions test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ seq_cases() ->
disco_node_down_timestamp_is_remembered,
disco_nodeup_timestamp_is_updated_after_node_reconnects,
disco_node_start_timestamp_is_updated_after_node_restarts,
disco_late_pang_result_arrives_after_node_went_up,
disco_nodeup_triggers_check_and_get_nodes,
ping_pairs_returns_pongs,
ping_pairs_returns_earlier,
Expand All @@ -205,7 +206,8 @@ cets_seq_no_log_cases() ->
disco_node_up_timestamp_is_remembered,
disco_node_down_timestamp_is_remembered,
disco_nodeup_timestamp_is_updated_after_node_reconnects,
disco_node_start_timestamp_is_updated_after_node_restarts
disco_node_start_timestamp_is_updated_after_node_restarts,
disco_late_pang_result_arrives_after_node_went_up
].

init_per_suite(Config) ->
Expand Down Expand Up @@ -2471,6 +2473,38 @@ disco_node_start_timestamp_is_updated_after_node_restarts(Config) ->
simulate_disco_restart(Setup),
wait_for_disco_timestamp_to_be_updated(Disco, node_start_timestamps, Node2, OldTimestamp).

disco_late_pang_result_arrives_after_node_went_up(Config) ->
Node1 = node(),
#{ct2 := Node2} = proplists:get_value(nodes, Config),
%% unavailable_nodes list contains nodes which have not responded to pings.
%% Ping is async though.
%% So, there could be the situation when the result of ping would be processed
%% after the node actually got connected.
meck:new(cets_ping, [passthrough]),
Me = self(),
meck:expect(cets_ping, send_ping_result, fun(Pid, Node, _PingResult) ->
%% Wait until Node is up
Cond = fun() -> lists:member(Node, nodes()) end,
cets_test_wait:wait_until(Cond, true),
Me ! send_ping_result_called,
%% Return pang to cets_discovery.
%% cets_join does not use send_ping_result function
%% and would receive pong and join correctly.
meck:passthrough([Pid, Node, pang])
end),
try
%% setup_two_nodes_and_discovery would call disconnect_node/2 function
Setup = setup_two_nodes_and_discovery(Config, [wait, disco2]),
receive_message(send_ping_result_called),
#{disco_name := DiscoName} = Setup,
Status = cets_status:status(DiscoName),
%% Check that pang is ignored and unavailable_nodes list is empty.
?assertMatch([], maps:get(unavailable_nodes, Status)),
?assertMatch([Node1, Node2], maps:get(joined_nodes, Status))
after
meck:unload()
end.

disco_nodeup_triggers_check_and_get_nodes(Config) ->
Setup = setup_two_nodes_and_discovery(Config, [wait, notify_get_nodes]),
#{disco := Disco, node2 := Node2} = Setup,
Expand Down Expand Up @@ -2743,12 +2777,17 @@ given_n_servers(Config, N, Opts) ->
setup_two_nodes_and_discovery(Config) ->
setup_two_nodes_and_discovery(Config, []).

%% Flags:
%% - disco2 - start discovery on Node2
%% - wait - call wait_for_ready/2
setup_two_nodes_and_discovery(Config, Flags) ->
ok = net_kernel:monitor_nodes(true),
Me = self(),
Node1 = node(),
#{ct2 := Peer2} = proplists:get_value(peers, Config),
#{ct2 := Node2} = proplists:get_value(nodes, Config),
disconnect_node(Peer2, Node1),
receive_message({nodedown, Node2}),
Tab = make_name(Config),
{ok, _Pid1} = start(Node1, Tab),
{ok, _Pid2} = start(Peer2, Tab),
Expand All @@ -2771,14 +2810,21 @@ setup_two_nodes_and_discovery(Config, Flags) ->
case lists:member(disco2, Flags) of
true ->
Disco2 = start_disco(Node2, DiscoOpts),
cets_discovery:add_table(Disco2, Tab),
#{disco2 => Disco2};
false ->
#{}
end,
cets_discovery:add_table(Disco, Tab),
case lists:member(wait, Flags) of
true ->
ok = cets_discovery:wait_for_ready(Disco, 5000);
try
ok = cets_discovery:wait_for_ready(Disco, 5000)
catch
Class:Reason:Stacktrace ->
ct:pal("system_info: ~p", [cets_discovery:system_info(Disco)]),
erlang:raise(Class, Reason, Stacktrace)
end;
false ->
ok
end,
Expand All @@ -2789,7 +2835,14 @@ setup_two_nodes_and_discovery(Config, Flags) ->
false ->
ok
end,
Res#{disco_opts => DiscoOpts, disco => Disco, node1 => Node1, node2 => Node2, peer2 => Peer2}.
Res#{
disco_name => DiscoName,
disco_opts => DiscoOpts,
disco => Disco,
node1 => Node1,
node2 => Node2,
peer2 => Peer2
}.

simulate_disco_restart(#{
disco_opts := DiscoOpts,
Expand Down

0 comments on commit 0e3f83e

Please sign in to comment.