diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 5ba19fd2dc..87acfc235b 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -27,6 +27,7 @@ import ( mempl "github.com/cometbft/cometbft/mempool" "github.com/cometbft/cometbft/privval" cmtstate "github.com/cometbft/cometbft/proto/tendermint/state" + cmtstore "github.com/cometbft/cometbft/proto/tendermint/store" cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" "github.com/cometbft/cometbft/proxy" sm "github.com/cometbft/cometbft/state" @@ -1221,6 +1222,7 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { } +func (bs *mockBlockStore) LoadTxInfo(hash []byte) *cmtstore.TxInfo { return &cmtstore.TxInfo{} } func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit { return bs.commits[height-1] diff --git a/go.sum b/go.sum index ff36496557..0aaee895eb 100644 --- a/go.sum +++ b/go.sum @@ -1452,6 +1452,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/light/proxy/routes.go b/light/proxy/routes.go index be09dc191a..82dc42d185 100644 --- a/light/proxy/routes.go +++ b/light/proxy/routes.go @@ -39,6 +39,7 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc { "consensus_params": rpcserver.NewRPCFunc(makeConsensusParamsFunc(c), "height", rpcserver.Cacheable("height")), "unconfirmed_txs": rpcserver.NewRPCFunc(makeUnconfirmedTxsFunc(c), "limit"), "num_unconfirmed_txs": rpcserver.NewRPCFunc(makeNumUnconfirmedTxsFunc(c), ""), + "tx_status": rpcserver.NewRPCFunc(makeTxStatusFunc(c), "hash"), // tx broadcast API "broadcast_tx_commit": rpcserver.NewRPCFunc(makeBroadcastTxCommitFunc(c), "tx"), @@ -143,6 +144,14 @@ func makeBlockResultsFunc(c *lrpc.Client) rpcBlockResultsFunc { } } +type rpcTxStatusFunc func(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultTxStatus, error) + +func makeTxStatusFunc(c *lrpc.Client) rpcTxStatusFunc { + return func(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultTxStatus, error) { + return c.TxStatus(ctx.Context(), hash) + } +} + type rpcCommitFunc func(ctx *rpctypes.Context, height *int64) (*ctypes.ResultCommit, error) func makeCommitFunc(c *lrpc.Client) rpcCommitFunc { diff --git a/light/rpc/client.go b/light/rpc/client.go index 965b3491a4..40a11f1063 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -487,6 +487,11 @@ func (c *Client) BlockResults(ctx context.Context, height *int64) (*ctypes.Resul return res, nil } +// TxStatus retrieves the status of the transaction given its hash. +func (c *Client) TxStatus(ctx context.Context, hash []byte) (*ctypes.ResultTxStatus, error) { + return c.next.TxStatus(ctx, hash) +} + // Header fetches and verifies the header directly via the light client func (c *Client) Header(ctx context.Context, height *int64) (*ctypes.ResultHeader, error) { lb, err := c.updateLightClientIfNeededTo(ctx, height) diff --git a/proto/tendermint/store/types.pb.go b/proto/tendermint/store/types.pb.go index ce29389f5a..30a52d8987 100644 --- a/proto/tendermint/store/types.pb.go +++ b/proto/tendermint/store/types.pb.go @@ -74,25 +74,80 @@ func (m *BlockStoreState) GetHeight() int64 { return 0 } +type TxInfo struct { + Height int64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` + Index int64 `protobuf:"varint,2,opt,name=index,proto3" json:"index,omitempty"` +} + +func (m *TxInfo) Reset() { *m = TxInfo{} } +func (m *TxInfo) String() string { return proto.CompactTextString(m) } +func (*TxInfo) ProtoMessage() {} +func (*TxInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_ff9e53a0a74267f7, []int{1} +} +func (m *TxInfo) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TxInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TxInfo.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TxInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_TxInfo.Merge(m, src) +} +func (m *TxInfo) XXX_Size() int { + return m.Size() +} +func (m *TxInfo) XXX_DiscardUnknown() { + xxx_messageInfo_TxInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_TxInfo proto.InternalMessageInfo + +func (m *TxInfo) GetHeight() int64 { + if m != nil { + return m.Height + } + return 0 +} + +func (m *TxInfo) GetIndex() int64 { + if m != nil { + return m.Index + } + return 0 +} + func init() { proto.RegisterType((*BlockStoreState)(nil), "tendermint.store.BlockStoreState") + proto.RegisterType((*TxInfo)(nil), "tendermint.store.TxInfo") } func init() { proto.RegisterFile("tendermint/store/types.proto", fileDescriptor_ff9e53a0a74267f7) } var fileDescriptor_ff9e53a0a74267f7 = []byte{ - // 171 bytes of a gzipped FileDescriptorProto + // 199 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x29, 0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0x2e, 0xc9, 0x2f, 0x4a, 0xd5, 0x2f, 0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x40, 0xc8, 0xea, 0x81, 0x65, 0x95, 0x6c, 0xb9, 0xf8, 0x9d, 0x72, 0xf2, 0x93, 0xb3, 0x83, 0x41, 0xbc, 0xe0, 0x92, 0xc4, 0x92, 0x54, 0x21, 0x21, 0x2e, 0x96, 0xa4, 0xc4, 0xe2, 0x54, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xe6, 0x20, 0x30, 0x5b, 0x48, 0x8c, 0x8b, 0x2d, 0x23, 0x35, 0x33, 0x3d, 0xa3, 0x44, 0x82, 0x09, 0x2c, 0x0a, 0xe5, - 0x39, 0xf9, 0x9e, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x13, - 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x94, 0x71, 0x7a, 0x66, - 0x49, 0x46, 0x69, 0x92, 0x5e, 0x72, 0x7e, 0xae, 0x7e, 0x72, 0x7e, 0x6e, 0x6a, 0x49, 0x52, 0x5a, - 0x09, 0x82, 0x01, 0x76, 0x8e, 0x3e, 0xba, 0x5b, 0x93, 0xd8, 0xc0, 0xe2, 0xc6, 0x80, 0x00, 0x00, - 0x00, 0xff, 0xff, 0xb7, 0x2b, 0x34, 0x2a, 0xc6, 0x00, 0x00, 0x00, + 0x29, 0x99, 0x71, 0xb1, 0x85, 0x54, 0x78, 0xe6, 0xa5, 0xe5, 0x23, 0xa9, 0x60, 0x44, 0x56, 0x21, + 0x24, 0xc2, 0xc5, 0x9a, 0x99, 0x97, 0x92, 0x5a, 0x01, 0xd5, 0x08, 0xe1, 0x38, 0xf9, 0x9e, 0x78, + 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, + 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x94, 0x71, 0x7a, 0x66, 0x49, 0x46, 0x69, 0x92, + 0x5e, 0x72, 0x7e, 0xae, 0x7e, 0x72, 0x7e, 0x6e, 0x6a, 0x49, 0x52, 0x5a, 0x09, 0x82, 0x01, 0xf6, + 0x86, 0x3e, 0xba, 0x1f, 0x93, 0xd8, 0xc0, 0xe2, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc5, + 0x4d, 0x38, 0x3d, 0xfe, 0x00, 0x00, 0x00, } func (m *BlockStoreState) Marshal() (dAtA []byte, err error) { @@ -128,6 +183,39 @@ func (m *BlockStoreState) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *TxInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TxInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TxInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Index != 0 { + i = encodeVarintTypes(dAtA, i, uint64(m.Index)) + i-- + dAtA[i] = 0x10 + } + if m.Height != 0 { + i = encodeVarintTypes(dAtA, i, uint64(m.Height)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func encodeVarintTypes(dAtA []byte, offset int, v uint64) int { offset -= sovTypes(v) base := offset @@ -154,6 +242,21 @@ func (m *BlockStoreState) Size() (n int) { return n } +func (m *TxInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Height != 0 { + n += 1 + sovTypes(uint64(m.Height)) + } + if m.Index != 0 { + n += 1 + sovTypes(uint64(m.Index)) + } + return n +} + func sovTypes(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -248,6 +351,94 @@ func (m *BlockStoreState) Unmarshal(dAtA []byte) error { } return nil } +func (m *TxInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TxInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TxInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType) + } + m.Height = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Height |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType) + } + m.Index = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Index |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipTypes(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/proto/tendermint/store/types.proto b/proto/tendermint/store/types.proto index b510169a4c..33e5401060 100644 --- a/proto/tendermint/store/types.proto +++ b/proto/tendermint/store/types.proto @@ -7,3 +7,9 @@ message BlockStoreState { int64 base = 1; int64 height = 2; } + +// TxInfo describes the location of a tx inside a committed block. +message TxInfo { + int64 height = 1; + int64 index = 2; +} diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index bd7b372167..332976d5a8 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -516,6 +516,24 @@ func (c *baseRPCClient) DataCommitment( return result, nil } +func (c *baseRPCClient) TxStatus( + ctx context.Context, + hash []byte, +) (*ctypes.ResultTxStatus, error) { + result := new(ctypes.ResultTxStatus) + params := map[string]interface{}{ + "hash": hash, + } + + _, err := c.caller.Call(ctx, "tx_status", params, result) + if err != nil { + return nil, err + } + + return result, nil + +} + func (c *baseRPCClient) DataRootInclusionProof( ctx context.Context, height uint64, diff --git a/rpc/client/interface.go b/rpc/client/interface.go index a1e281799e..c5eb468ba1 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -103,6 +103,9 @@ type SignClient interface { page, perPage *int, orderBy string, ) (*ctypes.ResultBlockSearch, error) + + // TxStatus returns the transaction status for a given transaction hash. + TxStatus(ctx context.Context, hash []byte) (*ctypes.ResultTxStatus, error) } // HistoryClient provides access to data from genesis to now in large chunks. diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index 9cc55b5528..ba1b7e08c5 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -239,6 +239,10 @@ func (c *Local) BlockSearch( return core.BlockSearch(c.ctx, query, page, perPage, orderBy) } +func (c *Local) TxStatus(ctx context.Context, hash []byte) (*ctypes.ResultTxStatus, error) { + return core.TxStatus(c.ctx, hash) +} + func (c *Local) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { return core.BroadcastEvidence(c.ctx, ev) } diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 0ae1af0787..6d6c96056e 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -535,6 +535,35 @@ func TestBlockSearch(t *testing.T) { require.Equal(t, blockCount, 0) } + +func TestTxStatus(t *testing.T) { + c := getHTTPClient() + + // first we broadcast a few txs + var txHashes [][]byte + var txHeights []int64 + for i := 0; i < 10; i++ { + _, _, tx := MakeTxKV() + + result, err := c.BroadcastTxCommit(context.Background(), tx) + require.NoError(t, err) + txHashes = append(txHashes, result.Hash) + txHeights = append(txHeights, result.Height) + } + + require.NoError(t, client.WaitForHeight(c, 5, nil)) + + // check the status of each transaction + for i, hash := range txHashes { + result, err := c.TxStatus(context.Background(), hash) + require.NoError(t, err) + + expectedIndex := int64(0) + require.Equal(t, txHeights[i], result.Height) + require.Equal(t, expectedIndex, result.Index) + } +} + func TestTxSearch(t *testing.T) { c := getHTTPClient() diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index c6ae1a2571..e3dd3b71de 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -178,6 +178,17 @@ func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil } +// TxStatus retrieves the status of a transaction given its hash. It returns a ResultTxStatus +// containing the height and index of the transaction within the block. +func TxStatus(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultTxStatus, error) { + env := GetEnvironment() + txInfo := env.BlockStore.LoadTxInfo(hash) + if txInfo == nil { + return &ctypes.ResultTxStatus{}, nil + } + return &ctypes.ResultTxStatus{Height: txInfo.Height, Index: txInfo.Index}, nil +} + // Commit gets block commit at a given height. // If no height is provided, it will fetch the commit for the latest block. // More: https://docs.cometbft.com/v0.34/rpc/#/Info/commit diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index 3d0d80eac7..54a2cf4f64 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -1,7 +1,9 @@ package core import ( + "bytes" "context" + "encoding/binary" "encoding/hex" "fmt" "testing" @@ -17,6 +19,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" cmtstate "github.com/cometbft/cometbft/proto/tendermint/state" + cmtstore "github.com/cometbft/cometbft/proto/tendermint/store" ctypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" sm "github.com/cometbft/cometbft/state" @@ -123,6 +126,31 @@ func TestBlockResults(t *testing.T) { } } +func TestTxStatus(t *testing.T) { + env := &Environment{} + height := int64(50) + + blocks := randomBlocks(height) + blockStore := mockBlockStore{ + height: height, + blocks: blocks, + } + env.BlockStore = blockStore + + SetEnvironment(env) + + // Iterate over each block + for _, block := range blocks { + // Iterate over each transaction in the block + for i, tx := range block.Data.Txs { + txStatus, _ := TxStatus(&rpctypes.Context{}, tx.Hash()) + assert.Equal(t, block.Height, txStatus.Height) + assert.Equal(t, int64(i), txStatus.Index) + } + } + +} + func TestEncodeDataRootTuple(t *testing.T) { height := uint64(2) dataRoot, err := hex.DecodeString("82dc1607d84557d3579ce602a45f5872e821c36dbda7ec926dfa17ebc8d5c013") @@ -318,6 +346,21 @@ func (store mockBlockStore) LoadBlock(height int64) *types.Block { return store.blocks[height] } +func (store mockBlockStore) LoadTxInfo(hash []byte) *cmtstore.TxInfo { + for _, block := range store.blocks { + for i, tx := range block.Data.Txs { + // Check if transaction hash matches + if bytes.Equal(tx.Hash(), hash) { + return &cmtstore.TxInfo{ + Height: block.Header.Height, + Index: int64(i), + } + } + } + } + return nil +} + // mockBlockIndexer used to mock the set of indexed blocks and return a predefined one. type mockBlockIndexer struct { height int64 @@ -349,6 +392,16 @@ func randomBlocks(height int64) []*types.Block { return blocks } +func makeTxs(height int64) (txs []types.Tx) { + for i := 0; i < 10; i++ { + numBytes := make([]byte, 8) + binary.BigEndian.PutUint64(numBytes, uint64(height)) + + txs = append(txs, types.Tx(append(numBytes, byte(i)))) + } + return txs +} + // randomBlock generates a Block with a certain height and random data hash. func randomBlock(height int64) *types.Block { return &types.Block{ @@ -356,5 +409,8 @@ func randomBlock(height int64) *types.Block { Height: height, DataHash: cmtrand.Bytes(32), }, + Data: types.Data{ + Txs: makeTxs(height), + }, } } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index b890bfacf1..e7dacb371d 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -59,6 +59,7 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas } } +// DEPRECATED: Use BroadcastTxSync or BroadcastTxAsync instead. // BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. // More: https://docs.cometbft.com/v0.34/rpc/#/Tx/broadcast_tx_commit func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 318b9bea5e..5c9f283728 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -40,6 +40,7 @@ var Routes = map[string]*rpc.RPCFunc{ "consensus_params": rpc.NewRPCFunc(ConsensusParams, "height", rpc.Cacheable("height")), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, "limit"), "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""), + "tx_status": rpc.NewRPCFunc(TxStatus, "hash"), // tx broadcast API "broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommit, "tx"), diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 2a607a7565..be30c15e7a 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -60,6 +60,12 @@ type ResultCommit struct { CanonicalCommit bool `json:"canonical"` } +// ResultTxStatus contains info to locate a tx in a committed block. +type ResultTxStatus struct { + Height int64 `json:"height"` + Index int64 `json:"index"` +} + // ABCI results from a block type ResultBlockResults struct { Height int64 `json:"height"` diff --git a/state/mocks/block_store.go b/state/mocks/block_store.go index e38de9c88b..98f47dc320 100644 --- a/state/mocks/block_store.go +++ b/state/mocks/block_store.go @@ -4,7 +4,7 @@ package mocks import ( mock "github.com/stretchr/testify/mock" - + cmtstore "github.com/cometbft/cometbft/proto/tendermint/store" types "github.com/cometbft/cometbft/types" ) @@ -223,6 +223,10 @@ func (_m *BlockStore) Size() int64 { return r0 } +func (_m BlockStore) LoadTxInfo(txHash []byte) *cmtstore.TxInfo { + return &cmtstore.TxInfo{} +} + type mockConstructorTestingTNewBlockStore interface { mock.TestingT Cleanup(func()) diff --git a/state/services.go b/state/services.go index b3e4e28bd4..63b1b6850b 100644 --- a/state/services.go +++ b/state/services.go @@ -1,6 +1,7 @@ package state import ( + cmtstore "github.com/cometbft/cometbft/proto/tendermint/store" "github.com/cometbft/cometbft/types" ) @@ -35,6 +36,8 @@ type BlockStore interface { LoadBlockCommit(height int64) *types.Commit LoadSeenCommit(height int64) *types.Commit + LoadTxInfo(hash []byte) *cmtstore.TxInfo + DeleteLatestBlock() error } diff --git a/store/store.go b/store/store.go index 1c5e4404c1..80d6cd6467 100644 --- a/store/store.go +++ b/store/store.go @@ -37,7 +37,7 @@ type BlockStore struct { // fine-grained concurrency control for its data, and thus this mutex does not apply to // database contents. The only reason for keeping these fields in the struct is that the data // can't efficiently be queried from the database since the key encoding we use is not - // lexicographically ordered (see https://github.com/cometbft/cometbft/issues/4567). + // lexicographically ordered. mtx cmtsync.RWMutex base int64 height int64 @@ -292,8 +292,7 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) { bs.mtx.Unlock() bs.saveState() - err := batch.WriteSync() - if err != nil { + if err := batch.WriteSync(); err != nil { return fmt.Errorf("failed to prune up to height %v: %w", base, err) } batch.Close() @@ -305,6 +304,12 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) { if meta == nil { // assume already deleted continue } + block := bs.LoadBlock(h) + for _, tx := range block.Txs { + if err := batch.Delete(calcTxHashKey(tx.Hash())); err != nil { + return 0, err + } + } if err := batch.Delete(calcBlockMetaKey(h)); err != nil { return 0, err } @@ -402,6 +407,11 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s panic(err) } + // Save Txs from the block + if err := bs.SaveTxInfo(block); err != nil { + panic(err) + } + // Done! bs.mtx.Lock() bs.height = height @@ -445,6 +455,30 @@ func (bs *BlockStore) SaveSeenCommit(height int64, seenCommit *types.Commit) err return bs.db.Set(calcSeenCommitKey(height), seenCommitBytes) } +// SaveTxInfo gets Tx hashes from the block converts them to TxInfo and persists them to the db. +func (bs *BlockStore) SaveTxInfo(block *types.Block) error { + // Create a new batch + batch := bs.db.NewBatch() + + // Batch and save txs from the block + for i, tx := range block.Txs { + txInfo := cmtstore.TxInfo{ + Height: block.Height, + Index: int64(i), + } + txInfoBytes, err := proto.Marshal(&txInfo) + if err != nil { + return fmt.Errorf("unable to marshal tx: %w", err) + } + if err := batch.Set(calcTxHashKey(tx.Hash()), txInfoBytes); err != nil { + return err + } + } + + // Write the batch to the db + return batch.WriteSync() +} + func (bs *BlockStore) Close() error { return bs.db.Close() } @@ -471,6 +505,10 @@ func calcBlockHashKey(hash []byte) []byte { return []byte(fmt.Sprintf("BH:%x", hash)) } +func calcTxHashKey(hash []byte) []byte { + return []byte(fmt.Sprintf("TH:%x", hash)) +} + //----------------------------------------------------------------------------- var blockStoreKey = []byte("blockStore") @@ -513,6 +551,23 @@ func LoadBlockStoreState(db dbm.DB) cmtstore.BlockStoreState { return bsj } +// LoadTxInfo loads the TxInfo from disk given its hash. +func (bs *BlockStore) LoadTxInfo(txHash []byte) *cmtstore.TxInfo { + bz, err := bs.db.Get(calcTxHashKey(txHash)) + if err != nil { + panic(err) + } + if len(bz) == 0 { + return nil + } + + var txi cmtstore.TxInfo + if err = proto.Unmarshal(bz, &txi); err != nil { + panic(fmt.Errorf("unmarshal to TxInfo failed: %w", err)) + } + return &txi +} + // mustEncode proto encodes a proto.message and panics if fails func mustEncode(pb proto.Message) []byte { bz, err := proto.Marshal(pb) diff --git a/store/store_test.go b/store/store_test.go index f962fa0cf1..af140660f5 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2,6 +2,7 @@ package store import ( "bytes" + "encoding/binary" "fmt" "os" "runtime/debug" @@ -45,7 +46,10 @@ func makeTestCommit(height int64, timestamp time.Time) *types.Commit { func makeTxs(height int64) (txs []types.Tx) { for i := 0; i < 10; i++ { - txs = append(txs, types.Tx([]byte{byte(height), byte(i)})) + numBytes := make([]byte, 8) + binary.BigEndian.PutUint64(numBytes, uint64(height)) + + txs = append(txs, types.Tx(append(numBytes, byte(i)))) } return txs } @@ -375,6 +379,40 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { } } +func TestSaveBlockIndexesTxs(t *testing.T) { + // Create a state and a block store + state, blockStore, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) + defer cleanup() + + // Create 1000 blocks + for h := int64(1); h <= 1000; h++ { + block := makeBlock(h, state, new(types.Commit)) + partSet := block.MakePartSet(2) + seenCommit := makeTestCommit(h, cmttime.Now()) + blockStore.SaveBlock(block, partSet, seenCommit) + } + + // Get the blocks from blockstore up to the height + for h := int64(1); h <= 1000; h++ { + block := blockStore.LoadBlock(h) + // Check that transactions exist in the block + for i, tx := range block.Txs { + txInfo := blockStore.LoadTxInfo(tx.Hash()) + require.Equal(t, block.Height, txInfo.Height) + require.Equal(t, int64(i), txInfo.Index) + } + } + + // Get a random transaction and make sure it's indexed properly + block := blockStore.LoadBlock(777) + tx := block.Txs[5] + txInfo := blockStore.LoadTxInfo(tx.Hash()) + require.Equal(t, block.Height, txInfo.Height) + require.Equal(t, block.Height, int64(777)) + require.Equal(t, txInfo.Height, int64(777)) + require.Equal(t, int64(5), txInfo.Index) +} + func TestLoadBaseMeta(t *testing.T) { config := cfg.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) @@ -526,6 +564,66 @@ func TestPruneBlocks(t *testing.T) { assert.Nil(t, bs.LoadBlock(1501)) } +func TestPruneBlocksPrunesTxs(t *testing.T) { + config := cfg.ResetTestRoot("blockchain_reactor_test") + defer os.RemoveAll(config.RootDir) + + stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{ + DiscardABCIResponses: false, + }) + state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) + require.NoError(t, err) + db := dbm.NewMemDB() + blockStore := NewBlockStore(db) + + // Make more than 1000 blocks, to test batch deletions + // Make a copy of txs before batches are deleted + // to make sure that they are correctly pruned + var indexedTxHashes [][]byte + for h := int64(1); h <= 1500; h++ { + block := makeBlock(h, state, new(types.Commit)) + partSet := block.MakePartSet(2) + seenCommit := makeTestCommit(h, cmttime.Now()) + blockStore.SaveBlock(block, partSet, seenCommit) + for _, tx := range block.Txs { + indexedTxHashes = append(indexedTxHashes, tx.Hash()) + } + } + + // Check that the saved txs exist in the db + for _, hash := range indexedTxHashes { + txInfo := blockStore.LoadTxInfo(hash) + + require.NoError(t, err) + require.NotNil(t, txInfo, "Transaction was not saved in the database") + } + + pruned, err := blockStore.PruneBlocks(1200) + require.NoError(t, err) + assert.EqualValues(t, 1199, pruned) + + // Check that the transactions in the pruned blocks have been removed + // We removed 1199 blocks, each block has 10 txs + // so 11990 txs should no longer exist in the db + for i, hash := range indexedTxHashes { + if int64(i) < 1199*10 { + txInfo := blockStore.LoadTxInfo(hash) + require.Nil(t, txInfo) + } + } + + // Check that transactions in remaining blocks are still there + for h := int64(pruned + 1); h <= 1500; h++ { + block := blockStore.LoadBlock(h) + for i, tx := range block.Txs { + txInfo := blockStore.LoadTxInfo(tx.Hash()) + require.NoError(t, err) + require.Equal(t, h, txInfo.Height) + require.Equal(t, int64(i), txInfo.Index) + } + } +} + func TestLoadBlockMeta(t *testing.T) { bs, db := freshBlockStore() height := int64(10)