Skip to content

Commit

Permalink
Process State of a block only after getting picked to be mined
Browse files Browse the repository at this point in the history
  • Loading branch information
kiltsonfire authored and gameofpointers committed Aug 23, 2023
1 parent 9b3cc74 commit d65580d
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 48 deletions.
8 changes: 0 additions & 8 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
if hash := types.DeriveSha(block.ExtTransactions(), trie.NewStackTrie(nil)); hash != header.EtxHash() {
return fmt.Errorf("external transaction root hash mismatch: have %x, want %x", hash, header.EtxHash())
}
if v.hc.ProcessingState() {
if !v.hc.bc.processor.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.hc.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
}
return consensus.ErrPrunedAncestor
}
}
}
return nil
}
Expand Down
8 changes: 6 additions & 2 deletions core/bodydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ func NewBodyDb(db ethdb.Database, engine consensus.Engine, hc *HeaderChain, chai
}

// Append
func (bc *BodyDb) Append(batch ethdb.Batch, block *types.Block, newInboundEtxs types.Transactions) ([]*types.Log, error) {
func (bc *BodyDb) Append(block *types.Block, newInboundEtxs types.Transactions) ([]*types.Log, error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

batch := bc.db.NewBatch()
stateApply := time.Now()
nodeCtx := common.NodeLocation.Context()
var logs []*types.Log
Expand All @@ -100,7 +101,10 @@ func (bc *BodyDb) Append(batch ethdb.Batch, block *types.Block, newInboundEtxs t
}
log.Info("Time taken to", "apply state:", common.PrettyDuration(time.Since(stateApply)))

rawdb.WriteBlock(bc.db, block)
rawdb.WriteBlock(batch, block)
if err = batch.Write(); err != nil {
return nil, err
}
return logs, nil
}

Expand Down
33 changes: 29 additions & 4 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ func (hc *HeaderChain) ProcessingState() bool {
}

// Append
func (hc *HeaderChain) AppendBlock(batch ethdb.Batch, block *types.Block, newInboundEtxs types.Transactions) error {
func (hc *HeaderChain) AppendBlock(block *types.Block, newInboundEtxs types.Transactions) error {
blockappend := time.Now()
// Append block else revert header append
logs, err := hc.bc.Append(batch, block, newInboundEtxs)
logs, err := hc.bc.Append(block, newInboundEtxs)
if err != nil {
return err
}
Expand All @@ -328,7 +328,7 @@ func (hc *HeaderChain) AppendBlock(batch ethdb.Batch, block *types.Block, newInb

// SetCurrentHeader sets the in-memory head header marker of the canonical chan
// as the given header.
func (hc *HeaderChain) SetCurrentHeader(batch ethdb.Batch, head *types.Header) error {
func (hc *HeaderChain) SetCurrentHeader(head *types.Header) error {
hc.headermu.Lock()
defer hc.headermu.Unlock()

Expand All @@ -347,6 +347,7 @@ func (hc *HeaderChain) SetCurrentHeader(batch ethdb.Batch, head *types.Header) e

// If head is the normal extension of canonical head, we can return by just wiring the canonical hash.
if prevHeader.Hash() == head.ParentHash() {
hc.ReadInboundEtxsAndAppendBlock(head)
rawdb.WriteCanonicalHash(hc.headerDb, head.Hash(), head.NumberU64())
return nil
}
Expand Down Expand Up @@ -382,12 +383,32 @@ func (hc *HeaderChain) SetCurrentHeader(batch ethdb.Batch, head *types.Header) e

// Run through the hash stack to update canonicalHash and forward state processor
for i := len(hashStack) - 1; i >= 0; i-- {
hc.ReadInboundEtxsAndAppendBlock(hashStack[i])
rawdb.WriteCanonicalHash(hc.headerDb, hashStack[i].Hash(), hashStack[i].NumberU64())
}

return nil
}

// ReadInboundEtxsAndAppendBlock reads the inbound etxs from database and appends the block
func (hc *HeaderChain) ReadInboundEtxsAndAppendBlock(header *types.Header) error {
block := hc.GetBlockOrCandidate(header.Hash(), header.NumberU64())
if block == nil {
return errors.New("Could not find block during reorg")
}
_, order, err := hc.engine.CalcOrder(block.Header())
if err != nil {
return err
}
nodeCtx := common.NodeLocation.Context()
var inboundEtxs types.Transactions
if order < nodeCtx {
inboundEtxs = rawdb.ReadInboundEtxs(hc.headerDb, header.Hash())
}
hc.AppendBlock(block, inboundEtxs)
return nil
}

// findCommonAncestor
func (hc *HeaderChain) findCommonAncestor(header *types.Header) *types.Header {
for {
Expand Down Expand Up @@ -670,7 +691,7 @@ func (hc *HeaderChain) CurrentHeader() *types.Header {

// CurrentBlock returns the block for the current header.
func (hc *HeaderChain) CurrentBlock() *types.Block {
return hc.GetBlockByHash(hc.CurrentHeader().Hash())
return hc.GetBlockOrCandidateByHash(hc.CurrentHeader().Hash())
}

// SetGenesis sets a new genesis block header for the chain
Expand Down Expand Up @@ -790,6 +811,10 @@ func (hc *HeaderChain) GetBlockByHash(hash common.Hash) *types.Block {
return hc.GetBlock(hash, *number)
}

func (hc *HeaderChain) GetBlockOrCandidate(hash common.Hash, number uint64) *types.Block {
return hc.bc.GetBlockOrCandidate(hash, number)
}

// GetBlockOrCandidateByHash retrieves any block from the database by hash, caching it if found.
func (hc *HeaderChain) GetBlockOrCandidateByHash(hash common.Hash) *types.Block {
number := hc.GetBlockNumber(hash)
Expand Down
87 changes: 67 additions & 20 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Slice struct {

validator Validator // Block and state validator interface
phCacheMu sync.RWMutex
reorgMu sync.RWMutex

badHashesCache map[common.Hash]bool
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do
var pendingHeaderWithTermini types.PendingHeader
if nodeCtx != common.ZONE_CTX {
// Upate the local pending header
pendingHeaderWithTermini, err = sl.generateSlicePendingHeader(block, newTermini, domPendingHeader, domOrigin, false)
pendingHeaderWithTermini, err = sl.generateSlicePendingHeader(block, newTermini, domPendingHeader, domOrigin, true, false)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -242,11 +243,6 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do

time7 := common.PrettyDuration(time.Since(start))

err = sl.hc.AppendBlock(batch, block, newInboundEtxs.FilterToLocation(common.NodeLocation))
if err != nil {
return nil, false, err
}

var time8, time9 common.PrettyDuration
bestPh, exist := sl.readPhCache(sl.bestPhKey)
if !exist {
Expand All @@ -266,11 +262,15 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do
rawdb.WriteInboundEtxs(batch, block.Hash(), newInboundEtxs.FilterToLocation(common.NodeLocation))
}

if subReorg {
sl.hc.SetCurrentHeader(block.Header())
}
// Upate the local pending header
pendingHeaderWithTermini, err = sl.generateSlicePendingHeader(block, newTermini, domPendingHeader, domOrigin, false)
pendingHeaderWithTermini, err = sl.generateSlicePendingHeader(block, newTermini, domPendingHeader, domOrigin, subReorg, false)
if err != nil {
return nil, false, err
}

time9 = common.PrettyDuration(time.Since(start))

}
Expand All @@ -294,7 +294,9 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do
}

if subReorg {
sl.hc.SetCurrentHeader(batch, block.Header())
if nodeCtx != common.ZONE_CTX {
sl.hc.SetCurrentHeader(block.Header())
}
sl.hc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
}

Expand Down Expand Up @@ -377,7 +379,7 @@ func (sl *Slice) UpdateDom(oldTerminus common.Hash, newTerminus common.Hash, loc
block := sl.hc.GetBlockByHash(newTerminus)
sl.bestPhKey = newTerminus
if block != nil {
pendingHeaderWithTermini, err := sl.generateSlicePendingHeader(block, *newDomTermini, types.EmptyHeader(), false, false)
pendingHeaderWithTermini, err := sl.generateSlicePendingHeader(block, *newDomTermini, types.EmptyHeader(), false, true, false)
if err != nil {
return
}
Expand All @@ -400,7 +402,7 @@ func (sl *Slice) UpdateDom(oldTerminus common.Hash, newTerminus common.Hash, loc
block := sl.hc.GetBlockByHash(newTerminus)
sl.bestPhKey = newTerminus
if block != nil {
pendingHeaderWithTermini, err := sl.generateSlicePendingHeader(block, *newDomTermini, types.EmptyHeader(), false, false)
pendingHeaderWithTermini, err := sl.generateSlicePendingHeader(block, *newDomTermini, types.EmptyHeader(), false, true, false)
if err != nil {
return
}
Expand Down Expand Up @@ -474,11 +476,25 @@ func (sl *Slice) writePhCache(hash common.Hash, pendingHeader types.PendingHeade
}

// Generate a slice pending header
func (sl *Slice) generateSlicePendingHeader(block *types.Block, newTermini types.Termini, domPendingHeader *types.Header, domOrigin bool, fill bool) (types.PendingHeader, error) {
// Upate the local pending header
localPendingHeader, err := sl.miner.worker.GeneratePendingHeader(block, fill)
if err != nil {
return types.PendingHeader{}, err
func (sl *Slice) generateSlicePendingHeader(block *types.Block, newTermini types.Termini, domPendingHeader *types.Header, domOrigin bool, subReorg bool, fill bool) (types.PendingHeader, error) {
nodeCtx := common.NodeLocation.Context()
var localPendingHeader *types.Header
var err error
if subReorg {
// Upate the local pending header
localPendingHeader, err = sl.miner.worker.GeneratePendingHeader(block, fill)
if err != nil {
return types.PendingHeader{}, err
}
} else {
// Just compute the necessary information for the pending Header
// i.e ParentHash field, Number and writing manifest to the disk
localPendingHeader = types.EmptyHeader()
localPendingHeader.SetParentHash(block.Hash(), nodeCtx)
localPendingHeader.SetNumber(big.NewInt(int64(block.NumberU64()) + 1))

manifestHash := sl.miner.worker.ComputeManifestHash(block.Header())
localPendingHeader.SetManifestHash(manifestHash)
}

// Combine subordinates pending header with local pending header
Expand Down Expand Up @@ -708,13 +724,44 @@ func (sl *Slice) updatePhCacheFromDom(pendingHeader types.PendingHeader, termini
combinedPendingHeader = sl.combinePendingHeader(pendingHeader.Header(), combinedPendingHeader, i, false)
}

sl.updatePhCache(types.NewPendingHeader(combinedPendingHeader, localPendingHeader.Termini()), false, nil, subReorg)

nodeCtx := common.NodeLocation.Context()
// Pick the head
if subReorg {
log.Info("Choosing phHeader pickPhHead:", "NumberArray:", localPendingHeader.Header().NumberArray(), "Number:", localPendingHeader.Header().Number(), "ParentHash:", localPendingHeader.Header().ParentHash(), "Terminus:", localPendingHeader.Termini().DomTerminus())
sl.bestPhKey = localPendingHeader.Termini().DomTerminus()
if (localPendingHeader.Header().Root() != types.EmptyRootHash && nodeCtx == common.ZONE_CTX) || nodeCtx == common.REGION_CTX {
block := sl.hc.GetBlockOrCandidateByHash(localPendingHeader.Header().ParentHash())
if block != nil {
sl.hc.SetCurrentHeader(block.Header())
log.Info("Choosing phHeader pickPhHead:", "NumberArray:", localPendingHeader.Header().NumberArray(), "Number:", localPendingHeader.Header().Number(), "ParentHash:", localPendingHeader.Header().ParentHash(), "Terminus:", localPendingHeader.Termini().DomTerminus())
sl.bestPhKey = localPendingHeader.Termini().DomTerminus()
if block.Hash() != sl.hc.CurrentHeader().Hash() {
sl.hc.chainHeadFeed.Send(ChainHeadEvent{block})
}
} else {
log.Warn("unable to set the current header after the cord update", "Hash", localPendingHeader.Header().ParentHash())
}
} else {
block := sl.hc.GetBlockOrCandidateByHash(localPendingHeader.Header().ParentHash())
if block != nil {
sl.hc.SetCurrentHeader(block.Header())
newPendingHeader, err := sl.generateSlicePendingHeader(block, localPendingHeader.Termini(), combinedPendingHeader, true, true, false)
if err != nil {
log.Error("Error generating slice pending header", "err", err)
return err
}
combinedPendingHeader = types.CopyHeader(newPendingHeader.Header())
log.Info("Choosing phHeader pickPhHead:", "NumberArray:", combinedPendingHeader.NumberArray(), "ParentHash:", combinedPendingHeader.ParentHash(), "Terminus:", localPendingHeader.Termini().DomTerminus())
sl.bestPhKey = localPendingHeader.Termini().DomTerminus()
if block.Hash() != sl.hc.CurrentHeader().Hash() {
sl.hc.chainHeadFeed.Send(ChainHeadEvent{block})
}
} else {
log.Warn("unable to set the current header after the cord update", "Hash", localPendingHeader.Header().ParentHash())
}
}
}

sl.updatePhCache(types.NewPendingHeader(combinedPendingHeader, localPendingHeader.Termini()), false, nil, subReorg)

return nil
}
log.Warn("no pending header found for", "terminus", hash, "pendingHeaderNumber", pendingHeader.Header().NumberArray(), "Hash", pendingHeader.Header().ParentHash(), "Termini index", terminiIndex, "indices", indices)
Expand Down Expand Up @@ -780,7 +827,7 @@ func (sl *Slice) init(genesis *Genesis) error {

// Append each of the knot blocks
sl.bestPhKey = genesisHash
sl.hc.SetCurrentHeader(nil, genesisHeader)
sl.hc.SetCurrentHeader(genesisHeader)

// Create empty pending ETX entry for genesis block -- genesis may not emit ETXs
emptyPendingEtxs := types.Transactions{}
Expand Down
34 changes: 21 additions & 13 deletions core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,29 +888,37 @@ func (w *worker) adjustGasLimit(interrupt *int32, env *environment, parent *type
}
}

func (w *worker) FinalizeAssemble(chain consensus.ChainHeaderReader, header *types.Header, parent *types.Block, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, etxs []*types.Transaction, subManifest types.BlockManifest, receipts []*types.Receipt) (*types.Block, error) {
// ComputeManifestHash given a header computes the manifest hash for the header
// and stores it in the database
func (w *worker) ComputeManifestHash(header *types.Header) common.Hash {
nodeCtx := common.NodeLocation.Context()
block, err := w.engine.FinalizeAndAssemble(chain, header, state, txs, uncles, etxs, subManifest, receipts)
if err != nil {
return nil, err
}

// Compute and set manifest hash
manifest := types.BlockManifest{}
if nodeCtx == common.PRIME_CTX {
// Nothing to do for prime chain
manifest = types.BlockManifest{}
} else if w.engine.IsDomCoincident(w.hc, parent.Header()) {
manifest = types.BlockManifest{parent.Hash()}
} else if w.engine.IsDomCoincident(w.hc, header) {
manifest = types.BlockManifest{header.Hash()}
} else {
parentManifest := rawdb.ReadManifest(w.workerDb, parent.ParentHash())
manifest = append(parentManifest, parent.Hash())
parentManifest := rawdb.ReadManifest(w.workerDb, header.ParentHash())
manifest = append(parentManifest, header.Hash())
}
// write the manifest into the disk
rawdb.WriteManifest(w.workerDb, header.Hash(), manifest)
manifestHash := types.DeriveSha(manifest, trie.NewStackTrie(nil))
block.Header().SetManifestHash(manifestHash)

// write the manifest into the disk
rawdb.WriteManifest(w.workerDb, parent.Hash(), manifest)
return manifestHash
}

func (w *worker) FinalizeAssemble(chain consensus.ChainHeaderReader, header *types.Header, parent *types.Block, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, etxs []*types.Transaction, subManifest types.BlockManifest, receipts []*types.Receipt) (*types.Block, error) {
nodeCtx := common.NodeLocation.Context()
block, err := w.engine.FinalizeAndAssemble(chain, header, state, txs, uncles, etxs, subManifest, receipts)
if err != nil {
return nil, err
}

manifestHash := w.ComputeManifestHash(parent.Header())
block.Header().SetManifestHash(manifestHash)

if nodeCtx == common.ZONE_CTX {
// Compute and set etx rollup hash
Expand Down
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (b *QuaiAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumb
}
// Otherwise resolve and return the block
if number == rpc.LatestBlockNumber {
return b.eth.core.CurrentBlock(), nil
number = rpc.BlockNumber(b.eth.core.CurrentHeader().NumberU64())
}
block := b.eth.core.GetBlockByNumber(uint64(number))
if block != nil {
Expand Down

0 comments on commit d65580d

Please sign in to comment.