Skip to content
This repository has been archived by the owner on May 5, 2020. It is now read-only.

Websocket 1.1.0 proposal #19

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion elm-package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "1.0.2",
"version": "2.0.0",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should become 1.1.0 I suppose

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ibizaman the public function keepAlive no longer exists, making this a breaking change for consumers. Hence a major version bump is mandatory.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is a proposal, and while it's a breaking change as-is, if something were to be decided on where this PR could be cleaned up for actual merging, the intent would be to make it non-breaking.

"summary": "Persistent network connections, making client/server communication faster.",
"repository": "http://github.com/elm-lang/websocket.git",
"license": "BSD3",
Expand Down
203 changes: 113 additions & 90 deletions src/WebSocket.elm
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
effect module WebSocket where { command = MyCmd, subscription = MySub } exposing
( send
, listen
, keepAlive
, onOpen
, onClose
)

{-| Web sockets make it cheaper to talk to your servers.
Expand All @@ -20,14 +21,13 @@ The API here attempts to cover the typical usage scenarios, but if you need
many unique connections to the same endpoint, you need a different library.

# Web Sockets
@docs listen, keepAlive, send
@docs listen, onOpen, onClose, send

-}

import Dict
import Process
import Task exposing (Task)
import Time exposing (Time)
import WebSocket.LowLevel as WS


Expand All @@ -44,8 +44,8 @@ type MyCmd msg
send "ws://echo.websocket.org" "Hello!"

**Note:** It is important that you are also subscribed to this address with
`listen` or `keepAlive`. If you are not, the web socket will be created to
send one message and then closed. Not good!
`listen`. If you are not, the web socket will be created to send one message
and then closed. Not good!
-}
send : String -> String -> Cmd msg
send url message =
Expand All @@ -62,8 +62,7 @@ cmdMap _ (Send url msg) =


type MySub msg
= Listen String (String -> msg)
| KeepAlive String
= MySub String String (String -> msg)


{-| Subscribe to any incoming messages on a websocket. You might say something
Expand All @@ -75,38 +74,44 @@ like this:
listen "ws://echo.websocket.org" Echo

**Note:** If the connection goes down, the effect manager tries to reconnect
with an exponential backoff strategy. Any messages you try to `send` while the
connection is down are queued and will be sent as soon as possible.
with an exponential backoff strategy.
-}
listen : String -> (String -> msg) -> Sub msg
listen url tagger =
subscription (Listen url tagger)
subscription (MySub "listen" url tagger)


{-| Keep a connection alive, but do not report any messages. This is useful
for keeping a connection open for when you only need to `send` messages. So
you might say something like this:
{-| Subscribe to websocket open events. You might say something
like this:

type Msg = WsOpened String | ...

subscriptions model =
keepAlive "ws://echo.websocket.org"
onOpen WsOpened
-}
onOpen : (String -> msg) -> Sub msg
onOpen tagger =
subscription (MySub "onOpen" "" tagger)

**Note:** If the connection goes down, the effect manager tries to reconnect
with an exponential backoff strategy. Any messages you try to `send` while the
connection is down are queued and will be sent as soon as possible.

{-| Subscribe to websocket close events. You might say something
like this:

type Msg = WsClosed String | ...

subscriptions model =
onClose WsClosed
-}
keepAlive : String -> Sub msg
keepAlive url =
subscription (KeepAlive url)
onClose : (String -> msg) -> Sub msg
onClose tagger =
subscription (MySub "onClose" "" tagger)


subMap : (a -> b) -> MySub a -> MySub b
subMap func sub =
case sub of
Listen url tagger ->
Listen url (tagger >> func)

KeepAlive url ->
KeepAlive url
MySub category url tagger ->
MySub category url (tagger >> func)



Expand All @@ -115,7 +120,6 @@ subMap func sub =

type alias State msg =
{ sockets : SocketsDict
, queues : QueuesDict
, subs : SubsDict msg
}

Expand All @@ -124,12 +128,8 @@ type alias SocketsDict =
Dict.Dict String Connection


type alias QueuesDict =
Dict.Dict String (List String)


type alias SubsDict msg =
Dict.Dict String (List (String -> msg))
Dict.Dict String (Dict.Dict String (String -> msg))


type Connection
Expand All @@ -139,7 +139,7 @@ type Connection

init : Task Never (State msg)
init =
Task.succeed (State Dict.empty Dict.empty Dict.empty)
Task.succeed (State Dict.empty Dict.empty)



Expand All @@ -158,52 +158,46 @@ onEffects
-> Task Never (State msg)
onEffects router cmds subs state =
let
sendMessagesGetNewQueues =
sendMessagesHelp cmds state.sockets state.queues

newSubs =
buildSubDict subs Dict.empty

cleanup newQueues =
let
newEntries =
Dict.union newQueues (Dict.map (\k v -> []) newSubs)
newEntries =
buildEntriesDict subs Dict.empty

leftStep name _ getNewSockets =
getNewSockets
|> Task.andThen (\newSockets -> attemptOpen router 0 name
|> Task.andThen (\pid -> Task.succeed (Dict.insert name (Opening 0 pid) newSockets)))
leftStep category _ getNewSockets =
getNewSockets
|> Task.andThen (\newSockets -> attemptOpen router 0 category
|> Task.andThen (\pid -> Task.succeed (Dict.insert category (Opening 0 pid) newSockets)))

bothStep name _ connection getNewSockets =
Task.map (Dict.insert name connection) getNewSockets
bothStep category _ connection getNewSockets =
Task.map (Dict.insert category connection) getNewSockets

rightStep name connection getNewSockets =
closeConnection connection &> getNewSockets
rightStep category connection getNewSockets =
closeConnection connection &> getNewSockets

collectNewSockets =
Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty)
in
collectNewSockets
|> Task.andThen (\newSockets -> Task.succeed (State newSockets newQueues newSubs))
collectNewSockets =
Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty)
in
sendMessagesGetNewQueues
|> Task.andThen cleanup
cmdHelp router cmds state.sockets
&> collectNewSockets
|> Task.andThen (\newSockets -> Task.succeed (State newSockets newSubs))


