Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert the Etx sorting work #1063

Merged
merged 2 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ func (c *Core) serviceBlocks(hashNumberList []types.HashAndNumber) {
} else {
if !c.HasHeader(header.ParentHash(), header.NumberU64()-1) {
c.sl.missingParentFeed.Send(header.ParentHash())
} else {
log.Error("db has the header, but it's nil", "HasHeader", c.HasHeader(header.ParentHash(), header.NumberU64()-1), "GetHeader", c.GetHeader(header.ParentHash(), header.NumberU64()-1), "ParentHash", header.ParentHash().String(), "Number", header.NumberU64())
}
}
} else {
Expand Down
6 changes: 1 addition & 5 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type HeaderChain struct {
bc *BodyDb
engine consensus.Engine
pool *TxPool
worker *worker

chainHeadFeed event.Feed
chainSideFeed event.Feed
Expand Down Expand Up @@ -393,10 +392,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) error {
}
rawdb.WriteCanonicalHash(hc.headerDb, hashStack[i].Hash(), hashStack[i].NumberU64())
}
// Async reset the worker sorted pool cache as it is outdated
if common.NodeLocation.Context() == common.ZONE_CTX && hc.ProcessingState() {
go hc.worker.ReorgResetSortedPoolCache()
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku
sl.hc.pool = sl.txPool
}
sl.miner = New(sl.hc, sl.txPool, config, db, chainConfig, engine, isLocalBlock, sl.ProcessingState())
sl.hc.worker = sl.miner.worker

sl.phCache, _ = lru.New(c_phCacheSize)

// only set the subClients if the chain is not Zone
Expand Down
7 changes: 0 additions & 7 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,13 +623,6 @@ func (t *TransactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}

func (t *TransactionsByPriceAndNonce) Len() int {
if t.heads == nil {
return 0
}
return t.heads.Len()
}

// Message is a fully derived transaction and implements core.Message
//
// NOTE: In a future PR this will be removed.
Expand Down
134 changes: 34 additions & 100 deletions core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -44,9 +43,6 @@ const (

// pendingBlockBodyLimit is maximum number of pending block bodies to be kept in cache.
pendingBlockBodyLimit = 320

// sortedPoolCacheInternval is the interval to update the sorted transaction pool.
sortedPoolCacheInterval = 3 * time.Second
)

// environment is the worker's current environment and holds all
Expand Down Expand Up @@ -168,15 +164,12 @@ type Config struct {
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
hc *HeaderChain
txPool *TxPool
sortedPoolCache *types.TransactionsByPriceAndNonce // sortedPoolCache is the sorted transaction pool
sortedPoolCacheNil uint // number of times the sorted pool cache was nil for stats
sortedPoolCacheFull uint // number of times the sorted pool cache was not nil for stats
sortCacheLock sync.RWMutex
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
hc *HeaderChain
txPool *TxPool

// Feeds
pendingLogsFeed event.Feed
pendingHeaderFeed event.Feed
Expand All @@ -191,7 +184,6 @@ type worker struct {
exitCh chan struct{}
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust
reorgCh chan bool

interrupt chan struct{}
asyncPhFeed event.Feed // asyncPhFeed sends an event after each state root update
Expand Down Expand Up @@ -253,14 +245,17 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
interrupt: make(chan struct{}),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
reorgCh: make(chan bool),
}
// Set the GasFloor of the worker to the minGasLimit
worker.config.GasFloor = params.MinGasLimit

phBodyCache, _ := lru.New(pendingBlockBodyLimit)
worker.pendingBlockBody = phBodyCache

if headerchain.ProcessingState() {
worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh)
}

// Sanitize recommit interval if the user-specified one is too short.
recommit := worker.config.Recommit
if recommit < minRecommitInterval {
Expand All @@ -269,16 +264,10 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
}

if processingState {
worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh)
worker.wg.Add(1)
go worker.asyncStateLoop()
}

if processingState && common.NodeLocation.Context() == common.ZONE_CTX {
ticker := time.NewTicker(sortedPoolCacheInterval)
go worker.SortPool(ticker)
}

return worker
}

Expand Down Expand Up @@ -437,56 +426,6 @@ func (w *worker) asyncStateLoop() {
}
}

