Skip to content
This repository has been archived by the owner on Mar 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request #813 from Bytom/p2p_test
Browse files Browse the repository at this point in the history
Change p2p messge send to trysend
  • Loading branch information
wliyongfeng authored Apr 24, 2018
2 parents cb36d4e + 55d2409 commit 7b9022e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 8 deletions.
4 changes: 2 additions & 2 deletions netsync/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
peers := ps.PeersWithoutBlock(&hash)
abnormalPeers := make([]*peer, 0)
for _, peer := range peers {
if ok := peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
abnormalPeers = append(abnormalPeers, peer)
continue
}
Expand All @@ -369,7 +369,7 @@ func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
peers := ps.PeersWithoutTx(&tx.ID)
abnormalPeers := make([]*peer, 0)
for _, peer := range peers {
if ok := peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
abnormalPeers = append(abnormalPeers, peer)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions netsync/protocol_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
if peer == nil {
return errPeerDropped
}
if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
return ErrStatusRequest
}
retryTicker := time.Tick(handshakeRetryTicker)
Expand All @@ -156,7 +156,7 @@ func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
if peer == nil {
return errPeerDropped
}
if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
return ErrStatusRequest
}
case <-handshakeWait.C:
Expand Down
12 changes: 8 additions & 4 deletions p2p/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
PexChannel = byte(0x00)

// period to ensure peers connected
defaultEnsurePeersPeriod = 30 * time.Second
defaultEnsurePeersPeriod = 120 * time.Second
minNumOutboundPeers = 10
maxPexMessageSize = 1048576 // 1MB

Expand Down Expand Up @@ -164,12 +164,16 @@ func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {

// RequestPEX asks peer for more addresses.
func (r *PEXReactor) RequestPEX(p *Peer) {
p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
if ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}}); !ok {
r.sw.StopPeerGracefully(p)
}
}

// SendAddrs sends addrs to the peer.
func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) {
p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
if ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}); !ok {
r.sw.StopPeerGracefully(p)
}
}

// SetEnsurePeersPeriod sets period to ensure peers connected.
Expand Down Expand Up @@ -203,7 +207,7 @@ func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
// Ensures that sufficient peers are connected. (continuous)
func (r *PEXReactor) ensurePeersRoutine() {
// Randomize when routine starts
ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
ensurePeersPeriodMs := int64(10000)
time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)

// fire once immediately.
Expand Down

0 comments on commit 7b9022e

Please sign in to comment.