-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnode.go
337 lines (290 loc) · 10.4 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
// Package noise implements the Noise Protocol for peer-to-peer communication.
// For more information about the Noise Protocol, please visit: [Noise Protocol].
//
// [Noise Protocol]: http://www.noiseprotocol.org/noise.html
package noise
import (
"context"
"fmt"
"log"
"net"
"time"
"github.com/oxtoacart/bpool"
)
// futureDeadLine turns a relative idle timeout into an absolute point in time
// measured from now.
//
// A zero timeout means "no deadline" and yields the zero time.Time.
// Any other value is scaled by time.Second before being added to the current
// time, i.e. the argument is interpreted as a count of seconds.
//
// NOTE(review): because of that scaling, a value that is already expressed in
// seconds (e.g. 5*time.Second) would be multiplied again and overflow.
// Confirm what Config.IdleTimeout is expected to return.
func futureDeadLine(deadline time.Duration) time.Time {
	var none time.Time
	if deadline != 0 {
		// How long do we tolerate inactivity, counted from this moment?
		wait := deadline * time.Second
		return time.Now().Add(wait)
	}
	return none
}
// Config exposes the settings a Node reads at runtime.
// The "Default" notes below describe values supplied by the implementation,
// which is not part of this file.
type Config interface {
// Protocol is the network name handed to net.Listen and net.DialTimeout.
// Default "tcp"
Protocol() string
// SelfListeningAddress is the local address the node binds to in Listen.
// Default 0.0.0.0:8010
SelfListeningAddress() string
// Linger is passed to (*net.TCPConn).SetLinger for TCP connections.
// Default 0
Linger() int
// MaxPeersConnected caps the number of routed peers accepted by the
// handshake and is also used as the number of buffers in the node pool.
// Default 100
MaxPeersConnected() uint8
// PoolBufferSize is the width of each buffer in the node pool.
// Default 10 << 20 = 10MB
PoolBufferSize() int
// IdleTimeout is fed to futureDeadLine, which multiplies it by time.Second
// to build the connection deadline; 0 disables the deadline.
// Default 0
IdleTimeout() time.Duration
// DialTimeout is passed unchanged to net.DialTimeout.
// Default 5 seconds
DialTimeout() time.Duration
// KeepAlive is passed unchanged to (*net.TCPConn).SetKeepAlivePeriod.
// Default 1800 seconds
KeepAlive() time.Duration
}
// Node represents a network node capable of handling connections,
// routing messages, and managing configurations.
// Create one with New; the listener stays nil until Listen succeeds.
type Node struct {
// Bound local network listener. Assigned by Listen; nil before that,
// which LocalAddr and Close both check for.
listener net.Listener
// Routing hash table eg. {Socket: Conn interface}.
// Holds the peers that completed a handshake.
router *router
// Pubsub notifications (peer connected/disconnected, new message, listening).
events *events
// Global buffer pool, bound to every peer created by routing.
pool BytePool
// Configuration settings read on every dial, listen, handshake and send.
config Config
}
// New builds a Node from the given configuration.
//
// The node gets a fresh router, a fresh event hub and a byte pool sized from
// the configuration: MaxPeersConnected buffers, each PoolBufferSize wide.
// The returned node is neither listening nor connected to any peer.
func New(config Config) *Node {
	// One pooled buffer per potential peer.
	buffers := bpool.NewBytePool(int(config.MaxPeersConnected()), config.PoolBufferSize())

	node := new(Node)
	node.router = newRouter()
	node.events = newEvents()
	node.pool = buffers
	node.config = config
	return node
}
// Signals subscribes to the node's internal events.
//
// It returns a receive-only channel on which events are delivered and a
// cancel function that stops the forwarding goroutine. The forwarding itself
// is done by events.Listen, started here in its own goroutine.
//
// The channel is unbuffered. Per the documented contract it is closed when
// listening is cancelled (that happens inside events.Listen, not here).
func (n *Node) Signals() (<-chan Signal, context.CancelFunc) {
	signals := make(chan Signal)
	ctx, stop := context.WithCancel(context.Background())

	// Proxy internal events into the subscriber channel until stop is called.
	go n.events.Listen(ctx, signals)

	return signals, stop
}
// Disconnect closes every peer currently present in the routing table.
// The listener is left untouched, so the node keeps accepting connections.
// Close errors are logged and do not interrupt the remaining peers.
func (n *Node) Disconnect() {
	log.Print("closing connections and shutting down node..")

	for p := range n.router.Table() {
		err := p.Close()
		if err == nil {
			continue
		}
		log.Printf("error when shutting down connection: %v", err)
	}
}
// Send emits a new message to the peer identified by rawID.
//
// It returns the number of bytes sent on success. On any error it returns 0
// together with the error:
//   - if rawID is not in the routing table, the error is built with
//     errSendingMessage and mentions the disconnected peer;
//   - if the underlying peer.Send fails, that error is returned as is.
//
// Only a successful send extends the connection deadline; a failed write
// does not count as activity and therefore leaves the deadline untouched.
func (n *Node) Send(rawID string, message []byte) (uint32, error) {
	id := newIDFromString(rawID)

	// In-band check: the peer must still be routed to be reachable.
	peer, ok := n.router.Query(id)
	if !ok {
		err := fmt.Errorf("remote peer disconnected: %s", id.String())
		return 0, errSendingMessage(err)
	}

	sent, err := peer.Send(message)
	if err != nil {
		// Honour the documented contract: no byte count on failure.
		return 0, err
	}

	// An idle timeout is implemented by repeatedly extending the deadline
	// after successful Read or Write calls.
	peer.SetDeadline(futureDeadLine(n.config.IdleTimeout()))
	return sent, nil
}
// watch is the receive loop of a single peer; handshake runs it in its own
// goroutine.
//
// Each iteration blocks in peer.Listen and then does one of three things:
//   - Listen failed: a PeerDisconnected event is emitted, the peer is removed
//     from the router and the loop ends;
//   - Listen returned a nil buffer without error: the fact is logged and the
//     loop ends;
//   - otherwise: a NewMessage event is emitted and the idle deadline is
//     pushed forward.
//
// NOTE(review): in the nil-buffer case the peer is neither removed from the
// router nor reported as disconnected, yet nothing reads from it any more.
// Confirm this is intended.
//
// It is suggested to process incoming messages in separate goroutines.
func (n *Node) watch(peer *peer) {
	for {
		// Wait for the next incoming message.
		buf, err := peer.Listen()

		switch {
		case err != nil:
			// Tell subscribers first, then drop the route.
			n.events.PeerDisconnected(peer)
			n.router.Remove(peer)
			return
		case buf == nil:
			// err is nil here; it is still logged to keep the message stable.
			log.Printf("buffer nil with err: %v", err)
			return
		}

		log.Print("receiving message from remote")
		n.events.NewMessage(peer, buf)

		// Activity seen: extend the idle deadline.
		peer.SetDeadline(futureDeadLine(n.config.IdleTimeout()))
	}
}
// setupTCPConnection applies the node's TCP-specific settings to conn.
//
// Two options are set, in this order:
//  1. the keep-alive period, from Config.KeepAlive;
//  2. the linger time, from Config.Linger, i.e. how long unsent data may
//     still be flushed after the connection is closed.
//
// The first failing call aborts the setup and its error is returned
// unchanged; nil means both options were applied.
//
// Background on combining keep-alive with an idle timeout:
// https://support.f5.com/csp/article/K13004262
func (n *Node) setupTCPConnection(conn *net.TCPConn) error {
	period := n.config.KeepAlive()
	if err := conn.SetKeepAlivePeriod(period); err != nil {
		return err
	}

	// Discard unsent packets N seconds after close.
	return conn.SetLinger(n.config.Linger())
}
// handshake runs the handshake for an incoming or dialed connection and, on
// success, registers the resulting peer.
//
// Parameters:
//   - conn: the raw connection; TCP connections additionally get the node's
//     keep-alive and linger settings, other connection types are used as is.
//   - initialize: forwarded to newHandshake; Dial passes true, Listen false.
//
// Steps:
//  1. configure the socket (TCP only);
//  2. refuse the connection if the router already holds MaxPeersConnected peers;
//  3. create and start the handshake;
//  4. wrap the resulting session in a peer, add it to the router, start its
//     watch goroutine and emit PeerConnected.
//
// On every failure the connection is closed before returning, so callers
// never have to clean up. Returned errors: errSettingUpConnection,
// errExceededMaxPeers, or whatever newHandshake / Start reported.
func (n *Node) handshake(conn net.Conn, initialize bool) error {
	log.Print("starting handshake")

	// closeConn releases the socket on a failure path. A close error cannot
	// be handled any further here, so it is only logged.
	closeConn := func() {
		if err := conn.Close(); err != nil {
			log.Printf("error closing connection: %v", err)
		}
	}

	// Only TCP connections expose keep-alive and linger options. The asserted
	// value is scoped to this branch so a nil *net.TCPConn can never leak
	// into the code below when conn is of another type.
	if tcp, isTCP := conn.(*net.TCPConn); isTCP {
		if err := n.setupTCPConnection(tcp); err != nil {
			closeConn()
			return errSettingUpConnection(err)
		}
	}

	// Drop connections if max peers exceeded.
	if maxPeers := n.config.MaxPeersConnected(); n.router.Len() >= maxPeers {
		closeConn() // Drop connection :(
		log.Printf("max peers exceeded: MaxPeerConnected = %d", maxPeers)
		return errExceededMaxPeers(maxPeers)
	}

	// Stage 1 -> run handshake.
	h, err := newHandshake(conn, initialize)
	if err != nil {
		log.Printf("error while creating handshake: %s", err)
		closeConn()
		return err
	}

	if err = h.Start(); err != nil {
		log.Printf("error while starting handshake: %s", err)
		closeConn()
		return err
	}

	// Stage 2 -> get a secure session.
	log.Print("handshake complete")
	session := h.Session()

	// Stage 3 -> create a peer and add it to the router.
	peer := n.routing(session)

	// Keep watching for incoming messages; the routine ends when the peer's
	// Listen call fails (e.g. after Close) or yields a nil buffer.
	go n.watch(peer)

	// Dispatch event for new peer connected.
	n.events.PeerConnected(peer)
	return nil
}
// routing turns an established session into a routed peer and returns it.
//
// Before the peer is created the session receives its initial idle deadline.
// A deadline is an absolute time after which I/O fails instead of blocking;
// it covers pending and future reads and writes, and can be refreshed by
// setting a new one in the future (see https://pkg.go.dev/net#Conn).
//
// The new peer is bound to the node's shared buffer pool, which reduces
// allocation latency, and is then stored in the router table.
func (n *Node) routing(conn *session) *peer {
	conn.SetDeadline(futureDeadLine(n.config.IdleTimeout()))

	p := newPeer(conn)
	p.BindPool(n.pool)
	n.router.Add(p)
	return p
}
// LocalAddr reports the address the node's listener is bound to.
// It returns nil while the node is not listening (no listener assigned yet).
func (n *Node) LocalAddr() net.Addr {
	if l := n.listener; l != nil {
		return l.Addr()
	}
	return nil
}
// Listen binds to Config.SelfListeningAddress using Config.Protocol and then
// blocks, accepting connections until Accept fails.
//
// Every accepted connection is handed to handshake in its own goroutine so a
// slow handshake does not delay other remotes; handshake logs its own
// failures and closes the connection it was given.
//
// Returns:
//   - the net.Listen error, unchanged, if binding fails;
//   - an errBindingConnection-wrapped error as soon as Accept fails. Closing
//     the listener (see Close) unblocks Accept with an error, so this is also
//     how a regular shutdown surfaces.
func (n *Node) Listen() error {
	addr := n.config.SelfListeningAddress() // eg. 0.0.0.0
	protocol := n.config.Protocol()         // eg. tcp

	listener, err := net.Listen(protocol, addr)
	if err != nil {
		log.Printf("error listening on %s: %v", addr, err)
		return err
	}
	// Report success only once the bind actually succeeded.
	log.Printf("listening on %s", addr)

	// The order here is IMPORTANT.
	// The listener is stored first and the listening event is emitted after,
	// otherwise subscribers could observe the event before LocalAddr works.
	n.listener = listener
	n.events.SelfListening(addr)

	for {
		// Block while waiting for the next incoming connection.
		conn, err := listener.Accept()
		if err != nil {
			log.Printf("error accepting connection %s", err)
			return errBindingConnection(err)
		}

		// Handshake concurrently so other remotes are not kept waiting.
		go n.handshake(conn, false)
	}
}
// Close shuts the node down: peer connections are closed and, if the node is
// listening, the listener is closed as well.
//
// Peer disconnection is started in a separate goroutine, so Close may return
// before every peer connection has actually been closed.
// NOTE(review): nothing waits for that goroutine — confirm callers do not
// rely on all peers being closed when Close returns.
//
// The returned error is the listener's Close error; it is nil when the node
// never listened.
func (n *Node) Close() error {
	// Close peer connections in the background.
	go n.Disconnect()

	// Only a listening node has a listener to stop.
	if n.listener != nil {
		return n.listener.Close()
	}
	return nil
}
// Dial connects to the remote node at addr and runs the handshake as the
// initiating side; on success the new peer ends up in the routing table.
//
// The connection uses Config.Protocol and gives up after Config.DialTimeout.
//
// Returns:
//   - an errDialingNode-wrapped error if the connection cannot be established;
//   - the handshake error, unchanged, if the handshake fails;
//   - nil otherwise.
func (n *Node) Dial(addr string) error {
	protocol := n.config.Protocol()   // eg. tcp
	timeout := n.config.DialTimeout() // max time waiting for dial.

	// Announce the attempt before it starts so the log order matches reality.
	log.Printf("dialing to %s", addr)

	conn, err := net.DialTimeout(protocol, addr, timeout)
	if err != nil {
		return errDialingNode(err)
	}

	// Run handshake for the dialed connection; we are the initiator.
	return n.handshake(conn, true)
}