From 20c28334fac1cb5624fba736606e715244a2f319 Mon Sep 17 00:00:00 2001 From: wizeguyy Date: Thu, 18 Jul 2024 12:33:33 -0500 Subject: [PATCH] Unified peer quality score --- p2p/node/api.go | 45 ++-------- p2p/node/p2p_services.go | 3 +- p2p/node/peerManager/peerManager.go | 133 ++++++++++++---------------- p2p/quality_scores.go | 27 ++++++ quai/interface.go | 7 +- quai/p2p_backend.go | 7 +- 6 files changed, 98 insertions(+), 124 deletions(-) create mode 100644 p2p/quality_scores.go diff --git a/p2p/node/api.go b/p2p/node/api.go index 7e5d092fe5..657c8ea6f7 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -190,8 +190,11 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, topic *pubsubManager.Topic, req "topic": topic.String(), }).Trace("Received data from peer") - // Mark this peer as behaving well - p.peerManager.MarkResponsivePeer(peerID, topic) + if recvd == nil { + p.peerManager.AdjustPeerQuality(peerID, p2p.QualityAdjOnNack) + } else { + p.peerManager.AdjustPeerQuality(peerID, p2p.QualityAdjOnResponse) + } select { case resultChan <- recvd: // Data sent successfully @@ -218,7 +221,7 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, topic *pubsubManager.Topic, req "err": err, }).Error("Error requesting the data from peer") // Mark this peer as not responding - p.peerManager.MarkUnresponsivePeer(peerID, topic) + p.peerManager.AdjustPeerQuality(peerID, p2p.QualityAdjOnTimeout) } } @@ -252,40 +255,8 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res return resultChan } -func (p *P2PNode) MarkLivelyPeer(peer p2p.PeerID, topic string) { - log.Global.WithFields(log.Fields{ - "peer": peer, - "topic": topic, - }).Debug("Recording well-behaving peer") - - t, err := pubsubManager.TopicFromString(topic) - if err != nil { - log.Global.WithFields(log.Fields{ - "topic": topic, - "err": err, - }).Error("Error getting topic name") - panic(err) - } - - p.peerManager.MarkLivelyPeer(peer, t) -} - -func (p *P2PNode) MarkLatentPeer(peer p2p.PeerID, topic string) { - log.Global.WithFields(log.Fields{ - "peer": peer, - "topic": topic, - }).Debug("Recording misbehaving peer") - - t, err := pubsubManager.TopicFromString(topic) - if err != nil { - log.Global.WithFields(log.Fields{ - "topic": topic, - "err": err, - }).Error("Error getting topic name") - panic(err) - } - - p.peerManager.MarkLatentPeer(peer, t) +func (p *P2PNode) AdjustPeerQuality(peer p2p.PeerID, adjFn func(int) int) { + p.peerManager.AdjustPeerQuality(peer, adjFn) } func (p *P2PNode) ProtectPeer(peer p2p.PeerID) { diff --git a/p2p/node/p2p_services.go b/p2p/node/p2p_services.go index 5c50e3001d..b18a04788d 100644 --- a/p2p/node/p2p_services.go +++ b/p2p/node/p2p_services.go @@ -13,6 +13,7 @@ import ( "github.com/dominant-strategies/go-quai/common" "github.com/dominant-strategies/go-quai/core/types" "github.com/dominant-strategies/go-quai/log" + "github.com/dominant-strategies/go-quai/p2p" "github.com/dominant-strategies/go-quai/p2p/node/peerManager" "github.com/dominant-strategies/go-quai/p2p/node/pubsubManager" "github.com/dominant-strategies/go-quai/p2p/node/requestManager" @@ -73,7 +74,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, topic *pubsubManager.Topic, re "requestID": id, "peerId": peerID, }).Warn("Peer did not respond in time") - p.peerManager.MarkUnresponsivePeer(peerID, topic) + p.peerManager.AdjustPeerQuality(peerID, p2p.QualityAdjOnTimeout) return nil, errors.New("peer did not respond in time") } diff --git a/p2p/node/peerManager/peerManager.go b/p2p/node/peerManager/peerManager.go index 3be7afc3e4..c3e23e1a20 100644 --- a/p2p/node/peerManager/peerManager.go +++ b/p2p/node/peerManager/peerManager.go @@ -43,7 +43,8 @@ import ( const ( // Represents the minimum ratio of positive to negative reports // e.g. live_reports / latent_reports = 0.8 - c_qualityThreshold = 0.8 + c_bestThreshold = 0.8 + c_worstThreshold = 0.2 // c_minBestPeersFromDb is the number of peers we want to randomly read from the db c_minBestPeersFromDb = 6 @@ -108,15 +109,8 @@ type PeerManager interface { // RefreshBootpeers returns all the current bootpeers for bootstrapping RefreshBootpeers() []peer.AddrInfo - // Increases the peer's liveliness score - MarkLivelyPeer(peerID p2p.PeerID, topic *pubsubManager.Topic) - // Decreases the peer's liveliness score - MarkLatentPeer(peerID p2p.PeerID, topic *pubsubManager.Topic) - - // Increases the peer's liveliness score. Not exposed outside of NetworkingAPI - MarkResponsivePeer(peerID p2p.PeerID, topic *pubsubManager.Topic) - // Decreases the peer's liveliness score. Not exposed outside of NetworkingAPI - MarkUnresponsivePeer(peerID p2p.PeerID, topic *pubsubManager.Topic) + // Adjust the quality score of a peer by applying the given adjustment function + AdjustPeerQuality(p2p.PeerID, func(int) int) // Protects the peer's connection from being disconnected ProtectPeer(p2p.PeerID) @@ -403,6 +397,21 @@ func (pm *BasicPeerManager) removePeerFromTopic(peerID p2p.PeerID, topicStr stri return nil } +// Looks up which topics a peer participates in +func (pm *BasicPeerManager) getPeerTopics(peerID p2p.PeerID) []string { + key := datastore.NewKey(peerID.String()) + qualities := []PeerQuality{Best, Responsive, LastResort} + topics := []string{} + for topic, dbs := range pm.peerDBs { + for quality := range qualities { + if exists, _ := dbs[quality].Has(pm.ctx, key); exists { + topics = append(topics, topic) + } + } + } + return topics +} + func (pm *BasicPeerManager) SetSelfID(selfID p2p.PeerID) { pm.selfID = selfID } @@ -504,20 +513,12 @@ func (pm *BasicPeerManager) getLastResortPeers(topic *pubsubManager.Topic) map[p return pm.getPeersHelper(pm.peerDBs[topic.String()][LastResort], c_minLastResortPeersFromDb) } -func (pm *BasicPeerManager) MarkLivelyPeer(peer p2p.PeerID, topic *pubsubManager.Topic) { - if peer == pm.selfID { - return - } - pm.TagPeer(peer, "liveness_reports", 1) - pm.recategorizePeer(peer, topic) -} - -func (pm *BasicPeerManager) MarkLatentPeer(peer p2p.PeerID, topic *pubsubManager.Topic) { +func (pm *BasicPeerManager) AdjustPeerQuality(peer p2p.PeerID, adjFn func(int) int) { if peer == pm.selfID { return } - pm.TagPeer(peer, "latency_reports", 1) - pm.recategorizePeer(peer, topic) + pm.UpsertTag(peer, "quality", adjFn) + pm.recategorizePeer(peer) } func (pm *BasicPeerManager) calculatePeerLiveness(peer p2p.PeerID) float64 { @@ -531,16 +532,6 @@ func (pm *BasicPeerManager) calculatePeerLiveness(peer p2p.PeerID) float64 { return float64(liveness) / float64(latents) } -func (pm *BasicPeerManager) MarkResponsivePeer(peer p2p.PeerID, topic *pubsubManager.Topic) { - pm.TagPeer(peer, "responses_served", 1) - pm.recategorizePeer(peer, topic) -} - -func (pm *BasicPeerManager) MarkUnresponsivePeer(peer p2p.PeerID, topic *pubsubManager.Topic) { - pm.TagPeer(peer, "responses_missed", 1) - pm.recategorizePeer(peer, topic) -} - func (pm *BasicPeerManager) calculatePeerResponsiveness(peer p2p.PeerID) float64 { peerTag := pm.GetTagInfo(peer) if peerTag == nil { @@ -551,57 +542,49 @@ func (pm *BasicPeerManager) calculatePeerResponsiveness(peer p2p.PeerID) float64 return float64(responses) / float64(misses) } -// Each peer can only be in one of the following buckets: -// 1. bestPeers -// - peers with high liveness and responsiveness -// -// 2. responsivePeers -// - peers with high responsiveness, but low liveness -// -// 3. peers -// - all other peers -func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, topic *pubsubManager.Topic) error { - liveness := pm.calculatePeerLiveness(peerID) - responsiveness := pm.calculatePeerResponsiveness(peerID) - - // remove peer from DB first - err := pm.removePeerFromTopic(peerID, topic.String()) - if err != nil { - return err - } - - key := datastore.NewKey(peerID.String()) - // TODO: construct peerDB.PeerInfo and marshal it to bytes - peerInfo, err := proto.Marshal((&peerdb.PeerInfo{ - AddrInfo: peerdb.AddrInfo{ - AddrInfo: peer.AddrInfo{ - ID: peerID, - }, - }, - }).ProtoEncode()) - if err != nil { - return errors.Wrap(err, "error marshaling peer info") - } +// Peers will be divided into three buckets (good, bad, ugly) based on their quality score +func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID) error { + peerQuality := pm.GetTagInfo(peerID).Tags["quality"] + topics := pm.getPeerTopics(peerID) - if liveness >= c_qualityThreshold && responsiveness >= c_qualityThreshold { - // Best peers: high liveness and responsiveness - err := pm.peerDBs[topic.String()][Best].Put(pm.ctx, key, peerInfo) + // remove peer from DBs first + for _, topic := range topics { + err := pm.removePeerFromTopic(peerID, topic) if err != nil { - return errors.Wrap(err, "error putting peer in bestPeersDB") + return err } - } else if responsiveness >= c_qualityThreshold { - // Responsive peers: high responsiveness, but low liveness - err := pm.peerDBs[topic.String()][Responsive].Put(pm.ctx, key, peerInfo) + key := datastore.NewKey(peerID.String()) + peerInfo, err := proto.Marshal((&peerdb.PeerInfo{ + AddrInfo: peerdb.AddrInfo{ + AddrInfo: peer.AddrInfo{ + ID: peerID, + }, + }, + }).ProtoEncode()) if err != nil { - return errors.Wrap(err, "error putting peer in responsivePeersDB") + return errors.Wrap(err, "error marshaling peer info") } - } else { - // All other peers - err := pm.peerDBs[topic.String()][LastResort].Put(pm.ctx, key, peerInfo) - if err != nil { - return errors.Wrap(err, "error putting peer in allPeersDB") + if peerQuality > c_bestThreshold*p2p.MaxScore { + // Best peers: high liveness and responsiveness + err := pm.peerDBs[topic][Best].Put(pm.ctx, key, peerInfo) + if err != nil { + return errors.Wrap(err, "error putting peer in bestPeersDB") + } + + } else if peerQuality >= c_worstThreshold*p2p.MaxScore { + err := pm.peerDBs[topic][Responsive].Put(pm.ctx, key, peerInfo) + if err != nil { + return errors.Wrap(err, "error putting peer in responsivePeersDB") + } + + } else { + // All other peers + err := pm.peerDBs[topic][LastResort].Put(pm.ctx, key, peerInfo) + if err != nil { + return errors.Wrap(err, "error putting peer in allPeersDB") + } } } diff --git a/p2p/quality_scores.go b/p2p/quality_scores.go new file mode 100644 index 0000000000..2f10f2cf93 --- /dev/null +++ b/p2p/quality_scores.go @@ -0,0 +1,27 @@ +package p2p + +const MinScore = 0 +const MaxScore = 1000 + +func boundedAdj(current int, adj int) int { + current += adj + if current > MaxScore { + current = MaxScore + } else if current < MinScore { + current = MinScore + } + return current +} + +func QualityAdjOnBroadcast(current int) int { + return boundedAdj(current, 1) +} +func QualityAdjOnResponse(current int) int { + return boundedAdj(current, 10) +} +func QualityAdjOnNack(current int) int { + return boundedAdj(current, -10) +} +func QualityAdjOnTimeout(current int) int { + return boundedAdj(current, -20) +} diff --git a/quai/interface.go b/quai/interface.go index 39cb1b88a3..ad397acbc7 100644 --- a/quai/interface.go +++ b/quai/interface.go @@ -86,11 +86,8 @@ type NetworkingAPI interface { // Specify location, data hash, and data type to request Request(location common.Location, requestData interface{}, responseDataType interface{}) chan interface{} - // Methods to report a peer to the P2PClient as behaving maliciously - // Should be called whenever a peer sends us data that is acceptably lively - MarkLivelyPeer(peerID core.PeerID, topic string) - // Should be called whenever a peer sends us data that is stale or latent - MarkLatentPeer(peerID core.PeerID, topic string) + // Adjust a peer's quality score + AdjustPeerQuality(core.PeerID, func(int) int) // Protects the peer's connection from being pruned ProtectPeer(core.PeerID) diff --git a/quai/p2p_backend.go b/quai/p2p_backend.go index f82053fb8e..3ccb202be5 100644 --- a/quai/p2p_backend.go +++ b/quai/p2p_backend.go @@ -102,6 +102,7 @@ func (qbe *QuaiBackend) GetBackend(location common.Location) *quaiapi.Backend { // Handle consensus data propagated to us from our peers func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, Id string, topic string, data interface{}, nodeLocation common.Location) bool { defer types.ObjectPool.Put(data) + qbe.p2pBackend.AdjustPeerQuality(sourcePeer, p2p.QualityAdjOnBroadcast) switch data := data.(type) { case types.WorkObjectBlockView: backend := *qbe.GetBackend(nodeLocation) @@ -115,8 +116,6 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, Id string, topic s backend.WriteBlock(data.WorkObject) blockIngressCounter.Inc() - // If it was a good broadcast, mark the peer as lively - qbe.p2pBackend.MarkLivelyPeer(sourcePeer, topic) case types.WorkObjectHeaderView: backend := *qbe.GetBackend(nodeLocation) if backend == nil { @@ -129,8 +128,6 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, Id string, topic s } headerIngressCounter.Inc() - // If it was a good broadcast, mark the peer as lively - qbe.p2pBackend.MarkLivelyPeer(sourcePeer, topic) case types.WorkObjectShareView: backend := *qbe.GetBackend(nodeLocation) if backend == nil { @@ -177,8 +174,6 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, Id string, topic s workShareIngressCounter.Inc() txIngressCounter.Add(float64(len(data.WorkObject.Transactions()))) } - // If it was a good broadcast, mark the peer as lively - qbe.p2pBackend.MarkLivelyPeer(sourcePeer, topic) default: log.Global.WithFields(log.Fields{ "peer": sourcePeer,