Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[actpool] validation for blobtx #4367

Merged
merged 5 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
accountutil "github.com/iotexproject/iotex-core/action/protocol/account/util"
"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/blockchain/genesis"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/pkg/prometheustimer"
"github.com/iotexproject/iotex-core/pkg/routine"
Expand Down Expand Up @@ -54,6 +55,7 @@ func init() {
// ActPool is the interface of actpool
type ActPool interface {
action.SealedEnvelopeValidator
lifecycle.StartStopper
// Reset resets actpool state
Reset()
// PendingActionMap returns an action map with all accepted actions
Expand Down Expand Up @@ -162,12 +164,25 @@ func (ap *actPool) Start(ctx context.Context) error {
if ap.store == nil {
return nil
}
// open action store and load all actions
blobs := make(SortedActions, 0)
err := ap.store.Open(func(selp *action.SealedEnvelope) error {
if len(selp.BlobHashes()) > 0 {
blobs = append(blobs, selp)
return nil
}
return ap.add(ctx, selp)
})
if err != nil {
return err
}
// add blob txs to actpool in nonce order
sort.Sort(blobs)
for _, selp := range blobs {
if err := ap.add(ctx, selp); err != nil {
return err
}
}
return ap.storeSync.Start(ctx)
}

Expand Down
110 changes: 107 additions & 3 deletions actpool/actpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ package actpool
import (
"bytes"
"context"
"crypto/sha256"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/golang/mock/gomock"
"github.com/holiman/uint256"
"github.com/iotexproject/iotex-address/address"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
Expand All @@ -25,16 +30,18 @@ import (
"github.com/iotexproject/iotex-core/actpool/actioniterator"
"github.com/iotexproject/iotex-core/blockchain"
"github.com/iotexproject/iotex-core/blockchain/genesis"
"github.com/iotexproject/iotex-core/pkg/unit"
"github.com/iotexproject/iotex-core/state"
"github.com/iotexproject/iotex-core/test/identityset"
"github.com/iotexproject/iotex-core/test/mock/mock_chainmanager"
"github.com/iotexproject/iotex-core/test/mock/mock_sealed_envelope_validator"
)

const (
_maxNumActsPerPool = 8192
_maxGasLimitPerPool = 81920000
_maxNumActsPerAcct = 256
_maxNumActsPerPool = 8192
_maxGasLimitPerPool = 81920000
_maxNumActsPerAcct = 256
_maxNumBlobTxPerAcct = 3
)

var (
Expand Down Expand Up @@ -128,6 +135,8 @@ func TestActPool_AddActs(t *testing.T) {
}
if bytes.Equal(cfg.Key, identityset.Address(28).Bytes()) {
require.NoError(acct.AddBalance(big.NewInt(100)))
} else if bytes.Equal(cfg.Key, identityset.Address(1).Bytes()) {
require.NoError(acct.AddBalance(unit.ConvertIotxToRau(100)))
} else {
require.NoError(acct.AddBalance(big.NewInt(10)))
}
Expand Down Expand Up @@ -276,6 +285,100 @@ func TestActPool_AddActs(t *testing.T) {
elp = bd.SetAction(&action.PutPollResult{}).Build()
err = ap.Add(ctx, action.FakeSeal(elp, _priKey1.PublicKey()))
require.Equal(action.ErrInvalidAct, errors.Cause(err))

t.Run("blobTx", func(t *testing.T) {
blob := kzg4844.Blob{}
commitment, err := kzg4844.BlobToCommitment(blob)
require.NoError(err)
proof, err := kzg4844.ComputeBlobProof(blob, commitment)
require.NoError(err)
blobHash := kzg4844.CalcBlobHashV1(sha256.New(), &commitment)
builder := &action.EnvelopeBuilder{}
tx, err := builder.BuildTransfer(types.NewTx(&types.BlobTx{
Nonce: 2,
GasTipCap: uint256.MustFromBig(big.NewInt(10)),
GasFeeCap: uint256.MustFromBig(big.NewInt(20)),
Gas: 100000,
To: common.BytesToAddress(identityset.Address(1).Bytes()),
Value: uint256.MustFromBig(big.NewInt(1)),
BlobFeeCap: uint256.MustFromBig(big.NewInt(10)),
BlobHashes: []common.Hash{blobHash},
Sidecar: &types.BlobTxSidecar{
Blobs: []kzg4844.Blob{blob},
Commitments: []kzg4844.Commitment{commitment},
Proofs: []kzg4844.Proof{proof},
},
}))
require.NoError(err)
builder = &action.EnvelopeBuilder{}
tx2, err := builder.BuildTransfer(types.NewTx(&types.BlobTx{
Nonce: 1,
GasTipCap: uint256.MustFromBig(big.NewInt(10)),
GasFeeCap: uint256.MustFromBig(big.NewInt(20)),
Gas: 100000,
To: common.BytesToAddress(identityset.Address(1).Bytes()),
Value: uint256.MustFromBig(big.NewInt(2)),
BlobFeeCap: uint256.MustFromBig(big.NewInt(10)),
BlobHashes: []common.Hash{blobHash},
Sidecar: &types.BlobTxSidecar{
Blobs: []kzg4844.Blob{blob},
Commitments: []kzg4844.Commitment{commitment},
Proofs: []kzg4844.Proof{proof},
},
}))
require.NoError(err)
builder = &action.EnvelopeBuilder{}
tx3, err := builder.BuildTransfer(types.NewTx(&types.BlobTx{
Nonce: 1,
GasTipCap: uint256.MustFromBig(big.NewInt(20)),
GasFeeCap: uint256.MustFromBig(big.NewInt(40)),
Gas: 100000,
To: common.BytesToAddress(identityset.Address(1).Bytes()),
Value: uint256.MustFromBig(big.NewInt(2)),
BlobFeeCap: uint256.MustFromBig(big.NewInt(20)),
BlobHashes: []common.Hash{blobHash},
Sidecar: &types.BlobTxSidecar{
Blobs: []kzg4844.Blob{blob},
Commitments: []kzg4844.Commitment{commitment},
Proofs: []kzg4844.Proof{proof},
},
}))
require.NoError(err)
// reject non-continuous nonce
ap, err := NewActPool(genesis.Default, sf, apConfig)
require.NoError(err)
require.NoError(ap.Start(ctx))
defer ap.Stop(ctx)
ap.AddActionEnvelopeValidators(protocol.NewGenericValidator(sf, accountutil.AccountState))
signedTx, err := action.Sign(tx, identityset.PrivateKey(1))
require.NoError(err)
require.ErrorIs(ap.Add(ctx, signedTx), action.ErrNonceTooHigh)
// accept continuous nonce
tx.SetNonce(1)
signedTx, err = action.Sign(tx, identityset.PrivateKey(1))
require.NoError(err)
require.NoError(ap.Add(ctx, signedTx))
// 2x price bump to replace
signedTx, err = action.Sign(tx2, identityset.PrivateKey(1))
require.NoError(err)
require.ErrorIs(ap.Add(ctx, signedTx), action.ErrReplaceUnderpriced)
signedTx, err = action.Sign(tx3, identityset.PrivateKey(1))
require.NoError(err)
require.NoError(ap.Add(ctx, signedTx))
// max blob tx per account
tx.SetNonce(2)
signedTx, err = action.Sign(tx, identityset.PrivateKey(1))
require.NoError(err)
require.NoError(ap.Add(ctx, signedTx))
tx.SetNonce(3)
signedTx, err = action.Sign(tx, identityset.PrivateKey(1))
require.NoError(err)
require.NoError(ap.Add(ctx, signedTx))
tx.SetNonce(4)
signedTx, err = action.Sign(tx, identityset.PrivateKey(1))
require.NoError(err)
require.ErrorIs(ap.Add(ctx, signedTx), action.ErrNonceTooHigh)
})
}

func TestActPool_PickActs(t *testing.T) {
Expand Down Expand Up @@ -1136,6 +1239,7 @@ func getActPoolCfg() Config {
MaxNumActsPerAcct: _maxNumActsPerAcct,
MinGasPriceStr: "0",
BlackList: []string{_addr6},
MaxNumBlobsPerAcct: _maxNumBlobTxPerAcct,
}
}

Expand Down
46 changes: 44 additions & 2 deletions actpool/actqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/facebookgo/clock"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/iotexproject/iotex-address/address"
Expand Down Expand Up @@ -59,6 +60,8 @@ type actQueue struct {
clock clock.Clock
ttl time.Duration
mu sync.RWMutex
// count of blob txs in the queue
blobCount int
}

// NewActQueue create a new action queue
Expand Down Expand Up @@ -104,7 +107,28 @@ func (q *actQueue) Put(act *action.SealedEnvelope) error {
if actInPool, exist := q.items[nonce]; exist {
// act of higher gas price can cut in line
if nonce < q.pendingNonce && act.GasFeeCap().Cmp(actInPool.GasFeeCap()) != 1 {
return action.ErrReplaceUnderpriced
return errors.Wrapf(action.ErrReplaceUnderpriced, "gas fee cap %s < %s", act.GasFeeCap(), actInPool.GasFeeCap())
}
// 2x bumps in gas price are allowed for blob tx
isPrevBlobTx, isBlobTx := len(actInPool.BlobHashes()) > 0, len(act.BlobHashes()) > 0
if isPrevBlobTx {
if !isBlobTx {
return errors.Wrap(action.ErrReplaceUnderpriced, "blob tx can only replace blob tx")
}
var (
priceBump = big.NewInt(2)
minGasFeeCap = new(big.Int).Mul(actInPool.GasFeeCap(), priceBump)
minGasTipCap = new(big.Int).Mul(actInPool.GasTipCap(), priceBump)
minBlobGasFeeCap = new(big.Int).Mul(actInPool.BlobGasFeeCap(), priceBump)
)
switch {
case act.GasFeeCap().Cmp(minGasFeeCap) < 0:
return errors.Wrapf(action.ErrReplaceUnderpriced, "gas fee cap %s < %s", act.GasFeeCap(), minGasFeeCap)
case act.GasTipCap().Cmp(minGasTipCap) < 0:
return errors.Wrapf(action.ErrReplaceUnderpriced, "gas tip cap %s < %s", act.GasTipCap(), minGasTipCap)
case act.BlobGasFeeCap().Cmp(minBlobGasFeeCap) < 0:
return errors.Wrapf(action.ErrReplaceUnderpriced, "blob gas fee cap %s < %s", act.BlobGasFeeCap(), minBlobGasFeeCap)
}
}
// update action in q.items and q.index
q.items[nonce] = act
Expand All @@ -117,13 +141,21 @@ func (q *actQueue) Put(act *action.SealedEnvelope) error {
q.updateFromNonce(nonce)
return nil
}
// check max number of blob txs per account
isBlobTx := len(act.BlobHashes()) > 0
if isBlobTx && q.blobCount >= int(q.ap.cfg.MaxNumBlobsPerAcct) {
return errors.Wrap(action.ErrNonceTooHigh, "too many blob txs in the queue")
}
nttl := &nonceWithTTL{nonce: nonce, deadline: q.clock.Now().Add(q.ttl)}
heap.Push(&q.ascQueue, nttl)
heap.Push(&q.descQueue, nttl)
q.items[nonce] = act
if nonce == q.pendingNonce {
q.updateFromNonce(q.pendingNonce)
}
if isBlobTx {
q.blobCount++
}
return nil
}

Expand Down Expand Up @@ -184,6 +216,9 @@ func (q *actQueue) cleanTimeout() []*action.SealedEnvelope {
nonce := q.ascQueue[i].nonce
if timeNow.After(q.ascQueue[i].deadline) && nonce > q.pendingNonce {
removedFromQueue = append(removedFromQueue, q.items[nonce])
if len(q.items[nonce].BlobHashes()) > 0 {
q.blobCount--
}
delete(q.items, nonce)
delete(q.pendingBalance, nonce)
q.ascQueue[i] = q.ascQueue[size-1]
Expand Down Expand Up @@ -220,7 +255,11 @@ func (q *actQueue) UpdateAccountState(nonce uint64, balance *big.Int) []*action.
heap.Remove(&q.descQueue, nttl.descIdx)
nonce := nttl.nonce
removed = append(removed, q.items[nonce])
if len(q.items[nonce].BlobHashes()) > 0 {
q.blobCount--
}
delete(q.items, nonce)

}
return removed
}
Expand Down Expand Up @@ -264,6 +303,7 @@ func (q *actQueue) Reset() {
q.pendingBalance = make(map[uint64]*big.Int)
q.accountNonce = 0
q.accountBalance = big.NewInt(0)
q.blobCount = 0
}

// PendingActs creates a consecutive nonce-sorted slice of actions
Expand Down Expand Up @@ -337,6 +377,8 @@ func (q *actQueue) PopActionWithLargestNonce() *action.SealedEnvelope {
item := q.items[itemMeta.nonce]
delete(q.items, itemMeta.nonce)
q.updateFromNonce(itemMeta.nonce)

if len(item.BlobHashes()) > 0 {
q.blobCount--
}
return item
}
3 changes: 3 additions & 0 deletions actpool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var (
ActionExpiry: 10 * time.Minute,
MinGasPriceStr: big.NewInt(unit.Qev).String(),
BlackList: []string{},
MaxNumBlobsPerAcct: 16,
}
)

Expand All @@ -39,6 +40,8 @@ type Config struct {
BlackList []string `yaml:"blackList"`
// Store defines the config for persistent cache
Store *StoreConfig `yaml:"store"`
// MaxNumBlobsPerAcct defines the maximum number of blob txs an account can have
MaxNumBlobsPerAcct uint64 `yaml:"maxNumBlobsPerAcct"`
}

// MinGasPrice returns the minimal gas price threshold
Expand Down
16 changes: 14 additions & 2 deletions actpool/queueworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package actpool
import (
"context"
"encoding/hex"
"errors"
"math/big"
"sort"
"strings"
Expand All @@ -12,6 +11,7 @@ import (

"github.com/iotexproject/go-pkgs/cache/ttl"
"github.com/iotexproject/iotex-address/address"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/iotexproject/iotex-core/action"
Expand Down Expand Up @@ -105,7 +105,7 @@ func (worker *queueWorker) Handle(job workerJob) error {
}

worker.ap.allActions.Set(actHash, act)
isBlobTx := false // TODO: only store blob tx
isBlobTx := len(act.BlobHashes()) > 0
if worker.ap.store != nil && isBlobTx {
if err := worker.ap.store.Put(act); err != nil {
log.L().Warn("failed to store action", zap.Error(err), log.Hex("hash", actHash[:]))
Expand Down Expand Up @@ -180,6 +180,18 @@ func (worker *queueWorker) checkSelpWithState(act *action.SealedEnvelope, pendin
return action.ErrNonceTooHigh
}

// Nonce must be continuous for blob tx
if len(act.BlobHashes()) > 0 {
pendingNonceInPool, ok := worker.PendingNonce(act.SenderAddress())
if !ok {
pendingNonceInPool = pendingNonce
}
if act.Nonce() > pendingNonceInPool {
_actpoolMtc.WithLabelValues("nonceTooLarge").Inc()
return errors.Wrapf(action.ErrNonceTooHigh, "nonce %d is larger than pending nonce %d", act.Nonce(), pendingNonceInPool)
}
}

if cost, _ := act.Cost(); balance.Cmp(cost) < 0 {
_actpoolMtc.WithLabelValues("insufficientBalance").Inc()
sender := act.SenderAddress().String()
Expand Down
Loading
Loading