Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: backport tx status #1405

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
}

mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, sm.WithBlockStore(blockStore))

consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
Expand Down
6 changes: 4 additions & 2 deletions consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ func (emptyMempool) TxsBytes() int64 { return 0 }
func (emptyMempool) TxsFront() *clist.CElement { return nil }
func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil }

func (emptyMempool) InitWAL() error { return nil }
func (emptyMempool) CloseWAL() {}
func (emptyMempool) InitWAL() error { return nil }
func (emptyMempool) CloseWAL() {}
func (emptyMempool) GetTxByKey(types.TxKey) (types.Tx, bool) { return nil, false }
func (emptyMempool) WasRecentlyEvicted(types.TxKey) bool { return false }

//-----------------------------------------------------------------------------
// mockProxyApp uses ABCIResponses to give the right results.
Expand Down
16 changes: 11 additions & 5 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/privval"
cmtstate "github.com/tendermint/tendermint/proto/tendermint/state"
cmtstore "github.com/tendermint/tendermint/proto/tendermint/store"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
Expand Down Expand Up @@ -58,7 +59,7 @@ func TestMain(m *testing.M) {
// the `Handshake Tests` are for failures in applying the block.
// With the help of the WAL, we can recover from it all!

//------------------------------------------------------------------------------------------
// ------------------------------------------------------------------------------------------
// WAL Tests

// TODO: It would be better to verify explicitly which states we can recover from without the wal
Expand Down Expand Up @@ -320,7 +321,7 @@ var (
sim testSim
)

//---------------------------------------
// ---------------------------------------
// Test handshake/replay

// 0 - all synced up
Expand Down Expand Up @@ -1041,7 +1042,7 @@ func (app *badApp) Commit() abci.ResponseCommit {
panic("either allHashesAreWrong or onlyLastHashIsWrong must be set")
}

//--------------------------
// --------------------------
// utils for making blocks

func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
Expand Down Expand Up @@ -1187,8 +1188,9 @@ func stateAndStore(
return stateDB, state, store
}

//----------------------------------
// ----------------------------------
// mock block store
var _ sm.BlockStore = &mockBlockStore{}

type mockBlockStore struct {
config *cfg.Config
Expand Down Expand Up @@ -1222,6 +1224,10 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
}
func (bs *mockBlockStore) SaveTxInfo(block *types.Block, txResponseCode []uint32) error {
return nil
}
func (bs *mockBlockStore) LoadTxInfo(hash []byte) *cmtstore.TxInfo { return &cmtstore.TxInfo{} }

func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
return bs.commits[height-1]
Expand All @@ -1242,7 +1248,7 @@ func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) {
return pruned, nil
}

//---------------------------------------
// ---------------------------------------
// Test handshake/init chain

func TestHandshakeUpdatesValidators(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion consensus/wal_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
})
mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, sm.WithBlockStore(blockStore))
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/go-logfmt/logfmt v0.6.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.5.3
github.com/golangci/golangci-lint v1.52.0
github.com/google/orderedcode v0.0.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
9 changes: 9 additions & 0 deletions light/proxy/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc {
"consensus_params": rpcserver.NewRPCFunc(makeConsensusParamsFunc(c), "height", rpcserver.Cacheable("height")),
"unconfirmed_txs": rpcserver.NewRPCFunc(makeUnconfirmedTxsFunc(c), "limit"),
"num_unconfirmed_txs": rpcserver.NewRPCFunc(makeNumUnconfirmedTxsFunc(c), ""),
"tx_status": rpcserver.NewRPCFunc(makeTxStatusFunc(c), "hash"),

// tx broadcast API
"broadcast_tx_commit": rpcserver.NewRPCFunc(makeBroadcastTxCommitFunc(c), "tx"),
Expand Down Expand Up @@ -143,6 +144,14 @@ func makeBlockResultsFunc(c *lrpc.Client) rpcBlockResultsFunc {
}
}

