Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Qi transactions to pool in groups #1727

Merged
merged 4 commits into from
May 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 93 additions & 19 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pool.locals.add(addr)
}
pool.priced = newTxPricedList(pool.all)
pool.mu.Lock()
pool.reset(nil, chain.CurrentBlock())
pool.mu.Unlock()

// Start the reorg loop early so it can handle requests generated during journal loading.
pool.wg.Add(1)
Expand Down Expand Up @@ -1049,8 +1051,9 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error {
func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// Filter out known ones without obtaining the pool lock or recovering signatures
var (
errs = make([]error, len(txs))
news = make([]*types.Transaction, 0, len(txs))
errs = make([]error, len(txs))
news = make([]*types.Transaction, 0, len(txs))
qiNews = make([]*types.Transaction, 0, len(txs))
)
for i, tx := range txs {
// If the transaction is known, pre-set the error slot
Expand All @@ -1060,9 +1063,14 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
continue
}
if tx.Type() == types.QiTxType {
if err := pool.addQiTx(tx, true); err != nil {
errs[i] = err
pool.qiMu.RLock()
if _, hasTx := pool.qiPool[tx.Hash()]; hasTx {
pool.qiMu.RUnlock()
errs[i] = ErrAlreadyKnown
continue
}
pool.qiMu.RUnlock()
qiNews = append(qiNews, tx)
continue
}
// Exclude transactions with invalid signatures as soon as
Expand Down Expand Up @@ -1096,6 +1104,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// Accumulate all unknown transactions for deeper processing
news = append(news, tx)
}
if len(qiNews) > 0 {
pool.qiMu.Lock()
qiErrs := pool.addQiTxsLocked(qiNews)
pool.qiMu.Unlock()
errs = append(errs, qiErrs...)
}
if len(news) == 0 {
return errs
}
Expand Down Expand Up @@ -1124,13 +1138,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// addQiTx adds a Qi transaction to the Qi pool.
// If the mempool lock is already held by the caller, the caller must set grabLock to false.
// If the mempool lock is not held by the caller, the caller must set grabLock to true.
func (pool *TxPool) addQiTx(tx *types.Transaction, grabLock bool) error {
func (pool *TxPool) addQiTx(tx *types.Transaction) error {
pool.qiMu.RLock()
if _, hasTx := pool.qiPool[tx.Hash()]; hasTx {
pool.qiMu.RUnlock()
return ErrAlreadyKnown
}

pool.qiMu.RUnlock()

currentBlock := pool.chain.CurrentBlock()
Expand All @@ -1143,29 +1156,20 @@ func (pool *TxPool) addQiTx(tx *types.Transaction, grabLock bool) error {
if etxPLimit < params.ETXPLimitMin {
etxPLimit = params.ETXPLimitMin
}
if grabLock {
pool.mu.RLock() // need to readlock the whole pool because we are reading the current state
}
fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit)
if err != nil {
if grabLock {
pool.mu.RUnlock()
}
pool.logger.WithFields(logrus.Fields{
"tx": tx.Hash().String(),
"err": err,
}).Error("Invalid Qi transaction")
}).Debug("Invalid Qi transaction")
return err
}
if grabLock {
pool.mu.RUnlock()
}
txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, fee)
if err != nil {
return err
}
pool.qiMu.Lock()
if uint64(len(pool.qiPool))+1 > pool.config.GlobalSlots {
if uint64(len(pool.qiPool))+1 > pool.config.QiPoolSize {
// If the pool is full, don't accept the transaction
pool.qiMu.Unlock()
pool.logger.WithFields(logrus.Fields{
Expand All @@ -1190,6 +1194,57 @@ func (pool *TxPool) addQiTx(tx *types.Transaction, grabLock bool) error {
return nil
}

// addQiTx adds Qi transactions to the Qi pool.
// The qiMu lock must be held by the caller.
func (pool *TxPool) addQiTxsLocked(txs types.Transactions) []error {
errs := make([]error, 0)
currentBlock := pool.chain.CurrentBlock()
gp := types.GasPool(currentBlock.GasLimit())
etxRLimit := len(currentBlock.Transactions()) / params.ETXRegionMaxFraction
if etxRLimit < params.ETXRLimitMin {
etxRLimit = params.ETXRLimitMin
}
etxPLimit := len(currentBlock.Transactions()) / params.ETXPrimeMaxFraction
if etxPLimit < params.ETXPLimitMin {
etxPLimit = params.ETXPLimitMin
}
for _, tx := range txs {

fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit)
if err != nil {
pool.logger.WithFields(logrus.Fields{
"tx": tx.Hash().String(),
"err": err,
}).Debug("Invalid Qi transaction")
errs = append(errs, err)
continue
}
txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, fee)
if err != nil {
errs = append(errs, err)
continue
}
if uint64(len(pool.qiPool))+1 > pool.config.QiPoolSize {
// If the pool is full, don't accept the transaction
errs = append(errs, ErrTxPoolOverflow)
continue
}
pool.qiPool[tx.Hash()] = txWithMinerFee
pool.queueTxEvent(tx)
select {
case pool.sendersCh <- newSender{tx.Hash(), common.InternalAddress{}}: // There is no "sender" for Qi transactions, but the sig is good
default:
pool.logger.Error("sendersCh is full, skipping until there is room")
}
pool.logger.WithFields(logrus.Fields{
"tx": tx.Hash().String(),
"fee": fee,
}).Info("Added qi tx to pool")
qiTxGauge.Add(1)
}
return errs
}

func (pool *TxPool) RemoveQiTx(tx *types.Transaction) {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -1229,7 +1284,6 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error,
errs := make([]error, len(txs))
for i, tx := range txs {
if tx.Type() == types.QiTxType {
errs[i] = pool.addQiTx(tx, false)
continue
}
replaced, err := pool.add(tx, local)
Expand Down Expand Up @@ -1704,7 +1758,27 @@ func (pool *TxPool) reset(oldHead, newHead *types.WorkObject) {
// Inject any transactions discarded due to reorgs
pool.logger.WithField("count", len(reinject)).Debug("Reinjecting stale transactions")
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)

qiTxs := make([]*types.Transaction, 0)
for _, tx := range reinject {
if tx.Type() == types.QiTxType {
qiTxs = append(qiTxs, tx)
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
pool.addTxsLocked(reinject, false)
wg.Done()
}()
wg.Add(1)
go func() {
pool.qiMu.Lock()
pool.addQiTxsLocked(qiTxs)
pool.qiMu.Unlock()
wg.Done()
}()
wg.Wait()
if pool.reOrgCounter == c_reorgCounterThreshold {
pool.logger.WithField("time", common.PrettyDuration(time.Since(start))).Debug("Time taken to resetTxPool")
}
Expand Down
Loading