diff --git a/cmd/utils/hierarchical_coordinator.go b/cmd/utils/hierarchical_coordinator.go index d267693c3b..b71c20a5e8 100644 --- a/cmd/utils/hierarchical_coordinator.go +++ b/cmd/utils/hierarchical_coordinator.go @@ -133,11 +133,14 @@ func (hc *HierarchicalCoordinator) startNode(logPath string, quaiBackend quai.Co stack, apiBackend := makeFullNode(hc.p2p, location, hc.slicesRunning, hc.currentExpansionNumber, genesisBlock, logger) quaiBackend.SetApiBackend(&apiBackend, location) + // Only subscribe to block and transaction data if processing state + if quaiBackend.ProcessingState(location) { + hc.p2p.Subscribe(location, &types.WorkObjectBlockView{}) + hc.p2p.Subscribe(location, &types.Transaction{}) + } + // Subscribe to the new topics after setting the api backend - hc.p2p.Subscribe(location, &types.WorkObjectBlockView{}) hc.p2p.Subscribe(location, &types.WorkObjectHeaderView{}) - hc.p2p.Subscribe(location, common.Hash{}) - hc.p2p.Subscribe(location, &types.Transaction{}) StartNode(stack) diff --git a/quai/interface.go b/quai/interface.go index 4f6335860d..9217061643 100644 --- a/quai/interface.go +++ b/quai/interface.go @@ -55,6 +55,9 @@ type ConsensusAPI interface { // WriteGenesisBlock adds the genesis block to the database and also writes the block to the disk WriteGenesisBlock(*types.WorkObject, common.Location) + + // Returns if the location is processing state + ProcessingState(common.Location) bool } // The networking backend will implement the following interface to enable consensus to communicate with other nodes. diff --git a/quai/p2p_backend.go b/quai/p2p_backend.go index a088481d1b..bc04428283 100644 --- a/quai/p2p_backend.go +++ b/quai/p2p_backend.go @@ -78,9 +78,8 @@ func (qbe *QuaiBackend) GetBackend(location common.Location) *quaiapi.Backend { // Handle consensus data propagated to us from our peers func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, nodeLocation common.Location) bool { - switch data.(type) { + switch data := data.(type) { case types.WorkObject: - block := data.(types.WorkObject) backend := *qbe.GetBackend(nodeLocation) if backend == nil { log.Global.Error("no backend found") @@ -89,10 +88,9 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, // TODO: Verify the Block before writing it // TODO: Determine if the block information was lively or stale and rate // the peer accordingly - backend.WriteBlock(&block) + backend.WriteBlock(&data) case types.Header: case types.Transaction: - tx := data.(types.Transaction) backend := *qbe.GetBackend(nodeLocation) if backend == nil { log.Global.Error("no backend found") @@ -100,7 +98,7 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, } // check if the backend is processing state before adding the tx if backend.ProcessingState() { - backend.SendRemoteTx(&tx) + backend.SendRemoteTx(&data) } // TODO: Handle the error here and mark the peers accordingly } @@ -233,3 +231,12 @@ func (qbe *QuaiBackend) LookupBlockHashByNumber(number *big.Int, location common return nil } } + +func (qbe *QuaiBackend) ProcessingState(location common.Location) bool { + backend := *qbe.GetBackend(location) + if backend == nil { + log.Global.Error("no backend found") + return false + } + return backend.ProcessingState() +} \ No newline at end of file