Skip to content

Commit

Permalink
Changing PeerDB and functions to filter on topics
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih committed May 7, 2024
1 parent 8492e9c commit 8b3ef66
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 89 deletions.
55 changes: 32 additions & 23 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/dominant-strategies/go-quai/p2p"
"github.com/dominant-strategies/go-quai/p2p/peerManager"
quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol"
"github.com/dominant-strategies/go-quai/p2p/pubsubManager"
"github.com/dominant-strategies/go-quai/quai"
"github.com/dominant-strategies/go-quai/trie"

Expand Down Expand Up @@ -122,7 +123,7 @@ func (p *P2PNode) Stop() error {
}
}

func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) {
func (p *P2PNode) requestFromPeers(topic string, location common.Location, requestData interface{}, responseDataType interface{}, resultChan chan interface{}) {
go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -133,10 +134,10 @@ func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, d
}
}()
defer close(resultChan)
peers := p.peerManager.GetPeers(location, data, peerManager.Best)
peers := p.peerManager.GetPeers(topic, peerManager.Best)
log.Global.WithFields(log.Fields{
"peers": peers,
"location": location,
"peers": peers,
"topic": topic,
}).Debug("Requesting data from peers")

var requestWg sync.WaitGroup
Expand All @@ -152,14 +153,14 @@ func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, d
}
}()
defer requestWg.Done()
p.requestAndWait(peerID, location, data, datatype, resultChan)
p.requestAndWait(peerID, location, requestData, topic, requestData, resultChan)
}(peerID)
}
requestWg.Wait()
}()
}

