From 6cca059b4ab2b11a5652e242924853d58ec4f334 Mon Sep 17 00:00:00 2001 From: Namek Date: Fri, 14 Jul 2017 10:44:34 +0200 Subject: [PATCH] dev(WebSocket): support onOpen, onClose based on https://github.com/elm-lang/websocket/pull/19 - merged with binary data support --- client/src/Main.elm | 22 +++- client/src/WebSocket.elm | 254 +++++++++++++++++++++++---------------- 2 files changed, 173 insertions(+), 103 deletions(-) diff --git a/client/src/Main.elm b/client/src/Main.elm index 54a50a0..a4d97c1 100644 --- a/client/src/Main.elm +++ b/client/src/Main.elm @@ -99,6 +99,8 @@ updateSystemStats systems index entitiesCount maxEntitiesCount = type Msg = Input String | Send + | OnWebsocketOpen String + | OnWebsocketClose String | NewNetworkMessage MessageData | Msg_Unknown | Msg_OnAddedSystem Int String (Maybe BitVector) (Maybe BitVector) (Maybe BitVector) @@ -125,6 +127,20 @@ update msg model = , WebSocket.send websocketUrl (ArrayBuffer (stringToBufferArray input)) ) + OnWebsocketOpen url -> + let + _ = + Debug.log "websocket open" url + in + model ! [] + + OnWebsocketClose url -> + let + _ = + Debug.log "websocket close" url + in + model ! [] + NewNetworkMessage (String str) -> { model | messages = str :: messages } ! [] @@ -289,7 +305,11 @@ deserializePacket objModelNodes valueTrees componentTypes bytes = subscriptions : Model -> Sub Msg subscriptions model = - WebSocket.listen websocketUrl NewNetworkMessage + Sub.batch + [ WebSocket.listen websocketUrl NewNetworkMessage + , WebSocket.onOpen OnWebsocketOpen + , WebSocket.onClose OnWebsocketClose + ] diff --git a/client/src/WebSocket.elm b/client/src/WebSocket.elm index b7be213..6a92c18 100644 --- a/client/src/WebSocket.elm +++ b/client/src/WebSocket.elm @@ -2,8 +2,9 @@ effect module WebSocket where { command = MyCmd, subscription = MySub } exposing ( MessageData - , keepAlive , listen + , onClose + , onOpen , send ) @@ -25,7 +26,7 @@ many unique connections to the same endpoint, you need a different library. # Web Sockets -@docs MessageData, listen, keepAlive, send +@docs MessageData, listen, onOpen, onClose, send -} @@ -54,8 +55,8 @@ type MyCmd msg send "ws://echo.websocket.org" "Hello!" **Note:** It is important that you are also subscribed to this address with -`listen` or `keepAlive`. If you are not, the web socket will be created to -send one message and then closed. Not good! +`listen`. If you are not, the web socket will be created to send one message +and then closed. Not good! -} send : String -> MessageData -> Cmd msg @@ -73,8 +74,8 @@ cmdMap _ (Send url msg) = type MySub msg - = Listen String (MessageData -> msg) - | KeepAlive String + = MyMessageSub String String (MessageData -> msg) + | MyUrlSub String String (String -> msg) {-| Subscribe to any incoming messages on a websocket. You might say something @@ -86,40 +87,50 @@ like this: listen "ws://echo.websocket.org" Echo **Note:** If the connection goes down, the effect manager tries to reconnect -with an exponential backoff strategy. Any messages you try to `send` while the -connection is down are queued and will be sent as soon as possible. +with an exponential backoff strategy. -} listen : String -> (MessageData -> msg) -> Sub msg listen url tagger = - subscription (Listen url tagger) + subscription (MyMessageSub "listen" url tagger) -{-| Keep a connection alive, but do not report any messages. This is useful -for keeping a connection open for when you only need to `send` messages. So -you might say something like this: +{-| Subscribe to websocket open events. You might say something +like this: + + type Msg = WsOpened String | ... subscriptions model = - keepAlive "ws://echo.websocket.org" + onOpen WsOpened -**Note:** If the connection goes down, the effect manager tries to reconnect -with an exponential backoff strategy. Any messages you try to `send` while the -connection is down are queued and will be sent as soon as possible. +-} +onOpen : (String -> msg) -> Sub msg +onOpen tagger = + subscription (MyUrlSub "onOpen" "" tagger) + + +{-| Subscribe to websocket close events. You might say something +like this: + + type Msg = WsClosed String | ... + + subscriptions model = + onClose WsClosed -} -keepAlive : String -> Sub msg -keepAlive url = - subscription (KeepAlive url) +onClose : (String -> msg) -> Sub msg +onClose tagger = + subscription (MyUrlSub "onClose" "" tagger) subMap : (a -> b) -> MySub a -> MySub b subMap func sub = case sub of - Listen url tagger -> - Listen url (tagger >> func) + MyUrlSub category url tagger -> + MyUrlSub category url (tagger >> func) - KeepAlive url -> - KeepAlive url + MyMessageSub category message tagger -> + MyMessageSub category message (tagger >> func) @@ -128,8 +139,8 @@ subMap func sub = type alias State msg = { sockets : SocketsDict - , queues : QueuesDict - , subs : SubsDict msg + , urlSubs : UrlSubsDict msg + , messageSubs : MessageSubsDict msg } @@ -137,12 +148,12 @@ type alias SocketsDict = Dict.Dict String Connection -type alias QueuesDict = - Dict.Dict String (List MessageData) +type alias MessageSubsDict msg = + Dict.Dict String (Dict.Dict String (MessageData -> msg)) -type alias SubsDict msg = - Dict.Dict String (List (MessageData -> msg)) +type alias UrlSubsDict msg = + Dict.Dict String (Dict.Dict String (String -> msg)) type Connection @@ -171,81 +182,106 @@ onEffects : -> Task Never (State msg) onEffects router cmds subs state = let - sendMessagesGetNewQueues = - sendMessagesHelp cmds state.sockets state.queues - - newSubs = - buildSubDict subs Dict.empty - - cleanup newQueues = - let - newEntries = - Dict.union newQueues (Dict.map (\k v -> []) newSubs) - - leftStep name _ getNewSockets = - getNewSockets - |> Task.andThen - (\newSockets -> - attemptOpen router 0 name - |> Task.andThen (\pid -> Task.succeed (Dict.insert name (Opening 0 pid) newSockets)) - ) + newUrlSubs = + buildUrlSubDict subs Dict.empty - bothStep name _ connection getNewSockets = - Task.map (Dict.insert name connection) getNewSockets + newMessageSubs = + buildMessageSubDict subs Dict.empty - rightStep name connection getNewSockets = - closeConnection connection &> getNewSockets + newEntries = + buildEntriesDict subs Dict.empty - collectNewSockets = - Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty) - in - collectNewSockets + leftStep category _ getNewSockets = + getNewSockets |> Task.andThen (\newSockets -> - Task.succeed (State newSockets newQueues newSubs) + attemptOpen router 0 category + |> Task.andThen (\pid -> Task.succeed (Dict.insert category (Opening 0 pid) newSockets)) ) + + bothStep category _ connection getNewSockets = + Task.map (Dict.insert category connection) getNewSockets + + rightStep category connection getNewSockets = + closeConnection connection &> getNewSockets + + collectNewSockets = + Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty) in - sendMessagesGetNewQueues - |> Task.andThen cleanup + cmdHelp router cmds state.sockets + &> collectNewSockets + |> Task.andThen (\newSockets -> Task.succeed (State newSockets newUrlSubs newMessageSubs)) -sendMessagesHelp : List (MyCmd msg) -> SocketsDict -> QueuesDict -> Task x QueuesDict -sendMessagesHelp cmds socketsDict queuesDict = +cmdHelp : Platform.Router msg Msg -> List (MyCmd msg) -> SocketsDict -> Task Never SocketsDict +cmdHelp router cmds socketsDict = case cmds of [] -> - Task.succeed queuesDict + Task.succeed socketsDict (Send name msg) :: rest -> case Dict.get name socketsDict of Just (Connected socket) -> WS.send socket msg - &> sendMessagesHelp rest socketsDict queuesDict + &> cmdHelp router rest socketsDict _ -> - sendMessagesHelp rest socketsDict (Dict.update name (add msg) queuesDict) + -- TODO: Since messages are no longer queued, this probably shouldn't just succeed + Task.succeed socketsDict + + +buildUrlSubDict : List (MySub msg) -> UrlSubsDict msg -> UrlSubsDict msg +buildUrlSubDict subs dict = + case subs of + [] -> + dict + + (MyUrlSub category name tagger) :: rest -> + buildUrlSubDict rest (Dict.update category (set ( name, tagger )) dict) + _ :: rest -> + buildUrlSubDict rest dict -buildSubDict : List (MySub msg) -> SubsDict msg -> SubsDict msg -buildSubDict subs dict = + +buildMessageSubDict : List (MySub msg) -> MessageSubsDict msg -> MessageSubsDict msg +buildMessageSubDict subs dict = + case subs of + [] -> + dict + + (MyMessageSub category name tagger) :: rest -> + buildMessageSubDict rest (Dict.update category (set ( name, tagger )) dict) + + _ :: rest -> + buildMessageSubDict rest dict + + +buildEntriesDict : List (MySub msg) -> Dict.Dict String (List a) -> Dict.Dict String (List a) +buildEntriesDict subs dict = case subs of [] -> dict - (Listen name tagger) :: rest -> - buildSubDict rest (Dict.update name (add tagger) dict) + (MyMessageSub category name tagger) :: rest -> + case category of + "listen" -> + buildEntriesDict rest (Dict.update name (Just << Maybe.withDefault []) dict) + + _ -> + buildEntriesDict rest dict - (KeepAlive name) :: rest -> - buildSubDict rest (Dict.update name (Just << Maybe.withDefault []) dict) + _ :: rest -> + buildEntriesDict rest dict -add : a -> Maybe (List a) -> Maybe (List a) -add value maybeList = - case maybeList of +set : ( comparable, b ) -> Maybe (Dict.Dict comparable b) -> Maybe (Dict.Dict comparable b) +set value maybeDict = + case maybeDict of Nothing -> - Just [ value ] + Just (Dict.fromList [ value ]) Just list -> - Just (value :: list) + Just (Dict.fromList [ value ]) @@ -265,9 +301,10 @@ onSelfMsg router selfMsg state = Receive name messageData -> let sends = - Dict.get name state.subs - |> Maybe.withDefault [] - |> List.map (\tagger -> Platform.sendToApp router (tagger messageData)) + Dict.get "listen" state.messageSubs + |> Maybe.withDefault Dict.empty + |> Dict.toList + |> List.map (\( _, tagger ) -> Platform.sendToApp router (tagger messageData)) in Task.sequence sends &> Task.succeed state @@ -276,23 +313,34 @@ onSelfMsg router selfMsg state = Nothing -> Task.succeed state - Just _ -> - attemptOpen router 0 name - |> Task.andThen - (\pid -> - Task.succeed (updateSocket name (Opening 0 pid) state) - ) + Just (Connected _) -> + let + sendsl = + Dict.get "onClose" state.urlSubs + |> Maybe.withDefault Dict.empty + |> Dict.toList + + sends = + sendsl + |> List.map (\( _, tagger ) -> Platform.sendToApp router (tagger name)) + in + Task.sequence sends + &> attemptOpen router 0 name + |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening 0 pid) state)) - GoodOpen name socket -> - case Dict.get name state.queues of - Nothing -> - Task.succeed (updateSocket name (Connected socket) state) + Just (Opening n _) -> + retryConnection router n name state - Just messages -> - List.foldl - (\msg task -> WS.send socket msg &> task) - (Task.succeed (removeQueue name (updateSocket name (Connected socket) state))) - messages + GoodOpen name socket -> + let + sends = + Dict.get "onOpen" state.urlSubs + |> Maybe.withDefault Dict.empty + |> Dict.toList + |> List.map (\( _, tagger ) -> Platform.sendToApp router (tagger name)) + in + Task.sequence sends + &> Task.succeed (updateSocket name (Connected socket) state) BadOpen name -> case Dict.get name state.sockets of @@ -300,26 +348,28 @@ onSelfMsg router selfMsg state = Task.succeed state Just (Opening n _) -> - attemptOpen router (n + 1) name - |> Task.andThen - (\pid -> - Task.succeed (updateSocket name (Opening (n + 1) pid) state) - ) + retryConnection router n name state Just (Connected _) -> Task.succeed state +retryConnection : + Platform.Router msg Msg + -> Int + -> String + -> State msg + -> Task x (State msg) +retryConnection router n name state = + attemptOpen router (n + 1) name + |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening (n + 1) pid) state)) + + updateSocket : String -> Connection -> State msg -> State msg updateSocket name connection state = { state | sockets = Dict.insert name connection state.sockets } -removeQueue : String -> State msg -> State msg -removeQueue name state = - { state | queues = Dict.remove name state.queues } - - -- OPENING WEBSOCKETS WITH EXPONENTIAL BACKOFF