diff --git a/core/tx_pool.go b/core/tx_pool.go index 8dceccf5c..5257224db 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -314,6 +314,7 @@ type TxPool struct { scope event.SubscriptionScope signer types.Signer mu sync.RWMutex + pendingMu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head db ethdb.Reader @@ -325,6 +326,7 @@ type TxPool struct { qiPool *lru.Cache[common.Hash, *types.TxWithMinerFee] // Qi pool to store Qi transactions qiTxFees *lru.Cache[[16]byte, *big.Int] // Recent Qi transaction fees (hash is truncated to 16 bytes to save space) pending map[common.InternalAddress]*txList // All currently processable transactions + pendingSorted []common.InternalAddress // Sorted list of account addresses for pending pool queue map[common.InternalAddress]*txList // Queued but non-processable transactions beats map[common.InternalAddress]time.Time // Last heartbeat from each known account all *txLookup // All transactions to allow lookups @@ -343,14 +345,15 @@ type TxPool struct { reOrgCounter int // keeps track of the number of times the runReorg is called, it is reset every c_reorgCounterThreshold times - chainHeadCh chan ChainHeadEvent - chainHeadSub event.Subscription - reqResetCh chan *txpoolResetRequest - reqPromoteCh chan *accountSet - queueTxEventCh chan *types.Transaction - reorgDoneCh chan chan struct{} - reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop - wg sync.WaitGroup // tracks loop, scheduleReorgLoop + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + reqResetCh chan *txpoolResetRequest + reqPromoteCh chan *accountSet + queueTxEventCh chan *types.Transaction + reorgDoneCh chan chan struct{} + reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop + pendingTxSortCancelCh chan struct{} // requests shutdown of pendingTxSort + wg sync.WaitGroup // tracks loop, scheduleReorgLoop logger *log.Logger } @@ -400,30 +403,31 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.LatestSigner(chainconfig), - pending: make(map[common.InternalAddress]*txList), - queue: make(map[common.InternalAddress]*txList), - beats: make(map[common.InternalAddress]time.Time), - sendersCh: make(chan newSender, config.SendersChBuffer), - feesCh: make(chan newFee, config.SendersChBuffer), - invalidQiTxsCh: make(chan []*common.Hash, config.SendersChBuffer), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - reqResetCh: make(chan *txpoolResetRequest, chainHeadChanSize), - reqPromoteCh: make(chan *accountSet, chainHeadChanSize), - queueTxEventCh: make(chan *types.Transaction, chainHeadChanSize), - broadcastSet: make(types.Transactions, 0), - reorgDoneCh: make(chan chan struct{}, chainHeadChanSize), - reorgShutdownCh: make(chan struct{}), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), - localTxsCount: 0, - remoteTxsCount: 0, - reOrgCounter: 0, - logger: logger, - db: db, + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.LatestSigner(chainconfig), + pending: make(map[common.InternalAddress]*txList), + queue: make(map[common.InternalAddress]*txList), + beats: make(map[common.InternalAddress]time.Time), + sendersCh: make(chan newSender, config.SendersChBuffer), + feesCh: make(chan newFee, config.SendersChBuffer), + invalidQiTxsCh: make(chan []*common.Hash, config.SendersChBuffer), + all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + reqResetCh: make(chan *txpoolResetRequest, chainHeadChanSize), + reqPromoteCh: make(chan *accountSet, chainHeadChanSize), + queueTxEventCh: make(chan *types.Transaction, chainHeadChanSize), + broadcastSet: make(types.Transactions, 0), + reorgDoneCh: make(chan chan struct{}, chainHeadChanSize), + reorgShutdownCh: make(chan struct{}), + pendingTxSortCancelCh: make(chan struct{}, 1), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), + localTxsCount: 0, + remoteTxsCount: 0, + reOrgCounter: 0, + logger: logger, + db: db, } qiPool, _ := lru.New[common.Hash, *types.TxWithMinerFee](int(config.QiPoolSize)) @@ -473,6 +477,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block go pool.feesGoroutine() go pool.invalidQiTxGoroutine() go pool.qiTxExpirationGoroutine() + go pool.sortTxPoolGoroutine() return pool } @@ -514,6 +519,7 @@ func (pool *TxPool) loop() { // System shutdown. case <-pool.chainHeadSub.Err(): close(pool.reorgShutdownCh) + close(pool.pendingTxSortCancelCh) return // Handle stats reporting ticks @@ -629,9 +635,11 @@ func (pool *TxPool) Stats() (int, int, int) { // number of queued (non-executable) transactions. func (pool *TxPool) stats() (int, int, int) { pending := 0 + pool.pendingMu.RLock() for _, list := range pool.pending { pending += list.Len() } + pool.pendingMu.RUnlock() queued := 0 for _, list := range pool.queue { queued += list.Len() @@ -646,9 +654,11 @@ func (pool *TxPool) Content() (map[common.InternalAddress]types.Transactions, ma defer pool.mu.Unlock() pending := make(map[common.InternalAddress]types.Transactions) + pool.pendingMu.RLock() for addr, list := range pool.pending { pending[addr] = list.Flatten() } + pool.pendingMu.RUnlock() queued := make(map[common.InternalAddress]types.Transactions) for addr, list := range pool.queue { queued[addr] = list.Flatten() @@ -663,9 +673,11 @@ func (pool *TxPool) ContentFrom(addr common.InternalAddress) (types.Transactions defer pool.mu.RUnlock() var pending types.Transactions + pool.pendingMu.RLock() if list, ok := pool.pending[addr]; ok { pending = list.Flatten() } + pool.pendingMu.RUnlock() var queued types.Transactions if list, ok := pool.queue[addr]; ok { queued = list.Flatten() @@ -699,6 +711,7 @@ func (pool *TxPool) TxPoolPending(enforceTips bool) (map[common.AddressBytes]typ defer pool.mu.RUnlock() pending := make(map[common.AddressBytes]types.Transactions) + pool.pendingMu.RLock() for addr, list := range pool.pending { txs := list.Flatten() @@ -721,6 +734,7 @@ func (pool *TxPool) TxPoolPending(enforceTips bool) (map[common.AddressBytes]typ pending[addr.Bytes20()] = txs } } + pool.pendingMu.RUnlock() return pending, nil } @@ -737,6 +751,7 @@ func (pool *TxPool) Locals() []common.InternalAddress { // freely modified by calling code. func (pool *TxPool) local() map[common.InternalAddress]types.Transactions { txs := make(map[common.InternalAddress]types.Transactions) + pool.pendingMu.RLock() for addr := range pool.locals.accounts { if pending := pool.pending[addr]; pending != nil { txs[addr] = append(txs[addr], pending.Flatten()...) @@ -745,6 +760,7 @@ func (pool *TxPool) local() map[common.InternalAddress]types.Transactions { txs[addr] = append(txs[addr], queued.Flatten()...) } } + pool.pendingMu.RUnlock() return txs } @@ -1019,9 +1035,13 @@ func (pool *TxPool) journalTx(from common.InternalAddress, tx *types.Transaction func (pool *TxPool) promoteTx(addr common.InternalAddress, hash common.Hash, tx *types.Transaction) bool { // Try to insert the transaction into the pending queue if pool.pending[addr] == nil { + pool.pendingMu.Lock() pool.pending[addr] = newTxList(true) + pool.pendingMu.Unlock() } + pool.pendingMu.RLock() list := pool.pending[addr] + pool.pendingMu.RUnlock() inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { @@ -1401,11 +1421,13 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { continue } pool.mu.RLock() + pool.pendingMu.RLock() if txList := pool.pending[internal]; txList != nil && txList.txs.items[tx.Nonce()] != nil { status[i] = TxStatusPending } else if txList := pool.queue[internal]; txList != nil && txList.txs.items[tx.Nonce()] != nil { status[i] = TxStatusQueued } + pool.pendingMu.RUnlock() // implicit else: the tx may have been included into a block between // checking pool.Get and obtaining the lock. In that case, TxStatusUnknown is correct pool.mu.RUnlock() @@ -1449,6 +1471,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { localTxGauge.Dec() } // Remove the transaction from the pending lists and reset the account nonce + pool.pendingMu.Lock() if pending := pool.pending[internal]; pending != nil { if removed, invalids := pending.Remove(tx); removed { // If no more pending transactions are left, remove the list @@ -1467,6 +1490,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { return } } + pool.pendingMu.Unlock() // Transaction is in the future queue if future := pool.queue[internal]; future != nil { if removed, _ := future.Remove(tx); removed { @@ -1677,10 +1701,12 @@ func (pool *TxPool) runReorg(done chan struct{}, cancel chan struct{}, reset *tx pool.truncateQueue() // Update all accounts to the latest known pending nonce + pool.pendingMu.RLock() for addr, list := range pool.pending { highestPending := list.LastElement() pool.pendingNonces.set(addr, highestPending.Nonce()+1) } + pool.pendingMu.RUnlock() pool.mu.Unlock() // Notify subsystems for newly added transactions @@ -1973,9 +1999,11 @@ func (pool *TxPool) truncatePending() { start = time.Now() } pending := uint64(0) + pool.pendingMu.RLock() for _, list := range pool.pending { pending += uint64(list.Len()) } + pool.pendingMu.RUnlock() if pending <= pool.config.GlobalSlots { return } @@ -1983,12 +2011,14 @@ func (pool *TxPool) truncatePending() { pendingBeforeCap := pending // Assemble a spam order to penalize large transactors first spammers := prque.New(nil) + pool.pendingMu.RLock() for addr, list := range pool.pending { // Only evict transactions from high rollers if uint64(list.Len()) > pool.config.AccountSlots { spammers.Push(addr, int64(list.Len())) } } + pool.pendingMu.RUnlock() // Gradually drop transactions from offenders offenders := []common.InternalAddress{} for pending > pool.config.GlobalSlots && !spammers.Empty() { @@ -1997,6 +2027,7 @@ func (pool *TxPool) truncatePending() { offenders = append(offenders, offender.(common.InternalAddress)) // Equalize balances until all the same or below threshold + pool.pendingMu.RLock() if len(offenders) > 1 { // Calculate the equalization threshold for all current offenders threshold := pool.pending[offender.(common.InternalAddress)].Len() @@ -2025,9 +2056,11 @@ func (pool *TxPool) truncatePending() { } } } + pool.pendingMu.RUnlock() } // If still above threshold, reduce to limit or min allowance + pool.pendingMu.RLock() if pending > pool.config.GlobalSlots && len(offenders) > 0 { for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots { for _, addr := range offenders { @@ -2052,6 +2085,7 @@ func (pool *TxPool) truncatePending() { } } } + pool.pendingMu.RUnlock() pendingRateLimitMeter.Add(float64(pendingBeforeCap - pending)) if pool.reOrgCounter == c_reorgCounterThreshold { pool.logger.WithField("time", common.PrettyDuration(time.Since(start))).Debug("Time taken to truncatePending") @@ -2121,6 +2155,7 @@ func (pool *TxPool) demoteUnexecutables() { start = time.Now() } // Iterate over all accounts and demote any non-executable transactions + pool.pendingMu.Lock() for addr, list := range pool.pending { nonce := pool.currentState.GetNonce(addr) @@ -2169,6 +2204,7 @@ func (pool *TxPool) demoteUnexecutables() { delete(pool.pending, addr) } } + pool.pendingMu.Unlock() if pool.reOrgCounter == c_reorgCounterThreshold { pool.logger.WithField("time", common.PrettyDuration(time.Since(start))).Debug("Time taken to demoteUnexecutables") } @@ -2282,9 +2318,11 @@ func (pool *TxPool) poolLimiterGoroutine() { queued += uint64(list.Len()) } pending := uint64(0) + pool.pendingMu.RLock() for _, list := range pool.pending { pending += uint64(list.Len()) } + pool.pendingMu.RUnlock() pool.mu.RUnlock() pool.logger.Infof("PoolSize: Pending: %d, Queued: %d, Number of accounts in queue: %d, Qi Pool: %d", pending, queued, len(pool.queue), pool.qiPool.Len()) pendingTxGauge.Set(float64(pending)) @@ -2389,6 +2427,106 @@ func (pool *TxPool) qiTxExpirationGoroutine() { } } +func (pool *TxPool) sortTxPoolGoroutine() { + defer func() { + if r := recover(); r != nil { + pool.logger.WithFields(log.Fields{ + "debug": string(debug.Stack()), + "error": r, + }).Error("Go-Quai Panicked") + } + }() + + ticker := time.NewTicker(30 * time.Second) + + for { + select { + case <-pool.pendingTxSortCancelCh: + continue + case <-ticker.C: + start := time.Now() + t0 := time.Now() + // Step 1: Append keys into slice + pool.pendingMu.RLock() + pendingSorted := make([]common.InternalAddress, len(pool.pending)) + i := 0 + for k := range pool.pending { + pendingSorted[i] = k + i++ + } + pool.pendingMu.RUnlock() + + t1 := time.Now() + // Step 2: Sort keys + // sort.Slice(pendingSorted, func(i, j int) bool { + // return pool.pending[pendingSorted[i]].txs.LastElement().GasFeeCap().Cmp(pool.pending[pendingSorted[j]].txs.LastElement().GasFeeCap()) > 0 + // }) + pool.pendingMu.Lock() + pool.heapSortInterrupt(pendingSorted) + t2 := time.Now() + + // Step 3: Provide a list of sorted keys + pool.pendingSorted = pendingSorted + pool.pendingMu.Unlock() + t3 := time.Now() + + log.Global.WithFields(log.Fields{ + "elements": len(pool.pending), + "d0": t0.Sub(start), + "d1": t1.Sub(t0), + "sortTime": t2.Sub(t1), + "d3": t3.Sub(t2), + }).Warn("Sort stats") + } + } +} + +func (pool *TxPool) heapify(pendingSorted []common.InternalAddress, n, i int) { + largest := i + left := 2*i + 1 + right := 2*i + 2 + + if left < n && pool.pending[pendingSorted[left]].txs.LastElement().GasPrice().Cmp(pool.pending[pendingSorted[largest]].txs.LastElement().GasPrice()) > 0 { + largest = left + } + + if right < n && pool.pending[pendingSorted[right]].txs.LastElement().GasPrice().Cmp(pool.pending[pendingSorted[largest]].txs.LastElement().GasPrice()) < 0 { + largest = right + } + + if largest != i { + pendingSorted[i], pendingSorted[largest] = pendingSorted[largest], pendingSorted[i] + pool.heapify(pendingSorted, n, largest) + } +} + +func (pool *TxPool) heapSortInterrupt(pendingSorted []common.InternalAddress) { + n := len(pendingSorted) + + // max heap + for i := n/2 - 1; i >= 0; i-- { + select { + case <-pool.pendingTxSortCancelCh: + return + default: + // Continue the sorting process + pool.heapify(pendingSorted, n, i) + } + } + + for i := n - 1; i >= 0; i-- { + select { + case <-pool.pendingTxSortCancelCh: + return + default: + // Move current to end + pendingSorted[0], pendingSorted[i] = pendingSorted[i], pendingSorted[0] + + pool.heapify(pendingSorted, i, 0) + } + } +} + // addressByHeartbeat is an account address tagged with its last activity timestamp. type addressByHeartbeat struct { address common.InternalAddress