Skip to content

Commit

Permalink
remove invalid transactions from mempool after three invalid executions
Browse files Browse the repository at this point in the history
  • Loading branch information
p4u committed Oct 10, 2023
1 parent cfdf9fb commit 5fe633b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 21 deletions.
31 changes: 22 additions & 9 deletions vochain/app.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vochain

import (
"encoding/hex"
"fmt"
"path/filepath"
"sync"
Expand Down Expand Up @@ -32,6 +33,9 @@ const (
// transactionBlocksTTL is the number of blocks after which a transaction is
// removed from the mempool.
transactionBlocksTTL = 6 * 10 // 10 minutes
// maxPendingTxAttempts is the number of times a transaction can be included in a block
// and fail before being removed from the mempool.
maxPendingTxAttempts = 3
)

var (
Expand Down Expand Up @@ -64,7 +68,8 @@ type BaseApplication struct {
fnMempoolPrune func(txKey [32]byte) error
blockCache *lru.Cache[int64, *tmtypes.Block]
// txTTLReferences is a map of tx hashes to the block height where they failed.
txTTLReferences sync.Map
txReferences sync.Map

// endBlockTimestamp is the last block end timestamp calculated from local time.
endBlockTimestamp atomic.Int64
// startBlockTimestamp is the current block timestamp from tendermint's
Expand All @@ -90,6 +95,13 @@ type BaseApplication struct {
testMockBlockStore *testutil.MockBlockStore
}

// pendingTxReference is used to store the block height where the transaction was accepted by the mempool, and the number
// of times it has been included in a block but failed.
type pendingTxReference struct {
height uint32
failedCount int
}

// DeliverTxResponse is the response returned by DeliverTx after executing the transaction.
type DeliverTxResponse struct {
Code uint32
Expand All @@ -103,7 +115,7 @@ type DeliverTxResponse struct {
type ExecuteBlockResponse struct {
Responses []*DeliverTxResponse
Root []byte
InvalidTransactions bool
InvalidTransactions [][32]byte
}

// NewBaseApplication creates a new BaseApplication given a name and a DB backend.
Expand Down Expand Up @@ -151,7 +163,7 @@ func NewBaseApplication(dbType, dbpath string) (*BaseApplication, error) {
func (app *BaseApplication) ExecuteBlock(txs [][]byte, height uint32, blockTime time.Time) (*ExecuteBlockResponse, error) {
result := []*DeliverTxResponse{}
app.beginBlock(blockTime, height)
invalidTxs := false
invalidTxs := [][32]byte{}
for _, tx := range txs {
resp := app.deliverTx(tx)
if resp.Code != 0 {
Expand All @@ -160,7 +172,7 @@ func (app *BaseApplication) ExecuteBlock(txs [][]byte, height uint32, blockTime
"data", string(resp.Data),
"info", resp.Info,
"log", resp.Log)
invalidTxs = true
invalidTxs = append(invalidTxs, [32]byte{})
}
result = append(result, resp)
}
Expand Down Expand Up @@ -224,7 +236,7 @@ func (app *BaseApplication) deliverTx(rawTx []byte) *DeliverTxResponse {
log.Errorw(err, "rejected tx")
return &DeliverTxResponse{Code: 1, Data: []byte(err.Error())}
}
app.txTTLReferences.Delete(tx.TxID)
app.txReferences.Delete(tx.TxID)
// call event listeners
for _, e := range app.State.EventListeners() {
e.OnNewTx(tx, app.Height(), app.State.TxCounter())
Expand Down Expand Up @@ -327,12 +339,13 @@ func (app *BaseApplication) SetChainID(chainID string) {
}

// MempoolDeleteTx removes a transaction from the mempool. If the mempool implementation does not allow it,
// it just returns nil.
func (app *BaseApplication) MempoolDeleteTx(txID [32]byte) error {
// its a no-op function. Errors are logged but not returned.
func (app *BaseApplication) MempoolDeleteTx(txID [32]byte) {
if app.fnMempoolPrune != nil {
return app.fnMempoolPrune(txID)
if err := app.fnMempoolPrune(txID); err != nil {
log.Warnw("could not remove mempool tx", "txID", hex.EncodeToString(txID[:]), "err", err)
}
}
return nil
}

// Genesis returns the tendermint genesis information
Expand Down
40 changes: 28 additions & 12 deletions vochain/cometbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,21 @@ func (app *BaseApplication) InitChain(_ context.Context,
func (app *BaseApplication) CheckTx(_ context.Context,
req *abcitypes.RequestCheckTx) (*abcitypes.ResponseCheckTx, error) {
txReference := vochaintx.TxKey(req.Tx)
// store the initial height of the tx
initialTTLheight, _ := app.txTTLReferences.LoadOrStore(txReference, app.Height())
// check if the tx is referenced by a previous block and the TTL has expired
if app.Height() > initialTTLheight.(uint32)+transactionBlocksTTL {
// remove tx reference and return checkTx error
log.Debugw("pruning expired tx from mempool", "height", app.Height(), "hash", fmt.Sprintf("%x", txReference))
if err := app.MempoolDeleteTx(txReference); err != nil {
log.Warnw("could not remove tx from mempool", "error", err.Error(), "txID", hex.EncodeToString(txReference[:]))
ref, ok := app.txReferences.Load(txReference)
if !ok {
// store the initial height of the tx if its the first time we see it
app.txReferences.Store(txReference, &pendingTxReference{
height: app.Height(),
})
} else {
height := ref.(*pendingTxReference).height
// check if the tx is referenced by a previous block and the TTL has expired
if app.Height() > height+transactionBlocksTTL {
// remove tx reference and return checkTx error
log.Debugw("pruning expired tx from mempool", "height", app.Height(), "hash", fmt.Sprintf("%x", txReference))
app.txReferences.Delete(txReference)
return &abcitypes.ResponseCheckTx{Code: 1, Data: []byte(fmt.Sprintf("tx expired %x", txReference))}, nil
}
app.txTTLReferences.Delete(txReference)
return &abcitypes.ResponseCheckTx{Code: 1, Data: []byte(fmt.Sprintf("tx expired %x", txReference))}, nil
}
// execute recheck mempool every recheckTxHeightInterval blocks
if req.Type == abcitypes.CheckTxType_Recheck {
Expand Down Expand Up @@ -302,6 +306,7 @@ func (app *BaseApplication) PrepareProposal(ctx context.Context,
vtx := new(vochaintx.Tx)
if err := vtx.Unmarshal(tx, app.ChainID()); err != nil {
// invalid transaction
app.MempoolDeleteTx(vochaintx.TxKey(tx))
log.Warnw("could not unmarshal transaction", "err", err)
continue
}
Expand Down Expand Up @@ -351,6 +356,16 @@ func (app *BaseApplication) PrepareProposal(ctx context.Context,
}
return ""
}())
// remove transaction from mempool if max attempts reached
val, ok := app.txReferences.Load(txInfo.DecodedTx.TxID)
if ok {
val.(*pendingTxReference).failedCount++
if val.(*pendingTxReference).failedCount > maxPendingTxAttempts {
log.Debugf("transaction %x has reached max attempts, remove from mempool", txInfo.DecodedTx.TxID)
app.MempoolDeleteTx(txInfo.DecodedTx.TxID)
app.txReferences.Delete(txInfo.DecodedTx.TxID)
}
}
continue
}
validTxs = append(validTxs, txInfo.Data)
Expand Down Expand Up @@ -395,8 +410,9 @@ func (app *BaseApplication) ProcessProposal(_ context.Context,
if err != nil {
return nil, fmt.Errorf("cannot execute block on process proposal: %w", err)
}
if resp.InvalidTransactions {
log.Warnw("invalid transactions on process proposal", "height", app.Height(),
// invalid txx on a proposed block, should actually never happened if proposer acts honestly
if len(resp.InvalidTransactions) > 0 {
log.Warnw("invalid transactions on process proposal", "height", app.Height(), "count", len(resp.InvalidTransactions),
"proposer", hex.EncodeToString(req.ProposerAddress), "action", "reject")
return &abcitypes.ResponseProcessProposal{
Status: abcitypes.ResponseProcessProposal_REJECT,
Expand Down

0 comments on commit 5fe633b

Please sign in to comment.