From ffdc73589a5a25a342b81379e165c38a8af7d06e Mon Sep 17 00:00:00 2001 From: Joshua Elliott Date: Fri, 8 Aug 2014 22:27:41 -0600 Subject: [PATCH] Move wamp-2 to project root - issue #37 --- README.md | 32 +- wampv2/broker.go => broker.go | 0 client.go | 371 ------------ client/client.go | 95 ---- client_test.go | 150 ----- wampv2/dealer.go => dealer.go | 0 wampv2/dealer_test.go => dealer_test.go | 0 wampv2/doc.go => doc.go | 0 wampv2/endpoint.go => endpoint.go | 0 exampleclient_test.go | 15 - examples/chat/.gitignore | 4 - examples/chat/build.sh | 4 - examples/chat/chatserver/main.go | 6 + examples/chat/pubspec.lock | 4 - examples/chat/pubspec.yaml | 6 - examples/chat/server.go | 19 - examples/chat/web/chat.dart | 28 - examples/chat/web/index.html | 10 - examples/hello/server.go | 19 - examples/hello/web/hello.js | 18 - examples/hello/web/index.html | 9 - examples/pubsub/client.go | 26 - examples/pubsub/server.go | 16 - examples/rpc/client.go | 22 - examples/rpc/server.go | 21 - exampleserver_test.go | 16 - wampv2/message.go => message.go | 0 prefix.go | 53 -- wampv2/realm.go => realm.go | 0 wampv2/router.go => router.go | 0 wampv2/router_test.go => router_test.go | 0 wampv2/serialize.go => serialize.go | 0 wampv2/serialize_test.go => serialize_test.go | 0 server.go | 528 ------------------ server/server.go | 23 - server_test.go | 247 -------- wampv2/session.go => session.go | 0 test_test.go.back | 18 + turnpike.go | 28 - wampv2/util.go => util.go | 0 wamp.go | 427 -------------- wamp_test.go | 497 ----------------- wampv2/README.md | 6 - wampv2/websocket.go => websocket.go | 0 ...websocket_server.go => websocket_server.go | 0 wampv2/websocket_test.go => websocket_test.go | 0 46 files changed, 28 insertions(+), 2690 deletions(-) rename wampv2/broker.go => broker.go (100%) delete mode 100644 client.go delete mode 100644 client/client.go delete mode 100644 client_test.go rename wampv2/dealer.go => dealer.go (100%) rename wampv2/dealer_test.go => dealer_test.go (100%) rename wampv2/doc.go => doc.go (100%) rename wampv2/endpoint.go => endpoint.go (100%) delete mode 100644 exampleclient_test.go delete mode 100644 examples/chat/.gitignore delete mode 100755 examples/chat/build.sh create mode 100644 examples/chat/chatserver/main.go delete mode 100644 examples/chat/pubspec.lock delete mode 100644 examples/chat/pubspec.yaml delete mode 100644 examples/chat/server.go delete mode 100644 examples/chat/web/chat.dart delete mode 100644 examples/chat/web/index.html delete mode 100644 examples/hello/server.go delete mode 100644 examples/hello/web/hello.js delete mode 100644 examples/hello/web/index.html delete mode 100644 examples/pubsub/client.go delete mode 100644 examples/pubsub/server.go delete mode 100644 examples/rpc/client.go delete mode 100644 examples/rpc/server.go delete mode 100644 exampleserver_test.go rename wampv2/message.go => message.go (100%) delete mode 100644 prefix.go rename wampv2/realm.go => realm.go (100%) rename wampv2/router.go => router.go (100%) rename wampv2/router_test.go => router_test.go (100%) rename wampv2/serialize.go => serialize.go (100%) rename wampv2/serialize_test.go => serialize_test.go (100%) delete mode 100644 server.go delete mode 100644 server/server.go delete mode 100644 server_test.go rename wampv2/session.go => session.go (100%) create mode 100644 test_test.go.back delete mode 100644 turnpike.go rename wampv2/util.go => util.go (100%) delete mode 100644 wamp.go delete mode 100644 wamp_test.go delete mode 100644 wampv2/README.md rename wampv2/websocket.go => websocket.go (100%) rename wampv2/websocket_server.go => websocket_server.go (100%) rename wampv2/websocket_test.go => websocket_test.go (100%) diff --git a/README.md b/README.md index f8c0cfa..50e238a 100644 --- a/README.md +++ b/README.md @@ -1,32 +1,8 @@ -turnpike +Turnpike ======== -Go implementation of [WAMP](http://www.wamp.ws/) - The Websocket Application Messaging Protocol +Go implementation of [WAMP](http://www.wamp.ws/) - The Web Application Messaging Protocol -Turnpike provides a WAMP server and client. - -Examples -======== - -chat ----- - -Very simple chat server and client written in Dart. To run, first install [Dart](http://www.dartlang.org/tools/sdk/) (Dart editor should work as well), then: - - cd examples/dart - pub install - ./build.sh - go run server.go - -Open a browser (or more) to localhost:8080. Type in the textbox and hit enter. - -hello ------ - -Connect to a Turnpike server with autobahn.js: - - cd examples/hello - go run server.go - -Open a browser to localhost:8080. You should see a message when autobahn.js connects to Turnpike. +This version of Turnpike supports WAMP v2. For WAMP v1 support see the [v1 branch](https://github.com/jcelliott/turnpike/tree/v1). +This package is under heavy development and is *not* production ready. diff --git a/wampv2/broker.go b/broker.go similarity index 100% rename from wampv2/broker.go rename to broker.go diff --git a/client.go b/client.go deleted file mode 100644 index f98cdeb..0000000 --- a/client.go +++ /dev/null @@ -1,371 +0,0 @@ -// Copyright (c) 2013 Joshua Elliott -// Released under the MIT License -// http://opensource.org/licenses/MIT - -package turnpike - -import ( - "code.google.com/p/go.net/websocket" - "encoding/json" - "fmt" - "io" - "log" - "math/rand" -) - -const ( - wampProtocolId = "wamp" -) - -var clientBacklog = 10 - -// Client represents a WAMP client that handles RPC and pub/sub. -type Client struct { - // SessionId is a ID of the session in UUID4 format received at the start of the session. - SessionId string - // ProtocolVersion is the version of the WAMP protocol received at the start of the session. - ProtocolVersion int - // ServerIdent is the server ID (ie "turnpike, autobahn") received at the start of the session. - ServerIdent string - ws *websocket.Conn - messages chan string - prefixes prefixMap - eventHandlers map[string]EventHandler - calls map[string]chan CallResult - sessionOpenCallback func(string) -} - -// CallResult represents either a sucess or a failure after a RPC call. -type CallResult struct { - // Result contains the RPC call result returned by the server. - Result interface{} - // Error is nil on call success otherwise it contains the RPC error. - Error error -} - -// EventHandler is an interface for handlers to published events. The topicURI -// is the URI of the event and event is the event centents. -type EventHandler func(topicURI string, event interface{}) - -// NewClient creates a new WAMP client. -func NewClient() *Client { - return &Client{ - messages: make(chan string, clientBacklog), - prefixes: make(prefixMap), - eventHandlers: make(map[string]EventHandler), - calls: make(map[string]chan CallResult), - } -} - -// Prefix sets a CURIE prefix at the server for later use when interacting with -// the server. prefix is the first part of a CURIE (ie "calc") and URI is a full -// identifier (ie "http://example.com/simple/calc#") that is mapped to the prefix. -// -// Ref: http://wamp.ws/spec#prefix_message -func (c *Client) Prefix(prefix, URI string) error { - if debug { - log.Print("turnpike: sending prefix") - } - err := c.prefixes.registerPrefix(prefix, URI) - if err != nil { - return fmt.Errorf("turnpike: %s", err) - } - msg, err := createPrefix(prefix, URI) - if err != nil { - return fmt.Errorf("turnpike: %s", err) - } - c.messages <- string(msg) - return nil -} - -// Call makes a RPC call on the server identified by procURI (in either full URI -// or CURIE format) with zero or more args. Returns a channel that will receive -// the call result (or error) on completion. -// -// Ref: http://wamp.ws/spec#call_message -func (c *Client) Call(procURI string, args ...interface{}) chan CallResult { - if debug { - log.Print("turnpike: sending call") - } - // Channel size must be 1 to avoid blocking if no one is receiving the channel later. - resultCh := make(chan CallResult, 1) - callId := newId(16) - msg, err := createCall(callId, procURI, args...) - if err != nil { - r := CallResult{ - Result: nil, - Error: fmt.Errorf("turnpike: %s", err), - } - resultCh <- r - return resultCh - } - c.calls[callId] = resultCh - c.messages <- string(msg) - return resultCh -} - -// Subscribe adds a subscription at the server for events with topicURI lasting -// for the session or until Unsubscribe is called. -// -// Ref: http://wamp.ws/spec#subscribe_message -func (c *Client) Subscribe(topicURI string, f EventHandler) error { - if debug { - log.Print("turnpike: sending subscribe") - } - msg, err := createSubscribe(topicURI) - if err != nil { - return fmt.Errorf("turnpike: %s", err) - } - c.messages <- string(msg) - if f != nil { - c.eventHandlers[topicURI] = f - } - return nil -} - -// Unsubscribe removes a previous subscription with topicURI at the server. -// -// Ref: http://wamp.ws/spec#unsubscribe_message -func (c *Client) Unsubscribe(topicURI string) error { - if debug { - log.Print("turnpike: sending unsubscribe") - } - msg, err := createUnsubscribe(topicURI) - if err != nil { - return fmt.Errorf("turnpike: %s", err) - } - c.messages <- string(msg) - delete(c.eventHandlers, topicURI) - return nil -} - -// Publish publishes an event to the topicURI that gets sent to all subscribers -// of that topicURI by the server. opts can can be either empty, one boolean -// that can be used to exclude outself from receiving the event or two lists; -// the first a list of clients to exclude and the second a list of clients that -// are eligible to receive the event. Either list can be empty. -// -// Ref: http://wamp.ws/spec#publish_message -func (c *Client) Publish(topicURI string, event interface{}, opts ...interface{}) error { - if debug { - log.Print("turnpike: sending publish") - } - msg, err := createPublish(topicURI, event, opts...) - if err != nil { - return fmt.Errorf("turnpike: %s", err) - } - c.messages <- string(msg) - return nil -} - -// PublishExcludeMe is a short hand for Publish(tobicURI, event, true) that will -// not send the event to ourself. -func (c *Client) PublishExcludeMe(topicURI string, event interface{}) error { - return c.Publish(topicURI, event, true) -} - -func (c *Client) handleCallResult(msg callResultMsg) { - if debug { - log.Print("turnpike: handling call result message") - } - resultCh, ok := c.calls[msg.CallID] - if !ok { - if debug { - log.Print("turnpike: missing call result handler") - } - return - } - delete(c.calls, msg.CallID) - r := CallResult{ - Result: msg.Result, - Error: nil, - } - resultCh <- r -} - -func (c *Client) handleCallError(msg callErrorMsg) { - if debug { - log.Print("turnpike: handling call error message") - } - resultCh, ok := c.calls[msg.CallID] - if !ok { - if debug { - log.Print("turnpike: missing call result handler") - } - return - } - delete(c.calls, msg.CallID) - r := CallResult{ - Error: RPCError{ - URI: msg.ErrorURI, - Description: msg.ErrorDesc, - Details: msg.ErrorDetails, - }, - } - resultCh <- r -} - -func (c *Client) handleEvent(msg eventMsg) { - if debug { - log.Print("turnpike: handling event message") - } - if f, ok := c.eventHandlers[msg.TopicURI]; ok && f != nil { - f(msg.TopicURI, msg.Event) - } else { - if debug { - log.Printf("turnpike: missing event handler for URI: %s", msg.TopicURI) - } - } -} - -func (c *Client) receiveWelcome() error { - if debug { - log.Print("turnpike: receive welcome") - } - var rec string - err := websocket.Message.Receive(c.ws, &rec) - if err != nil { - return fmt.Errorf("Error receiving welcome message: %s", err) - } - if typ := parseMessageType(rec); typ != msgWelcome { - return fmt.Errorf("First message received was not welcome") - } - var msg welcomeMsg - err = json.Unmarshal([]byte(rec), &msg) - if err != nil { - return fmt.Errorf("Error unmarshalling welcome message: %s", err) - } - c.SessionId = msg.SessionId - c.ProtocolVersion = msg.ProtocolVersion - c.ServerIdent = msg.ServerIdent - if debug { - log.Print("turnpike: session id: %s", c.SessionId) - log.Print("turnpike: protocol version: %d", c.ProtocolVersion) - log.Print("turnpike: server ident: %s", c.ServerIdent) - } - - if c.sessionOpenCallback != nil { - c.sessionOpenCallback(c.SessionId) - } - - return nil -} - -func (c *Client) receive() { - for { - var rec string - err := websocket.Message.Receive(c.ws, &rec) - if err != nil { - if err != io.EOF { - if debug { - log.Printf("turnpike: error receiving message, aborting connection: %s", err) - } - } - break - } - if debug { - log.Printf("turnpike: message received: %s", rec) - } - - data := []byte(rec) - - switch typ := parseMessageType(rec); typ { - case msgCallResult: - var msg callResultMsg - err := json.Unmarshal(data, &msg) - if err != nil { - if debug { - log.Printf("turnpike: error unmarshalling call result message: %s", err) - } - continue - } - c.handleCallResult(msg) - case msgCallError: - var msg callErrorMsg - err := json.Unmarshal(data, &msg) - if err != nil { - if debug { - log.Printf("turnpike: error unmarshalling call error message: %s", err) - } - continue - } - c.handleCallError(msg) - case msgEvent: - var msg eventMsg - err := json.Unmarshal(data, &msg) - if err != nil { - if debug { - log.Printf("turnpike: error unmarshalling event message: %s", err) - } - continue - } - c.handleEvent(msg) - case msgPrefix, msgCall, msgSubscribe, msgUnsubscribe, msgPublish: - if debug { - log.Printf("turnpike: client -> server message received, ignored: %s", messageTypeString(typ)) - } - case msgWelcome: - if debug { - log.Print("turnpike: received extraneous welcome message, ignored") - } - default: - if debug { - log.Printf("turnpike: invalid message format, message dropped: %s", data) - } - } - } -} - -func (c *Client) send() { - for msg := range c.messages { - if debug { - log.Printf("turnpike: sending message: %s", msg) - } - if err := websocket.Message.Send(c.ws, msg); err != nil { - if debug { - log.Printf("turnpike: error sending message: %s", err) - } - } - } -} - -// Connect will connect to server with an optional origin. -// More details here: http://godoc.org/code.google.com/p/go.net/websocket#Dial -func (c *Client) Connect(server, origin string) error { - if debug { - log.Print("turnpike: connect") - } - var err error - if c.ws, err = websocket.Dial(server, wampProtocolId, origin); err != nil { - return fmt.Errorf("Error connecting to websocket server: %s", err) - } - - // Receive welcome message - if err = c.receiveWelcome(); err != nil { - return err - } - if debug { - log.Printf("turnpike: connected to server: %s", server) - } - - go c.receive() - go c.send() - - return nil -} - -// SetSessionOpenCallback adds a callback function that is run when a new session begins. -// The callback function must accept a string argument that is the session ID. -func (c *Client) SetSessionOpenCallback(f func(string)) { - c.sessionOpenCallback = f -} - -// newId generates a random string of fixed size. -func newId(size int) string { - const alpha = "ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnpqrstuvwxyz0123456789-_" - buf := make([]byte, size) - for i := 0; i < size; i++ { - buf[i] = alpha[rand.Intn(len(alpha))] - } - return string(buf) -} diff --git a/client/client.go b/client/client.go deleted file mode 100644 index 923a119..0000000 --- a/client/client.go +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright (c) 2013 Joshua Elliott -// Released under the MIT License -// http://opensource.org/licenses/MIT - -package main - -import ( - "bufio" - "fmt" - "os" - "strconv" - "strings" - "turnpike" -) - -func main() { - c := turnpike.NewClient() - fmt.Print("Server address (default: localhost:8080)\n> ") - read := bufio.NewReader(os.Stdin) - // if _, err := fmt.Scanln(&server); err != nil { - server, err := read.ReadString('\n') - if err != nil { - fmt.Println("Error reading from stdin:", err) - return - } - server = strings.TrimSpace(server) - if server == "" { - server = "localhost:8080" - } - if err := c.Connect("ws://"+server, "http://localhost:8070"); err != nil { - fmt.Println("Error connecting:", err) - return - } - - fmt.Print( - "----------------------------------------------------------------------\n", - "Connected to server at: ", server, "\n", - "With session id: ", c.SessionId, "\n", - "Enter WAMP message, parameters separated by spaces\n", - "PREFIX=1, CALL=2, SUBSCRIBE=5, UNSUBSCRIBE=6, PUBLISH=7\n", - "----------------------------------------------------------------------\n") - - for { - fmt.Print(c.ServerIdent, "> ") - // if _, err := fmt.Scanln(&msgType, &args); err != nil { - line, err := read.ReadString('\n') - if err != nil { - fmt.Println("Error reading from stdin:", err) - return - } - line = strings.TrimSpace(line) - if line == "" { - continue - } - // fmt.Println(line) - - // get the type - params := strings.SplitN(line, " ", 2) - line = params[1] - msgType, err := strconv.Atoi(params[0]) - if err != nil { - fmt.Println("Error parsing message type:", params[0]) - continue - } - - err = nil - switch msgType { - case turnpike.PREFIX: - var prefix, URI string - fmt.Sscan(line, &prefix, &URI) - err = c.Prefix(prefix, URI) - case turnpike.CALL: - args := strings.Split(line, " ") - err = c.Call(args[0], args[1:]) - case turnpike.SUBSCRIBE: - err = c.Subscribe(line) - case turnpike.UNSUBSCRIBE: - err = c.Unsubscribe(line) - case turnpike.PUBLISH: - args := strings.Split(line, " ") - if len(args) > 2 { - err = c.Publish(args[0], args[1], args[2:]) - } else { - err = c.Publish(args[0], args[1]) - } - default: - fmt.Println("Invalid message type:", msgType) - continue - } - - if err != nil { - fmt.Println("Error sending message:", err) - } - } -} diff --git a/client_test.go b/client_test.go deleted file mode 100644 index 8d697f0..0000000 --- a/client_test.go +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright (c) 2013 Joshua Elliott -// Released under the MIT License -// http://opensource.org/licenses/MIT - -package turnpike - -import ( - "errors" - "net/http" - "runtime" - "testing" - "time" -) - -func TestClient_CallResult(t *testing.T) { - s := NewServer() - s.RegisterRPC("rpc:test_result", - func(client, uri string, args ...interface{}) (interface{}, error) { - return "ok", nil - }) - http.Handle("/ws1", s.Handler) - // TODO: needs better way of running multiple listen and serve. - // Currently there is no way of closing the listener. A cusom server and - // handler will work but requires more work. TBD. - go func() { - err := http.ListenAndServe(":8001", nil) - if err != nil { - t.Fatal("ListenAndServe: " + err.Error()) - } - }() - - // Let the server goroutine start. - runtime.Gosched() - - c := NewClient() - err := c.Connect("ws://127.0.0.1:8001/ws1", "http://localhost/") - if err != nil { - t.Fatal("error connecting: " + err.Error()) - } - - resultCh := c.Call("rpc:test_result") - r := <-resultCh - if r.Error != nil { - t.Error(r.Error) - } - if r.Result != "ok" { - t.Fail() - } -} - -func TestClient_CallRestultGenericError(t *testing.T) { - s := NewServer() - s.RegisterRPC("rpc:test_generic_error", - func(client, uri string, args ...interface{}) (interface{}, error) { - return nil, errors.New("error") - }) - http.Handle("/ws2", s.Handler) - go func() { - err := http.ListenAndServe(":8002", nil) - if err != nil { - t.Fatal("ListenAndServe: " + err.Error()) - } - }() - - runtime.Gosched() - - c := NewClient() - err := c.Connect("ws://127.0.0.1:8002/ws2", "http://localhost/") - if err != nil { - t.Fatal("error connecting: " + err.Error()) - } - - resultCh := c.Call("rpc:test_generic_error") - r := <-resultCh - if r.Result != nil { - t.Fail() - } - if r.Error.Error() != "turnpike: RPC error with URI rpc:test_generic_error#generic-error: error" { - t.Fail() - } -} - -func TestClient_CallRestultCustomError(t *testing.T) { - s := NewServer() - s.RegisterRPC("rpc:test_custom_error", - func(client, uri string, args ...interface{}) (interface{}, error) { - return nil, RPCError{uri, "custom error", nil} - }) - http.Handle("/ws3", s.Handler) - go func() { - err := http.ListenAndServe(":8003", nil) - if err != nil { - t.Fatal("ListenAndServe: " + err.Error()) - } - }() - - runtime.Gosched() - - c := NewClient() - err := c.Connect("ws://127.0.0.1:8003/ws3", "http://localhost/") - if err != nil { - t.Fatal("error connecting: " + err.Error()) - } - - resultCh := c.Call("rpc:test_custom_error") - r := <-resultCh - if r.Result != nil { - t.Fail() - } - if r.Error.Error() != "turnpike: RPC error with URI rpc:test_custom_error: custom error" { - t.Fail() - } -} - -func TestClient_Event(t *testing.T) { - s := NewServer() - http.Handle("/ws4", s.Handler) - // TODO: needs better way of running multiple listen and serve. - // Currently there is no way of closing the listener. A cusom server and - // handler will work but requires more work. TBD. - go func() { - err := http.ListenAndServe(":8004", nil) - if err != nil { - t.Fatal("ListenAndServe: " + err.Error()) - } - }() - - // Let the server goroutine start. - runtime.Gosched() - - c := NewClient() - err := c.Connect("ws://127.0.0.1:8004/ws4", "http://localhost/") - if err != nil { - t.Fatal("error connecting: " + err.Error()) - } - - eventCh := make(chan bool) - c.Subscribe("event:test", func(uri string, event interface{}) { - eventCh <- true - }) - - c.Publish("event:test", "test") - - select { - case <-eventCh: - return - case <-time.After(time.Second): - t.Fail() - } -} diff --git a/wampv2/dealer.go b/dealer.go similarity index 100% rename from wampv2/dealer.go rename to dealer.go diff --git a/wampv2/dealer_test.go b/dealer_test.go similarity index 100% rename from wampv2/dealer_test.go rename to dealer_test.go diff --git a/wampv2/doc.go b/doc.go similarity index 100% rename from wampv2/doc.go rename to doc.go diff --git a/wampv2/endpoint.go b/endpoint.go similarity index 100% rename from wampv2/endpoint.go rename to endpoint.go diff --git a/exampleclient_test.go b/exampleclient_test.go deleted file mode 100644 index e667bdf..0000000 --- a/exampleclient_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package turnpike_test - -import ( - "github.com/jcelliott/turnpike" -) - -func ExampleClient_NewClient() { - c := turnpike.NewClient() - err := c.Connect("ws://127.0.0.1:8080/ws", "http://localhost/") - if err != nil { - panic("Error connecting:" + err.Error()) - } - - c.Call("rpc:test") -} diff --git a/examples/chat/.gitignore b/examples/chat/.gitignore deleted file mode 100644 index f6db14a..0000000 --- a/examples/chat/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -packages -*.dart.js -*.map -*.deps diff --git a/examples/chat/build.sh b/examples/chat/build.sh deleted file mode 100755 index 76553ab..0000000 --- a/examples/chat/build.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env bash - -go get -dart2js -oweb/chat.dart.js web/chat.dart diff --git a/examples/chat/chatserver/main.go b/examples/chat/chatserver/main.go new file mode 100644 index 0000000..0635afb --- /dev/null +++ b/examples/chat/chatserver/main.go @@ -0,0 +1,6 @@ +package main + +import ( + "fmt" + "github.com/jcelliott/turnpike" +) diff --git a/examples/chat/pubspec.lock b/examples/chat/pubspec.lock deleted file mode 100644 index 74f4d39..0000000 --- a/examples/chat/pubspec.lock +++ /dev/null @@ -1,4 +0,0 @@ -# Generated by pub -# See http://pub.dartlang.org/doc/glossary.html#lockfile - -{"packages":{"wamp":{"version":"0.1.0","source":"hosted","description":"wamp"},"browser":{"version":"0.4.0","source":"hosted","description":"browser"}}} diff --git a/examples/chat/pubspec.yaml b/examples/chat/pubspec.yaml deleted file mode 100644 index 678e9fe..0000000 --- a/examples/chat/pubspec.yaml +++ /dev/null @@ -1,6 +0,0 @@ -name: turnpike-chat -version: 0.1.0 -description: Example WAMP-based chat server using Turnpike and dart-wamp -dependencies: - browser: '>=0.4.0 <0.4.1' - wamp: '0.1.0' diff --git a/examples/chat/server.go b/examples/chat/server.go deleted file mode 100644 index 04a4656..0000000 --- a/examples/chat/server.go +++ /dev/null @@ -1,19 +0,0 @@ -package main - -import ( - "fmt" - "github.com/jcelliott/turnpike" - "log" - "net/http" -) - -func main() { - s := turnpike.NewServer() - http.Handle("/ws", s.Handler) - http.Handle("/", http.FileServer(http.Dir("web"))) - - fmt.Println("Listening on port 8080") - if err := http.ListenAndServe(":8080", nil); err != nil { - log.Fatal("ListenAndServe:", err) - } -} diff --git a/examples/chat/web/chat.dart b/examples/chat/web/chat.dart deleted file mode 100644 index f9e2f66..0000000 --- a/examples/chat/web/chat.dart +++ /dev/null @@ -1,28 +0,0 @@ -import 'dart:html'; -import 'package:wamp/wamp_client.dart'; - -class ChatClient extends WampClient { - ChatClient(socket) : super(socket); - - onWelcome() { - subscribe('chat:room'); - } - - onEvent(topicUri, event) { - query('#history').appendHtml('${event}
'); - } -} - -void main() { - var socket = new WebSocket('ws://127.0.0.1:8080/ws'), - client = new ChatClient(socket); - - var prompt = query('#prompt') as InputElement; - - prompt.onKeyPress.listen((e) { - if (e.keyCode == 13) { - client.publish('chat:room', prompt.value, true); - prompt.value = ''; - } - }); -} diff --git a/examples/chat/web/index.html b/examples/chat/web/index.html deleted file mode 100644 index acd5fe7..0000000 --- a/examples/chat/web/index.html +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - - -
- - diff --git a/examples/hello/server.go b/examples/hello/server.go deleted file mode 100644 index 04a4656..0000000 --- a/examples/hello/server.go +++ /dev/null @@ -1,19 +0,0 @@ -package main - -import ( - "fmt" - "github.com/jcelliott/turnpike" - "log" - "net/http" -) - -func main() { - s := turnpike.NewServer() - http.Handle("/ws", s.Handler) - http.Handle("/", http.FileServer(http.Dir("web"))) - - fmt.Println("Listening on port 8080") - if err := http.ListenAndServe(":8080", nil); err != nil { - log.Fatal("ListenAndServe:", err) - } -} diff --git a/examples/hello/web/hello.js b/examples/hello/web/hello.js deleted file mode 100644 index ba86ec6..0000000 --- a/examples/hello/web/hello.js +++ /dev/null @@ -1,18 +0,0 @@ -(function () { - "use strict"; - - function connect(session) { - console.log('WAMP client connected'); - document.querySelector('#content').innerHTML = "WAMP client connected to Turnpike server"; - } - - function disconnect(code, reason) { - console.log('WAMP client disconnected: ' + code + ": " + reason); - document.querySelector('#content').innerHTML = "WAMP client disconnected: " + reason + " (" + code + ")"; - } - - function init() { - ab.connect("ws://localhost:8080/ws", connect, disconnect); - } - window.addEventListener('load', init); -})(); diff --git a/examples/hello/web/index.html b/examples/hello/web/index.html deleted file mode 100644 index 39f6271..0000000 --- a/examples/hello/web/index.html +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - -
- - diff --git a/examples/pubsub/client.go b/examples/pubsub/client.go deleted file mode 100644 index dfca0eb..0000000 --- a/examples/pubsub/client.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -import ( - "fmt" - "github.com/jcelliott/turnpike" - "time" -) - -func testHandler(uri string, event interface{}) { - fmt.Printf("Received event: %s\n", event) -} - -func main() { - c := turnpike.NewClient() - err := c.Connect("ws://127.0.0.1:8080/ws", "http://localhost/") - if err != nil { - panic("Error connecting:" + err.Error()) - } - - c.Subscribe("event:test", testHandler) - - for { - c.Publish("event:test", "test") - <-time.After(time.Second) - } -} diff --git a/examples/pubsub/server.go b/examples/pubsub/server.go deleted file mode 100644 index d313a2e..0000000 --- a/examples/pubsub/server.go +++ /dev/null @@ -1,16 +0,0 @@ -package main - -import ( - "github.com/jcelliott/turnpike" - "net/http" -) - -func main() { - s := turnpike.NewServer() - - http.Handle("/ws", s.Handler) - err := http.ListenAndServe(":8080", nil) - if err != nil { - panic("ListenAndServe: " + err.Error()) - } -} diff --git a/examples/rpc/client.go b/examples/rpc/client.go deleted file mode 100644 index c0de1a3..0000000 --- a/examples/rpc/client.go +++ /dev/null @@ -1,22 +0,0 @@ -package main - -import ( - "fmt" - "github.com/jcelliott/turnpike" -) - -func main() { - c := turnpike.NewClient() - err := c.Connect("ws://127.0.0.1:8080/ws", "http://localhost/") - if err != nil { - panic("Error connecting:" + err.Error()) - } - - resultCh, err := c.Call("rpc:test") - if err != nil { - panic("Error calling: " + err.Error()) - } - - result := <-resultCh - fmt.Printf("Call result is: %s\n", result.Result) -} diff --git a/examples/rpc/server.go b/examples/rpc/server.go deleted file mode 100644 index a49dfbb..0000000 --- a/examples/rpc/server.go +++ /dev/null @@ -1,21 +0,0 @@ -package main - -import ( - "github.com/jcelliott/turnpike" - "net/http" -) - -func handleTest(client, uri string, args ...interface{}) (interface{}, error) { - return "hello world", nil -} - -func main() { - s := turnpike.NewServer() - s.RegisterRPC("rpc:test", handleTest) - - http.Handle("/ws", s.Handler) - err := http.ListenAndServe(":8080", nil) - if err != nil { - panic("ListenAndServe: " + err.Error()) - } -} diff --git a/exampleserver_test.go b/exampleserver_test.go deleted file mode 100644 index e378cd4..0000000 --- a/exampleserver_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package turnpike_test - -import ( - "github.com/jcelliott/turnpike" - "net/http" -) - -func ExampleServer_NewServer() { - s := turnpike.NewServer() - - http.Handle("/ws", s.Handler) - err := http.ListenAndServe(":8080", nil) - if err != nil { - panic("ListenAndServe: " + err.Error()) - } -} diff --git a/wampv2/message.go b/message.go similarity index 100% rename from wampv2/message.go rename to message.go diff --git a/prefix.go b/prefix.go deleted file mode 100644 index bade579..0000000 --- a/prefix.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright (c) 2013 Joshua Elliott -// Released under the MIT License -// http://opensource.org/licenses/MIT - -package turnpike - -import ( - "fmt" - "net/url" - "strings" -) - -type prefixMap map[string]string - -func (pm prefixMap) registerPrefix(prefix, URI string) error { - if _, err := url.ParseRequestURI(URI); err != nil { - return fmt.Errorf("Invalid URI: %s", URI) - } - pm[prefix] = URI - return nil -} - -// returns the full URI that the curie represents -func (pm prefixMap) resolveCurie(curie string) (string, error) { - parts := strings.SplitN(curie, ":", 2) - // if there is no reference, return the URI for the prefix - if len(parts) < 2 { - if URI, ok := pm[parts[0]]; ok { - return URI, nil - } - return "", fmt.Errorf("Unable to resolve curie: %s", curie) - } - // with a reference, append the ref to the URI - if URI, ok := pm[parts[0]]; ok { - return URI + parts[1], nil - } - return "", fmt.Errorf("Unable to resolve curie: %s", curie) -} - -// convenience function that will resolve a curie and pass through a URI -func checkCurie(pm prefixMap, curie string) string { - if pm == nil { - // no prefixes defined for this client, curie is a uri - return curie - } - uri, err := pm.resolveCurie(curie) - if err != nil { - // not registered for this client or it's a uri - return curie - } - // curie registered for this client, return real uri - return uri -} diff --git a/wampv2/realm.go b/realm.go similarity index 100% rename from wampv2/realm.go rename to realm.go diff --git a/wampv2/router.go b/router.go similarity index 100% rename from wampv2/router.go rename to router.go diff --git a/wampv2/router_test.go b/router_test.go similarity index 100% rename from wampv2/router_test.go rename to router_test.go diff --git a/wampv2/serialize.go b/serialize.go similarity index 100% rename from wampv2/serialize.go rename to serialize.go diff --git a/wampv2/serialize_test.go b/serialize_test.go similarity index 100% rename from wampv2/serialize_test.go rename to serialize_test.go diff --git a/server.go b/server.go deleted file mode 100644 index 376427a..0000000 --- a/server.go +++ /dev/null @@ -1,528 +0,0 @@ -// Copyright (c) 2013 Joshua Elliott -// Released under the MIT License -// http://opensource.org/licenses/MIT - -package turnpike - -import ( - "code.google.com/p/go.net/websocket" - "encoding/json" - "fmt" - "github.com/nu7hatch/gouuid" - "io" - "log" - "net" - "net/http" - "sync" - "time" -) - -var ( - // The amount of messages to buffer before sending to client. - serverBacklog = 20 -) - -const ( - clientConnTimeout = 6 - clientMaxFailures = 3 -) - -// Server represents a WAMP server that handles RPC and pub/sub. -type Server struct { - // Client ID -> send channel - clients map[string]chan string - // Client ID -> prefix mapping - prefixes map[string]prefixMap - // Proc URI -> handler - rpcHandlers map[string]RPCHandler - subHandlers map[string]SubHandler - pubHandlers map[string]PubHandler - // Topic URI -> subscribed clients - subscriptions map[string]listenerMap - subLock *sync.Mutex - sessionOpenCallback func(string) - websocket.Server -} - -// RPCHandler is an interface that handlers to RPC calls should implement. -// The first parameter is the call ID, the second is the proc URI. Last comes -// all optional arguments to the RPC call. The return can be of any type that -// can be marshaled to JSON, or a error (preferably RPCError but any error works.) -// NOTE: this may be broken in v2 if multiple-return is implemented -type RPCHandler func(clientID string, topicURI string, args ...interface{}) (interface{}, error) - -// RPCError represents a call error and is the recommended way to return an -// error from a RPC handler. -type RPCError struct { - URI string - Description string - Details interface{} -} - -// Error returns an error description. -func (e RPCError) Error() string { - return fmt.Sprintf("turnpike: RPC error with URI %s: %s", e.URI, e.Description) -} - -// SubHandler is an interface that handlers for subscriptions should implement to -// control with subscriptions are valid. A subscription is allowed by returning -// true or denied by returning false. -type SubHandler func(clientID string, topicURI string) bool - -// PubHandler is an interface that handlers for publishes should implement to -// get notified on a client publish with the possibility to modify the event. -// The event that will be published should be returned. -type PubHandler func(topicURI string, event interface{}) interface{} - -// NewServer creates a new WAMP server. -func NewServer() *Server { - s := &Server{ - clients: make(map[string]chan string), - prefixes: make(map[string]prefixMap), - rpcHandlers: make(map[string]RPCHandler), - subHandlers: make(map[string]SubHandler), - pubHandlers: make(map[string]PubHandler), - subscriptions: make(map[string]listenerMap), - subLock: new(sync.Mutex), - } - s.Server = websocket.Server{ - Handshake: checkWAMPHandshake, - Handler: websocket.Handler(s.HandleWebsocket), - } - return s -} - -// SetSessionOpenCallback adds a callback function that is run when a new session begins. -// The callback function must accept a string argument that is the session ID. -func (t *Server) SetSessionOpenCallback(f func(string)) { - t.sessionOpenCallback = f -} - -// RegisterRPC adds a handler for the RPC named uri. -func (t *Server) RegisterRPC(uri string, f RPCHandler) { - if f != nil { - t.rpcHandlers[uri] = f - } -} - -// UnregisterRPC removes a handler for the RPC named uri. -func (t *Server) UnregisterRPC(uri string) { - delete(t.rpcHandlers, uri) -} - -// RegisterSubHandler adds a handler called when a client subscribes to URI. -// The subscription can be canceled in the handler by returning false, or -// approved by returning true. -func (t *Server) RegisterSubHandler(uri string, f SubHandler) { - if f != nil { - t.subHandlers[uri] = f - } -} - -// UnregisterSubHandler removes a subscription handler for the URI. -func (t *Server) UnregisterSubHandler(uri string) { - delete(t.subHandlers, uri) -} - -// RegisterPubHandler adds a handler called when a client publishes to URI. -// The event can be modified in the handler and the returned event is what is -// published to the other clients. -func (t *Server) RegisterPubHandler(uri string, f PubHandler) { - if f != nil { - t.pubHandlers[uri] = f - } -} - -// UnregisterPubHandler removes a publish handler for the URI. -func (t *Server) UnregisterPubHandler(uri string) { - delete(t.pubHandlers, uri) -} - -// SendEvent sends an event with topic directly (not via Client.Publish()) -func (t *Server) SendEvent(topic string, event interface{}) { - t.handlePublish(topic, publishMsg{ - TopicURI: topic, - Event: event, - }) -} - -// HandleWebsocket implements the go.net/websocket.Handler interface. -func (t *Server) HandleWebsocket(conn *websocket.Conn) { - defer conn.Close() - - if debug { - log.Print("turnpike: received websocket connection") - } - - tid, err := uuid.NewV4() - if err != nil { - if debug { - log.Print("turnpike: could not create unique id, refusing client connection") - } - return - } - id := tid.String() - if debug { - log.Printf("turnpike: client connected: %s", id) - } - - arr, err := createWelcome(id, turnpikeServerIdent) - if err != nil { - if debug { - log.Print("turnpike: error encoding welcome message") - } - return - } - if debug { - log.Printf("turnpike: sending welcome message: %s", arr) - } - err = websocket.Message.Send(conn, string(arr)) - if err != nil { - if debug { - log.Printf("turnpike: error sending welcome message, aborting connection: %s", err) - } - return - } - - c := make(chan string, serverBacklog) - t.clients[id] = c - - if t.sessionOpenCallback != nil { - t.sessionOpenCallback(id) - } - - failures := 0 - go func() { - for msg := range c { - if debug { - log.Printf("turnpike: sending message: %s", msg) - } - conn.SetWriteDeadline(time.Now().Add(clientConnTimeout * time.Second)) - err := websocket.Message.Send(conn, msg) - if err != nil { - if nErr, ok := err.(net.Error); ok && (nErr.Timeout() || nErr.Temporary()) { - log.Printf("Network error: %s", nErr) - failures++ - if failures > clientMaxFailures { - break - } - } else { - if debug { - log.Printf("turnpike: error sending message: %s", err) - } - break - } - } - } - if debug { - log.Printf("Client %s disconnected", id) - } - conn.Close() - }() - - for { - var rec string - err := websocket.Message.Receive(conn, &rec) - if err != nil { - if err != io.EOF { - if debug { - log.Printf("turnpike: error receiving message, aborting connection: %s", err) - } - } - break - } - if debug { - log.Printf("turnpike: message received: %s", rec) - } - - data := []byte(rec) - - switch typ := parseMessageType(rec); typ { - case msgPrefix: - var msg prefixMsg - err := json.Unmarshal(data, &msg) - if err != nil { - if debug { - log.Printf("turnpike: error unmarshalling prefix message: %s", err) - } - continue - } - t.handlePrefix(id, msg) - case msgCall: - var msg callMsg - err := json.Unmarshal(data, &msg) - if err != nil { - if debug { - log.Printf("turnpike: error unmarshalling call message: %s", err) - } - continue - } - t.handleCall(id, msg) - case msgSubscribe: - var msg subscribeMsg - err := json.Unmarshal(data, &msg) - if err != nil { - if debug { - log.Printf("turnpike: error unmarshalling subscribe message: %s", err) - } - continue - } - t.handleSubscribe(id, msg) - case msgUnsubscribe: - var msg unsubscribeMsg - err := json.Unmarshal(data, &msg) - if err != nil { - if debug { - log.Printf("turnpike: error unmarshalling unsubscribe message: %s", err) - } - continue - } - t.handleUnsubscribe(id, msg) - case msgPublish: - var msg publishMsg - err := json.Unmarshal(data, &msg) - if err != nil { - if debug { - log.Printf("turnpike: error unmarshalling publish message: %s", err) - } - continue - } - t.handlePublish(id, msg) - case msgWelcome, msgCallResult, msgCallError, msgEvent: - if debug { - log.Printf("turnpike: server -> client message received, ignored: %s", messageTypeString(typ)) - } - default: - if debug { - log.Printf("turnpike: invalid message format, message dropped: %s", data) - } - } - } - - delete(t.clients, id) - close(c) -} - -func (t *Server) handlePrefix(id string, msg prefixMsg) { - if debug { - log.Print("turnpike: handling prefix message") - } - if _, ok := t.prefixes[id]; !ok { - t.prefixes[id] = make(prefixMap) - } - if err := t.prefixes[id].registerPrefix(msg.Prefix, msg.URI); err != nil { - if debug { - log.Printf("turnpike: error registering prefix: %s", err) - } - } - if debug { - log.Printf("turnpike: client %s registered prefix '%s' for URI: %s", id, msg.Prefix, msg.URI) - } -} - -func (t *Server) handleCall(id string, msg callMsg) { - if debug { - log.Print("turnpike: handling call message") - } - - var out string - var err error - - if f, ok := t.rpcHandlers[msg.ProcURI]; ok && f != nil { - var res interface{} - res, err = f(id, msg.ProcURI, msg.CallArgs...) - if err != nil { - var errorURI, desc string - var details interface{} - if er, ok := err.(RPCError); ok { - errorURI = er.URI - desc = er.Description - details = er.Details - } else { - errorURI = msg.ProcURI + "#generic-error" - desc = err.Error() - } - - if details != nil { - out, err = createCallError(msg.CallID, errorURI, desc, details) - } else { - out, err = createCallError(msg.CallID, errorURI, desc) - } - } else { - out, err = createCallResult(msg.CallID, res) - } - } else { - if debug { - log.Printf("turnpike: RPC call not registered: %s", msg.ProcURI) - } - out, err = createCallError(msg.CallID, "error:notimplemented", "RPC call '%s' not implemented", msg.ProcURI) - } - - if err != nil { - // whatever, let the client hang... - if debug { - log.Printf("turnpike: error creating callError message: %s", err) - } - return - } - if client, ok := t.clients[id]; ok { - client <- out - } -} - -func (t *Server) handleSubscribe(id string, msg subscribeMsg) { - if debug { - log.Print("turnpike: handling subscribe message") - } - - uri := checkCurie(t.prefixes[id], msg.TopicURI) - h := t.getSubHandler(uri) - if h != nil && !h(id, uri) { - if debug { - log.Printf("turnpike: client %s denied subscription of topic: %s", id, uri) - } - return - } - - t.subLock.Lock() - defer t.subLock.Unlock() - if _, ok := t.subscriptions[uri]; !ok { - t.subscriptions[uri] = make(map[string]bool) - } - t.subscriptions[uri].add(id) - if debug { - log.Printf("turnpike: client %s subscribed to topic: %s", id, uri) - } -} - -func (t *Server) handleUnsubscribe(id string, msg unsubscribeMsg) { - if debug { - log.Print("turnpike: handling unsubscribe message") - } - t.subLock.Lock() - uri := checkCurie(t.prefixes[id], msg.TopicURI) - if lm, ok := t.subscriptions[uri]; ok { - lm.remove(id) - } - t.subLock.Unlock() - if debug { - log.Printf("turnpike: client %s unsubscribed from topic: %s", id, uri) - } -} - -func (t *Server) handlePublish(id string, msg publishMsg) { - if debug { - log.Print("turnpike: handling publish message") - } - uri := checkCurie(t.prefixes[id], msg.TopicURI) - - h := t.getPubHandler(uri) - event := msg.Event - if h != nil { - event = h(uri, event) - } - - lm, ok := t.subscriptions[uri] - if !ok { - return - } - - out, err := createEvent(uri, event) - if err != nil { - if debug { - log.Printf("turnpike: error creating event message: %s", err) - } - return - } - - var sendTo []string - if len(msg.ExcludeList) > 0 || len(msg.EligibleList) > 0 { - // this is super ugly, but I couldn't think of a better way... - for tid := range lm { - include := true - for _, _tid := range msg.ExcludeList { - if tid == _tid { - include = false - break - } - } - if include { - sendTo = append(sendTo, tid) - } - } - - for _, tid := range msg.EligibleList { - include := true - for _, _tid := range sendTo { - if _tid == tid { - include = false - break - } - } - if include { - sendTo = append(sendTo, tid) - } - } - } else { - for tid := range lm { - if tid == id && msg.ExcludeMe { - continue - } - sendTo = append(sendTo, tid) - } - } - - for _, tid := range sendTo { - // we're not locking anything, so we need - // to make sure the client didn't disconnecct in the - // last few nanoseconds... - if client, ok := t.clients[tid]; ok { - if len(client) == cap(client) { - <-client - } - client <- string(out) - } - } -} - -func (t *Server) getSubHandler(uri string) SubHandler { - for i := len(uri); i >= 0; i-- { - u := uri[:i] - if h, ok := t.subHandlers[u]; ok { - return h - } - } - return nil -} - -func (t *Server) getPubHandler(uri string) PubHandler { - for i := len(uri); i >= 0; i-- { - u := uri[:i] - if h, ok := t.pubHandlers[u]; ok { - return h - } - } - return nil -} - -type listenerMap map[string]bool - -func (lm listenerMap) add(id string) { - lm[id] = true -} -func (lm listenerMap) contains(id string) bool { - return lm[id] -} -func (lm listenerMap) remove(id string) { - delete(lm, id) -} - -func checkWAMPHandshake(config *websocket.Config, req *http.Request) error { - for _, protocol := range config.Protocol { - if protocol == "wamp" { - config.Protocol = []string{protocol} - return nil - } - } - return websocket.ErrBadWebSocketProtocol -} diff --git a/server/server.go b/server/server.go deleted file mode 100644 index d96a87e..0000000 --- a/server/server.go +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright (c) 2013 Joshua Elliott -// Released under the MIT License -// http://opensource.org/licenses/MIT - -package main - -import ( - "code.google.com/p/go.net/websocket" - "fmt" - "log" - "net/http" - "turnpike" -) - -func main() { - s := turnpike.NewServer() - // for Go1.1, this will be just websocket.Handler(s.HandleWebsocket) - http.Handle("/", websocket.Handler(turnpike.HandleWebsocket(s))) - fmt.Println("Listening on port 8080") - if err := http.ListenAndServe(":8080", nil); err != nil { - log.Fatal("ListenAndServe:", err) - } -} diff --git a/server_test.go b/server_test.go deleted file mode 100644 index c440f16..0000000 --- a/server_test.go +++ /dev/null @@ -1,247 +0,0 @@ -// Copyright (c) 2013 Joshua Elliott -// Released under the MIT License -// http://opensource.org/licenses/MIT - -package turnpike - -import ( - "net/http" - "runtime" - "testing" - "time" -) - -func TestServer_SubNoHandler(t *testing.T) { - s := NewServer() - - http.Handle("/ws_s1", s.Handler) - // TODO: needs better way of running multiple listen and serve. - // Currently there is no way of closing the listener. A cusom server and - // handler will work but requires more work. TBD. - go func() { - err := http.ListenAndServe(":8101", nil) - if err != nil { - t.Fatal("ListenAndServe: " + err.Error()) - } - }() - - // Let the server goroutine start. - runtime.Gosched() - - c := NewClient() - err := c.Connect("ws://127.0.0.1:8101/ws_s1", "http://localhost/") - if err != nil { - t.Fatal("error connecting: " + err.Error()) - } - - eventCh := make(chan bool) - c.Subscribe("event:test", func(uri string, event interface{}) { - eventCh <- true - }) - - c.Publish("event:test", "test") - - select { - case <-eventCh: - return - case <-time.After(time.Second): - t.Fail() - } - -} - -func TestServer_RegisterSubHandler(t *testing.T) { - s := NewServer() - subCh := make(chan bool) - s.RegisterSubHandler("event:test", func(clientID, topicURI string) bool { - subCh <- true - return true - }) - - http.Handle("/ws_s2", s.Handler) - // TODO: needs better way of running multiple listen and serve. - // Currently there is no way of closing the listener. A cusom server and - // handler will work but requires more work. TBD. - go func() { - err := http.ListenAndServe(":8102", nil) - if err != nil { - t.Fatal("ListenAndServe: " + err.Error()) - } - }() - - // Let the server goroutine start. - runtime.Gosched() - - c := NewClient() - err := c.Connect("ws://127.0.0.1:8102/ws_s2", "http://localhost/") - if err != nil { - t.Fatal("error connecting: " + err.Error()) - } - - c.Subscribe("event:test", func(uri string, event interface{}) {}) - - select { - case <-subCh: - return - case <-time.After(time.Second): - t.Fail() - } -} - -func TestServer_SubHandlerAccept(t *testing.T) { - s := NewServer() - s.RegisterSubHandler("event:test", func(clientID, topicURI string) bool { - return true - }) - - http.Handle("/ws_s3", s.Handler) - // TODO: needs better way of running multiple listen and serve. - // Currently there is no way of closing the listener. A cusom server and - // handler will work but requires more work. TBD. - go func() { - err := http.ListenAndServe(":8103", nil) - if err != nil { - t.Fatal("ListenAndServe: " + err.Error()) - } - }() - - // Let the server goroutine start. - runtime.Gosched() - - c := NewClient() - err := c.Connect("ws://127.0.0.1:8103/ws_s3", "http://localhost/") - if err != nil { - t.Fatal("error connecting: " + err.Error()) - } - - eventCh := make(chan bool) - c.Subscribe("event:test", func(uri string, event interface{}) { - eventCh <- true - }) - - c.Publish("event:test", "test") - - select { - case <-eventCh: - return - case <-time.After(time.Second): - t.Fail() - } -} - -func TestServer_SubHandlerDeny(t *testing.T) { - s := NewServer() - s.RegisterSubHandler("event:test", func(clientID, topicURI string) bool { - return false - }) - - http.Handle("/ws_s4", s.Handler) - // TODO: needs better way of running multiple listen and serve. - // Currently there is no way of closing the listener. A cusom server and - // handler will work but requires more work. TBD. - go func() { - err := http.ListenAndServe(":8104", nil) - if err != nil { - t.Fatal("ListenAndServe: " + err.Error()) - } - }() - - // Let the server goroutine start. - runtime.Gosched() - - c := NewClient() - err := c.Connect("ws://127.0.0.1:8104/ws_s4", "http://localhost/") - if err != nil { - t.Fatal("error connecting: " + err.Error()) - } - - c.Subscribe("event:test", func(uri string, event interface{}) { - t.Fail() - }) - - c.Publish("event:test", "test") - <-time.After(time.Second) -} - -func TestServer_RegisterPubHandler(t *testing.T) { - s := NewServer() - pubCh := make(chan bool) - s.RegisterPubHandler("event:test", func(topicURI string, event interface{}) interface{} { - pubCh <- true - return event - }) - - http.Handle("/ws_s5", s.Handler) - // TODO: needs better way of running multiple listen and serve. - // Currently there is no way of closing the listener. A cusom server and - // handler will work but requires more work. TBD. - go func() { - err := http.ListenAndServe(":8105", nil) - if err != nil { - t.Fatal("ListenAndServe: " + err.Error()) - } - }() - - // Let the server goroutine start. - runtime.Gosched() - - c := NewClient() - err := c.Connect("ws://127.0.0.1:8105/ws_s5", "http://localhost/") - if err != nil { - t.Fatal("error connecting: " + err.Error()) - } - - // c.Subscribe("event:test", func(uri string, event interface{}) {}) - c.Publish("event:test", "test") - - select { - case <-pubCh: - return - case <-time.After(time.Second): - t.Fail() - } -} - -func TestServer_PubHandlerChange(t *testing.T) { - s := NewServer() - s.RegisterPubHandler("event:test", func(topicURI string, event interface{}) interface{} { - return event.(string) + "2" - }) - - http.Handle("/ws_s6", s.Handler) - // TODO: needs better way of running multiple listen and serve. - // Currently there is no way of closing the listener. A cusom server and - // handler will work but requires more work. TBD. - go func() { - err := http.ListenAndServe(":8106", nil) - if err != nil { - t.Fatal("ListenAndServe: " + err.Error()) - } - }() - - // Let the server goroutine start. - runtime.Gosched() - - c := NewClient() - err := c.Connect("ws://127.0.0.1:8106/ws_s6", "http://localhost/") - if err != nil { - t.Fatal("error connecting: " + err.Error()) - } - - eventCh := make(chan bool) - c.Subscribe("event:test", func(uri string, event interface{}) { - if event != "test2" { - t.Fail() - } - eventCh <- true - }) - - c.Publish("event:test", "test") - - select { - case <-eventCh: - return - case <-time.After(time.Second): - t.Fail() - } -} diff --git a/wampv2/session.go b/session.go similarity index 100% rename from wampv2/session.go rename to session.go diff --git a/test_test.go.back b/test_test.go.back new file mode 100644 index 0000000..f340ed5 --- /dev/null +++ b/test_test.go.back @@ -0,0 +1,18 @@ +package wampv2 + +import ( + _ "github.com/smartystreets/goconvey/convey" + // "testing" +) + +// func TestSomething(t *testing.T) { +// Convey("Given something", t, func() { +// x := 1 +// Convey("After something else", func() { +// x++ +// Convey("The value should be something else", func() { +// So(x, ShouldEqual, 2) +// }) +// }) +// }) +// } diff --git a/turnpike.go b/turnpike.go deleted file mode 100644 index acb8ee2..0000000 --- a/turnpike.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) 2013 Joshua Elliott -// Released under the MIT License -// http://opensource.org/licenses/MIT - -// Package turnpike provides a Websocket Application Messaging Protocol (WAMP) server and client -package turnpike - -import ( - "code.google.com/p/go.net/websocket" -) - -const ( - turnpikeVersion = "0.2.0" - turnpikeServerIdent = "turnpike-" + turnpikeVersion - debug = false -) - -// Handler is a interface to support Go1.0. -type Handler interface { - HandleWebsocket(*websocket.Conn) -} - -// HandleWebsocket is a Go1.0 shim for method values. -func HandleWebsocket(t Handler) func(*websocket.Conn) { - return func(conn *websocket.Conn) { - t.HandleWebsocket(conn) - } -} diff --git a/wampv2/util.go b/util.go similarity index 100% rename from wampv2/util.go rename to util.go diff --git a/wamp.go b/wamp.go deleted file mode 100644 index 46590fe..0000000 --- a/wamp.go +++ /dev/null @@ -1,427 +0,0 @@ -// Copyright (c) 2013 Joshua Elliott -// Released under the MIT License -// http://opensource.org/licenses/MIT - -package turnpike - -import ( - "encoding/json" - "net/url" - "regexp" - "strconv" -) - -const ( - msgWelcome = iota - msgPrefix - msgCall - msgCallResult - msgCallError - msgSubscribe - msgUnsubscribe - msgPublish - msgEvent -) - -const wampProtocolVersion = 1 - -var ( - typeReg = regexp.MustCompile("^\\s*\\[\\s*(\\d+)\\s*,") -) - -// A WAMPError is returned when attempting to create a message that does not follow the WAMP -// protocol -type WAMPError struct { - Msg string -} - -var ( - // An ErrInvalidURI describes an invalid URI. - ErrInvalidURI = &WAMPError{"invalid URI"} - // An ErrInvalidNumArgs describes invalid number of arguments in a message. - ErrInvalidNumArgs = &WAMPError{"invalid number of arguments in message"} - // An ErrUnsupportedProtocol describes an unsupported WAMP protocol in a welcome message. - ErrUnsupportedProtocol = &WAMPError{"unsupported protocol"} -) - -// Error implements the error interface to provide a message. -func (e *WAMPError) Error() string { - return "wamp: " + e.Msg -} - -func parseMessageType(msg string) int { - match := typeReg.FindStringSubmatch(msg) - if match == nil { - return -1 - } - i, _ := strconv.Atoi(match[1]) - return i -} - -func messageTypeString(typ int) string { - types := []string{"WELCOME", "PREFIX", "CALL", "CALLRESULT", "CALLERROR", "SUBSCRIBE", "UNSUBSCRIBE", "PUBLISH", "EVENT"} - if typ >= 0 && typ < 9 { - return types[typ] - } - return "" -} - -// WELCOME -type welcomeMsg struct { - SessionId string - ProtocolVersion int - ServerIdent string -} - -func (msg *welcomeMsg) UnmarshalJSON(jsonData []byte) error { - var data []interface{} - err := json.Unmarshal(jsonData, &data) - if err != nil { - return err - } - if len(data) != 4 { - return ErrInvalidNumArgs - } - var ok bool - if msg.SessionId, ok = data[1].(string); !ok { - return &WAMPError{"invalid session ID"} - } - if protocolVersion, ok := data[2].(float64); ok { - msg.ProtocolVersion = int(protocolVersion) - } else { - return ErrUnsupportedProtocol - } - if msg.ServerIdent, ok = data[3].(string); !ok { - return &WAMPError{"invalid server identity"} - } - return nil -} - -// Welcome returns a json encoded WAMP 'WELCOME' message as a byte slice -// sessionId is a randomly generated string provided by the server, serverIdent -// is a string that identifies the WAMP server -func createWelcome(sessionId, serverIdent string) (string, error) { - return createWAMPMessage(msgWelcome, sessionId, wampProtocolVersion, serverIdent) -} - -// PREFIX -type prefixMsg struct { - Prefix string - URI string -} - -func (msg *prefixMsg) UnmarshalJSON(jsonData []byte) error { - var data []interface{} - err := json.Unmarshal(jsonData, &data) - if err != nil { - return err - } - if len(data) != 3 { - return ErrInvalidNumArgs - } - var ok bool - if msg.Prefix, ok = data[1].(string); !ok { - return &WAMPError{"invalid prefix"} - } - if msg.URI, ok = data[2].(string); !ok { - return &WAMPError{"invalid URI"} - } - return nil -} - -// Prefix returns a json encoded WAMP 'PREFIX' message as a byte slice -func createPrefix(prefix, URI string) (string, error) { - if _, err := url.ParseRequestURI(URI); err != nil { - return "", &WAMPError{"invalid URI: " + URI} - } - return createWAMPMessage(msgPrefix, prefix, URI) -} - -// CALL -type callMsg struct { - CallID string - ProcURI string - CallArgs []interface{} -} - -func (msg *callMsg) UnmarshalJSON(jsonData []byte) error { - var data []interface{} - err := json.Unmarshal(jsonData, &data) - if err != nil { - return err - } - if len(data) < 3 { - return ErrInvalidNumArgs - } - var ok bool - if msg.CallID, ok = data[1].(string); !ok { - return &WAMPError{"invalid callID"} - } - if msg.ProcURI, ok = data[2].(string); !ok { - return &WAMPError{"invalid procURI"} - } - if len(data) > 3 { - msg.CallArgs = data[3:] - } - return nil -} - -// Call returns a json encoded WAMP 'CALL' message as a byte slice -// callID must be a randomly generated string, procURI is the URI of the remote -// procedure to be called, followed by zero or more call arguments -func createCall(callID, procURI string, args ...interface{}) (string, error) { - if _, err := url.ParseRequestURI(procURI); err != nil { - return "", &WAMPError{"invalid URI: " + procURI} - } - var data []interface{} - data = append(data, msgCall, callID, procURI) - data = append(data, args...) - b, err := json.Marshal(data) - return string(b), err -} - -// CALLRESULT -type callResultMsg struct { - CallID string - Result interface{} -} - -func (msg *callResultMsg) UnmarshalJSON(jsonData []byte) error { - var data []interface{} - err := json.Unmarshal(jsonData, &data) - if err != nil { - return err - } - if len(data) != 3 { - return ErrInvalidNumArgs - } - var ok bool - if msg.CallID, ok = data[1].(string); !ok { - return &WAMPError{"invalid callID"} - } - msg.Result = data[2] - - return nil -} - -// CallResult returns a json encoded WAMP 'CALLRESULT' message as a byte slice -// callID is the randomly generated string provided by the client -func createCallResult(callID string, result interface{}) (string, error) { - return createWAMPMessage(msgCallResult, callID, result) -} - -// CALLERROR -type callErrorMsg struct { - CallID string - ErrorURI string - ErrorDesc string - ErrorDetails interface{} -} - -func (msg *callErrorMsg) UnmarshalJSON(jsonData []byte) error { - var data []interface{} - err := json.Unmarshal(jsonData, &data) - if err != nil { - return err - } - if len(data) < 4 || len(data) > 5 { - return ErrInvalidNumArgs - } - var ok bool - if msg.CallID, ok = data[1].(string); !ok { - return &WAMPError{"invalid callID"} - } - if msg.ErrorURI, ok = data[2].(string); !ok { - return &WAMPError{"invalid errorURI"} - } - if msg.ErrorDesc, ok = data[3].(string); !ok { - return &WAMPError{"invalid error description"} - } - if len(data) == 5 { - msg.ErrorDetails = data[4] - } - return nil -} - -// CallError returns a json encoded WAMP 'CALLERROR' message as a byte slice -// callID is the randomly generated string provided by the client, errorURI is -// a URI identifying the error, errorDesc is a human-readable description of the -// error (for developers), errorDetails, if present, is a non-nil object -func createCallError(callID, errorURI, errorDesc string, errorDetails ...interface{}) (string, error) { - if _, err := url.ParseRequestURI(errorURI); err != nil { - return "", &WAMPError{"invalid URI: " + errorURI} - } - var data []interface{} - data = append(data, msgCallError, callID, errorURI, errorDesc) - data = append(data, errorDetails...) - b, err := json.Marshal(data) - return string(b), err -} - -// SUBSCRIBE -type subscribeMsg struct { - TopicURI string -} - -func (msg *subscribeMsg) UnmarshalJSON(jsonData []byte) error { - var data []interface{} - err := json.Unmarshal(jsonData, &data) - if err != nil { - return err - } - if len(data) != 2 { - return ErrInvalidNumArgs - } - var ok bool - if msg.TopicURI, ok = data[1].(string); !ok { - return &WAMPError{"invalid topicURI"} - } - return nil -} - -// Subscribe returns a json encoded WAMP 'SUBSCRIBE' message as a byte slice -// topicURI is the topic that the client wants to subscribe to -func createSubscribe(topicURI string) (string, error) { - return createWAMPMessagePubSub(msgSubscribe, topicURI) -} - -// UNSUBSCRIBE -type unsubscribeMsg struct { - TopicURI string -} - -func (msg *unsubscribeMsg) UnmarshalJSON(jsonData []byte) error { - var data []interface{} - err := json.Unmarshal(jsonData, &data) - if err != nil { - return err - } - if len(data) != 2 { - return ErrInvalidNumArgs - } - var ok bool - if msg.TopicURI, ok = data[1].(string); !ok { - return &WAMPError{"invalid topicURI"} - } - return nil -} - -// Unsubscribe returns a json encoded WAMP 'UNSUBSCRIBE' message as a byte slice -// topicURI is the topic that the client wants to unsubscribe from -func createUnsubscribe(topicURI string) (string, error) { - return createWAMPMessagePubSub(msgUnsubscribe, topicURI) -} - -// PUBLISH -type publishMsg struct { - TopicURI string - Event interface{} - ExcludeMe bool - ExcludeList []string - EligibleList []string -} - -func (msg *publishMsg) UnmarshalJSON(jsonData []byte) error { - var data []interface{} - err := json.Unmarshal(jsonData, &data) - if err != nil { - return err - } - if len(data) < 3 || len(data) > 5 { - return ErrInvalidNumArgs - } - var ok bool - if msg.TopicURI, ok = data[1].(string); !ok { - return &WAMPError{"invalid topicURI"} - } - msg.Event = data[2] - if len(data) > 3 { - if msg.ExcludeMe, ok = data[3].(bool); !ok { - var arr []interface{} - if arr, ok = data[3].([]interface{}); !ok && data[3] != nil { - return &WAMPError{"invalid exclude argument"} - } - for _, v := range arr { - if val, ok := v.(string); !ok { - return &WAMPError{"invalid exclude list"} - } else { - msg.ExcludeList = append(msg.ExcludeList, val) - } - } - if len(data) == 5 { - if arr, ok = data[4].([]interface{}); !ok && data[3] != nil { - return &WAMPError{"invalid eligable list"} - } - for _, v := range arr { - if val, ok := v.(string); !ok { - return &WAMPError{"invalid eligable list"} - } else { - msg.EligibleList = append(msg.EligibleList, val) - } - } - } - } - } - return nil -} - -// Publish returns a json encoded WAMP 'PUBLISH' message as a byte slice -// arguments must be given in one of the following formats: -// [ topicURI, event ] -// [ topicURI, event, excludeMe ] -// [ topicURI, event, exclude ] -// [ topicURI, event, exclude, eligible ] -// event can be nil, a simple json type, or a complex json type -func createPublish(topicURI string, event interface{}, opts ...interface{}) (string, error) { - var data []interface{} - data = append(data, msgPublish, topicURI, event) - data = append(data, opts...) - return createWAMPMessagePubSub(data...) -} - -// EVENT -type eventMsg struct { - TopicURI string - Event interface{} -} - -func (msg *eventMsg) UnmarshalJSON(jsonData []byte) error { - var data []interface{} - err := json.Unmarshal(jsonData, &data) - if err != nil { - return err - } - if len(data) != 3 { - return ErrInvalidNumArgs - } - var ok bool - if msg.TopicURI, ok = data[1].(string); !ok { - return &WAMPError{"invalid topicURI"} - } - msg.Event = data[2] - - return nil -} - -// Event returns a json encoded WAMP 'EVENT' message as a byte slice -// event can be nil, a simple json type, or a complex json type -func createEvent(topicURI string, event interface{}) (string, error) { - return createWAMPMessagePubSub(msgEvent, topicURI, event) -} - -// createWAMPMessagePubSub checks that the second argument (topicURI) is a valid -// URI and then passes the request on to createWAMPMessage -func createWAMPMessagePubSub(args ...interface{}) (string, error) { - if _, err := url.ParseRequestURI(args[1].(string)); err != nil { - return "", &WAMPError{"invalid URI: " + args[1].(string)} - } - return createWAMPMessage(args...) -} - -// createWAMPMessage returns a JSON encoded list from all the arguments passed to it -func createWAMPMessage(args ...interface{}) (string, error) { - var data []interface{} - data = append(data, args...) - b, err := json.Marshal(data) - return string(b), err -} diff --git a/wamp_test.go b/wamp_test.go deleted file mode 100644 index 873bc12..0000000 --- a/wamp_test.go +++ /dev/null @@ -1,497 +0,0 @@ -// Copyright (c) 2013 Joshua Elliott -// Released under the MIT License -// http://opensource.org/licenses/MIT - -package turnpike - -import ( - "encoding/json" - "github.com/stretchrcom/testify/assert" - "testing" -) - -type testObj struct { - Name string `json:"name"` - Value float32 `json:"value"` - List []int `json:"list"` -} - -func TestWelcome(t *testing.T) { - exp := `[0,"12345678",1,"turnpike-0.1.0"]` - msg, err := createWelcome("12345678", "turnpike-0.1.0") - if err != nil { - t.Errorf("error creating welcome message: %s", err) - } - assert.Equal(t, exp, string(msg)) - - exp = `[0,"87654321",1,"a different server"]` - msg, err = createWelcome("87654321", "a different server") - if err != nil { - t.Errorf("error creating welcome message: %s", err) - } - assert.Equal(t, exp, string(msg)) -} - -func TestParseWelcome(t *testing.T) { - data := []byte(`[0,"12345678",1,"turnpike-0.1.0"]`) - var msg welcomeMsg - assert.Implements(t, (*json.Unmarshaler)(nil), new(welcomeMsg)) - err := json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "12345678", msg.SessionId) - assert.Equal(t, 1, msg.ProtocolVersion) - assert.Equal(t, "turnpike-0.1.0", msg.ServerIdent) -} - -func TestPrefix(t *testing.T) { - exp := `[1,"prefix","http://www.example.com/api/start"]` - msg, err := createPrefix("prefix", "http://www.example.com/api/start") - if err != nil { - t.Errorf("error creating prefix message: %s", err) - } - assert.Equal(t, exp, string(msg)) - - // test bad uri - _, err = createPrefix("prefix", "httpppppppp") - assert.Error(t, err) -} - -func TestParsePrefix(t *testing.T) { - data := []byte(`[1,"prefix","http://www.example.com/api/start"]`) - var msg prefixMsg - assert.Implements(t, (*json.Unmarshaler)(nil), new(prefixMsg)) - err := json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "prefix", msg.Prefix) - assert.Equal(t, "http://www.example.com/api/start", msg.URI) -} - -func TestCall(t *testing.T) { - // zero call arguments - exp := `[2,"123456","http://example.com/testRPC"]` - compareCall(t, exp, "123456", "http://example.com/testRPC") - - // nil call argument - exp = `[2,"654321","http://example.com/otherRPC",null]` - compareCall(t, exp, "654321", "http://example.com/otherRPC", nil) - - // one call argument - exp = `[2,"a1b2c3d4","http://example.com/dosomething/rpc","call arg"]` - compareCall(t, exp, "a1b2c3d4", "http://example.com/dosomething/rpc", "call arg") - - // more call arguments - exp = `[2,"abcdefg","http://example.com/rpc","arg1","arg2"]` - compareCall(t, exp, "abcdefg", "http://example.com/rpc", "arg1", "arg2") - - // complex call argument - exp = `[2,"1234","http://example.com/rpc",{"name":"george","value":14.98,"list":[1,3,5]},"astring"]` - obj := testObj{Name: "george", Value: 14.98, List: []int{1, 3, 5}} - compareCall(t, exp, "1234", "http://example.com/rpc", obj, "astring") - - // test bad uri - _, err := createCall("abcd", "httpnopenopenope") - assert.Error(t, err) -} - -func compareCall(t *testing.T, expected, callID, procURI string, args ...interface{}) { - msg, err := createCall(callID, procURI, args...) - if err != nil { - t.Errorf("error creating call message: %s", err) - } - assert.Equal(t, expected, string(msg)) -} - -func TestParseCall(t *testing.T) { - // no call args - data := []byte(`[2,"123456","http://example.com/testRPC"]`) - var msg callMsg - assert.Implements(t, (*json.Unmarshaler)(nil), new(callMsg)) - err := json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "123456", msg.CallID) - assert.Equal(t, "http://example.com/testRPC", msg.ProcURI) - assert.Nil(t, msg.CallArgs) - - // simple call args - data = []byte(`[2,"a1b2c3d4","http://example.com/dosomething/rpc","call arg"]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "a1b2c3d4", msg.CallID) - assert.Equal(t, "http://example.com/dosomething/rpc", msg.ProcURI) - assert.Equal(t, "call arg", msg.CallArgs[0].(string)) - - // complex call args - data = []byte(`[2,"1234","http://example.com/rpc",{"name":"george","value":14.98,"list":[1,3,5]},"astring"]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "1234", msg.CallID) - assert.Equal(t, "http://example.com/rpc", msg.ProcURI) - assert.Equal(t, "george", msg.CallArgs[0].(map[string]interface{})["name"]) - assert.Equal(t, 14.98, msg.CallArgs[0].(map[string]interface{})["value"]) - assert.Equal(t, 5, msg.CallArgs[0].(map[string]interface{})["list"].([]interface{})[2]) - assert.Equal(t, "astring", msg.CallArgs[1].(string)) -} - -func TestCallResult(t *testing.T) { - // null result - exp := `[3,"123456",null]` - compareCallResult(t, exp, "123456", nil) - - // simple result - exp = `[3,"abcdefg","a cool result"]` - compareCallResult(t, exp, "abcdefg", "a cool result") - - // complex result - exp = `[3,"asdf",{"name":"sally","value":43.1,"list":[2,4,6]}]` - obj := testObj{Name: "sally", Value: 43.1, List: []int{2, 4, 6}} - compareCallResult(t, exp, "asdf", obj) -} - -func compareCallResult(t *testing.T, expected, callID string, result interface{}) { - msg, err := createCallResult(callID, result) - if err != nil { - t.Errorf("error creating callresult message: %s", err) - } - assert.Equal(t, expected, string(msg)) -} - -func TestParseCallResult(t *testing.T) { - // null result - data := []byte(`[3,"123456",null]`) - var msg callResultMsg - assert.Implements(t, (*json.Unmarshaler)(nil), new(callResultMsg)) - err := json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "123456", msg.CallID) - assert.Nil(t, msg.Result) - - // simple result - data = []byte(`[3,"abcdefg","a cool result"]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "abcdefg", msg.CallID) - assert.Equal(t, "a cool result", msg.Result) - - // complex result - data = []byte(`[3,"asdf",{"name":"sally","value":43.1,"list":[2,4,6]}]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "asdf", msg.CallID) - assert.Equal(t, "sally", msg.Result.(map[string]interface{})["name"]) - assert.Equal(t, 43.1, msg.Result.(map[string]interface{})["value"]) - assert.Equal(t, 6, msg.Result.(map[string]interface{})["list"].([]interface{})[2]) -} - -func TestCallError(t *testing.T) { - // generic error - exp := `[4,"1234","http://example.com/app/error#generic","there was an error"]` - compareCallError(t, exp, "1234", "http://example.com/app/error#generic", "there was an error") - - // integer error details - exp = `[4,"asdf","http://example.com/error","integer error",4567]` - compareCallError(t, exp, "asdf", "http://example.com/error", "integer error", 4567) - - // complex error details - exp = `[4,"asd123","http://example.com/error","big error",{"name":"huge","value":9000,"list":[10,60]}]` - obj := testObj{Name: "huge", Value: 9000, List: []int{10, 60}} - compareCallError(t, exp, "asd123", "http://example.com/error", "big error", obj) -} - -func compareCallError(t *testing.T, expected, callID, errorURI, errorDesc string, errorDetails ...interface{}) { - msg, err := createCallError(callID, errorURI, errorDesc, errorDetails...) - if err != nil { - t.Errorf("error creating callerror message: %s", err) - } - assert.Equal(t, expected, string(msg)) -} - -func TestParseCallError(t *testing.T) { - // generic error - data := []byte(`[4,"1234","http://example.com/app/error#generic","there was an error"]`) - var msg callErrorMsg - assert.Implements(t, (*json.Unmarshaler)(nil), new(callErrorMsg)) - err := json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "1234", msg.CallID) - assert.Equal(t, "http://example.com/app/error#generic", msg.ErrorURI) - assert.Equal(t, "there was an error", msg.ErrorDesc) - assert.Nil(t, msg.ErrorDetails) - - // integer error details - data = []byte(`[4,"asdf","http://example.com/error","integer error",4567]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "asdf", msg.CallID) - assert.Equal(t, "http://example.com/error", msg.ErrorURI) - assert.Equal(t, "integer error", msg.ErrorDesc) - assert.Equal(t, 4567, msg.ErrorDetails) - - // complex error details - data = []byte(`[4,"asd123","http://example.com/error","big error",{"name":"huge","value":9000,"list":[10,60]}]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "asd123", msg.CallID) - assert.Equal(t, "http://example.com/error", msg.ErrorURI) - assert.Equal(t, "big error", msg.ErrorDesc) - assert.Equal(t, "huge", msg.ErrorDetails.(map[string]interface{})["name"]) - assert.Equal(t, 9000, msg.ErrorDetails.(map[string]interface{})["value"]) - assert.Equal(t, 60, msg.ErrorDetails.(map[string]interface{})["list"].([]interface{})[1]) -} - -func TestSubscribe(t *testing.T) { - exp := `[5,"http://example.com/simple"]` - msg, err := createSubscribe("http://example.com/simple") - if err != nil { - t.Errorf("error creating subscribe message: %s", err) - } - assert.Equal(t, exp, string(msg)) - - // test bad uri - _, err = createSubscribe("qwerty") - assert.Error(t, err) -} - -func TestParseSubscribe(t *testing.T) { - data := []byte(`[5,"http://example.com/simple"]`) - var msg subscribeMsg - assert.Implements(t, (*json.Unmarshaler)(nil), new(subscribeMsg)) - err := json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://example.com/simple", msg.TopicURI) -} - -func TestUnsubscribe(t *testing.T) { - exp := `[6,"http://example.com/something"]` - msg, err := createUnsubscribe("http://example.com/something") - if err != nil { - t.Errorf("error creating unsubscribe message: %s", err) - } - assert.Equal(t, exp, string(msg)) - - // test bad uri - _, err = createUnsubscribe("qwerty") - assert.Error(t, err) -} - -func TestParseUnsubscribe(t *testing.T) { - data := []byte(`[6,"http://example.com/something"]`) - var msg unsubscribeMsg - assert.Implements(t, (*json.Unmarshaler)(nil), new(unsubscribeMsg)) - err := json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://example.com/something", msg.TopicURI) -} - -func TestPublish(t *testing.T) { - // test nil event - exp := `[7,"http://example.com/api/test",null]` - comparePublish(t, exp, "http://example.com/api/test", nil) - - // test simple event - exp = `[7,"http://example.com/api/testing:thing","this is an event"]` - comparePublish(t, exp, "http://example.com/api/testing:thing", "this is an event") - - // test complex event - obj := testObj{"the test", 17.3, []int{1, 2, 3}} - exp = `[7,"http://www.example.com/doc#thing",{"name":"the test","value":17.3,"list":[1,2,3]}]` - comparePublish(t, exp, "http://www.example.com/doc#thing", obj) - - // test with excludeMe - exp = `[7,"http://example.com/api/testing:thing","this is an event",true]` - comparePublish(t, exp, "http://example.com/api/testing:thing", "this is an event", true) - - // test with exclude list - exp = `[7,"http://example.com/api/testing:thing","this is an event",["bob","john"]]` - comparePublish(t, exp, "http://example.com/api/testing:thing", "this is an event", []string{"bob", "john"}) - - // test with eligible list - exp = `[7,"http://example.com/api/testing:thing","this is an event",[],["sam","fred"]]` - comparePublish(t, exp, "http://example.com/api/testing:thing", "this is an event", []string{}, []string{"sam", "fred"}) - - // test bad uri - _, err := createPublish("asdfasdf", "bad uri") - assert.Error(t, err) -} - -func comparePublish(t *testing.T, expected, topic string, event interface{}, args ...interface{}) { - msg, err := createPublish(topic, event, args...) - if err != nil { - t.Errorf("error creating message: %s", err) - } - assert.Equal(t, expected, string(msg)) -} - -func TestParsePublish(t *testing.T) { - // nill event - data := []byte(`[7,"http://example.com/api/test",null]`) - var msg publishMsg - assert.Implements(t, (*json.Unmarshaler)(nil), new(publishMsg)) - err := json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://example.com/api/test", msg.TopicURI) - assert.Nil(t, msg.Event) - assert.False(t, msg.ExcludeMe) - assert.Nil(t, msg.ExcludeList) - assert.Nil(t, msg.EligibleList) - - // simple event - data = []byte(`[7,"http://example.com/api/testing:thing","this is an event"]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://example.com/api/testing:thing", msg.TopicURI) - assert.Equal(t, "this is an event", msg.Event) - assert.False(t, msg.ExcludeMe) - assert.Nil(t, msg.ExcludeList) - assert.Nil(t, msg.EligibleList) - - // complex event - data = []byte(`[7,"http://www.example.com/doc#thing",{"name":"the test","value":17.3,"list":[1,2,3]}]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://www.example.com/doc#thing", msg.TopicURI) - assert.Equal(t, "the test", msg.Event.(map[string]interface{})["name"]) - assert.Equal(t, 17.3, msg.Event.(map[string]interface{})["value"]) - assert.Equal(t, 3, msg.Event.(map[string]interface{})["list"].([]interface{})[2]) - - // with excludeMe - data = []byte(`[7,"http://example.com/api/testing:thing","this is an event",true]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://example.com/api/testing:thing", msg.TopicURI) - assert.Equal(t, "this is an event", msg.Event) - assert.True(t, msg.ExcludeMe) - - // with exclude list - data = []byte(`[7,"http://example.com/api/testing:thing","this is an event",["bob","john"]]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://example.com/api/testing:thing", msg.TopicURI) - assert.Equal(t, "this is an event", msg.Event) - assert.Equal(t, "bob", msg.ExcludeList[0]) - assert.Equal(t, "john", msg.ExcludeList[1]) - assert.Nil(t, msg.EligibleList) - - // with eligible list - data = []byte(`[7,"http://example.com/api/testing:thing","this is an event",[],["sam","fred"]]`) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://example.com/api/testing:thing", msg.TopicURI) - assert.Equal(t, "sam", msg.EligibleList[0]) - assert.Equal(t, "fred", msg.EligibleList[1]) -} - -func TestEvent(t *testing.T) { - // test nil event - exp := `[8,"http://example.com/api/test",null]` - compareEvent(t, exp, "http://example.com/api/test", nil) - - // test simple event - exp = `[8,"http://example.com/api/testing:thing","this is an event"]` - compareEvent(t, exp, "http://example.com/api/testing:thing", "this is an event") - - // test complex event - obj := testObj{"the test", 17.3, []int{1, 2, 3}} - exp = `[8,"http://www.example.com/doc#thing",{"name":"the test","value":17.3,"list":[1,2,3]}]` - compareEvent(t, exp, "http://www.example.com/doc#thing", obj) - - // test bad uri - _, err := createEvent("asdfasdf", "bad uri") - assert.Error(t, err) -} - -func compareEvent(t *testing.T, expected, topic string, event interface{}) { - msg, err := createEvent(topic, event) - if err != nil { - t.Errorf("error creating message: %s", err) - } - assert.Equal(t, expected, string(msg)) -} - -func TestParseEvent(t *testing.T) { - // nil event - data := []byte(`[8,"http://example.com/api/test",null]`) - var msg eventMsg - assert.Implements(t, (*json.Unmarshaler)(nil), new(eventMsg)) - err := json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://example.com/api/test", msg.TopicURI) - assert.Nil(t, msg.Event) - - // simple event - data = []byte(`[8,"http://example.com/api/testing:thing","this is an event"]`) - assert.Implements(t, (*json.Unmarshaler)(nil), new(eventMsg)) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://example.com/api/testing:thing", msg.TopicURI) - assert.Equal(t, "this is an event", msg.Event) - - // complex event - data = []byte(`[8,"http://www.example.com/doc#thing",{"name":"the test","value":17.3,"list":[1,2,3]}]`) - assert.Implements(t, (*json.Unmarshaler)(nil), new(eventMsg)) - err = json.Unmarshal(data, &msg) - if err != nil { - t.Errorf("error unmarshalling json: %s", err) - } - assert.Equal(t, "http://www.example.com/doc#thing", msg.TopicURI) - assert.Equal(t, "the test", msg.Event.(map[string]interface{})["name"]) - assert.Equal(t, 17.3, msg.Event.(map[string]interface{})["value"]) - assert.Equal(t, 3, msg.Event.(map[string]interface{})["list"].([]interface{})[2]) -} - -func TestParseMessageType(t *testing.T) { - data := `[8,"http://example.com/api/test",null]` - i := parseMessageType(data) - assert.Equal(t, msgEvent, i) - - data = `[true,"blah"]` - i = parseMessageType(data) - assert.Equal(t, -1, i) -} - -func TestMessageTypeString(t *testing.T) { - assert.Equal(t, "WELCOME", messageTypeString(0)) - assert.Equal(t, "SUBSCRIBE", messageTypeString(5)) - assert.Equal(t, "", messageTypeString(9)) -} diff --git a/wampv2/README.md b/wampv2/README.md deleted file mode 100644 index 0a71590..0000000 --- a/wampv2/README.md +++ /dev/null @@ -1,6 +0,0 @@ -wampv2 -====== - -This package is under heavy development and is *not* production ready. - -For information on how to use this package, please refer to doc.go. diff --git a/wampv2/websocket.go b/websocket.go similarity index 100% rename from wampv2/websocket.go rename to websocket.go diff --git a/wampv2/websocket_server.go b/websocket_server.go similarity index 100% rename from wampv2/websocket_server.go rename to websocket_server.go diff --git a/wampv2/websocket_test.go b/websocket_test.go similarity index 100% rename from wampv2/websocket_test.go rename to websocket_test.go