Skip to content

Commit

Permalink
Subscribe to blocks if processing state, add ProcessingState to Conse…
Browse files Browse the repository at this point in the history
…nsus api
  • Loading branch information
Djadih committed May 7, 2024
1 parent 7243dfc commit 333669a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
9 changes: 6 additions & 3 deletions cmd/utils/hierarchical_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,14 @@ func (hc *HierarchicalCoordinator) startNode(logPath string, quaiBackend quai.Co
stack, apiBackend := makeFullNode(hc.p2p, location, hc.slicesRunning, hc.currentExpansionNumber, genesisBlock, logger)
quaiBackend.SetApiBackend(&apiBackend, location)

// Only subscribe to block and transaction data if processing state
if quaiBackend.ProcessingState(location) {
hc.p2p.Subscribe(location, &types.WorkObjectBlockView{})
hc.p2p.Subscribe(location, &types.Transaction{})
}

// Subscribe to the new topics after setting the api backend
hc.p2p.Subscribe(location, &types.WorkObjectBlockView{})
hc.p2p.Subscribe(location, &types.WorkObjectHeaderView{})
hc.p2p.Subscribe(location, common.Hash{})
hc.p2p.Subscribe(location, &types.Transaction{})

StartNode(stack)

Expand Down
3 changes: 3 additions & 0 deletions quai/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type ConsensusAPI interface {

// WriteGenesisBlock adds the genesis block to the database and also writes the block to the disk
WriteGenesisBlock(*types.WorkObject, common.Location)

// Returns if the location is processing state
ProcessingState(common.Location) bool
}

// The networking backend will implement the following interface to enable consensus to communicate with other nodes.
Expand Down
17 changes: 12 additions & 5 deletions quai/p2p_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,8 @@ func (qbe *QuaiBackend) GetBackend(location common.Location) *quaiapi.Backend {

// Handle consensus data propagated to us from our peers
func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, nodeLocation common.Location) bool {
switch data.(type) {
switch data := data.(type) {
case types.WorkObject:
block := data.(types.WorkObject)
backend := *qbe.GetBackend(nodeLocation)
if backend == nil {
log.Global.Error("no backend found")
Expand All @@ -89,18 +88,17 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{},
// TODO: Verify the Block before writing it
// TODO: Determine if the block information was lively or stale and rate
// the peer accordingly
backend.WriteBlock(&block)
backend.WriteBlock(&data)
case types.Header:
case types.Transaction:
tx := data.(types.Transaction)
backend := *qbe.GetBackend(nodeLocation)
if backend == nil {
log.Global.Error("no backend found")
return false
}
// check if the backend is processing state before adding the tx
if backend.ProcessingState() {
backend.SendRemoteTx(&tx)
backend.SendRemoteTx(&data)
}
// TODO: Handle the error here and mark the peers accordingly
}
Expand Down Expand Up @@ -233,3 +231,12 @@ func (qbe *QuaiBackend) LookupBlockHashByNumber(number *big.Int, location common
return nil
}
}

func (qbe *QuaiBackend) ProcessingState(location common.Location) bool {
backend := *qbe.GetBackend(location)
if backend == nil {
log.Global.Error("no backend found")
return false
}
return backend.ProcessingState()
}

0 comments on commit 333669a

Please sign in to comment.