Skip to content

Commit

Permalink
Unified peer quality score
Browse files Browse the repository at this point in the history
  • Loading branch information
wizeguyy committed Jul 18, 2024
1 parent 4bd5071 commit 20c2833
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 124 deletions.
45 changes: 8 additions & 37 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,11 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, topic *pubsubManager.Topic, req
"topic": topic.String(),
}).Trace("Received data from peer")

// Mark this peer as behaving well
p.peerManager.MarkResponsivePeer(peerID, topic)
if recvd == nil {
p.peerManager.AdjustPeerQuality(peerID, p2p.QualityAdjOnNack)
} else {
p.peerManager.AdjustPeerQuality(peerID, p2p.QualityAdjOnResponse)
}
select {
case resultChan <- recvd:
// Data sent successfully
Expand All @@ -218,7 +221,7 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, topic *pubsubManager.Topic, req
"err": err,
}).Error("Error requesting the data from peer")
// Mark this peer as not responding
p.peerManager.MarkUnresponsivePeer(peerID, topic)
p.peerManager.AdjustPeerQuality(peerID, p2p.QualityAdjOnTimeout)
}
}

Expand Down Expand Up @@ -252,40 +255,8 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res
return resultChan
}

func (p *P2PNode) MarkLivelyPeer(peer p2p.PeerID, topic string) {
log.Global.WithFields(log.Fields{
"peer": peer,
"topic": topic,
}).Debug("Recording well-behaving peer")

t, err := pubsubManager.TopicFromString(topic)
if err != nil {
log.Global.WithFields(log.Fields{
"topic": topic,
"err": err,
}).Error("Error getting topic name")
panic(err)
}

p.peerManager.MarkLivelyPeer(peer, t)
}

func (p *P2PNode) MarkLatentPeer(peer p2p.PeerID, topic string) {
log.Global.WithFields(log.Fields{
"peer": peer,
"topic": topic,
}).Debug("Recording misbehaving peer")

t, err := pubsubManager.TopicFromString(topic)
if err != nil {
log.Global.WithFields(log.Fields{
"topic": topic,
"err": err,
}).Error("Error getting topic name")
panic(err)
}

p.peerManager.MarkLatentPeer(peer, t)
func (p *P2PNode) AdjustPeerQuality(peer p2p.PeerID, adjFn func(int) int) {
p.peerManager.AdjustPeerQuality(peer, adjFn)
}

func (p *P2PNode) ProtectPeer(peer p2p.PeerID) {
Expand Down
3 changes: 2 additions & 1 deletion p2p/node/p2p_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/core/types"
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/p2p"
"github.com/dominant-strategies/go-quai/p2p/node/peerManager"
"github.com/dominant-strategies/go-quai/p2p/node/pubsubManager"
"github.com/dominant-strategies/go-quai/p2p/node/requestManager"
Expand Down Expand Up @@ -73,7 +74,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, topic *pubsubManager.Topic, re
"requestID": id,
"peerId": peerID,
}).Warn("Peer did not respond in time")
p.peerManager.MarkUnresponsivePeer(peerID, topic)
p.peerManager.AdjustPeerQuality(peerID, p2p.QualityAdjOnTimeout)
return nil, errors.New("peer did not respond in time")
}

