diff --git a/cmd/utils/hierarchical_coordinator.go b/cmd/utils/hierarchical_coordinator.go index 16e54e7533..69eeee872b 100644 --- a/cmd/utils/hierarchical_coordinator.go +++ b/cmd/utils/hierarchical_coordinator.go @@ -133,10 +133,14 @@ func (hc *HierarchicalCoordinator) startNode(logPath string, quaiBackend quai.Co stack, apiBackend := makeFullNode(hc.p2p, location, hc.slicesRunning, hc.currentExpansionNumber, genesisBlock, logger) quaiBackend.SetApiBackend(&apiBackend, location) + // Only subscribe to block and transaction data if processing state + if quaiBackend.ProcessingState(location) { + hc.p2p.Subscribe(location, &types.WorkObjectBlockView{}) + hc.p2p.Subscribe(location, &types.Transactions{}) + } + // Subscribe to the new topics after setting the api backend - hc.p2p.Subscribe(location, &types.WorkObject{}) - hc.p2p.Subscribe(location, common.Hash{}) - hc.p2p.Subscribe(location, &types.Transactions{}) + hc.p2p.Subscribe(location, &types.WorkObjectHeaderView{}) StartNode(stack) diff --git a/consensus/blake3pow/consensus.go b/consensus/blake3pow/consensus.go index d72d701dae..c6c7a71968 100644 --- a/consensus/blake3pow/consensus.go +++ b/consensus/blake3pow/consensus.go @@ -616,7 +616,7 @@ func (blake3pow *Blake3pow) FinalizeAndAssemble(chain consensus.ChainHeaderReade return nil, err } // Header seems complete, assemble into a block and return - return types.NewWorkObject(header.WorkObjectHeader(), woBody, nil, types.BlockObject), nil + return types.NewWorkObject(header.WorkObjectHeader(), woBody, nil), nil } // NodeLocation returns the location of the node diff --git a/consensus/progpow/consensus.go b/consensus/progpow/consensus.go index 20120e2f78..6f5693903a 100644 --- a/consensus/progpow/consensus.go +++ b/consensus/progpow/consensus.go @@ -652,7 +652,7 @@ func (progpow *Progpow) FinalizeAndAssemble(chain consensus.ChainHeaderReader, h return nil, err } // Header seems complete, assemble into a block and return - return types.NewWorkObject(header.WorkObjectHeader(), woBody, nil, types.BlockObject), nil + return types.NewWorkObject(header.WorkObjectHeader(), woBody, nil), nil } func (progpow *Progpow) NodeLocation() common.Location { diff --git a/core/bodydb.go b/core/bodydb.go index 40262944c1..ff49b08d7b 100644 --- a/core/bodydb.go +++ b/core/bodydb.go @@ -43,7 +43,8 @@ type BodyDb struct { woCache *lru.Cache processor *StateProcessor - slicesRunning []common.Location + slicesRunning []common.Location + processingState bool logger *log.Logger } diff --git a/core/headerchain.go b/core/headerchain.go index 35dbcbc775..bf566d7533 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -72,9 +72,10 @@ type HeaderChain struct { running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing - headermu sync.RWMutex - heads []*types.WorkObject - slicesRunning []common.Location + headermu sync.RWMutex + heads []*types.WorkObject + slicesRunning []common.Location + processingState bool logger *log.Logger @@ -124,6 +125,9 @@ func NewHeaderChain(db ethdb.Database, engine consensus.Engine, pEtxsRollupFetch return nil, err } + // Record if the chain is processing state + hc.processingState = hc.setStateProcessing() + pendingEtxsRollup, _ := lru.New(c_maxPendingEtxsRollup) hc.pendingEtxsRollup = pendingEtxsRollup @@ -314,7 +318,26 @@ func (hc *HeaderChain) AppendHeader(header *types.WorkObject) error { return nil } func (hc *HeaderChain) ProcessingState() bool { - return hc.bc.ProcessingState() + return hc.processingState +} + +func (hc *HeaderChain) setStateProcessing() bool { + nodeCtx := hc.NodeCtx() + for _, slice := range hc.slicesRunning { + switch nodeCtx { + case common.PRIME_CTX: + return true + case common.REGION_CTX: + if slice.Region() == hc.NodeLocation().Region() { + return true + } + case common.ZONE_CTX: + if slice.Equal(hc.NodeLocation()) { + return true + } + } + } + return false } // Append diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 66494e8001..4aaa3dd8cd 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -545,7 +545,7 @@ func DeleteTermini(db ethdb.KeyValueWriter, hash common.Hash) { } // ReadWorkObjectHeader retreive's the work object header stored in hash. -func ReadWorkObjectHeader(db ethdb.Reader, hash common.Hash, woType int) *types.WorkObjectHeader { +func ReadWorkObjectHeader(db ethdb.Reader, hash common.Hash, woType types.WorkObjectView) *types.WorkObjectHeader { var key []byte switch woType { case types.BlockObject: @@ -577,7 +577,7 @@ func ReadWorkObjectHeader(db ethdb.Reader, hash common.Hash, woType int) *types. } // WriteWorkObjectHeader writes the work object header of the terminus hash. -func WriteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType int, nodeCtx int) { +func WriteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType types.WorkObjectView, nodeCtx int) { var key []byte switch woType { case types.BlockObject: @@ -601,7 +601,7 @@ func WriteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, workObject } // DeleteWorkObjectHeader deletes the work object header stored for the header hash. -func DeleteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, woType int) { +func DeleteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, woType types.WorkObjectView) { var key []byte switch woType { case types.BlockObject: @@ -617,7 +617,7 @@ func DeleteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, woType in } // ReadWorkObject retreive's the work object stored in hash. -func ReadWorkObject(db ethdb.Reader, hash common.Hash, woType int) *types.WorkObject { +func ReadWorkObject(db ethdb.Reader, hash common.Hash, woType types.WorkObjectView) *types.WorkObject { workObjectHeader := ReadWorkObjectHeader(db, hash, woType) if workObjectHeader == nil { return nil @@ -626,10 +626,10 @@ func ReadWorkObject(db ethdb.Reader, hash common.Hash, woType int) *types.WorkOb if workObjectBody == nil { return nil } - return types.NewWorkObject(workObjectHeader, workObjectBody, nil, woType) //TODO: mmtx transaction + return types.NewWorkObject(workObjectHeader, workObjectBody, nil) //TODO: mmtx transaction } -func ReadWorkObjectHeaderOnly(db ethdb.Reader, hash common.Hash, woType int) *types.WorkObject { +func ReadWorkObjectHeaderOnly(db ethdb.Reader, hash common.Hash, woType types.WorkObjectView) *types.WorkObject { workObjectHeader := ReadWorkObjectHeader(db, hash, woType) if workObjectHeader == nil { return nil @@ -638,17 +638,17 @@ func ReadWorkObjectHeaderOnly(db ethdb.Reader, hash common.Hash, woType int) *ty if workObjectBodyHeaderOnly == nil { return nil } - return types.NewWorkObject(workObjectHeader, workObjectBodyHeaderOnly, nil, woType) + return types.NewWorkObject(workObjectHeader, workObjectBodyHeaderOnly, nil) } // WriteWorkObject writes the work object of the terminus hash. -func WriteWorkObject(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType int, nodeCtx int) { +func WriteWorkObject(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType types.WorkObjectView, nodeCtx int) { WriteWorkObjectBody(db, hash, workObject, woType, nodeCtx) WriteWorkObjectHeader(db, hash, workObject, woType, nodeCtx) } // DeleteWorkObject deletes the work object stored for the header hash. -func DeleteWorkObject(db ethdb.KeyValueWriter, hash common.Hash, number uint64, woType int) { +func DeleteWorkObject(db ethdb.KeyValueWriter, hash common.Hash, number uint64, woType types.WorkObjectView) { DeleteWorkObjectBody(db, hash) DeleteWorkObjectHeader(db, hash, woType) //TODO: mmtx transaction DeleteHeader(db, hash, number) @@ -657,7 +657,7 @@ func DeleteWorkObject(db ethdb.KeyValueWriter, hash common.Hash, number uint64, // DeleteWorkObjectWithoutNumber removes all block data associated with a hash, except // the hash to number mapping. -func DeleteBlockWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64, woType int) { +func DeleteBlockWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64, woType types.WorkObjectView) { DeleteWorkObjectBody(db, hash) DeleteWorkObjectHeader(db, hash, woType) //TODO: mmtx transaction DeleteReceipts(db, hash, number) @@ -705,7 +705,7 @@ func ReadWorkObjectBodyHeaderOnly(db ethdb.Reader, hash common.Hash) *types.Work } // WriteWorkObjectBody writes the work object body of the terminus hash. -func WriteWorkObjectBody(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType int, nodeCtx int) { +func WriteWorkObjectBody(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType types.WorkObjectView, nodeCtx int) { key := workObjectBodyKey(hash) WriteHeaderNumber(db, hash, workObject.NumberU64(nodeCtx)) @@ -1079,7 +1079,7 @@ func ReadBadWorkObject(db ethdb.Reader, hash common.Hash) *types.WorkObject { } for _, bad := range *badWorkObjects { if bad.woHeader.Hash() == hash { - return types.NewWorkObject(bad.woHeader, bad.woBody, nil, types.BlockObject) + return types.NewWorkObject(bad.woHeader, bad.woBody, nil) } } return nil diff --git a/core/slice.go b/core/slice.go index 5897a6de34..3b341f35f7 100644 --- a/core/slice.go +++ b/core/slice.go @@ -1312,7 +1312,7 @@ func (sl *Slice) ConstructLocalMinedBlock(wo *types.WorkObject) (*types.WorkObje wo.Body().SetTransactions(nil) wo.Body().SetExtTransactions(nil) wo.Body().SetInterlinkHashes(interlinkHashes) - pendingBlockBody = types.NewWorkObject(wo.WorkObjectHeader(), wo.Body(), nil, types.BlockObject) + pendingBlockBody = types.NewWorkObject(wo.WorkObjectHeader(), wo.Body(), nil) } // Load uncles because they are not included in the block response. txs := make([]*types.Transaction, len(pendingBlockBody.Transactions())) @@ -1341,7 +1341,7 @@ func (sl *Slice) ConstructLocalMinedBlock(wo *types.WorkObject) (*types.WorkObje pendingBlockBody.Body().SetExtTransactions(etxs) pendingBlockBody.Body().SetManifest(subManifest) pendingBlockBody.Body().SetInterlinkHashes(interlinkHashes) - block := types.NewWorkObject(wo.WorkObjectHeader(), pendingBlockBody.Body(), nil, types.BlockObject) + block := types.NewWorkObject(wo.WorkObjectHeader(), pendingBlockBody.Body(), nil) if err := sl.validator.ValidateBody(block); err != nil { return block, err diff --git a/core/types/wo.go b/core/types/wo.go index c6830493f8..7cc16352d4 100644 --- a/core/types/wo.go +++ b/core/types/wo.go @@ -20,7 +20,6 @@ type WorkObject struct { tx *Transaction // caches - size atomic.Value appendTime atomic.Value // These fields are used to track @@ -46,11 +45,14 @@ type WorkObjectHeader struct { type WorkObjects []*WorkObject +type WorkObjectView int + // Work object types const ( - BlockObject = iota + BlockObject WorkObjectView = iota TxObject PEtxObject + HeaderObject PhObject ) @@ -560,14 +562,11 @@ func CalcUncleHash(uncles []*WorkObjectHeader) common.Hash { /////////////////// New Object Creation Methods //////////// //////////////////////////////////////////////////////////// -func NewWorkObject(woHeader *WorkObjectHeader, woBody *WorkObjectBody, tx *Transaction, woType int) *WorkObject { - switch woType { - default: - return &WorkObject{ - woHeader: woHeader, - woBody: woBody, - tx: tx, - } +func NewWorkObject(woHeader *WorkObjectHeader, woBody *WorkObjectBody, tx *Transaction) *WorkObject { + return &WorkObject{ + woHeader: woHeader, + woBody: woBody, + tx: tx, } } @@ -653,10 +652,10 @@ func NewWorkObjectBody(header *Header, txs []*Transaction, etxs []*Transaction, return b, nil } -func NewWorkObjectWithHeader(header *WorkObject, tx *Transaction, nodeCtx int, woType int) *WorkObject { +func NewWorkObjectWithHeader(header *WorkObject, tx *Transaction, nodeCtx int, woType WorkObjectView) *WorkObject { woHeader := NewWorkObjectHeader(header.Hash(), header.ParentHash(common.ZONE_CTX), header.Number(common.ZONE_CTX), header.woHeader.difficulty, header.woHeader.txHash, header.woHeader.nonce, header.woHeader.time, header.Location()) woBody, _ := NewWorkObjectBody(header.Body().Header(), nil, nil, nil, nil, nil, nil, nodeCtx) - return NewWorkObject(woHeader, woBody, tx, woType) + return NewWorkObject(woHeader, woBody, tx) } func CopyWorkObject(wo *WorkObject) *WorkObject { @@ -680,7 +679,7 @@ func (wo *WorkObject) RPCMarshalWorkObject() map[string]interface{} { return result } -func (wo *WorkObject) ProtoEncode(woType int) (*ProtoWorkObject, error) { +func (wo *WorkObject) ProtoEncode(woType WorkObjectView) (*ProtoWorkObject, error) { switch woType { case PEtxObject: header, err := wo.woHeader.ProtoEncode() @@ -723,7 +722,7 @@ func (wo *WorkObject) ProtoEncode(woType int) (*ProtoWorkObject, error) { } } -func (wo *WorkObject) ProtoDecode(data *ProtoWorkObject, location common.Location, woType int) error { +func (wo *WorkObject) ProtoDecode(data *ProtoWorkObject, location common.Location, woType WorkObjectView) error { switch woType { case PEtxObject: wo.woHeader = new(WorkObjectHeader) @@ -998,3 +997,32 @@ func (wb *WorkObjectBody) RPCMarshalWorkObjectBody() map[string]interface{} { return result } + +//////////////////////////////////////////////////////////// +///////////////////// Work Object Views //////////////////// +//////////////////////////////////////////////////////////// + +type WorkObjectBlockView struct { + *WorkObject +} + +type WorkObjectHeaderView struct { + *WorkObject +} + +//////////////////////////////////////////////////////////// +////////////// View Conversion/Getter Methods ////////////// +//////////////////////////////////////////////////////////// + +func (wo *WorkObject) ConvertToHeaderView() *WorkObjectHeaderView { + newWo := NewWorkObjectWithHeader(wo, nil, common.ZONE_CTX, HeaderObject) + return &WorkObjectHeaderView{ + WorkObject: newWo, + } +} + +func (wo *WorkObject) ConvertToBlockView() *WorkObjectBlockView { + return &WorkObjectBlockView{ + WorkObject: wo, + } +} diff --git a/core/worker.go b/core/worker.go index c2a98f2ecf..70afff3a81 100644 --- a/core/worker.go +++ b/core/worker.go @@ -1214,7 +1214,7 @@ func (w *worker) prepareWork(genParams *generateParams, wo *types.WorkObject) (* if err != nil { return nil, err } - proposedWo := types.NewWorkObject(proposedWoHeader, proposedWoBody, nil, types.BlockObject) + proposedWo := types.NewWorkObject(proposedWoHeader, proposedWoBody, nil) env, err := w.makeEnv(parent, proposedWo, w.coinbase) if err != nil { w.logger.WithField("err", err).Error("Failed to create sealing context") @@ -1253,7 +1253,7 @@ func (w *worker) prepareWork(genParams *generateParams, wo *types.WorkObject) (* if err != nil { return nil, err } - proposedWo := types.NewWorkObject(proposedWoHeader, proposedWoBody, nil, types.BlockObject) + proposedWo := types.NewWorkObject(proposedWoHeader, proposedWoBody, nil) return &environment{wo: proposedWo}, nil } diff --git a/internal/quaiapi/backend.go b/internal/quaiapi/backend.go index b62c668d17..50b1d270f9 100644 --- a/internal/quaiapi/backend.go +++ b/internal/quaiapi/backend.go @@ -128,7 +128,7 @@ type Backend interface { Logger() *log.Logger // P2P apis - BroadcastBlock(block *types.WorkObject, location common.Location) error + BroadcastWorkObject(*types.WorkObject, common.Location) error } func GetAPIs(apiBackend Backend) []rpc.API { diff --git a/internal/quaiapi/quai_api.go b/internal/quaiapi/quai_api.go index fe5095c6a4..338116c779 100644 --- a/internal/quaiapi/quai_api.go +++ b/internal/quaiapi/quai_api.go @@ -706,7 +706,7 @@ func (s *PublicBlockChainQuaiAPI) ReceiveMinedHeader(ctx context.Context, raw js // Broadcast the block and announce chain insertion event if block.Header() != nil { - err := s.b.BroadcastBlock(block, s.b.NodeLocation()) + err := s.b.BroadcastWorkObject(block, s.b.NodeLocation()) if err != nil { s.b.Logger().WithField("err", err).Error("Error broadcasting block") } diff --git a/p2p/node/api.go b/p2p/node/api.go index da14a56611..df0560965c 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -12,6 +12,7 @@ import ( "github.com/dominant-strategies/go-quai/log" "github.com/dominant-strategies/go-quai/p2p" "github.com/dominant-strategies/go-quai/p2p/node/peerManager" + "github.com/dominant-strategies/go-quai/p2p/node/pubsubManager" quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol" "github.com/dominant-strategies/go-quai/quai" "github.com/dominant-strategies/go-quai/trie" @@ -114,7 +115,7 @@ func (p *P2PNode) Stop() error { } } -func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) { +func (p *P2PNode) requestFromPeers(topic string, location common.Location, requestData interface{}, responseDataType interface{}, resultChan chan interface{}) { go func() { defer func() { if r := recover(); r != nil { @@ -125,10 +126,10 @@ func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, d } }() defer close(resultChan) - peers := p.peerManager.GetPeers(location, data, peerManager.Best) + peers := p.peerManager.GetPeers(topic, peerManager.Best) log.Global.WithFields(log.Fields{ - "peers": peers, - "location": location, + "peers": peers, + "topic": topic, }).Debug("Requesting data from peers") var requestWg sync.WaitGroup @@ -144,14 +145,14 @@ func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, d } }() defer requestWg.Done() - p.requestAndWait(peerID, location, data, datatype, resultChan) + p.requestAndWait(peerID, location, requestData, topic, requestData, resultChan) }(peerID) } requestWg.Wait() }() } -func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data interface{}, dataType interface{}, resultChan chan interface{}) { +func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, reqData interface{}, topic string, dataType interface{}, resultChan chan interface{}) { defer func() { if r := recover(); r != nil { log.Global.WithFields(log.Fields{ @@ -162,17 +163,16 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data }() var recvd interface{} var err error - // Ask peer and wait for response - if recvd, err = p.requestFromPeer(peerID, location, data, dataType); err == nil { + if recvd, err = p.requestFromPeer(peerID, topic, location, reqData, dataType); err == nil { log.Global.WithFields(log.Fields{ - "data": data, + "reqData": reqData, "dataType": dataType, "peerId": peerID, "location": location.Name(), }).Trace("Received data from peer") // Mark this peer as behaving well - p.peerManager.MarkResponsivePeer(peerID, location) + p.peerManager.MarkResponsivePeer(peerID, topic) select { case resultChan <- recvd: // Data sent successfully @@ -187,17 +187,26 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data log.Global.WithFields(log.Fields{ "peerId": peerID, "location": location.Name(), - "data": data, + "reqData": reqData, "dataType": dataType, "err": err, }).Error("Error requesting the data from peer") // Mark this peer as not responding - p.peerManager.MarkUnresponsivePeer(peerID, location) + p.peerManager.MarkUnresponsivePeer(peerID, topic) } } // Request a data from the network for the specified slice func (p *P2PNode) Request(location common.Location, requestData interface{}, responseDataType interface{}) chan interface{} { + topic, err := pubsubManager.TopicName(p.pubsub.GetGenesis(), location, responseDataType) + if err != nil { + log.Global.WithFields(log.Fields{ + "location": location.Name(), + "err": err, + }).Error("Error getting topic name") + panic(err) + } + resultChan := make(chan interface{}, 10) // If it is a hash, first check to see if it is contained in the caches if hash, ok := requestData.(common.Hash); ok { @@ -208,7 +217,7 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res } } - p.requestFromPeers(location, requestData, responseDataType, resultChan) + p.requestFromPeers(topic, location, requestData, responseDataType, resultChan) // TODO: optimize with waitgroups or a doneChan to only query if no peers responded // Right now this creates too many streams, so don't call this until we have a better solution // p.queryDHT(location, requestData, responseDataType, resultChan) @@ -216,22 +225,22 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res return resultChan } -func (p *P2PNode) MarkLivelyPeer(peer p2p.PeerID, location common.Location) { +func (p *P2PNode) MarkLivelyPeer(peer p2p.PeerID, topic string) { log.Global.WithFields(log.Fields{ - "peer": peer, - "location": location, + "peer": peer, + "topic": topic, }).Debug("Recording well-behaving peer") - p.peerManager.MarkLivelyPeer(peer, location) + p.peerManager.MarkLivelyPeer(peer, topic) } -func (p *P2PNode) MarkLatentPeer(peer p2p.PeerID, location common.Location) { +func (p *P2PNode) MarkLatentPeer(peer p2p.PeerID, topic string) { log.Global.WithFields(log.Fields{ - "peer": peer, - "location": location, + "peer": peer, + "topic": topic, }).Debug("Recording misbehaving peer") - p.peerManager.MarkLatentPeer(peer, location) + p.peerManager.MarkLatentPeer(peer, topic) } func (p *P2PNode) ProtectPeer(peer p2p.PeerID) { @@ -292,7 +301,7 @@ func (p *P2PNode) GetTrieNode(hash common.Hash, location common.Location) *trie. return p.consensus.GetTrieNode(hash, location) } -func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, data interface{}, nodeLocation common.Location) { +func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, topic string, data interface{}, nodeLocation common.Location) { switch v := data.(type) { case types.WorkObject: p.cacheAdd(v.Hash(), &v, nodeLocation) @@ -306,6 +315,6 @@ func (p *P2PNode) handleBroadcast(sourcePeer peer.ID, data interface{}, nodeLoca // If we made it here, pass the data on to the consensus backend if p.consensus != nil { - p.consensus.OnNewBroadcast(sourcePeer, data, nodeLocation) + p.consensus.OnNewBroadcast(sourcePeer, topic, data, nodeLocation) } } diff --git a/p2p/node/p2p_services.go b/p2p/node/p2p_services.go index 3fe97d479c..d44464fe78 100644 --- a/p2p/node/p2p_services.go +++ b/p2p/node/p2p_services.go @@ -19,7 +19,7 @@ import ( ) // Opens a stream to the given peer and request some data for the given hash at the given location -func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data interface{}, datatype interface{}) (interface{}, error) { +func (p *P2PNode) requestFromPeer(peerID peer.ID, topic string, location common.Location, reqData interface{}, datatype interface{}) (interface{}, error) { defer func() { if r := recover(); r != nil { log.Global.WithFields(log.Fields{ @@ -31,7 +31,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data log.Global.WithFields(log.Fields{ "peerId": peerID, "location": location.Name(), - "data": data, + "data": reqData, "datatype": datatype, }).Trace("Requesting the data from peer") stream, err := p.NewStream(peerID) @@ -50,7 +50,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data defer p.requestManager.CloseRequest(id) // Create the corresponding data request - requestBytes, err := pb.EncodeQuaiRequest(id, location, data, datatype) + requestBytes, err := pb.EncodeQuaiRequest(id, location, reqData, datatype) if err != nil { return nil, err } @@ -74,7 +74,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data log.Global.WithFields(log.Fields{ "peerId": peerID, }).Warn("Peer did not respond in time") - p.peerManager.MarkUnresponsivePeer(peerID, location) + p.peerManager.MarkUnresponsivePeer(peerID, topic) return nil, errors.New("peer did not respond in time") } @@ -86,7 +86,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data switch datatype.(type) { case *types.WorkObject: if block, ok := recvdType.(*types.WorkObject); ok { - switch data := data.(type) { + switch data := reqData.(type) { case common.Hash: if block.Hash() == data { return block, nil @@ -102,11 +102,11 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data } return nil, errors.New("block request invalid response") case *types.Header: - if header, ok := recvdType.(*types.Header); ok && header.Hash() == data.(common.Hash) { + if header, ok := recvdType.(*types.Header); ok && header.Hash() == reqData.(common.Hash) { return header, nil } case *types.Transaction: - if tx, ok := recvdType.(*types.Transaction); ok && tx.Hash() == data.(common.Hash) { + if tx, ok := recvdType.(*types.Transaction); ok && tx.Hash() == reqData.(common.Hash) { return tx, nil } case common.Hash: diff --git a/p2p/node/peerManager/peerManager.go b/p2p/node/peerManager/peerManager.go index 322a6b946a..ce5bda121d 100644 --- a/p2p/node/peerManager/peerManager.go +++ b/p2p/node/peerManager/peerManager.go @@ -89,17 +89,17 @@ type PeerManager interface { // Returns c_peerCount peers starting at the requested quality level of peers // If there are not enough peers at the requested quality, it will return lower quality peers // If there still aren't enough peers, it will query the DHT for more - GetPeers(location common.Location, data interface{}, quality PeerQuality) []p2p.PeerID + GetPeers(topic string, quality PeerQuality) []p2p.PeerID // Increases the peer's liveliness score - MarkLivelyPeer(p2p.PeerID, common.Location) + MarkLivelyPeer(peerID p2p.PeerID, topic string) // Decreases the peer's liveliness score - MarkLatentPeer(p2p.PeerID, common.Location) + MarkLatentPeer(peerID p2p.PeerID, topic string) // Increases the peer's liveliness score. Not exposed outside of NetworkingAPI - MarkResponsivePeer(p2p.PeerID, common.Location) + MarkResponsivePeer(peerID p2p.PeerID, topic string) // Decreases the peer's liveliness score. Not exposed outside of NetworkingAPI - MarkUnresponsivePeer(p2p.PeerID, common.Location) + MarkUnresponsivePeer(peerID p2p.PeerID, topic string) // Protects the peer's connection from being disconnected ProtectPeer(p2p.PeerID) @@ -317,15 +317,15 @@ func (pm *BasicPeerManager) getPeersHelper(peerDB *peerdb.PeerDB, numPeers int) return peerSubset } -func (pm *BasicPeerManager) GetPeers(location common.Location, data interface{}, quality PeerQuality) []p2p.PeerID { +func (pm *BasicPeerManager) GetPeers(topic string, quality PeerQuality) []p2p.PeerID { var peerList []p2p.PeerID switch quality { case Best: - peerList = pm.getBestPeersWithFallback(location) + peerList = pm.getBestPeersWithFallback(topic) case Responsive: - peerList = pm.getResponsivePeersWithFallback(location) + peerList = pm.getResponsivePeersWithFallback(topic) case LastResort: - peerList = pm.getLastResortPeers(location) + peerList = pm.getLastResortPeers(topic) default: panic("Invalid peer quality") } @@ -337,13 +337,12 @@ func (pm *BasicPeerManager) GetPeers(location common.Location, data interface{}, } // Query the DHT for more peers - return pm.queryDHT(location, data, peerList, C_peerCount-lenPeer) + return pm.queryDHT(topic, peerList, C_peerCount-lenPeer) } -func (pm *BasicPeerManager) queryDHT(location common.Location, data interface{}, peerList []p2p.PeerID, peerCount int) []p2p.PeerID { +func (pm *BasicPeerManager) queryDHT(topic string, peerList []p2p.PeerID, peerCount int) []p2p.PeerID { // create a Cid from the slice location - topicName, _ := pubsubManager.TopicName(pm.genesis, location, data) - shardCid := pubsubManager.TopicToCid(topicName) + shardCid := pubsubManager.TopicToCid(topic) // Internal list of peers from the dht dhtPeers := make([]p2p.PeerID, 0, peerCount) @@ -359,54 +358,51 @@ func (pm *BasicPeerManager) queryDHT(location common.Location, data interface{}, return append(peerList, dhtPeers...) } -func (pm *BasicPeerManager) getBestPeersWithFallback(location common.Location) []p2p.PeerID { - locName := location.Name() - if pm.peerDBs[locName] == nil { +func (pm *BasicPeerManager) getBestPeersWithFallback(topic string) []p2p.PeerID { + if pm.peerDBs[topic] == nil { // There have not been any peers added to this topic return nil } - bestPeersCount := pm.peerDBs[locName][Best].GetPeerCount() + bestPeersCount := pm.peerDBs[topic][Best].GetPeerCount() if bestPeersCount < C_peerCount { - bestPeerList := pm.getPeersHelper(pm.peerDBs[locName][Best], bestPeersCount) - bestPeerList = append(bestPeerList, pm.getResponsivePeersWithFallback(location)...) + bestPeerList := pm.getPeersHelper(pm.peerDBs[topic][Best], bestPeersCount) + bestPeerList = append(bestPeerList, pm.getResponsivePeersWithFallback(topic)...) return bestPeerList } - return pm.getPeersHelper(pm.peerDBs[locName][Best], C_peerCount) + return pm.getPeersHelper(pm.peerDBs[topic][Best], C_peerCount) } -func (pm *BasicPeerManager) getResponsivePeersWithFallback(location common.Location) []p2p.PeerID { - locName := location.Name() - - responsivePeersCount := pm.peerDBs[locName][Responsive].GetPeerCount() +func (pm *BasicPeerManager) getResponsivePeersWithFallback(topic string) []p2p.PeerID { + responsivePeersCount := pm.peerDBs[topic][Responsive].GetPeerCount() if responsivePeersCount < C_peerCount { - responsivePeerList := pm.getPeersHelper(pm.peerDBs[locName][Responsive], responsivePeersCount) - responsivePeerList = append(responsivePeerList, pm.getLastResortPeers(location)...) + responsivePeerList := pm.getPeersHelper(pm.peerDBs[topic][Responsive], responsivePeersCount) + responsivePeerList = append(responsivePeerList, pm.getLastResortPeers(topic)...) return responsivePeerList } - return pm.getPeersHelper(pm.peerDBs[locName][Responsive], C_peerCount) + return pm.getPeersHelper(pm.peerDBs[topic][Responsive], C_peerCount) } -func (pm *BasicPeerManager) getLastResortPeers(location common.Location) []p2p.PeerID { - return pm.getPeersHelper(pm.peerDBs[location.Name()][LastResort], C_peerCount) +func (pm *BasicPeerManager) getLastResortPeers(topic string) []p2p.PeerID { + return pm.getPeersHelper(pm.peerDBs[topic][LastResort], C_peerCount) } -func (pm *BasicPeerManager) MarkLivelyPeer(peer p2p.PeerID, location common.Location) { +func (pm *BasicPeerManager) MarkLivelyPeer(peer p2p.PeerID, topic string) { if peer == pm.selfID { return } pm.TagPeer(peer, "liveness_reports", 1) - pm.recategorizePeer(peer, location) + pm.recategorizePeer(peer, topic) } -func (pm *BasicPeerManager) MarkLatentPeer(peer p2p.PeerID, location common.Location) { +func (pm *BasicPeerManager) MarkLatentPeer(peer p2p.PeerID, topic string) { if peer == pm.selfID { return } pm.TagPeer(peer, "latency_reports", 1) - pm.recategorizePeer(peer, location) + pm.recategorizePeer(peer, topic) } func (pm *BasicPeerManager) calculatePeerLiveness(peer p2p.PeerID) float64 { @@ -420,14 +416,14 @@ func (pm *BasicPeerManager) calculatePeerLiveness(peer p2p.PeerID) float64 { return float64(liveness) / float64(latents) } -func (pm *BasicPeerManager) MarkResponsivePeer(peer p2p.PeerID, location common.Location) { +func (pm *BasicPeerManager) MarkResponsivePeer(peer p2p.PeerID, topic string) { pm.TagPeer(peer, "responses_served", 1) - pm.recategorizePeer(peer, location) + pm.recategorizePeer(peer, topic) } -func (pm *BasicPeerManager) MarkUnresponsivePeer(peer p2p.PeerID, location common.Location) { +func (pm *BasicPeerManager) MarkUnresponsivePeer(peer p2p.PeerID, topic string) { pm.TagPeer(peer, "responses_missed", 1) - pm.recategorizePeer(peer, location) + pm.recategorizePeer(peer, topic) } func (pm *BasicPeerManager) calculatePeerResponsiveness(peer p2p.PeerID) float64 { @@ -449,12 +445,12 @@ func (pm *BasicPeerManager) calculatePeerResponsiveness(peer p2p.PeerID) float64 // // 3. peers // - all other peers -func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, location common.Location) error { +func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, topic string) error { liveness := pm.calculatePeerLiveness(peerID) responsiveness := pm.calculatePeerResponsiveness(peerID) // remove peer from DB first - err := pm.removePeerFromTopic(peerID, location.Name()) + err := pm.removePeerFromTopic(peerID, topic) if err != nil { return err } @@ -472,24 +468,23 @@ func (pm *BasicPeerManager) recategorizePeer(peerID p2p.PeerID, location common. return errors.Wrap(err, "error marshaling peer info") } - locationName := location.Name() if liveness >= c_qualityThreshold && responsiveness >= c_qualityThreshold { // Best peers: high liveness and responsiveness - err := pm.peerDBs[locationName][Best].Put(pm.ctx, key, peerInfo) + err := pm.peerDBs[topic][Best].Put(pm.ctx, key, peerInfo) if err != nil { return errors.Wrap(err, "error putting peer in bestPeersDB") } } else if responsiveness >= c_qualityThreshold { // Responsive peers: high responsiveness, but low liveness - err := pm.peerDBs[locationName][Responsive].Put(pm.ctx, key, peerInfo) + err := pm.peerDBs[topic][Responsive].Put(pm.ctx, key, peerInfo) if err != nil { return errors.Wrap(err, "error putting peer in responsivePeersDB") } } else { // All other peers - err := pm.peerDBs[locationName][LastResort].Put(pm.ctx, key, peerInfo) + err := pm.peerDBs[topic][LastResort].Put(pm.ctx, key, peerInfo) if err != nil { return errors.Wrap(err, "error putting peer in allPeersDB") } diff --git a/p2p/node/pubsubManager/gossipsub.go b/p2p/node/pubsubManager/gossipsub.go index 0afe0b9f8b..9a841b82aa 100644 --- a/p2p/node/pubsubManager/gossipsub.go +++ b/p2p/node/pubsubManager/gossipsub.go @@ -33,7 +33,7 @@ type PubsubManager struct { genesis common.Hash // Callback function to handle received data - onReceived func(peer.ID, interface{}, common.Location) + onReceived func(peer.ID, string, interface{}, common.Location) } // creates a new gossipsub instance @@ -60,13 +60,17 @@ func NewGossipSubManager(ctx context.Context, h host.Host) (*PubsubManager, erro }, nil } +func (g *PubsubManager) GetGenesis() common.Hash { + return g.genesis +} + func (g *PubsubManager) SetQuaiBackend(consensus quai.ConsensusAPI) { g.UnsubscribeAll() // First unsubscribe from existing topics, if already registered g.consensus = consensus // Set new backend } -func (g *PubsubManager) SetReceiveHandler(receiveCb func(peer.ID, interface{}, common.Location)) { +func (g *PubsubManager) SetReceiveHandler(receiveCb func(peer.ID, string, interface{}, common.Location)) { g.onReceived = receiveCb } @@ -138,7 +142,7 @@ func (g *PubsubManager) Subscribe(location common.Location, datatype interface{} // handle the received data if g.onReceived != nil { - g.onReceived(msg.ReceivedFrom, data, location) + g.onReceived(msg.ReceivedFrom, *msg.Topic, data, location) } } }(location) diff --git a/p2p/node/pubsubManager/utils.go b/p2p/node/pubsubManager/utils.go index 35e9196de1..0576b7beda 100644 --- a/p2p/node/pubsubManager/utils.go +++ b/p2p/node/pubsubManager/utils.go @@ -24,7 +24,9 @@ const ( func TopicName(genesis common.Hash, location common.Location, data interface{}) (string, error) { baseTopic := strings.Join([]string{genesis.String(), location.Name()}, "/") switch data.(type) { - case *types.WorkObject: + case *types.WorkObjectHeaderView: + return strings.Join([]string{baseTopic, C_headerType}, "/"), nil + case *types.WorkObjectBlockView: return strings.Join([]string{baseTopic, C_workObjectType}, "/"), nil case common.Hash: return strings.Join([]string{baseTopic, C_hashType}, "/"), nil diff --git a/p2p/pb/proto_services.go b/p2p/pb/proto_services.go index e5ba0c76e7..1d69f1dc88 100644 --- a/p2p/pb/proto_services.go +++ b/p2p/pb/proto_services.go @@ -8,7 +8,6 @@ import ( "github.com/dominant-strategies/go-quai/common" "github.com/dominant-strategies/go-quai/core/types" - "github.com/dominant-strategies/go-quai/log" "github.com/dominant-strategies/go-quai/trie" ) @@ -22,19 +21,19 @@ func DecodeQuaiMessage(data []byte) (*QuaiMessage, error) { // EncodeRequestMessage creates a marshaled protobuf message for a Quai Request. // Returns the serialized protobuf message. -func EncodeQuaiRequest(id uint32, location common.Location, data interface{}, datatype interface{}) ([]byte, error) { +func EncodeQuaiRequest(id uint32, location common.Location, reqData interface{}, datatype interface{}) ([]byte, error) { reqMsg := QuaiRequestMessage{ Id: id, Location: location.ProtoEncode(), } - switch d := data.(type) { + switch d := reqData.(type) { case common.Hash: reqMsg.Data = &QuaiRequestMessage_Hash{Hash: d.ProtoEncode()} case *big.Int: reqMsg.Data = &QuaiRequestMessage_Number{Number: d.Bytes()} default: - return nil, errors.Errorf("unsupported request input data field type: %T", data) + return nil, errors.Errorf("unsupported request input data field type: %T", reqData) } switch datatype.(type) { @@ -190,29 +189,28 @@ func DecodeQuaiResponse(respMsg *QuaiResponseMessage) (uint32, interface{}, erro // Converts a custom go type to a proto type and marhsals it into a protobuf message func ConvertAndMarshal(data interface{}) ([]byte, error) { switch data := data.(type) { - case *types.WorkObject: - log.Global.Tracef("marshalling block: %+v", data) - protoBlock, err := data.ProtoEncode(types.BlockObject) - if err != nil { - return nil, err + case *types.WorkObjectHeaderView, *types.WorkObjectBlockView: + var protoBlock *types.ProtoWorkObject + var err error + switch data := data.(type) { + case *types.WorkObjectHeaderView: + protoBlock, err = data.ProtoEncode(types.HeaderObject) + case *types.WorkObjectBlockView: + protoBlock, err = data.ProtoEncode(types.BlockObject) + default: + return nil, errors.New("unsupported data type") } - return proto.Marshal(protoBlock) - case *types.Header: - log.Global.Tracef("marshalling header: %+v", data) - protoHeader, err := data.ProtoEncode() if err != nil { return nil, err } - return proto.Marshal(protoHeader) + return proto.Marshal(protoBlock) case *types.Transaction: - log.Global.Tracef("marshalling transaction: %+v", data) protoTransaction, err := data.ProtoEncode() if err != nil { return nil, err } return proto.Marshal(protoTransaction) case common.Hash: - log.Global.Tracef("marshalling hash: %+v", data) protoHash := data.ProtoEncode() return proto.Marshal(protoHash) case *types.Transactions: @@ -229,7 +227,7 @@ func ConvertAndMarshal(data interface{}) ([]byte, error) { // Unmarshals a protobuf message into a proto type and converts it to a custom go type func UnmarshalAndConvert(data []byte, sourceLocation common.Location, dataPtr *interface{}, datatype interface{}) error { switch datatype.(type) { - case *types.WorkObject: + case *types.WorkObjectHeaderView, *types.WorkObjectBlockView: protoWorkObject := &types.ProtoWorkObject{} err := proto.Unmarshal(data, protoWorkObject) if err != nil { diff --git a/quai/api_backend.go b/quai/api_backend.go index 263f21bf59..cb8662aed4 100644 --- a/quai/api_backend.go +++ b/quai/api_backend.go @@ -582,9 +582,19 @@ func (b *QuaiAPIBackend) SubscribeExpansionEvent(ch chan<- core.ExpansionEvent) return b.quai.core.SubscribeExpansionEvent(ch) } -// /////////////////////////// -// /////// P2P /////////////// -// /////////////////////////// -func (b *QuaiAPIBackend) BroadcastBlock(block *types.WorkObject, location common.Location) error { - return b.quai.p2p.Broadcast(location, block) +// //////////////////////////// +// //////// P2P /////////////// +// //////////////////////////// +func (b *QuaiAPIBackend) BroadcastWorkObject(wo *types.WorkObject, location common.Location) error { + err := b.quai.p2p.Broadcast(location, wo.ConvertToBlockView()) + if err != nil { + return err + } + + err = b.quai.p2p.Broadcast(location, wo.ConvertToHeaderView()) + if err != nil { + return err + } + + return nil } diff --git a/quai/interface.go b/quai/interface.go index 4f6335860d..4e77ec856e 100644 --- a/quai/interface.go +++ b/quai/interface.go @@ -23,7 +23,7 @@ type ConsensusAPI interface { // Handle new data propagated from the gossip network. Should return quickly. // Specify the peer which propagated the data to us, as well as the data itself. // Return true if this data should be relayed to peers. False if it should be ignored. - OnNewBroadcast(core.PeerID, interface{}, common.Location) bool + OnNewBroadcast(core.PeerID, string, interface{}, common.Location) bool // Creates the function that will be used to determine if a message should be propagated. ValidatorFunc() func(ctx context.Context, id peer.ID, msg *pubsub.Message) pubsub.ValidationResult @@ -55,6 +55,9 @@ type ConsensusAPI interface { // WriteGenesisBlock adds the genesis block to the database and also writes the block to the disk WriteGenesisBlock(*types.WorkObject, common.Location) + + // Returns if the location is processing state + ProcessingState(common.Location) bool } // The networking backend will implement the following interface to enable consensus to communicate with other nodes. @@ -82,9 +85,9 @@ type NetworkingAPI interface { // Methods to report a peer to the P2PClient as behaving maliciously // Should be called whenever a peer sends us data that is acceptably lively - MarkLivelyPeer(core.PeerID, common.Location) + MarkLivelyPeer(peerID core.PeerID, topic string) // Should be called whenever a peer sends us data that is stale or latent - MarkLatentPeer(core.PeerID, common.Location) + MarkLatentPeer(peerID core.PeerID, topic string) // Protects the peer's connection from being pruned ProtectPeer(core.PeerID) diff --git a/quai/p2p_backend.go b/quai/p2p_backend.go index 8770934ef3..f358c1730a 100644 --- a/quai/p2p_backend.go +++ b/quai/p2p_backend.go @@ -77,10 +77,9 @@ func (qbe *QuaiBackend) GetBackend(location common.Location) *quaiapi.Backend { } // Handle consensus data propagated to us from our peers -func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, nodeLocation common.Location) bool { - switch data.(type) { +func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, topic string, data interface{}, nodeLocation common.Location) bool { + switch data := data.(type) { case types.WorkObject: - block := data.(types.WorkObject) backend := *qbe.GetBackend(nodeLocation) if backend == nil { log.Global.Error("no backend found") @@ -89,7 +88,7 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, // TODO: Verify the Block before writing it // TODO: Determine if the block information was lively or stale and rate // the peer accordingly - backend.WriteBlock(&block) + backend.WriteBlock(&data) case types.Header: case types.Transactions: backend := *qbe.GetBackend(nodeLocation) @@ -97,14 +96,13 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, log.Global.Error("no backend found") return false } - txs := data.(types.Transactions) if backend.ProcessingState() { - backend.SendRemoteTxs(txs) + backend.SendRemoteTxs(data) } } // If it was a good broadcast, mark the peer as lively - qbe.p2pBackend.MarkLivelyPeer(sourcePeer, nodeLocation) + qbe.p2pBackend.MarkLivelyPeer(sourcePeer, topic) return true } @@ -124,15 +122,14 @@ func (qbe *QuaiBackend) ValidatorFunc() func(ctx context.Context, id p2p.PeerID, return func(ctx context.Context, id peer.ID, msg *pubsub.Message) pubsub.ValidationResult { var data interface{} data = msg.Message.GetData() - switch data.(type) { + switch data := data.(type) { case types.WorkObject: - block := data.(types.WorkObject) - backend := *qbe.GetBackend(block.Location()) + backend := *qbe.GetBackend(data.Location()) if backend == nil { log.Global.WithFields(log.Fields{ "peer": id, - "hash": block.Hash(), - "location": block.Location(), + "hash": data.Hash(), + "location": data.Location(), }).Error("no backend found for this location") return pubsub.ValidationReject }