Skip to content

Commit

Permalink
kernel: Prepare pg for protocol upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
sverker authored and Ledest committed Jun 10, 2024
1 parent 680b0ef commit 89e54d6
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions src_ext/pg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ handle_cast(_, _State) ->

-spec handle_info(
{discover, Peer :: pid()} |
{discover, Peer :: pid(), any()} |
{join, Peer :: pid(), group(), pid() | [pid()]} |
{leave, Peer :: pid(), pid() | [pid()], [group()]} |
{'DOWN', reference(), process, pid(), term()} |
Expand Down Expand Up @@ -396,17 +397,13 @@ handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Rem
end;

%% we're being discovered, let's exchange!
handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) ->
gen_server:cast(Peer, {sync, self(), all_local_pids(Local)}),
%% do we know who is looking for us?
case maps:is_key(Peer, Remote) of
true ->
{noreply, State};
false ->
MRef = erlang:monitor(process, Peer),
erlang:send(Peer, {discover, self()}, [noconnect]),
{noreply, State#state{remote = Remote#{Peer => {MRef, #{}}}}}
end;
handle_info({discover, Peer}, State) ->
handle_discover(Peer, State);

%% New discover message sent by a future pg version.
%% Accepted first in OTP 26, to be used by OTP 28 or later.
handle_info({discover, Peer, _ProtocolVersion}, State) ->
handle_discover(Peer, State);

%% handle local process exit, or a local monitor exit
handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local,
Expand Down Expand Up @@ -454,6 +451,21 @@ terminate(_Reason, #state{scope = Scope}) ->
%%--------------------------------------------------------------------
%% Internal implementation

handle_discover(Peer, #state{remote = Remote, local = Local} = State) ->
gen_server:cast(Peer, {sync, self(), all_local_pids(Local)}),
%% do we know who is looking for us?
case maps:is_key(Peer, Remote) of
true ->
{noreply, State};
false ->
MRef = erlang:monitor(process, Peer),
erlang:send(Peer, {discover, self()}, [noconnect]),
{noreply, State#state{remote = Remote#{Peer => {MRef, #{}}}}}
end;
handle_discover(_, _) ->
erlang:error(badarg).


%% Ensures argument is either a node-local pid or a list of such, or it throws an error
ensure_local(Pid) when is_pid(Pid), node(Pid) =:= node() ->
ok;
Expand Down

0 comments on commit 89e54d6

Please sign in to comment.