Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mempool: bit of refactoring, chain simulator tests #6654

Draft
wants to merge 8 commits into
base: feat/relayedv3
Choose a base branch
from
148 changes: 148 additions & 0 deletions integrationTests/chainSimulator/mempool/mempool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package mempool

import (
"testing"
"time"

"github.com/multiversx/mx-chain-core-go/data/transaction"
"github.com/multiversx/mx-chain-go/config"
"github.com/multiversx/mx-chain-go/node/chainSimulator/configs"
"github.com/multiversx/mx-chain-go/storage"
"github.com/stretchr/testify/require"
)

func TestMempoolWithChainSimulator_Selection(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
}

numSenders := 10000
numTransactionsPerSender := 3
shard := 0

simulator := startChainSimulator(t, func(cfg *config.Configs) {})
defer simulator.Close()

participants := createParticipants(t, simulator, numSenders)
noncesTracker := newNoncesTracker()

transactions := make([]*transaction.Transaction, 0, numSenders*numTransactionsPerSender)

for i := 0; i < numSenders; i++ {
sender := participants.sendersByShard[shard][i]
receiver := participants.receiverByShard[shard]

for j := 0; j < numTransactionsPerSender; j++ {
tx := &transaction.Transaction{
Nonce: noncesTracker.getThenIncrementNonce(sender),
Value: oneQuarterOfEGLD,
SndAddr: sender.Bytes,
RcvAddr: receiver.Bytes,
Data: []byte{},
GasLimit: 50_000,
GasPrice: 1_000_000_000,
ChainID: []byte(configs.ChainID),
Version: 2,
Signature: []byte("signature"),
}

transactions = append(transactions, tx)
}
}

sendTransactions(t, simulator, transactions)
time.Sleep(durationWaitAfterSendMany)
require.Equal(t, 30_000, getNumTransactionsInPool(simulator, shard))

selectedTransactions, gas := selectTransactions(t, simulator, shard)
require.Equal(t, 30_000, len(selectedTransactions))
require.Equal(t, 50_000*30_000, int(gas))

err := simulator.GenerateBlocks(1)
require.Nil(t, err)
require.Equal(t, 27_756, getNumTransactionsInCurrentBlock(simulator, shard))

selectedTransactions, gas = selectTransactions(t, simulator, shard)
require.Equal(t, 30_000-27_756, len(selectedTransactions))
require.Equal(t, 50_000*(30_000-27_756), int(gas))
}

func TestMempoolWithChainSimulator_Eviction(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
}

numSenders := 10000
numTransactionsPerSender := 30
shard := 0

simulator := startChainSimulator(t, func(cfg *config.Configs) {})
defer simulator.Close()

participants := createParticipants(t, simulator, numSenders)
noncesTracker := newNoncesTracker()

transactions := make([]*transaction.Transaction, 0, numSenders*numTransactionsPerSender)

for i := 0; i < numSenders; i++ {
sender := participants.sendersByShard[shard][i]
receiver := participants.receiverByShard[shard]

for j := 0; j < numTransactionsPerSender; j++ {
tx := &transaction.Transaction{
Nonce: noncesTracker.getThenIncrementNonce(sender),
Value: oneQuarterOfEGLD,
SndAddr: sender.Bytes,
RcvAddr: receiver.Bytes,
Data: []byte{},
GasLimit: 50_000,
GasPrice: 1_000_000_000,
ChainID: []byte(configs.ChainID),
Version: 2,
Signature: []byte("signature"),
}

transactions = append(transactions, tx)
}
}

sendTransactions(t, simulator, transactions)
time.Sleep(durationWaitAfterSendMany)
require.Equal(t, 300_000, getNumTransactionsInPool(simulator, shard))

// Send one more transaction (fill up the mempool)
sendTransaction(t, simulator, &transaction.Transaction{
Nonce: 42,
Value: oneEGLD,
SndAddr: participants.sendersByShard[shard][7].Bytes,
RcvAddr: participants.receiverByShard[shard].Bytes,
Data: []byte{},
GasLimit: 50000,
GasPrice: 1_000_000_000,
ChainID: []byte(configs.ChainID),
Version: 2,
Signature: []byte("signature"),
})

time.Sleep(durationWaitAfterSendSome)
require.Equal(t, 300_001, getNumTransactionsInPool(simulator, shard))

