-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection_manager.go
122 lines (110 loc) · 3.42 KB
/
connection_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"context"
"fmt"
"net"
"net/netip"
"go.uber.org/zap"
"github.com/wavesplatform/gowaves/pkg/p2p/incoming"
"github.com/wavesplatform/gowaves/pkg/p2p/outgoing"
"github.com/wavesplatform/gowaves/pkg/p2p/peer"
"github.com/wavesplatform/gowaves/pkg/proto"
"github.com/alexeykiselev/waves-fork-detector/peers"
)
const (
	// defaultApplication is the prefix of the network name; NewConnectionManager
	// appends the scheme character to it to build the full network string.
	defaultApplication = "waves"
)
// ConnectionManager holds the local node parameters that are handed to every
// incoming (Accept) and outgoing (Connect) peer connection, together with the
// per-message-type filter used by skipFunc.
type ConnectionManager struct {
	// network is the network name: defaultApplication followed by the scheme
	// character. It is passed to peers as WavesNetwork.
	network string
	// name is passed to peers as NodeName.
	name string
	// nonce is passed to peers as NodeNonce (widened to uint64).
	nonce uint32
	// declared is passed to peers as DeclAddr.
	declared proto.TCPAddr
	// parent is passed to peers as Parent.
	parent peer.Parent
	// vp suggests the version to use for a given remote IP address.
	vp peers.VersionProvider
	// discarded maps a message content ID to the value skipFunc returns for it;
	// IDs that are absent from the map are treated as false.
	discarded map[proto.PeerMessageID]bool
}
// NewConnectionManager builds a ConnectionManager for the network identified by
// scheme. The network name is defaultApplication followed by the scheme
// character. The remaining arguments are stored as-is and later handed to every
// incoming and outgoing peer connection.
//
// The manager is created with a fixed message filter: for each known content ID
// it records the value that skipFunc reports for messages of that type.
func NewConnectionManager(
	scheme byte, name string, nonce uint32, declared proto.TCPAddr, vp peers.VersionProvider, parent peer.Parent,
) *ConnectionManager {
	// Content IDs for which skipFunc reports false.
	kept := []proto.PeerMessageID{
		proto.ContentIDGetPeers,
		proto.ContentIDPeers,
		proto.ContentIDSignatures,
		proto.ContentIDBlock,
		proto.ContentIDScore,
		proto.ContentIDInvMicroblock,
		proto.ContentIDPBBlock,
		proto.ContentIDBlockIDs,
	}
	// Content IDs for which skipFunc reports true.
	skipped := []proto.PeerMessageID{
		proto.ContentIDGetSignatures,
		proto.ContentIDGetBlock,
		proto.ContentIDTransaction,
		proto.ContentIDCheckpoint,
		proto.ContentIDMicroblockRequest,
		proto.ContentIDMicroblock,
		proto.ContentIDPBMicroBlock,
		proto.ContentIDPBTransaction,
		proto.ContentIDGetBlockIDs,
		proto.ContentIDGetBlockSnapshot,
		proto.ContentIDMicroBlockSnapshot,
		proto.ContentIDBlockSnapshot,
		proto.ContentIDMicroBlockSnapshotRequest,
	}

	filter := make(map[proto.PeerMessageID]bool, len(kept)+len(skipped))
	for _, id := range kept {
		filter[id] = false
	}
	for _, id := range skipped {
		filter[id] = true
	}

	cm := new(ConnectionManager)
	cm.network = fmt.Sprintf("%s%c", defaultApplication, scheme)
	cm.name = name
	cm.nonce = nonce
	cm.declared = declared
	cm.parent = parent
	cm.vp = vp
	cm.discarded = filter
	return cm
}
// Accept handles a newly accepted incoming connection.
//
// It parses the remote address of conn, asks the version provider which version
// to use for that IP address, and then runs the incoming peer with the
// manager's node parameters. The result of incoming.RunIncomingPeer is returned
// unchanged.
//
// An error is returned if the remote address cannot be parsed as an IP:port
// pair or if no version can be suggested for it; in both cases the underlying
// error is wrapped and remains available through errors.Is / errors.As.
func (h *ConnectionManager) Accept(ctx context.Context, conn net.Conn) error {
	zap.S().Debugf("[CON] New incoming connection from %s", conn.RemoteAddr())
	ap, err := netip.ParseAddrPort(conn.RemoteAddr().String())
	if err != nil {
		return fmt.Errorf("failed to handle incoming connection: %w", err)
	}
	ver, err := h.vp.SuggestVersion(ap.Addr())
	if err != nil {
		// Wrap with the peer address so the failing stage is identifiable,
		// consistent with the address-parsing error above.
		return fmt.Errorf("failed to suggest version for incoming connection from %s: %w", ap.Addr(), err)
	}
	params := incoming.PeerParams{
		WavesNetwork: h.network,
		Conn:         conn,
		Skip:         h.skipFunc,
		Parent:       h.parent,
		DeclAddr:     h.declared,
		NodeName:     h.name,
		NodeNonce:    uint64(h.nonce),
		Version:      ver,
	}
	return incoming.RunIncomingPeer(ctx, params)
}
// Connect establishes an outgoing connection to addr.
//
// It parses addr into an IP:port pair, asks the version provider which version
// to use for that IP address, and then establishes the connection with the
// manager's node parameters. The result of outgoing.EstablishConnection is
// returned unchanged.
//
// An error is returned if addr cannot be parsed or if no version can be
// suggested for it; in both cases the underlying error is wrapped and remains
// available through errors.Is / errors.As.
func (h *ConnectionManager) Connect(ctx context.Context, addr proto.TCPAddr) error {
	zap.S().Debugf("[CON] New outgoing connection to %s", addr)
	// Resolve the address and version first so that nothing is prepared for a
	// connection that cannot be attempted.
	ap, err := netip.ParseAddrPort(addr.String())
	if err != nil {
		return fmt.Errorf("failed to establish outgoing connection: %w", err)
	}
	ver, err := h.vp.SuggestVersion(ap.Addr())
	if err != nil {
		return fmt.Errorf("failed to suggest version for outgoing connection to %s: %w", ap.Addr(), err)
	}
	params := outgoing.EstablishParams{
		Address:      addr,
		WavesNetwork: h.network,
		Parent:       h.parent,
		DeclAddr:     h.declared,
		Skip:         h.skipFunc,
		NodeName:     h.name,
		NodeNonce:    uint64(h.nonce),
	}
	return outgoing.EstablishConnection(ctx, params, ver)
}
// skipFunc reports the filter value recorded for the content ID of header.
// Content IDs that are not present in the filter yield false, because indexing
// a map with a missing key returns the zero value of the element type.
func (h *ConnectionManager) skipFunc(header proto.Header) bool {
	return h.discarded[header.ContentID]
}