This repository has been archived by the owner on Jun 25, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
/
socket.go
142 lines (121 loc) · 2.97 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sdm630
import (
"encoding/json"
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
// socketWriteWait bounds how long a single write to the peer may take.
const socketWriteWait = 10 * time.Second
// upgrader turns incoming HTTP requests into websocket connections.
// Both I/O buffers are 1 KiB; every other option keeps the library default.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}
// Client sits between one websocket connection and the hub that feeds it.
type Client struct {
	// hub is the SocketHub this client is registered with.
	hub *SocketHub

	// conn is the underlying websocket connection to the peer.
	conn *websocket.Conn

	// send queues outbound messages; writePump drains it and writes each
	// message to conn.
	send chan []byte
}
// writePump pumps messages from the hub to the websocket connection.
//
// It writes every message received on c.send as a text frame, applying
// socketWriteWait as the deadline for each write. It returns, closing the
// connection, when a write fails or when c.send is closed. In the latter
// case a close frame is sent to the peer first.
func (c *Client) writePump() {
	defer c.conn.Close()

	// Ranging over the channel ends the loop once the channel is closed.
	// A plain receive would keep yielding nil messages forever and flood
	// the peer with empty frames.
	for msg := range c.send {
		if err := c.conn.SetWriteDeadline(time.Now().Add(socketWriteWait)); err != nil {
			return
		}
		if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
			return
		}
	}

	// c.send was closed: tell the peer we are going away. Errors are
	// deliberately ignored because the connection is closed right after
	// this, whether or not the close frame could be delivered.
	_ = c.conn.SetWriteDeadline(time.Now().Add(socketWriteWait))
	_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
}
// ServeWebsocket handles websocket requests from the peer.
//
// It upgrades the HTTP request to a websocket connection, registers a new
// Client with hub and starts two goroutines for it: one that writes queued
// messages to the peer and one that reads and discards whatever the peer
// sends. If the upgrade fails the error is logged and nothing is registered.
func ServeWebsocket(hub *SocketHub, w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}

	client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
	client.hub.register <- client

	// run writing to client in goroutine
	go client.writePump()

	// The connection has to be read for close, ping and pong frames from
	// the peer to be processed. Incoming payloads are of no interest and
	// are discarded. A read error means the connection is gone, so close
	// it and unregister the client rather than leaving it in the hub until
	// its send buffer overflows.
	go func() {
		defer func() {
			conn.Close()
			hub.unregister <- client
		}()
		for {
			if _, _, err := conn.NextReader(); err != nil {
				return
			}
		}
	}()
}
// SocketHub keeps track of the connected clients and fans incoming data out
// to all of them.
type SocketHub struct {
	// clients is the set of currently registered clients.
	clients map[*Client]bool

	// register and unregister carry requests to add a client to, or remove
	// it from, the set.
	register, unregister chan *Client

	// in is the stream of meter data to broadcast.
	in QuerySnipChannel

	// statusStream delivers status objects to broadcast.
	statusStream chan *Status
}
// NewSocketHub returns a hub that reads meter data from inChannel.
//
// It also starts a goroutine that loops forever: sleep for
// SECONDS_BETWEEN_STATUSUPDATE seconds, call status.Update, then hand status
// to the hub through its (unbuffered) status stream.
func NewSocketHub(inChannel QuerySnipChannel, status *Status) *SocketHub {
	hub := &SocketHub{
		clients:      make(map[*Client]bool),
		register:     make(chan *Client),
		unregister:   make(chan *Client),
		in:           inChannel,
		statusStream: make(chan *Status),
	}

	// Periodic status publisher; it only ever sends on the stream.
	go func(out chan<- *Status) {
		for {
			time.Sleep(SECONDS_BETWEEN_STATUSUPDATE * time.Second)
			status.Update()
			out <- status
		}
	}(hub.statusStream)

	return hub
}
// Broadcast encodes i as JSON and queues the result for every registered
// client.
//
// Nothing is encoded when no clients are registered. If i cannot be encoded
// the error is logged and the broadcast is skipped. A client whose send
// buffer is full is dropped: its send channel is closed and it is removed
// from the set of clients.
//
// Broadcast modifies h.clients without any locking.
func (h *SocketHub) Broadcast(i interface{}) {
	if len(h.clients) == 0 {
		return
	}

	message, err := json.Marshal(i)
	if err != nil {
		// One value that cannot be encoded must not terminate the whole
		// process; report it and skip this broadcast.
		log.Println(err)
		return
	}

	for client := range h.clients {
		select {
		case client.send <- message:
		default:
			// The buffer is full, so the client is not keeping up.
			close(client.send)
			delete(h.clients, client)
		}
	}
}
// Run is the hub's event loop and never returns. Each iteration handles one
// event: a client registration, a client removal, a meter reading to
// broadcast, or a status object to broadcast.
func (h *SocketHub) Run() {
	for {
		select {
		case joining := <-h.register:
			h.clients[joining] = true

		case leaving := <-h.unregister:
			_, known := h.clients[leaving]
			if !known {
				// Not (or no longer) registered; nothing to clean up.
				continue
			}
			delete(h.clients, leaving)
			close(leaving.send)

		case snip := <-h.in:
			// make sure to pass a pointer or MarshalJSON won't work
			h.Broadcast(&snip)

		case status := <-h.statusStream:
			h.Broadcast(status)
		}
	}
}