sendMessagesHelp : List (MyCmd msg) -> SocketsDict -> QueuesDict -> Task x QueuesDict
sendMessagesHelp cmds socketsDict queuesDict =
cmdHelp : Platform.Router msg Msg -> List (MyCmd msg) -> SocketsDict -> Task Never SocketsDict
cmdHelp router cmds socketsDict =
case cmds of
[] ->
Task.succeed queuesDict
Task.succeed socketsDict

Send name msg :: rest ->
case Dict.get name socketsDict of
Just (Connected socket) ->
WS.send socket msg
&> sendMessagesHelp rest socketsDict queuesDict
&> cmdHelp router rest socketsDict

_ ->
sendMessagesHelp rest socketsDict (Dict.update name (add msg) queuesDict)
-- TODO: Since messages are no longer queued, this probably shouldn't just succeed
Task.succeed socketsDict


buildSubDict : List (MySub msg) -> SubsDict msg -> SubsDict msg
Expand All @@ -212,21 +206,33 @@ buildSubDict subs dict =
[] ->
dict

Listen name tagger :: rest ->
buildSubDict rest (Dict.update name (add tagger) dict)
MySub category name tagger :: rest ->
buildSubDict rest (Dict.update category (set (name, tagger)) dict)


buildEntriesDict : List (MySub msg) -> Dict.Dict String (List a) -> Dict.Dict String (List a)
buildEntriesDict subs dict =
case subs of
[] ->
dict

MySub category name tagger :: rest ->
case category of
"listen" ->
buildEntriesDict rest (Dict.update name (Just << Maybe.withDefault []) dict)

KeepAlive name :: rest ->
buildSubDict rest (Dict.update name (Just << Maybe.withDefault []) dict)
_ ->
buildEntriesDict rest dict


add : a -> Maybe (List a) -> Maybe (List a)
add value maybeList =
case maybeList of
set : (comparable, b) -> Maybe (Dict.Dict comparable b) -> Maybe (Dict.Dict comparable b)
set value maybeDict =
case maybeDict of
Nothing ->
Just [value]
Just (Dict.fromList [value])

Just list ->
Just (value :: list)
Just (Dict.fromList [value])



Expand All @@ -246,9 +252,10 @@ onSelfMsg router selfMsg state =
Receive name str ->
let
sends =
Dict.get name state.subs
|> Maybe.withDefault []
|> List.map (\tagger -> Platform.sendToApp router (tagger str))
Dict.get "listen" state.subs
|> Maybe.withDefault Dict.empty
|> Dict.toList
|> List.map (\(_, tagger) -> Platform.sendToApp router (tagger str))
in
Task.sequence sends &> Task.succeed state

Expand All @@ -257,44 +264,60 @@ onSelfMsg router selfMsg state =
Nothing ->
Task.succeed state

Just _ ->
attemptOpen router 0 name
|> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening 0 pid) state))
Just (Connected _) ->
let
sends =
Dict.get "onClose" state.subs
|> Maybe.withDefault Dict.empty
|> Dict.toList
|> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name))
in
Task.sequence sends
&> attemptOpen router 0 name
|> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening 0 pid) state))

GoodOpen name socket ->
case Dict.get name state.queues of
Nothing ->
Task.succeed (updateSocket name (Connected socket) state)
Just (Opening n _) ->
retryConnection router n name state

Just messages ->
List.foldl
(\msg task -> WS.send socket msg &> task)
(Task.succeed (removeQueue name (updateSocket name (Connected socket) state)))
messages
GoodOpen name socket ->
let
sends =
Dict.get "onOpen" state.subs
|> Maybe.withDefault Dict.empty
|> Dict.toList
|> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name))
in
Task.sequence sends
&> Task.succeed (updateSocket name (Connected socket) state)

BadOpen name ->
case Dict.get name state.sockets of
Nothing ->
Task.succeed state

Just (Opening n _) ->
attemptOpen router (n + 1) name
|> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening (n + 1) pid) state))
retryConnection router n name state

Just (Connected _) ->
Task.succeed state


retryConnection
: Platform.Router msg Msg
-> Int
-> String
-> State msg
-> Task x (State msg)
retryConnection router n name state =
attemptOpen router (n + 1) name
|> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening (n + 1) pid) state))


updateSocket : String -> Connection -> State msg -> State msg
updateSocket name connection state =
{ state | sockets = Dict.insert name connection state.sockets }


removeQueue : String -> State msg -> State msg
removeQueue name state =
{ state | queues = Dict.remove name state.queues }



-- OPENING WEBSOCKETS WITH EXPONENTIAL BACKOFF

Expand Down