Skip to content

Commit

Permalink
Worker now asynchronously sorts pool and local txs are
Browse files Browse the repository at this point in the history
no longer prioritized
  • Loading branch information
jdowning100 committed Aug 29, 2023
1 parent 9bffde1 commit 6927f05
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 36 deletions.
2 changes: 2 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ func (c *Core) serviceBlocks(hashNumberList []types.HashAndNumber) {
} else {
if !c.HasHeader(header.ParentHash(), header.NumberU64()-1) {
c.sl.missingParentFeed.Send(header.ParentHash())
} else {
log.Error("db has the header, but it's nil", "HasHeader", c.HasHeader(header.ParentHash(), header.NumberU64()-1), "GetHeader", c.GetHeader(header.ParentHash(), header.NumberU64()-1), "ParentHash", header.ParentHash().String(), "Number", header.NumberU64())
}
}
} else {
Expand Down
6 changes: 5 additions & 1 deletion core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type HeaderChain struct {
bc *BodyDb
engine consensus.Engine
pool *TxPool
worker *worker

chainHeadFeed event.Feed
chainSideFeed event.Feed
Expand Down Expand Up @@ -386,7 +387,10 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) error {
hc.ReadInboundEtxsAndAppendBlock(hashStack[i])
rawdb.WriteCanonicalHash(hc.headerDb, hashStack[i].Hash(), hashStack[i].NumberU64())
}

// Async reset the worker sorted pool cache as it is outdated
if common.NodeLocation.Context() == common.ZONE_CTX && hc.ProcessingState() {
go hc.worker.ReorgResetSortedPoolCache()
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku
sl.hc.pool = sl.txPool
}
sl.miner = New(sl.hc, sl.txPool, config, db, chainConfig, engine, isLocalBlock, sl.ProcessingState())

sl.hc.worker = sl.miner.worker
sl.phCache, _ = lru.New(c_phCacheSize)

// only set the subClients if the chain is not Zone
Expand Down
7 changes: 7 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,13 @@ func (t *TransactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}

func (t *TransactionsByPriceAndNonce) Len() int {
if t.heads == nil {
return 0
}
return t.heads.Len()
}

// Message is a fully derived transaction and implements core.Message
//
// NOTE: In a future PR this will be removed.
Expand Down
134 changes: 100 additions & 34 deletions core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -43,6 +44,9 @@ const (

// pendingBlockBodyLimit is maximum number of pending block bodies to be kept in cache.
pendingBlockBodyLimit = 320

// sortedPoolCacheInternval is the interval to update the sorted transaction pool.
sortedPoolCacheInterval = 3 * time.Second
)

// environment is the worker's current environment and holds all
Expand Down Expand Up @@ -164,12 +168,15 @@ type Config struct {
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
hc *HeaderChain
txPool *TxPool

config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
hc *HeaderChain
txPool *TxPool
sortedPoolCache *types.TransactionsByPriceAndNonce // sortedPoolCache is the sorted transaction pool
sortedPoolCacheNil uint // number of times the sorted pool cache was nil for stats
sortedPoolCacheFull uint // number of times the sorted pool cache was not nil for stats
sortCacheLock sync.RWMutex
// Feeds
pendingLogsFeed event.Feed
pendingHeaderFeed event.Feed
Expand All @@ -184,6 +191,7 @@ type worker struct {
exitCh chan struct{}
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust
reorgCh chan bool

interrupt chan struct{}
asyncPhFeed event.Feed // asyncPhFeed sends an event after each state root update
Expand Down Expand Up @@ -245,17 +253,14 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
interrupt: make(chan struct{}),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
reorgCh: make(chan bool),
}
// Set the GasFloor of the worker to the minGasLimit
worker.config.GasFloor = params.MinGasLimit

phBodyCache, _ := lru.New(pendingBlockBodyLimit)
worker.pendingBlockBody = phBodyCache

if headerchain.ProcessingState() {
worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh)
}

// Sanitize recommit interval if the user-specified one is too short.
recommit := worker.config.Recommit
if recommit < minRecommitInterval {
Expand All @@ -264,10 +269,16 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
}

if processingState {
worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh)
worker.wg.Add(1)
go worker.asyncStateLoop()
}

if processingState && common.NodeLocation.Context() == common.ZONE_CTX {
ticker := time.NewTicker(sortedPoolCacheInterval)
go worker.SortPool(ticker)
}

return worker
}

Expand Down Expand Up @@ -426,6 +437,56 @@ func (w *worker) asyncStateLoop() {
}
}

