Skip to content

Commit

Permalink
bump to latest core
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Mar 7, 2024
1 parent 531fa75 commit 1b7b1ab
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 133 deletions.
4 changes: 2 additions & 2 deletions cmd/firebtc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package main

import (
"fmt"
"github.com/streamingfast/firehose-bitcoin/fetch"
"strconv"
"strings"
"time"

"github.com/spf13/cobra"
"github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/firehose-bitcoin/poller"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -73,7 +73,7 @@ func readerRunE(cmd *cobra.Command, args []string) error {
rpcEndpoint = strings.TrimPrefix(rpcEndpoint, "http://")
}

p := poller.New(rpcEndpoint, https, blockFetchRetryCount, readerStateStoragePath, startBlockNum, ignoreCursor, headers, zlog)
p := fetch.New(rpcEndpoint, https, blockFetchRetryCount, readerStateStoragePath, startBlockNum, ignoreCursor, headers, zlog)
app := cli.NewApplication(ctx)
app.SuperviseAndStart(p)

Expand Down
30 changes: 17 additions & 13 deletions poller/poller.go → fetch/poller.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package poller
package fetch

import (
"context"
Expand All @@ -14,7 +14,7 @@ import (
"go.uber.org/zap"
)

type Poller struct {
type BlockFetcher struct {
*shutter.Shutter
endpoint string
blockFetchRetryCount uint64
Expand All @@ -38,8 +38,8 @@ func New(
startBlockNum uint64,
ignoreCursor bool,
headers map[string]string,
logger *zap.Logger) *Poller {
return &Poller{
logger *zap.Logger) *BlockFetcher {
return &BlockFetcher{
Shutter: shutter.New(),
endpoint: endpoint,
blockFetchRetryCount: blockFetchRetryCount,
Expand All @@ -53,7 +53,7 @@ func New(
}
}

func (p *Poller) Run(ctx context.Context) error {
func (p *BlockFetcher) Run(ctx context.Context) error {
contentType := getContentType()
p.logger.Info("launching firebtc poller",
zap.String("endpoint", p.endpoint),
Expand Down Expand Up @@ -106,10 +106,10 @@ func (p *Poller) Run(ctx context.Context) error {
zap.Stringer("chain_head_block", p.headBlock),
)

return bp.Run(ctx, p.startBlockNum, finalizedBlk)
return bp.Run(ctx, p.startBlockNum, 1)
}

func (p *Poller) GetFinalizedBlock(headBlock bstream.BlockRef) (bstream.BlockRef, error) {
func (p *BlockFetcher) GetFinalizedBlock(headBlock bstream.BlockRef) (bstream.BlockRef, error) {
finalizedBlockNum := int64(headBlock.Num() - pbbitcoin.LibOffset)
finalizedBlockHash, err := p.rpcClient.GetBlockHash(finalizedBlockNum)
if err != nil {
Expand All @@ -119,7 +119,7 @@ func (p *Poller) GetFinalizedBlock(headBlock bstream.BlockRef) (bstream.BlockRef
return bstream.NewBlockRef(finalizedBlockHash.String(), uint64(finalizedBlockNum)), nil
}

func (p *Poller) GetHeadBlock() (bstream.BlockRef, error) {
func (p *BlockFetcher) GetHeadBlock() (bstream.BlockRef, error) {
bestBlockHash, err := p.rpcClient.GetBestBlockHash()
if err != nil {
return nil, fmt.Errorf("unable to get best block hash: %w", err)
Expand All @@ -134,12 +134,16 @@ func (p *Poller) GetHeadBlock() (bstream.BlockRef, error) {
return bstream.NewBlockRef(bestBlockHash.String(), uint64(bestBlock.Height)), nil
}

func (p *Poller) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, error) {
func (p *BlockFetcher) IsBlockAvailable(requestedSlot uint64) bool {
return requestedSlot <= p.headBlock.Num()
}

func (p *BlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, bool, error) {
for p.headBlock.Num() < blkNum {
var err error
p.headBlock, err = p.GetHeadBlock()
if err != nil {
return nil, fmt.Errorf("failed to get head block: %w", err)
return nil, false, fmt.Errorf("failed to get head block: %w", err)
}

if p.headBlock.Num() < blkNum {
Expand All @@ -158,14 +162,14 @@ func (p *Poller) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, erro
t0 := time.Now()
blkHash, err := p.rpcClient.GetBlockHash(int64(blkNum))
if err != nil {
return nil, fmt.Errorf("unable to get block hash for block %d: %w", blkNum, err)
return nil, false, fmt.Errorf("unable to get block hash for block %d: %w", blkNum, err)
}
duration := time.Since(t0)

t1 := time.Now()
rpcBlk, err := p.rpcClient.GetBlockVerboseTx(blkHash)
if err != nil {
return nil, fmt.Errorf("unable to get block %d (%s): %w", blkNum, blkHash.String(), err)
return nil, false, fmt.Errorf("unable to get block %d (%s): %w", blkNum, blkHash.String(), err)
}

p.logger.Debug("found block",
Expand Down Expand Up @@ -251,7 +255,7 @@ func (p *Poller) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, erro
blk.Tx = append(blk.Tx, trx)
}

return blk.MustToBstreamBlock(), nil
return blk.MustToBstreamBlock(), false, nil
}

func getContentType() string {
Expand Down
4 changes: 2 additions & 2 deletions poller/poller_test.go → fetch/poller_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package poller
package fetch

import (
"fmt"
Expand Down Expand Up @@ -26,7 +26,7 @@ func TestNew(t *testing.T) {
client, err := rpcclient.New(connCfg, nil)
require.NoError(t, err)

r := &Poller{
r := &BlockFetcher{
rpcClient: client,
logger: zap.NewNop(),
}
Expand Down
74 changes: 40 additions & 34 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
module github.com/streamingfast/firehose-bitcoin

go 1.21.0
go 1.22

toolchain go1.22.0

require (
github.com/btcsuite/btcd v0.23.4
github.com/spf13/cobra v1.8.0
github.com/streamingfast/bstream v0.0.2-0.20231123130020-ad84cce9666d
github.com/streamingfast/bstream v0.0.2-0.20240207154557-a98153ba4b86
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80
github.com/streamingfast/firehose-core v0.2.4-0.20231130152151-2e7b9f992f5b
github.com/streamingfast/firehose-core v1.2.4
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/streamingfast/shutter v1.5.0
github.com/test-go/testify v1.1.4
go.uber.org/zap v1.26.0
google.golang.org/protobuf v1.31.0
google.golang.org/protobuf v1.32.0
)

require (
cloud.google.com/go v0.110.4 // indirect
cloud.google.com/go/compute v1.21.0 // indirect
cloud.google.com/go v0.111.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.1 // indirect
cloud.google.com/go/iam v1.1.5 // indirect
cloud.google.com/go/storage v1.30.1 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-storage-blob-go v0.14.0 // indirect
Expand All @@ -36,24 +38,26 @@ require (
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chzyer/readline v1.5.0 // indirect
github.com/cncf/xds/go v0.0.0-20231109132714-523115ebc101 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/envoyproxy/go-control-plane v0.11.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/compress v1.16.6 // indirect
github.com/lithammer/dedent v1.1.0 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -70,6 +74,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/sercand/kuberesolver/v5 v5.1.1 // indirect
github.com/sethvargo/go-retry v0.2.3 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
Expand All @@ -78,37 +83,38 @@ require (
github.com/spf13/viper v1.15.0 // indirect
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c // indirect
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 // indirect
github.com/streamingfast/dgrpc v0.0.0-20230929132851-893fc52687fa // indirect
github.com/streamingfast/dgrpc v0.0.0-20240222213940-b9f324ff4d5c // indirect
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 // indirect
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 // indirect
github.com/streamingfast/dstore v0.1.1-0.20230620124109-3924b3b36c77 // indirect
github.com/streamingfast/dstore v0.1.1-0.20240215171730-493ad5a0f537 // indirect
github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 // indirect
github.com/streamingfast/pbgo v0.0.6-0.20231120172814-537d034aad5e // indirect
github.com/streamingfast/pbgo v0.0.6-0.20240131193313-6b88bc7139db // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.44.0 // indirect
go.opentelemetry.io/otel v1.18.0 // indirect
go.opentelemetry.io/otel/metric v1.18.0 // indirect
go.opentelemetry.io/otel/trace v1.18.0 // indirect
go.opentelemetry.io/otel v1.23.1 // indirect
go.opentelemetry.io/otel/metric v1.23.1 // indirect
go.opentelemetry.io/otel/trace v1.23.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/goleak v1.2.1 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/term v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.126.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.58.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/api v0.152.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/grpc v1.61.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
Loading

0 comments on commit 1b7b1ab

Please sign in to comment.