Skip to content

Commit

Permalink
Revert "Append every block until we figure out block not found issue"
Browse files Browse the repository at this point in the history
This reverts commit 1dbb3b5.

reordered setCurrentHeader, generateLocalPending, and AppendBlock in append to eliminate race conditions

Fix to address API crashes when not processing all blocks

Get header from current block instead of calling directly

Also modify CurrentBlock() call with orCandidate()

SetCurrentHeader including block candidates, also removed reorg lock in append

bugfix: should not append the tip first while switching

bugfix: setting the current header even in the case of the coord updates

bugfix: root not there for region in updatePhFromDom

bugfix: Cannot reject block because of pruned ancestor on State on append
  • Loading branch information
kiltsonfire authored and gameofpointers committed Aug 23, 2023
1 parent 73018dd commit ab3121d
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 35 deletions.
8 changes: 0 additions & 8 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
if hash := types.DeriveSha(block.ExtTransactions(), trie.NewStackTrie(nil)); hash != header.EtxHash() {
return fmt.Errorf("external transaction root hash mismatch: have %x, want %x", hash, header.EtxHash())
}
if v.hc.ProcessingState() {
if !v.hc.bc.processor.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.hc.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
}
return consensus.ErrPrunedAncestor
}
}
}
return nil
}
Expand Down
8 changes: 6 additions & 2 deletions core/bodydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ func NewBodyDb(db ethdb.Database, engine consensus.Engine, hc *HeaderChain, chai
}

