Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamline and cleanup peer querying in peer and stream managers #1663

Merged
merged 1 commit into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/dominant-strategies/go-quai/core/types"
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/p2p"
"github.com/dominant-strategies/go-quai/p2p/peerManager"
quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol"
"github.com/dominant-strategies/go-quai/quai"
"github.com/dominant-strategies/go-quai/trie"
Expand Down Expand Up @@ -129,7 +130,7 @@ func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, d
}
}()
defer close(resultChan)
peers := p.peerManager.GetBestPeersWithFallback(location)
peers := p.peerManager.GetPeers(location, peerManager.Best)
log.Global.WithFields(log.Fields{
"peers": peers,
"location": location,
Expand Down
87 changes: 50 additions & 37 deletions p2p/peerManager/peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ const (
// Represents the minimum ratio of positive to negative reports
// e.g. live_reports / latent_reports = 0.8
c_qualityThreshold = 0.8

// The number of peers to return when querying for peers
C_peerCount = 3
)

type PeerQuality int

const (
// Peer DB positions in the peerDBs slice
c_bestDBPos = iota
c_responseiveDBPos
c_lastResortDBPos

// Dir names for the peerDBs
c_bestDBName = "bestPeersDB"
c_responsiveDBName = "responsivePeersDB"
c_lastResortDBName = "lastResortPeersDB"
Best PeerQuality = iota
Responsive
LastResort
)

var (
dbNames = [3]string{"bestPeersDB", "responsivePeersDB", "lastResortPeersDB"}
)

// PeerManager is an interface that extends libp2p Connection Manager and Gater
Expand Down Expand Up @@ -72,12 +75,9 @@ type PeerManager interface {
// Returns an existing stream with that peer or opens a new one
GetStream(p peer.ID) (network.Stream, error)

// Returns c_recipientCount of the highest quality peers: lively & responsive
GetBestPeersWithFallback(common.Location) []p2p.PeerID
// Returns c_recipientCount responsive, but less lively peers
GetResponsivePeersWithFallback(common.Location) []p2p.PeerID
// Returns c_recipientCount peers regardless of status
GetLastResortPeers(common.Location) []p2p.PeerID
// Returns c_peerCount peers starting at the requested quality level of peers
// If there are not enough peers at the requested quality, it will return lower quality peers
GetPeers(common.Location, PeerQuality) []p2p.PeerID

// Increases the peer's liveliness score
MarkLivelyPeer(p2p.PeerID, common.Location)
Expand Down Expand Up @@ -147,22 +147,22 @@ func NewManager(ctx context.Context, low int, high int, datastore datastore.Data
continue
}
peerDBs[domLocName] = make([]*peerdb.PeerDB, 3)
peerDBs[domLocName][c_bestDBPos], err = peerdb.NewPeerDB(c_bestDBName, domLocName)
peerDBs[domLocName][Best], err = peerdb.NewPeerDB(dbNames[Best], domLocName)
if err != nil {
return nil, err
}
peerDBs[domLocName][c_responseiveDBPos], err = peerdb.NewPeerDB(c_responsiveDBName, domLocName)
peerDBs[domLocName][Responsive], err = peerdb.NewPeerDB(dbNames[Responsive], domLocName)
if err != nil {
return nil, err
}
peerDBs[domLocName][c_lastResortDBPos], err = peerdb.NewPeerDB(c_lastResortDBName, domLocName)
peerDBs[domLocName][LastResort], err = peerdb.NewPeerDB(dbNames[LastResort], domLocName)
if err != nil {
return nil, err
}
}
}

streamManager, err := streamManager.NewStreamManager()
streamManager, err := streamManager.NewStreamManager(C_peerCount)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func NewManager(ctx context.Context, low int, high int, datastore datastore.Data
logger.WithFields(log.Fields{
"location": locName,
"peerCount": len(locPeer),
"bucket": dbName,
"bucket": dbNames[dbName],
"peers": locPeer}).Info("Peer stats")
}
}
Expand Down Expand Up @@ -296,38 +296,51 @@ func (pm *BasicPeerManager) getPeersHelper(peerDB *peerdb.PeerDB, numPeers int)
return peerSubset
}

