From 68d87ab10403ba06a74bf27869767c625a1b91ac Mon Sep 17 00:00:00 2001 From: Sanaz Taheri <35961250+staheri14@users.noreply.github.com> Date: Wed, 5 Jun 2024 14:38:08 -0700 Subject: [PATCH 1/8] chore: bumps go version to 1.22.4 in main (#1378) This together with #1379 close #1377 --- .github/workflows/check-generated.yml | 2 +- .github/workflows/coverage.yml | 6 +++--- .github/workflows/e2e-manual.yml | 2 +- .github/workflows/e2e-nightly-34x.yml | 2 +- .github/workflows/e2e.yml | 2 +- .github/workflows/fuzz-nightly.yml | 2 +- .github/workflows/govulncheck.yml | 2 +- .github/workflows/pre-release.yml | 2 +- .github/workflows/release-version.yml | 2 +- .github/workflows/release.yml | 2 +- .github/workflows/tests.yml | 4 ++-- DOCKER/Dockerfile | 2 +- README.md | 2 +- go.mod | 2 +- scripts/proto-gen.sh | 2 +- test/docker/Dockerfile | 2 +- test/e2e/docker/Dockerfile | 2 +- 17 files changed, 20 insertions(+), 20 deletions(-) diff --git a/.github/workflows/check-generated.yml b/.github/workflows/check-generated.yml index 330519dfa1..a47a31ada9 100644 --- a/.github/workflows/check-generated.yml +++ b/.github/workflows/check-generated.yml @@ -43,7 +43,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: "1.22.3" + go-version: "1.22.4" - uses: actions/checkout@v3 with: diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 0df28f42a7..0467f72618 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -12,7 +12,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v4 with: - go-version: "1.22.3" + go-version: "1.22.4" - name: Create a file with all the pkgs run: go list ./... > pkgs.txt - name: Split pkgs into 4 files @@ -48,7 +48,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: "1.22.3" + go-version: "1.22.4" - uses: actions/checkout@v3 - uses: technote-space/get-diff-action@v6 with: @@ -70,7 +70,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: "1.22.3" + go-version: "1.22.4" - uses: actions/checkout@v3 - uses: technote-space/get-diff-action@v6 with: diff --git a/.github/workflows/e2e-manual.yml b/.github/workflows/e2e-manual.yml index 2102738cbe..a888a6b806 100644 --- a/.github/workflows/e2e-manual.yml +++ b/.github/workflows/e2e-manual.yml @@ -16,7 +16,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: '1.22.3' + go-version: '1.22.4' - uses: actions/checkout@v3 diff --git a/.github/workflows/e2e-nightly-34x.yml b/.github/workflows/e2e-nightly-34x.yml index 87662d77c6..ea18e2e07c 100644 --- a/.github/workflows/e2e-nightly-34x.yml +++ b/.github/workflows/e2e-nightly-34x.yml @@ -23,7 +23,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: '1.22.3' + go-version: '1.22.4' - uses: actions/checkout@v3 with: diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 0c48ed38c9..e33454c7cb 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -14,7 +14,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: '1.22.3' + go-version: '1.22.4' - uses: actions/checkout@v3 - uses: technote-space/get-diff-action@v6 with: diff --git a/.github/workflows/fuzz-nightly.yml b/.github/workflows/fuzz-nightly.yml index 779fc6b200..223fb80f08 100644 --- a/.github/workflows/fuzz-nightly.yml +++ b/.github/workflows/fuzz-nightly.yml @@ -11,7 +11,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: '1.22.3' + go-version: '1.22.4' - uses: actions/checkout@v3 diff --git a/.github/workflows/govulncheck.yml b/.github/workflows/govulncheck.yml index 82b312d096..936ace5117 100644 --- a/.github/workflows/govulncheck.yml +++ b/.github/workflows/govulncheck.yml @@ -16,7 +16,7 @@ jobs: steps: - uses: actions/setup-go@v3 with: - go-version: "1.22.3" + go-version: "1.22.4" - uses: actions/checkout@v3 - uses: technote-space/get-diff-action@v6 with: diff --git a/.github/workflows/pre-release.yml b/.github/workflows/pre-release.yml index c95932a73a..b92ec2d514 100644 --- a/.github/workflows/pre-release.yml +++ b/.github/workflows/pre-release.yml @@ -18,7 +18,7 @@ jobs: - uses: actions/setup-go@v4 with: - go-version: '1.22.3' + go-version: '1.22.4' # Similar check to ./release-version.yml, but enforces this when pushing # tags. The ./release-version.yml check can be bypassed and is mainly diff --git a/.github/workflows/release-version.yml b/.github/workflows/release-version.yml index 606e8708c6..7a3ef881c0 100644 --- a/.github/workflows/release-version.yml +++ b/.github/workflows/release-version.yml @@ -15,7 +15,7 @@ jobs: - uses: actions/setup-go@v4 with: - go-version: '1.22.3' + go-version: '1.22.4' - name: Check version run: | diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 33bcfeb492..707fa505ab 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -16,7 +16,7 @@ jobs: - uses: actions/setup-go@v4 with: - go-version: '1.22.3' + go-version: '1.22.4' - name: Generate release notes run: | diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5cef244323..41cfae35bb 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -25,7 +25,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: "1.22.3" + go-version: "1.22.4" - uses: actions/checkout@v3 - uses: technote-space/get-diff-action@v6 with: @@ -58,7 +58,7 @@ jobs: # steps: # - uses: actions/setup-go@v3 # with: - # go-version: "1.22.3" + # go-version: "1.22.4" # - uses: actions/checkout@v3 # - uses: technote-space/get-diff-action@v6 # with: diff --git a/DOCKER/Dockerfile b/DOCKER/Dockerfile index dd23a5f6e5..c1ee8ad8e7 100644 --- a/DOCKER/Dockerfile +++ b/DOCKER/Dockerfile @@ -1,6 +1,6 @@ # Use a build arg to ensure that both stages use the same, # hopefully current, go version. -ARG GOLANG_BASE_IMAGE=golang:1.22.3-alpine +ARG GOLANG_BASE_IMAGE=golang:1.22.4-alpine # stage 1 Generate CometBFT Binary FROM --platform=$BUILDPLATFORM $GOLANG_BASE_IMAGE as builder diff --git a/README.md b/README.md index af127c16dc..26960c4e9e 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ This repo intends on preserving the minimal possible diff with [cometbft/cometbf - **specific to Celestia**: consider if [celestia-app](https://github.com/celestiaorg/celestia-app) is a better target - **not specific to Celestia**: consider making the contribution upstream in CometBFT -1. [Install Go](https://go.dev/doc/install) 1.22.3+ +1. [Install Go](https://go.dev/doc/install) 1.22.4+ 2. Fork this repo 3. Clone your fork 4. Find an issue to work on (see [good first issues](https://github.com/celestiaorg/celestia-core/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)) diff --git a/go.mod b/go.mod index c95f1f0779..97ff458c46 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/cometbft/cometbft -go 1.22.3 +go 1.22.4 require ( github.com/BurntSushi/toml v1.2.1 diff --git a/scripts/proto-gen.sh b/scripts/proto-gen.sh index 2faada1b19..4cf0656326 100755 --- a/scripts/proto-gen.sh +++ b/scripts/proto-gen.sh @@ -10,7 +10,7 @@ cd "$(git rev-parse --show-toplevel)" # Run inside Docker to install the correct versions of the required tools # without polluting the local system. -docker run --rm -i -v "$PWD":/w --workdir=/w golang:1.22.3-alpine sh <<"EOF" +docker run --rm -i -v "$PWD":/w --workdir=/w golang:1.22.4-alpine sh <<"EOF" apk add git make go install github.com/bufbuild/buf/cmd/buf diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile index 7345f28bcd..364589845a 100644 --- a/test/docker/Dockerfile +++ b/test/docker/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.22.3 +FROM golang:1.22.4 # Grab deps (jq, hexdump, xxd, killall) RUN apt-get update && \ diff --git a/test/e2e/docker/Dockerfile b/test/e2e/docker/Dockerfile index 091b704f9c..41d0fc4937 100644 --- a/test/e2e/docker/Dockerfile +++ b/test/e2e/docker/Dockerfile @@ -1,7 +1,7 @@ # We need to build in a Linux environment to support C libraries, e.g. RocksDB. # We use Debian instead of Alpine, so that we can use binary database packages # instead of spending time compiling them. -FROM golang:1.22.3-bullseye +FROM golang:1.22.4-bullseye RUN apt-get -qq update -y && apt-get -qq upgrade -y >/dev/null RUN apt-get -qq install -y libleveldb-dev librocksdb-dev >/dev/null From 800924f1db5256e0d2905ae1001384ca76de9e15 Mon Sep 17 00:00:00 2001 From: Sanaz Taheri <35961250+staheri14@users.noreply.github.com> Date: Thu, 6 Jun 2024 07:46:17 -0700 Subject: [PATCH 2/8] feat: enhances S3Download to filter by traced table names (#1374) This PR updates the `S3Download` implementation to allow specifying the names of the traced tables when downloading from an S3 bucket. This enhancement enables file filtering and reduces the amount of data downloaded. It is a desirable feature when interacting with the traced data of large network tests. If no table name is provided, then `S3Download` downloads all the traced tables. --- pkg/trace/fileserver.go | 72 +++++++++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/pkg/trace/fileserver.go b/pkg/trace/fileserver.go index 3f759ef0df..e4b4ee8f10 100644 --- a/pkg/trace/fileserver.go +++ b/pkg/trace/fileserver.go @@ -259,7 +259,9 @@ func (lt *LocalTracer) PushAll() error { // S3Download downloads files that match some prefix from an S3 bucket to a // local directory dst. -func S3Download(dst, prefix string, cfg S3Config) error { +// fileNames is a list of traced jsonl file names to download. If it is empty, all traces are downloaded. +// fileNames should not have .jsonl suffix. +func S3Download(dst, prefix string, fileNames []string, cfg S3Config) error { // Ensure local directory structure exists err := os.MkdirAll(dst, os.ModePerm) if err != nil { @@ -288,37 +290,51 @@ func S3Download(dst, prefix string, cfg S3Config) error { err = s3Svc.ListObjectsV2Pages(input, func(page *s3.ListObjectsV2Output, lastPage bool) bool { for _, content := range page.Contents { - localFilePath := filepath.Join(dst, prefix, strings.TrimPrefix(*content.Key, prefix)) - fmt.Printf("Downloading %s to %s\n", *content.Key, localFilePath) + key := *content.Key - // Create the directories in the path - if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil { - return false + // If no fileNames are specified, download all files + if len(fileNames) == 0 { + fileNames = append(fileNames, strings.TrimPrefix(key, prefix)) } - // Create a file to write the S3 Object contents to. - f, err := os.Create(localFilePath) - if err != nil { - return false - } - - resp, err := s3Svc.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(cfg.BucketName), - Key: aws.String(*content.Key), - }) - if err != nil { - f.Close() - continue + for _, filename := range fileNames { + // Add .jsonl suffix to the fileNames + fullFilename := filename + ".jsonl" + if strings.HasSuffix(key, fullFilename) { + localFilePath := filepath.Join(dst, prefix, strings.TrimPrefix(key, prefix)) + fmt.Printf("Downloading %s to %s\n", key, localFilePath) + + // Create the directories in the path + if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil { + return false + } + + // Create a file to write the S3 Object contents to. + f, err := os.Create(localFilePath) + if err != nil { + return false + } + + resp, err := s3Svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(cfg.BucketName), + Key: aws.String(key), + }) + if err != nil { + f.Close() + continue + } + defer resp.Body.Close() + + // Copy the contents of the S3 object to the local file + if _, err := io.Copy(f, resp.Body); err != nil { + f.Close() + return false + } + + fmt.Printf("Successfully downloaded %s to %s\n", key, localFilePath) + f.Close() + } } - defer resp.Body.Close() - - // Copy the contents of the S3 object to the local file - if _, err := io.Copy(f, resp.Body); err != nil { - return false - } - - fmt.Printf("Successfully downloaded %s to %s\n", *content.Key, localFilePath) - f.Close() } return !lastPage // continue paging }) From ff2bff4a29aec6851499e34b7d84349d8e774dd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?nina=20/=20=E1=83=9C=E1=83=98=E1=83=9C=E1=83=90?= Date: Tue, 18 Jun 2024 15:37:06 +0200 Subject: [PATCH 3/8] feat: add tx status in mempool (#1287) ## Description Fixes #1281 Opens https://github.com/celestiaorg/celestia-core/issues/1381 --------- Co-authored-by: Rootul P Co-authored-by: Callum Waters --- consensus/replay_stubs.go | 6 +- go.mod | 17 +- go.sum | 33 ++-- mempool/cache.go | 20 +- mempool/cat/pool.go | 36 +++- mempool/cat/pool_test.go | 15 +- mempool/cat/reactor.go | 2 +- mempool/cat/store.go | 10 +- mempool/mempool.go | 9 + mempool/mock/mempool.go | 17 +- mempool/v0/clist_mempool.go | 15 ++ mempool/v0/clist_mempool_test.go | 26 +++ mempool/v1/mempool.go | 27 ++- mempool/v1/mempool_test.go | 43 ++++- rpc/client/rpc_test.go | 60 ++++-- rpc/core/blocks.go | 11 -- rpc/core/blocks_test.go | 25 --- rpc/core/mocks/mempool.go | 240 ++++++++++++++++++++++++ rpc/core/tx.go | 41 ++++ rpc/core/tx_status_test.go | 133 +++++++++++++ rpc/core/types/responses.go | 5 +- state/mocks/block_store.go | 2 +- test/maverick/consensus/replay_stubs.go | 3 + 23 files changed, 683 insertions(+), 113 deletions(-) create mode 100644 rpc/core/mocks/mempool.go create mode 100644 rpc/core/tx_status_test.go diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index a39fbbc989..60d587edd4 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -47,8 +47,10 @@ func (emptyMempool) TxsBytes() int64 { return 0 } func (emptyMempool) TxsFront() *clist.CElement { return nil } func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil } -func (emptyMempool) InitWAL() error { return nil } -func (emptyMempool) CloseWAL() {} +func (emptyMempool) InitWAL() error { return nil } +func (emptyMempool) CloseWAL() {} +func (emptyMempool) GetTxByKey(types.TxKey) (types.Tx, bool) { return nil, false } +func (emptyMempool) WasRecentlyEvicted(types.TxKey) bool { return false } //----------------------------------------------------------------------------- // mockProxyApp uses ABCIResponses to give the right results. diff --git a/go.mod b/go.mod index 97ff458c46..1a171cfe7b 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/go-logfmt/logfmt v0.5.1 github.com/gofrs/uuid v4.3.0+incompatible github.com/gogo/protobuf v1.3.2 + github.com/golang/mock v1.4.4 github.com/golang/protobuf v1.5.3 github.com/golangci/golangci-lint v1.50.1 github.com/google/orderedcode v0.0.1 @@ -49,8 +50,8 @@ require ( go.opentelemetry.io/otel v1.21.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.18.0 go.opentelemetry.io/otel/sdk v1.21.0 - golang.org/x/crypto v0.21.0 - golang.org/x/net v0.23.0 + golang.org/x/crypto v0.24.0 + golang.org/x/net v0.26.0 gonum.org/v1/gonum v0.8.2 google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 @@ -275,12 +276,12 @@ require ( go.uber.org/zap v1.23.0 // indirect golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/exp/typeparams v0.0.0-20220827204233-334a2380cb91 // indirect - golang.org/x/mod v0.12.0 // indirect - golang.org/x/sync v0.3.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/term v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.13.0 // indirect + golang.org/x/mod v0.18.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/term v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + golang.org/x/tools v0.22.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect diff --git a/go.sum b/go.sum index cb28b78e71..d1ef0dce8b 100644 --- a/go.sum +++ b/go.sum @@ -380,6 +380,7 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= +github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -989,8 +990,8 @@ golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1038,8 +1039,8 @@ golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= -golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= +golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1089,8 +1090,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1117,8 +1118,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1200,15 +1201,15 @@ golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= -golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= -golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= +golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1220,8 +1221,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1312,8 +1313,8 @@ golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= -golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= +golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= +golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/mempool/cache.go b/mempool/cache.go index 3ed2b27b14..8141f2ae23 100644 --- a/mempool/cache.go +++ b/mempool/cache.go @@ -26,6 +26,9 @@ type TxCache interface { // Has reports whether tx is present in the cache. Checking for presence is // not treated as an access of the value. Has(tx types.Tx) bool + + // HasKey reports whether the given key is present in the cache. + HasKey(key types.TxKey) bool } var _ TxCache = (*LRUTxCache)(nil) @@ -113,12 +116,21 @@ func (c *LRUTxCache) Has(tx types.Tx) bool { return ok } +func (c *LRUTxCache) HasKey(key types.TxKey) bool { + c.mtx.Lock() + defer c.mtx.Unlock() + + _, ok := c.cacheMap[key] + return ok +} + // NopTxCache defines a no-op raw transaction cache. type NopTxCache struct{} var _ TxCache = (*NopTxCache)(nil) -func (NopTxCache) Reset() {} -func (NopTxCache) Push(types.Tx) bool { return true } -func (NopTxCache) Remove(types.Tx) {} -func (NopTxCache) Has(types.Tx) bool { return false } +func (NopTxCache) Reset() {} +func (NopTxCache) Push(types.Tx) bool { return true } +func (NopTxCache) Remove(types.Tx) {} +func (NopTxCache) Has(types.Tx) bool { return false } +func (NopTxCache) HasKey(types.TxKey) bool { return false } diff --git a/mempool/cat/pool.go b/mempool/cat/pool.go index a1a86e1ef4..91caa1d86e 100644 --- a/mempool/cat/pool.go +++ b/mempool/cat/pool.go @@ -64,6 +64,8 @@ type TxPool struct { // Thread-safe cache of rejected transactions for quick look-up rejectedTxCache *LRUTxCache + // Thread-safe cache of evicted transactions for quick look-up + evictedTxCache *LRUTxCache // Thread-safe list of transactions peers have seen that we have not yet seen seenByPeersSet *SeenTxSet @@ -92,6 +94,7 @@ func NewTxPool( proxyAppConn: proxyAppConn, metrics: mempool.NopMetrics(), rejectedTxCache: NewLRUTxCache(cfg.CacheSize), + evictedTxCache: NewLRUTxCache(cfg.CacheSize / 5), seenByPeersSet: NewSeenTxSet(), height: height, preCheckFn: func(_ types.Tx) error { return nil }, @@ -171,9 +174,15 @@ func (txmp *TxPool) Has(txKey types.TxKey) bool { return txmp.store.has(txKey) } -// Get retrieves a transaction based on the key. It returns a bool -// if the transaction exists or not +// Get retrieves a transaction based on the key. +// Deprecated: use GetTxByKey instead. func (txmp *TxPool) Get(txKey types.TxKey) (types.Tx, bool) { + return txmp.GetTxByKey(txKey) +} + +// GetTxByKey retrieves a transaction based on the key. It returns a bool +// indicating whether transaction was found in the cache. +func (txmp *TxPool) GetTxByKey(txKey types.TxKey) (types.Tx, bool) { wtx := txmp.store.get(txKey) if wtx != nil { return wtx.tx, true @@ -181,6 +190,12 @@ func (txmp *TxPool) Get(txKey types.TxKey) (types.Tx, bool) { return types.Tx{}, false } +// WasRecentlyEvicted returns a bool indicating whether the transaction with +// the specified key was recently evicted and is currently within the cache. +func (txmp *TxPool) WasRecentlyEvicted(txKey types.TxKey) bool { + return txmp.evictedTxCache.Has(txKey) +} + // IsRejectedTx returns true if the transaction was recently rejected and is // currently within the cache func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool { @@ -195,9 +210,13 @@ func (txmp *TxPool) CheckToPurgeExpiredTxs() { defer txmp.updateMtx.Unlock() if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration { expirationAge := time.Now().Add(-txmp.config.TTLDuration) - // a height of 0 means no transactions will be removed because of height + // A height of 0 means no transactions will be removed because of height // (in other words, no transaction has a height less than 0) - numExpired := txmp.store.purgeExpiredTxs(0, expirationAge) + purgedTxs, numExpired := txmp.store.purgeExpiredTxs(0, expirationAge) + // Add the purged transactions to the evicted cache + for _, tx := range purgedTxs { + txmp.evictedTxCache.Push(tx.key) + } txmp.metrics.EvictedTxs.Add(float64(numExpired)) txmp.lastPurgeTime = time.Now() } @@ -373,6 +392,7 @@ func (txmp *TxPool) Flush() { txmp.store.reset() txmp.seenByPeersSet.Reset() txmp.rejectedTxCache.Reset() + txmp.evictedTxCache.Reset() txmp.metrics.EvictedTxs.Add(float64(size)) txmp.broadcastMtx.Lock() defer txmp.broadcastMtx.Unlock() @@ -537,6 +557,7 @@ func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseC // drop the new one. if len(victims) == 0 || victimBytes < wtx.size() { txmp.metrics.EvictedTxs.Add(1) + txmp.evictedTxCache.Push(wtx.key) checkTxRes.MempoolError = fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)", wtx.key) return fmt.Errorf("rejected valid incoming transaction; mempool is full (%X). Size: (%d:%d)", @@ -591,6 +612,7 @@ func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseC func (txmp *TxPool) evictTx(wtx *wrappedTx) { txmp.store.remove(wtx.key) + txmp.evictedTxCache.Push(wtx.key) txmp.metrics.EvictedTxs.Add(1) txmp.logger.Debug( "evicted valid existing transaction; mempool full", @@ -720,7 +742,11 @@ func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) { expirationAge = time.Time{} } - numExpired := txmp.store.purgeExpiredTxs(expirationHeight, expirationAge) + purgedTxs, numExpired := txmp.store.purgeExpiredTxs(expirationHeight, expirationAge) + // Add the purged transactions to the evicted cache + for _, tx := range purgedTxs { + txmp.evictedTxCache.Push(tx.key) + } txmp.metrics.EvictedTxs.Add(float64(numExpired)) // purge old evicted and seen transactions diff --git a/mempool/cat/pool_test.go b/mempool/cat/pool_test.go index 7785123446..5f27448420 100644 --- a/mempool/cat/pool_test.go +++ b/mempool/cat/pool_test.go @@ -244,6 +244,7 @@ func TestTxPool_Eviction(t *testing.T) { mustCheckTx(t, txmp, "key1=0000=25") require.True(t, txExists("key1=0000=25")) require.False(t, txExists(bigTx)) + require.True(t, txmp.WasRecentlyEvicted(types.Tx(bigTx).Key())) require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) // Now fill up the rest of the slots with other transactions. @@ -257,23 +258,27 @@ func TestTxPool_Eviction(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "mempool is full") require.False(t, txExists("key6=0005=1")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx("key6=0005=1").Key())) // A new transaction with higher priority should evict key5, which is the // newest of the two transactions with lowest priority. mustCheckTx(t, txmp, "key7=0006=7") require.True(t, txExists("key7=0006=7")) // new transaction added require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted - require.True(t, txExists("key4=0003=3")) // older low-priority tx retained + require.True(t, txmp.WasRecentlyEvicted(types.Tx("key5=0004=3").Key())) + require.True(t, txExists("key4=0003=3")) // older low-priority tx retained // Another new transaction evicts the other low-priority element. mustCheckTx(t, txmp, "key8=0007=20") require.True(t, txExists("key8=0007=20")) require.False(t, txExists("key4=0003=3")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx("key4=0003=3").Key())) // Now the lowest-priority tx is 5, so that should be the next to go. mustCheckTx(t, txmp, "key9=0008=9") require.True(t, txExists("key9=0008=9")) require.False(t, txExists("key2=0001=5")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx("key2=0001=5").Key())) // Add a transaction that requires eviction of multiple lower-priority // entries, in order to fit the size of the element. @@ -282,8 +287,11 @@ func TestTxPool_Eviction(t *testing.T) { require.True(t, txExists("key8=0007=20")) require.True(t, txExists("key10=0123456789abcdef=11")) require.False(t, txExists("key3=0002=10")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx("key3=0002=10").Key())) require.False(t, txExists("key9=0008=9")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx("key9=0008=9").Key())) require.False(t, txExists("key7=0006=7")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx("key7=0006=7").Key())) // Free up some space so we can add back previously evicted txs err = txmp.Update(1, types.Txs{types.Tx("key10=0123456789abcdef=11")}, []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}}, nil, nil) @@ -296,6 +304,7 @@ func TestTxPool_Eviction(t *testing.T) { // space for the previously evicted tx require.NoError(t, txmp.RemoveTxByKey(types.Tx("key8=0007=20").Key())) require.False(t, txExists("key8=0007=20")) + require.False(t, txmp.WasRecentlyEvicted(types.Tx("key8=0007=20").Key())) } func TestTxPool_Flush(t *testing.T) { @@ -567,6 +576,10 @@ func TestTxPool_ExpiredTxs_Timestamp(t *testing.T) { // All the transactions in the original set should have been purged. for _, tx := range added1 { + // Check that it was added to the evictedTxCache + evicted := txmp.WasRecentlyEvicted(tx.tx.Key()) + require.True(t, evicted) + if txmp.store.has(tx.tx.Key()) { t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key()) } diff --git a/mempool/cat/reactor.go b/mempool/cat/reactor.go index 01d2be4c72..76c918ea23 100644 --- a/mempool/cat/reactor.go +++ b/mempool/cat/reactor.go @@ -318,7 +318,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { txKey[:], schema.Download, ) - tx, has := memR.mempool.Get(txKey) + tx, has := memR.mempool.GetTxByKey(txKey) if has && !memR.opts.ListenOnly { peerID := memR.ids.GetIDForPeer(e.Src.ID()) memR.Logger.Debug("sending a tx in response to a want msg", "peer", peerID) diff --git a/mempool/cat/store.go b/mempool/cat/store.go index 8972186797..33ecf1a851 100644 --- a/mempool/cat/store.go +++ b/mempool/cat/store.go @@ -141,19 +141,23 @@ func (s *store) getTxsBelowPriority(priority int64) ([]*wrappedTx, int64) { } // purgeExpiredTxs removes all transactions that are older than the given height -// and time. Returns the amount of transactions that were removed -func (s *store) purgeExpiredTxs(expirationHeight int64, expirationAge time.Time) int { +// and time. Returns the purged txs and amount of transactions that were purged. +func (s *store) purgeExpiredTxs(expirationHeight int64, expirationAge time.Time) ([]*wrappedTx, int) { s.mtx.Lock() defer s.mtx.Unlock() + + var purgedTxs []*wrappedTx counter := 0 + for key, tx := range s.txs { if tx.height < expirationHeight || tx.timestamp.Before(expirationAge) { s.bytes -= tx.size() delete(s.txs, key) + purgedTxs = append(purgedTxs, tx) counter++ } } - return counter + return purgedTxs, counter } func (s *store) reset() { diff --git a/mempool/mempool.go b/mempool/mempool.go index 0f1b32d280..3d7d83d695 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -91,6 +91,15 @@ type Mempool interface { // trigger once every height when transactions are available. EnableTxsAvailable() + // GetTxByKey gets a tx by its key from the mempool. Returns the tx and a bool indicating its presence in the tx cache. + // Used in the RPC endpoint: TxStatus. + GetTxByKey(key types.TxKey) (types.Tx, bool) + + // WasRecentlyEvicted returns true if the tx was evicted from the mempool and exists in the + // evicted cache. + // Used in the RPC endpoint: TxStatus. + WasRecentlyEvicted(key types.TxKey) bool + // Size returns the number of transactions in the mempool. Size() int diff --git a/mempool/mock/mempool.go b/mempool/mock/mempool.go index 986c28ffca..d1adfab008 100644 --- a/mempool/mock/mempool.go +++ b/mempool/mock/mempool.go @@ -30,14 +30,15 @@ func (Mempool) Update( ) error { return nil } -func (Mempool) Flush() {} -func (Mempool) FlushAppConn() error { return nil } -func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } -func (Mempool) EnableTxsAvailable() {} -func (Mempool) SizeBytes() int64 { return 0 } - -func (Mempool) TxsFront() *clist.CElement { return nil } -func (Mempool) TxsWaitChan() <-chan struct{} { return nil } +func (Mempool) Flush() {} +func (Mempool) FlushAppConn() error { return nil } +func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } +func (Mempool) EnableTxsAvailable() {} +func (Mempool) SizeBytes() int64 { return 0 } +func (m Mempool) GetTxByKey(types.TxKey) (types.Tx, bool) { return nil, false } +func (m Mempool) WasRecentlyEvicted(types.TxKey) bool { return false } +func (Mempool) TxsFront() *clist.CElement { return nil } +func (Mempool) TxsWaitChan() <-chan struct{} { return nil } func (Mempool) InitWAL() error { return nil } func (Mempool) CloseWAL() {} diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index e81083cac8..e783113b0f 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -183,6 +183,21 @@ func (mem *CListMempool) TxsFront() *clist.CElement { return mem.txs.Front() } +// GetTxByKey retrieves a transaction from the mempool using its key. +func (mem *CListMempool) GetTxByKey(key types.TxKey) (types.Tx, bool) { + e, ok := mem.txsMap.Load(key) + if !ok { + return nil, false + } + memTx, ok := e.(*clist.CElement).Value.(*mempoolTx) + return memTx.tx, ok +} + +// WasRecentlyEvicted returns false consistently as this implementation does not support transaction eviction. +func (mem *CListMempool) WasRecentlyEvicted(key types.TxKey) bool { + return false +} + // TxsWaitChan returns a channel to wait on transactions. It will be closed // once the mempool is not empty (ie. the internal `mem.txs` has at least one // element) diff --git a/mempool/v0/clist_mempool_test.go b/mempool/v0/clist_mempool_test.go index 0c303714c9..f2e01e5aad 100644 --- a/mempool/v0/clist_mempool_test.go +++ b/mempool/v0/clist_mempool_test.go @@ -549,6 +549,32 @@ func TestMempool_CheckTxChecksTxSize(t *testing.T) { } } +func TestGetTxByKey(t *testing.T) { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + + mp, cleanup := newMempoolWithApp(cc) + defer cleanup() + + // Create a tx + tx := types.Tx([]byte{0x01}) + // Add it to the mempool + err := mp.CheckTx(tx, nil, mempool.TxInfo{}) + require.NoError(t, err) + + // Query the tx from the mempool + got, ok := mp.GetTxByKey(tx.Key()) + require.True(t, ok) + // Ensure the returned tx is the same as the one we added + require.Equal(t, tx, got) + + // Query a random tx from the mempool + randomTx, ok := mp.GetTxByKey(types.Tx([]byte{0x02}).Key()) + // Ensure the returned tx is nil + require.False(t, ok) + require.Nil(t, randomTx) +} + func TestMempoolTxsBytes(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index 958d229ec3..5a22ab4e24 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -57,6 +57,7 @@ type TxMempool struct { txs *clist.CList // valid transactions (passed CheckTx) txByKey map[types.TxKey]*clist.CElement txBySender map[string]*clist.CElement // for sender != "" + evictedTxs mempool.TxCache // for tracking evicted transactions traceClient trace.Tracer } @@ -86,6 +87,7 @@ func NewTxMempool( } if cfg.CacheSize > 0 { txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize) + txmp.evictedTxs = mempool.NewLRUTxCache(cfg.CacheSize / 5) } for _, opt := range options { @@ -259,6 +261,24 @@ func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { return txmp.removeTxByKey(txKey) } +// GetTxByKey retrieves a transaction based on the key. It returns a bool +// indicating whether transaction was found in the cache. +func (txmp *TxMempool) GetTxByKey(txKey types.TxKey) (types.Tx, bool) { + txmp.mtx.RLock() + defer txmp.mtx.RUnlock() + + if elt, ok := txmp.txByKey[txKey]; ok { + return elt.Value.(*WrappedTx).tx, true + } + return nil, false +} + +// WasRecentlyEvicted returns a bool indicating whether the transaction with +// the specified key was recently evicted and is currently within the evicted cache. +func (txmp *TxMempool) WasRecentlyEvicted(txKey types.TxKey) bool { + return txmp.evictedTxs.HasKey(txKey) +} + // removeTxByKey removes the specified transaction key from the mempool. // The caller must hold txmp.mtx excluxively. func (txmp *TxMempool) removeTxByKey(key types.TxKey) error { @@ -549,6 +569,8 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)", wtx.tx.Hash()) txmp.metrics.EvictedTxs.With(mempool.TypeLabel, mempool.EvictedNewTxFullMempool).Add(1) + // Add it to evicted transactions cache + txmp.evictedTxs.Push(wtx.tx) return } @@ -581,7 +603,8 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon txmp.removeTxByElement(vic) txmp.cache.Remove(w.tx) txmp.metrics.EvictedTxs.With(mempool.TypeLabel, mempool.EvictedExistingTxFullMempool).Add(1) - + // Add it to evicted transactions cache + txmp.evictedTxs.Push(w.tx) // We may not need to evict all the eligible transactions. Bail out // early if we have made enough room. evictedBytes += w.Size() @@ -772,9 +795,11 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { txmp.removeTxByElement(cur) txmp.cache.Remove(w.tx) txmp.metrics.EvictedTxs.With(mempool.TypeLabel, mempool.EvictedTxExpiredBlocks).Add(1) + txmp.evictedTxs.Push(w.tx) } else if txmp.config.TTLDuration > 0 && now.Sub(w.timestamp) > txmp.config.TTLDuration { txmp.removeTxByElement(cur) txmp.cache.Remove(w.tx) + txmp.evictedTxs.Push(w.tx) txmp.metrics.EvictedTxs.With(mempool.TypeLabel, mempool.EvictedTxExpiredTime).Add(1) } cur = next diff --git a/mempool/v1/mempool_test.go b/mempool/v1/mempool_test.go index 7623ecbc90..88f9de3d0b 100644 --- a/mempool/v1/mempool_test.go +++ b/mempool/v1/mempool_test.go @@ -108,6 +108,8 @@ func mustCheckTx(t *testing.T, txmp *TxMempool, spec string) { <-done } +// checkTxs generates a specified number of txs, checks them into the mempool, +// and returns them. func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { txs := make([]testTx, numTxs) txInfo := mempool.TxInfo{SenderID: peerID} @@ -239,7 +241,9 @@ func TestTxMempool_Eviction(t *testing.T) { mustCheckTx(t, txmp, "key1=0000=25") require.True(t, txExists("key1=0000=25")) require.False(t, txExists(bigTx)) - require.False(t, txmp.cache.Has([]byte(bigTx))) + bigTxKey := types.Tx((bigTx)).Key() + require.False(t, txmp.cache.HasKey(bigTxKey)) + require.True(t, txmp.WasRecentlyEvicted(bigTxKey)) // bigTx evicted require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) // Now fill up the rest of the slots with other transactions. @@ -251,13 +255,15 @@ func TestTxMempool_Eviction(t *testing.T) { // A new transaction with low priority should be discarded. mustCheckTx(t, txmp, "key6=0005=1") require.False(t, txExists("key6=0005=1")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx(("key6=0005=1")).Key())) // key6 evicted // A new transaction with higher priority should evict key5, which is the // newest of the two transactions with lowest priority. mustCheckTx(t, txmp, "key7=0006=7") - require.True(t, txExists("key7=0006=7")) // new transaction added - require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted - require.True(t, txExists("key4=0003=3")) // older low-priority tx retained + require.True(t, txExists("key7=0006=7")) // new transaction added + require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted + require.True(t, txmp.WasRecentlyEvicted(types.Tx(("key5=0004=3")).Key())) // key5 evicted + require.True(t, txExists("key4=0003=3")) // older low-priority tx retained // Another new transaction evicts the other low-priority element. mustCheckTx(t, txmp, "key8=0007=20") @@ -268,6 +274,7 @@ func TestTxMempool_Eviction(t *testing.T) { mustCheckTx(t, txmp, "key9=0008=9") require.True(t, txExists("key9=0008=9")) require.False(t, txExists("key2=0001=5")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx(("key2=0001=5")).Key())) // key2 evicted // Add a transaction that requires eviction of multiple lower-priority // entries, in order to fit the size of the element. @@ -276,8 +283,11 @@ func TestTxMempool_Eviction(t *testing.T) { require.True(t, txExists("key8=0007=20")) require.True(t, txExists("key10=0123456789abcdef=11")) require.False(t, txExists("key3=0002=10")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx(("key3=0002=10")).Key())) // key3 evicted require.False(t, txExists("key9=0008=9")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx(("key9=0008=9")).Key())) // key9 evicted require.False(t, txExists("key7=0006=7")) + require.True(t, txmp.WasRecentlyEvicted(types.Tx(("key7=0006=7")).Key())) // key7 evicted } func TestTxMempool_Flush(t *testing.T) { @@ -431,7 +441,7 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) { } func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) { - txmp := setup(t, 0) + txmp := setup(t, 1) rng := rand.New(rand.NewSource(time.Now().UnixNano())) tx := make([]byte, txmp.config.MaxTxBytes+1) @@ -580,6 +590,10 @@ func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) { // All the transactions in the original set should have been purged. for _, tx := range added1 { + // Check that they were added to the evicted cache. + evicted := txmp.WasRecentlyEvicted(tx.tx.Key()) + require.True(t, evicted) + if _, ok := txmp.txByKey[tx.tx.Key()]; ok { t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key()) } @@ -596,6 +610,23 @@ func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) { } } +func TestGetTxByKey_GetsTx(t *testing.T) { + txmp := setup(t, 500) + txs := checkTxs(t, txmp, 100, 0) + + // Should get all valid txs + for _, tx := range txs { + txKey := tx.tx.Key() + txFromMempool, exists := txmp.GetTxByKey(txKey) + require.Equal(t, tx.tx, txFromMempool) + require.True(t, exists) + } + + // Non-existent tx should return false + _, exists := txmp.GetTxByKey(types.Tx("non-existent-tx").Key()) + require.False(t, exists) +} + func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { txmp := setup(t, 500) txmp.height = 100 @@ -662,7 +693,7 @@ func TestTxMempool_CheckTxPostCheckError(t *testing.T) { postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error { return testCase.err } - txmp := setup(t, 0, WithPostCheck(postCheckFn)) + txmp := setup(t, 1, WithPostCheck(postCheckFn)) rng := rand.New(rand.NewSource(time.Now().UnixNano())) tx := make([]byte, txmp.config.MaxTxBytes-1) _, err := rng.Read(tx) diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 6d6c96056e..98831bc033 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -538,30 +538,52 @@ func TestBlockSearch(t *testing.T) { func TestTxStatus(t *testing.T) { c := getHTTPClient() + require := require.New(t) + mempool := node.Mempool() - // first we broadcast a few txs - var txHashes [][]byte - var txHeights []int64 - for i := 0; i < 10; i++ { - _, _, tx := MakeTxKV() + // Create a new transaction + _, _, tx := MakeTxKV() - result, err := c.BroadcastTxCommit(context.Background(), tx) - require.NoError(t, err) - txHashes = append(txHashes, result.Hash) - txHeights = append(txHeights, result.Height) - } + // Get the initial size of the mempool + initMempoolSize := mempool.Size() - require.NoError(t, client.WaitForHeight(c, 5, nil)) + // Add the transaction to the mempool + err := mempool.CheckTx(tx, nil, mempl.TxInfo{}) + require.NoError(err) - // check the status of each transaction - for i, hash := range txHashes { - result, err := c.TxStatus(context.Background(), hash) - require.NoError(t, err) + // Check if the size of the mempool has increased + require.Equal(initMempoolSize+1, mempool.Size()) - expectedIndex := int64(0) - require.Equal(t, txHeights[i], result.Height) - require.Equal(t, expectedIndex, result.Index) - } + // Get the tx status from the mempool + result, err := c.TxStatus(context.Background(), types.Tx(tx).Hash()) + require.NoError(err) + require.EqualValues(0, result.Height) + require.EqualValues(0, result.Index) + require.Equal("PENDING", result.Status) + + // Flush the mempool + mempool.Flush() + require.Equal(0, mempool.Size()) + + // Get tx status after flushing it from the mempool + result, err = c.TxStatus(context.Background(), types.Tx(tx).Hash()) + require.NoError(err) + require.EqualValues(0, result.Height) + require.EqualValues(0, result.Index) + require.Equal("UNKNOWN", result.Status) + + // Broadcast the tx again + bres, err := c.BroadcastTxCommit(context.Background(), tx) + require.NoError(err) + require.True(bres.CheckTx.IsOK()) + require.True(bres.DeliverTx.IsOK()) + + // Get the tx status + result, err = c.TxStatus(context.Background(), types.Tx(tx).Hash()) + require.NoError(err) + require.EqualValues(bres.Height, result.Height) + require.EqualValues(0, result.Index) + require.Equal("COMMITTED", result.Status) } func TestTxSearch(t *testing.T) { diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 763d2b4ce8..d19757c688 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -178,17 +178,6 @@ func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil } -// TxStatus retrieves the status of a transaction given its hash. It returns a ResultTxStatus -// containing the height and index of the transaction within the block. -func TxStatus(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultTxStatus, error) { - env := GetEnvironment() - txInfo := env.BlockStore.LoadTxInfo(hash) - if txInfo == nil { - return &ctypes.ResultTxStatus{}, nil - } - return &ctypes.ResultTxStatus{Height: txInfo.Height, Index: txInfo.Index}, nil -} - // Commit gets block commit at a given height. // If no height is provided, it will fetch the commit for the latest block. // More: https://docs.cometbft.com/v0.34/rpc/#/Info/commit diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index 54a2cf4f64..7bddc82a88 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -126,31 +126,6 @@ func TestBlockResults(t *testing.T) { } } -func TestTxStatus(t *testing.T) { - env := &Environment{} - height := int64(50) - - blocks := randomBlocks(height) - blockStore := mockBlockStore{ - height: height, - blocks: blocks, - } - env.BlockStore = blockStore - - SetEnvironment(env) - - // Iterate over each block - for _, block := range blocks { - // Iterate over each transaction in the block - for i, tx := range block.Data.Txs { - txStatus, _ := TxStatus(&rpctypes.Context{}, tx.Hash()) - assert.Equal(t, block.Height, txStatus.Height) - assert.Equal(t, int64(i), txStatus.Index) - } - } - -} - func TestEncodeDataRootTuple(t *testing.T) { height := uint64(2) dataRoot, err := hex.DecodeString("82dc1607d84557d3579ce602a45f5872e821c36dbda7ec926dfa17ebc8d5c013") diff --git a/rpc/core/mocks/mempool.go b/rpc/core/mocks/mempool.go new file mode 100644 index 0000000000..57d750dad8 --- /dev/null +++ b/rpc/core/mocks/mempool.go @@ -0,0 +1,240 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./mempool/mempool.go + +// Package mock_mempool is a generated GoMock package. +package mock_mempool + +import ( + reflect "reflect" + + types "github.com/cometbft/cometbft/abci/types" + mempool "github.com/cometbft/cometbft/mempool" + types0 "github.com/cometbft/cometbft/types" + gomock "github.com/golang/mock/gomock" +) + +// MockMempool is a mock of Mempool interface. +type MockMempool struct { + ctrl *gomock.Controller + recorder *MockMempoolMockRecorder +} + +// MockMempoolMockRecorder is the mock recorder for MockMempool. +type MockMempoolMockRecorder struct { + mock *MockMempool +} + +// NewMockMempool creates a new mock instance. +func NewMockMempool(ctrl *gomock.Controller) *MockMempool { + mock := &MockMempool{ctrl: ctrl} + mock.recorder = &MockMempoolMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMempool) EXPECT() *MockMempoolMockRecorder { + return m.recorder +} + +// CheckTx mocks base method. +func (m *MockMempool) CheckTx(tx types0.Tx, callback func(*types.Response), txInfo mempool.TxInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckTx", tx, callback, txInfo) + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckTx indicates an expected call of CheckTx. +func (mr *MockMempoolMockRecorder) CheckTx(tx, callback, txInfo interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckTx", reflect.TypeOf((*MockMempool)(nil).CheckTx), tx, callback, txInfo) +} + +// EnableTxsAvailable mocks base method. +func (m *MockMempool) EnableTxsAvailable() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "EnableTxsAvailable") +} + +// EnableTxsAvailable indicates an expected call of EnableTxsAvailable. +func (mr *MockMempoolMockRecorder) EnableTxsAvailable() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnableTxsAvailable", reflect.TypeOf((*MockMempool)(nil).EnableTxsAvailable)) +} + +// Flush mocks base method. +func (m *MockMempool) Flush() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Flush") +} + +// Flush indicates an expected call of Flush. +func (mr *MockMempoolMockRecorder) Flush() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockMempool)(nil).Flush)) +} + +// FlushAppConn mocks base method. +func (m *MockMempool) FlushAppConn() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FlushAppConn") + ret0, _ := ret[0].(error) + return ret0 +} + +// FlushAppConn indicates an expected call of FlushAppConn. +func (mr *MockMempoolMockRecorder) FlushAppConn() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushAppConn", reflect.TypeOf((*MockMempool)(nil).FlushAppConn)) +} + +// GetTxByKey mocks base method. +func (m *MockMempool) GetTxByKey(key types0.TxKey) (types0.Tx, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTxByKey", key) + ret0, _ := ret[0].(types0.Tx) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetTxByKey indicates an expected call of GetTxByKey. +func (mr *MockMempoolMockRecorder) GetTxByKey(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTxByKey", reflect.TypeOf((*MockMempool)(nil).GetTxByKey), key) +} + +// WasRecentlyEvicted mocks base method. +func (m *MockMempool) WasRecentlyEvicted(key types0.TxKey) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WasRecentlyEvicted", key) + ret0, _ := ret[0].(bool) + return ret0 +} + +// WasRecentlyEvicted indicates an expected call of WasRecentlyEvicted. +func (mr *MockMempoolMockRecorder) WasRecentlyEvicted(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WasRecentlyEvicted", reflect.TypeOf((*MockMempool)(nil).WasRecentlyEvicted), key) +} + +// Lock mocks base method. +func (m *MockMempool) Lock() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Lock") +} + +// Lock indicates an expected call of Lock. +func (mr *MockMempoolMockRecorder) Lock() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockMempool)(nil).Lock)) +} + +// ReapMaxBytesMaxGas mocks base method. +func (m *MockMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types0.Txs { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReapMaxBytesMaxGas", maxBytes, maxGas) + ret0, _ := ret[0].(types0.Txs) + return ret0 +} + +// ReapMaxBytesMaxGas indicates an expected call of ReapMaxBytesMaxGas. +func (mr *MockMempoolMockRecorder) ReapMaxBytesMaxGas(maxBytes, maxGas interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReapMaxBytesMaxGas", reflect.TypeOf((*MockMempool)(nil).ReapMaxBytesMaxGas), maxBytes, maxGas) +} + +// ReapMaxTxs mocks base method. +func (m *MockMempool) ReapMaxTxs(max int) types0.Txs { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReapMaxTxs", max) + ret0, _ := ret[0].(types0.Txs) + return ret0 +} + +// ReapMaxTxs indicates an expected call of ReapMaxTxs. +func (mr *MockMempoolMockRecorder) ReapMaxTxs(max interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReapMaxTxs", reflect.TypeOf((*MockMempool)(nil).ReapMaxTxs), max) +} + +// RemoveTxByKey mocks base method. +func (m *MockMempool) RemoveTxByKey(txKey types0.TxKey) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoveTxByKey", txKey) + ret0, _ := ret[0].(error) + return ret0 +} + +// RemoveTxByKey indicates an expected call of RemoveTxByKey. +func (mr *MockMempoolMockRecorder) RemoveTxByKey(txKey interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveTxByKey", reflect.TypeOf((*MockMempool)(nil).RemoveTxByKey), txKey) +} + +// Size mocks base method. +func (m *MockMempool) Size() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Size") + ret0, _ := ret[0].(int) + return ret0 +} + +// Size indicates an expected call of Size. +func (mr *MockMempoolMockRecorder) Size() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockMempool)(nil).Size)) +} + +// SizeBytes mocks base method. +func (m *MockMempool) SizeBytes() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SizeBytes") + ret0, _ := ret[0].(int64) + return ret0 +} + +// SizeBytes indicates an expected call of SizeBytes. +func (mr *MockMempoolMockRecorder) SizeBytes() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SizeBytes", reflect.TypeOf((*MockMempool)(nil).SizeBytes)) +} + +// TxsAvailable mocks base method. +func (m *MockMempool) TxsAvailable() <-chan struct{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TxsAvailable") + ret0, _ := ret[0].(<-chan struct{}) + return ret0 +} + +// TxsAvailable indicates an expected call of TxsAvailable. +func (mr *MockMempoolMockRecorder) TxsAvailable() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TxsAvailable", reflect.TypeOf((*MockMempool)(nil).TxsAvailable)) +} + +// Unlock mocks base method. +func (m *MockMempool) Unlock() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Unlock") +} + +// Unlock indicates an expected call of Unlock. +func (mr *MockMempoolMockRecorder) Unlock() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockMempool)(nil).Unlock)) +} + +// Update mocks base method. +func (m *MockMempool) Update(blockHeight int64, blockTxs types0.Txs, deliverTxResponses []*types.ResponseDeliverTx, newPreFn mempool.PreCheckFunc, newPostFn mempool.PostCheckFunc) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Update", blockHeight, blockTxs, deliverTxResponses, newPreFn, newPostFn) + ret0, _ := ret[0].(error) + return ret0 +} + +// Update indicates an expected call of Update. +func (mr *MockMempoolMockRecorder) Update(blockHeight, blockTxs, deliverTxResponses, newPreFn, newPostFn interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockMempool)(nil).Update), blockHeight, blockTxs, deliverTxResponses, newPreFn, newPostFn) +} diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 01d4cbc0b4..a25cd477cb 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -17,6 +17,13 @@ import ( "github.com/cometbft/cometbft/types" ) +const ( + txStatusUnknown string = "UNKNOWN" + txStatusPending string = "PENDING" + txStatusEvicted string = "EVICTED" + txStatusCommitted string = "COMMITTED" +) + // Tx allows you to query the transaction results. `nil` could mean the // transaction is in the mempool, invalidated, or was not sent in the first // place. @@ -214,6 +221,40 @@ func ProveShares( return &ctypes.ResultShareProof{Proof: shareProof}, nil } +// TxStatus retrieves the status of a transaction given its hash. It returns a ResultTxStatus +// containing the height and index of the transaction within the block(if committed) +// or whether the transaction is pending, evicted from the mempool, or otherwise unknown. +func TxStatus(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultTxStatus, error) { + env := GetEnvironment() + + // Check if the tx has been committed + txInfo := env.BlockStore.LoadTxInfo(hash) + if txInfo != nil { + return &ctypes.ResultTxStatus{Height: txInfo.Height, Index: txInfo.Index, Status: txStatusCommitted}, nil + } + + // Get the tx key from the hash + txKey, err := types.TxKeyFromBytes(hash) + if err != nil { + return nil, fmt.Errorf("failed to get tx key from hash: %v", err) + } + + // Check if the tx is in the mempool + txInMempool, ok := env.Mempool.GetTxByKey(txKey) + if txInMempool != nil && ok { + return &ctypes.ResultTxStatus{Status: txStatusPending}, nil + } + + // Check if the tx is evicted + isEvicted := env.Mempool.WasRecentlyEvicted(txKey) + if isEvicted { + return &ctypes.ResultTxStatus{Status: txStatusEvicted}, nil + } + + // If the tx is not in the mempool, evicted, or committed, return unknown + return &ctypes.ResultTxStatus{Status: txStatusUnknown}, nil +} + func loadRawBlock(bs state.BlockStore, height int64) ([]byte, error) { var blockMeta = bs.LoadBlockMeta(height) if blockMeta == nil { diff --git a/rpc/core/tx_status_test.go b/rpc/core/tx_status_test.go new file mode 100644 index 0000000000..d91afd468c --- /dev/null +++ b/rpc/core/tx_status_test.go @@ -0,0 +1,133 @@ +package core + +import ( + "testing" + + mock "github.com/cometbft/cometbft/rpc/core/mocks" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + types "github.com/cometbft/cometbft/types" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +// TestTxStatus tests the TxStatus function in the RPC core +// making sure it fetches the correct status for each transaction. +func TestTxStatus(t *testing.T) { + // Create a controller + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // Create a new environment + env := &Environment{} + + // Create a new mempool and block store + mempool := mock.NewMockMempool(ctrl) + env.Mempool = mempool + blockStore := mockBlockStore{ + height: 0, + blocks: nil, + } + env.BlockStore = blockStore + SetEnvironment(env) + + tests := []struct { + name string + setup func(*Environment, []types.Tx) + expectedStatus string + }{ + { + name: "Committed", + setup: func(env *Environment, txs []types.Tx) { + height := int64(5) + blocks := randomBlocks(height) + blockStore = mockBlockStore{ + height: height, + blocks: blocks, + } + env.BlockStore = blockStore + }, + expectedStatus: "COMMITTED", + }, + { + name: "Unknown", + setup: func(env *Environment, txs []types.Tx) { + env.BlockStore = mockBlockStore{ + height: 0, + blocks: nil, + } + for _, tx := range txs { + // Set GetTxByKey to return nil and false for all transactions + mempool.EXPECT().GetTxByKey(tx.Key()).Return(nil, false).AnyTimes() + // Set WasRecentlyEvicted to return false for all transactions + mempool.EXPECT().WasRecentlyEvicted(tx.Key()).Return(false).AnyTimes() + } + }, + + expectedStatus: "UNKNOWN", + }, + { + name: "Pending", + setup: func(env *Environment, txs []types.Tx) { + env.BlockStore = mockBlockStore{ + height: 0, + blocks: nil, + } + // Reset the mempool + mempool = mock.NewMockMempool(ctrl) + env.Mempool = mempool + + for _, tx := range txs { + // Set GetTxByKey to return the transaction and true for all transactions + mempool.EXPECT().GetTxByKey(tx.Key()).Return(tx, true).AnyTimes() + } + }, + expectedStatus: "PENDING", + }, + { + name: "Evicted", + setup: func(env *Environment, txs []types.Tx) { + env.BlockStore = mockBlockStore{ + height: 0, + blocks: nil, + } + // Reset the mempool + mempool = mock.NewMockMempool(ctrl) + env.Mempool = mempool + + for _, tx := range txs { + // Set GetTxByKey to return nil and false for all transactions + mempool.EXPECT().GetTxByKey(tx.Key()).Return(nil, false).AnyTimes() + // Set WasRecentlyEvicted to return true for all transactions + mempool.EXPECT().WasRecentlyEvicted(tx.Key()).Return(true).AnyTimes() + } + }, + expectedStatus: "EVICTED", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + height := int64(2) + // Create a set of transactions on the specified height + txs := makeTxs(height) + + tt.setup(env, txs) + + // Check the status of each transaction + for i, tx := range txs { + txStatus, _ := TxStatus(&rpctypes.Context{}, tx.Hash()) + assert.Equal(t, tt.expectedStatus, txStatus.Status) + + // Check the height and index of transactions that are committed + if blockStore.height > 0 && tt.expectedStatus == "COMMITTED" { + txStatus, _ := TxStatus(&rpctypes.Context{}, tx.Hash()) + + assert.Equal(t, txStatus.Status, tt.expectedStatus) + assert.Equal(t, height, txStatus.Height) + assert.Equal(t, int64(i), txStatus.Index) + } + } + + }) + } +} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index bacf149cd1..c1d7f0900b 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -62,8 +62,9 @@ type ResultCommit struct { // ResultTxStatus contains info to locate a tx in a committed block. type ResultTxStatus struct { - Height int64 `json:"height"` - Index int64 `json:"index"` + Height int64 `json:"height"` + Index int64 `json:"index"` + Status string `json:"status"` } // ABCI results from a block diff --git a/state/mocks/block_store.go b/state/mocks/block_store.go index 98f47dc320..21fe049f5f 100644 --- a/state/mocks/block_store.go +++ b/state/mocks/block_store.go @@ -3,9 +3,9 @@ package mocks import ( - mock "github.com/stretchr/testify/mock" cmtstore "github.com/cometbft/cometbft/proto/tendermint/store" types "github.com/cometbft/cometbft/types" + mock "github.com/stretchr/testify/mock" ) // BlockStore is an autogenerated mock type for the BlockStore type diff --git a/test/maverick/consensus/replay_stubs.go b/test/maverick/consensus/replay_stubs.go index d8ea335c12..9fc3b45013 100644 --- a/test/maverick/consensus/replay_stubs.go +++ b/test/maverick/consensus/replay_stubs.go @@ -40,6 +40,9 @@ func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) func (emptyMempool) EnableTxsAvailable() {} func (emptyMempool) TxsBytes() int64 { return 0 } +func (emptyMempool) GetTxByKey(txKey types.TxKey) (types.Tx, bool) { return nil, false } +func (emptyMempool) WasRecentlyEvicted(txKey types.TxKey) bool { return false } + func (emptyMempool) TxsFront() *clist.CElement { return nil } func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil } From 00620f0ef69d00726656f343b435f0fd86669fc8 Mon Sep 17 00:00:00 2001 From: Rootul P Date: Fri, 21 Jun 2024 09:34:27 -0600 Subject: [PATCH 4/8] ci: add ability to backport to main (#1398) ## Motivation I just merged two PRs to v0.34.x-celestia that should probably be backported to `main` if we're still maintaining `main`. --- .github/mergify.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/mergify.yml b/.github/mergify.yml index c732cce75a..bd1f0f4e04 100644 --- a/.github/mergify.yml +++ b/.github/mergify.yml @@ -27,3 +27,12 @@ pull_request_rules: backport: branches: - v0.34.x-celestia + + - name: Backport to main + conditions: + - label=S:backport-to-main + - merged + actions: + backport: + branches: + - main From d8a17b50fe52b5685c21d205fe79da5a2d96b75a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?nina=20/=20=E1=83=9C=E1=83=98=E1=83=9C=E1=83=90?= Date: Mon, 24 Jun 2024 17:59:54 +0200 Subject: [PATCH 5/8] feat: add response code in TxInfo (#1399) ## Description Fixes #1397 --------- Co-authored-by: Rootul P --- consensus/replay_file.go | 2 +- consensus/replay_test.go | 14 ++++--- consensus/wal_generator.go | 2 +- node/node.go | 1 + proto/tendermint/store/types.pb.go | 62 ++++++++++++++++++++++++------ proto/tendermint/store/types.proto | 8 +++- rpc/core/blocks_test.go | 6 ++- rpc/core/tx.go | 2 +- rpc/core/tx_status_test.go | 3 +- rpc/core/types/responses.go | 10 +++-- state/execution.go | 29 ++++++++++++++ state/execution_test.go | 27 +++++++++++++ state/mocks/block_store.go | 37 +++++++++++++++--- state/services.go | 1 + store/store.go | 16 ++++---- store/store_test.go | 30 ++++++++++++--- test/maverick/consensus/replay.go | 2 +- 17 files changed, 204 insertions(+), 48 deletions(-) diff --git a/consensus/replay_file.go b/consensus/replay_file.go index b0221becfa..e9d5aabcce 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -330,7 +330,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo } mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, sm.WithBlockStore(blockStore)) consensusState := NewState(csConfig, state.Copy(), blockExec, blockStore, mempool, evpool) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 87acfc235b..76153b442c 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -58,7 +58,7 @@ func TestMain(m *testing.M) { // the `Handshake Tests` are for failures in applying the block. // With the help of the WAL, we can recover from it all! -//------------------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------------------ // WAL Tests // TODO: It would be better to verify explicitly which states we can recover from without the wal @@ -320,7 +320,7 @@ var ( sim testSim ) -//--------------------------------------- +// --------------------------------------- // Test handshake/replay // 0 - all synced up @@ -1041,7 +1041,7 @@ func (app *badApp) Commit() abci.ResponseCommit { panic("either allHashesAreWrong or onlyLastHashIsWrong must be set") } -//-------------------------- +// -------------------------- // utils for making blocks func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) { @@ -1187,8 +1187,9 @@ func stateAndStore( return stateDB, state, store } -//---------------------------------- +// ---------------------------------- // mock block store +var _ sm.BlockStore = &mockBlockStore{} type mockBlockStore struct { config *cfg.Config @@ -1222,6 +1223,9 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { } +func (bs *mockBlockStore) SaveTxInfo(block *types.Block, txResponseCode []uint32) error { + return nil +} func (bs *mockBlockStore) LoadTxInfo(hash []byte) *cmtstore.TxInfo { return &cmtstore.TxInfo{} } func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit { @@ -1245,7 +1249,7 @@ func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) { func (bs *mockBlockStore) DeleteLatestBlock() error { return nil } -//--------------------------------------- +// --------------------------------------- // Test handshake/init chain func TestHandshakeUpdatesValidators(t *testing.T) { diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 54529182a0..d613ac8130 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -84,7 +84,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { }) mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, sm.WithBlockStore(blockStore)) consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetLogger(logger) consensusState.SetEventBus(eventBus) diff --git a/node/node.go b/node/node.go index 3e0731372a..58d6753a94 100644 --- a/node/node.go +++ b/node/node.go @@ -886,6 +886,7 @@ func NewNode(config *cfg.Config, mempool, evidencePool, sm.BlockExecutorWithMetrics(smMetrics), + sm.WithBlockStore(blockStore), ) // Make BlockchainReactor. Don't start fast sync if we're doing a state sync first. diff --git a/proto/tendermint/store/types.pb.go b/proto/tendermint/store/types.pb.go index edc2b96a77..5383b152b1 100644 --- a/proto/tendermint/store/types.pb.go +++ b/proto/tendermint/store/types.pb.go @@ -76,8 +76,11 @@ func (m *BlockStoreState) GetHeight() int64 { // TxInfo describes the location of a tx inside a committed block. type TxInfo struct { - Height int64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` - Index int64 `protobuf:"varint,2,opt,name=index,proto3" json:"index,omitempty"` + Height int64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` + Index uint32 `protobuf:"varint,2,opt,name=index,proto3" json:"index,omitempty"` + // The response code of executing the tx. 0 means + // successfully executed, all others are error codes. + Code uint32 `protobuf:"varint,3,opt,name=code,proto3" json:"code,omitempty"` } func (m *TxInfo) Reset() { *m = TxInfo{} } @@ -120,13 +123,20 @@ func (m *TxInfo) GetHeight() int64 { return 0 } -func (m *TxInfo) GetIndex() int64 { +func (m *TxInfo) GetIndex() uint32 { if m != nil { return m.Index } return 0 } +func (m *TxInfo) GetCode() uint32 { + if m != nil { + return m.Code + } + return 0 +} + func init() { proto.RegisterType((*BlockStoreState)(nil), "tendermint.store.BlockStoreState") proto.RegisterType((*TxInfo)(nil), "tendermint.store.TxInfo") @@ -135,20 +145,21 @@ func init() { func init() { proto.RegisterFile("tendermint/store/types.proto", fileDescriptor_ff9e53a0a74267f7) } var fileDescriptor_ff9e53a0a74267f7 = []byte{ - // 199 bytes of a gzipped FileDescriptorProto + // 212 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x29, 0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0x2e, 0xc9, 0x2f, 0x4a, 0xd5, 0x2f, 0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x40, 0xc8, 0xea, 0x81, 0x65, 0x95, 0x6c, 0xb9, 0xf8, 0x9d, 0x72, 0xf2, 0x93, 0xb3, 0x83, 0x41, 0xbc, 0xe0, 0x92, 0xc4, 0x92, 0x54, 0x21, 0x21, 0x2e, 0x96, 0xa4, 0xc4, 0xe2, 0x54, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xe6, 0x20, 0x30, 0x5b, 0x48, 0x8c, 0x8b, 0x2d, 0x23, 0x35, 0x33, 0x3d, 0xa3, 0x44, 0x82, 0x09, 0x2c, 0x0a, 0xe5, - 0x29, 0x99, 0x71, 0xb1, 0x85, 0x54, 0x78, 0xe6, 0xa5, 0xe5, 0x23, 0xa9, 0x60, 0x44, 0x56, 0x21, - 0x24, 0xc2, 0xc5, 0x9a, 0x99, 0x97, 0x92, 0x5a, 0x01, 0xd5, 0x08, 0xe1, 0x38, 0xf9, 0x9e, 0x78, - 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, - 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x94, 0x71, 0x7a, 0x66, 0x49, 0x46, 0x69, 0x92, - 0x5e, 0x72, 0x7e, 0xae, 0x7e, 0x72, 0x7e, 0x6e, 0x6a, 0x49, 0x52, 0x5a, 0x09, 0x82, 0x01, 0xf6, - 0x86, 0x3e, 0xba, 0x1f, 0x93, 0xd8, 0xc0, 0xe2, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc5, - 0x4d, 0x38, 0x3d, 0xfe, 0x00, 0x00, 0x00, + 0x29, 0x79, 0x71, 0xb1, 0x85, 0x54, 0x78, 0xe6, 0xa5, 0xe5, 0x23, 0xa9, 0x60, 0x44, 0x56, 0x21, + 0x24, 0xc2, 0xc5, 0x9a, 0x99, 0x97, 0x92, 0x5a, 0x01, 0xd6, 0xc8, 0x1b, 0x04, 0xe1, 0x80, 0xec, + 0x48, 0xce, 0x4f, 0x49, 0x95, 0x60, 0x06, 0x0b, 0x82, 0xd9, 0x4e, 0xbe, 0x27, 0x1e, 0xc9, 0x31, + 0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, + 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0x65, 0x9c, 0x9e, 0x59, 0x92, 0x51, 0x9a, 0xa4, 0x97, 0x9c, + 0x9f, 0xab, 0x9f, 0x9c, 0x9f, 0x9b, 0x5a, 0x92, 0x94, 0x56, 0x82, 0x60, 0x80, 0xbd, 0xa6, 0x8f, + 0xee, 0xef, 0x24, 0x36, 0xb0, 0xb8, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x29, 0x4c, 0x3e, 0x41, + 0x12, 0x01, 0x00, 0x00, } func (m *BlockStoreState) Marshal() (dAtA []byte, err error) { @@ -204,6 +215,11 @@ func (m *TxInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Code != 0 { + i = encodeVarintTypes(dAtA, i, uint64(m.Code)) + i-- + dAtA[i] = 0x18 + } if m.Index != 0 { i = encodeVarintTypes(dAtA, i, uint64(m.Index)) i-- @@ -255,6 +271,9 @@ func (m *TxInfo) Size() (n int) { if m.Index != 0 { n += 1 + sovTypes(uint64(m.Index)) } + if m.Code != 0 { + n += 1 + sovTypes(uint64(m.Code)) + } return n } @@ -414,7 +433,26 @@ func (m *TxInfo) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Index |= int64(b&0x7F) << shift + m.Index |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Code", wireType) + } + m.Code = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Code |= uint32(b&0x7F) << shift if b < 0x80 { break } diff --git a/proto/tendermint/store/types.proto b/proto/tendermint/store/types.proto index 33e5401060..843667455b 100644 --- a/proto/tendermint/store/types.proto +++ b/proto/tendermint/store/types.proto @@ -8,8 +8,12 @@ message BlockStoreState { int64 height = 2; } -// TxInfo describes the location of a tx inside a committed block. +// TxInfo describes the location of a tx inside a committed block +// as well as the result of executing the transaction. message TxInfo { int64 height = 1; - int64 index = 2; + uint32 index = 2; + // The response code of executing the tx. 0 means + // successfully executed, all others are error codes. + uint32 code = 3; } diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index 7bddc82a88..11542e5e33 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -301,6 +301,9 @@ func (mockBlockStore) LoadSeenCommit(height int64) *types.Commit { retur func (mockBlockStore) PruneBlocks(height int64) (uint64, error) { return 0, nil } func (mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { } +func (mockBlockStore) SaveTxInfo(block *types.Block, txResponseCode []uint32) error { + return nil +} func (mockBlockStore) DeleteLatestBlock() error { return nil } func (store mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { @@ -328,7 +331,8 @@ func (store mockBlockStore) LoadTxInfo(hash []byte) *cmtstore.TxInfo { if bytes.Equal(tx.Hash(), hash) { return &cmtstore.TxInfo{ Height: block.Header.Height, - Index: int64(i), + Index: uint32(i), + Code: uint32(0), } } } diff --git a/rpc/core/tx.go b/rpc/core/tx.go index a25cd477cb..97f415c029 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -230,7 +230,7 @@ func TxStatus(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultTxStatus, error // Check if the tx has been committed txInfo := env.BlockStore.LoadTxInfo(hash) if txInfo != nil { - return &ctypes.ResultTxStatus{Height: txInfo.Height, Index: txInfo.Index, Status: txStatusCommitted}, nil + return &ctypes.ResultTxStatus{Height: txInfo.Height, Index: txInfo.Index, ExecutionCode: txInfo.Code, Status: txStatusCommitted}, nil } // Get the tx key from the hash diff --git a/rpc/core/tx_status_test.go b/rpc/core/tx_status_test.go index d91afd468c..836fab5da7 100644 --- a/rpc/core/tx_status_test.go +++ b/rpc/core/tx_status_test.go @@ -124,7 +124,8 @@ func TestTxStatus(t *testing.T) { assert.Equal(t, txStatus.Status, tt.expectedStatus) assert.Equal(t, height, txStatus.Height) - assert.Equal(t, int64(i), txStatus.Index) + assert.Equal(t, uint32(i), txStatus.Index) + assert.Equal(t, uint32(0), txStatus.ExecutionCode) } } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index c1d7f0900b..c5b858cfc1 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -60,11 +60,13 @@ type ResultCommit struct { CanonicalCommit bool `json:"canonical"` } -// ResultTxStatus contains info to locate a tx in a committed block. +// ResultTxStatus represents the status of a transaction during its life cycle. +// It contains info to locate a tx in a committed block as well as its execution code and status. type ResultTxStatus struct { - Height int64 `json:"height"` - Index int64 `json:"index"` - Status string `json:"status"` + Height int64 `json:"height"` + Index uint32 `json:"index"` + ExecutionCode uint32 `json:"execution_code"` + Status string `json:"status"` } // ABCI results from a block diff --git a/state/execution.go b/state/execution.go index 56bdc7c742..99d35b13d7 100644 --- a/state/execution.go +++ b/state/execution.go @@ -26,6 +26,9 @@ type BlockExecutor struct { // save state, validators, consensus params, abci responses here store Store + // blockStore is optional and used to store txInfo + blockStore BlockStore + // execute the app against this proxyApp proxy.AppConnConsensus @@ -50,6 +53,13 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption { } } +// WithBlockStore optionally stores txInfo +func WithBlockStore(blockStore BlockStore) BlockExecutorOption { + return func(blockExec *BlockExecutor) { + blockExec.blockStore = blockStore + } +} + // NewBlockExecutor returns a new BlockExecutor with a NopEventBus. // Call SetEventBus to provide one. func NewBlockExecutor( @@ -240,6 +250,16 @@ func (blockExec *BlockExecutor) ApplyBlock( return state, 0, err } + // Save indexing info of the transaction. + // This needs to be done prior to saving state + // for correct crash recovery + if blockExec.blockStore != nil { + respCodes := getResponseCodes(abciResponses.DeliverTxs) + if err := blockExec.blockStore.SaveTxInfo(block, respCodes); err != nil { + return state, 0, err + } + } + fail.Fail() // XXX // validate the validator updates and convert to CometBFT types @@ -681,3 +701,12 @@ func ExecCommitBlock( // ResponseCommit has no error or log, just data return res.Data, nil } + +// getResponseCodes gets response codes from a list of ResponseDeliverTx. +func getResponseCodes(responses []*abci.ResponseDeliverTx) []uint32 { + responseCodes := make([]uint32, len(responses)) + for i, response := range responses { + responseCodes[i] = response.Code + } + return responseCodes +} diff --git a/state/execution_test.go b/state/execution_test.go index 4f03832a09..c16fc3bcf1 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -71,6 +71,33 @@ func TestApplyBlock(t *testing.T) { assert.EqualValues(t, 1, state.Version.Consensus.App, "App version wasn't updated") } +func TestApplyBlockWithBlockStore(t *testing.T) { + app := &testApp{} + cc := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(cc) + err := proxyApp.Start() + require.Nil(t, err) + defer proxyApp.Stop() //nolint:errcheck // ignore for tests + blockStore := mocks.NewBlockStore(t) + + state, stateDB, _ := makeState(1, 1) + stateStore := sm.NewStore(stateDB, sm.StoreOptions{ + DiscardABCIResponses: false, + }) + + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), + mmock.Mempool{}, sm.EmptyEvidencePool{}, sm.WithBlockStore(blockStore)) + + block := makeBlock(state, 1) + blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()} + + // Check that SaveTxInfo is called with correct arguments + blockStore.On("SaveTxInfo", block, mock.AnythingOfType("[]uint32")).Return(nil) + + _, _, err = blockExec.ApplyBlock(state, blockID, block, nil) + require.Nil(t, err) +} + // TestBeginBlockValidators ensures we send absent validators list. func TestBeginBlockValidators(t *testing.T) { app := &testApp{} diff --git a/state/mocks/block_store.go b/state/mocks/block_store.go index 21fe049f5f..2b883ff3f8 100644 --- a/state/mocks/block_store.go +++ b/state/mocks/block_store.go @@ -3,9 +3,9 @@ package mocks import ( - cmtstore "github.com/cometbft/cometbft/proto/tendermint/store" - types "github.com/cometbft/cometbft/types" mock "github.com/stretchr/testify/mock" + types "github.com/cometbft/cometbft/types" + "github.com/cometbft/cometbft/proto/tendermint/store" ) // BlockStore is an autogenerated mock type for the BlockStore type @@ -87,6 +87,22 @@ func (_m *BlockStore) LoadBlock(height int64) *types.Block { return r0 } +// LoadTxInfo provides a mock function with given fields: hash +func (_m *BlockStore) LoadTxInfo(hash []byte) *store.TxInfo { + ret := _m.Called(hash) + + var r0 *store.TxInfo + if rf, ok := ret.Get(0).(func([]byte) *store.TxInfo); ok { + r0 = rf(hash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*store.TxInfo) + } + } + + return r0 +} + // LoadBlockByHash provides a mock function with given fields: hash func (_m *BlockStore) LoadBlockByHash(hash []byte) *types.Block { ret := _m.Called(hash) @@ -204,6 +220,20 @@ func (_m *BlockStore) PruneBlocks(height int64) (uint64, error) { return r0, r1 } +// SaveTxInfo provides a mock function with given fields: block, txResponseCode +func (_m *BlockStore) SaveTxInfo(block *types.Block, txResponseCode []uint32) error { + ret := _m.Called(block, txResponseCode) + + var r0 error + if rf, ok := ret.Get(0).(func(*types.Block, []uint32) error); ok { + r0 = rf(block, txResponseCode) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // SaveBlock provides a mock function with given fields: block, blockParts, seenCommit func (_m *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { _m.Called(block, blockParts, seenCommit) @@ -223,9 +253,6 @@ func (_m *BlockStore) Size() int64 { return r0 } -func (_m BlockStore) LoadTxInfo(txHash []byte) *cmtstore.TxInfo { - return &cmtstore.TxInfo{} -} type mockConstructorTestingTNewBlockStore interface { mock.TestingT diff --git a/state/services.go b/state/services.go index 63b1b6850b..a124a22cd2 100644 --- a/state/services.go +++ b/state/services.go @@ -26,6 +26,7 @@ type BlockStore interface { LoadBlock(height int64) *types.Block SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) + SaveTxInfo(block *types.Block, txResponseCode []uint32) error PruneBlocks(height int64) (uint64, error) diff --git a/store/store.go b/store/store.go index 80d6cd6467..90453a6d2a 100644 --- a/store/store.go +++ b/store/store.go @@ -407,11 +407,6 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s panic(err) } - // Save Txs from the block - if err := bs.SaveTxInfo(block); err != nil { - panic(err) - } - // Done! bs.mtx.Lock() bs.height = height @@ -455,8 +450,12 @@ func (bs *BlockStore) SaveSeenCommit(height int64, seenCommit *types.Commit) err return bs.db.Set(calcSeenCommitKey(height), seenCommitBytes) } -// SaveTxInfo gets Tx hashes from the block converts them to TxInfo and persists them to the db. -func (bs *BlockStore) SaveTxInfo(block *types.Block) error { +// SaveTxInfo indexes the txs from the block with the given response codes from execution. +func (bs *BlockStore) SaveTxInfo(block *types.Block, txResponseCodes []uint32) error { + if len(txResponseCodes) != len(block.Txs) { + return fmt.Errorf("txResponseCodes length mismatch with block txs length") + } + // Create a new batch batch := bs.db.NewBatch() @@ -464,7 +463,8 @@ func (bs *BlockStore) SaveTxInfo(block *types.Block) error { for i, tx := range block.Txs { txInfo := cmtstore.TxInfo{ Height: block.Height, - Index: int64(i), + Index: uint32(i), + Code: txResponseCodes[i], } txInfoBytes, err := proto.Marshal(&txInfo) if err != nil { diff --git a/store/store_test.go b/store/store_test.go index af140660f5..2a55e6361f 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -169,7 +169,6 @@ func TestMain(m *testing.M) { } // TODO: This test should be simplified ... - func TestBlockStoreSaveLoadBlock(t *testing.T) { state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) defer cleanup() @@ -206,7 +205,6 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { } // End of setup, test data - commitAtH10 := makeTestCommit(10, cmttime.Now()) tuples := []struct { block *types.Block @@ -379,17 +377,32 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { } } -func TestSaveBlockIndexesTxs(t *testing.T) { +func TestSaveTxInfo(t *testing.T) { // Create a state and a block store state, blockStore, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) defer cleanup() // Create 1000 blocks + txResponseCodes := make([]uint32, len(block.Txs)) for h := int64(1); h <= 1000; h++ { block := makeBlock(h, state, new(types.Commit)) partSet := block.MakePartSet(2) seenCommit := makeTestCommit(h, cmttime.Now()) blockStore.SaveBlock(block, partSet, seenCommit) + + // Set the response codes for the transactions + for i := range block.Txs { + // If even set it to 0 + if i%2 == 0 { + txResponseCodes[i] = 0 + } else { + txResponseCodes[i] = 1 + } + } + + // Save the tx info + err := blockStore.SaveTxInfo(block, txResponseCodes) + require.NoError(t, err) } // Get the blocks from blockstore up to the height @@ -399,7 +412,8 @@ func TestSaveBlockIndexesTxs(t *testing.T) { for i, tx := range block.Txs { txInfo := blockStore.LoadTxInfo(tx.Hash()) require.Equal(t, block.Height, txInfo.Height) - require.Equal(t, int64(i), txInfo.Index) + require.Equal(t, uint32(i), txInfo.Index) + require.Equal(t, txResponseCodes[i], txInfo.Code) } } @@ -410,7 +424,8 @@ func TestSaveBlockIndexesTxs(t *testing.T) { require.Equal(t, block.Height, txInfo.Height) require.Equal(t, block.Height, int64(777)) require.Equal(t, txInfo.Height, int64(777)) - require.Equal(t, int64(5), txInfo.Index) + require.Equal(t, uint32(1), txInfo.Code) + require.Equal(t, uint32(5), txInfo.Index) } func TestLoadBaseMeta(t *testing.T) { @@ -585,6 +600,8 @@ func TestPruneBlocksPrunesTxs(t *testing.T) { partSet := block.MakePartSet(2) seenCommit := makeTestCommit(h, cmttime.Now()) blockStore.SaveBlock(block, partSet, seenCommit) + err := blockStore.SaveTxInfo(block, make([]uint32, len(block.Txs))) + require.NoError(t, err) for _, tx := range block.Txs { indexedTxHashes = append(indexedTxHashes, tx.Hash()) } @@ -619,7 +636,8 @@ func TestPruneBlocksPrunesTxs(t *testing.T) { txInfo := blockStore.LoadTxInfo(tx.Hash()) require.NoError(t, err) require.Equal(t, h, txInfo.Height) - require.Equal(t, int64(i), txInfo.Index) + require.Equal(t, uint32(i), txInfo.Index) + require.Equal(t, uint32(0), txInfo.Code) } } } diff --git a/test/maverick/consensus/replay.go b/test/maverick/consensus/replay.go index b65816d4c1..79e126cd9c 100644 --- a/test/maverick/consensus/replay.go +++ b/test/maverick/consensus/replay.go @@ -498,7 +498,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap // Use stubs for both mempool and evidence pool since no transactions nor // evidence are needed here - block already exists. - blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}) + blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, sm.WithBlockStore(h.store)) blockExec.SetEventBus(h.eventBus) var err error From 4d17212747d2fb5fd20723c6599a2c2990ddcb83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?nina=20/=20=E1=83=9C=E1=83=98=E1=83=9C=E1=83=90?= Date: Tue, 2 Jul 2024 20:09:10 +0200 Subject: [PATCH 6/8] feat: add blockstore in block executor initialisations (#1408) ## Description Adds block store in block executor initialisations --- consensus/replay.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/replay.go b/consensus/replay.go index b78ac05463..fcdddc6bd2 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -495,7 +495,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap // Use stubs for both mempool and evidence pool since no transactions nor // evidence are needed here - block already exists. - blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}) + blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, sm.WithBlockStore(h.store)) blockExec.SetEventBus(h.eventBus) var err error From 3d4e4aeac899b446c3dfc2ed4c57502f6ec20cf5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 17 Jul 2024 11:04:20 -0400 Subject: [PATCH 7/8] build(deps): Bump github.com/celestiaorg/nmt from 0.21.0 to 0.22.0 (#1421) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bumps [github.com/celestiaorg/nmt](https://github.com/celestiaorg/nmt) from 0.21.0 to 0.22.0.
Release notes

Sourced from github.com/celestiaorg/nmt's releases.

v0.22.0

What's Changed

New Contributors

Full Changelog: https://github.com/celestiaorg/nmt/compare/v0.21.0...v0.22.0

Commits

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=github.com/celestiaorg/nmt&package-manager=go_modules&previous-version=0.21.0&new-version=0.22.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores) Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`. [//]: # (dependabot-automerge-start) [//]: # (dependabot-automerge-end) ---
Dependabot commands and options
You can trigger Dependabot actions by commenting on this PR: - `@dependabot rebase` will rebase this PR - `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it - `@dependabot merge` will merge this PR after your CI passes on it - `@dependabot squash and merge` will squash and merge this PR after your CI passes on it - `@dependabot cancel merge` will cancel a previously requested merge and block automerging - `@dependabot reopen` will reopen this PR if it is closed - `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually - `@dependabot show ignore conditions` will show all of the ignore conditions of the specified dependency - `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 1a171cfe7b..b1b05c1f8f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/btcsuite/btcd/btcec/v2 v2.2.1 github.com/btcsuite/btcd/btcutil v1.1.2 github.com/bufbuild/buf v1.9.0 - github.com/celestiaorg/nmt v0.21.0 + github.com/celestiaorg/nmt v0.22.0 github.com/cometbft/cometbft-db v0.7.0 github.com/creachadair/taskgroup v0.3.2 github.com/fortytw2/leaktest v1.3.0 diff --git a/go.sum b/go.sum index d1ef0dce8b..a4a2591c13 100644 --- a/go.sum +++ b/go.sum @@ -157,8 +157,8 @@ github.com/bufbuild/protocompile v0.1.0/go.mod h1:ix/MMMdsT3fzxfw91dvbfzKW3fRRnu github.com/butuzov/ireturn v0.1.1 h1:QvrO2QF2+/Cx1WA/vETCIYBKtRjc30vesdoPUNo1EbY= github.com/butuzov/ireturn v0.1.1/go.mod h1:Wh6Zl3IMtTpaIKbmwzqi6olnM9ptYQxxVacMsOEFPoc= github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= -github.com/celestiaorg/nmt v0.21.0 h1:81MBqxNn3orByoiCtdNVjwi5WsLgMkzHwP02ZMhTBHM= -github.com/celestiaorg/nmt v0.21.0/go.mod h1:ia/EpCk0enD5yO5frcxoNoFToz2Ghtk2i+blmCRjIY8= +github.com/celestiaorg/nmt v0.22.0 h1:AGtfmBiVgreR1KkIV5R7XFNeMp/H4IUDLlBbLjZZ3zk= +github.com/celestiaorg/nmt v0.22.0/go.mod h1:ia/EpCk0enD5yO5frcxoNoFToz2Ghtk2i+blmCRjIY8= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= From 1e71630207f4b62b9b7ec64f31b016ccbef989c9 Mon Sep 17 00:00:00 2001 From: crystalstall Date: Tue, 23 Jul 2024 18:47:24 +0900 Subject: [PATCH 8/8] chore: fix some comments for struct field (#1424) ## Description fix some comments for struct field --- #### PR checklist - [ ] Tests written/updated - [ ] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments Signed-off-by: crystalstall --- config/config.go | 2 +- consensus/metrics.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index 27b2db6a90..5289f5fe1d 100644 --- a/config/config.go +++ b/config/config.go @@ -1215,7 +1215,7 @@ type InstrumentationConfig struct { // pyroscope continuous profiling server. PyroscopeURL string `mapstructure:"pyroscope_url"` - // PyroscopeProfile is a flag that enables tracing with pyroscope. + // PyroscopeTrace is a flag that enables tracing with pyroscope. PyroscopeTrace bool `mapstructure:"pyroscope_trace"` // PyroscopeProfileTypes is a list of profile types to be traced with diff --git a/consensus/metrics.go b/consensus/metrics.go index cafa2ab6bc..fa7573c84e 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -75,7 +75,7 @@ type Metrics struct { // was relevant to the block the node is trying to gather or not. BlockGossipPartsReceived metrics.Counter - // QuroumPrevoteMessageDelay is the interval in seconds between the proposal + // QuorumPrevoteMessageDelay is the interval in seconds between the proposal // timestamp and the timestamp of the earliest prevote that achieved a quorum // during the prevote step. //