diff --git a/core/tx_pool.go b/core/tx_pool.go index 4241d2f140..2d118e328d 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -386,7 +386,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block pool.locals.add(addr) } pool.priced = newTxPricedList(pool.all) + pool.mu.Lock() pool.reset(nil, chain.CurrentBlock()) + pool.mu.Unlock() // Start the reorg loop early so it can handle requests generated during journal loading. pool.wg.Add(1) @@ -1049,8 +1051,9 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { // Filter out known ones without obtaining the pool lock or recovering signatures var ( - errs = make([]error, len(txs)) - news = make([]*types.Transaction, 0, len(txs)) + errs = make([]error, len(txs)) + news = make([]*types.Transaction, 0, len(txs)) + qiNews = make([]*types.Transaction, 0, len(txs)) ) for i, tx := range txs { // If the transaction is known, pre-set the error slot @@ -1060,9 +1063,14 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { continue } if tx.Type() == types.QiTxType { - if err := pool.addQiTx(tx, true); err != nil { - errs[i] = err + pool.qiMu.RLock() + if _, hasTx := pool.qiPool[tx.Hash()]; hasTx { + pool.qiMu.RUnlock() + errs[i] = ErrAlreadyKnown + continue } + pool.qiMu.RUnlock() + qiNews = append(qiNews, tx) continue } // Exclude transactions with invalid signatures as soon as @@ -1096,6 +1104,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { // Accumulate all unknown transactions for deeper processing news = append(news, tx) } + if len(qiNews) > 0 { + pool.qiMu.Lock() + qiErrs := pool.addQiTxsLocked(qiNews) + pool.qiMu.Unlock() + errs = append(errs, qiErrs...) + } if len(news) == 0 { return errs } @@ -1124,13 +1138,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { // addQiTx adds a Qi transaction to the Qi pool. // If the mempool lock is already held by the caller, the caller must set grabLock to false. // If the mempool lock is not held by the caller, the caller must set grabLock to true. -func (pool *TxPool) addQiTx(tx *types.Transaction, grabLock bool) error { +func (pool *TxPool) addQiTx(tx *types.Transaction) error { pool.qiMu.RLock() if _, hasTx := pool.qiPool[tx.Hash()]; hasTx { pool.qiMu.RUnlock() return ErrAlreadyKnown } - pool.qiMu.RUnlock() currentBlock := pool.chain.CurrentBlock() @@ -1143,29 +1156,20 @@ func (pool *TxPool) addQiTx(tx *types.Transaction, grabLock bool) error { if etxPLimit < params.ETXPLimitMin { etxPLimit = params.ETXPLimitMin } - if grabLock { - pool.mu.RLock() // need to readlock the whole pool because we are reading the current state - } fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit) if err != nil { - if grabLock { - pool.mu.RUnlock() - } pool.logger.WithFields(logrus.Fields{ "tx": tx.Hash().String(), "err": err, - }).Error("Invalid Qi transaction") + }).Debug("Invalid Qi transaction") return err } - if grabLock { - pool.mu.RUnlock() - } txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, fee) if err != nil { return err } pool.qiMu.Lock() - if uint64(len(pool.qiPool))+1 > pool.config.GlobalSlots { + if uint64(len(pool.qiPool))+1 > pool.config.QiPoolSize { // If the pool is full, don't accept the transaction pool.qiMu.Unlock() pool.logger.WithFields(logrus.Fields{ @@ -1190,6 +1194,57 @@ func (pool *TxPool) addQiTx(tx *types.Transaction, grabLock bool) error { return nil } +// addQiTx adds Qi transactions to the Qi pool. +// The qiMu lock must be held by the caller. +func (pool *TxPool) addQiTxsLocked(txs types.Transactions) []error { + errs := make([]error, 0) + currentBlock := pool.chain.CurrentBlock() + gp := types.GasPool(currentBlock.GasLimit()) + etxRLimit := len(currentBlock.Transactions()) / params.ETXRegionMaxFraction + if etxRLimit < params.ETXRLimitMin { + etxRLimit = params.ETXRLimitMin + } + etxPLimit := len(currentBlock.Transactions()) / params.ETXPrimeMaxFraction + if etxPLimit < params.ETXPLimitMin { + etxPLimit = params.ETXPLimitMin + } + for _, tx := range txs { + + fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit) + if err != nil { + pool.logger.WithFields(logrus.Fields{ + "tx": tx.Hash().String(), + "err": err, + }).Debug("Invalid Qi transaction") + errs = append(errs, err) + continue + } + txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, fee) + if err != nil { + errs = append(errs, err) + continue + } + if uint64(len(pool.qiPool))+1 > pool.config.QiPoolSize { + // If the pool is full, don't accept the transaction + errs = append(errs, ErrTxPoolOverflow) + continue + } + pool.qiPool[tx.Hash()] = txWithMinerFee + pool.queueTxEvent(tx) + select { + case pool.sendersCh <- newSender{tx.Hash(), common.InternalAddress{}}: // There is no "sender" for Qi transactions, but the sig is good + default: + pool.logger.Error("sendersCh is full, skipping until there is room") + } + pool.logger.WithFields(logrus.Fields{ + "tx": tx.Hash().String(), + "fee": fee, + }).Info("Added qi tx to pool") + qiTxGauge.Add(1) + } + return errs +} + func (pool *TxPool) RemoveQiTx(tx *types.Transaction) { defer func() { if r := recover(); r != nil { @@ -1229,7 +1284,6 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, errs := make([]error, len(txs)) for i, tx := range txs { if tx.Type() == types.QiTxType { - errs[i] = pool.addQiTx(tx, false) continue } replaced, err := pool.add(tx, local) @@ -1704,7 +1758,27 @@ func (pool *TxPool) reset(oldHead, newHead *types.WorkObject) { // Inject any transactions discarded due to reorgs pool.logger.WithField("count", len(reinject)).Debug("Reinjecting stale transactions") senderCacher.recover(pool.signer, reinject) - pool.addTxsLocked(reinject, false) + + qiTxs := make([]*types.Transaction, 0) + for _, tx := range reinject { + if tx.Type() == types.QiTxType { + qiTxs = append(qiTxs, tx) + } + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + pool.addTxsLocked(reinject, false) + wg.Done() + }() + wg.Add(1) + go func() { + pool.qiMu.Lock() + pool.addQiTxsLocked(qiTxs) + pool.qiMu.Unlock() + wg.Done() + }() + wg.Wait() if pool.reOrgCounter == c_reorgCounterThreshold { pool.logger.WithField("time", common.PrettyDuration(time.Since(start))).Debug("Time taken to resetTxPool") }