Skip to content

Commit

Permalink
[actpool] validation for blobtx (#4367)
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc authored Sep 19, 2024
1 parent 3f9b1f0 commit ef61b43
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 7 deletions.
15 changes: 15 additions & 0 deletions actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
accountutil "github.com/iotexproject/iotex-core/action/protocol/account/util"
"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/blockchain/genesis"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/pkg/prometheustimer"
"github.com/iotexproject/iotex-core/pkg/routine"
Expand Down Expand Up @@ -54,6 +55,7 @@ func init() {
// ActPool is the interface of actpool
type ActPool interface {
action.SealedEnvelopeValidator
lifecycle.StartStopper
// Reset resets actpool state
Reset()
// PendingActionMap returns an action map with all accepted actions
Expand Down Expand Up @@ -162,12 +164,25 @@ func (ap *actPool) Start(ctx context.Context) error {
if ap.store == nil {
return nil
}
// open action store and load all actions
blobs := make(SortedActions, 0)
err := ap.store.Open(func(selp *action.SealedEnvelope) error {
if len(selp.BlobHashes()) > 0 {
blobs = append(blobs, selp)
return nil
}
return ap.add(ctx, selp)
})
if err != nil {
return err
}
// add blob txs to actpool in nonce order
sort.Sort(blobs)
for _, selp := range blobs {
if err := ap.add(ctx, selp); err != nil {
return err
}
}
return ap.storeSync.Start(ctx)
}

Expand Down
110 changes: 107 additions & 3 deletions actpool/actpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ package actpool
import (
"bytes"
"context"
"crypto/sha256"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/golang/mock/gomock"
"github.com/holiman/uint256"
"github.com/iotexproject/iotex-address/address"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
Expand All @@ -25,16 +30,18 @@ import (
"github.com/iotexproject/iotex-core/actpool/actioniterator"
"github.com/iotexproject/iotex-core/blockchain"
"github.com/iotexproject/iotex-core/blockchain/genesis"
"github.com/iotexproject/iotex-core/pkg/unit"
"github.com/iotexproject/iotex-core/state"
"github.com/iotexproject/iotex-core/test/identityset"
"github.com/iotexproject/iotex-core/test/mock/mock_chainmanager"
"github.com/iotexproject/iotex-core/test/mock/mock_sealed_envelope_validator"
)

const (
_maxNumActsPerPool = 8192
_maxGasLimitPerPool = 81920000
_maxNumActsPerAcct = 256
_maxNumActsPerPool = 8192
_maxGasLimitPerPool = 81920000
_maxNumActsPerAcct = 256
_maxNumBlobTxPerAcct = 3
)

var (
Expand Down Expand Up @@ -128,6 +135,8 @@ func TestActPool_AddActs(t *testing.T) {
}
if bytes.Equal(cfg.Key, identityset.Address(28).Bytes()) {
require.NoError(acct.AddBalance(big.NewInt(100)))
} else if bytes.Equal(cfg.Key, identityset.Address(1).Bytes()) {
require.NoError(acct.AddBalance(unit.ConvertIotxToRau(100)))
} else {
require.NoError(acct.AddBalance(big.NewInt(10)))
}
Expand Down Expand Up @@ -276,6 +285,100 @@ func TestActPool_AddActs(t *testing.T) {
elp = bd.SetAction(&action.PutPollResult{}).Build()
err = ap.Add(ctx, action.FakeSeal(elp, _priKey1.PublicKey()))
require.Equal(action.ErrInvalidAct, errors.Cause(err))

t.Run("blobTx", func(t *testing.T) {
blob := kzg4844.Blob{}
commitment, err := kzg4844.BlobToCommitment(blob)
require.NoError(err)
proof, err := kzg4844.ComputeBlobProof(blob, commitment)
require.NoError(err)
blobHash := kzg4844.CalcBlobHashV1(sha256.New(), &commitment)
builder := &action.EnvelopeBuilder{}
tx, err := builder.BuildTransfer(types.NewTx(&types.BlobTx{
Nonce: 2,
GasTipCap: uint256.MustFromBig(big.NewInt(10)),
GasFeeCap: uint256.MustFromBig(big.NewInt(20)),
Gas: 100000,
To: common.BytesToAddress(identityset.Address(1).Bytes()),
Value: uint256.MustFromBig(big.NewInt(1)),
BlobFeeCap: uint256.MustFromBig(big.NewInt(10)),
BlobHashes: []common.Hash{blobHash},
Sidecar: &types.BlobTxSidecar{
Blobs: []kzg4844.Blob{blob},
Commitments: []kzg4844.Commitment{commitment},
Proofs: []kzg4844.Proof{proof},
},
}))
require.NoError(err)
builder = &action.EnvelopeBuilder{}
tx2, err := builder.BuildTransfer(types.NewTx(&types.BlobTx{
Nonce: 1,
GasTipCap: uint256.MustFromBig(big.NewInt(10)),
GasFeeCap: uint256.MustFromBig(big.NewInt(20)),
Gas: 100000,
To: common.BytesToAddress(identityset.Address(1).Bytes()),
Value: uint256.MustFromBig(big.NewInt(2)),
BlobFeeCap: uint256.MustFromBig(big.NewInt(10)),
BlobHashes: []common.Hash{blobHash},
Sidecar: &types.BlobTxSidecar{
Blobs: []kzg4844.Blob{blob},
Commitments: []kzg4844.Commitment{commitment},
Proofs: []kzg4844.Proof{proof},
},
}))
require.NoError(err)
builder = &action.EnvelopeBuilder{}
tx3, err := builder.BuildTransfer(types.NewTx(&types.BlobTx{
Nonce: 1,
GasTipCap: uint256.MustFromBig(big.NewInt(20)),
GasFeeCap: uint256.MustFromBig(big.NewInt(40)),
Gas: 100000,
To: common.BytesToAddress(identityset.Address(1).Bytes()),
Value: uint256.MustFromBig(big.NewInt(2)),
BlobFeeCap: uint256.MustFromBig(big.NewInt(20)),
BlobHashes: []common.Hash{blobHash},
Sidecar: &types.BlobTxSidecar{
Blobs: []kzg4844.Blob{blob},
Commitments: []kzg4844.Commitment{commitment},
Proofs: []kzg4844.Proof{proof},
},
}))
require.NoError(err)
// reject non-continuous nonce
ap, err := NewActPool(genesis.Default, sf, apConfig)
require.NoError(err)
require.NoError(ap.Start(ctx))
defer ap.Stop(ctx)
ap.AddActionEnvelopeValidators(protocol.NewGenericValidator(sf, accountutil.AccountState))
signedTx, err := action.Sign(tx, identityset.PrivateKey(1))
require.NoError(err)
require.ErrorIs(ap.Add(ctx, signedTx), action.ErrNonceTooHigh)
// accept continuous nonce
tx.SetNonce(1)
signedTx, err = action.Sign(tx, identityset.PrivateKey(1))
require.NoError(err)
require.NoError(ap.Add(ctx, signedTx))
// 2x price bump to replace
signedTx, err = action.Sign(tx2, identityset.PrivateKey(1))
require.NoError(err)
require.ErrorIs(ap.Add(ctx, signedTx), action.ErrReplaceUnderpriced)
signedTx, err = action.Sign(tx3, identityset.PrivateKey(1))
require.NoError(err)
require.NoError(ap.Add(ctx, signedTx))
// max blob tx per account
tx.SetNonce(2)
signedTx, err = action.Sign(tx, identityset.PrivateKey(1))
require.NoError(err)
require.NoError(ap.Add(ctx, signedTx))
tx.SetNonce(3)
signedTx, err = action.Sign(tx, identityset.PrivateKey(1))
require.NoError(err)
require.NoError(ap.Add(ctx, signedTx))
tx.SetNonce(4)
signedTx, err = action.Sign(tx, identityset.PrivateKey(1))
require.NoError(err)
require.ErrorIs(ap.Add(ctx, signedTx), action.ErrNonceTooHigh)
})
}

func TestActPool_PickActs(t *testing.T) {
Expand Down Expand Up @@ -1136,6 +1239,7 @@ func getActPoolCfg() Config {
MaxNumActsPerAcct: _maxNumActsPerAcct,
MinGasPriceStr: "0",
BlackList: []string{_addr6},
MaxNumBlobsPerAcct: _maxNumBlobTxPerAcct,
}
}

Expand Down
46 changes: 44 additions & 2 deletions actpool/actqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/facebookgo/clock"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/iotexproject/iotex-address/address"
Expand Down Expand Up @@ -59,6 +60,8 @@ type actQueue struct {
clock clock.Clock
ttl time.Duration
mu sync.RWMutex
// count of blob txs in the queue
blobCount int
}

// NewActQueue create a new action queue
Expand Down Expand Up @@ -104,7 +107,28 @@ func (q *actQueue) Put(act *action.SealedEnvelope) error {
if actInPool, exist := q.items[nonce]; exist {
// act of higher gas price can cut in line
if nonce < q.pendingNonce && act.GasFeeCap().Cmp(actInPool.GasFeeCap()) != 1 {
return action.ErrReplaceUnderpriced
return errors.Wrapf(action.ErrReplaceUnderpriced, "gas fee cap %s < %s", act.GasFeeCap(), actInPool.GasFeeCap())
}
// 2x bumps in gas price are allowed for blob tx
isPrevBlobTx, isBlobTx := len(actInPool.BlobHashes()) > 0, len(act.BlobHashes()) > 0
if isPrevBlobTx {
if !isBlobTx {
return errors.Wrap(action.ErrReplaceUnderpriced, "blob tx can only replace blob tx")
}
var (
priceBump = big.NewInt(2)
minGasFeeCap = new(big.Int).Mul(actInPool.GasFeeCap(), priceBump)
minGasTipCap = new(big.Int).Mul(actInPool.GasTipCap(), priceBump)
minBlobGasFeeCap = new(big.Int).Mul(actInPool.BlobGasFeeCap(), priceBump)
)
switch {
case act.GasFeeCap().Cmp(minGasFeeCap) < 0:
return errors.Wrapf(action.ErrReplaceUnderpriced, "gas fee cap %s < %s", act.GasFeeCap(), minGasFeeCap)
case act.GasTipCap().Cmp(minGasTipCap) < 0:
return errors.Wrapf(action.ErrReplaceUnderpriced, "gas tip cap %s < %s", act.GasTipCap(), minGasTipCap)
case act.BlobGasFeeCap().Cmp(minBlobGasFeeCap) < 0:
return errors.Wrapf(action.ErrReplaceUnderpriced, "blob gas fee cap %s < %s", act.BlobGasFeeCap(), minBlobGasFeeCap)
}
}
// update action in q.items and q.index
q.items[nonce] = act
Expand All @@ -117,13 +141,21 @@ func (q *actQueue) Put(act *action.SealedEnvelope) error {
q.updateFromNonce(nonce)
return nil
}
// check max number of blob txs per account
isBlobTx := len(act.BlobHashes()) > 0
if isBlobTx && q.blobCount >= int(q.ap.cfg.MaxNumBlobsPerAcct) {
return errors.Wrap(action.ErrNonceTooHigh, "too many blob txs in the queue")
}
nttl := &nonceWithTTL{nonce: nonce, deadline: q.clock.Now().Add(q.ttl)}
heap.Push(&q.ascQueue, nttl)
heap.Push(&q.descQueue, nttl)
q.items[nonce] = act
if nonce == q.pendingNonce {
q.updateFromNonce(q.pendingNonce)
}
if isBlobTx {
q.blobCount++
}
return nil
}

Expand Down Expand Up @@ -184,6 +216,9 @@ func (q *actQueue) cleanTimeout() []*action.SealedEnvelope {
nonce := q.ascQueue[i].nonce
if timeNow.After(q.ascQueue[i].deadline) && nonce > q.pendingNonce {
removedFromQueue = append(removedFromQueue, q.items[nonce])
if len(q.items[nonce].BlobHashes()) > 0 {
q.blobCount--
}
delete(q.items, nonce)
delete(q.pendingBalance, nonce)
q.ascQueue[i] = q.ascQueue[size-1]
Expand Down Expand Up @@ -220,7 +255,11 @@ func (q *actQueue) UpdateAccountState(nonce uint64, balance *big.Int) []*action.
heap.Remove(&q.descQueue, nttl.descIdx)
nonce := nttl.nonce
removed = append(removed, q.items[nonce])
if len(q.items[nonce].BlobHashes()) > 0 {
q.blobCount--
}
delete(q.items, nonce)

}
return removed
}
Expand Down Expand Up @@ -264,6 +303,7 @@ func (q *actQueue) Reset() {
q.pendingBalance = make(map[uint64]*big.Int)
q.accountNonce = 0
q.accountBalance = big.NewInt(0)
q.blobCount = 0
}

// PendingActs creates a consecutive nonce-sorted slice of actions
Expand Down Expand Up @@ -337,6 +377,8 @@ func (q *actQueue) PopActionWithLargestNonce() *action.SealedEnvelope {
item := q.items[itemMeta.nonce]
delete(q.items, itemMeta.nonce)
q.updateFromNonce(itemMeta.nonce)

if len(item.BlobHashes()) > 0 {
q.blobCount--
}
return item
}
3 changes: 3 additions & 0 deletions actpool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var (
ActionExpiry: 10 * time.Minute,
MinGasPriceStr: big.NewInt(unit.Qev).String(),
BlackList: []string{},
MaxNumBlobsPerAcct: 16,
}
)

Expand All @@ -39,6 +40,8 @@ type Config struct {
BlackList []string `yaml:"blackList"`
// Store defines the config for persistent cache
Store *StoreConfig `yaml:"store"`
// MaxNumBlobsPerAcct defines the maximum number of blob txs an account can have
MaxNumBlobsPerAcct uint64 `yaml:"maxNumBlobsPerAcct"`
}

// MinGasPrice returns the minimal gas price threshold
Expand Down
16 changes: 14 additions & 2 deletions actpool/queueworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package actpool
import (
"context"
"encoding/hex"
"errors"
"math/big"
"sort"
"strings"
Expand All @@ -12,6 +11,7 @@ import (

"github.com/iotexproject/go-pkgs/cache/ttl"
"github.com/iotexproject/iotex-address/address"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/iotexproject/iotex-core/action"
Expand Down Expand Up @@ -105,7 +105,7 @@ func (worker *queueWorker) Handle(job workerJob) error {
}

worker.ap.allActions.Set(actHash, act)
isBlobTx := false // TODO: only store blob tx
isBlobTx := len(act.BlobHashes()) > 0
if worker.ap.store != nil && isBlobTx {
if err := worker.ap.store.Put(act); err != nil {
log.L().Warn("failed to store action", zap.Error(err), log.Hex("hash", actHash[:]))
Expand Down Expand Up @@ -180,6 +180,18 @@ func (worker *queueWorker) checkSelpWithState(act *action.SealedEnvelope, pendin
return action.ErrNonceTooHigh
}

// Nonce must be continuous for blob tx
if len(act.BlobHashes()) > 0 {
pendingNonceInPool, ok := worker.PendingNonce(act.SenderAddress())
if !ok {
pendingNonceInPool = pendingNonce
}
if act.Nonce() > pendingNonceInPool {
_actpoolMtc.WithLabelValues("nonceTooLarge").Inc()
return errors.Wrapf(action.ErrNonceTooHigh, "nonce %d is larger than pending nonce %d", act.Nonce(), pendingNonceInPool)
}
}

if cost, _ := act.Cost(); balance.Cmp(cost) < 0 {
_actpoolMtc.WithLabelValues("insufficientBalance").Inc()
sender := act.SenderAddress().String()
Expand Down
Loading

0 comments on commit ef61b43

Please sign in to comment.