-
Notifications
You must be signed in to change notification settings - Fork 2
/
comm.go
218 lines (196 loc) · 5.4 KB
/
comm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package main
import (
"encoding/binary"
"flag"
"fmt"
"io"
"time"
)
const magic = 1736434764
const recvResponseMagic = magic + 1
const sendOpType = 3
const recvOpType = 4
type header struct {
Magic uint32
NodeCount int32
NodeID int32
}
type recvResponse struct {
RecvResponseMagic uint32
SourceID int32
Length int32
// Message []byte
}
type recvHeader struct {
// OpType byte
SourceID int32
Time int32 // milliseconds
}
type sendHeader struct {
// OpType byte
TargetID int32
Time int32 // milliseconds
Length int32
// Message []byte
}
// TODO(robryk): Move this to instance-creation-time options
var messageCountLimit = flag.Int("message_count_limit", 1000, "Limit for the number of messages sent per instance")
var messageSizeLimit = flag.Int("message_size_limit", 8*1024*1024, "Limit for the total size of messages sent by an instance, in bytes")
type Message struct {
Source int
Target int
SendTime time.Duration
Message []byte
}
// ErrMessageCount is returned when an instance exceeds the per-instance message count limit.
// It is usually encapsulated in an InstanceError that specifies the instance ID.
type ErrMessageCount struct {
}
func (err ErrMessageCount) Error() string {
return fmt.Sprintf("sent message count limit (%d) exceeded", *messageCountLimit)
}
// ErrMessageSize is returned when an instance exceeds the per-instance total messages size limit.
// It is usually encapsulated in an InstanceError that specifies the instance ID.
type ErrMessageSize struct {
}
func (err ErrMessageSize) Error() string {
return fmt.Sprintf("total sent message size limit (%d bytes) exceeded", *messageSizeLimit)
}
func writeMessage(w io.Writer, message *Message) error {
rr := recvResponse{
RecvResponseMagic: recvResponseMagic,
SourceID: int32(message.Source),
Length: int32(len(message.Message)),
}
if err := binary.Write(w, binary.LittleEndian, &rr); err != nil {
return err
}
if n, err := w.Write(message.Message); n < len(message.Message) {
if err == nil {
err = io.ErrShortWrite
}
return err
}
return nil
}
func writeHeader(w io.Writer, id int, instanceCount int) error {
h := header{
Magic: magic,
NodeCount: int32(instanceCount),
NodeID: int32(id),
}
return binary.Write(w, binary.LittleEndian, &h)
}
const (
requestSend = iota
requestRecv
requestRecvAny
// requestNop
)
type request struct {
requestType int
time time.Duration
// for requestSend:
destination int
message []byte
// for requestRecv:
source int
}
func (req request) hasResponse() bool {
switch req.requestType {
case requestRecv:
return true
case requestRecvAny:
return true
default:
return false
}
}
type response struct {
message *Message
}
func readRequest(r io.Reader) (*request, error) {
var opType [1]byte
if _, err := r.Read(opType[:]); err != nil {
return nil, err
}
switch opType[0] {
case sendOpType:
var sh sendHeader
if err := binary.Read(r, binary.LittleEndian, &sh); err != nil {
return nil, err
}
if sh.Length < 0 || int(sh.Length) > *messageSizeLimit {
return nil, fmt.Errorf("invalid size of a message to be sent: %d", sh.Length)
}
if sh.TargetID < 0 || sh.TargetID >= MaxInstances {
return nil, fmt.Errorf("invalid target instance in a send request: %d", sh.TargetID)
}
message := make([]byte, sh.Length)
if _, err := io.ReadFull(r, message); err != nil {
return nil, err
}
return &request{
requestType: requestSend,
time: time.Duration(sh.Time) * time.Millisecond,
destination: int(sh.TargetID),
message: message}, nil
case recvOpType:
var rh recvHeader
if err := binary.Read(r, binary.LittleEndian, &rh); err != nil {
return nil, err
}
if rh.SourceID < -1 || rh.SourceID >= MaxInstances {
return nil, fmt.Errorf("invalid source instance in a receive request: %d", rh.SourceID)
}
if rh.SourceID == -1 {
return &request{requestType: requestRecvAny, time: time.Duration(rh.Time) * time.Millisecond}, nil
} else {
return &request{requestType: requestRecv, time: time.Duration(rh.Time) * time.Millisecond, source: int(rh.SourceID)}, nil
}
default:
return nil, fmt.Errorf("invalid operation type 0x%x", opType[0])
}
}
func (i *Instance) communicate(r io.Reader, w io.Writer, reqCh chan<- *request, respCh <-chan *response) error {
i.TimeBlocked = time.Duration(0)
// TODO: Figure out what errors should be returned from this function. We currently error if the instance fails to read the header (which is mitigated by delaying the closure of other ends of the pipes), for example.
if err := writeHeader(w, i.ID, i.TotalInstances); err != nil {
return err
}
for {
req, err := readRequest(r)
if err != nil {
if err == io.EOF {
//return nil
}
return err
}
req.time += i.TimeBlocked
if req.requestType == requestSend {
i.MessagesSent++
if i.MessagesSent > *messageCountLimit {
return ErrMessageCount{}
}
i.MessageBytesSent += len(req.message)
if i.MessageBytesSent > *messageSizeLimit {
return ErrMessageSize{}
}
}
currentTime := req.time
hasResponse := req.hasResponse()
reqCh <- req
if hasResponse {
resp, ok := <-respCh
if !ok {
return fmt.Errorf("Received no response for a receive request")
}
if resp.message.SendTime > currentTime {
i.TimeBlocked += resp.message.SendTime - currentTime
}
if err := writeMessage(w, resp.message); err != nil {
return err
}
}
}
}