// SortPool asynchronously sorts the tx pool for the worker.
func (w *worker) SortPool(tick *time.Ticker) {
statsTicker := time.NewTicker(10 * time.Second)
for {
select {
case <-tick.C:
currentHead := w.hc.CurrentHeader()
expectedBaseFee := misc.CalcBaseFee(w.chainConfig, currentHead) // calc expected next basefee (based on currentHead as parent)
etxSet := rawdb.ReadEtxSet(w.hc.bc.db, currentHead.Hash(), currentHead.NumberU64())
if etxSet == nil {
log.Error("Error reading etx set from db")
continue
}
etxSet.Update(types.Transactions{}, currentHead.NumberU64()) // Prune any expired ETXs
// Get transactions from pool
pending, err := w.txPool.TxPoolPending(true, etxSet)
if err != nil {
log.Error("Error getting pending transactions from pool", "err", err)
continue
}
if len(pending) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending, expectedBaseFee, true)
w.sortCacheLock.Lock()
w.sortedPoolCache = txs
w.sortCacheLock.Unlock()
}
case <-w.reorgCh:
w.sortCacheLock.Lock()
w.sortedPoolCache = nil // reset the sortedPoolCache in the case of a reorg as it is outdated
tick = time.NewTicker(sortedPoolCacheInterval) // reset the timer to give some time for mempool to reorg
w.sortCacheLock.Unlock()
case <-statsTicker.C:
w.sortCacheLock.RLock()
if w.sortedPoolCache == nil {
w.sortCacheLock.RUnlock()
continue
}
log.Info(fmt.Sprintf("Worker pool cache has %d txs, was nil %d times and not nil %d times, or nil %s percent of the time", w.sortedPoolCache.Len(), w.sortedPoolCacheNil, w.sortedPoolCacheFull, fmt.Sprintf("%.2f", float64(w.sortedPoolCacheNil)/float64(w.sortedPoolCacheNil+w.sortedPoolCacheFull)*100)))
w.sortCacheLock.RUnlock()
case <-w.exitCh:
return
}
}
}

// ReorgResetPoolCache resets the sortedPoolCache in the case of a reorg.
func (w *worker) ReorgResetSortedPoolCache() {
w.reorgCh <- true
}

// GeneratePendingBlock generates pending block given a commited block.
func (w *worker) GeneratePendingHeader(block *types.Block, fill bool) (*types.Header, error) {
nodeCtx := common.NodeLocation.Context()
Expand Down Expand Up @@ -697,6 +758,11 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
log.Error("Skipping unsupported transaction type", "sender", from, "type", tx.Type())
txs.Pop()

case strings.Contains(err.Error(), "emits too many cross"): // This is ErrEtxLimitReached with more info
// Pop the unsupported transaction without shifting in the next from the account
log.Trace("Etx limit exceeded for current block", "sender", from, "err", err)
txs.Pop()

default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
Expand Down Expand Up @@ -844,35 +910,35 @@ func (w *worker) prepareWork(genParams *generateParams, block *types.Block) (*en
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *int32, env *environment, block *types.Block) {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
etxSet := rawdb.ReadEtxSet(w.hc.bc.db, block.Hash(), block.NumberU64())
if etxSet == nil {
return
}
etxSet.Update(types.Transactions{}, block.NumberU64()) // Prune any expired ETXs
pending, err := w.txPool.TxPoolPending(true, etxSet)
if err != nil {
return
}
localTxs, remoteTxs := make(map[common.AddressBytes]types.Transactions), pending
for _, account := range w.txPool.Locals() {
if txs := remoteTxs[account.Bytes20()]; len(txs) > 0 {
delete(remoteTxs, account.Bytes20())
localTxs[account.Bytes20()] = txs
}

w.sortCacheLock.Lock()
if w.sortedPoolCache == nil {
w.sortedPoolCacheNil++
} else {
w.sortedPoolCacheFull++
}
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee(), false)
if w.commitTransactions(env, txs, interrupt) {
if w.sortedPoolCache != nil && w.sortedPoolCache.Len() > 0 {
w.commitTransactions(env, w.sortedPoolCache, interrupt)
w.sortCacheLock.Unlock()
} else {
w.sortCacheLock.Unlock()
log.Info("Worker sortedPoolCache is empty, triggering failsafe")
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
etxSet := rawdb.ReadEtxSet(w.hc.bc.db, block.Hash(), block.NumberU64())
if etxSet == nil {
return
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee(), false)
if w.commitTransactions(env, txs, interrupt) {
etxSet.Update(types.Transactions{}, block.NumberU64()) // Prune any expired ETXs
pending, err := w.txPool.TxPoolPending(true, etxSet)
if err != nil {
return
}

if len(pending) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, pending, env.header.BaseFee(), false)
w.commitTransactions(env, txs, interrupt)
}
}
}

Expand Down

0 comments on commit 6927f05

Please sign in to comment.