diff --git a/p2p/node/api.go b/p2p/node/api.go index a9c4bb9a2d..a489dda18f 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -12,6 +12,7 @@ import ( "github.com/dominant-strategies/go-quai/core/types" "github.com/dominant-strategies/go-quai/log" "github.com/dominant-strategies/go-quai/p2p" + "github.com/dominant-strategies/go-quai/p2p/peerManager" quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol" "github.com/dominant-strategies/go-quai/quai" "github.com/dominant-strategies/go-quai/trie" @@ -112,7 +113,7 @@ func (p *P2PNode) Stop() error { func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) { go func() { defer close(resultChan) - peers := p.peerManager.GetBestPeersWithFallback(location) + peers := p.peerManager.GetPeers(location, peerManager.Best) log.Global.WithFields(log.Fields{ "peers": peers, "location": location, diff --git a/p2p/peerManager/peerManager.go b/p2p/peerManager/peerManager.go index b14ee4c3df..d2734a7d98 100644 --- a/p2p/peerManager/peerManager.go +++ b/p2p/peerManager/peerManager.go @@ -31,18 +31,21 @@ const ( // Represents the minimum ratio of positive to negative reports // e.g. live_reports / latent_reports = 0.8 c_qualityThreshold = 0.8 + + // The number of peers to return when querying for peers + C_peerCount = 3 ) +type PeerQuality int + const ( - // Peer DB positions in the peerDBs slice - c_bestDBPos = iota - c_responseiveDBPos - c_lastResortDBPos - - // Dir names for the peerDBs - c_bestDBName = "bestPeersDB" - c_responsiveDBName = "responsivePeersDB" - c_lastResortDBName = "lastResortPeersDB" + Best PeerQuality = iota + Responsive + LastResort +) + +var ( + dbNames = [3]string{"bestPeersDB", "responsivePeersDB", "lastResortPeersDB"} ) // PeerManager is an interface that extends libp2p Connection Manager and Gater @@ -71,12 +74,9 @@ type PeerManager interface { // Returns an existing stream with that peer or opens a new one GetStream(p peer.ID) (network.Stream, error) - // Returns c_recipientCount of the highest quality peers: lively & responsive - GetBestPeersWithFallback(common.Location) []p2p.PeerID - // Returns c_recipientCount responsive, but less lively peers - GetResponsivePeersWithFallback(common.Location) []p2p.PeerID - // Returns c_recipientCount peers regardless of status - GetLastResortPeers(common.Location) []p2p.PeerID + // Returns c_peerCount peers starting at the requested quality level of peers + // If there are not enough peers at the requested quality, it will return lower quality peers + GetPeers(common.Location, PeerQuality) []p2p.PeerID // Increases the peer's liveliness score MarkLivelyPeer(p2p.PeerID, common.Location) @@ -146,22 +146,22 @@ func NewManager(ctx context.Context, low int, high int, datastore datastore.Data continue } peerDBs[domLocName] = make([]*peerdb.PeerDB, 3) - peerDBs[domLocName][c_bestDBPos], err = peerdb.NewPeerDB(c_bestDBName, domLocName) + peerDBs[domLocName][Best], err = peerdb.NewPeerDB(dbNames[Best], domLocName) if err != nil { return nil, err } - peerDBs[domLocName][c_responseiveDBPos], err = peerdb.NewPeerDB(c_responsiveDBName, domLocName) + peerDBs[domLocName][Responsive], err = peerdb.NewPeerDB(dbNames[Responsive], domLocName) if err != nil { return nil, err } - peerDBs[domLocName][c_lastResortDBPos], err = peerdb.NewPeerDB(c_lastResortDBName, domLocName) + peerDBs[domLocName][LastResort], err = peerdb.NewPeerDB(dbNames[LastResort], domLocName) if err != nil { return nil, err } } } - streamManager, err := streamManager.NewStreamManager() + streamManager, err := streamManager.NewStreamManager(C_peerCount) if err != nil { return nil, err } @@ -209,7 +209,7 @@ func NewManager(ctx context.Context, low int, high int, datastore datastore.Data logger.WithFields(log.Fields{ "location": locName, "peerCount": len(locPeer), - "bucket": dbName, + "bucket": dbNames[dbName], "peers": locPeer}).Info("Peer stats") } } @@ -295,38 +295,51 @@ func (pm *BasicPeerManager) getPeersHelper(peerDB *peerdb.PeerDB, numPeers int) return peerSubset } -func (pm *BasicPeerManager) GetBestPeersWithFallback(location common.Location) []p2p.PeerID { +func (pm *BasicPeerManager) GetPeers(location common.Location, quality PeerQuality) []p2p.PeerID { + switch quality { + case Best: + return pm.getBestPeersWithFallback(location) + case Responsive: + return pm.getResponsivePeersWithFallback(location) + case LastResort: + return pm.getLastResortPeers(location) + default: + return nil + } +} + +func (pm *BasicPeerManager) getBestPeersWithFallback(location common.Location) []p2p.PeerID { locName := location.Name() if pm.peerDBs[locName] == nil { // There have not been any peers added to this topic return nil } - bestPeersCount := pm.peerDBs[locName][c_bestDBPos].GetPeerCount() - if bestPeersCount < streamManager.C_peerCount { - bestPeerList := pm.getPeersHelper(pm.peerDBs[locName][c_bestDBPos], bestPeersCount) - bestPeerList = append(bestPeerList, pm.GetResponsivePeersWithFallback(location)...) + bestPeersCount := pm.peerDBs[locName][Best].GetPeerCount() + if bestPeersCount < C_peerCount { + bestPeerList := pm.getPeersHelper(pm.peerDBs[locName][Best], bestPeersCount) + bestPeerList = append(bestPeerList, pm.getResponsivePeersWithFallback(location)...) return bestPeerList } - return pm.getPeersHelper(pm.peerDBs[locName][c_bestDBPos], streamManager.C_peerCount) + return pm.getPeersHelper(pm.peerDBs[locName][Best], C_peerCount) } -func (pm *BasicPeerManager) GetResponsivePeersWithFallback(location common.Location) []p2p.PeerID { +func (pm *BasicPeerManager) getResponsivePeersWithFallback(location common.Location) []p2p.PeerID { locName := location.Name() - responsivePeersCount := pm.peerDBs[locName][c_responseiveDBPos].GetPeerCount() - if responsivePeersCount < streamManager.C_peerCount { - responsivePeerList := pm.getPeersHelper(pm.peerDBs[locName][c_responseiveDBPos], responsivePeersCount) - responsivePeerList = append(responsivePeerList, pm.GetLastResortPeers(location)...) + responsivePeersCount := pm.peerDBs[locName][Responsive].GetPeerCount() + if responsivePeersCount < C_peerCount { + responsivePeerList := pm.getPeersHelper(pm.peerDBs[locName][Responsive], responsivePeersCount) + responsivePeerList = append(responsivePeerList, pm.getLastResortPeers(location)...) return responsivePeerList } - return pm.getPeersHelper(pm.peerDBs[locName][c_responseiveDBPos], streamManager.C_peerCount) + return pm.getPeersHelper(pm.peerDBs[locName][Responsive], C_peerCount) } -func (pm *BasicPeerManager) GetLastResortPeers(location common.Location) []p2p.PeerID { - return pm.getPeersHelper(pm.peerDBs[location.Name()][c_lastResortDBPos], streamManager.C_peerCount) +func (pm *BasicPeerManager) getLastResortPeers(location common.Location) []p2p.PeerID { + return pm.getPeersHelper(pm.peerDBs[location.Name()][LastResort], C_peerCount) } func (pm *BasicPeerManager) MarkLivelyPeer(peer p2p.PeerID, location common.Location) { @@ -411,21 +424,21 @@ func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, location common. locationName := location.Name() if liveness >= c_qualityThreshold && responsiveness >= c_qualityThreshold { // Best peers: high liveness and responsiveness - err := pm.peerDBs[locationName][c_bestDBPos].Put(pm.ctx, key, peerInfo) + err := pm.peerDBs[locationName][Best].Put(pm.ctx, key, peerInfo) if err != nil { return errors.Wrap(err, "error putting peer in bestPeersDB") } } else if responsiveness >= c_qualityThreshold { // Responsive peers: high responsiveness, but low liveness - err := pm.peerDBs[locationName][c_responseiveDBPos].Put(pm.ctx, key, peerInfo) + err := pm.peerDBs[locationName][Responsive].Put(pm.ctx, key, peerInfo) if err != nil { return errors.Wrap(err, "error putting peer in responsivePeersDB") } } else { // All other peers - err := pm.peerDBs[locationName][c_lastResortDBPos].Put(pm.ctx, key, peerInfo) + err := pm.peerDBs[locationName][LastResort].Put(pm.ctx, key, peerInfo) if err != nil { return errors.Wrap(err, "error putting peer in allPeersDB") } diff --git a/p2p/streamManager/streamManager.go b/p2p/streamManager/streamManager.go index 8c8bed4883..419334ca6c 100644 --- a/p2p/streamManager/streamManager.go +++ b/p2p/streamManager/streamManager.go @@ -15,9 +15,6 @@ import ( ) const ( - // The number of peers to return when querying for peers - C_peerCount = 3 - // The amount of redundancy for open streams // c_peerCount * c_streamReplicationFactor = total number of open streams c_streamReplicationFactor = 3 @@ -45,9 +42,9 @@ type basicStreamManager struct { mu sync.Mutex } -func NewStreamManager() (StreamManager, error) { +func NewStreamManager(peerCount int) (StreamManager, error) { lruCache, err := lru.NewWithEvict( - C_peerCount*c_streamReplicationFactor, + peerCount*c_streamReplicationFactor, severStream, ) if err != nil {