Expand Down
133 changes: 58 additions & 75 deletions p2p/node/peerManager/peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (
const (
// Represents the minimum ratio of positive to negative reports
// e.g. live_reports / latent_reports = 0.8
c_qualityThreshold = 0.8
c_bestThreshold = 0.8
c_worstThreshold = 0.2

// c_minBestPeersFromDb is the number of peers we want to randomly read from the db
c_minBestPeersFromDb = 6
Expand Down Expand Up @@ -108,15 +109,8 @@ type PeerManager interface {
// RefreshBootpeers returns all the current bootpeers for bootstrapping
RefreshBootpeers() []peer.AddrInfo

// Increases the peer's liveliness score
MarkLivelyPeer(peerID p2p.PeerID, topic *pubsubManager.Topic)
// Decreases the peer's liveliness score
MarkLatentPeer(peerID p2p.PeerID, topic *pubsubManager.Topic)

// Increases the peer's liveliness score. Not exposed outside of NetworkingAPI
MarkResponsivePeer(peerID p2p.PeerID, topic *pubsubManager.Topic)
// Decreases the peer's liveliness score. Not exposed outside of NetworkingAPI
MarkUnresponsivePeer(peerID p2p.PeerID, topic *pubsubManager.Topic)
// Adjust the quality score of a peer by applying the given adjustment function
AdjustPeerQuality(p2p.PeerID, func(int) int)

// Protects the peer's connection from being disconnected
ProtectPeer(p2p.PeerID)
Expand Down Expand Up @@ -403,6 +397,21 @@ func (pm *BasicPeerManager) removePeerFromTopic(peerID p2p.PeerID, topicStr stri
return nil
}

// Looks up which topics a peer participates in
func (pm *BasicPeerManager) getPeerTopics(peerID p2p.PeerID) []string {
key := datastore.NewKey(peerID.String())
qualities := []PeerQuality{Best, Responsive, LastResort}
topics := []string{}
for topic, dbs := range pm.peerDBs {
for quality := range qualities {
if exists, _ := dbs[quality].Has(pm.ctx, key); exists {
topics = append(topics, topic)
}
}
}
return topics
}

func (pm *BasicPeerManager) SetSelfID(selfID p2p.PeerID) {
pm.selfID = selfID
}
Expand Down Expand Up @@ -504,20 +513,12 @@ func (pm *BasicPeerManager) getLastResortPeers(topic *pubsubManager.Topic) map[p
return pm.getPeersHelper(pm.peerDBs[topic.String()][LastResort], c_minLastResortPeersFromDb)
}

func (pm *BasicPeerManager) MarkLivelyPeer(peer p2p.PeerID, topic *pubsubManager.Topic) {
if peer == pm.selfID {
return
}
pm.TagPeer(peer, "liveness_reports", 1)
pm.recategorizePeer(peer, topic)
}

func (pm *BasicPeerManager) MarkLatentPeer(peer p2p.PeerID, topic *pubsubManager.Topic) {
func (pm *BasicPeerManager) AdjustPeerQuality(peer p2p.PeerID, adjFn func(int) int) {
if peer == pm.selfID {
return
}
pm.TagPeer(peer, "latency_reports", 1)
pm.recategorizePeer(peer, topic)
pm.UpsertTag(peer, "quality", adjFn)
pm.recategorizePeer(peer)
}

func (pm *BasicPeerManager) calculatePeerLiveness(peer p2p.PeerID) float64 {
Expand All @@ -531,16 +532,6 @@ func (pm *BasicPeerManager) calculatePeerLiveness(peer p2p.PeerID) float64 {
return float64(liveness) / float64(latents)
}

func (pm *BasicPeerManager) MarkResponsivePeer(peer p2p.PeerID, topic *pubsubManager.Topic) {
pm.TagPeer(peer, "responses_served", 1)
pm.recategorizePeer(peer, topic)
}

func (pm *BasicPeerManager) MarkUnresponsivePeer(peer p2p.PeerID, topic *pubsubManager.Topic) {
pm.TagPeer(peer, "responses_missed", 1)
pm.recategorizePeer(peer, topic)
}

func (pm *BasicPeerManager) calculatePeerResponsiveness(peer p2p.PeerID) float64 {
peerTag := pm.GetTagInfo(peer)
if peerTag == nil {
Expand All @@ -551,57 +542,49 @@ func (pm *BasicPeerManager) calculatePeerResponsiveness(peer p2p.PeerID) float64
return float64(responses) / float64(misses)
}

// Each peer can only be in one of the following buckets:
// 1. bestPeers
// - peers with high liveness and responsiveness
//
// 2. responsivePeers
// - peers with high responsiveness, but low liveness
//
// 3. peers
// - all other peers
func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, topic *pubsubManager.Topic) error {
liveness := pm.calculatePeerLiveness(peerID)
responsiveness := pm.calculatePeerResponsiveness(peerID)

// remove peer from DB first
err := pm.removePeerFromTopic(peerID, topic.String())
if err != nil {
return err
}

key := datastore.NewKey(peerID.String())
// TODO: construct peerDB.PeerInfo and marshal it to bytes
peerInfo, err := proto.Marshal((&peerdb.PeerInfo{
AddrInfo: peerdb.AddrInfo{
AddrInfo: peer.AddrInfo{
ID: peerID,
},
},
}).ProtoEncode())
if err != nil {
return errors.Wrap(err, "error marshaling peer info")
}
// Peers will be divided into three buckets (good, bad, ugly) based on their quality score
func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID) error {
peerQuality := pm.GetTagInfo(peerID).Tags["quality"]
topics := pm.getPeerTopics(peerID)

if liveness >= c_qualityThreshold && responsiveness >= c_qualityThreshold {
// Best peers: high liveness and responsiveness
err := pm.peerDBs[topic.String()][Best].Put(pm.ctx, key, peerInfo)
// remove peer from DBs first
for _, topic := range topics {
err := pm.removePeerFromTopic(peerID, topic)
if err != nil {
return errors.Wrap(err, "error putting peer in bestPeersDB")
return err
}

} else if responsiveness >= c_qualityThreshold {
// Responsive peers: high responsiveness, but low liveness
err := pm.peerDBs[topic.String()][Responsive].Put(pm.ctx, key, peerInfo)
key := datastore.NewKey(peerID.String())
peerInfo, err := proto.Marshal((&peerdb.PeerInfo{
AddrInfo: peerdb.AddrInfo{
AddrInfo: peer.AddrInfo{
ID: peerID,
},
},
}).ProtoEncode())
if err != nil {
return errors.Wrap(err, "error putting peer in responsivePeersDB")
return errors.Wrap(err, "error marshaling peer info")
}

} else {
// All other peers
err := pm.peerDBs[topic.String()][LastResort].Put(pm.ctx, key, peerInfo)
if err != nil {
return errors.Wrap(err, "error putting peer in allPeersDB")
if peerQuality > c_bestThreshold*p2p.MaxScore {
// Best peers: high liveness and responsiveness
err := pm.peerDBs[topic][Best].Put(pm.ctx, key, peerInfo)
if err != nil {
return errors.Wrap(err, "error putting peer in bestPeersDB")
}

} else if peerQuality >= c_worstThreshold*p2p.MaxScore {
err := pm.peerDBs[topic][Responsive].Put(pm.ctx, key, peerInfo)
if err != nil {
return errors.Wrap(err, "error putting peer in responsivePeersDB")
}

} else {
// All other peers
err := pm.peerDBs[topic][LastResort].Put(pm.ctx, key, peerInfo)
if err != nil {
return errors.Wrap(err, "error putting peer in allPeersDB")
}
}
}

Expand Down
27 changes: 27 additions & 0 deletions p2p/quality_scores.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package p2p

const MinScore = 0
const MaxScore = 1000

func boundedAdj(current int, adj int) int {
current += adj
if current > MaxScore {
current = MaxScore
} else if current < MinScore {
current = MinScore
}
return current
}

func QualityAdjOnBroadcast(current int) int {
return boundedAdj(current, 1)
}
func QualityAdjOnResponse(current int) int {
return boundedAdj(current, 10)
}
func QualityAdjOnNack(current int) int {
return boundedAdj(current, -10)
}
func QualityAdjOnTimeout(current int) int {
return boundedAdj(current, -20)
}
7 changes: 2 additions & 5 deletions quai/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,8 @@ type NetworkingAPI interface {
// Specify location, data hash, and data type to request
Request(location common.Location, requestData interface{}, responseDataType interface{}) chan interface{}

// Methods to report a peer to the P2PClient as behaving maliciously
// Should be called whenever a peer sends us data that is acceptably lively
MarkLivelyPeer(peerID core.PeerID, topic string)
// Should be called whenever a peer sends us data that is stale or latent
MarkLatentPeer(peerID core.PeerID, topic string)
// Adjust a peer's quality score
AdjustPeerQuality(core.PeerID, func(int) int)

// Protects the peer's connection from being pruned
ProtectPeer(core.PeerID)
Expand Down
7 changes: 1 addition & 6 deletions quai/p2p_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (qbe *QuaiBackend) GetBackend(location common.Location) *quaiapi.Backend {
// Handle consensus data propagated to us from our peers
func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, Id string, topic string, data interface{}, nodeLocation common.Location) bool {
defer types.ObjectPool.Put(data)
qbe.p2pBackend.AdjustPeerQuality(sourcePeer, p2p.QualityAdjOnBroadcast)
switch data := data.(type) {
case types.WorkObjectBlockView:
backend := *qbe.GetBackend(nodeLocation)
Expand All @@ -115,8 +116,6 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, Id string, topic s
backend.WriteBlock(data.WorkObject)

blockIngressCounter.Inc()
// If it was a good broadcast, mark the peer as lively
qbe.p2pBackend.MarkLivelyPeer(sourcePeer, topic)
case types.WorkObjectHeaderView:
backend := *qbe.GetBackend(nodeLocation)
if backend == nil {
Expand All @@ -129,8 +128,6 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, Id string, topic s
}

headerIngressCounter.Inc()
// If it was a good broadcast, mark the peer as lively
qbe.p2pBackend.MarkLivelyPeer(sourcePeer, topic)
case types.WorkObjectShareView:
backend := *qbe.GetBackend(nodeLocation)
if backend == nil {
Expand Down Expand Up @@ -177,8 +174,6 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, Id string, topic s
workShareIngressCounter.Inc()
txIngressCounter.Add(float64(len(data.WorkObject.Transactions())))
}
// If it was a good broadcast, mark the peer as lively
qbe.p2pBackend.MarkLivelyPeer(sourcePeer, topic)
default:
log.Global.WithFields(log.Fields{
"peer": sourcePeer,
Expand Down

0 comments on commit 20c2833

Please sign in to comment.