diff --git a/core/core.go b/core/core.go index 1a9475912e..c896f76542 100644 --- a/core/core.go +++ b/core/core.go @@ -216,8 +216,6 @@ func (c *Core) serviceBlocks(hashNumberList []types.HashAndNumber) { } else { if !c.HasHeader(header.ParentHash(), header.NumberU64()-1) { c.sl.missingParentFeed.Send(header.ParentHash()) - } else { - log.Error("db has the header, but it's nil", "HasHeader", c.HasHeader(header.ParentHash(), header.NumberU64()-1), "GetHeader", c.GetHeader(header.ParentHash(), header.NumberU64()-1), "ParentHash", header.ParentHash().String(), "Number", header.NumberU64()) } } } else { diff --git a/core/headerchain.go b/core/headerchain.go index 90e623d1b6..a9a57c72b1 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -37,7 +37,6 @@ type HeaderChain struct { bc *BodyDb engine consensus.Engine pool *TxPool - worker *worker chainHeadFeed event.Feed chainSideFeed event.Feed @@ -393,10 +392,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) error { } rawdb.WriteCanonicalHash(hc.headerDb, hashStack[i].Hash(), hashStack[i].NumberU64()) } - // Async reset the worker sorted pool cache as it is outdated - if common.NodeLocation.Context() == common.ZONE_CTX && hc.ProcessingState() { - go hc.worker.ReorgResetSortedPoolCache() - } + return nil } diff --git a/core/slice.go b/core/slice.go index cd27ef071c..da50d0456b 100644 --- a/core/slice.go +++ b/core/slice.go @@ -99,7 +99,7 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku sl.hc.pool = sl.txPool } sl.miner = New(sl.hc, sl.txPool, config, db, chainConfig, engine, isLocalBlock, sl.ProcessingState()) - sl.hc.worker = sl.miner.worker + sl.phCache, _ = lru.New(c_phCacheSize) // only set the subClients if the chain is not Zone diff --git a/core/types/transaction.go b/core/types/transaction.go index 1804b32489..21cda87f17 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -623,13 +623,6 @@ func (t *TransactionsByPriceAndNonce) Pop() { heap.Pop(&t.heads) } -func (t *TransactionsByPriceAndNonce) Len() int { - if t.heads == nil { - return 0 - } - return t.heads.Len() -} - // Message is a fully derived transaction and implements core.Message // // NOTE: In a future PR this will be removed. diff --git a/core/worker.go b/core/worker.go index be7e1a30ed..adf4a2540b 100644 --- a/core/worker.go +++ b/core/worker.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "math/big" - "strings" "sync" "sync/atomic" "time" @@ -44,9 +43,6 @@ const ( // pendingBlockBodyLimit is maximum number of pending block bodies to be kept in cache. pendingBlockBodyLimit = 320 - - // sortedPoolCacheInternval is the interval to update the sorted transaction pool. - sortedPoolCacheInterval = 3 * time.Second ) // environment is the worker's current environment and holds all @@ -168,15 +164,12 @@ type Config struct { // worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type worker struct { - config *Config - chainConfig *params.ChainConfig - engine consensus.Engine - hc *HeaderChain - txPool *TxPool - sortedPoolCache *types.TransactionsByPriceAndNonce // sortedPoolCache is the sorted transaction pool - sortedPoolCacheNil uint // number of times the sorted pool cache was nil for stats - sortedPoolCacheFull uint // number of times the sorted pool cache was not nil for stats - sortCacheLock sync.RWMutex + config *Config + chainConfig *params.ChainConfig + engine consensus.Engine + hc *HeaderChain + txPool *TxPool + // Feeds pendingLogsFeed event.Feed pendingHeaderFeed event.Feed @@ -191,7 +184,6 @@ type worker struct { exitCh chan struct{} resubmitIntervalCh chan time.Duration resubmitAdjustCh chan *intervalAdjust - reorgCh chan bool interrupt chan struct{} asyncPhFeed event.Feed // asyncPhFeed sends an event after each state root update @@ -253,7 +245,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas interrupt: make(chan struct{}), resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), - reorgCh: make(chan bool), } // Set the GasFloor of the worker to the minGasLimit worker.config.GasFloor = params.MinGasLimit @@ -261,6 +252,10 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas phBodyCache, _ := lru.New(pendingBlockBodyLimit) worker.pendingBlockBody = phBodyCache + if headerchain.ProcessingState() { + worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh) + } + // Sanitize recommit interval if the user-specified one is too short. recommit := worker.config.Recommit if recommit < minRecommitInterval { @@ -269,16 +264,10 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas } if processingState { - worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh) worker.wg.Add(1) go worker.asyncStateLoop() } - if processingState && common.NodeLocation.Context() == common.ZONE_CTX { - ticker := time.NewTicker(sortedPoolCacheInterval) - go worker.SortPool(ticker) - } - return worker } @@ -437,56 +426,6 @@ func (w *worker) asyncStateLoop() { } } -// SortPool asynchronously sorts the tx pool for the worker. -func (w *worker) SortPool(tick *time.Ticker) { - statsTicker := time.NewTicker(10 * time.Second) - for { - select { - case <-tick.C: - currentHead := w.hc.CurrentHeader() - expectedBaseFee := misc.CalcBaseFee(w.chainConfig, currentHead) // calc expected next basefee (based on currentHead as parent) - etxSet := rawdb.ReadEtxSet(w.hc.bc.db, currentHead.Hash(), currentHead.NumberU64()) - if etxSet == nil { - log.Error("Error reading etx set from db") - continue - } - etxSet.Update(types.Transactions{}, currentHead.NumberU64()+1) // Prune any expired ETXs - // Get transactions from pool - pending, err := w.txPool.TxPoolPending(true, etxSet) - if err != nil { - log.Error("Error getting pending transactions from pool", "err", err) - continue - } - if len(pending) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending, expectedBaseFee, true) - w.sortCacheLock.Lock() - w.sortedPoolCache = txs - w.sortCacheLock.Unlock() - } - case <-w.reorgCh: - w.sortCacheLock.Lock() - w.sortedPoolCache = nil // reset the sortedPoolCache in the case of a reorg as it is outdated - tick = time.NewTicker(sortedPoolCacheInterval) // reset the timer to give some time for mempool to reorg - w.sortCacheLock.Unlock() - case <-statsTicker.C: - w.sortCacheLock.RLock() - if w.sortedPoolCache == nil { - w.sortCacheLock.RUnlock() - continue - } - log.Info(fmt.Sprintf("Worker pool cache has %d txs, was nil %d times and not nil %d times, or nil %s percent of the time", w.sortedPoolCache.Len(), w.sortedPoolCacheNil, w.sortedPoolCacheFull, fmt.Sprintf("%.2f", float64(w.sortedPoolCacheNil)/float64(w.sortedPoolCacheNil+w.sortedPoolCacheFull)*100))) - w.sortCacheLock.RUnlock() - case <-w.exitCh: - return - } - } -} - -// ReorgResetPoolCache resets the sortedPoolCache in the case of a reorg. -func (w *worker) ReorgResetSortedPoolCache() { - w.reorgCh <- true -} - // GeneratePendingBlock generates pending block given a commited block. func (w *worker) GeneratePendingHeader(block *types.Block, fill bool) (*types.Header, error) { nodeCtx := common.NodeLocation.Context() @@ -758,11 +697,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP log.Error("Skipping unsupported transaction type", "sender", from, "type", tx.Type()) txs.Pop() - case strings.Contains(err.Error(), "emits too many cross"): // This is ErrEtxLimitReached with more info - // Pop the unsupported transaction without shifting in the next from the account - log.Trace("Etx limit exceeded for current block", "sender", from, "err", err) - txs.Pop() - default: // Strange error, discard the transaction and get the next in line (note, the // nonce-too-high clause will prevent us from executing in vain). @@ -910,34 +844,34 @@ func (w *worker) prepareWork(genParams *generateParams, block *types.Block) (*en // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. func (w *worker) fillTransactions(interrupt *int32, env *environment, block *types.Block) { - - w.sortCacheLock.Lock() - if w.sortedPoolCache == nil { - w.sortedPoolCacheNil++ - } else { - w.sortedPoolCacheFull++ + // Split the pending transactions into locals and remotes + // Fill the block with all available pending transactions. + etxSet := rawdb.ReadEtxSet(w.hc.bc.db, block.Hash(), block.NumberU64()) + if etxSet == nil { + return } - if w.sortedPoolCache != nil && w.sortedPoolCache.Len() > 0 { - w.commitTransactions(env, w.sortedPoolCache, interrupt) - w.sortCacheLock.Unlock() - } else { - w.sortCacheLock.Unlock() - log.Info("Worker sortedPoolCache is empty, triggering failsafe") - // Split the pending transactions into locals and remotes - // Fill the block with all available pending transactions. - etxSet := rawdb.ReadEtxSet(w.hc.bc.db, block.Hash(), block.NumberU64()) - if etxSet == nil { - return + etxSet.Update(types.Transactions{}, block.NumberU64()+1) // Prune any expired ETXs + pending, err := w.txPool.TxPoolPending(true, etxSet) + if err != nil { + return + } + localTxs, remoteTxs := make(map[common.AddressBytes]types.Transactions), pending + for _, account := range w.txPool.Locals() { + if txs := remoteTxs[account.Bytes20()]; len(txs) > 0 { + delete(remoteTxs, account.Bytes20()) + localTxs[account.Bytes20()] = txs } - etxSet.Update(types.Transactions{}, block.NumberU64()+1) // Prune any expired ETXs - pending, err := w.txPool.TxPoolPending(true, etxSet) - if err != nil { + } + if len(localTxs) > 0 { + txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee(), false) + if w.commitTransactions(env, txs, interrupt) { return } - - if len(pending) > 0 { - txs := types.NewTransactionsByPriceAndNonce(env.signer, pending, env.header.BaseFee(), false) - w.commitTransactions(env, txs, interrupt) + } + if len(remoteTxs) > 0 { + txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee(), false) + if w.commitTransactions(env, txs, interrupt) { + return } } }