From a51f3a996bc067ad60b7e290fe1bcf333967c9d2 Mon Sep 17 00:00:00 2001 From: Yahtoo Ma Date: Thu, 12 Apr 2018 14:24:24 +0800 Subject: [PATCH 1/8] Filter connected nodes --- p2p/pex_reactor.go | 2 +- p2p/switch.go | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index b5c668362..3db22e36a 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -268,7 +268,7 @@ func (r *PEXReactor) ensurePeers() { alreadyDialing := r.Switch.IsDialing(try) var alreadyConnected bool for _, v := range r.Switch.Peers().list { - if strings.Compare(v.mconn.RemoteAddress.String(), try.String()) == 0 { + if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 { alreadyConnected = true break } diff --git a/p2p/switch.go b/p2p/switch.go index 462aa651a..58e18578c 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -16,6 +16,7 @@ import ( cfg "github.com/bytom/config" "github.com/bytom/errors" "github.com/bytom/p2p/trust" + "strings" ) const ( @@ -362,7 +363,11 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, if err := sw.checkBannedPeer(addr.IP.String()); err != nil { return nil, err } - + for _, v := range sw.Peers().list { + if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 { + return nil, errors.New("Peer is connected") + } + } sw.dialing.Set(addr.IP.String(), addr) defer sw.dialing.Delete(addr.IP.String()) From 7627cf250c464baa33aa0738bc8579386cae6fe7 Mon Sep 17 00:00:00 2001 From: Yahtoo Ma Date: Thu, 12 Apr 2018 15:10:49 +0800 Subject: [PATCH 2/8] Avoid connecting to self --- p2p/switch.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/p2p/switch.go b/p2p/switch.go index 58e18578c..052a53425 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -363,6 +363,9 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, if err := sw.checkBannedPeer(addr.IP.String()); err != nil { return nil, err } + if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost())==0 { + return nil, errors.New("Connect self") + } for _, v := range sw.Peers().list { if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 { return nil, errors.New("Peer is connected") From 71801c20540abebdda19ececb4dc4fe20c6fc191 Mon Sep 17 00:00:00 2001 From: Yahtoo Ma Date: Thu, 12 Apr 2018 17:16:16 +0800 Subject: [PATCH 3/8] Change StopPeerForError to sequential execution --- p2p/switch.go | 62 ++++++++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/p2p/switch.go b/p2p/switch.go index 052a53425..37bf4e486 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -20,8 +20,8 @@ import ( ) const ( - reconnectAttempts = 30 - reconnectInterval = 3 * time.Second + reconnectAttempts = 10 + reconnectInterval = 10 * time.Second bannedPeerKey = "BannedPeer" defaultBanDuration = time.Hour * 24 @@ -95,6 +95,8 @@ type Switch struct { var ( ErrSwitchDuplicatePeer = errors.New("Duplicate peer") + ErrConnectSelf = errors.New("Connect self") + ErrPeerConnected = errors.New("Peer is connected") ) func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch { @@ -363,12 +365,12 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, if err := sw.checkBannedPeer(addr.IP.String()); err != nil { return nil, err } - if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost())==0 { - return nil, errors.New("Connect self") + if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 { + return nil, ErrConnectSelf } for _, v := range sw.Peers().list { if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 { - return nil, errors.New("Peer is connected") + return nil, ErrPeerConnected } } sw.dialing.Set(addr.IP.String(), addr) @@ -454,38 +456,38 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { sw.stopAndRemovePeer(peer, reason) if peer.IsPersistent() { - go func() { - log.WithField("peer", peer).Info("Reconnecting to peer") - for i := 1; i < reconnectAttempts; i++ { - if !sw.IsRunning() { - return - } + log.WithField("peer", peer).Info("Reconnecting to peer") + for i := 1; i < reconnectAttempts; i++ { + if !sw.IsRunning() { + return + } - peer, err := sw.DialPeerWithAddress(addr, true) - if err != nil { - if i == reconnectAttempts { - log.WithFields(log.Fields{ - "retries": i, - "error": err, - }).Info("Error reconnecting to peer. Giving up") - return - } - if errors.Root(err) == ErrSwitchDuplicatePeer { - log.WithField("error", err).Info("Error reconnecting to peer. ") - return - } + peer, err := sw.DialPeerWithAddress(addr, true) + if err != nil { + if i == reconnectAttempts { log.WithFields(log.Fields{ "retries": i, "error": err, - }).Info("Error reconnecting to peer. Trying again") - time.Sleep(reconnectInterval) - continue + }).Info("Error reconnecting to peer. Giving up") + return } - log.WithField("peer", peer).Info("Reconnected to peer") - return + if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf { + log.WithField("error", err).Info("Error reconnecting to peer. ") + return + } + + log.WithFields(log.Fields{ + "retries": i, + "error": err, + }).Info("Error reconnecting to peer. Trying again") + time.Sleep(reconnectInterval) + continue } - }() + + log.WithField("peer", peer).Info("Reconnected to peer") + return + } } } From 53b221e4c4776dcfc5d64b0e277912900fbce584 Mon Sep 17 00:00:00 2001 From: Yahtoo Ma Date: Thu, 12 Apr 2018 17:37:22 +0800 Subject: [PATCH 4/8] Add block sync sw peer check --- netsync/sync.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/netsync/sync.go b/netsync/sync.go index 5afc4bc7c..1411a05c7 100644 --- a/netsync/sync.go +++ b/netsync/sync.go @@ -88,6 +88,10 @@ func (sm *SyncManager) synchronise() { if peer == nil { return } + if ok:=sm.Switch().Peers().Has(peer.Key); !ok{ + log.Info("Peer disconnected") + sm.sw.StopPeerGracefully(peer) + } if bestHeight > sm.chain.BestBlockHeight() { log.Info("sync peer:", peer.Addr(), " height:", bestHeight) sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight) From 62e52d49db704df771a40dfee597cf25414c1edf Mon Sep 17 00:00:00 2001 From: wz Date: Fri, 13 Apr 2018 15:32:42 +0800 Subject: [PATCH 5/8] Add broken link to return --- netsync/sync.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/netsync/sync.go b/netsync/sync.go index 1411a05c7..6bb2a6a0f 100644 --- a/netsync/sync.go +++ b/netsync/sync.go @@ -88,10 +88,13 @@ func (sm *SyncManager) synchronise() { if peer == nil { return } - if ok:=sm.Switch().Peers().Has(peer.Key); !ok{ + + if ok := sm.Switch().Peers().Has(peer.Key); !ok { log.Info("Peer disconnected") sm.sw.StopPeerGracefully(peer) + return } + if bestHeight > sm.chain.BestBlockHeight() { log.Info("sync peer:", peer.Addr(), " height:", bestHeight) sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight) From 0af2e399160a671896126ab3eb682ef183c0ca09 Mon Sep 17 00:00:00 2001 From: wz Date: Sat, 14 Apr 2018 10:37:33 +0800 Subject: [PATCH 6/8] fix conn close --- p2p/connection.go | 22 +++++++++++----------- p2p/pex_reactor.go | 8 +++++--- p2p/switch.go | 9 ++++++--- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/p2p/connection.go b/p2p/connection.go index 5eecebc76..32ef469ee 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -409,19 +409,19 @@ FOR_LOOP: // Block until .recvMonitor says we can read. c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true) -/* - // Peek into bufReader for debugging - if numBytes := c.bufReader.Buffered(); numBytes > 0 { - log.Infof("Peek connection buffer numBytes:", numBytes) - bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100)) - if err == nil { - log.Infof("bytes:", bytes) + /* + // Peek into bufReader for debugging + if numBytes := c.bufReader.Buffered(); numBytes > 0 { + log.Infof("Peek connection buffer numBytes:", numBytes) + bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100)) + if err == nil { + log.Infof("bytes:", bytes) + } else { + log.Warning("Error peeking connection buffer err:", err) + } } else { - log.Warning("Error peeking connection buffer err:", err) + log.Warning("Received bytes number is:", numBytes) } - } else { - log.Warning("Received bytes number is:", numBytes) - } */ // Read packet type diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 3db22e36a..bc2d3247d 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -5,13 +5,13 @@ import ( "fmt" "math/rand" "reflect" - "time" "strings" + "time" + "github.com/bytom/errors" log "github.com/sirupsen/logrus" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" - "github.com/bytom/errors" ) const ( @@ -246,6 +246,7 @@ func (r *PEXReactor) ensurePeers() { return } + newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 toDial := make(map[string]*NetAddress) // Try to pick numToDial addresses to dial. @@ -255,7 +256,7 @@ func (r *PEXReactor) ensurePeers() { // if we already have many connections. This algorithm isn't perfect, but // it somewhat ensures that we prioritize connecting to more-vetted // peers. - newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 + var picked *NetAddress // Try to fetch a new peer 3 times. // This caps the maximum number of tries to 3 * numToDial. @@ -267,6 +268,7 @@ func (r *PEXReactor) ensurePeers() { _, alreadySelected := toDial[try.IP.String()] alreadyDialing := r.Switch.IsDialing(try) var alreadyConnected bool + for _, v := range r.Switch.Peers().list { if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 { alreadyConnected = true diff --git a/p2p/switch.go b/p2p/switch.go index 37bf4e486..e8207b988 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -13,10 +13,11 @@ import ( cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" + "strings" + cfg "github.com/bytom/config" "github.com/bytom/errors" "github.com/bytom/p2p/trust" - "strings" ) const ( @@ -516,6 +517,8 @@ func (sw *Switch) listenerRoutine(l Listener) { // ignore connection if we already have enough maxPeers := sw.config.MaxNumPeers if maxPeers <= sw.peers.Size() { + // close inConn + inConn.Close() log.WithFields(log.Fields{ "address": inConn.RemoteAddr().String(), "numPeers": sw.peers.Size(), @@ -527,6 +530,8 @@ func (sw *Switch) listenerRoutine(l Listener) { // New inbound connection! err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) if err != nil { + // conn close for returing err + inConn.Close() log.WithFields(log.Fields{ "address": inConn.RemoteAddr().String(), "error": err, @@ -662,12 +667,10 @@ func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConf peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) if err != nil { - conn.Close() return err } peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) if err = sw.AddPeer(peer); err != nil { - conn.Close() return err } From 713ccadc384139e28bfda02bf3dfeb62685d53d8 Mon Sep 17 00:00:00 2001 From: Yahtoo Ma Date: Sat, 14 Apr 2018 15:30:43 +0800 Subject: [PATCH 7/8] Change dial peer to sequential execution --- netsync/handle.go | 2 +- netsync/protocol_reactor.go | 16 +++++++++++++++- p2p/pex_reactor.go | 10 ++++------ p2p/switch.go | 7 ++----- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/netsync/handle.go b/netsync/handle.go index 770b4d4d0..9da1da031 100644 --- a/netsync/handle.go +++ b/netsync/handle.go @@ -141,7 +141,7 @@ func (sm *SyncManager) netStart() error { //Start start sync manager service func (sm *SyncManager) Start() { - sm.netStart() + go sm.netStart() // broadcast transactions go sm.txBroadcastLoop() diff --git a/netsync/protocol_reactor.go b/netsync/protocol_reactor.go index b857da710..9d733ab3e 100644 --- a/netsync/protocol_reactor.go +++ b/netsync/protocol_reactor.go @@ -3,6 +3,7 @@ package netsync import ( "reflect" "strings" + "sync" "time" log "github.com/sirupsen/logrus" @@ -20,11 +21,13 @@ const ( // BlockchainChannel is a channel for blocks and status updates BlockchainChannel = byte(0x40) protocolHandshakeTimeout = time.Second * 10 + handshakeRetryTicker = 4 * time.Second ) var ( //ErrProtocolHandshakeTimeout peers handshake timeout ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout") + ErrStatusRequest = errors.New("Status request error") ) // Response describes the response standard. @@ -50,6 +53,7 @@ type ProtocolReactor struct { sw *p2p.Switch fetcher *Fetcher peers *peerSet + handshakeMu sync.Mutex newPeerCh chan struct{} quitReqBlockCh chan *string @@ -112,7 +116,13 @@ func (pr *ProtocolReactor) syncTransactions(p *peer) { // AddPeer implements Reactor by sending our state to peer. func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error { - peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}) + pr.handshakeMu.Lock() + defer pr.handshakeMu.Unlock() + + if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok { + return ErrStatusRequest + } + retryTicker := time.Tick(handshakeRetryTicker) handshakeWait := time.NewTimer(protocolHandshakeTimeout) for { select { @@ -124,6 +134,10 @@ func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error { pr.newPeerCh <- struct{}{} return nil } + case <-retryTicker: + if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok { + return ErrStatusRequest + } case <-handshakeWait.C: return ErrProtocolHandshakeTimeout } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index bc2d3247d..cc7e4ddd8 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -291,12 +291,10 @@ func (r *PEXReactor) ensurePeers() { // Dial picked addresses for _, item := range toDial { - go func(picked *NetAddress) { - _, err := r.Switch.DialPeerWithAddress(picked, false) - if err != nil { - r.book.MarkAttempt(picked) - } - }(item) + _, err := r.Switch.DialPeerWithAddress(item, false) + if err != nil { + r.book.MarkAttempt(item) + } } // If we need more addresses, pick a random peer and ask for more. diff --git a/p2p/switch.go b/p2p/switch.go index e8207b988..a3a8f7f7a 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -344,11 +344,8 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { // permute the list, dial them in random order. perm := rand.Perm(len(netAddrs)) for i := 0; i < len(perm); i++ { - go func(i int) { - time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond) - j := perm[i] - sw.dialSeed(netAddrs[j]) - }(i) + j := perm[i] + sw.dialSeed(netAddrs[j]) } return nil } From 3be4f80e5a8cae4695f8eb7290dba945a197e267 Mon Sep 17 00:00:00 2001 From: Yahtoo Ma Date: Tue, 17 Apr 2018 13:27:38 +0800 Subject: [PATCH 8/8] Optimized code format --- netsync/protocol_reactor.go | 1 - p2p/pex_reactor.go | 5 ++--- p2p/switch.go | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/netsync/protocol_reactor.go b/netsync/protocol_reactor.go index df9ef09c1..cea17e25f 100644 --- a/netsync/protocol_reactor.go +++ b/netsync/protocol_reactor.go @@ -2,7 +2,6 @@ package netsync import ( "reflect" - "strings" "sync" "time" diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 651262e3a..d42276fe2 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -12,7 +12,7 @@ import ( wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" - "github.com/bytom/errors" + "github.com/bytom/errors" ) const ( @@ -292,8 +292,7 @@ func (r *PEXReactor) ensurePeers() { // Dial picked addresses for _, item := range toDial { - _, err := r.Switch.DialPeerWithAddress(item, false) - if err != nil { + if _, err := r.Switch.DialPeerWithAddress(item, false); err != nil { r.book.MarkAttempt(item) } } diff --git a/p2p/switch.go b/p2p/switch.go index a3a8f7f7a..9f9882036 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -7,14 +7,13 @@ import ( "net" "sync" "time" + "strings" log "github.com/sirupsen/logrus" crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" - "strings" - cfg "github.com/bytom/config" "github.com/bytom/errors" "github.com/bytom/p2p/trust"