// Append
func (bc *BodyDb) Append(batch ethdb.Batch, block *types.Block, newInboundEtxs types.Transactions) ([]*types.Log, error) {
func (bc *BodyDb) Append(block *types.Block, newInboundEtxs types.Transactions) ([]*types.Log, error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

batch := bc.db.NewBatch()
stateApply := time.Now()
nodeCtx := common.NodeLocation.Context()
var logs []*types.Log
Expand All @@ -100,7 +101,10 @@ func (bc *BodyDb) Append(batch ethdb.Batch, block *types.Block, newInboundEtxs t
}
log.Info("Time taken to", "apply state:", common.PrettyDuration(time.Since(stateApply)))

rawdb.WriteBlock(bc.db, block)
rawdb.WriteBlock(batch, block)
if err = batch.Write(); err != nil {
return nil, err
}
return logs, nil
}

Expand Down
32 changes: 28 additions & 4 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ func (hc *HeaderChain) ProcessingState() bool {
}

// Append
func (hc *HeaderChain) AppendBlock(batch ethdb.Batch, block *types.Block, newInboundEtxs types.Transactions) error {
func (hc *HeaderChain) AppendBlock(block *types.Block, newInboundEtxs types.Transactions) error {
blockappend := time.Now()
// Append block else revert header append
logs, err := hc.bc.Append(batch, block, newInboundEtxs)
logs, err := hc.bc.Append(block, newInboundEtxs)
if err != nil {
return err
}
Expand All @@ -328,7 +328,7 @@ func (hc *HeaderChain) AppendBlock(batch ethdb.Batch, block *types.Block, newInb

// SetCurrentHeader sets the in-memory head header marker of the canonical chan
// as the given header.
func (hc *HeaderChain) SetCurrentHeader(batch ethdb.Batch, head *types.Header) error {
func (hc *HeaderChain) SetCurrentHeader(head *types.Header) error {
hc.headermu.Lock()
defer hc.headermu.Unlock()

Expand All @@ -347,6 +347,7 @@ func (hc *HeaderChain) SetCurrentHeader(batch ethdb.Batch, head *types.Header) e

// If head is the normal extension of canonical head, we can return by just wiring the canonical hash.
if prevHeader.Hash() == head.ParentHash() {
hc.ReadInboundEtxsAndAppendBlock(head)
rawdb.WriteCanonicalHash(hc.headerDb, head.Hash(), head.NumberU64())
return nil
}
Expand Down Expand Up @@ -382,12 +383,31 @@ func (hc *HeaderChain) SetCurrentHeader(batch ethdb.Batch, head *types.Header) e

// Run through the hash stack to update canonicalHash and forward state processor
for i := len(hashStack) - 1; i >= 0; i-- {
hc.ReadInboundEtxsAndAppendBlock(hashStack[i])
rawdb.WriteCanonicalHash(hc.headerDb, hashStack[i].Hash(), hashStack[i].NumberU64())
}

return nil
}

func (hc *HeaderChain) ReadInboundEtxsAndAppendBlock(header *types.Header) error {
block := hc.GetBlockOrCandidate(header.Hash(), header.NumberU64())
if block == nil {
return errors.New("Could not find block during reorg")
}
_, order, err := hc.engine.CalcOrder(block.Header())
if err != nil {
return err
}
nodeCtx := common.NodeLocation.Context()
var inboundEtxs types.Transactions
if order < nodeCtx {
inboundEtxs = rawdb.ReadInboundEtxs(hc.headerDb, header.Hash())
}
hc.AppendBlock(block, inboundEtxs)
return nil
}

// findCommonAncestor
func (hc *HeaderChain) findCommonAncestor(header *types.Header) *types.Header {
for {
Expand Down Expand Up @@ -670,7 +690,7 @@ func (hc *HeaderChain) CurrentHeader() *types.Header {

// CurrentBlock returns the block for the current header.
func (hc *HeaderChain) CurrentBlock() *types.Block {
return hc.GetBlockByHash(hc.CurrentHeader().Hash())
return hc.GetBlockOrCandidateByHash(hc.CurrentHeader().Hash())
}

// SetGenesis sets a new genesis block header for the chain
Expand Down Expand Up @@ -790,6 +810,10 @@ func (hc *HeaderChain) GetBlockByHash(hash common.Hash) *types.Block {
return hc.GetBlock(hash, *number)
}

func (hc *HeaderChain) GetBlockOrCandidate(hash common.Hash, number uint64) *types.Block {
return hc.bc.GetBlockOrCandidate(hash, number)
}

// GetBlockOrCandidateByHash retrieves any block from the database by hash, caching it if found.
func (hc *HeaderChain) GetBlockOrCandidateByHash(hash common.Hash) *types.Block {
number := hc.GetBlockNumber(hash)
Expand Down
101 changes: 81 additions & 20 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Slice struct {

validator Validator // Block and state validator interface
phCacheMu sync.RWMutex
reorgMu sync.RWMutex

badHashesCache map[common.Hash]bool
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do
var pendingHeaderWithTermini types.PendingHeader
if nodeCtx != common.ZONE_CTX {
// Upate the local pending header
pendingHeaderWithTermini, err = sl.generateSlicePendingHeader(block, newTermini, domPendingHeader, domOrigin, false)
pendingHeaderWithTermini, err = sl.generateSlicePendingHeader(block, newTermini, domPendingHeader, domOrigin, true, false)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -242,11 +243,7 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do

time7 := common.PrettyDuration(time.Since(start))

err = sl.hc.AppendBlock(batch, block, newInboundEtxs.FilterToLocation(common.NodeLocation))
if err != nil {
return nil, false, err
}

rawdb.WriteBlock(sl.sliceDb, block)
var time8, time9 common.PrettyDuration
bestPh, exist := sl.readPhCache(sl.bestPhKey)
if !exist {
Expand All @@ -272,11 +269,15 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do
rawdb.WriteInboundEtxs(batch, block.Hash(), newInboundEtxs.FilterToLocation(common.NodeLocation))
}

if subReorg {
sl.hc.SetCurrentHeader(block.Header())
}
// Upate the local pending header
pendingHeaderWithTermini, err = sl.generateSlicePendingHeader(block, newTermini, domPendingHeader, domOrigin, false)
pendingHeaderWithTermini, err = sl.generateSlicePendingHeader(block, newTermini, domPendingHeader, domOrigin, subReorg, false)
if err != nil {
return nil, false, err
}

time9 = common.PrettyDuration(time.Since(start))

}
Expand All @@ -300,7 +301,9 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do
}

if subReorg {
sl.hc.SetCurrentHeader(batch, block.Header())
if nodeCtx != common.ZONE_CTX {
sl.hc.SetCurrentHeader(block.Header())
}
sl.hc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
}

Expand Down Expand Up @@ -383,7 +386,7 @@ func (sl *Slice) UpdateDom(oldTerminus common.Hash, newTerminus common.Hash, loc
block := sl.hc.GetBlockByHash(newTerminus)
sl.bestPhKey = newTerminus
if block != nil {
pendingHeaderWithTermini, err := sl.generateSlicePendingHeader(block, *newDomTermini, types.EmptyHeader(), false, false)
pendingHeaderWithTermini, err := sl.generateSlicePendingHeader(block, *newDomTermini, types.EmptyHeader(), false, true, false)
if err != nil {
return
}
Expand All @@ -406,7 +409,7 @@ func (sl *Slice) UpdateDom(oldTerminus common.Hash, newTerminus common.Hash, loc
block := sl.hc.GetBlockByHash(newTerminus)
sl.bestPhKey = newTerminus
if block != nil {
pendingHeaderWithTermini, err := sl.generateSlicePendingHeader(block, *newDomTermini, types.EmptyHeader(), false, false)
pendingHeaderWithTermini, err := sl.generateSlicePendingHeader(block, *newDomTermini, types.EmptyHeader(), false, true, false)
if err != nil {
return
}
Expand Down Expand Up @@ -480,11 +483,36 @@ func (sl *Slice) writePhCache(hash common.Hash, pendingHeader types.PendingHeade
}

// Generate a slice pending header
func (sl *Slice) generateSlicePendingHeader(block *types.Block, newTermini types.Termini, domPendingHeader *types.Header, domOrigin bool, fill bool) (types.PendingHeader, error) {
// Upate the local pending header
localPendingHeader, err := sl.miner.worker.GeneratePendingHeader(block, fill)
if err != nil {
return types.PendingHeader{}, err
func (sl *Slice) generateSlicePendingHeader(block *types.Block, newTermini types.Termini, domPendingHeader *types.Header, domOrigin bool, subReorg bool, fill bool) (types.PendingHeader, error) {
nodeCtx := common.NodeLocation.Context()
var localPendingHeader *types.Header
var err error
if subReorg {
// Upate the local pending header
localPendingHeader, err = sl.miner.worker.GeneratePendingHeader(block, fill)
if err != nil {
return types.PendingHeader{}, err
}
} else {
localPendingHeader = types.EmptyHeader()
localPendingHeader.SetParentHash(block.Hash(), nodeCtx)
localPendingHeader.SetNumber(big.NewInt(int64(block.NumberU64()) + 1))
// Compute and set manifest hash
manifest := types.BlockManifest{}
if nodeCtx == common.PRIME_CTX {
// Nothing to do for prime chain
manifest = types.BlockManifest{}
} else if sl.engine.IsDomCoincident(sl.hc, block.Header()) {
manifest = types.BlockManifest{block.Hash()}
} else {
parentManifest := rawdb.ReadManifest(sl.sliceDb, block.ParentHash())
manifest = append(parentManifest, block.Hash())
}
manifestHash := types.DeriveSha(manifest, trie.NewStackTrie(nil))
localPendingHeader.SetManifestHash(manifestHash)

// write the manifest into the disk
rawdb.WriteManifest(sl.sliceDb, block.Hash(), manifest)
}

// Combine subordinates pending header with local pending header
Expand Down Expand Up @@ -714,13 +742,46 @@ func (sl *Slice) updatePhCacheFromDom(pendingHeader types.PendingHeader, termini
combinedPendingHeader = sl.combinePendingHeader(pendingHeader.Header(), combinedPendingHeader, i, false)
}

sl.updatePhCache(types.NewPendingHeader(combinedPendingHeader, localPendingHeader.Termini()), false, nil, subReorg)

nodeCtx := common.NodeLocation.Context()
// Pick the head
if subReorg {
log.Info("Choosing phHeader pickPhHead:", "NumberArray:", localPendingHeader.Header().NumberArray(), "Number:", localPendingHeader.Header().Number(), "ParentHash:", localPendingHeader.Header().ParentHash(), "Terminus:", localPendingHeader.Termini().DomTerminus())
sl.bestPhKey = localPendingHeader.Termini().DomTerminus()
if (localPendingHeader.Header().Root() != types.EmptyRootHash && nodeCtx == common.ZONE_CTX) || nodeCtx == common.REGION_CTX {
fmt.Println("local root is not emtpty")
block := sl.hc.GetBlockOrCandidateByHash(localPendingHeader.Header().ParentHash())
if block != nil {
sl.hc.SetCurrentHeader(block.Header())
log.Info("Choosing phHeader pickPhHead:", "NumberArray:", localPendingHeader.Header().NumberArray(), "Number:", localPendingHeader.Header().Number(), "ParentHash:", localPendingHeader.Header().ParentHash(), "Terminus:", localPendingHeader.Termini().DomTerminus())
sl.bestPhKey = localPendingHeader.Termini().DomTerminus()
if block.Hash() != sl.hc.CurrentHeader().Hash() {
sl.hc.chainHeadFeed.Send(ChainHeadEvent{block})
}
} else {
log.Warn("unable to set the current header after the cord update", "Hash", localPendingHeader.Header().ParentHash())
}
} else {
fmt.Println("local root is emtpty")
block := sl.hc.GetBlockOrCandidateByHash(localPendingHeader.Header().ParentHash())
if block != nil {
sl.hc.SetCurrentHeader(block.Header())
newPendingHeader, err := sl.generateSlicePendingHeader(block, localPendingHeader.Termini(), combinedPendingHeader, true, true, false)
if err != nil {
log.Error("Error generating slice pending header", "err", err)
return err
}
combinedPendingHeader = types.CopyHeader(newPendingHeader.Header())
log.Info("Choosing phHeader pickPhHead:", "NumberArray:", combinedPendingHeader.NumberArray(), "ParentHash:", combinedPendingHeader.ParentHash(), "Terminus:", localPendingHeader.Termini().DomTerminus())
sl.bestPhKey = localPendingHeader.Termini().DomTerminus()
if block.Hash() != sl.hc.CurrentHeader().Hash() {
sl.hc.chainHeadFeed.Send(ChainHeadEvent{block})
}
} else {
log.Warn("unable to set the current header after the cord update", "Hash", localPendingHeader.Header().ParentHash())
}
}
}

sl.updatePhCache(types.NewPendingHeader(combinedPendingHeader, localPendingHeader.Termini()), false, nil, subReorg)

return nil
}
log.Warn("no pending header found for", "terminus", hash, "pendingHeaderNumber", pendingHeader.Header().NumberArray(), "Hash", pendingHeader.Header().ParentHash(), "Termini index", terminiIndex, "indices", indices)
Expand Down Expand Up @@ -786,7 +847,7 @@ func (sl *Slice) init(genesis *Genesis) error {

// Append each of the knot blocks
sl.bestPhKey = genesisHash
sl.hc.SetCurrentHeader(nil, genesisHeader)
sl.hc.SetCurrentHeader(genesisHeader)

// Create empty pending ETX entry for genesis block -- genesis may not emit ETXs
emptyPendingEtxs := types.Transactions{}
Expand Down
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (b *QuaiAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumb
}
// Otherwise resolve and return the block
if number == rpc.LatestBlockNumber {
return b.eth.core.CurrentBlock(), nil
number = rpc.BlockNumber(b.eth.core.CurrentHeader().NumberU64())
}
block := b.eth.core.GetBlockByNumber(uint64(number))
if block != nil {
Expand Down

0 comments on commit ab3121d

Please sign in to comment.