Skip to content

Commit

Permalink
Changed sort to true on tx addition in filltransactions and added tim…
Browse files Browse the repository at this point in the history
…e metrics for filltransactions
  • Loading branch information
jdowning100 committed Aug 31, 2023
1 parent b1062fc commit e8d50e9
Showing 1 changed file with 34 additions and 30 deletions.
64 changes: 34 additions & 30 deletions core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,16 +168,17 @@ type Config struct {
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
hc *HeaderChain
txPool *TxPool
sortedPoolCache *types.TransactionsByPriceAndNonce // sortedPoolCache is the sorted transaction pool
sortedPoolCacheNil uint // number of times the sorted pool cache was nil for stats
sortedPoolCacheFull uint // number of times the sorted pool cache was not nil for stats
sortedPoolCacheEmpty uint // number of times the sorted pool cache was empty for stats
sortCacheLock sync.RWMutex
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
hc *HeaderChain
txPool *TxPool
sortedPoolCache *types.TransactionsByPriceAndNonce // sortedPoolCache is the sorted transaction pool
sortedPoolCacheNil uint // number of times the sorted pool cache was nil for stats
sortedPoolCacheFull uint // number of times the sorted pool cache was not nil for stats
sortedPoolCacheEmpty uint // number of times the sorted pool cache was empty for stats
fillTransactionsRollingAverage *RollingAverage
sortCacheLock sync.RWMutex
// Feeds
pendingLogsFeed event.Feed
pendingHeaderFeed event.Feed
Expand Down Expand Up @@ -237,24 +238,25 @@ type worker struct {

func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Database, engine consensus.Engine, headerchain *HeaderChain, txPool *TxPool, isLocalBlock func(header *types.Header) bool, init bool, processingState bool) *worker {
worker := &worker{
config: config,
chainConfig: chainConfig,
engine: engine,
hc: headerchain,
txPool: txPool,
coinbase: config.Etherbase,
isLocalBlock: isLocalBlock,
workerDb: db,
localUncles: make(map[common.Hash]*types.Block),
remoteUncles: make(map[common.Hash]*types.Block),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
taskCh: make(chan *task),
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
interrupt: make(chan struct{}),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
reorgCh: make(chan bool),
config: config,
chainConfig: chainConfig,
engine: engine,
hc: headerchain,
txPool: txPool,
coinbase: config.Etherbase,
isLocalBlock: isLocalBlock,
workerDb: db,
localUncles: make(map[common.Hash]*types.Block),
remoteUncles: make(map[common.Hash]*types.Block),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
taskCh: make(chan *task),
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
interrupt: make(chan struct{}),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
reorgCh: make(chan bool),
fillTransactionsRollingAverage: &RollingAverage{windowSize: 100},
}
// Set the GasFloor of the worker to the minGasLimit
worker.config.GasFloor = params.MinGasLimit
Expand Down Expand Up @@ -514,7 +516,7 @@ func (w *worker) SortPool(tick *time.Ticker) {
w.sortCacheLock.RUnlock()
continue
}
log.Info(fmt.Sprintf("Worker sorted poolcache has %d txs, took %s to sort, average sort time %s. Poolcache was nil %d times, empty %d and not empty %d times, empty %s percent of the time and nil %s percent of the time", w.sortedPoolCache.Len(), common.PrettyDuration(timeTaken), common.PrettyDuration(ra.Average()), w.sortedPoolCacheNil, w.sortedPoolCacheEmpty, w.sortedPoolCacheFull, fmt.Sprintf("%.2f", float64(w.sortedPoolCacheEmpty)/float64(w.sortedPoolCacheNil+w.sortedPoolCacheFull+w.sortedPoolCacheEmpty)*100), fmt.Sprintf("%.2f", float64(w.sortedPoolCacheNil)/float64(w.sortedPoolCacheNil+w.sortedPoolCacheFull+w.sortedPoolCacheEmpty)*100)))
log.Info(fmt.Sprintf("Worker sorted poolcache has %d txs, took %s to sort, average sort time %s. Average fillTransactions time is %s. Poolcache was nil %d times, empty %d and not empty %d times, empty %s percent of the time and nil %s percent of the time", w.sortedPoolCache.Len(), common.PrettyDuration(timeTaken), common.PrettyDuration(ra.Average()), common.PrettyDuration(w.fillTransactionsRollingAverage.Average()), w.sortedPoolCacheNil, w.sortedPoolCacheEmpty, w.sortedPoolCacheFull, fmt.Sprintf("%.2f", float64(w.sortedPoolCacheEmpty)/float64(w.sortedPoolCacheNil+w.sortedPoolCacheFull+w.sortedPoolCacheEmpty)*100), fmt.Sprintf("%.2f", float64(w.sortedPoolCacheNil)/float64(w.sortedPoolCacheNil+w.sortedPoolCacheFull+w.sortedPoolCacheEmpty)*100)))
w.sortCacheLock.RUnlock()
case <-w.exitCh:
return
Expand Down Expand Up @@ -800,7 +802,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
env.tcount++
txs.Shift(from.Bytes20(), false)
txs.Shift(from.Bytes20(), true)

case errors.Is(err, ErrTxTypeNotSupported):
// Pop the unsupported transaction without shifting in the next from the account
Expand Down Expand Up @@ -971,7 +973,9 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, block *typ
if w.sortedPoolCache != nil && w.sortedPoolCache.Len() > 0 {
etxSet := rawdb.ReadEtxSet(w.hc.bc.db, block.Hash(), block.NumberU64())
etxSet.Update(types.Transactions{}, block.NumberU64()+1) // Prune any expired ETXs
start := time.Now()
w.commitTransactions(env, w.sortedPoolCache, interrupt, etxSet)
w.fillTransactionsRollingAverage.Add(time.Since(start))
w.sortCacheLock.Unlock()
} else {
w.sortCacheLock.Unlock()
Expand Down

0 comments on commit e8d50e9

Please sign in to comment.