-
Notifications
You must be signed in to change notification settings - Fork 0
/
user_server.go
139 lines (109 loc) · 3.56 KB
/
user_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package msg2api
import (
"encoding/json"
"github.com/gorilla/websocket"
"net/http"
"time"
)
// UserServer contains the websocket connection to the user and
// stores the handler functions used to handle user requests.
//
// Each handler field is optional: when a command arrives and the matching
// handler is nil, the command fails with a "not supported" error.
type UserServer struct {
// apiBase is embedded; it provides the socket the methods read from and write to.
*apiBase
// GetMetadata handles a request for all available metadata for the user.
GetMetadata func() error
// GetValues handles a request for measurements of a given resolution in a given timespan for a given set of sensors.
// 'sensors' contains a mapping from device IDs to an array of sensor IDs.
GetValues func(since, until time.Time, resolution string, sensors map[string][]string) error
// RequestRealtimeUpdates handles a request for realtime updates on a given set of sensors.
// 'sensors' maps string keys to arrays of strings, presumably device IDs to sensor IDs as for GetValues.
// NOTE(review): this comment previously described a device -> resolution -> sensor IDs
// mapping, which does not match the map[string][]string type — confirm the intended shape.
RequestRealtimeUpdates func(sensors map[string][]string) error
}
// Run reads commands from the websocket in a loop and dispatches each one
// to the matching do* method.
//
// The loop only ends when receiving a message fails: the socket is then
// closed with websocket.CloseProtocolError and the receive error's text,
// and that error is returned. An unknown command is answered with a
// badCommand error message, and a failing handler with the error it
// produced; in both cases the loop keeps running. Errors from writing
// these replies are ignored.
func (u *UserServer) Run() error {
	for {
		var in MessageIn
		err := u.socket.ReceiveJSON(&in)
		if err != nil {
			u.socket.Close(websocket.CloseProtocolError, err.Error())
			return err
		}

		// Pick the handler first; a nil handler marks an unknown command.
		var handler func(*MessageIn) *Error
		switch in.Command {
		case "getMetadata":
			handler = u.doGetMetadata
		case "getValues":
			handler = u.doGetValues
		case "requestRealtimeUpdates":
			handler = u.doRequestRealtimeUpdates
		}

		if handler == nil {
			u.socket.WriteJSON(MessageOut{Error: badCommand(in.Command)})
			continue
		}

		if failure := handler(&in); failure != nil {
			u.socket.WriteJSON(MessageOut{Error: failure})
		}
	}
}
// SendUpdate sends a set of measurements to the user as an "update"
// message, stamped with the current time in milliseconds since the
// Unix epoch. It returns the error from writing to the socket, if any.
func (u *UserServer) SendUpdate(values UserEventUpdateArgs) error {
	nowMs := time.Now().UnixNano() / int64(time.Millisecond)
	out := MessageOut{
		Command: "update",
		Now:     &nowMs,
		Args:    values,
	}
	return u.socket.WriteJSON(out)
}
// SendMetadata sends a set of metadata descriptions to the user as a
// "metadata" message, stamped with the current time in milliseconds since
// the Unix epoch. It returns the error from writing to the socket, if any.
func (u *UserServer) SendMetadata(data UserEventMetadataArgs) error {
	nowMs := time.Now().UnixNano() / int64(time.Millisecond)
	out := MessageOut{
		Command: "metadata",
		Now:     &nowMs,
		Args:    data,
	}
	return u.socket.WriteJSON(out)
}
// doGetMetadata invokes the GetMetadata handler. It returns an
// operationFailed error when no handler is installed ("not supported") or
// when the handler itself fails, and nil on success. The command's
// arguments are not inspected.
func (u *UserServer) doGetMetadata(cmd *MessageIn) *Error {
	handler := u.GetMetadata
	if handler == nil {
		return operationFailed("not supported")
	}

	if err := handler(); err != nil {
		return operationFailed(err.Error())
	}
	return nil
}
// doGetValues decodes the command arguments and forwards them to the
// GetValues handler. The since/until bounds arrive as Unix timestamps in
// milliseconds and are converted to time.Time before the call.
//
// It returns an operationFailed error when the arguments cannot be
// decoded, when no handler is installed ("not supported"), or when the
// handler fails; nil on success. Arguments are decoded before the handler
// is checked, so a decoding error takes precedence.
func (u *UserServer) doGetValues(cmd *MessageIn) *Error {
	var args UserCmdGetValuesArgs
	if decodeErr := json.Unmarshal(cmd.Args, &args); decodeErr != nil {
		return operationFailed(decodeErr.Error())
	}

	if u.GetValues == nil {
		return operationFailed("not supported")
	}

	// Split each millisecond timestamp into whole seconds plus the
	// remaining milliseconds expressed as nanoseconds.
	since := time.Unix(int64(args.SinceUnixMs/1000), int64(args.SinceUnixMs)%1000*1e6)
	until := time.Unix(int64(args.UntilUnixMs/1000), int64(args.UntilUnixMs)%1000*1e6)

	if callErr := u.GetValues(since, until, args.TimeResolution, args.Sensors); callErr != nil {
		return operationFailed(callErr.Error())
	}
	return nil
}
// doRequestRealtimeUpdates decodes the command arguments and passes them
// to the RequestRealtimeUpdates handler.
//
// It returns an operationFailed error when the arguments cannot be
// decoded, when no handler is installed ("not supported"), or when the
// handler fails; nil on success. Arguments are decoded before the handler
// is checked, so a decoding error takes precedence.
func (u *UserServer) doRequestRealtimeUpdates(cmd *MessageIn) *Error {
	var args UserCmdRequestRealtimeUpdatesArgs
	if decodeErr := json.Unmarshal(cmd.Args, &args); decodeErr != nil {
		return operationFailed(decodeErr.Error())
	}

	if u.RequestRealtimeUpdates == nil {
		return operationFailed("not supported")
	}

	if callErr := u.RequestRealtimeUpdates(args); callErr != nil {
		return operationFailed(callErr.Error())
	}
	return nil
}
// NewUserServer returns a new UserServer running on a websocket on the
// given HTTP connection. The connection is set up by initAPIBaseFromHTTP
// with userAPIProtocolV3 as the only listed protocol; if that fails, the
// error is returned and no server is created. The returned server has no
// handlers installed.
func NewUserServer(w http.ResponseWriter, r *http.Request) (*UserServer, error) {
	protocols := []string{userAPIProtocolV3}

	base, err := initAPIBaseFromHTTP(w, r, protocols)
	if err != nil {
		return nil, err
	}
	return &UserServer{apiBase: base}, nil
}