diff --git a/p2p/node/api.go b/p2p/node/api.go index 5f26a34653..ee84b44ed4 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -15,6 +15,7 @@ import ( "github.com/dominant-strategies/go-quai/p2p" "github.com/dominant-strategies/go-quai/p2p/peerManager" quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol" + "github.com/dominant-strategies/go-quai/p2p/pubsubManager" "github.com/dominant-strategies/go-quai/quai" "github.com/dominant-strategies/go-quai/trie" @@ -122,7 +123,7 @@ func (p *P2PNode) Stop() error { } } -func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) { +func (p *P2PNode) requestFromPeers(topic string, location common.Location, requestData interface{}, responseDataType interface{}, resultChan chan interface{}) { go func() { defer func() { if r := recover(); r != nil { @@ -133,10 +134,10 @@ func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, d } }() defer close(resultChan) - peers := p.peerManager.GetPeers(location, data, peerManager.Best) + peers := p.peerManager.GetPeers(topic, peerManager.Best) log.Global.WithFields(log.Fields{ - "peers": peers, - "location": location, + "peers": peers, + "topic": topic, }).Debug("Requesting data from peers") var requestWg sync.WaitGroup @@ -152,14 +153,14 @@ func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, d } }() defer requestWg.Done() - p.requestAndWait(peerID, location, data, datatype, resultChan) + p.requestAndWait(peerID, location, requestData, topic, requestData, resultChan) }(peerID) } requestWg.Wait() }() } -func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data interface{}, dataType interface{}, resultChan chan interface{}) { +func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, reqData interface{}, topic string, dataType interface{}, resultChan chan interface{}) { defer func() { if r := recover(); r != nil { log.Global.WithFields(log.Fields{ @@ -170,27 +171,26 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data }() var recvd interface{} var err error - // Ask peer and wait for response - if recvd, err = p.requestFromPeer(peerID, location, data, dataType); err == nil { + if recvd, err = p.requestFromPeer(peerID, topic, location, reqData, dataType); err == nil { log.Global.WithFields(log.Fields{ - "data": data, + "reqData": reqData, "dataType": dataType, "peerId": peerID, "location": location.Name(), }).Trace("Received data from peer") // Mark this peer as behaving well - p.peerManager.MarkResponsivePeer(peerID, location) + p.peerManager.MarkResponsivePeer(peerID, topic) } else { log.Global.WithFields(log.Fields{ "peerId": peerID, "location": location.Name(), - "data": data, + "reqData": reqData, "dataType": dataType, "err": err, }).Error("Error requesting the data from peer") // Mark this peer as not responding - p.peerManager.MarkUnresponsivePeer(peerID, location) + p.peerManager.MarkUnresponsivePeer(peerID, topic) } // send the block to the result channel resultChan <- recvd @@ -198,6 +198,15 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data // Request a data from the network for the specified slice func (p *P2PNode) Request(location common.Location, requestData interface{}, responseDataType interface{}) chan interface{} { + topic, err := pubsubManager.TopicName(p.pubsub.GetGenesis(), location, responseDataType) + if err != nil { + log.Global.WithFields(log.Fields{ + "location": location.Name(), + "err": err, + }).Error("Error getting topic name") + panic(err) + } + resultChan := make(chan interface{}, 1) // If it is a hash, first check to see if it is contained in the caches if hash, ok := requestData.(common.Hash); ok { @@ -208,7 +217,7 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res } } - p.requestFromPeers(location, requestData, responseDataType, resultChan) + p.requestFromPeers(topic, location, requestData, responseDataType, resultChan) // TODO: optimize with waitgroups or a doneChan to only query if no peers responded // Right now this creates too many streams, so don't call this until we have a better solution // p.queryDHT(location, requestData, responseDataType, resultChan) @@ -216,22 +225,22 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res return resultChan } -func (p *P2PNode) MarkLivelyPeer(peer p2p.PeerID, location common.Location) { +func (p *P2PNode) MarkLivelyPeer(peer p2p.PeerID, topic string) { log.Global.WithFields(log.Fields{ - "peer": peer, - "location": location, + "peer": peer, + "topic": topic, }).Debug("Recording well-behaving peer") - p.peerManager.MarkLivelyPeer(peer, location) + p.peerManager.MarkLivelyPeer(peer, topic) } -func (p *P2PNode) MarkLatentPeer(peer p2p.PeerID, location common.Location) { +func (p *P2PNode) MarkLatentPeer(peer p2p.PeerID, topic string) { log.Global.WithFields(log.Fields{ - "peer": peer, - "location": location, + "peer": peer, + "topic": topic, }).Debug("Recording misbehaving peer") - p.peerManager.MarkLatentPeer(peer, location) + p.peerManager.MarkLatentPeer(peer, topic) } func (p *P2PNode) ProtectPeer(peer p2p.PeerID) { @@ -292,7 +301,7 @@ func (p *P2PNode) GetTrieNode(hash common.Hash, location common.Location) *trie. return p.consensus.GetTrieNode(hash, location) } -func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, data interface{}, nodeLocation common.Location) { +func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, topic string, data interface{}, nodeLocation common.Location) { switch v := data.(type) { case types.WorkObject: p.cacheAdd(v.Hash(), &v, nodeLocation) @@ -306,6 +315,6 @@ func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, data interface{}, nodeLoca // If we made it here, pass the data on to the consensus backend if p.consensus != nil { - p.consensus.OnNewBroadcast(sourcePeer, data, nodeLocation) + p.consensus.OnNewBroadcast(sourcePeer, topic, data, nodeLocation) } } diff --git a/p2p/node/p2p_services.go b/p2p/node/p2p_services.go index ffb61bceac..e031615975 100644 --- a/p2p/node/p2p_services.go +++ b/p2p/node/p2p_services.go @@ -18,7 +18,7 @@ import ( ) // Opens a stream to the given peer and request some data for the given hash at the given location -func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data interface{}, datatype interface{}) (interface{}, error) { +func (p *P2PNode) requestFromPeer(peerID peer.ID, topic string, location common.Location, reqData interface{}, datatype interface{}) (interface{}, error) { defer func() { if r := recover(); r != nil { log.Global.WithFields(log.Fields{ @@ -30,7 +30,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data log.Global.WithFields(log.Fields{ "peerId": peerID, "location": location.Name(), - "data": data, + "data": reqData, "datatype": datatype, }).Trace("Requesting the data from peer") stream, err := p.NewStream(peerID) @@ -49,7 +49,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data defer p.requestManager.CloseRequest(id) // Create the corresponding data request - requestBytes, err := pb.EncodeQuaiRequest(id, location, data, datatype) + requestBytes, err := pb.EncodeQuaiRequest(id, location, reqData, datatype) if err != nil { return nil, err } @@ -73,7 +73,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data log.Global.WithFields(log.Fields{ "peerId": peerID, }).Warn("Peer did not respond in time") - p.peerManager.MarkUnresponsivePeer(peerID, location) + p.peerManager.MarkUnresponsivePeer(peerID, topic) return nil, nil } @@ -85,7 +85,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data switch datatype.(type) { case *types.WorkObject: if block, ok := recvdType.(*types.WorkObject); ok { - switch data := data.(type) { + switch data := reqData.(type) { case common.Hash: if block.Hash() == data { return block, nil @@ -101,11 +101,11 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data } return nil, errors.New("block request invalid response") case *types.Header: - if header, ok := recvdType.(*types.Header); ok && header.Hash() == data.(common.Hash) { + if header, ok := recvdType.(*types.Header); ok && header.Hash() == reqData.(common.Hash) { return header, nil } case *types.Transaction: - if tx, ok := recvdType.(*types.Transaction); ok && tx.Hash() == data.(common.Hash) { + if tx, ok := recvdType.(*types.Transaction); ok && tx.Hash() == reqData.(common.Hash) { return tx, nil } case common.Hash: diff --git a/p2p/pb/proto_services.go b/p2p/pb/proto_services.go index 0c28224b82..1fdf73a45a 100644 --- a/p2p/pb/proto_services.go +++ b/p2p/pb/proto_services.go @@ -21,19 +21,19 @@ func DecodeQuaiMessage(data []byte) (*QuaiMessage, error) { // EncodeRequestMessage creates a marshaled protobuf message for a Quai Request. // Returns the serialized protobuf message. -func EncodeQuaiRequest(id uint32, location common.Location, data interface{}, datatype interface{}) ([]byte, error) { +func EncodeQuaiRequest(id uint32, location common.Location, reqData interface{}, datatype interface{}) ([]byte, error) { reqMsg := QuaiRequestMessage{ Id: id, Location: location.ProtoEncode(), } - switch d := data.(type) { + switch d := reqData.(type) { case common.Hash: reqMsg.Data = &QuaiRequestMessage_Hash{Hash: d.ProtoEncode()} case *big.Int: reqMsg.Data = &QuaiRequestMessage_Number{Number: d.Bytes()} default: - return nil, errors.Errorf("unsupported request input data field type: %T", data) + return nil, errors.Errorf("unsupported request input data field type: %T", reqData) } switch datatype.(type) { diff --git a/p2p/peerManager/peerManager.go b/p2p/peerManager/peerManager.go index d98a1ee5b1..d0d2a28739 100644 --- a/p2p/peerManager/peerManager.go +++ b/p2p/peerManager/peerManager.go @@ -88,17 +88,17 @@ type PeerManager interface { // Returns c_peerCount peers starting at the requested quality level of peers // If there are not enough peers at the requested quality, it will return lower quality peers // If there still aren't enough peers, it will query the DHT for more - GetPeers(location common.Location, data interface{}, quality PeerQuality) []p2p.PeerID + GetPeers(topic string, quality PeerQuality) []p2p.PeerID // Increases the peer's liveliness score - MarkLivelyPeer(p2p.PeerID, common.Location) + MarkLivelyPeer(peerID p2p.PeerID, topic string) // Decreases the peer's liveliness score - MarkLatentPeer(p2p.PeerID, common.Location) + MarkLatentPeer(peerID p2p.PeerID, topic string) // Increases the peer's liveliness score. Not exposed outside of NetworkingAPI - MarkResponsivePeer(p2p.PeerID, common.Location) + MarkResponsivePeer(peerID p2p.PeerID, topic string) // Decreases the peer's liveliness score. Not exposed outside of NetworkingAPI - MarkUnresponsivePeer(p2p.PeerID, common.Location) + MarkUnresponsivePeer(peerID p2p.PeerID, topic string) // Protects the peer's connection from being disconnected ProtectPeer(p2p.PeerID) @@ -330,15 +330,15 @@ func (pm *BasicPeerManager) getPeersHelper(peerDB *peerdb.PeerDB, numPeers int) return peerSubset } -func (pm *BasicPeerManager) GetPeers(location common.Location, data interface{}, quality PeerQuality) []p2p.PeerID { +func (pm *BasicPeerManager) GetPeers(topic string, quality PeerQuality) []p2p.PeerID { var peerList []p2p.PeerID switch quality { case Best: - peerList = pm.getBestPeersWithFallback(location) + peerList = pm.getBestPeersWithFallback(topic) case Responsive: - peerList = pm.getResponsivePeersWithFallback(location) + peerList = pm.getResponsivePeersWithFallback(topic) case LastResort: - peerList = pm.getLastResortPeers(location) + peerList = pm.getLastResortPeers(topic) default: panic("Invalid peer quality") } @@ -350,13 +350,12 @@ func (pm *BasicPeerManager) GetPeers(location common.Location, data interface{}, } // Query the DHT for more peers - return pm.queryDHT(location, data, peerList, C_peerCount-lenPeer) + return pm.queryDHT(topic, peerList, C_peerCount-lenPeer) } -func (pm *BasicPeerManager) queryDHT(location common.Location, data interface{}, peerList []p2p.PeerID, peerCount int) []p2p.PeerID { +func (pm *BasicPeerManager) queryDHT(topic string, peerList []p2p.PeerID, peerCount int) []p2p.PeerID { // create a Cid from the slice location - topicName, _ := pubsubManager.TopicName(pm.genesis, location, data) - shardCid := pubsubManager.TopicToCid(topicName) + shardCid := pubsubManager.TopicToCid(topic) // Internal list of peers from the dht dhtPeers := make([]p2p.PeerID, 0, peerCount) @@ -372,54 +371,51 @@ func (pm *BasicPeerManager) queryDHT(location common.Location, data interface{}, return append(peerList, dhtPeers...) } -func (pm *BasicPeerManager) getBestPeersWithFallback(location common.Location) []p2p.PeerID { - locName := location.Name() - if pm.peerDBs[locName] == nil { +func (pm *BasicPeerManager) getBestPeersWithFallback(topic string) []p2p.PeerID { + if pm.peerDBs[topic] == nil { // There have not been any peers added to this topic return nil } - bestPeersCount := pm.peerDBs[locName][Best].GetPeerCount() + bestPeersCount := pm.peerDBs[topic][Best].GetPeerCount() if bestPeersCount < C_peerCount { - bestPeerList := pm.getPeersHelper(pm.peerDBs[locName][Best], bestPeersCount) - bestPeerList = append(bestPeerList, pm.getResponsivePeersWithFallback(location)...) + bestPeerList := pm.getPeersHelper(pm.peerDBs[topic][Best], bestPeersCount) + bestPeerList = append(bestPeerList, pm.getResponsivePeersWithFallback(topic)...) return bestPeerList } - return pm.getPeersHelper(pm.peerDBs[locName][Best], C_peerCount) + return pm.getPeersHelper(pm.peerDBs[topic][Best], C_peerCount) } -func (pm *BasicPeerManager) getResponsivePeersWithFallback(location common.Location) []p2p.PeerID { - locName := location.Name() - - responsivePeersCount := pm.peerDBs[locName][Responsive].GetPeerCount() +func (pm *BasicPeerManager) getResponsivePeersWithFallback(topic string) []p2p.PeerID { + responsivePeersCount := pm.peerDBs[topic][Responsive].GetPeerCount() if responsivePeersCount < C_peerCount { - responsivePeerList := pm.getPeersHelper(pm.peerDBs[locName][Responsive], responsivePeersCount) - responsivePeerList = append(responsivePeerList, pm.getLastResortPeers(location)...) + responsivePeerList := pm.getPeersHelper(pm.peerDBs[topic][Responsive], responsivePeersCount) + responsivePeerList = append(responsivePeerList, pm.getLastResortPeers(topic)...) return responsivePeerList } - return pm.getPeersHelper(pm.peerDBs[locName][Responsive], C_peerCount) + return pm.getPeersHelper(pm.peerDBs[topic][Responsive], C_peerCount) } -func (pm *BasicPeerManager) getLastResortPeers(location common.Location) []p2p.PeerID { - return pm.getPeersHelper(pm.peerDBs[location.Name()][LastResort], C_peerCount) +func (pm *BasicPeerManager) getLastResortPeers(topic string) []p2p.PeerID { + return pm.getPeersHelper(pm.peerDBs[topic][LastResort], C_peerCount) } -func (pm *BasicPeerManager) MarkLivelyPeer(peer p2p.PeerID, location common.Location) { +func (pm *BasicPeerManager) MarkLivelyPeer(peer p2p.PeerID, topic string) { if peer == pm.selfID { return } pm.TagPeer(peer, "liveness_reports", 1) - pm.recategorizePeer(peer, location) + pm.recategorizePeer(peer, topic) } -func (pm *BasicPeerManager) MarkLatentPeer(peer p2p.PeerID, location common.Location) { +func (pm *BasicPeerManager) MarkLatentPeer(peer p2p.PeerID, topic string) { if peer == pm.selfID { return } pm.TagPeer(peer, "latency_reports", 1) - pm.recategorizePeer(peer, location) + pm.recategorizePeer(peer, topic) } func (pm *BasicPeerManager) calculatePeerLiveness(peer p2p.PeerID) float64 { @@ -433,14 +429,14 @@ func (pm *BasicPeerManager) calculatePeerLiveness(peer p2p.PeerID) float64 { return float64(liveness) / float64(latents) } -func (pm *BasicPeerManager) MarkResponsivePeer(peer p2p.PeerID, location common.Location) { +func (pm *BasicPeerManager) MarkResponsivePeer(peer p2p.PeerID, topic string) { pm.TagPeer(peer, "responses_served", 1) - pm.recategorizePeer(peer, location) + pm.recategorizePeer(peer, topic) } -func (pm *BasicPeerManager) MarkUnresponsivePeer(peer p2p.PeerID, location common.Location) { +func (pm *BasicPeerManager) MarkUnresponsivePeer(peer p2p.PeerID, topic string) { pm.TagPeer(peer, "responses_missed", 1) - pm.recategorizePeer(peer, location) + pm.recategorizePeer(peer, topic) } func (pm *BasicPeerManager) calculatePeerResponsiveness(peer p2p.PeerID) float64 { @@ -462,12 +458,12 @@ func (pm *BasicPeerManager) calculatePeerResponsiveness(peer p2p.PeerID) float64 // // 3. peers // - all other peers -func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, location common.Location) error { +func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, topic string) error { liveness := pm.calculatePeerLiveness(peerID) responsiveness := pm.calculatePeerResponsiveness(peerID) // remove peer from DB first - err := pm.removePeerFromTopic(peerID, location.Name()) + err := pm.removePeerFromTopic(peerID, topic) if err != nil { return err } @@ -485,24 +481,23 @@ func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, location common. return errors.Wrap(err, "error marshaling peer info") } - locationName := location.Name() if liveness >= c_qualityThreshold && responsiveness >= c_qualityThreshold { // Best peers: high liveness and responsiveness - err := pm.peerDBs[locationName][Best].Put(pm.ctx, key, peerInfo) + err := pm.peerDBs[topic][Best].Put(pm.ctx, key, peerInfo) if err != nil { return errors.Wrap(err, "error putting peer in bestPeersDB") } } else if responsiveness >= c_qualityThreshold { // Responsive peers: high responsiveness, but low liveness - err := pm.peerDBs[locationName][Responsive].Put(pm.ctx, key, peerInfo) + err := pm.peerDBs[topic][Responsive].Put(pm.ctx, key, peerInfo) if err != nil { return errors.Wrap(err, "error putting peer in responsivePeersDB") } } else { // All other peers - err := pm.peerDBs[locationName][LastResort].Put(pm.ctx, key, peerInfo) + err := pm.peerDBs[topic][LastResort].Put(pm.ctx, key, peerInfo) if err != nil { return errors.Wrap(err, "error putting peer in allPeersDB") } diff --git a/p2p/pubsubManager/gossipsub.go b/p2p/pubsubManager/gossipsub.go index 68db576161..961c36f757 100644 --- a/p2p/pubsubManager/gossipsub.go +++ b/p2p/pubsubManager/gossipsub.go @@ -30,7 +30,7 @@ type PubsubManager struct { genesis common.Hash // Callback function to handle received data - onReceived func(peer.ID, interface{}, common.Location) + onReceived func(peer.ID, string, interface{}, common.Location) } // creates a new gossipsub instance @@ -52,13 +52,17 @@ func NewGossipSubManager(ctx context.Context, h host.Host) (*PubsubManager, erro }, nil } +func (g *PubsubManager) GetGenesis() common.Hash { + return g.genesis +} + func (g *PubsubManager) SetQuaiBackend(consensus quai.ConsensusAPI) { g.UnsubscribeAll() // First unsubscribe from existing topics, if already registered g.consensus = consensus // Set new backend } -func (g *PubsubManager) Start(receiveCb func(peer.ID, interface{}, common.Location)) { +func (g *PubsubManager) Start(receiveCb func(peer.ID, string, interface{}, common.Location)) { g.onReceived = receiveCb } @@ -135,7 +139,7 @@ func (g *PubsubManager) Subscribe(location common.Location, datatype interface{} // handle the received data if g.onReceived != nil { - g.onReceived(msg.ReceivedFrom, data, location) + g.onReceived(msg.ReceivedFrom, *msg.Topic, data, location) } } }(location, subscription) diff --git a/quai/interface.go b/quai/interface.go index 9217061643..4e77ec856e 100644 --- a/quai/interface.go +++ b/quai/interface.go @@ -23,7 +23,7 @@ type ConsensusAPI interface { // Handle new data propagated from the gossip network. Should return quickly. // Specify the peer which propagated the data to us, as well as the data itself. // Return true if this data should be relayed to peers. False if it should be ignored. - OnNewBroadcast(core.PeerID, interface{}, common.Location) bool + OnNewBroadcast(core.PeerID, string, interface{}, common.Location) bool // Creates the function that will be used to determine if a message should be propagated. ValidatorFunc() func(ctx context.Context, id peer.ID, msg *pubsub.Message) pubsub.ValidationResult @@ -85,9 +85,9 @@ type NetworkingAPI interface { // Methods to report a peer to the P2PClient as behaving maliciously // Should be called whenever a peer sends us data that is acceptably lively - MarkLivelyPeer(core.PeerID, common.Location) + MarkLivelyPeer(peerID core.PeerID, topic string) // Should be called whenever a peer sends us data that is stale or latent - MarkLatentPeer(core.PeerID, common.Location) + MarkLatentPeer(peerID core.PeerID, topic string) // Protects the peer's connection from being pruned ProtectPeer(core.PeerID) diff --git a/quai/p2p_backend.go b/quai/p2p_backend.go index bc04428283..d713409698 100644 --- a/quai/p2p_backend.go +++ b/quai/p2p_backend.go @@ -77,7 +77,7 @@ func (qbe *QuaiBackend) GetBackend(location common.Location) *quaiapi.Backend { } // Handle consensus data propagated to us from our peers -func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, nodeLocation common.Location) bool { +func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, topic string, data interface{}, nodeLocation common.Location) bool { switch data := data.(type) { case types.WorkObject: backend := *qbe.GetBackend(nodeLocation) @@ -104,7 +104,7 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, } // If it was a good broadcast, mark the peer as lively - qbe.p2pBackend.MarkLivelyPeer(sourcePeer, nodeLocation) + qbe.p2pBackend.MarkLivelyPeer(sourcePeer, topic) return true } @@ -124,15 +124,14 @@ func (qbe *QuaiBackend) ValidatorFunc() func(ctx context.Context, id p2p.PeerID, return func(ctx context.Context, id peer.ID, msg *pubsub.Message) pubsub.ValidationResult { var data interface{} data = msg.Message.GetData() - switch data.(type) { + switch data := data.(type) { case types.WorkObject: - block := data.(types.WorkObject) - backend := *qbe.GetBackend(block.Location()) + backend := *qbe.GetBackend(data.Location()) if backend == nil { log.Global.WithFields(log.Fields{ "peer": id, - "hash": block.Hash(), - "location": block.Location(), + "hash": data.Hash(), + "location": data.Location(), }).Error("no backend found for this location") return pubsub.ValidationReject }