Updated senders cache to LRU
jdowning100 committed May 8, 2024
1 parent 0fc7d31 commit 5bb5680
Showing 2 changed files with 99 additions and 72 deletions.
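
The commit swaps the pool's ordered-map senders cache, which was trimmed by hand in FIFO order once it exceeded MaxSenders, for a fixed-capacity LRU from hashicorp/golang-lru, and reworks the accessors around Peek, Contains, and ContainsOrAdd. Below is a minimal sketch of the v1 golang-lru calls the new code relies on, using string keys and a toy capacity in place of transaction hashes and the pool's MaxSenders config.

package main

import (
    "fmt"

    lru "github.com/hashicorp/golang-lru"
)

func main() {
    // Fixed-capacity cache; the least-recently-used entry is evicted automatically.
    cache, _ := lru.New(2) // capacity of 2 is illustrative only

    cache.Add("tx1", "sender1")
    cache.Add("tx2", "sender2")

    // Peek reads without promoting the entry; Get would mark it as recently used.
    if v, ok := cache.Peek("tx1"); ok {
        fmt.Println("peeked:", v.(string))
    }

    // ContainsOrAdd inserts only when the key is absent and reports whether it was already present.
    present, evicted := cache.ContainsOrAdd("tx3", "sender3")
    fmt.Println(present, evicted) // false true: "tx1" was least recently used and gets evicted
}
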
78 changes: 49 additions & 29 deletions core/state_processor.go
@@ -237,18 +237,27 @@ func (p *StateProcessor) Process(block *types.WorkObject, etxSet *types.EtxSet)
startTimeSenders := time.Now()
senders := make(map[common.Hash]*common.InternalAddress) // temporary cache for senders of internal txs
numInternalTxs := 0
p.hc.pool.SendersMutex.RLock()
for _, tx := range block.Transactions() { // get all senders of internal txs from cache - easier on the SendersMutex to do it all at once here
p.hc.pool.SendersMu.RLock() // Prevent the txpool from grabbing the lock during the entire block tx lookup
for i, tx := range block.Transactions() { // get all senders of internal txs from cache
if i == 0 && types.IsCoinBaseTx(tx, header.ParentHash(nodeCtx), nodeLocation) {
// coinbase tx is not in senders cache
continue
}
if tx.Type() == types.QuaiTxType {
numInternalTxs++
if sender, ok := p.hc.pool.GetSenderThreadUnsafe(tx.Hash()); ok {
if sender, ok := p.hc.pool.PeekSenderNoLock(tx.Hash()); ok {
senders[tx.Hash()] = &sender // This pointer must never be modified
} else {
// TODO: calculate the sender and add it to the pool senders cache in case of reorg (not necessary for now)
}
} else if tx.Type() == types.QiTxType {
numInternalTxs++
if _, ok := p.hc.pool.PeekSenderNoLock(tx.Hash()); ok {
senders[tx.Hash()] = &common.InternalAddress{}
}
}
}
p.hc.pool.SendersMutex.RUnlock()
p.hc.pool.SendersMu.RUnlock()
timeSenders = time.Since(startTimeSenders)
blockContext := NewEVMBlockContext(header, p.hc, nil)
vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, p.vmConfig)
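
In the hunk above, Process now takes the pool's SendersMu read lock once for the whole block and resolves each transaction through the lock-free PeekSenderNoLock instead of locking per lookup; a cached Qi hash is recorded with an empty address purely as a marker that its signature was already checked. A hedged sketch of that batched-peek pattern follows, with string hashes and senders standing in for the real pool types.

package main

import (
    "fmt"
    "sync"

    lru "github.com/hashicorp/golang-lru"
)

// pool is a stand-in for TxPool: an LRU of tx hash -> sender plus a mutex for priority access.
type pool struct {
    sendersMu sync.RWMutex
    senders   *lru.Cache
}

// peekSenderNoLock mirrors PeekSenderNoLock: the caller must already hold sendersMu.
func (p *pool) peekSenderNoLock(hash string) (string, bool) {
    if v, ok := p.senders.Peek(hash); ok {
        return v.(string), true
    }
    return "", false
}

// sendersForBlock holds the read lock across the whole block's lookups, so the
// pool cannot grab the write lock halfway through, and each hit is a plain Peek.
func (p *pool) sendersForBlock(hashes []string) map[string]string {
    out := make(map[string]string, len(hashes))
    p.sendersMu.RLock()
    defer p.sendersMu.RUnlock()
    for _, h := range hashes {
        if s, ok := p.peekSenderNoLock(h); ok {
            out[h] = s
        }
    }
    return out
}

func main() {
    c, _ := lru.New(16)
    c.Add("0x01", "addr1")
    p := &pool{senders: c}
    fmt.Println(p.sendersForBlock([]string{"0x01", "0x02"})) // only "0x01" resolves
}
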
@@ -268,21 +277,28 @@ func (p *StateProcessor) Process(block *types.WorkObject, etxSet *types.EtxSet)
totalEtxGas := uint64(0)
totalFees := big.NewInt(0)
qiEtxs := make([]*types.Transaction, 0)
var totalQiTime time.Duration
for i, tx := range block.Transactions() {
if i == 0 && types.IsCoinBaseTx(tx, header.ParentHash(nodeCtx), nodeLocation) {
// coinbase tx currently exempt from gas and outputs are added after all txs are processed
continue
}
startProcess := time.Now()
if tx.Type() == types.QiTxType {
fees, etxs, err := ProcessQiTx(tx, p.hc, true, header, statedb, gp, usedGas, p.hc.pool.signer, p.hc.NodeLocation(), *p.config.ChainID, &etxRLimit, &etxPLimit)
qiTimeBefore := time.Now()
checkSig := true
if _, ok := senders[tx.Hash()]; ok {
checkSig = false
}
fees, etxs, err := ProcessQiTx(tx, p.hc, true, checkSig, header, statedb, gp, usedGas, p.hc.pool.signer, p.hc.NodeLocation(), *p.config.ChainID, &etxRLimit, &etxPLimit)
if err != nil {
return nil, nil, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}
for _, etx := range etxs {
qiEtxs = append(qiEtxs, types.NewTx(etx))
}
totalFees.Add(totalFees, fees)
totalQiTime += time.Since(qiTimeBefore)
continue
}

@@ -489,9 +505,10 @@ func (p *StateProcessor) Process(block *types.WorkObject, etxSet *types.EtxSet)
p.logger.WithFields(log.Fields{
"signing time": common.PrettyDuration(timeSign),
"prepare state time": common.PrettyDuration(timePrepare),
"etx time": common.PrettyDuration(timeEtx),
"tx time": common.PrettyDuration(timeTx),
}).Debug("Total Tx Processing Time")
"etxTime": common.PrettyDuration(timeEtx),
"txTime": common.PrettyDuration(timeTx),
"totalQiTime": common.PrettyDuration(totalQiTime),
}).Info("Total Tx Processing Time")

p.logger.WithFields(log.Fields{
"time1": time1,
@@ -580,7 +597,7 @@ func applyTransaction(msg types.Message, parent *types.WorkObject, config *param
// ProcessQiTx processes a QiTx by spending the inputs and creating the outputs.
// Math is performed to verify the fee provided is sufficient to cover the gas cost.
// updateState is set to update the statedb in the case of the state processor, but not in the case of the txpool.
func ProcessQiTx(tx *types.Transaction, chain ChainContext, updateState bool, currentHeader *types.WorkObject, statedb *state.StateDB, gp *types.GasPool, usedGas *uint64, signer types.Signer, location common.Location, chainId big.Int, etxRLimit, etxPLimit *int) (*big.Int, []*types.ExternalTx, error) {
func ProcessQiTx(tx *types.Transaction, chain ChainContext, updateState bool, checkSig bool, currentHeader *types.WorkObject, statedb *state.StateDB, gp *types.GasPool, usedGas *uint64, signer types.Signer, location common.Location, chainId big.Int, etxRLimit, etxPLimit *int) (*big.Int, []*types.ExternalTx, error) {
// Sanity checks
if tx == nil || tx.Type() != types.QiTxType {
return nil, nil, fmt.Errorf("tx %032x is not a QiTx", tx.Hash())
@@ -616,14 +633,15 @@ func ProcessQiTx(tx *types.Transaction, chain ChainContext, updateState bool, cu
address := crypto.PubkeyBytesToAddress(txIn.PubKey, location)
entryAddr := common.BytesToAddress(utxo.Address, location)
if !address.Equal(entryAddr) {
return nil, nil, errors.New("invalid address")
return nil, nil, fmt.Errorf("tx %032x spends UTXO %032x:%d with invalid pubkey, have %s want %s", tx.Hash(), txIn.PreviousOutPoint.TxHash, txIn.PreviousOutPoint.Index, address.String(), entryAddr.String())
}

pubKey, err := btcec.ParsePubKey(txIn.PubKey)
if err != nil {
return nil, nil, err
if checkSig {
pubKey, err := btcec.ParsePubKey(txIn.PubKey)
if err != nil {
return nil, nil, err
}
pubKeys = append(pubKeys, pubKey)
}
pubKeys = append(pubKeys, pubKey)
// Check for duplicate addresses. This also checks for duplicate inputs.
if _, exists := addresses[common.AddressBytes(utxo.Address)]; exists {
return nil, nil, errors.New("Duplicate address in QiTx inputs: " + common.AddressBytes(utxo.Address).String())
@@ -734,22 +752,24 @@ func ProcessQiTx(tx *types.Transaction, chain ChainContext, updateState bool, cu
}

// Ensure the transaction signature is valid
var finalKey *btcec.PublicKey
if len(tx.TxIn()) > 1 {
aggKey, _, _, err := musig2.AggregateKeys(
pubKeys, false,
)
if err != nil {
return nil, nil, err
if checkSig {
var finalKey *btcec.PublicKey
if len(tx.TxIn()) > 1 {
aggKey, _, _, err := musig2.AggregateKeys(
pubKeys, false,
)
if err != nil {
return nil, nil, err
}
finalKey = aggKey.FinalKey
} else {
finalKey = pubKeys[0]
}
finalKey = aggKey.FinalKey
} else {
finalKey = pubKeys[0]
}

txDigestHash := signer.Hash(tx)
if !tx.GetSchnorrSignature().Verify(txDigestHash[:], finalKey) {
return nil, nil, errors.New("invalid signature for digest hash " + txDigestHash.String())
txDigestHash := signer.Hash(tx)
if !tx.GetSchnorrSignature().Verify(txDigestHash[:], finalKey) {
return nil, nil, errors.New("invalid signature for digest hash " + txDigestHash.String())
}
}
// the fee to pay the basefee/miner is the difference between inputs and outputs
txFeeInQit := new(big.Int).Sub(totalQitIn, totalQitOut)
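
ProcessQiTx now takes a checkSig flag: the transaction pool always passes true and performs the full pubkey parsing, MuSig2 key aggregation, and Schnorr verification, while the state processor passes false for transactions whose hashes it already found in the senders cache, since the pool only caches a Qi hash after ProcessQiTx has accepted the signature. The stripped-down sketch below shows only that gating; the stub verifier and helper names are illustrative, not from the repository.

package main

import (
    "errors"
    "fmt"
)

// verifySchnorr stands in for the aggregate-key Schnorr check inside ProcessQiTx.
func verifySchnorr(txHash string) error {
    fmt.Println("verifying signature for", txHash)
    return nil // assume the signature is valid for this sketch
}

// processQiTx is a stand-in: UTXO, fee, and limit checks would run regardless of
// checkSig; only the signature work is skipped when the caller vouches for it.
func processQiTx(txHash string, checkSig bool) error {
    // ... input/output accounting elided ...
    if checkSig {
        if err := verifySchnorr(txHash); err != nil {
            return errors.New("invalid signature for digest hash " + txHash)
        }
    }
    return nil
}

func main() {
    _ = processQiTx("0xabc", true)  // pool path: always verify
    _ = processQiTx("0xabc", false) // block path: hash was in the senders cache, skip the check
}
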
93 changes: 50 additions & 43 deletions core/tx_pool.go
@@ -36,8 +36,8 @@ import (
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/metrics_config"
"github.com/dominant-strategies/go-quai/params"
lru "github.com/hashicorp/golang-lru"
"github.com/sirupsen/logrus"
orderedmap "github.com/wk8/go-ordered-map/v2"
)

const (
@@ -280,19 +280,19 @@ type TxPool struct {
pendingNonces *txNoncer // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps

locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk
qiPool map[common.Hash]*types.TxWithMinerFee // Qi pool to store Qi transactions
pending map[common.InternalAddress]*txList // All currently processable transactions
queue map[common.InternalAddress]*txList // Queued but non-processable transactions
beats map[common.InternalAddress]time.Time // Last heartbeat from each known account
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
senders *orderedmap.OrderedMap[common.Hash, common.InternalAddress] // Tx hash to sender lookup cache (async populated)
sendersCh chan newSender // Channel for async senders cache goroutine
SendersMutex sync.RWMutex // Mutex for senders map
localTxsCount int // count of txs in last 1 min. Purely for logging purpose
remoteTxsCount int // count of txs in last 1 min. Purely for logging purpose
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk
qiPool map[common.Hash]*types.TxWithMinerFee // Qi pool to store Qi transactions
pending map[common.InternalAddress]*txList // All currently processable transactions
queue map[common.InternalAddress]*txList // Queued but non-processable transactions
beats map[common.InternalAddress]time.Time // Last heartbeat from each known account
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
senders *lru.Cache // Tx hash to sender lookup cache (async populated)
sendersCh chan newSender // Channel for async senders cache goroutine
SendersMu sync.RWMutex // Mutex for priority access of senders cache
localTxsCount int // count of txs in last 1 min. Purely for logging purpose
remoteTxsCount int // count of txs in last 1 min. Purely for logging purpose

reOrgCounter int // keeps track of the number of times the runReorg is called, it is reset every c_reorgCounterThreshold times

@@ -357,7 +357,6 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
qiPool: make(map[common.Hash]*types.TxWithMinerFee),
queue: make(map[common.InternalAddress]*txList),
beats: make(map[common.InternalAddress]time.Time),
senders: orderedmap.New[common.Hash, common.InternalAddress](),
sendersCh: make(chan newSender, config.SendersChBuffer),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
@@ -372,6 +371,7 @@
reOrgCounter: 0,
logger: logger,
}
pool.senders, _ = lru.New(int(config.MaxSenders))
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
logger.WithField("address", addr).Debug("Setting new local account")
@@ -717,7 +717,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
if err != nil {
return err
}
} else if sender, found := pool.GetSender(tx.Hash()); found {
} else if sender, found := pool.PeekSender(tx.Hash()); found {
internal = sender
addToCache = false
} else {
@@ -758,9 +758,6 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}).Warn("tx has insufficient gas")
return ErrIntrinsicGas
}
if len(pool.sendersCh) == int(pool.config.SendersChBuffer) {
pool.logger.Error("sendersCh is full, skipping until there is room")
}
if addToCache {
select {
case pool.sendersCh <- newSender{tx.Hash(), internal}: // Non-blocking
@@ -1073,7 +1070,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
invalidTxMeter.Add(1)
continue
}
} else if _, found := pool.GetSender(tx.Hash()); found {
} else if found := pool.ContainsSender(tx.Hash()); found {
// if the sender is cached in the tx or in the pool cache, we don't need to add it into the cache
} else {
from, err := types.Sender(pool.signer, tx)
@@ -1143,7 +1140,7 @@ func (pool *TxPool) addQiTx(tx *types.Transaction, grabLock bool) error {
if grabLock {
pool.mu.RLock() // need to readlock the whole pool because we are reading the current state
}
fee, _, err := ProcessQiTx(tx, pool.chain, false, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit)
fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit)
if err != nil {
if grabLock {
pool.mu.RUnlock()
@@ -1165,6 +1162,11 @@ func (pool *TxPool) addQiTx(tx *types.Transaction, grabLock bool) error {
pool.qiPool[tx.Hash()] = txWithMinerFee
pool.qiMu.Unlock()
pool.queueTxEvent(tx)
select {
case pool.sendersCh <- newSender{tx.Hash(), common.InternalAddress{}}: // There is no "sender" for Qi transactions, but the sig is good
default:
pool.logger.Error("sendersCh is full, skipping until there is room")
}
pool.logger.WithFields(logrus.Fields{
"tx": tx.Hash().String(),
"fee": fee,
@@ -1969,24 +1971,36 @@ func (pool *TxPool) demoteUnexecutables() {
}
}

// GetSender returns the sender of a stored transaction.
func (pool *TxPool) GetSender(hash common.Hash) (common.InternalAddress, bool) {
pool.SendersMutex.RLock()
defer pool.SendersMutex.RUnlock()
return pool.senders.Get(hash)
// PeekSender returns the sender of a stored transaction without updating the LRU cache and without grabbing
// the SendersMu lock.
func (pool *TxPool) PeekSenderNoLock(hash common.Hash) (common.InternalAddress, bool) {
addr, ok := pool.senders.Peek(hash)
if ok {
return addr.(common.InternalAddress), true
}
return common.InternalAddress{}, false
}

// GetSenderThreadUnsafe returns the sender of a stored transaction.
// It is not thread safe and should only be used when the pool senders mutex is locked.
func (pool *TxPool) GetSenderThreadUnsafe(hash common.Hash) (common.InternalAddress, bool) {
return pool.senders.Get(hash)
func (pool *TxPool) ContainsSender(hash common.Hash) bool {
pool.SendersMu.RLock()
defer pool.SendersMu.RUnlock()
return pool.senders.Contains(hash)
}

// SetSender caches the sender of a transaction.
func (pool *TxPool) SetSender(hash common.Hash, address common.InternalAddress) {
pool.SendersMutex.Lock()
defer pool.SendersMutex.Unlock()
pool.senders.Set(hash, address)
func (pool *TxPool) ContainsOrAddSender(hash common.Hash, sender common.InternalAddress) (bool, bool) {
pool.SendersMu.Lock()
defer pool.SendersMu.Unlock()
return pool.senders.ContainsOrAdd(hash, sender)
}

func (pool *TxPool) PeekSender(hash common.Hash) (common.InternalAddress, bool) {
pool.SendersMu.RLock()
defer pool.SendersMu.RUnlock()
addr, ok := pool.senders.Peek(hash)
if ok {
return addr.(common.InternalAddress), true
}
return common.InternalAddress{}, false
}

// sendersGoroutine asynchronously adds a new sender to the cache
@@ -2006,19 +2020,12 @@ func (pool *TxPool) sendersGoroutine() {
return
case tx := <-pool.sendersCh:
// Add transaction to sender cache
pool.SendersMutex.Lock() // We could RLock here but it's unlikely to just be a read
if _, ok := pool.senders.Get(tx.hash); !ok {
pool.senders.Set(tx.hash, tx.sender)
if pool.senders.Len() > int(pool.config.MaxSenders) {
pool.senders.Delete(pool.senders.Oldest().Key) // FIFO
}
} else {
if contains, _ := pool.ContainsOrAddSender(tx.hash, tx.sender); contains {
pool.logger.WithFields(log.Fields{
"tx": tx.hash.String(),
"sender": tx.sender.String(),
}).Debug("Tx already seen in sender cache (reorg?)")
}
pool.SendersMutex.Unlock()

case <-resetMetersTicker.C:
// Reset the tx meters every 5 minutes
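
On the producer side, validateTx and addQiTx hand new (hash, sender) pairs to the senders goroutine through a buffered channel with a non-blocking send, dropping and logging the entry when the buffer is full; on the consumer side the goroutine now defers to ContainsOrAddSender, which handles the duplicate check, the insert, and eviction in one call, so the old Oldest()-based FIFO trim disappears. A rough sketch of both halves, with string hashes and senders in place of common.Hash and common.InternalAddress:

package main

import (
    "fmt"

    lru "github.com/hashicorp/golang-lru"
)

type newSender struct {
    hash   string
    sender string
}

// enqueue mirrors the non-blocking send: if the buffered channel is full the
// entry is dropped and logged rather than stalling transaction validation.
func enqueue(ch chan newSender, s newSender) {
    select {
    case ch <- s:
    default:
        fmt.Println("sendersCh is full, skipping until there is room")
    }
}

func main() {
    cache, _ := lru.New(1024)
    ch := make(chan newSender, 1) // tiny buffer so the drop path is visible

    enqueue(ch, newSender{hash: "0x01", sender: "addr1"})
    enqueue(ch, newSender{hash: "0x02", sender: "addr2"}) // dropped: buffer already full

    // Consumer side (normally the senders goroutine): one check-and-insert call.
    s := <-ch
    if contains, _ := cache.ContainsOrAdd(s.hash, s.sender); contains {
        fmt.Println("tx already seen in sender cache (reorg?):", s.hash)
    }
    // A second delivery of the same hash takes the already-cached branch.
    if contains, _ := cache.ContainsOrAdd(s.hash, s.sender); contains {
        fmt.Println("tx already seen in sender cache (reorg?):", s.hash)
    }
}
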
