diff --git a/node/node.go b/node/node.go index 608df5b90..466c9cfdb 100644 --- a/node/node.go +++ b/node/node.go @@ -59,13 +59,13 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, eventCh = nil } - txPool := txpool.NewTxPool(conf.TxPool, messageCh) - str, err := store.NewStore(conf.Store) if err != nil { return nil, err } + txPool := txpool.NewTxPool(conf.TxPool, str, messageCh) + st, err := state.LoadOrNewState(genDoc, valKeys, str, txPool, eventCh) if err != nil { return nil, err diff --git a/state/state.go b/state/state.go index 1b1906906..bdb4046f6 100644 --- a/state/state.go +++ b/state/state.go @@ -325,7 +325,6 @@ func (st *state) ProposeBlock(valKey *bls.ValidatorKey, rewardAddr crypto.Addres // Only one subsidy transaction per blk if txs[i].IsSubsidyTx() { st.logger.Error("found duplicated subsidy transaction", "tx", txs[i]) - st.txPool.RemoveTx(txs[i].ID()) txs.Remove(i) i-- @@ -434,15 +433,15 @@ func (st *state) CommitBlock(blk *block.Block, cert *certificate.BlockCertificat st.store.SaveBlock(blk, cert) - // Remove transactions from pool - for _, trx := range blk.Transactions() { - st.txPool.RemoveTx(trx.ID()) - } - if err := st.store.WriteBatch(); err != nil { st.logger.Panic("unable to update state", "error", err) } + // Remove transactions from pool and update consumption + if err := st.txPool.HandleCommittedBlock(blk); err != nil { + return err + } + st.logger.Info("new block committed", "block", blk, "round", cert.Round()) st.evaluateSortition() diff --git a/txpool/config.go b/txpool/config.go index 02b86242c..9c64f7769 100644 --- a/txpool/config.go +++ b/txpool/config.go @@ -7,6 +7,9 @@ import ( type Config struct { MaxSize int `toml:"max_size"` Fee *FeeConfig `toml:"fee"` + + // Private configs + ConsumptionWindow uint32 `toml:"-"` } type FeeConfig struct { @@ -17,8 +20,9 @@ type FeeConfig struct { func DefaultConfig() *Config { return &Config{ - MaxSize: 1000, - Fee: DefaultFeeConfig(), + MaxSize: 1000, + Fee: DefaultFeeConfig(), + ConsumptionWindow: 8640, } } diff --git a/txpool/interface.go b/txpool/interface.go index 895bd5a75..34ee3b4ac 100644 --- a/txpool/interface.go +++ b/txpool/interface.go @@ -23,5 +23,5 @@ type TxPool interface { SetNewSandboxAndRecheck(sb sandbox.Sandbox) AppendTxAndBroadcast(trx *tx.Tx) error AppendTx(trx *tx.Tx) error - RemoveTx(id tx.ID) + HandleCommittedBlock(blk *block.Block) error } diff --git a/txpool/mock.go b/txpool/mock.go index 5b5fa5ea6..e305a6aca 100644 --- a/txpool/mock.go +++ b/txpool/mock.go @@ -79,6 +79,10 @@ func (m *MockTxPool) RemoveTx(id hash.Hash) { } } +func (*MockTxPool) HandleCommittedBlock(_ *block.Block) error { + return nil +} + func (m *MockTxPool) PrepareBlockTransactions() block.Txs { txs := make([]*tx.Tx, m.Size()) copy(txs, m.Txs) diff --git a/txpool/txpool.go b/txpool/txpool.go index e0b725b7c..5c1dd02bb 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -4,8 +4,10 @@ import ( "fmt" "sync" + "github.com/pactus-project/pactus/crypto" "github.com/pactus-project/pactus/execution" "github.com/pactus-project/pactus/sandbox" + "github.com/pactus-project/pactus/store" "github.com/pactus-project/pactus/sync/bundle/message" "github.com/pactus-project/pactus/types/amount" "github.com/pactus-project/pactus/types/block" @@ -19,14 +21,18 @@ import ( type txPool struct { lk sync.RWMutex - config *Config - sandbox sandbox.Sandbox - pools map[payload.Type]pool - broadcastCh chan message.Message - logger *logger.SubLogger + config *Config + sandbox sandbox.Sandbox + pools map[payload.Type]pool + consumptionMap map[crypto.Address]uint32 + broadcastCh chan message.Message + strReader store.Reader + logger *logger.SubLogger } -func NewTxPool(conf *Config, broadcastCh chan message.Message) TxPool { +// NewTxPool constructs a new transaction pool with various sub-pools for different transaction types. +// The transaction pool also maintains a consumption map for tracking byte usage per address. +func NewTxPool(conf *Config, storeReader store.Reader, broadcastCh chan message.Message) TxPool { pools := make(map[payload.Type]pool) pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.minFee()) pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.minFee()) @@ -35,9 +41,11 @@ func NewTxPool(conf *Config, broadcastCh chan message.Message) TxPool { pools[payload.TypeSortition] = newPool(conf.sortitionPoolSize(), 0) pool := &txPool{ - config: conf, - pools: pools, - broadcastCh: broadcastCh, + config: conf, + pools: pools, + consumptionMap: make(map[crypto.Address]uint32), + strReader: storeReader, + broadcastCh: broadcastCh, } pool.logger = logger.NewSubLogger("_pool", pool) @@ -45,6 +53,8 @@ func NewTxPool(conf *Config, broadcastCh chan message.Message) TxPool { return pool } +// SetNewSandboxAndRecheck updates the sandbox and rechecks all transactions, +// removing expired or invalid ones. func (p *txPool) SetNewSandboxAndRecheck(sb sandbox.Sandbox) { p.lk.Lock() defer p.lk.Unlock() @@ -133,10 +143,67 @@ func (p *txPool) checkTx(trx *tx.Tx) error { return nil } -func (p *txPool) RemoveTx(id tx.ID) { +func (p *txPool) HandleCommittedBlock(blk *block.Block) error { p.lk.Lock() defer p.lk.Unlock() + for _, trx := range blk.Transactions() { + p.removeTx(trx.ID()) + + p.handleIncreaseConsumption(trx) + } + + return p.handleDecreaseConsumption(blk.Height()) +} + +func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) { + if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() { + signer := trx.Payload().Signer() + + p.consumptionMap[signer] += uint32(trx.SerializeSize()) + } +} + +func (p *txPool) handleDecreaseConsumption(height uint32) error { + // If height is less than or equal to ConsumptionWindow, nothing to do. + if height <= p.config.ConsumptionWindow { + return nil + } + + // Calculate the block height that has passed out of the consumption window. + windowedBlockHeight := height - p.config.ConsumptionWindow + committedBlock, err := p.strReader.Block(windowedBlockHeight) + if err != nil { + return err + } + + blk, err := committedBlock.ToBlock() + if err != nil { + return err + } + + for _, trx := range blk.Transactions() { + if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() { + signer := trx.Payload().Signer() + if v, ok := p.consumptionMap[signer]; ok { + // Decrease the consumption by the size of the transaction + v -= uint32(trx.SerializeSize()) + + if v == 0 { + // If the new value is zero, remove the signer from the consumptionMap + delete(p.consumptionMap, signer) + } else { + // Otherwise, update the map with the new value + p.consumptionMap[signer] = v + } + } + } + } + + return nil +} + +func (p *txPool) removeTx(id tx.ID) { for _, pool := range p.pools { if pool.list.Remove(id) { break diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index f1e0570bb..452e31c06 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -5,8 +5,10 @@ import ( "testing" "time" + "github.com/pactus-project/pactus/crypto" "github.com/pactus-project/pactus/execution" "github.com/pactus-project/pactus/sandbox" + "github.com/pactus-project/pactus/store" "github.com/pactus-project/pactus/sync/bundle/message" "github.com/pactus-project/pactus/types/account" "github.com/pactus-project/pactus/types/tx" @@ -22,12 +24,14 @@ type testData struct { pool *txPool sandbox *sandbox.MockSandbox + str *store.MockStore ch chan message.Message } func testConfig() *Config { return &Config{ - MaxSize: 10, + MaxSize: 10, + ConsumptionWindow: 3, Fee: &FeeConfig{ FixedFee: 0.000001, DailyLimit: 280, @@ -44,7 +48,8 @@ func setup(t *testing.T) *testData { ch := make(chan message.Message, 10) sb := sandbox.MockingSandbox(ts) config := testConfig() - p := NewTxPool(config, ch) + mockStore := store.MockingStore(ts) + p := NewTxPool(config, mockStore, ch) p.SetNewSandboxAndRecheck(sb) pool := p.(*txPool) assert.NotNil(t, pool) @@ -53,6 +58,7 @@ func setup(t *testing.T) *testData { TestSuite: ts, pool: pool, sandbox: sb, + str: mockStore, ch: ch, } } @@ -96,11 +102,62 @@ func TestAppendAndRemove(t *testing.T) { // Appending the same transaction again, should not return any error assert.NoError(t, td.pool.AppendTx(testTrx)) - td.pool.RemoveTx(testTrx.ID()) + td.pool.removeTx(testTrx.ID()) assert.False(t, td.pool.HasTx(testTrx.ID()), "Transaction should be removed") assert.Nil(t, td.pool.PendingTx(testTrx.ID())) } +func TestCalculatingConsumption(t *testing.T) { + td := setup(t) + + // Generate keys for different transaction signers + _, prv1 := td.RandEd25519KeyPair() + _, prv2 := td.RandEd25519KeyPair() + _, prv3 := td.RandBLSKeyPair() + _, prv4 := td.RandBLSKeyPair() + + // Generate different types of transactions + trx11 := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(prv1)) + trx12 := td.GenerateTestBondTx(testsuite.TransactionWithEd25519Signer(prv1)) + trx13 := td.GenerateTestWithdrawTx(testsuite.TransactionWithBLSSigner(prv3)) + trx14 := td.GenerateTestUnbondTx(testsuite.TransactionWithBLSSigner(prv4)) + trx21 := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(prv2)) + trx31 := td.GenerateTestBondTx(testsuite.TransactionWithBLSSigner(prv4)) + trx41 := td.GenerateTestWithdrawTx(testsuite.TransactionWithBLSSigner(prv3)) + trx42 := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(prv2)) + + // Expected consumption map after transactions + expected := map[crypto.Address]uint32{ + prv2.PublicKeyNative().AccountAddress(): uint32(trx21.SerializeSize()) + uint32(trx42.SerializeSize()), + prv4.PublicKeyNative().AccountAddress(): uint32(trx31.SerializeSize()), + prv3.PublicKeyNative().ValidatorAddress(): uint32(trx41.SerializeSize()), + } + + tests := []struct { + height uint32 + txs []*tx.Tx + }{ + {1, []*tx.Tx{trx11, trx12, trx13, trx14}}, + {2, []*tx.Tx{trx21}}, + {3, []*tx.Tx{trx31}}, + {4, []*tx.Tx{trx41, trx42}}, + } + + for _, tt := range tests { + // Generate a block with the transactions for the given height + blk, cert := td.TestSuite.GenerateTestBlock(tt.height, func(bm *testsuite.BlockMaker) { + bm.Txs = tt.txs + }) + td.str.SaveBlock(blk, cert) + + // Handle the block in the transaction pool + err := td.pool.HandleCommittedBlock(blk) + require.NoError(t, err) + } + + require.Equal(t, expected, td.pool.consumptionMap) +} + func TestAppendInvalidTransaction(t *testing.T) { td := setup(t) diff --git a/util/io_test.go b/util/io_test.go index 5c21a1835..ac1ecb8ba 100644 --- a/util/io_test.go +++ b/util/io_test.go @@ -217,16 +217,23 @@ func TestSanitizeArchivePath(t *testing.T) { func TestListFilesInDir(t *testing.T) { tmpDir := TempDirPath() - file1, err := os.Create(filepath.Join(tmpDir, ".file1")) + file1Path := filepath.Join(tmpDir, ".public_file") + file1, err := os.Create(file1Path) require.NoError(t, err) require.NoError(t, file1.Close()) - file2, err := os.Create(filepath.Join(tmpDir, ".file2")) + file2Path := filepath.Join(tmpDir, ".hidden_file") + file2, err := os.Create(file2Path) require.NoError(t, err) require.NoError(t, file2.Close()) + err = os.Mkdir(filepath.Join(tmpDir, "directory"), 0o750) + require.NoError(t, err) + files, err := ListFilesInDir(tmpDir) require.NoError(t, err) assert.Len(t, files, 2) + assert.Contains(t, files, file1Path) + assert.Contains(t, files, file2Path) }