From 6927f05fbddcaafc7ef1c9076a79cf7b88d9442a Mon Sep 17 00:00:00 2001 From: Jonathan Downing Date: Mon, 28 Aug 2023 13:41:36 -0500 Subject: [PATCH] Worker now asynchronously sorts pool and local txs are no longer prioritized --- core/core.go | 2 + core/headerchain.go | 6 +- core/slice.go | 2 +- core/types/transaction.go | 7 ++ core/worker.go | 134 ++++++++++++++++++++++++++++---------- 5 files changed, 115 insertions(+), 36 deletions(-) diff --git a/core/core.go b/core/core.go index c896f76542..1a9475912e 100644 --- a/core/core.go +++ b/core/core.go @@ -216,6 +216,8 @@ func (c *Core) serviceBlocks(hashNumberList []types.HashAndNumber) { } else { if !c.HasHeader(header.ParentHash(), header.NumberU64()-1) { c.sl.missingParentFeed.Send(header.ParentHash()) + } else { + log.Error("db has the header, but it's nil", "HasHeader", c.HasHeader(header.ParentHash(), header.NumberU64()-1), "GetHeader", c.GetHeader(header.ParentHash(), header.NumberU64()-1), "ParentHash", header.ParentHash().String(), "Number", header.NumberU64()) } } } else { diff --git a/core/headerchain.go b/core/headerchain.go index 7e5f60fca3..fdfe838e3f 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -37,6 +37,7 @@ type HeaderChain struct { bc *BodyDb engine consensus.Engine pool *TxPool + worker *worker chainHeadFeed event.Feed chainSideFeed event.Feed @@ -386,7 +387,10 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) error { hc.ReadInboundEtxsAndAppendBlock(hashStack[i]) rawdb.WriteCanonicalHash(hc.headerDb, hashStack[i].Hash(), hashStack[i].NumberU64()) } - + // Async reset the worker sorted pool cache as it is outdated + if common.NodeLocation.Context() == common.ZONE_CTX && hc.ProcessingState() { + go hc.worker.ReorgResetSortedPoolCache() + } return nil } diff --git a/core/slice.go b/core/slice.go index a17e4fbe27..97496d43d1 100644 --- a/core/slice.go +++ b/core/slice.go @@ -99,7 +99,7 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku sl.hc.pool = sl.txPool } sl.miner = New(sl.hc, sl.txPool, config, db, chainConfig, engine, isLocalBlock, sl.ProcessingState()) - + sl.hc.worker = sl.miner.worker sl.phCache, _ = lru.New(c_phCacheSize) // only set the subClients if the chain is not Zone diff --git a/core/types/transaction.go b/core/types/transaction.go index 21cda87f17..1804b32489 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -623,6 +623,13 @@ func (t *TransactionsByPriceAndNonce) Pop() { heap.Pop(&t.heads) } +func (t *TransactionsByPriceAndNonce) Len() int { + if t.heads == nil { + return 0 + } + return t.heads.Len() +} + // Message is a fully derived transaction and implements core.Message // // NOTE: In a future PR this will be removed. diff --git a/core/worker.go b/core/worker.go index e2b956935c..a4db8afd09 100644 --- a/core/worker.go +++ b/core/worker.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math/big" + "strings" "sync" "sync/atomic" "time" @@ -43,6 +44,9 @@ const ( // pendingBlockBodyLimit is maximum number of pending block bodies to be kept in cache. pendingBlockBodyLimit = 320 + + // sortedPoolCacheInternval is the interval to update the sorted transaction pool. + sortedPoolCacheInterval = 3 * time.Second ) // environment is the worker's current environment and holds all @@ -164,12 +168,15 @@ type Config struct { // worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type worker struct { - config *Config - chainConfig *params.ChainConfig - engine consensus.Engine - hc *HeaderChain - txPool *TxPool - + config *Config + chainConfig *params.ChainConfig + engine consensus.Engine + hc *HeaderChain + txPool *TxPool + sortedPoolCache *types.TransactionsByPriceAndNonce // sortedPoolCache is the sorted transaction pool + sortedPoolCacheNil uint // number of times the sorted pool cache was nil for stats + sortedPoolCacheFull uint // number of times the sorted pool cache was not nil for stats + sortCacheLock sync.RWMutex // Feeds pendingLogsFeed event.Feed pendingHeaderFeed event.Feed @@ -184,6 +191,7 @@ type worker struct { exitCh chan struct{} resubmitIntervalCh chan time.Duration resubmitAdjustCh chan *intervalAdjust + reorgCh chan bool interrupt chan struct{} asyncPhFeed event.Feed // asyncPhFeed sends an event after each state root update @@ -245,6 +253,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas interrupt: make(chan struct{}), resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), + reorgCh: make(chan bool), } // Set the GasFloor of the worker to the minGasLimit worker.config.GasFloor = params.MinGasLimit @@ -252,10 +261,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas phBodyCache, _ := lru.New(pendingBlockBodyLimit) worker.pendingBlockBody = phBodyCache - if headerchain.ProcessingState() { - worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh) - } - // Sanitize recommit interval if the user-specified one is too short. recommit := worker.config.Recommit if recommit < minRecommitInterval { @@ -264,10 +269,16 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas } if processingState { + worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh) worker.wg.Add(1) go worker.asyncStateLoop() } + if processingState && common.NodeLocation.Context() == common.ZONE_CTX { + ticker := time.NewTicker(sortedPoolCacheInterval) + go worker.SortPool(ticker) + } + return worker } @@ -426,6 +437,56 @@ func (w *worker) asyncStateLoop() { } } +// SortPool asynchronously sorts the tx pool for the worker. +func (w *worker) SortPool(tick *time.Ticker) { + statsTicker := time.NewTicker(10 * time.Second) + for { + select { + case <-tick.C: + currentHead := w.hc.CurrentHeader() + expectedBaseFee := misc.CalcBaseFee(w.chainConfig, currentHead) // calc expected next basefee (based on currentHead as parent) + etxSet := rawdb.ReadEtxSet(w.hc.bc.db, currentHead.Hash(), currentHead.NumberU64()) + if etxSet == nil { + log.Error("Error reading etx set from db") + continue + } + etxSet.Update(types.Transactions{}, currentHead.NumberU64()) // Prune any expired ETXs + // Get transactions from pool + pending, err := w.txPool.TxPoolPending(true, etxSet) + if err != nil { + log.Error("Error getting pending transactions from pool", "err", err) + continue + } + if len(pending) > 0 { + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending, expectedBaseFee, true) + w.sortCacheLock.Lock() + w.sortedPoolCache = txs + w.sortCacheLock.Unlock() + } + case <-w.reorgCh: + w.sortCacheLock.Lock() + w.sortedPoolCache = nil // reset the sortedPoolCache in the case of a reorg as it is outdated + tick = time.NewTicker(sortedPoolCacheInterval) // reset the timer to give some time for mempool to reorg + w.sortCacheLock.Unlock() + case <-statsTicker.C: + w.sortCacheLock.RLock() + if w.sortedPoolCache == nil { + w.sortCacheLock.RUnlock() + continue + } + log.Info(fmt.Sprintf("Worker pool cache has %d txs, was nil %d times and not nil %d times, or nil %s percent of the time", w.sortedPoolCache.Len(), w.sortedPoolCacheNil, w.sortedPoolCacheFull, fmt.Sprintf("%.2f", float64(w.sortedPoolCacheNil)/float64(w.sortedPoolCacheNil+w.sortedPoolCacheFull)*100))) + w.sortCacheLock.RUnlock() + case <-w.exitCh: + return + } + } +} + +// ReorgResetPoolCache resets the sortedPoolCache in the case of a reorg. +func (w *worker) ReorgResetSortedPoolCache() { + w.reorgCh <- true +} + // GeneratePendingBlock generates pending block given a commited block. func (w *worker) GeneratePendingHeader(block *types.Block, fill bool) (*types.Header, error) { nodeCtx := common.NodeLocation.Context() @@ -697,6 +758,11 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP log.Error("Skipping unsupported transaction type", "sender", from, "type", tx.Type()) txs.Pop() + case strings.Contains(err.Error(), "emits too many cross"): // This is ErrEtxLimitReached with more info + // Pop the unsupported transaction without shifting in the next from the account + log.Trace("Etx limit exceeded for current block", "sender", from, "err", err) + txs.Pop() + default: // Strange error, discard the transaction and get the next in line (note, the // nonce-too-high clause will prevent us from executing in vain). @@ -844,35 +910,35 @@ func (w *worker) prepareWork(genParams *generateParams, block *types.Block) (*en // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. func (w *worker) fillTransactions(interrupt *int32, env *environment, block *types.Block) { - // Split the pending transactions into locals and remotes - // Fill the block with all available pending transactions. - etxSet := rawdb.ReadEtxSet(w.hc.bc.db, block.Hash(), block.NumberU64()) - if etxSet == nil { - return - } - etxSet.Update(types.Transactions{}, block.NumberU64()) // Prune any expired ETXs - pending, err := w.txPool.TxPoolPending(true, etxSet) - if err != nil { - return - } - localTxs, remoteTxs := make(map[common.AddressBytes]types.Transactions), pending - for _, account := range w.txPool.Locals() { - if txs := remoteTxs[account.Bytes20()]; len(txs) > 0 { - delete(remoteTxs, account.Bytes20()) - localTxs[account.Bytes20()] = txs - } + + w.sortCacheLock.Lock() + if w.sortedPoolCache == nil { + w.sortedPoolCacheNil++ + } else { + w.sortedPoolCacheFull++ } - if len(localTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee(), false) - if w.commitTransactions(env, txs, interrupt) { + if w.sortedPoolCache != nil && w.sortedPoolCache.Len() > 0 { + w.commitTransactions(env, w.sortedPoolCache, interrupt) + w.sortCacheLock.Unlock() + } else { + w.sortCacheLock.Unlock() + log.Info("Worker sortedPoolCache is empty, triggering failsafe") + // Split the pending transactions into locals and remotes + // Fill the block with all available pending transactions. + etxSet := rawdb.ReadEtxSet(w.hc.bc.db, block.Hash(), block.NumberU64()) + if etxSet == nil { return } - } - if len(remoteTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee(), false) - if w.commitTransactions(env, txs, interrupt) { + etxSet.Update(types.Transactions{}, block.NumberU64()) // Prune any expired ETXs + pending, err := w.txPool.TxPoolPending(true, etxSet) + if err != nil { return } + + if len(pending) > 0 { + txs := types.NewTransactionsByPriceAndNonce(env.signer, pending, env.header.BaseFee(), false) + w.commitTransactions(env, txs, interrupt) + } } }