Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate peer lookup #1739

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, d
}).Debug("Requesting data from peers")

var requestWg sync.WaitGroup
for _, peerID := range peers {
for peerID := range peers {
requestWg.Add(1)
go func(peerID peer.ID) {
defer func() {
Expand Down
40 changes: 20 additions & 20 deletions p2p/node/peerManager/peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package peerManager

import (
"context"
"maps"
"net"
"runtime/debug"
"strings"
Expand Down Expand Up @@ -89,7 +90,7 @@ type PeerManager interface {
// Returns c_peerCount peers starting at the requested quality level of peers
// If there are not enough peers at the requested quality, it will return lower quality peers
// If there still aren't enough peers, it will query the DHT for more
GetPeers(location common.Location, data interface{}, quality PeerQuality) []p2p.PeerID
GetPeers(location common.Location, data interface{}, quality PeerQuality) map[p2p.PeerID]struct{}

// Increases the peer's liveliness score
MarkLivelyPeer(p2p.PeerID, common.Location)
Expand Down Expand Up @@ -296,8 +297,8 @@ func (pm *BasicPeerManager) GetSelfID() p2p.PeerID {
return pm.selfID
}

func (pm *BasicPeerManager) getPeersHelper(peerDB *peerdb.PeerDB, numPeers int) []p2p.PeerID {
peerSubset := make([]p2p.PeerID, 0, numPeers)
func (pm *BasicPeerManager) getPeersHelper(peerDB *peerdb.PeerDB, numPeers int) map[p2p.PeerID]struct{} {
peerSubset := make(map[p2p.PeerID]struct{})
q := query.Query{
Limit: numPeers,
}
Expand All @@ -311,14 +312,14 @@ func (pm *BasicPeerManager) getPeersHelper(peerDB *peerdb.PeerDB, numPeers int)
if err != nil {
return nil
}
peerSubset = append(peerSubset, peerID)
peerSubset[peerID] = struct{}{}
}

return peerSubset
}

func (pm *BasicPeerManager) GetPeers(location common.Location, data interface{}, quality PeerQuality) []p2p.PeerID {
var peerList []p2p.PeerID
func (pm *BasicPeerManager) GetPeers(location common.Location, data interface{}, quality PeerQuality) map[p2p.PeerID]struct{} {
var peerList map[p2p.PeerID]struct{}
switch quality {
case Best:
peerList = pm.getBestPeersWithFallback(location)
Expand All @@ -340,26 +341,26 @@ func (pm *BasicPeerManager) GetPeers(location common.Location, data interface{},
return pm.queryDHT(location, data, peerList, C_peerCount-lenPeer)
}

func (pm *BasicPeerManager) queryDHT(location common.Location, data interface{}, peerList []p2p.PeerID, peerCount int) []p2p.PeerID {
func (pm *BasicPeerManager) queryDHT(location common.Location, data interface{}, peerList map[p2p.PeerID]struct{}, peerCount int) map[p2p.PeerID]struct{} {
// create a Cid from the slice location
topicName, _ := pubsubManager.TopicName(pm.genesis, location, data)
shardCid := pubsubManager.TopicToCid(topicName)
topicCid := pubsubManager.TopicToCid(topicName)

// Internal list of peers from the dht
dhtPeers := make([]p2p.PeerID, 0, peerCount)
log.Global.Infof("Querying DHT for slice Cid %s", shardCid)
dhtPeers := make(map[p2p.PeerID]struct{})
log.Global.Infof("Querying DHT for slice Cid %s", topicCid)
// query the DHT for peers in the slice
// TODO: need to find providers of a topic, not a shard
for peer := range pm.dht.FindProvidersAsync(pm.ctx, shardCid, peerCount) {
for peer := range pm.dht.FindProvidersAsync(pm.ctx, topicCid, peerCount) {
if peer.ID != pm.selfID {
dhtPeers = append(dhtPeers, peer.ID)
dhtPeers[peer.ID] = struct{}{}
}
}
log.Global.Warn("Found the following peers from the DHT: ", dhtPeers)
return append(peerList, dhtPeers...)
maps.Copy(peerList, dhtPeers)
return peerList
}

func (pm *BasicPeerManager) getBestPeersWithFallback(location common.Location) []p2p.PeerID {
func (pm *BasicPeerManager) getBestPeersWithFallback(location common.Location) map[p2p.PeerID]struct{} {
locName := location.Name()
if pm.peerDBs[locName] == nil {
// There have not been any peers added to this topic
Expand All @@ -369,27 +370,26 @@ func (pm *BasicPeerManager) getBestPeersWithFallback(location common.Location) [
bestPeersCount := pm.peerDBs[locName][Best].GetPeerCount()
if bestPeersCount < C_peerCount {
bestPeerList := pm.getPeersHelper(pm.peerDBs[locName][Best], bestPeersCount)
bestPeerList = append(bestPeerList, pm.getResponsivePeersWithFallback(location)...)
maps.Copy(bestPeerList, pm.getResponsivePeersWithFallback(location))
return bestPeerList
wizeguyy marked this conversation as resolved.
Show resolved Hide resolved
}
return pm.getPeersHelper(pm.peerDBs[locName][Best], C_peerCount)
}

func (pm *BasicPeerManager) getResponsivePeersWithFallback(location common.Location) []p2p.PeerID {
func (pm *BasicPeerManager) getResponsivePeersWithFallback(location common.Location) map[p2p.PeerID]struct{} {
locName := location.Name()

responsivePeersCount := pm.peerDBs[locName][Responsive].GetPeerCount()
if responsivePeersCount < C_peerCount {
responsivePeerList := pm.getPeersHelper(pm.peerDBs[locName][Responsive], responsivePeersCount)
responsivePeerList = append(responsivePeerList, pm.getLastResortPeers(location)...)

maps.Copy(responsivePeerList, pm.getLastResortPeers(location))
return responsivePeerList
}
return pm.getPeersHelper(pm.peerDBs[locName][Responsive], C_peerCount)

}

func (pm *BasicPeerManager) getLastResortPeers(location common.Location) []p2p.PeerID {
func (pm *BasicPeerManager) getLastResortPeers(location common.Location) map[p2p.PeerID]struct{} {
return pm.getPeersHelper(pm.peerDBs[location.Name()][LastResort], C_peerCount)
}

Expand Down
Loading