func (pm *BasicPeerManager) GetBestPeersWithFallback(location common.Location) []p2p.PeerID {
func (pm *BasicPeerManager) GetPeers(location common.Location, quality PeerQuality) []p2p.PeerID {
switch quality {
case Best:
return pm.getBestPeersWithFallback(location)
case Responsive:
return pm.getResponsivePeersWithFallback(location)
case LastResort:
return pm.getLastResortPeers(location)
default:
panic("Invalid peer quality")
}
}

func (pm *BasicPeerManager) getBestPeersWithFallback(location common.Location) []p2p.PeerID {
locName := location.Name()
if pm.peerDBs[locName] == nil {
// There have not been any peers added to this topic
return nil
}

bestPeersCount := pm.peerDBs[locName][c_bestDBPos].GetPeerCount()
if bestPeersCount < streamManager.C_peerCount {
bestPeerList := pm.getPeersHelper(pm.peerDBs[locName][c_bestDBPos], bestPeersCount)
bestPeerList = append(bestPeerList, pm.GetResponsivePeersWithFallback(location)...)
bestPeersCount := pm.peerDBs[locName][Best].GetPeerCount()
if bestPeersCount < C_peerCount {
bestPeerList := pm.getPeersHelper(pm.peerDBs[locName][Best], bestPeersCount)
bestPeerList = append(bestPeerList, pm.getResponsivePeersWithFallback(location)...)
return bestPeerList
}
return pm.getPeersHelper(pm.peerDBs[locName][c_bestDBPos], streamManager.C_peerCount)
return pm.getPeersHelper(pm.peerDBs[locName][Best], C_peerCount)
}

func (pm *BasicPeerManager) GetResponsivePeersWithFallback(location common.Location) []p2p.PeerID {
func (pm *BasicPeerManager) getResponsivePeersWithFallback(location common.Location) []p2p.PeerID {
locName := location.Name()

responsivePeersCount := pm.peerDBs[locName][c_responseiveDBPos].GetPeerCount()
if responsivePeersCount < streamManager.C_peerCount {
responsivePeerList := pm.getPeersHelper(pm.peerDBs[locName][c_responseiveDBPos], responsivePeersCount)
responsivePeerList = append(responsivePeerList, pm.GetLastResortPeers(location)...)
responsivePeersCount := pm.peerDBs[locName][Responsive].GetPeerCount()
if responsivePeersCount < C_peerCount {
responsivePeerList := pm.getPeersHelper(pm.peerDBs[locName][Responsive], responsivePeersCount)
responsivePeerList = append(responsivePeerList, pm.getLastResortPeers(location)...)

return responsivePeerList
}
return pm.getPeersHelper(pm.peerDBs[locName][c_responseiveDBPos], streamManager.C_peerCount)
return pm.getPeersHelper(pm.peerDBs[locName][Responsive], C_peerCount)

}

func (pm *BasicPeerManager) GetLastResortPeers(location common.Location) []p2p.PeerID {
return pm.getPeersHelper(pm.peerDBs[location.Name()][c_lastResortDBPos], streamManager.C_peerCount)
func (pm *BasicPeerManager) getLastResortPeers(location common.Location) []p2p.PeerID {
return pm.getPeersHelper(pm.peerDBs[location.Name()][LastResort], C_peerCount)
}

func (pm *BasicPeerManager) MarkLivelyPeer(peer p2p.PeerID, location common.Location) {
Expand Down Expand Up @@ -412,21 +425,21 @@ func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, location common.
locationName := location.Name()
if liveness >= c_qualityThreshold && responsiveness >= c_qualityThreshold {
// Best peers: high liveness and responsiveness
err := pm.peerDBs[locationName][c_bestDBPos].Put(pm.ctx, key, peerInfo)
err := pm.peerDBs[locationName][Best].Put(pm.ctx, key, peerInfo)
if err != nil {
return errors.Wrap(err, "error putting peer in bestPeersDB")
}

} else if responsiveness >= c_qualityThreshold {
// Responsive peers: high responsiveness, but low liveness
err := pm.peerDBs[locationName][c_responseiveDBPos].Put(pm.ctx, key, peerInfo)
err := pm.peerDBs[locationName][Responsive].Put(pm.ctx, key, peerInfo)
if err != nil {
return errors.Wrap(err, "error putting peer in responsivePeersDB")
}

} else {
// All other peers
err := pm.peerDBs[locationName][c_lastResortDBPos].Put(pm.ctx, key, peerInfo)
err := pm.peerDBs[locationName][LastResort].Put(pm.ctx, key, peerInfo)
if err != nil {
return errors.Wrap(err, "error putting peer in allPeersDB")
}
Expand Down
7 changes: 2 additions & 5 deletions p2p/streamManager/streamManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (
)

const (
// The number of peers to return when querying for peers
C_peerCount = 3

// The amount of redundancy for open streams
// c_peerCount * c_streamReplicationFactor = total number of open streams
c_streamReplicationFactor = 3
Expand Down Expand Up @@ -45,9 +42,9 @@ type basicStreamManager struct {
mu sync.Mutex
}

func NewStreamManager() (StreamManager, error) {
func NewStreamManager(peerCount int) (StreamManager, error) {
lruCache, err := lru.NewWithEvict(
C_peerCount*c_streamReplicationFactor,
peerCount*c_streamReplicationFactor,
severStream,
)
if err != nil {
Expand Down
Loading