Skip to content

Commit

Permalink
Refactor reconnect code, prevent duplicate closed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
mobileoverlord committed Jan 25, 2019
1 parent 082fce5 commit 2bcf002
Showing 1 changed file with 25 additions and 22 deletions.
47 changes: 25 additions & 22 deletions lib/phoenix_client/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,6 @@ defmodule PhoenixClient.Socket do
{:noreply, state}
end

def handle_info({:closed, reason, transport_pid}, %{transport_pid: transport_pid} = state) do
close(reason, state)
{:noreply, %{state | status: :disconnected}}
end

def handle_info(:flush, %{status: :connected} = state) do
state =
case :queue.out(state.queue) do
Expand Down Expand Up @@ -177,14 +172,17 @@ defmodule PhoenixClient.Socket do
|> to_string

{:ok, pid} = state[:transport].open(url, state[:transport_opts])
{:noreply, %{state | transport_pid: pid}}
{:noreply, %{state | transport_pid: pid, reconnect_timer: nil}}
end

# Handle Errors in the transport and channels
def handle_info({:closed, reason, transport_pid}, %{transport_pid: transport_pid, reconnect_timer: nil} = state) do
{:noreply, close(reason, state)}
end

# Trap exits on the transport pid.
# If the transport failed
def handle_info({:EXIT, transport_pid, reason}, %{transport_pid: transport_pid} = state) do
close(reason, state)
{:noreply, %{state | status: :disconnected, transport_pid: nil}}
def handle_info({:EXIT, transport_pid, reason}, %{transport_pid: transport_pid, reconnect_timer: nil} = state) do
state = %{state | transport_pid: nil}
{:noreply, close(reason, state)}
end

# Unlink a channel if the process goes down
Expand All @@ -193,31 +191,36 @@ defmodule PhoenixClient.Socket do
{:noreply, %{s | channels: channels}}
end

def handle_info(_message, state) do
{:noreply, state}
end

defp transport_receive(message, state) do
%{channels: channels, json_library: json_library} = state
decoded = Message.decode!(message, json_library)

Enum.filter(channels, fn {_channel, channel_topic} ->
decoded.topic == channel_topic
end)
|> Enum.each(fn {channel, _} ->
send(channel, decoded)
end)
channels
|> Enum.filter(&elem(&1, 1) == decoded.topic)
|> Enum.each(&elem(&1, 0) |> send(decoded))
end

def transport_send(message, %{transport_pid: pid, json_library: json_library}) do
defp transport_send(message, %{transport_pid: pid, json_library: json_library}) do
send(pid, {:send, Message.encode!(message, json_library)})
end

defp close(reason, state) do
state = %{state | status: :disconnected}
message = %Message{event: close_event(reason), payload: %{reason: reason}}

Enum.each(state.channels, fn {pid, _channel} ->
for {pid, _channel} <- state.channels do
send(pid, message)
end)
end

if state.reconnect == true do
:erlang.send_after(state[:reconnect_interval], self(), :connect)
if state.reconnect and state.reconnect_timer == nil do
timer_ref = Process.send_after(self(), :connect, state.reconnect_interval)
%{state | reconnect_timer: timer_ref}
else
state
end
end

Expand Down

0 comments on commit 2bcf002

Please sign in to comment.