// Send one more transaction to trigger eviction
sendTransaction(t, simulator, &transaction.Transaction{
Nonce: 42,
Value: oneEGLD,
SndAddr: participants.sendersByShard[shard][7].Bytes,
RcvAddr: participants.receiverByShard[shard].Bytes,
Data: []byte{},
GasLimit: 50000,
GasPrice: 1_000_000_000,
ChainID: []byte(configs.ChainID),
Version: 2,
Signature: []byte("signature"),
})

time.Sleep(1 * time.Second)

expectedNumTransactionsInPool := 300_000 + 1 + 1 - int(storage.TxPoolSourceMeNumItemsToPreemptivelyEvict)
require.Equal(t, expectedNumTransactionsInPool, getNumTransactionsInPool(simulator, shard))
}
174 changes: 174 additions & 0 deletions integrationTests/chainSimulator/mempool/testutils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package mempool

import (
"math/big"
"strconv"
"testing"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data/transaction"
"github.com/multiversx/mx-chain-go/config"
testsChainSimulator "github.com/multiversx/mx-chain-go/integrationTests/chainSimulator"
"github.com/multiversx/mx-chain-go/node/chainSimulator"
"github.com/multiversx/mx-chain-go/node/chainSimulator/components/api"
"github.com/multiversx/mx-chain-go/node/chainSimulator/dtos"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/block/preprocess"
"github.com/multiversx/mx-chain-go/storage/txcache"
"github.com/multiversx/mx-chain-go/testscommon"
"github.com/stretchr/testify/require"
)

var (
oneEGLD = big.NewInt(1000000000000000000)
oneQuarterOfEGLD = big.NewInt(250000000000000000)
oneCentOfEGLD = big.NewInt(10000000000000000)

Check failure on line 26 in integrationTests/chainSimulator/mempool/testutils_test.go

View workflow job for this annotation

GitHub Actions / golangci linter

var `oneCentOfEGLD` is unused (unused)
durationWaitAfterSendMany = 750 * time.Millisecond
durationWaitAfterSendSome = 10 * time.Millisecond
)

func startChainSimulator(t *testing.T, alterConfigsFunction func(cfg *config.Configs)) testsChainSimulator.ChainSimulator {
simulator, err := chainSimulator.NewChainSimulator(chainSimulator.ArgsChainSimulator{
BypassTxSignatureCheck: true,
TempDir: t.TempDir(),
PathToInitialConfig: "../../../cmd/node/config/",
NumOfShards: 1,
GenesisTimestamp: time.Now().Unix(),
RoundDurationInMillis: uint64(4000),
RoundsPerEpoch: core.OptionalUint64{
HasValue: true,
Value: 10,
},
ApiInterface: api.NewNoApiInterface(),
MinNodesPerShard: 1,
MetaChainMinNodes: 1,
NumNodesWaitingListMeta: 0,
NumNodesWaitingListShard: 0,
AlterConfigsFunction: alterConfigsFunction,
})
require.NoError(t, err)
require.NotNil(t, simulator)

err = simulator.GenerateBlocksUntilEpochIsReached(1)
require.NoError(t, err)

return simulator
}

type participantsHolder struct {
sendersByShard map[int][]dtos.WalletAddress
receiverByShard map[int]dtos.WalletAddress
}

func newParticipantsHolder() *participantsHolder {
return &participantsHolder{
sendersByShard: make(map[int][]dtos.WalletAddress),
receiverByShard: make(map[int]dtos.WalletAddress),
}
}

func createParticipants(t *testing.T, simulator testsChainSimulator.ChainSimulator, numSendersPerShard int) *participantsHolder {
numShards := int(simulator.GetNodeHandler(0).GetShardCoordinator().NumberOfShards())
participants := newParticipantsHolder()

for shard := 0; shard < numShards; shard++ {
senders := make([]dtos.WalletAddress, 0, numSendersPerShard)

for i := 0; i < numSendersPerShard; i++ {
sender, err := simulator.GenerateAndMintWalletAddress(uint32(shard), oneEGLD)
require.NoError(t, err)

senders = append(senders, sender)
}

receiver, err := simulator.GenerateAndMintWalletAddress(0, big.NewInt(0))
require.NoError(t, err)

participants.sendersByShard[shard] = senders
participants.receiverByShard[shard] = receiver
}

err := simulator.GenerateBlocks(1)
require.Nil(t, err)

return participants
}

type noncesTracker struct {
nonceByAddress map[string]uint64
}

