Skip to content

Commit

Permalink
Streamline and cleanup peer querying in peer and stream managers
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih committed Apr 25, 2024
1 parent f3df01c commit 1af52aa
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 43 deletions.
3 changes: 2 additions & 1 deletion p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/dominant-strategies/go-quai/core/types"
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/p2p"
"github.com/dominant-strategies/go-quai/p2p/peerManager"
quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol"
"github.com/dominant-strategies/go-quai/quai"
"github.com/dominant-strategies/go-quai/trie"
Expand Down Expand Up @@ -112,7 +113,7 @@ func (p *P2PNode) Stop() error {
func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) {
go func() {
defer close(resultChan)
peers := p.peerManager.GetBestPeersWithFallback(location)
peers := p.peerManager.GetPeers(location, peerManager.Best)
log.Global.WithFields(log.Fields{
"peers": peers,
"location": location,
Expand Down
87 changes: 50 additions & 37 deletions p2p/peerManager/peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@ const (
// Represents the minimum ratio of positive to negative reports
// e.g. live_reports / latent_reports = 0.8
c_qualityThreshold = 0.8

// The number of peers to return when querying for peers
C_peerCount = 3
)

type PeerQuality int

const (
// Peer DB positions in the peerDBs slice
c_bestDBPos = iota
c_responseiveDBPos
c_lastResortDBPos

// Dir names for the peerDBs
c_bestDBName = "bestPeersDB"
c_responsiveDBName = "responsivePeersDB"
c_lastResortDBName = "lastResortPeersDB"
Best PeerQuality = iota
Responsive
LastResort
)

var (
dbNames = [3]string{"bestPeersDB", "responsivePeersDB", "lastResortPeersDB"}
)

// PeerManager is an interface that extends libp2p Connection Manager and Gater
Expand Down Expand Up @@ -71,12 +74,9 @@ type PeerManager interface {
// Returns an existing stream with that peer or opens a new one
GetStream(p peer.ID) (network.Stream, error)

// Returns c_recipientCount of the highest quality peers: lively & responsive
GetBestPeersWithFallback(common.Location) []p2p.PeerID
// Returns c_recipientCount responsive, but less lively peers
GetResponsivePeersWithFallback(common.Location) []p2p.PeerID
// Returns c_recipientCount peers regardless of status
GetLastResortPeers(common.Location) []p2p.PeerID
// Returns c_peerCount peers starting at the requested quality level of peers
// If there are not enough peers at the requested quality, it will return lower quality peers
GetPeers(common.Location, PeerQuality) []p2p.PeerID

// Increases the peer's liveliness score
MarkLivelyPeer(p2p.PeerID, common.Location)
Expand Down Expand Up @@ -146,22 +146,22 @@ func NewManager(ctx context.Context, low int, high int, datastore datastore.Data
continue
}
peerDBs[domLocName] = make([]*peerdb.PeerDB, 3)
peerDBs[domLocName][c_bestDBPos], err = peerdb.NewPeerDB(c_bestDBName, domLocName)
peerDBs[domLocName][Best], err = peerdb.NewPeerDB(dbNames[Best], domLocName)
if err != nil {
return nil, err
}
peerDBs[domLocName][c_responseiveDBPos], err = peerdb.NewPeerDB(c_responsiveDBName, domLocName)
peerDBs[domLocName][Responsive], err = peerdb.NewPeerDB(dbNames[Responsive], domLocName)
if err != nil {
return nil, err
}
peerDBs[domLocName][c_lastResortDBPos], err = peerdb.NewPeerDB(c_lastResortDBName, domLocName)
peerDBs[domLocName][LastResort], err = peerdb.NewPeerDB(dbNames[LastResort], domLocName)
if err != nil {
return nil, err
}
}
}

streamManager, err := streamManager.NewStreamManager()
streamManager, err := streamManager.NewStreamManager(C_peerCount)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func NewManager(ctx context.Context, low int, high int, datastore datastore.Data
logger.WithFields(log.Fields{
"location": locName,
"peerCount": len(locPeer),
"bucket": dbName,
"bucket": dbNames[dbName],
"peers": locPeer}).Info("Peer stats")
}
}
Expand Down Expand Up @@ -295,38 +295,51 @@ func (pm *BasicPeerManager) getPeersHelper(peerDB *peerdb.PeerDB, numPeers int)
return peerSubset
}

func (pm *BasicPeerManager) GetBestPeersWithFallback(location common.Location) []p2p.PeerID {
func (pm *BasicPeerManager) GetPeers(location common.Location, quality PeerQuality) []p2p.PeerID {
switch quality {
case Best:
return pm.getBestPeersWithFallback(location)
case Responsive:
return pm.getResponsivePeersWithFallback(location)
case LastResort:
return pm.getLastResortPeers(location)
default:
return nil
}
}

func (pm *BasicPeerManager) getBestPeersWithFallback(location common.Location) []p2p.PeerID {
locName := location.Name()
if pm.peerDBs[locName] == nil {
// There have not been any peers added to this topic
return nil
}

bestPeersCount := pm.peerDBs[locName][c_bestDBPos].GetPeerCount()
if bestPeersCount < streamManager.C_peerCount {
bestPeerList := pm.getPeersHelper(pm.peerDBs[locName][c_bestDBPos], bestPeersCount)
bestPeerList = append(bestPeerList, pm.GetResponsivePeersWithFallback(location)...)
bestPeersCount := pm.peerDBs[locName][Best].GetPeerCount()
if bestPeersCount < C_peerCount {
bestPeerList := pm.getPeersHelper(pm.peerDBs[locName][Best], bestPeersCount)
bestPeerList = append(bestPeerList, pm.getResponsivePeersWithFallback(location)...)
return bestPeerList
}
return pm.getPeersHelper(pm.peerDBs[locName][c_bestDBPos], streamManager.C_peerCount)
return pm.getPeersHelper(pm.peerDBs[locName][Best], C_peerCount)
}

func (pm *BasicPeerManager) GetResponsivePeersWithFallback(location common.Location) []p2p.PeerID {
func (pm *BasicPeerManager) getResponsivePeersWithFallback(location common.Location) []p2p.PeerID {
locName := location.Name()

responsivePeersCount := pm.peerDBs[locName][c_responseiveDBPos].GetPeerCount()
if responsivePeersCount < streamManager.C_peerCount {
responsivePeerList := pm.getPeersHelper(pm.peerDBs[locName][c_responseiveDBPos], responsivePeersCount)
responsivePeerList = append(responsivePeerList, pm.GetLastResortPeers(location)...)
responsivePeersCount := pm.peerDBs[locName][Responsive].GetPeerCount()
if responsivePeersCount < C_peerCount {
responsivePeerList := pm.getPeersHelper(pm.peerDBs[locName][Responsive], responsivePeersCount)
responsivePeerList = append(responsivePeerList, pm.getLastResortPeers(location)...)

return responsivePeerList
}
return pm.getPeersHelper(pm.peerDBs[locName][c_responseiveDBPos], streamManager.C_peerCount)
return pm.getPeersHelper(pm.peerDBs[locName][Responsive], C_peerCount)

}

func (pm *BasicPeerManager) GetLastResortPeers(location common.Location) []p2p.PeerID {
return pm.getPeersHelper(pm.peerDBs[location.Name()][c_lastResortDBPos], streamManager.C_peerCount)
func (pm *BasicPeerManager) getLastResortPeers(location common.Location) []p2p.PeerID {
return pm.getPeersHelper(pm.peerDBs[location.Name()][LastResort], C_peerCount)
}

func (pm *BasicPeerManager) MarkLivelyPeer(peer p2p.PeerID, location common.Location) {
Expand Down Expand Up @@ -411,21 +424,21 @@ func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, location common.
locationName := location.Name()
if liveness >= c_qualityThreshold && responsiveness >= c_qualityThreshold {
// Best peers: high liveness and responsiveness
err := pm.peerDBs[locationName][c_bestDBPos].Put(pm.ctx, key, peerInfo)
err := pm.peerDBs[locationName][Best].Put(pm.ctx, key, peerInfo)
if err != nil {
return errors.Wrap(err, "error putting peer in bestPeersDB")
}

} else if responsiveness >= c_qualityThreshold {
// Responsive peers: high responsiveness, but low liveness
err := pm.peerDBs[locationName][c_responseiveDBPos].Put(pm.ctx, key, peerInfo)
err := pm.peerDBs[locationName][Responsive].Put(pm.ctx, key, peerInfo)
if err != nil {
return errors.Wrap(err, "error putting peer in responsivePeersDB")
}

} else {
// All other peers
err := pm.peerDBs[locationName][c_lastResortDBPos].Put(pm.ctx, key, peerInfo)
err := pm.peerDBs[locationName][LastResort].Put(pm.ctx, key, peerInfo)
if err != nil {
return errors.Wrap(err, "error putting peer in allPeersDB")
}
Expand Down
7 changes: 2 additions & 5 deletions p2p/streamManager/streamManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (
)

const (
// The number of peers to return when querying for peers
C_peerCount = 3

// The amount of redundancy for open streams
// c_peerCount * c_streamReplicationFactor = total number of open streams
c_streamReplicationFactor = 3
Expand Down Expand Up @@ -45,9 +42,9 @@ type basicStreamManager struct {
mu sync.Mutex
}

func NewStreamManager() (StreamManager, error) {
func NewStreamManager(peerCount int) (StreamManager, error) {
lruCache, err := lru.NewWithEvict(
C_peerCount*c_streamReplicationFactor,
peerCount*c_streamReplicationFactor,
severStream,
)
if err != nil {
Expand Down

0 comments on commit 1af52aa

Please sign in to comment.