-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathwebsocketconnection.go
79 lines (70 loc) · 1.83 KB
/
websocketconnection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package signalr
import (
"bytes"
"context"
"fmt"
"net/http"
"net/url"
"nhooyr.io/websocket"
)
func NewWebSocketConnection(ctx context.Context, reqURL *url.URL, connectionID string, headers http.Header) (Connection, error) {
ws, _, err := websocket.Dial(ctx, reqURL.String(), &websocket.DialOptions{HTTPHeader: headers})
if err != nil {
return nil, err
}
return newWebSocketConnection(ctx, connectionID, ws), nil
}
type webSocketConnection struct {
ConnectionBase
conn *websocket.Conn
transferMode TransferMode
}
func newWebSocketConnection(ctx context.Context, connectionID string, conn *websocket.Conn) *webSocketConnection {
w := &webSocketConnection{
conn: conn,
ConnectionBase: *NewConnectionBase(ctx, connectionID),
}
return w
}
func (w *webSocketConnection) Write(p []byte) (n int, err error) {
messageType := websocket.MessageText
if w.transferMode == BinaryTransferMode {
messageType = websocket.MessageBinary
}
n, err = ReadWriteWithContext(w.Context(),
func() (int, error) {
err := w.conn.Write(w.Context(), messageType, p)
if err != nil {
return 0, err
}
return len(p), nil
},
func() {})
if err != nil {
err = fmt.Errorf("%T: %w", w, err)
_ = w.conn.Close(1000, err.Error())
}
return n, err
}
func (w *webSocketConnection) Read(p []byte) (n int, err error) {
n, err = ReadWriteWithContext(w.Context(),
func() (int, error) {
_, data, err := w.conn.Read(w.Context())
if err != nil {
return 0, err
}
return bytes.NewReader(data).Read(p)
},
func() {})
if err != nil {
err = fmt.Errorf("%T: %w", w, err)
_ = w.conn.Close(1000, err.Error())
}
return n, err
}
func (w *webSocketConnection) TransferMode() TransferMode {
return w.transferMode
}
func (w *webSocketConnection) SetTransferMode(transferMode TransferMode) {
w.transferMode = transferMode
}