func newNoncesTracker() *noncesTracker {
return &noncesTracker{
nonceByAddress: make(map[string]uint64),
}
}

func (tracker *noncesTracker) getThenIncrementNonce(address dtos.WalletAddress) uint64 {
nonce, ok := tracker.nonceByAddress[address.Bech32]
if !ok {
tracker.nonceByAddress[address.Bech32] = 0
}

tracker.nonceByAddress[address.Bech32]++
return nonce
}

func sendTransactions(t *testing.T, simulator testsChainSimulator.ChainSimulator, transactions []*transaction.Transaction) {
transactionsBySenderShard := make(map[int][]*transaction.Transaction)
shardCoordinator := simulator.GetNodeHandler(0).GetShardCoordinator()

for _, tx := range transactions {
shard := int(shardCoordinator.ComputeId(tx.SndAddr))
transactionsBySenderShard[shard] = append(transactionsBySenderShard[shard], tx)
}

for shard, transactionsFromShard := range transactionsBySenderShard {
node := simulator.GetNodeHandler(uint32(shard))
numSent, err := node.GetFacadeHandler().SendBulkTransactions(transactionsFromShard)

require.NoError(t, err)
require.Equal(t, len(transactionsFromShard), int(numSent))
}
}

func sendTransaction(t *testing.T, simulator testsChainSimulator.ChainSimulator, tx *transaction.Transaction) {
sendTransactions(t, simulator, []*transaction.Transaction{tx})
}

func selectTransactions(t *testing.T, simulator testsChainSimulator.ChainSimulator, shard int) ([]*txcache.WrappedTransaction, uint64) {
shardAsString := strconv.Itoa(shard)
node := simulator.GetNodeHandler(uint32(shard))
accountsAdapter := node.GetStateComponents().AccountsAdapter()
poolsHolder := node.GetDataComponents().Datapool().Transactions()

selectionSession, err := preprocess.NewSelectionSession(preprocess.ArgsSelectionSession{
AccountsAdapter: accountsAdapter,
TransactionsProcessor: &testscommon.TxProcessorStub{},
})
require.NoError(t, err)

mempool := poolsHolder.ShardDataStore(shardAsString).(*txcache.TxCache)

selectedTransactions, gas := mempool.SelectTransactions(
selectionSession,
process.TxCacheSelectionGasRequested,
process.TxCacheSelectionMaxNumTxs,
process.TxCacheSelectionLoopMaximumDuration,
)

return selectedTransactions, gas
}

func getNumTransactionsInPool(simulator testsChainSimulator.ChainSimulator, shard int) int {
node := simulator.GetNodeHandler(uint32(shard))
poolsHolder := node.GetDataComponents().Datapool().Transactions()
return int(poolsHolder.GetCounts().GetTotal())
}

func getNumTransactionsInCurrentBlock(simulator testsChainSimulator.ChainSimulator, shard int) int {
node := simulator.GetNodeHandler(uint32(shard))
currentBlock := node.GetDataComponents().Blockchain().GetCurrentBlockHeader()
return int(currentBlock.GetTxCount())
}
18 changes: 10 additions & 8 deletions process/block/preprocess/selectionSession.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@ type selectionSession struct {
ephemeralAccountsCache map[string]vmcommon.AccountHandler
}

type argsSelectionSession struct {
accountsAdapter state.AccountsAdapter
transactionsProcessor process.TransactionProcessor
// ArgsSelectionSession holds the arguments for creating a new selection session.
type ArgsSelectionSession struct {
AccountsAdapter state.AccountsAdapter
TransactionsProcessor process.TransactionProcessor
}

func newSelectionSession(args argsSelectionSession) (*selectionSession, error) {
if check.IfNil(args.accountsAdapter) {
// NewSelectionSession creates a new selection session.
func NewSelectionSession(args ArgsSelectionSession) (*selectionSession, error) {
if check.IfNil(args.AccountsAdapter) {
return nil, process.ErrNilAccountsAdapter
}
if check.IfNil(args.transactionsProcessor) {
if check.IfNil(args.TransactionsProcessor) {
return nil, process.ErrNilTxProcessor
}

return &selectionSession{
accountsAdapter: args.accountsAdapter,
transactionsProcessor: args.transactionsProcessor,
accountsAdapter: args.AccountsAdapter,
transactionsProcessor: args.TransactionsProcessor,
ephemeralAccountsCache: make(map[string]vmcommon.AccountHandler),
}, nil
}
Expand Down
Loading
Loading