Skip to content
This repository has been archived by the owner on Mar 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request #673 from Bytom/dev
Browse files Browse the repository at this point in the history
merge dev
  • Loading branch information
wliyongfeng authored Apr 17, 2018
2 parents 9d3dab4 + 1e39e29 commit a480139
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 56 deletions.
2 changes: 1 addition & 1 deletion netsync/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (sm *SyncManager) netStart() error {

//Start start sync manager service
func (sm *SyncManager) Start() {
sm.netStart()
go sm.netStart()
// broadcast transactions
go sm.txBroadcastLoop()

Expand Down
16 changes: 15 additions & 1 deletion netsync/protocol_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package netsync

import (
"reflect"
"sync"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -19,11 +20,13 @@ const (
// BlockchainChannel is a channel for blocks and status updates
BlockchainChannel = byte(0x40)
protocolHandshakeTimeout = time.Second * 10
handshakeRetryTicker = 4 * time.Second
)

var (
//ErrProtocolHandshakeTimeout peers handshake timeout
ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
ErrStatusRequest = errors.New("Status request error")
)

// Response describes the response standard.
Expand All @@ -49,6 +52,7 @@ type ProtocolReactor struct {
sw *p2p.Switch
fetcher *Fetcher
peers *peerSet
handshakeMu sync.Mutex

newPeerCh chan struct{}
quitReqBlockCh chan *string
Expand Down Expand Up @@ -111,7 +115,13 @@ func (pr *ProtocolReactor) syncTransactions(p *peer) {

// AddPeer implements Reactor by sending our state to peer.
func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
pr.handshakeMu.Lock()
defer pr.handshakeMu.Unlock()

if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
return ErrStatusRequest
}
retryTicker := time.Tick(handshakeRetryTicker)
handshakeWait := time.NewTimer(protocolHandshakeTimeout)
for {
select {
Expand All @@ -123,6 +133,10 @@ func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
pr.newPeerCh <- struct{}{}
return nil
}
case <-retryTicker:
if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
return ErrStatusRequest
}
case <-handshakeWait.C:
return ErrProtocolHandshakeTimeout
}
Expand Down
7 changes: 7 additions & 0 deletions netsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ func (sm *SyncManager) synchronise() {
if peer == nil {
return
}

if ok := sm.Switch().Peers().Has(peer.Key); !ok {
log.Info("Peer disconnected")
sm.sw.StopPeerGracefully(peer)
return
}

if bestHeight > sm.chain.BestBlockHeight() {
log.Info("sync peer:", peer.Addr(), " height:", bestHeight)
sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
Expand Down
22 changes: 11 additions & 11 deletions p2p/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,19 +409,19 @@ FOR_LOOP:
// Block until .recvMonitor says we can read.
c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)

/*
// Peek into bufReader for debugging
if numBytes := c.bufReader.Buffered(); numBytes > 0 {
log.Infof("Peek connection buffer numBytes:", numBytes)
bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
if err == nil {
log.Infof("bytes:", bytes)
/*
// Peek into bufReader for debugging
if numBytes := c.bufReader.Buffered(); numBytes > 0 {
log.Infof("Peek connection buffer numBytes:", numBytes)
bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
if err == nil {
log.Infof("bytes:", bytes)
} else {
log.Warning("Error peeking connection buffer err:", err)
}
} else {
log.Warning("Error peeking connection buffer err:", err)
log.Warning("Received bytes number is:", numBytes)
}
} else {
log.Warning("Received bytes number is:", numBytes)
}
*/

// Read packet type
Expand Down
18 changes: 10 additions & 8 deletions p2p/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"fmt"
"math/rand"
"reflect"
"strings"
"time"

log "github.com/sirupsen/logrus"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"

"github.com/bytom/errors"
)

Expand Down Expand Up @@ -244,6 +247,7 @@ func (r *PEXReactor) ensurePeers() {
return
}

newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
toDial := make(map[string]*NetAddress)

// Try to pick numToDial addresses to dial.
Expand All @@ -253,7 +257,7 @@ func (r *PEXReactor) ensurePeers() {
// if we already have many connections. This algorithm isn't perfect, but
// it somewhat ensures that we prioritize connecting to more-vetted
// peers.
newBias := cmn.MinInt(numOutPeers, 8)*10 + 10

var picked *NetAddress
// Try to fetch a new peer 3 times.
// This caps the maximum number of tries to 3 * numToDial.
Expand All @@ -265,8 +269,9 @@ func (r *PEXReactor) ensurePeers() {
_, alreadySelected := toDial[try.IP.String()]
alreadyDialing := r.Switch.IsDialing(try)
var alreadyConnected bool

for _, v := range r.Switch.Peers().list {
if v.mconn.RemoteAddress.String() == try.String() {
if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
alreadyConnected = true
break
}
Expand All @@ -287,12 +292,9 @@ func (r *PEXReactor) ensurePeers() {

// Dial picked addresses
for _, item := range toDial {
go func(picked *NetAddress) {
_, err := r.Switch.DialPeerWithAddress(picked, false)
if err != nil {
r.book.MarkAttempt(picked)
}
}(item)
if _, err := r.Switch.DialPeerWithAddress(item, false); err != nil {
r.book.MarkAttempt(item)
}
}

// If we need more addresses, pick a random peer and ask for more.
Expand Down
79 changes: 44 additions & 35 deletions p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"sync"
"time"
"strings"

log "github.com/sirupsen/logrus"
crypto "github.com/tendermint/go-crypto"
Expand All @@ -19,8 +20,8 @@ import (
)

const (
reconnectAttempts = 30
reconnectInterval = 3 * time.Second
reconnectAttempts = 10
reconnectInterval = 10 * time.Second

bannedPeerKey = "BannedPeer"
defaultBanDuration = time.Hour * 24
Expand Down Expand Up @@ -94,6 +95,8 @@ type Switch struct {

var (
ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
ErrConnectSelf = errors.New("Connect self")
ErrPeerConnected = errors.New("Peer is connected")
)

func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
Expand Down Expand Up @@ -340,11 +343,8 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
// permute the list, dial them in random order.
perm := rand.Perm(len(netAddrs))
for i := 0; i < len(perm); i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
j := perm[i]
sw.dialSeed(netAddrs[j])
}(i)
j := perm[i]
sw.dialSeed(netAddrs[j])
}
return nil
}
Expand All @@ -362,7 +362,14 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer,
if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
return nil, err
}

if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
return nil, ErrConnectSelf
}
for _, v := range sw.Peers().list {
if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
return nil, ErrPeerConnected
}
}
sw.dialing.Set(addr.IP.String(), addr)
defer sw.dialing.Delete(addr.IP.String())

Expand Down Expand Up @@ -446,38 +453,38 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
sw.stopAndRemovePeer(peer, reason)

if peer.IsPersistent() {
go func() {
log.WithField("peer", peer).Info("Reconnecting to peer")
for i := 1; i < reconnectAttempts; i++ {
if !sw.IsRunning() {
return
}
log.WithField("peer", peer).Info("Reconnecting to peer")
for i := 1; i < reconnectAttempts; i++ {
if !sw.IsRunning() {
return
}

peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
if i == reconnectAttempts {
log.WithFields(log.Fields{
"retries": i,
"error": err,
}).Info("Error reconnecting to peer. Giving up")
return
}
if errors.Root(err) == ErrSwitchDuplicatePeer {
log.WithField("error", err).Info("Error reconnecting to peer. ")
return
}
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
if i == reconnectAttempts {
log.WithFields(log.Fields{
"retries": i,
"error": err,
}).Info("Error reconnecting to peer. Trying again")
time.Sleep(reconnectInterval)
continue
}).Info("Error reconnecting to peer. Giving up")
return
}

log.WithField("peer", peer).Info("Reconnected to peer")
return
if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
log.WithField("error", err).Info("Error reconnecting to peer. ")
return
}

log.WithFields(log.Fields{
"retries": i,
"error": err,
}).Info("Error reconnecting to peer. Trying again")
time.Sleep(reconnectInterval)
continue
}
}()

log.WithField("peer", peer).Info("Reconnected to peer")
return
}
}
}

Expand Down Expand Up @@ -506,6 +513,8 @@ func (sw *Switch) listenerRoutine(l Listener) {
// ignore connection if we already have enough
maxPeers := sw.config.MaxNumPeers
if maxPeers <= sw.peers.Size() {
// close inConn
inConn.Close()
log.WithFields(log.Fields{
"address": inConn.RemoteAddr().String(),
"numPeers": sw.peers.Size(),
Expand All @@ -517,6 +526,8 @@ func (sw *Switch) listenerRoutine(l Listener) {
// New inbound connection!
err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
if err != nil {
// conn close for returing err
inConn.Close()
log.WithFields(log.Fields{
"address": inConn.RemoteAddr().String(),
"error": err,
Expand Down Expand Up @@ -652,12 +663,10 @@ func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConf

peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
if err != nil {
conn.Close()
return err
}
peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
if err = sw.AddPeer(peer); err != nil {
conn.Close()
return err
}

Expand Down

0 comments on commit a480139

Please sign in to comment.