type rpcTxStatusFunc func(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultTxStatus, error)

func makeTxStatusFunc(c *lrpc.Client) rpcTxStatusFunc {
return func(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultTxStatus, error) {
return c.TxStatus(ctx.Context(), hash)
}
}

type rpcCommitFunc func(ctx *rpctypes.Context, height *int64) (*ctypes.ResultCommit, error)

func makeCommitFunc(c *lrpc.Client) rpcCommitFunc {
Expand Down
5 changes: 5 additions & 0 deletions light/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ func (c *Client) BlockResults(ctx context.Context, height *int64) (*ctypes.Resul
return res, nil
}

// TxStatus retrieves the status of the transaction given its hash.
func (c *Client) TxStatus(ctx context.Context, hash []byte) (*ctypes.ResultTxStatus, error) {
return c.next.TxStatus(ctx, hash)
}

// Header fetches and verifies the header directly via the light client
func (c *Client) Header(ctx context.Context, height *int64) (*ctypes.ResultHeader, error) {
lb, err := c.updateLightClientIfNeededTo(ctx, height)
Expand Down
20 changes: 16 additions & 4 deletions mempool/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type TxCache interface {
// Has reports whether tx is present in the cache. Checking for presence is
// not treated as an access of the value.
Has(tx types.Tx) bool

// HasKey reports whether the given key is present in the cache.
HasKey(key types.TxKey) bool
}

var _ TxCache = (*LRUTxCache)(nil)
Expand Down Expand Up @@ -113,12 +116,21 @@ func (c *LRUTxCache) Has(tx types.Tx) bool {
return ok
}

func (c *LRUTxCache) HasKey(key types.TxKey) bool {
c.mtx.Lock()
defer c.mtx.Unlock()

_, ok := c.cacheMap[key]
return ok
}

// NopTxCache defines a no-op raw transaction cache.
type NopTxCache struct{}

var _ TxCache = (*NopTxCache)(nil)

func (NopTxCache) Reset() {}
func (NopTxCache) Push(types.Tx) bool { return true }
func (NopTxCache) Remove(types.Tx) {}
func (NopTxCache) Has(types.Tx) bool { return false }
func (NopTxCache) Reset() {}
func (NopTxCache) Push(types.Tx) bool { return true }
func (NopTxCache) Remove(types.Tx) {}
func (NopTxCache) Has(types.Tx) bool { return false }
func (NopTxCache) HasKey(types.TxKey) bool { return false }
36 changes: 31 additions & 5 deletions mempool/cat/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type TxPool struct {

// Thread-safe cache of rejected transactions for quick look-up
rejectedTxCache *LRUTxCache
// Thread-safe cache of evicted transactions for quick look-up
evictedTxCache *LRUTxCache
// Thread-safe list of transactions peers have seen that we have not yet seen
seenByPeersSet *SeenTxSet

Expand Down Expand Up @@ -92,6 +94,7 @@ func NewTxPool(
proxyAppConn: proxyAppConn,
metrics: mempool.NopMetrics(),
rejectedTxCache: NewLRUTxCache(cfg.CacheSize),
evictedTxCache: NewLRUTxCache(cfg.CacheSize / 5),
seenByPeersSet: NewSeenTxSet(),
height: height,
preCheckFn: func(_ types.Tx) error { return nil },
Expand Down Expand Up @@ -171,16 +174,28 @@ func (txmp *TxPool) Has(txKey types.TxKey) bool {
return txmp.store.has(txKey)
}

// Get retrieves a transaction based on the key. It returns a bool
// if the transaction exists or not
// Get retrieves a transaction based on the key.
// Deprecated: use GetTxByKey instead.
func (txmp *TxPool) Get(txKey types.TxKey) (types.Tx, bool) {
return txmp.GetTxByKey(txKey)
}

// GetTxByKey retrieves a transaction based on the key. It returns a bool
// indicating whether transaction was found in the cache.
func (txmp *TxPool) GetTxByKey(txKey types.TxKey) (types.Tx, bool) {
wtx := txmp.store.get(txKey)
if wtx != nil {
return wtx.tx, true
}
return types.Tx{}, false
}

// WasRecentlyEvicted returns a bool indicating whether the transaction with
// the specified key was recently evicted and is currently within the cache.
func (txmp *TxPool) WasRecentlyEvicted(txKey types.TxKey) bool {
return txmp.evictedTxCache.Has(txKey)
}

// IsRejectedTx returns true if the transaction was recently rejected and is
// currently within the cache
func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool {
Expand All @@ -195,9 +210,13 @@ func (txmp *TxPool) CheckToPurgeExpiredTxs() {
defer txmp.updateMtx.Unlock()
if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration {
expirationAge := time.Now().Add(-txmp.config.TTLDuration)
// a height of 0 means no transactions will be removed because of height
// A height of 0 means no transactions will be removed because of height
// (in other words, no transaction has a height less than 0)
numExpired := txmp.store.purgeExpiredTxs(0, expirationAge)
purgedTxs, numExpired := txmp.store.purgeExpiredTxs(0, expirationAge)
// Add the purged transactions to the evicted cache
for _, tx := range purgedTxs {
txmp.evictedTxCache.Push(tx.key)
}
txmp.metrics.EvictedTxs.Add(float64(numExpired))
txmp.lastPurgeTime = time.Now()
}
Expand Down Expand Up @@ -373,6 +392,7 @@ func (txmp *TxPool) Flush() {
txmp.store.reset()
txmp.seenByPeersSet.Reset()
txmp.rejectedTxCache.Reset()
txmp.evictedTxCache.Reset()
txmp.metrics.EvictedTxs.Add(float64(size))
txmp.broadcastMtx.Lock()
defer txmp.broadcastMtx.Unlock()
Expand Down Expand Up @@ -537,6 +557,7 @@ func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseC
// drop the new one.
if len(victims) == 0 || victimBytes < wtx.size() {
txmp.metrics.EvictedTxs.Add(1)
txmp.evictedTxCache.Push(wtx.key)
checkTxRes.MempoolError = fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)",
wtx.key)
return fmt.Errorf("rejected valid incoming transaction; mempool is full (%X). Size: (%d:%d)",
Expand Down Expand Up @@ -591,6 +612,7 @@ func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseC

func (txmp *TxPool) evictTx(wtx *wrappedTx) {
txmp.store.remove(wtx.key)
txmp.evictedTxCache.Push(wtx.key)
txmp.metrics.EvictedTxs.Add(1)
txmp.logger.Debug(
"evicted valid existing transaction; mempool full",
Expand Down Expand Up @@ -720,7 +742,11 @@ func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {
expirationAge = time.Time{}
}

numExpired := txmp.store.purgeExpiredTxs(expirationHeight, expirationAge)
purgedTxs, numExpired := txmp.store.purgeExpiredTxs(expirationHeight, expirationAge)
// Add the purged transactions to the evicted cache
for _, tx := range purgedTxs {
txmp.evictedTxCache.Push(tx.key)
}
txmp.metrics.EvictedTxs.Add(float64(numExpired))

// purge old evicted and seen transactions
Expand Down
15 changes: 14 additions & 1 deletion mempool/cat/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func TestTxPool_Eviction(t *testing.T) {
mustCheckTx(t, txmp, "key1=0000=25")
require.True(t, txExists("key1=0000=25"))
require.False(t, txExists(bigTx))
require.True(t, txmp.WasRecentlyEvicted(types.Tx(bigTx).Key()))
require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes())

// Now fill up the rest of the slots with other transactions.
Expand All @@ -257,23 +258,27 @@ func TestTxPool_Eviction(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "mempool is full")
require.False(t, txExists("key6=0005=1"))
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key6=0005=1").Key()))

// A new transaction with higher priority should evict key5, which is the
// newest of the two transactions with lowest priority.
mustCheckTx(t, txmp, "key7=0006=7")
require.True(t, txExists("key7=0006=7")) // new transaction added
require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted
require.True(t, txExists("key4=0003=3")) // older low-priority tx retained
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key5=0004=3").Key()))
require.True(t, txExists("key4=0003=3")) // older low-priority tx retained

// Another new transaction evicts the other low-priority element.
mustCheckTx(t, txmp, "key8=0007=20")
require.True(t, txExists("key8=0007=20"))
require.False(t, txExists("key4=0003=3"))
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key4=0003=3").Key()))

// Now the lowest-priority tx is 5, so that should be the next to go.
mustCheckTx(t, txmp, "key9=0008=9")
require.True(t, txExists("key9=0008=9"))
require.False(t, txExists("key2=0001=5"))
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key2=0001=5").Key()))

// Add a transaction that requires eviction of multiple lower-priority
// entries, in order to fit the size of the element.
Expand All @@ -282,8 +287,11 @@ func TestTxPool_Eviction(t *testing.T) {
require.True(t, txExists("key8=0007=20"))
require.True(t, txExists("key10=0123456789abcdef=11"))
require.False(t, txExists("key3=0002=10"))
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key3=0002=10").Key()))
require.False(t, txExists("key9=0008=9"))
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key9=0008=9").Key()))
require.False(t, txExists("key7=0006=7"))
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key7=0006=7").Key()))

// Free up some space so we can add back previously evicted txs
err = txmp.Update(1, types.Txs{types.Tx("key10=0123456789abcdef=11")}, []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}}, nil, nil)
Expand All @@ -296,6 +304,7 @@ func TestTxPool_Eviction(t *testing.T) {
// space for the previously evicted tx
require.NoError(t, txmp.RemoveTxByKey(types.Tx("key8=0007=20").Key()))
require.False(t, txExists("key8=0007=20"))
require.False(t, txmp.WasRecentlyEvicted(types.Tx("key8=0007=20").Key()))
}

func TestTxPool_Flush(t *testing.T) {
Expand Down Expand Up @@ -567,6 +576,10 @@ func TestTxPool_ExpiredTxs_Timestamp(t *testing.T) {

// All the transactions in the original set should have been purged.
for _, tx := range added1 {
// Check that it was added to the evictedTxCache
evicted := txmp.WasRecentlyEvicted(tx.tx.Key())
require.True(t, evicted)

if txmp.store.has(tx.tx.Key()) {
t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key())
}
Expand Down
2 changes: 1 addition & 1 deletion mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
txKey[:],
schema.Download,
)
tx, has := memR.mempool.Get(txKey)
tx, has := memR.mempool.GetTxByKey(txKey)
if has && !memR.opts.ListenOnly {
peerID := memR.ids.GetIDForPeer(e.Src.ID())
memR.Logger.Debug("sending a tx in response to a want msg", "peer", peerID)
Expand Down
10 changes: 7 additions & 3 deletions mempool/cat/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,23 @@ func (s *store) getTxsBelowPriority(priority int64) ([]*wrappedTx, int64) {
}

// purgeExpiredTxs removes all transactions that are older than the given height
// and time. Returns the amount of transactions that were removed
func (s *store) purgeExpiredTxs(expirationHeight int64, expirationAge time.Time) int {
// and time. Returns the purged txs and amount of transactions that were purged.
func (s *store) purgeExpiredTxs(expirationHeight int64, expirationAge time.Time) ([]*wrappedTx, int) {
s.mtx.Lock()
defer s.mtx.Unlock()

var purgedTxs []*wrappedTx
counter := 0

for key, tx := range s.txs {
if tx.height < expirationHeight || tx.timestamp.Before(expirationAge) {
s.bytes -= tx.size()
delete(s.txs, key)
purgedTxs = append(purgedTxs, tx)
counter++
}
}
return counter
return purgedTxs, counter
}

func (s *store) reset() {
Expand Down
Loading
Loading