// SortPool asynchronously sorts the tx pool for the worker.
func (w *worker) SortPool(tick *time.Ticker) {
statsTicker := time.NewTicker(10 * time.Second)
for {
select {
case <-tick.C:
currentHead := w.hc.CurrentHeader()
expectedBaseFee := misc.CalcBaseFee(w.chainConfig, currentHead) // calc expected next basefee (based on currentHead as parent)
etxSet := rawdb.ReadEtxSet(w.hc.bc.db, currentHead.Hash(), currentHead.NumberU64())
if etxSet == nil {
log.Error("Error reading etx set from db")
continue
}
etxSet.Update(types.Transactions{}, currentHead.NumberU64()+1) // Prune any expired ETXs
// Get transactions from pool
pending, err := w.txPool.TxPoolPending(true, etxSet)
if err != nil {
log.Error("Error getting pending transactions from pool", "err", err)
continue
}
if len(pending) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending, expectedBaseFee, true)
w.sortCacheLock.Lock()
w.sortedPoolCache = txs
w.sortCacheLock.Unlock()
}
case <-w.reorgCh:
w.sortCacheLock.Lock()
w.sortedPoolCache = nil // reset the sortedPoolCache in the case of a reorg as it is outdated
tick = time.NewTicker(sortedPoolCacheInterval) // reset the timer to give some time for mempool to reorg
w.sortCacheLock.Unlock()
case <-statsTicker.C:
w.sortCacheLock.RLock()
if w.sortedPoolCache == nil {
w.sortCacheLock.RUnlock()
continue
}
log.Info(fmt.Sprintf("Worker pool cache has %d txs, was nil %d times and not nil %d times, or nil %s percent of the time", w.sortedPoolCache.Len(), w.sortedPoolCacheNil, w.sortedPoolCacheFull, fmt.Sprintf("%.2f", float64(w.sortedPoolCacheNil)/float64(w.sortedPoolCacheNil+w.sortedPoolCacheFull)*100)))
w.sortCacheLock.RUnlock()
case <-w.exitCh:
return
}
}
}

// ReorgResetPoolCache resets the sortedPoolCache in the case of a reorg.
func (w *worker) ReorgResetSortedPoolCache() {
w.reorgCh <- true
}

// GeneratePendingBlock generates pending block given a commited block.
func (w *worker) GeneratePendingHeader(block *types.Block, fill bool) (*types.Header, error) {
nodeCtx := common.NodeLocation.Context()
Expand Down Expand Up @@ -758,11 +697,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
log.Error("Skipping unsupported transaction type", "sender", from, "type", tx.Type())
txs.Pop()

case strings.Contains(err.Error(), "emits too many cross"): // This is ErrEtxLimitReached with more info
// Pop the unsupported transaction without shifting in the next from the account
log.Trace("Etx limit exceeded for current block", "sender", from, "err", err)
txs.Pop()

default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
Expand Down Expand Up @@ -910,34 +844,34 @@ func (w *worker) prepareWork(genParams *generateParams, block *types.Block) (*en
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *int32, env *environment, block *types.Block) {

w.sortCacheLock.Lock()
if w.sortedPoolCache == nil {
w.sortedPoolCacheNil++
} else {
w.sortedPoolCacheFull++
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
etxSet := rawdb.ReadEtxSet(w.hc.bc.db, block.Hash(), block.NumberU64())
if etxSet == nil {
return
}
if w.sortedPoolCache != nil && w.sortedPoolCache.Len() > 0 {
w.commitTransactions(env, w.sortedPoolCache, interrupt)
w.sortCacheLock.Unlock()
} else {
w.sortCacheLock.Unlock()
log.Info("Worker sortedPoolCache is empty, triggering failsafe")
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
etxSet := rawdb.ReadEtxSet(w.hc.bc.db, block.Hash(), block.NumberU64())
if etxSet == nil {
return
etxSet.Update(types.Transactions{}, block.NumberU64()+1) // Prune any expired ETXs
pending, err := w.txPool.TxPoolPending(true, etxSet)
if err != nil {
return
}
localTxs, remoteTxs := make(map[common.AddressBytes]types.Transactions), pending
for _, account := range w.txPool.Locals() {
if txs := remoteTxs[account.Bytes20()]; len(txs) > 0 {
delete(remoteTxs, account.Bytes20())
localTxs[account.Bytes20()] = txs
}
etxSet.Update(types.Transactions{}, block.NumberU64()+1) // Prune any expired ETXs
pending, err := w.txPool.TxPoolPending(true, etxSet)
if err != nil {
}
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee(), false)
if w.commitTransactions(env, txs, interrupt) {
return
}

if len(pending) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, pending, env.header.BaseFee(), false)
w.commitTransactions(env, txs, interrupt)
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee(), false)
if w.commitTransactions(env, txs, interrupt) {
return
}
}
}
Expand Down