From 6e6dadb94b6ec4ddfaaeea007d2d7b088d82cb09 Mon Sep 17 00:00:00 2001 From: Jonathan Downing Date: Thu, 9 May 2024 16:49:13 -0500 Subject: [PATCH 1/4] addQiTx should not hold pool lock --- core/tx_pool.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 4241d2f140..85511c146b 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -283,6 +283,7 @@ type TxPool struct { signer types.Signer mu sync.RWMutex qiMu sync.RWMutex + stateMu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head pendingNonces *txNoncer // Pending state tracking virtual nonces @@ -386,7 +387,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block pool.locals.add(addr) } pool.priced = newTxPricedList(pool.all) + pool.mu.Lock() pool.reset(nil, chain.CurrentBlock()) + pool.mu.Unlock() // Start the reorg loop early so it can handle requests generated during journal loading. pool.wg.Add(1) @@ -1060,7 +1063,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { continue } if tx.Type() == types.QiTxType { - if err := pool.addQiTx(tx, true); err != nil { + if err := pool.addQiTx(tx); err != nil { errs[i] = err } continue @@ -1124,13 +1127,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { // addQiTx adds a Qi transaction to the Qi pool. // If the mempool lock is already held by the caller, the caller must set grabLock to false. // If the mempool lock is not held by the caller, the caller must set grabLock to true. -func (pool *TxPool) addQiTx(tx *types.Transaction, grabLock bool) error { +func (pool *TxPool) addQiTx(tx *types.Transaction) error { pool.qiMu.RLock() if _, hasTx := pool.qiPool[tx.Hash()]; hasTx { pool.qiMu.RUnlock() return ErrAlreadyKnown } - pool.qiMu.RUnlock() currentBlock := pool.chain.CurrentBlock() @@ -1143,23 +1145,17 @@ func (pool *TxPool) addQiTx(tx *types.Transaction, grabLock bool) error { if etxPLimit < params.ETXPLimitMin { etxPLimit = params.ETXPLimitMin } - if grabLock { - pool.mu.RLock() // need to readlock the whole pool because we are reading the current state - } + pool.stateMu.RLock() fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit) if err != nil { - if grabLock { - pool.mu.RUnlock() - } + pool.stateMu.RUnlock() pool.logger.WithFields(logrus.Fields{ "tx": tx.Hash().String(), "err": err, }).Error("Invalid Qi transaction") return err } - if grabLock { - pool.mu.RUnlock() - } + pool.stateMu.RUnlock() txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, fee) if err != nil { return err @@ -1229,7 +1225,6 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, errs := make([]error, len(txs)) for i, tx := range txs { if tx.Type() == types.QiTxType { - errs[i] = pool.addQiTx(tx, false) continue } replaced, err := pool.add(tx, local) @@ -1697,13 +1692,22 @@ func (pool *TxPool) reset(oldHead, newHead *types.WorkObject) { pool.logger.WithField("err", err).Error("Failed to reset txpool state") return } + pool.stateMu.Lock() pool.currentState = statedb + pool.stateMu.Unlock() pool.pendingNonces = newTxNoncer(statedb) pool.currentMaxGas = newHead.GasLimit() // Inject any transactions discarded due to reorgs pool.logger.WithField("count", len(reinject)).Debug("Reinjecting stale transactions") senderCacher.recover(pool.signer, reinject) + pool.mu.Unlock() + for _, tx := range reinject { + if tx.Type() == types.QiTxType { + pool.addQiTx(tx) + } + } + pool.mu.Lock() pool.addTxsLocked(reinject, false) if pool.reOrgCounter == c_reorgCounterThreshold { pool.logger.WithField("time", common.PrettyDuration(time.Since(start))).Debug("Time taken to resetTxPool") From 3caf0629bbec790704653c89ea70db3176a4c9fd Mon Sep 17 00:00:00 2001 From: Jonathan Downing Date: Fri, 10 May 2024 13:18:51 -0500 Subject: [PATCH 2/4] Add Qi transactions to pool in groups --- core/tx_pool.go | 88 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 80 insertions(+), 8 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 85511c146b..533b9829a2 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -1052,8 +1052,9 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { // Filter out known ones without obtaining the pool lock or recovering signatures var ( - errs = make([]error, len(txs)) - news = make([]*types.Transaction, 0, len(txs)) + errs = make([]error, len(txs)) + news = make([]*types.Transaction, 0, len(txs)) + qiNews = make([]*types.Transaction, 0, len(txs)) ) for i, tx := range txs { // If the transaction is known, pre-set the error slot @@ -1063,9 +1064,14 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { continue } if tx.Type() == types.QiTxType { - if err := pool.addQiTx(tx); err != nil { - errs[i] = err + pool.qiMu.RLock() + if _, hasTx := pool.qiPool[tx.Hash()]; hasTx { + pool.qiMu.RUnlock() + errs[i] = ErrAlreadyKnown + continue } + pool.qiMu.RUnlock() + qiNews = append(qiNews, tx) continue } // Exclude transactions with invalid signatures as soon as @@ -1099,6 +1105,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { // Accumulate all unknown transactions for deeper processing news = append(news, tx) } + if len(qiNews) > 0 { + pool.qiMu.Lock() + qiErrs := pool.addQiTxsLocked(qiNews) + pool.qiMu.Unlock() + errs = append(errs, qiErrs...) + } if len(news) == 0 { return errs } @@ -1152,7 +1164,7 @@ func (pool *TxPool) addQiTx(tx *types.Transaction) error { pool.logger.WithFields(logrus.Fields{ "tx": tx.Hash().String(), "err": err, - }).Error("Invalid Qi transaction") + }).Debug("Invalid Qi transaction") return err } pool.stateMu.RUnlock() @@ -1161,7 +1173,7 @@ func (pool *TxPool) addQiTx(tx *types.Transaction) error { return err } pool.qiMu.Lock() - if uint64(len(pool.qiPool))+1 > pool.config.GlobalSlots { + if uint64(len(pool.qiPool))+1 > pool.config.QiPoolSize { // If the pool is full, don't accept the transaction pool.qiMu.Unlock() pool.logger.WithFields(logrus.Fields{ @@ -1186,6 +1198,60 @@ func (pool *TxPool) addQiTx(tx *types.Transaction) error { return nil } +// addQiTx adds Qi transactions to the Qi pool. +// The qiMu lock must be held by the caller. +func (pool *TxPool) addQiTxsLocked(txs types.Transactions) []error { + errs := make([]error, 0) + currentBlock := pool.chain.CurrentBlock() + gp := types.GasPool(currentBlock.GasLimit()) + etxRLimit := len(currentBlock.Transactions()) / params.ETXRegionMaxFraction + if etxRLimit < params.ETXRLimitMin { + etxRLimit = params.ETXRLimitMin + } + etxPLimit := len(currentBlock.Transactions()) / params.ETXPrimeMaxFraction + if etxPLimit < params.ETXPLimitMin { + etxPLimit = params.ETXPLimitMin + } + for _, tx := range txs { + + pool.stateMu.RLock() + fee, _, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit) + if err != nil { + pool.stateMu.RUnlock() + pool.logger.WithFields(logrus.Fields{ + "tx": tx.Hash().String(), + "err": err, + }).Debug("Invalid Qi transaction") + errs = append(errs, err) + continue + } + pool.stateMu.RUnlock() + txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, fee) + if err != nil { + errs = append(errs, err) + continue + } + if uint64(len(pool.qiPool))+1 > pool.config.QiPoolSize { + // If the pool is full, don't accept the transaction + errs = append(errs, ErrTxPoolOverflow) + continue + } + pool.qiPool[tx.Hash()] = txWithMinerFee + pool.queueTxEvent(tx) + select { + case pool.sendersCh <- newSender{tx.Hash(), common.InternalAddress{}}: // There is no "sender" for Qi transactions, but the sig is good + default: + pool.logger.Error("sendersCh is full, skipping until there is room") + } + pool.logger.WithFields(logrus.Fields{ + "tx": tx.Hash().String(), + "fee": fee, + }).Info("Added qi tx to pool") + qiTxGauge.Add(1) + } + return errs +} + func (pool *TxPool) RemoveQiTx(tx *types.Transaction) { defer func() { if r := recover(); r != nil { @@ -1701,13 +1767,19 @@ func (pool *TxPool) reset(oldHead, newHead *types.WorkObject) { // Inject any transactions discarded due to reorgs pool.logger.WithField("count", len(reinject)).Debug("Reinjecting stale transactions") senderCacher.recover(pool.signer, reinject) - pool.mu.Unlock() + + qiTxs := make([]*types.Transaction, 0) for _, tx := range reinject { if tx.Type() == types.QiTxType { - pool.addQiTx(tx) + qiTxs = append(qiTxs, tx) } } + pool.mu.Unlock() // Don't lock the pool mutex while holding the qiMu lock + pool.qiMu.Lock() + pool.addQiTxsLocked(qiTxs) + pool.qiMu.Unlock() pool.mu.Lock() + pool.addTxsLocked(reinject, false) if pool.reOrgCounter == c_reorgCounterThreshold { pool.logger.WithField("time", common.PrettyDuration(time.Since(start))).Debug("Time taken to resetTxPool") From 075488da5eee1f3f808449499092a167723bd4e1 Mon Sep 17 00:00:00 2001 From: Jonathan Downing Date: Mon, 13 May 2024 12:03:40 -0500 Subject: [PATCH 3/4] Added waitgroup to reset so Qi and Quai are added concurrently --- core/tx_pool.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 533b9829a2..3d79ad9d62 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -1215,7 +1215,7 @@ func (pool *TxPool) addQiTxsLocked(txs types.Transactions) []error { for _, tx := range txs { pool.stateMu.RLock() - fee, _, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit) + fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit) if err != nil { pool.stateMu.RUnlock() pool.logger.WithFields(logrus.Fields{ @@ -1774,13 +1774,20 @@ func (pool *TxPool) reset(oldHead, newHead *types.WorkObject) { qiTxs = append(qiTxs, tx) } } - pool.mu.Unlock() // Don't lock the pool mutex while holding the qiMu lock - pool.qiMu.Lock() - pool.addQiTxsLocked(qiTxs) - pool.qiMu.Unlock() - pool.mu.Lock() - - pool.addTxsLocked(reinject, false) + var wg sync.WaitGroup + wg.Add(1) + go func() { + pool.addTxsLocked(reinject, false) + wg.Done() + }() + wg.Add(1) + go func() { + pool.qiMu.Lock() + pool.addQiTxsLocked(qiTxs) + pool.qiMu.Unlock() + wg.Done() + }() + wg.Wait() if pool.reOrgCounter == c_reorgCounterThreshold { pool.logger.WithField("time", common.PrettyDuration(time.Since(start))).Debug("Time taken to resetTxPool") } From b1c194dbb1a879b9d29a33144394258ed1054a29 Mon Sep 17 00:00:00 2001 From: Jonathan Downing Date: Wed, 15 May 2024 16:33:51 -0500 Subject: [PATCH 4/4] Removed stateMu from txpool --- core/tx_pool.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 3d79ad9d62..2d118e328d 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -283,7 +283,6 @@ type TxPool struct { signer types.Signer mu sync.RWMutex qiMu sync.RWMutex - stateMu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head pendingNonces *txNoncer // Pending state tracking virtual nonces @@ -1157,17 +1156,14 @@ func (pool *TxPool) addQiTx(tx *types.Transaction) error { if etxPLimit < params.ETXPLimitMin { etxPLimit = params.ETXPLimitMin } - pool.stateMu.RLock() fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit) if err != nil { - pool.stateMu.RUnlock() pool.logger.WithFields(logrus.Fields{ "tx": tx.Hash().String(), "err": err, }).Debug("Invalid Qi transaction") return err } - pool.stateMu.RUnlock() txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, fee) if err != nil { return err @@ -1214,10 +1210,8 @@ func (pool *TxPool) addQiTxsLocked(txs types.Transactions) []error { } for _, tx := range txs { - pool.stateMu.RLock() fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit) if err != nil { - pool.stateMu.RUnlock() pool.logger.WithFields(logrus.Fields{ "tx": tx.Hash().String(), "err": err, @@ -1225,7 +1219,6 @@ func (pool *TxPool) addQiTxsLocked(txs types.Transactions) []error { errs = append(errs, err) continue } - pool.stateMu.RUnlock() txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, fee) if err != nil { errs = append(errs, err) @@ -1758,9 +1751,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.WorkObject) { pool.logger.WithField("err", err).Error("Failed to reset txpool state") return } - pool.stateMu.Lock() pool.currentState = statedb - pool.stateMu.Unlock() pool.pendingNonces = newTxNoncer(statedb) pool.currentMaxGas = newHead.GasLimit()