func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data interface{}, dataType interface{}, resultChan chan interface{}) {
func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, reqData interface{}, topic string, dataType interface{}, resultChan chan interface{}) {
defer func() {
if r := recover(); r != nil {
log.Global.WithFields(log.Fields{
Expand All @@ -170,34 +171,42 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data
}()
var recvd interface{}
var err error
// Ask peer and wait for response
if recvd, err = p.requestFromPeer(peerID, location, data, dataType); err == nil {
if recvd, err = p.requestFromPeer(peerID, topic, location, reqData, dataType); err == nil {
log.Global.WithFields(log.Fields{
"data": data,
"reqData": reqData,
"dataType": dataType,
"peerId": peerID,
"location": location.Name(),
}).Trace("Received data from peer")

// Mark this peer as behaving well
p.peerManager.MarkResponsivePeer(peerID, location)
p.peerManager.MarkResponsivePeer(peerID, topic)
} else {
log.Global.WithFields(log.Fields{
"peerId": peerID,
"location": location.Name(),
"data": data,
"reqData": reqData,
"dataType": dataType,
"err": err,
}).Error("Error requesting the data from peer")
// Mark this peer as not responding
p.peerManager.MarkUnresponsivePeer(peerID, location)
p.peerManager.MarkUnresponsivePeer(peerID, topic)
}
// send the block to the result channel
resultChan <- recvd
}

// Request a data from the network for the specified slice
func (p *P2PNode) Request(location common.Location, requestData interface{}, responseDataType interface{}) chan interface{} {
topic, err := pubsubManager.TopicName(p.pubsub.GetGenesis(), location, responseDataType)
if err != nil {
log.Global.WithFields(log.Fields{
"location": location.Name(),
"err": err,
}).Error("Error getting topic name")
panic(err)
}

resultChan := make(chan interface{}, 1)
// If it is a hash, first check to see if it is contained in the caches
if hash, ok := requestData.(common.Hash); ok {
Expand All @@ -208,30 +217,30 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res
}
}

p.requestFromPeers(location, requestData, responseDataType, resultChan)
p.requestFromPeers(topic, location, requestData, responseDataType, resultChan)
// TODO: optimize with waitgroups or a doneChan to only query if no peers responded
// Right now this creates too many streams, so don't call this until we have a better solution
// p.queryDHT(location, requestData, responseDataType, resultChan)

return resultChan
}

func (p *P2PNode) MarkLivelyPeer(peer p2p.PeerID, location common.Location) {
func (p *P2PNode) MarkLivelyPeer(peer p2p.PeerID, topic string) {
log.Global.WithFields(log.Fields{
"peer": peer,
"location": location,
"peer": peer,
"topic": topic,
}).Debug("Recording well-behaving peer")

p.peerManager.MarkLivelyPeer(peer, location)
p.peerManager.MarkLivelyPeer(peer, topic)
}

func (p *P2PNode) MarkLatentPeer(peer p2p.PeerID, location common.Location) {
func (p *P2PNode) MarkLatentPeer(peer p2p.PeerID, topic string) {
log.Global.WithFields(log.Fields{
"peer": peer,
"location": location,
"peer": peer,
"topic": topic,
}).Debug("Recording misbehaving peer")

p.peerManager.MarkLatentPeer(peer, location)
p.peerManager.MarkLatentPeer(peer, topic)
}

func (p *P2PNode) ProtectPeer(peer p2p.PeerID) {
Expand Down Expand Up @@ -292,7 +301,7 @@ func (p *P2PNode) GetTrieNode(hash common.Hash, location common.Location) *trie.
return p.consensus.GetTrieNode(hash, location)
}

func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, data interface{}, nodeLocation common.Location) {
func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, topic string, data interface{}, nodeLocation common.Location) {
switch v := data.(type) {
case types.WorkObject:
p.cacheAdd(v.Hash(), &v, nodeLocation)
Expand All @@ -306,6 +315,6 @@ func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, data interface{}, nodeLoca

// If we made it here, pass the data on to the consensus backend
if p.consensus != nil {
p.consensus.OnNewBroadcast(sourcePeer, data, nodeLocation)
p.consensus.OnNewBroadcast(sourcePeer, topic, data, nodeLocation)
}
}
14 changes: 7 additions & 7 deletions p2p/node/p2p_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

// Opens a stream to the given peer and request some data for the given hash at the given location
func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data interface{}, datatype interface{}) (interface{}, error) {
func (p *P2PNode) requestFromPeer(peerID peer.ID, topic string, location common.Location, reqData interface{}, datatype interface{}) (interface{}, error) {
defer func() {
if r := recover(); r != nil {
log.Global.WithFields(log.Fields{
Expand All @@ -30,7 +30,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data
log.Global.WithFields(log.Fields{
"peerId": peerID,
"location": location.Name(),
"data": data,
"data": reqData,
"datatype": datatype,
}).Trace("Requesting the data from peer")
stream, err := p.NewStream(peerID)
Expand All @@ -49,7 +49,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data
defer p.requestManager.CloseRequest(id)

// Create the corresponding data request
requestBytes, err := pb.EncodeQuaiRequest(id, location, data, datatype)
requestBytes, err := pb.EncodeQuaiRequest(id, location, reqData, datatype)
if err != nil {
return nil, err
}
Expand All @@ -73,7 +73,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data
log.Global.WithFields(log.Fields{
"peerId": peerID,
}).Warn("Peer did not respond in time")
p.peerManager.MarkUnresponsivePeer(peerID, location)
p.peerManager.MarkUnresponsivePeer(peerID, topic)
return nil, nil
}

Expand All @@ -85,7 +85,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data
switch datatype.(type) {
case *types.WorkObject:
if block, ok := recvdType.(*types.WorkObject); ok {
switch data := data.(type) {
switch data := reqData.(type) {
case common.Hash:
if block.Hash() == data {
return block, nil
Expand All @@ -101,11 +101,11 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data
}
return nil, errors.New("block request invalid response")
case *types.Header:
if header, ok := recvdType.(*types.Header); ok && header.Hash() == data.(common.Hash) {
if header, ok := recvdType.(*types.Header); ok && header.Hash() == reqData.(common.Hash) {
return header, nil
}
case *types.Transaction:
if tx, ok := recvdType.(*types.Transaction); ok && tx.Hash() == data.(common.Hash) {
if tx, ok := recvdType.(*types.Transaction); ok && tx.Hash() == reqData.(common.Hash) {
return tx, nil
}
case common.Hash:
Expand Down
6 changes: 3 additions & 3 deletions p2p/pb/proto_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ func DecodeQuaiMessage(data []byte) (*QuaiMessage, error) {

// EncodeRequestMessage creates a marshaled protobuf message for a Quai Request.
// Returns the serialized protobuf message.
func EncodeQuaiRequest(id uint32, location common.Location, data interface{}, datatype interface{}) ([]byte, error) {
func EncodeQuaiRequest(id uint32, location common.Location, reqData interface{}, datatype interface{}) ([]byte, error) {
reqMsg := QuaiRequestMessage{
Id: id,
Location: location.ProtoEncode(),
}

switch d := data.(type) {
switch d := reqData.(type) {
case common.Hash:
reqMsg.Data = &QuaiRequestMessage_Hash{Hash: d.ProtoEncode()}
case *big.Int:
reqMsg.Data = &QuaiRequestMessage_Number{Number: d.Bytes()}
default:
return nil, errors.Errorf("unsupported request input data field type: %T", data)
return nil, errors.Errorf("unsupported request input data field type: %T", reqData)
}

switch datatype.(type) {
Expand Down
Loading

0 comments on commit 8b3ef66

Please sign in to comment.