diff --git a/cmd/fireeth/cli/node.go b/cmd/fireeth/cli/node.go index bee6b01e..754bbc03 100644 --- a/cmd/fireeth/cli/node.go +++ b/cmd/fireeth/cli/node.go @@ -25,6 +25,7 @@ import ( "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/blockstream" "github.com/streamingfast/dlauncher/launcher" + firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-ethereum/codec" nodemanager "github.com/streamingfast/firehose-ethereum/node-manager" "github.com/streamingfast/firehose-ethereum/node-manager/dev" @@ -203,7 +204,7 @@ func nodeFactoryFunc(isReader bool, backupModuleFactories map[string]operator.Ba oneBlocksStoreURL, workingDir, func(lines chan string) (reader.ConsolerReader, error) { - return codec.NewConsoleReader(appLogger, lines) + return codec.NewConsoleReader(lines, firecore.NewBlockEncoder(), appLogger, appTracer) }, batchStartBlockNum, batchStopBlockNum, diff --git a/cmd/fireeth/cli/reader-node-stdin.go b/cmd/fireeth/cli/reader-node-stdin.go index 0bc5cde1..cbc31008 100644 --- a/cmd/fireeth/cli/reader-node-stdin.go +++ b/cmd/fireeth/cli/reader-node-stdin.go @@ -20,6 +20,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/streamingfast/dlauncher/launcher" + firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-ethereum/codec" "github.com/streamingfast/logging" nodeManager "github.com/streamingfast/node-manager" @@ -41,7 +42,7 @@ func init() { archiveStoreURL := MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url")) consoleReaderFactory := func(lines chan string) (mindreader.ConsolerReader, error) { - r, err := codec.NewConsoleReader(appLogger, lines) + r, err := codec.NewConsoleReader(lines, firecore.NewBlockEncoder(), appLogger, appTracer) if err != nil { return nil, fmt.Errorf("initiating console reader: %w", err) } diff --git a/cmd/fireeth/main.go b/cmd/fireeth/main.go index 468e8112..1532e49e 100644 --- a/cmd/fireeth/main.go +++ b/cmd/fireeth/main.go @@ -63,8 +63,10 @@ var Chain = &firecore.Chain[*pbeth.Block]{ BlockPrinter: printBlock, RegisterExtraCmd: func(chain *firecore.Chain[*pbeth.Block], toolsCmd *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error { - // toolsCmd.AddCommand(newToolsGenerateNodeKeyCmd(chain)) - // toolsCmd.AddCommand(newToolsBackfillCmd(zlog)) + toolsCmd.AddCommand(compareOneblockRPCCmd) + toolsCmd.AddCommand(newCompareBlocksRPCCmd(zlog)) + toolsCmd.AddCommand(newFixPolygonIndexCmd(zlog)) + toolsCmd.AddCommand(newPollRPCBlocksCmd(zlog)) return nil }, diff --git a/cmd/fireeth/main_test.go b/cmd/fireeth/main_test.go index 4fecc806..919b39dd 100644 --- a/cmd/fireeth/main_test.go +++ b/cmd/fireeth/main_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/streamingfast/bstream" + firecore "github.com/streamingfast/firehose-core" pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" "github.com/test-go/testify/require" ) @@ -13,10 +14,11 @@ func Test_Encode_Decode_Block(t *testing.T) { Chain.Validate() Chain.Init() - original, err := Chain.BlockEncoder.Encode(&pbeth.Block{ + original, err := Chain.BlockEncoder.Encode(firecore.BlockEnveloppe{Block: &pbeth.Block{ Number: 1, Header: &pbeth.BlockHeader{}, - }) + Ver: 1, + }, LIBNum: 0}) require.NoError(t, err) require.Equal(t, uint64(1), original.ToProtocol().(*pbeth.Block).Number) diff --git a/cmd/fireeth/tools.go b/cmd/fireeth/tools.go index de0b735f..ec0d1b9e 100644 --- a/cmd/fireeth/tools.go +++ b/cmd/fireeth/tools.go @@ -6,6 +6,7 @@ import ( "io" "github.com/streamingfast/bstream" + "github.com/streamingfast/cli" pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" ) @@ -31,3 +32,7 @@ func printBlock(blk *bstream.Block, alsoPrintTransactions bool, out io.Writer) e return nil } + +func ExamplePrefixed(prefix, examples string) string { + return string(cli.ExamplePrefixed(prefix, examples)) +} diff --git a/cmd/fireeth/tools_compare_blocks_rpc.go b/cmd/fireeth/tools_compare_blocks_rpc.go new file mode 100644 index 00000000..b4517f5a --- /dev/null +++ b/cmd/fireeth/tools_compare_blocks_rpc.go @@ -0,0 +1,444 @@ +// Copyright 2021 dfuse Platform Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "fmt" + "io" + "os" + "strconv" + "time" + + "github.com/holiman/uint256" + jd "github.com/josephburnett/jd/lib" + "github.com/mostynb/go-grpc-compression/zstd" + "github.com/spf13/cobra" + "github.com/streamingfast/bstream" + "github.com/streamingfast/cli" + "github.com/streamingfast/cli/sflags" + "github.com/streamingfast/eth-go" + "github.com/streamingfast/eth-go/rpc" + firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-ethereum/types" + pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" + "github.com/streamingfast/firehose/client" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" +) + +func init() { + + //compareBlocksRPCCmd.PersistentFlags().String("write-rpc-cache", "compared-rpc-blocks.jsonl", "When non-empty, the results of the RPC calls will be appended to this JSONL file") + //compareBlocksRPCCmd.PersistentFlags().String("read-rpc-cache", "compared-rpc-blocks.jsonl", "When non-empty, this file will be parsed before doing any RPC calls") +} + +func newCompareBlocksRPCCmd(logger *zap.Logger) *cobra.Command { + cmd := &cobra.Command{ + Use: "compare-blocks-rpc ", + Short: "Checks for any differences between a Firehose and RPC endpoint (get_block) for a specified range.", + Long: cli.Dedent(` + The 'compare-blocks-rpc' takes in a firehose URL, an RPC endpoint URL and inclusive start/stop block numbers. + `), + Args: cobra.ExactArgs(4), + RunE: createCompareBlocksRPCE(logger), + Example: ExamplePrefixed("fireeth tools compare-blocks-rpc", ` + # Run over full block range + mainnet.eth.streamingfast.io:443 http://localhost:8545 1000000 1001000 + `), + } + + cmd.PersistentFlags().Bool("diff", false, "When activated, difference is displayed for each block with a difference") + cmd.Flags().BoolP("plaintext", "p", false, "Use plaintext connection to Firehose") + cmd.Flags().BoolP("insecure", "k", false, "Use SSL connection to Firehose but skip SSL certificate validation") + + cmd.Flags().StringP("api-token-env-var", "a", "FIREHOSE_API_TOKEN", "Look for a JWT in this environment variable to authenticate against endpoint") + + return cmd +} + +func createCompareBlocksRPCE(logger *zap.Logger) firecore.CommandExecutor { + return func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + firehoseEndpoint := args[0] + rpcEndpoint := args[1] + cli := rpc.NewClient(rpcEndpoint) + start, err := strconv.ParseInt(args[2], 10, 64) + if err != nil { + return fmt.Errorf("parsing start block num: %w", err) + } + stop, err := strconv.ParseUint(args[3], 10, 64) + if err != nil { + return fmt.Errorf("parsing stop block num: %w", err) + } + apiTokenEnvVar := sflags.MustGetString(cmd, "api-token-env-var") + jwt := os.Getenv(apiTokenEnvVar) + + plaintext := sflags.MustGetBool(cmd, "plaintext") + insecure := sflags.MustGetBool(cmd, "insecure") + + firehoseClient, connClose, grpcCallOpts, err := client.NewFirehoseClient(firehoseEndpoint, jwt, insecure, plaintext) + if err != nil { + return err + } + defer connClose() + + grpcCallOpts = append(grpcCallOpts, grpc.UseCompressor(zstd.Name)) + + request := &pbfirehose.Request{ + StartBlockNum: start, + StopBlockNum: stop, + FinalBlocksOnly: true, + } + + stream, err := firehoseClient.Blocks(ctx, request, grpcCallOpts...) + if err != nil { + return fmt.Errorf("unable to start blocks stream: %w", err) + } + + meta, err := stream.Header() + if err != nil { + logger.Warn("cannot read header") + } else { + if hosts := meta.Get("hostname"); len(hosts) != 0 { + logger = logger.With(zap.String("remote_hostname", hosts[0])) + } + } + logger.Info("connected") + + respChan := make(chan *pbeth.Block, 100) + + allDone := make(chan struct{}) + go func() { + + for fhBlock := range respChan { + + rpcBlock, err := cli.GetBlockByNumber(ctx, rpc.BlockNumber(fhBlock.Number), rpc.WithGetBlockFullTransaction()) + if err != nil { + panic(err) + } + + logs, err := cli.Logs(ctx, rpc.LogsParams{ + FromBlock: rpc.BlockNumber(fhBlock.Number), + ToBlock: rpc.BlockNumber(fhBlock.Number), + }) + if err != nil { + panic(err) + } + + identical, diffs := CompareFirehoseToRPC(fhBlock, rpcBlock, logs) + if !identical { + fmt.Println("different", diffs) + } else { + fmt.Println(fhBlock.Number, "identical") + } + } + close(allDone) + }() + + for { + response, err := stream.Recv() + if err != nil { + if err == io.EOF { + break + } + return fmt.Errorf("stream error while receiving: %w", err) + } + blk, err := decodeAnyPB(response.Block) + if err != nil { + return fmt.Errorf("error while decoding block: %w", err) + } + respChan <- blk.ToProtocol().(*pbeth.Block) + } + close(respChan) + <-allDone + + return nil + } +} + +func bigIntFromEthUint256(in *eth.Uint256) *pbeth.BigInt { + if in == nil { + return &pbeth.BigInt{} + } + + in32 := (*uint256.Int)(in).Bytes32() + slice := bytes.TrimLeft(in32[:], string([]byte{0})) + if len(slice) == 0 { + return &pbeth.BigInt{} + } + return pbeth.BigIntFromBytes(slice) +} + +func toFirehoseBlock(in *rpc.Block, logs []*rpc.LogEntry) (*pbeth.Block, map[string]bool) { + + trx, hashesWithoutTo := toFirehoseTraces(in.Transactions, logs) + + out := &pbeth.Block{ + Hash: in.Hash.Bytes(), + Number: uint64(in.Number), + Ver: 3, + Size: uint64(in.BlockSize), + Uncles: toFirehoseUncles(in.Uncles), + TransactionTraces: trx, + Header: &pbeth.BlockHeader{ + ParentHash: in.ParentHash.Bytes(), + // Coinbase: nil, // FIXME + // UncleHash: nil, + StateRoot: in.StateRoot.Bytes(), + TransactionsRoot: in.TransactionsRoot.Bytes(), + ReceiptRoot: in.ReceiptsRoot.Bytes(), + LogsBloom: in.LogsBloom.Bytes(), + Difficulty: bigIntFromEthUint256(in.Difficulty), + TotalDifficulty: bigIntFromEthUint256(in.TotalDifficulty), + Number: uint64(in.Number), + GasLimit: uint64(in.GasLimit), + GasUsed: uint64(in.GasUsed), + Timestamp: timestamppb.New(time.Time(in.Timestamp)), + ExtraData: in.ExtraData.Bytes(), + Nonce: uint64(in.Nonce), + Hash: in.Hash.Bytes(), + MixHash: in.MixHash.Bytes(), + BaseFeePerGas: bigIntFromEthUint256(in.BaseFeePerGas), + // WithdrawalsRoot: in.WithdrawalsRoot, // FIXME + // TxDependency: in.TxDependency // FIXME + }, + } + return out, hashesWithoutTo +} + +func toFirehoseUncles(in []eth.Hash) []*pbeth.BlockHeader { + out := make([]*pbeth.BlockHeader, len(in)) + for i := range in { + out[i] = &pbeth.BlockHeader{ + Hash: in[i].Bytes(), + } + } + return out +} + +func toFirehoseTraces(in *rpc.BlockTransactions, logs []*rpc.LogEntry) (traces []*pbeth.TransactionTrace, hashesWithoutTo map[string]bool) { + + receipts, _ := in.Receipts() + out := make([]*pbeth.TransactionTrace, len(receipts)) + hashesWithoutTo = make(map[string]bool) + for i := range receipts { + txHash := eth.Hash(receipts[i].Hash.Bytes()).String() + var toBytes []byte + if receipts[i].To != nil { + toBytes = receipts[i].To.Bytes() + } else { + hashesWithoutTo[txHash] = true + } + + out[i] = &pbeth.TransactionTrace{ + Hash: receipts[i].Hash.Bytes(), + To: toBytes, + Nonce: uint64(receipts[i].Nonce), + GasLimit: uint64(receipts[i].Gas), + GasPrice: bigIntFromEthUint256(receipts[i].GasPrice), + Input: receipts[i].Input.Bytes(), + Value: bigIntFromEthUint256(receipts[i].Value), + From: receipts[i].From.Bytes(), + Index: uint32(receipts[i].TransactionIndex), + Receipt: &pbeth.TransactionReceipt{ + // filled next + }, + V: pbeth.NewBigInt(int64(receipts[i].V)).Bytes, + //R: bigIntFromEthUint256(receipts[i].R).Bytes, + //S: bigIntFromEthUint256(receipts[i].S).Bytes, + } + + for _, log := range logs { + if eth.Hash(log.TransactionHash).String() == txHash { + out[i].Receipt.Logs = append(out[i].Receipt.Logs, &pbeth.Log{ + Address: log.Address.Bytes(), //[]byte `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + Topics: hashesToBytes(log.Topics), //[][]byte `protobuf:"bytes,2,rep,name=topics,proto3" json:"topics,omitempty"` + Data: log.Data.Bytes(), //[]byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` + BlockIndex: uint32(log.ToLog().BlockIndex), //uint32 `protobuf:"varint,6,opt,name=blockIndex,proto3" json:"blockIndex,omitempty"` + }) + } + } + + } + return out, hashesWithoutTo +} + +func hashesToBytes(in []eth.Hash) [][]byte { + out := make([][]byte, len(in)) + for i := range in { + out[i] = in[i].Bytes() + } + return out +} + +// only keep hash +func stripFirehoseUncles(in []*pbeth.BlockHeader) { + for _, uncle := range in { + uncle.BaseFeePerGas = nil + uncle.Coinbase = nil + uncle.Difficulty = nil + uncle.ExtraData = nil + uncle.GasLimit = 0 + uncle.GasUsed = 0 + uncle.LogsBloom = nil + uncle.MixHash = nil + uncle.Nonce = 0 + uncle.Number = 0 + uncle.ParentHash = nil + uncle.ReceiptRoot = nil + uncle.StateRoot = nil + uncle.Timestamp = nil + uncle.TotalDifficulty = nil + uncle.TransactionsRoot = nil + uncle.TxDependency = nil + uncle.UncleHash = nil + uncle.WithdrawalsRoot = nil + } +} + +func stripFirehoseHeader(in *pbeth.BlockHeader) { + in.Coinbase = nil + in.Timestamp = nil + in.TxDependency = nil + in.UncleHash = nil + in.WithdrawalsRoot = nil + + if in.BaseFeePerGas == nil { + in.BaseFeePerGas = &pbeth.BigInt{} + } + + if len(in.Difficulty.Bytes) == 1 && in.Difficulty.Bytes[0] == 0x0 { + in.Difficulty.Bytes = nil + } +} + +func stripFirehoseBlock(in *pbeth.Block, hashesWithoutTo map[string]bool) { + // clean up internal values + msg := in.ProtoReflect() + msg.SetUnknown(nil) + in = msg.Interface().(*pbeth.Block) + + in.Ver = 0 + stripFirehoseHeader(in.Header) + stripFirehoseUncles(in.Uncles) + stripFirehoseTransactionTraces(in.TransactionTraces, hashesWithoutTo) + + // ARB-ONE FIX + if in.Header.TotalDifficulty.Uint64() == 2 { + in.Header.TotalDifficulty = pbeth.NewBigInt(int64(in.Number) - 22207816) + } + + // FIXME temp + in.BalanceChanges = nil + in.CodeChanges = nil +} + +func stripFirehoseTransactionTraces(in []*pbeth.TransactionTrace, hashesWithoutTo map[string]bool) { + idx := uint32(0) + for _, trace := range in { + + if hashesWithoutTo[eth.Hash(trace.Hash).String()] { + trace.To = nil // FIXME: we could compute this from nonce+address + } + + trace.BeginOrdinal = 0 + trace.EndOrdinal = 0 + trace.AccessList = nil + + trace.GasUsed = 0 // FIXME receipt? + + if trace.GasPrice == nil { + trace.GasPrice = &pbeth.BigInt{} + } + + // FIXME ... + trace.R = nil + trace.S = nil + + trace.Type = 0 + trace.AccessList = nil + trace.MaxFeePerGas = nil + trace.MaxPriorityFeePerGas = nil + + trace.ReturnData = nil + trace.PublicKey = nil + + trace.Status = 0 + stripFirehoseTrxReceipt(trace.Receipt) + trace.Calls = nil + + if trace.Value == nil { + trace.Value = &pbeth.BigInt{} + } + idx++ + } +} + +func stripFirehoseTrxReceipt(in *pbeth.TransactionReceipt) { + for _, log := range in.Logs { + log.Ordinal = 0 + log.Index = 0 // index inside transaction is a pbeth construct, it doesn't exist in RPC interface and we can't reconstruct exactly the same from RPC because the pbeth ones are increased even when a call is reverted. + } + in.LogsBloom = nil + in.StateRoot = nil + in.CumulativeGasUsed = 0 +} + +func CompareFirehoseToRPC(fhBlock *pbeth.Block, rpcBlock *rpc.Block, logs []*rpc.LogEntry) (isEqual bool, differences []string) { + if fhBlock == nil && rpcBlock == nil { + return true, nil + } + + rpcAsPBEth, hashesWithoutTo := toFirehoseBlock(rpcBlock, logs) + stripFirehoseBlock(fhBlock, hashesWithoutTo) + + if !proto.Equal(fhBlock, rpcAsPBEth) { + fh, err := rpc.MarshalJSONRPCIndent(fhBlock, "", " ") + cli.NoError(err, "cannot marshal Firehose block to JSON") + rpc, err := rpc.MarshalJSONRPCIndent(rpcAsPBEth, "", " ") + cli.NoError(err, "cannot marshal RPC block to JSON") + f, err := jd.ReadJsonString(string(fh)) + cli.NoError(err, "cannot read Firehose block JSON") + r, err := jd.ReadJsonString(string(rpc)) + cli.NoError(err, "cannot read RPC block JSON") + // fmt.Println(string(fh)) + // fmt.Println("RPC") + // fmt.Println(string(rpc)) + + if diff := r.Diff(f).Render(); diff != "" { + differences = append(differences, diff) + } + return false, differences + } + return true, nil +} + +func decodeAnyPB(in *anypb.Any) (*bstream.Block, error) { + block := &pbeth.Block{} + if err := anypb.UnmarshalTo(in, block, proto.UnmarshalOptions{}); err != nil { + return nil, fmt.Errorf("unmarshal anypb: %w", err) + } + + // We are downloading only final blocks from the Firehose connection which means the LIB for them + // can be set to themself (althought we use `- 1` to ensure problem would occur if codde don't like + // `LIBNum == self.BlockNum`). + return types.BlockFromProto(block, block.Number-1) +} diff --git a/cmd/fireeth/tools_compare_oneblock_rpc.go b/cmd/fireeth/tools_compare_oneblock_rpc.go new file mode 100644 index 00000000..3518c918 --- /dev/null +++ b/cmd/fireeth/tools_compare_oneblock_rpc.go @@ -0,0 +1,102 @@ +// Copyright 2021 dfuse Platform Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "os" + + "github.com/DataDog/zstd" + "github.com/spf13/cobra" + "github.com/streamingfast/bstream" + "github.com/streamingfast/cli" + "github.com/streamingfast/eth-go/rpc" + pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" +) + +var compareOneblockRPCCmd = &cobra.Command{ + Use: "compare-oneblock-rpc ", + Short: "Checks for any differences between a firehose one-block and the same block from RPC endpoint (get_block).", + Long: cli.Dedent(` + The 'compare-oneblock-rpc' takes in a local path, an RPC endpoint URL and compares a single block at a time. + `), + Args: cobra.ExactArgs(2), + RunE: compareOneblockRPCE, + Example: ExamplePrefixed("fireeth tools compare-oneblock-rpc", ` + /path/to/oneblocks/0046904064-0061a308bf12bc2e-5b6ef5eed4e06d5b-46903864-default.dbin.zst http://localhost:8545 + `), +} + +func compareOneblockRPCE(cmd *cobra.Command, args []string) error { + + ctx := cmd.Context() + filepath := args[0] + rpcEndpoint := args[1] + + fhBlock, err := getOneBlock(filepath) + if err != nil { + return err + } + + cli := rpc.NewClient(rpcEndpoint) + + rpcBlock, err := cli.GetBlockByNumber(ctx, rpc.BlockNumber(fhBlock.Number), rpc.WithGetBlockFullTransaction()) + if err != nil { + return err + } + + logs, err := cli.Logs(ctx, rpc.LogsParams{ + FromBlock: rpc.BlockNumber(fhBlock.Number), + ToBlock: rpc.BlockNumber(fhBlock.Number), + }) + if err != nil { + return err + } + + identical, diffs := CompareFirehoseToRPC(fhBlock, rpcBlock, logs) + if !identical { + fmt.Println("different", diffs) + } else { + fmt.Println(fhBlock.Number, "identical") + } + return nil +} + +func getOneBlock(path string) (*pbeth.Block, error) { + // Check if it's a file and if it exists + if !cli.FileExists(path) { + return nil, os.ErrNotExist + } + + file, err := os.Open(path) + if err != nil { + return nil, err + } + + uncompressedReader := zstd.NewReader(file) + defer uncompressedReader.Close() + + readerFactory, err := bstream.GetBlockReaderFactory.New(uncompressedReader) + if err != nil { + return nil, fmt.Errorf("new block reader: %w", err) + } + + block, err := readerFactory.Read() + if err != nil { + return nil, fmt.Errorf("reading block: %w", err) + } + + return block.ToProtocol().(*pbeth.Block), nil +} diff --git a/cmd/fireeth/tools_fix_polygon_index.go b/cmd/fireeth/tools_fix_polygon_index.go new file mode 100644 index 00000000..ab45ced7 --- /dev/null +++ b/cmd/fireeth/tools_fix_polygon_index.go @@ -0,0 +1,165 @@ +package main + +import ( + "context" + "fmt" + "io" + "strconv" + + "github.com/spf13/cobra" + "github.com/streamingfast/bstream" + "github.com/streamingfast/dstore" + firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-ethereum/types" + pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" + "go.uber.org/zap" +) + +func newFixPolygonIndexCmd(logger *zap.Logger) *cobra.Command { + return &cobra.Command{ + Use: "fix-polygon-index ", + Short: "look for blocks containing a single transaction with index==1 (where it should be index==0) and rewrite the affected 100-block-files to dest. it does not rewrite correct merged-files-bundles", + Args: cobra.ExactArgs(4), + RunE: createFixPolygonIndexE(logger), + } +} + +func createFixPolygonIndexE(logger *zap.Logger) firecore.CommandExecutor { + return func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + srcStore, err := dstore.NewDBinStore(args[0]) + if err != nil { + return fmt.Errorf("unable to create source store: %w", err) + } + + destStore, err := dstore.NewDBinStore(args[1]) + if err != nil { + return fmt.Errorf("unable to create destination store: %w", err) + } + + start := mustParseUint64(args[2]) + stop := mustParseUint64(args[3]) + + if stop <= start { + return fmt.Errorf("stop block must be greater than start block") + } + + startWalkFrom := fmt.Sprintf("%010d", start-(start%100)) + err = srcStore.WalkFrom(ctx, "", startWalkFrom, func(filename string) error { + logger.Debug("checking merged block file", zap.String("filename", filename)) + + startBlock := mustParseUint64(filename) + + if startBlock > stop { + logger.Debug("skipping merged block file", zap.String("reason", "past stop block"), zap.String("filename", filename)) + return io.EOF + } + + if startBlock+100 < start { + logger.Debug("skipping merged block file", zap.String("reason", "before start block"), zap.String("filename", filename)) + return nil + } + + rc, err := srcStore.OpenObject(ctx, filename) + if err != nil { + return fmt.Errorf("failed to open %s: %w", filename, err) + } + defer rc.Close() + + br, err := bstream.GetBlockReaderFactory.New(rc) + if err != nil { + return fmt.Errorf("creating block reader: %w", err) + } + + var mustWrite bool + blocks := make([]*bstream.Block, 100) + i := 0 + for { + block, err := br.Read() + if err == io.EOF { + break + } + + ethBlock := block.ToProtocol().(*pbeth.Block) + if len(ethBlock.TransactionTraces) == 1 && + ethBlock.TransactionTraces[0].Index == 1 { + fmt.Println("ERROR FOUND AT BLOCK", block.Number) + mustWrite = true + ethBlock.TransactionTraces[0].Index = 0 + block, err = types.BlockFromProto(ethBlock, block.LibNum) + if err != nil { + return fmt.Errorf("re-packing the block: %w", err) + } + } + blocks[i] = block + i++ + } + if i != 100 { + return fmt.Errorf("expected to have read 100 blocks, we have read %d. Bailing out.", i) + } + if mustWrite { + if err := writeMergedBlocks(startBlock, destStore, blocks); err != nil { + return fmt.Errorf("writing merged block %d: %w", startBlock, err) + } + } + + return nil + }) + + if err == io.EOF { + return nil + } + + if err != nil { + return err + } + + return nil + } +} + +func writeMergedBlocks(lowBlockNum uint64, store dstore.Store, blocks []*bstream.Block) error { + file := filename(lowBlockNum) + fmt.Printf("writing merged file %s.dbin.zst\n", file) + + if len(blocks) == 0 { + return fmt.Errorf("no blocks to write to bundle") + } + + pr, pw := io.Pipe() + + go func() { + var err error + defer func() { + pw.CloseWithError(err) + }() + + blockWriter, err := bstream.GetBlockWriterFactory.New(pw) + if err != nil { + return + } + + for _, blk := range blocks { + err = blockWriter.Write(blk) + if err != nil { + return + } + } + }() + + return store.WriteObject(context.Background(), file, pr) +} + +func filename(num uint64) string { + return fmt.Sprintf("%010d", num) +} + +func mustParseUint64(in string) uint64 { + out, err := strconv.ParseUint(in, 0, 64) + if err != nil { + panic(fmt.Errorf("unable to parse %q as uint64: %w", in, err)) + } + + return out +} diff --git a/cmd/fireeth/tools_poll_rpc_blocks.go b/cmd/fireeth/tools_poll_rpc_blocks.go new file mode 100644 index 00000000..9a98caac --- /dev/null +++ b/cmd/fireeth/tools_poll_rpc_blocks.go @@ -0,0 +1,101 @@ +package main + +import ( + "encoding/base64" + "encoding/hex" + "fmt" + "strconv" + "time" + + "github.com/spf13/cobra" + "github.com/streamingfast/eth-go/rpc" + firecore "github.com/streamingfast/firehose-core" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" +) + +func newPollRPCBlocksCmd(logger *zap.Logger) *cobra.Command { + return &cobra.Command{ + Use: "poll-rpc-blocks ", + Short: "Generate 'light' firehose blocks from an RPC endpoint", + Args: cobra.ExactArgs(2), + RunE: createPollRPCBlocksE(logger), + } +} + +var pollDelay = time.Millisecond * 100 + +var lastDelayWarning time.Time + +func createPollRPCBlocksE(logger *zap.Logger) firecore.CommandExecutor { + delay := func(err error) { + if err != nil { + logger.Warn("retrying...", zap.Error(err)) + } + time.Sleep(pollDelay) + } + + return func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + rpcEndpoint := args[0] + startBlockNumStr := args[1] + + logger.Info("retrieving from rpc endpoint", + zap.String("start_block_num", startBlockNumStr), + zap.String("rpc_endpoint", rpcEndpoint), + ) + startBlockNum, err := strconv.ParseUint(startBlockNumStr, 10, 64) + if err != nil { + return fmt.Errorf("unable to parse start block number %s: %w", startBlockNumStr, err) + } + client := rpc.NewClient(rpcEndpoint) + + fmt.Println("FIRE INIT 2.3 local v1.0.0") + + blockNum := startBlockNum + for { + latest, err := client.LatestBlockNum(ctx) + if err != nil { + delay(err) + continue + } + + if latest < blockNum { + delay(nil) + continue + } + rpcBlock, err := client.GetBlockByNumber(ctx, rpc.BlockNumber(blockNum), rpc.WithGetBlockFullTransaction()) + if err != nil { + delay(err) + continue + } + + logs, err := client.Logs(ctx, rpc.LogsParams{ + FromBlock: rpc.BlockNumber(blockNum), + ToBlock: rpc.BlockNumber(blockNum), + }) + if err != nil { + delay(err) + continue + } + + block, _ := toFirehoseBlock(rpcBlock, logs) + cnt, err := proto.Marshal(block) + if err != nil { + return fmt.Errorf("failed to proto marshal pb sol block: %w", err) + } + + libNum := uint64(0) + if blockNum != 0 { + libNum = blockNum - 1 + } + b64Cnt := base64.StdEncoding.EncodeToString(cnt) + lineCnt := fmt.Sprintf("FIRE BLOCK %d %s %d %s %s", blockNum, hex.EncodeToString(block.Hash), libNum, hex.EncodeToString(block.Header.ParentHash), b64Cnt) + if _, err := fmt.Println(lineCnt); err != nil { + return fmt.Errorf("failed to write log line (char lenght %d): %w", len(lineCnt), err) + } + blockNum++ + } + } +} diff --git a/go.mod b/go.mod index c2569bbe..ee0e6dcc 100644 --- a/go.mod +++ b/go.mod @@ -3,18 +3,19 @@ module github.com/streamingfast/firehose-ethereum go 1.21 require ( + github.com/DataDog/zstd v1.5.5 github.com/RoaringBitmap/roaring v0.9.4 github.com/ShinyTrinkets/overseer v0.3.0 github.com/golang/protobuf v1.5.3 github.com/holiman/uint256 v1.2.0 github.com/josephburnett/jd v1.7.1 - github.com/klauspost/compress v1.16.5 github.com/lithammer/dedent v1.1.0 github.com/logrusorgru/aurora v2.0.3+incompatible github.com/manifoldco/promptui v0.9.0 github.com/mitchellh/go-testing-interface v1.14.1 github.com/mostynb/go-grpc-compression v1.1.17 - github.com/spf13/cobra v1.6.1 + github.com/spf13/cobra v1.7.0 + github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 github.com/streamingfast/bstream v0.0.2-0.20230829131224-b9272048dc6a github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80 @@ -39,7 +40,6 @@ require ( github.com/streamingfast/sf-tools v0.0.0-20230811131237-97a5d4afa459 github.com/streamingfast/sf-tracing v0.0.0-20230616174903-cd2ade641ca9 github.com/streamingfast/shutter v1.5.0 - github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 github.com/streamingfast/substreams v1.1.20 github.com/stretchr/testify v1.8.4 github.com/test-go/testify v1.1.4 @@ -76,7 +76,7 @@ require ( github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.3.1 // indirect - github.com/blendle/zapdriver v1.3.1 // indirect + github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d // indirect github.com/bobg/go-generics/v2 v2.1.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/bufbuild/connect-go v1.10.0 // indirect @@ -130,6 +130,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/klauspost/compress v1.16.5 // indirect github.com/klauspost/cpuid/v2 v2.2.3 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect @@ -172,7 +173,6 @@ require ( github.com/spf13/afero v1.9.3 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect github.com/streamingfast/atm v0.0.0-20220131151839-18c87005e680 // indirect github.com/streamingfast/dbin v0.9.1-0.20220513054835-1abebbb944ad // indirect github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 // indirect diff --git a/go.sum b/go.sum index fb2c7845..fafceff0 100644 --- a/go.sum +++ b/go.sum @@ -112,6 +112,8 @@ github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUM github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= +github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v0.32.3 h1:fiyErF/p5fz79DvMCca9ayvYiWYsFP1oJbskt9fjo8I= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v0.32.3/go.mod h1:s7Gpwj0tk7XnVCm4BQEmx/mbS36SuTCY/vMB2SNxe8o= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.6/go.mod h1:NE8dAwJ1VU4WFdJYTlO0tdobQFdy70z8wNDU1L3VAr4= @@ -172,8 +174,9 @@ github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edY github.com/bits-and-blooms/bitset v1.3.1 h1:y+qrlmq3XsWi+xZqSaueaE8ry8Y127iMxlMfqcK8p0g= github.com/bits-and-blooms/bitset v1.3.1/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= -github.com/blendle/zapdriver v1.3.1 h1:C3dydBOWYRiOk+B8X9IVZ5IOe+7cl+tGOexN4QqHfpE= github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc= +github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d h1:fSlGu5ePbkjBidXuj2O5j9EcYrVB5Cr6/wdkYyDgxZk= +github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d/go.mod h1:yCBkgASmKHgUOFjK9h1sOytUVgA+JkQjqj3xYP4AdWY= github.com/bobg/go-generics/v2 v2.1.1 h1:4rN9upY6Xm4TASSMeH+NzUghgO4h/SbNrQphIjRd/R0= github.com/bobg/go-generics/v2 v2.1.1/go.mod h1:iPMSRVFlzkJSYOCXQ0n92RA3Vxw0RBv2E8j9ZODXgHk= github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= @@ -478,7 +481,6 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/ipfs/boxo v0.8.0 h1:UdjAJmHzQHo/j3g3b1bAcAXCj/GM6iTwvSlBDvPBNBs= @@ -574,8 +576,8 @@ github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqf github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= -github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0= -github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= +github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= @@ -741,8 +743,8 @@ github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkU github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g= -github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA= -github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -830,8 +832,6 @@ github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1 github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4= github.com/streamingfast/substreams v1.1.20 h1:61k/HKti9xo7vDAu5zew/VL8qzY+ye/9Zzt1om+tVks= github.com/streamingfast/substreams v1.1.20/go.mod h1:Ak7a+EM8MRehep0ZaQD1NwG27ZE9auZY9+/VLbhBnDU= -github.com/streamingfast/substreams v1.1.12-0.20230825185616-0ba49a2ee9ee h1:Q78BDGVmulzXgPoovmhLCUcxc98RFxYhifoukQflWHk= -github.com/streamingfast/substreams v1.1.12-0.20230825185616-0ba49a2ee9ee/go.mod h1:Thcw2blrOD8uwRMGKhjD4BXwy1Fp9UADxIRHRme+raQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= diff --git a/tools/blocks.go b/tools/blocks.go deleted file mode 100644 index a81b6747..00000000 --- a/tools/blocks.go +++ /dev/null @@ -1,431 +0,0 @@ -// Copyright 2021 dfuse Platform Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tools - -import ( - "context" - "errors" - "fmt" - "io" - "os" - "regexp" - "strconv" - - "github.com/klauspost/compress/zstd" - "github.com/spf13/cobra" - "github.com/streamingfast/bstream" - "github.com/streamingfast/cli" - "github.com/streamingfast/dstore" - "github.com/streamingfast/eth-go" - pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" - "github.com/streamingfast/jsonpb" - "go.uber.org/zap" -) - -var printCmd = &cobra.Command{ - Use: "print", - Short: "Prints of one block or merged blocks file", -} - -var oneBlockCmd = &cobra.Command{ - Use: "one-block ", - Short: "Prints a block from a one-block file", - Args: cobra.ExactArgs(1), - RunE: printOneBlockE, -} - -var blocksCmd = &cobra.Command{ - Use: "blocks ", - Short: "Prints the content summary of a merged block file", - Args: cobra.ExactArgs(1), - RunE: printBlocksE, -} - -var blockCmd = &cobra.Command{ - Use: "block ", - Short: "Finds and prints one block from a merged block file", - Args: cobra.ExactArgs(1), - RunE: printBlockE, -} - -func init() { - Cmd.AddCommand(printCmd) - - printCmd.AddCommand(oneBlockCmd) - - printCmd.AddCommand(blocksCmd) - blocksCmd.PersistentFlags().Bool("transactions", false, "Include transaction IDs in output") - - printCmd.AddCommand(blockCmd) - blockCmd.Flags().String("transaction", "", "Filters transaction by this hash") - - printCmd.PersistentFlags().Uint64("transactions-for-block", 0, "Include transaction IDs in output") - printCmd.PersistentFlags().Bool("transactions", false, "Include transaction IDs in output") - printCmd.PersistentFlags().Bool("calls", false, "Include transaction's Call data in output") - printCmd.PersistentFlags().Bool("full", false, "print the fullblock instead of just header/ID") - printCmd.PersistentFlags().String("store", "", "block store") -} - -var integerRegex = regexp.MustCompile("^[0-9]+$") - -func printBlocksE(cmd *cobra.Command, args []string) error { - var identifier string - var reader io.Reader - var closer func() error - - if integerRegex.MatchString(args[0]) { - blockNum, err := strconv.ParseUint(args[0], 10, 64) - if err != nil { - return fmt.Errorf("unable to parse block number %q: %w", args[0], err) - } - - str := mustGetString(cmd, "store") - - store, err := dstore.NewDBinStore(str) - if err != nil { - return fmt.Errorf("unable to create store at path %q: %w", store, err) - } - - identifier = fmt.Sprintf("%010d", blockNum) - gsReader, err := store.OpenObject(context.Background(), identifier) - if err != nil { - fmt.Printf("❌ Unable to read blocks filename %q: %s\n", identifier, err) - return err - } - - reader = gsReader - closer = gsReader.Close - } else { - identifier = args[0] - - // Check if it's a file and if it exists - if !cli.FileExists(identifier) { - fmt.Printf("❌ File %q does not exist\n", identifier) - return os.ErrNotExist - } - - file, err := os.Open(identifier) - if err != nil { - fmt.Printf("❌ Unable to read blocks filename %q: %s\n", identifier, err) - return err - } - reader = file - closer = file.Close - } - - defer closer() - - return printBlocks(identifier, reader, mustGetBool(cmd, "transactions")) -} - -func printBlocks(inputIdentifier string, reader io.Reader, printTransactions bool) error { - readerFactory, err := bstream.GetBlockReaderFactory.New(reader) - if err != nil { - fmt.Printf("❌ Unable to read blocks %s: %s\n", inputIdentifier, err) - return err - } - - seenBlockCount := 0 - for { - block, err := readerFactory.Read() - if err != nil { - if err == io.EOF { - fmt.Printf("Total blocks: %d\n", seenBlockCount) - return nil - } - return fmt.Errorf("error receiving blocks: %w", err) - } - - seenBlockCount++ - - //payloadSize, err := len(block.Payload.Get()) //disabled after rework - ethBlock := block.ToProtocol().(*pbeth.Block) - - fmt.Printf("Block #%d (%s) (prev: %s, lib: %d): %d transactions, %d balance changes\n", - block.Num(), - block.ID()[0:7], - block.PreviousID()[0:7], - block.LibNum, - len(ethBlock.TransactionTraces), - len(ethBlock.BalanceChanges), - ) - if printTransactions { - fmt.Println("- Transactions: ") - for _, t := range ethBlock.TransactionTraces { - fmt.Println(" * ", t.Hash) - } - fmt.Println() - } - } -} - -func printBlockE(cmd *cobra.Command, args []string) error { - printTransactions := mustGetBool(cmd, "transactions") - printCall := mustGetBool(cmd, "calls") - printFull := mustGetBool(cmd, "full") - transactionFilter := mustGetString(cmd, "transaction") - - zlog.Info("printing block", - zap.Bool("print_transactions", printTransactions), - zap.String("transaction_filter", transactionFilter), - ) - - blockNum, err := strconv.ParseUint(args[0], 10, 64) - if err != nil { - return fmt.Errorf("unable to parse block number %q: %w", args[0], err) - } - - str := mustGetString(cmd, "store") - - store, err := dstore.NewDBinStore(str) - if err != nil { - return fmt.Errorf("unable to create store at path %q: %w", store, err) - } - - mergedBlockNum := blockNum - (blockNum % 100) - zlog.Info("finding merged block file", - zap.Uint64("merged_block_num", mergedBlockNum), - zap.Uint64("block_num", blockNum), - ) - - filename := fmt.Sprintf("%010d", mergedBlockNum) - reader, err := store.OpenObject(context.Background(), filename) - if err != nil { - fmt.Printf("❌ Unable to read blocks filename %s: %s\n", filename, err) - return err - } - defer reader.Close() - - readerFactory, err := bstream.GetBlockReaderFactory.New(reader) - if err != nil { - fmt.Printf("❌ Unable to read blocks filename %s: %s\n", filename, err) - return err - } - - for { - block, err := readerFactory.Read() - if err != nil { - if err == io.EOF { - return nil - } - return fmt.Errorf("error reading blocks: %w", err) - } - - if block.Number != blockNum { - zlog.Debug("skipping block", - zap.Uint64("desired_block_num", blockNum), - zap.Uint64("block_num", block.Number), - ) - continue - } - ethBlock := block.ToProtocol().(*pbeth.Block) - - if printFull { - jsonPayload, _ := jsonpb.MarshalIndentToString(ethBlock, " ") - fmt.Println(jsonPayload) - continue - } - - fmt.Printf("Block #%d (%s) (prev: %s, lib: %d): %d transactions, %d balance changes\n", - block.Num(), - block.ID()[0:7], - block.PreviousID()[0:7], - block.LibNum, - len(ethBlock.TransactionTraces), - len(ethBlock.BalanceChanges), - ) - if printTransactions { - fmt.Println("- Transactions: ") - for _, t := range ethBlock.TransactionTraces { - hash := eth.Hash(t.Hash) - if transactionFilter != "" { - if transactionFilter != hash.String() && transactionFilter != hash.Pretty() { - continue - } - } - printTrx(t, printCall) - } - - } - - continue - } -} - -func getOneBlock(path string) (*pbeth.Block, error) { - - // Check if it's a file and if it exists - if !cli.FileExists(path) { - return nil, os.ErrNotExist - } - - file, err := os.Open(path) - if err != nil { - return nil, err - } - - uncompressedReader, err := zstd.NewReader(file) - if err != nil { - return nil, fmt.Errorf("new zstd reader: %w", err) - } - defer uncompressedReader.Close() - - readerFactory, err := bstream.GetBlockReaderFactory.New(uncompressedReader) - if err != nil { - return nil, fmt.Errorf("new block reader: %w", err) - } - - block, err := readerFactory.Read() - if err != nil { - return nil, fmt.Errorf("reading block: %w", err) - } - - return block.ToProtocol().(*pbeth.Block), nil - -} - -func printOneBlockE(cmd *cobra.Command, args []string) error { - if integerRegex.MatchString(args[0]) { - blockNum, err := strconv.ParseUint(args[0], 10, 64) - if err != nil { - return fmt.Errorf("unable to parse block number %q: %w", args[0], err) - } - - return printOneBlockFromStore(cmd.Context(), blockNum, mustGetString(cmd, "store"), mustGetBool(cmd, "transactions")) - } - - path := args[0] - - // Check if it's a file and if it exists - if !cli.FileExists(path) { - fmt.Printf("❌ File %q does not exist\n", path) - return os.ErrNotExist - } - - file, err := os.Open(path) - if err != nil { - fmt.Printf("❌ Unable to open file %q: %s\n", path, err) - return err - } - - uncompressedReader, err := zstd.NewReader(file) - if err != nil { - return fmt.Errorf("new zstd reader: %w", err) - } - defer uncompressedReader.Close() - - if err := printBlockFromReader(path, uncompressedReader); err != nil { - if errors.Is(err, io.EOF) { - fmt.Printf("❌ One block file is empty %q: %s\n", path, err) - return err - } - - fmt.Printf("❌ Unable to print one-block file %s: %s\n", path, err) - return err - } - - return nil -} - -func printOneBlockFromStore(ctx context.Context, blockNum uint64, storeDSN string, printTransactions bool) error { - store, err := dstore.NewDBinStore(storeDSN) - if err != nil { - return fmt.Errorf("unable to create store at path %q: %w", store, err) - } - - var files []string - filePrefix := fmt.Sprintf("%010d", blockNum) - err = store.Walk(ctx, filePrefix, func(filename string) (err error) { - files = append(files, filename) - return nil - }) - if err != nil { - return fmt.Errorf("unable to find on block files: %w", err) - } - - for _, filepath := range files { - reader, err := store.OpenObject(ctx, filepath) - if err != nil { - fmt.Printf("❌ Unable to open one-block file %s: %s\n", filepath, err) - return err - } - defer reader.Close() - - if err := printBlockFromReader(filepath, reader); err != nil { - if errors.Is(err, io.EOF) { - break - } - - return err - } - } - return nil -} - -func printBlockFromReader(identifier string, reader io.Reader) error { - readerFactory, err := bstream.GetBlockReaderFactory.New(reader) - if err != nil { - return fmt.Errorf("new block reader: %w", err) - } - - block, err := readerFactory.Read() - if err != nil { - return fmt.Errorf("reading block: %w", err) - } - - return printBlock(block) -} - -func printBlock(block *bstream.Block) error { - nativeBlock := block.ToProtocol().(*pbeth.Block) - - data, err := jsonpb.MarshalIndentToString(nativeBlock, " ") - if err != nil { - return fmt.Errorf("json marshall: %w", err) - } - - fmt.Println(string(data)) - - return nil -} - -func printTrx(trx *pbeth.TransactionTrace, withCall bool) { - hash := eth.Hash(trx.Hash) - - fmt.Printf(" * %s\n", hash.Pretty()) - if withCall { - for _, call := range trx.Calls { - - str := "" - //if len(call.Input) > 8 { - // str = hex.EncodeToString(call.Input[0:8]) - //} else { - // str = hex.EncodeToString(call.Input) - //} - - fmt.Printf(" -> Call: %d , input: %s (parent: %d, depth: %d, Statusfailed: %v, StatusReverted: %v, FailureReason: %s, StateReverted: %v)\n", - call.Index, - str, - call.ParentIndex, - call.Depth, - call.StatusFailed, - call.StatusReverted, - call.FailureReason, - call.StateReverted, - ) - } - } - -} diff --git a/tools/check.go b/tools/check.go deleted file mode 100644 index d2f7ab3c..00000000 --- a/tools/check.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2021 dfuse Platform Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tools - -import ( - "fmt" - - "github.com/spf13/cobra" - "github.com/streamingfast/bstream" - pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" - sftools "github.com/streamingfast/sf-tools" -) - -// CmdCheck is used in sf-ethereum-priv where additional checks are added. -var CheckCmd = &cobra.Command{Use: "check", Short: "Various checks for deployment, data integrity & debugging"} - -var checkMergedBlocksCmd = &cobra.Command{ - // TODO: Not sure, it's now a required thing, but we could probably use the same logic as `start` - // and avoid altogether passing the args. If this would also load the config and everything else, - // that would be much more seamless! - Use: "merged-blocks ", - Short: "Checks for any holes in merged blocks as well as ensuring merged blocks integrity", - Args: cobra.ExactArgs(1), - RunE: checkMergedBlocksE, - Example: ExamplePrefixed("fireeth tools check merged-blocks", ` - "./sf-data/storage/merged-blocks" - "gs://// -s" - "s3://// -f" - "az://// -r \"10 000 - 1 000 000" - `), -} - -func init() { - Cmd.AddCommand(CheckCmd) - CheckCmd.AddCommand(checkMergedBlocksCmd) - - CheckCmd.PersistentFlags().StringP("range", "r", "", "Block range to use for the check") - - checkMergedBlocksCmd.Flags().BoolP("print-stats", "s", false, "Natively decode each block in the segment and print statistics about it, ensuring it contains the required blocks") - checkMergedBlocksCmd.Flags().BoolP("print-full", "f", false, "Natively decode each block and print the full JSON representation of the block, should be used with a small range only if you don't want to be overwhelmed") -} - -func checkMergedBlocksE(cmd *cobra.Command, args []string) error { - storeURL := args[0] - fileBlockSize := uint32(100) - - blockRange, err := sftools.Flags.GetBlockRange("range") - if err != nil { - return err - } - - printDetails := sftools.PrintNoDetails - if mustGetBool(cmd, "print-stats") { - printDetails = sftools.PrintStats - } - - if mustGetBool(cmd, "print-full") { - printDetails = sftools.PrintFull - } - - return sftools.CheckMergedBlocks(cmd.Context(), zlog, storeURL, fileBlockSize, blockRange, blockPrinter, printDetails) -} - -func blockPrinter(block *bstream.Block) { - ethBlock := block.ToProtocol().(*pbeth.Block) - - callCount := 0 - for _, trxTrace := range ethBlock.TransactionTraces { - callCount += len(trxTrace.Calls) - } - - fmt.Printf("Block %s %d transactions, %d calls\n", - block, - len(ethBlock.TransactionTraces), - callCount, - ) -} diff --git a/tools/compare-rpc-oneblock.go b/tools/compare-rpc-oneblock.go index 1590acd1..a86f6726 100644 --- a/tools/compare-rpc-oneblock.go +++ b/tools/compare-rpc-oneblock.go @@ -16,10 +16,14 @@ package tools import ( "fmt" + "os" + "github.com/DataDog/zstd" "github.com/spf13/cobra" + "github.com/streamingfast/bstream" "github.com/streamingfast/cli" "github.com/streamingfast/eth-go/rpc" + pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" ) var compareOneblockRPCCmd = &cobra.Command{ @@ -73,3 +77,30 @@ func compareOneblockRPCE(cmd *cobra.Command, args []string) error { } return nil } + +func getOneBlock(path string) (*pbeth.Block, error) { + // Check if it's a file and if it exists + if !cli.FileExists(path) { + return nil, os.ErrNotExist + } + + file, err := os.Open(path) + if err != nil { + return nil, err + } + + uncompressedReader := zstd.NewReader(file) + defer uncompressedReader.Close() + + readerFactory, err := bstream.GetBlockReaderFactory.New(uncompressedReader) + if err != nil { + return nil, fmt.Errorf("new block reader: %w", err) + } + + block, err := readerFactory.Read() + if err != nil { + return nil, fmt.Errorf("reading block: %w", err) + } + + return block.ToProtocol().(*pbeth.Block), nil +} diff --git a/tools/compare-rpc.go b/tools/compare-rpc.go index 70d9a06b..5dc56edd 100644 --- a/tools/compare-rpc.go +++ b/tools/compare-rpc.go @@ -26,15 +26,18 @@ import ( jd "github.com/josephburnett/jd/lib" "github.com/mostynb/go-grpc-compression/zstd" "github.com/spf13/cobra" + "github.com/streamingfast/bstream" "github.com/streamingfast/cli" "github.com/streamingfast/eth-go" "github.com/streamingfast/eth-go/rpc" + "github.com/streamingfast/firehose-ethereum/types" pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" "github.com/streamingfast/firehose/client" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" timestamppb "google.golang.org/protobuf/types/known/timestamppb" ) @@ -420,3 +423,15 @@ func CompareFirehoseToRPC(fhBlock *pbeth.Block, rpcBlock *rpc.Block, logs []*rpc } return true, nil } + +func decodeAnyPB(in *anypb.Any) (*bstream.Block, error) { + block := &pbeth.Block{} + if err := anypb.UnmarshalTo(in, block, proto.UnmarshalOptions{}); err != nil { + return nil, fmt.Errorf("unmarshal anypb: %w", err) + } + + // We are downloading only final blocks from the Firehose connection which means the LIB for them + // can be set to themself (althought we use `- 1` to ensure problem would occur if codde don't like + // `LIBNum == self.BlockNum`). + return types.BlockFromProto(block, block.Number-1) +} diff --git a/tools/firehose-client.go b/tools/firehose-client.go deleted file mode 100644 index 1cd2dad4..00000000 --- a/tools/firehose-client.go +++ /dev/null @@ -1,164 +0,0 @@ -package tools - -import ( - "fmt" - "strings" - - "github.com/spf13/cobra" - "github.com/streamingfast/eth-go" - pbtransform "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/transform/v1" - sftools "github.com/streamingfast/sf-tools" - "google.golang.org/protobuf/types/known/anypb" -) - -func init() { - firehoseClientCmd := sftools.GetFirehoseClientCmd(zlog, tracer, transformsSetter) - firehoseClientCmd.Flags().Bool("header-only", false, "Apply the HeaderOnly transform sending back Block's header only (with few top-level fields), exclusive option") - firehoseClientCmd.Flags().String("call-filters", "", "call filters (format: '[address1[+address2[+...]]]:[eventsig1[+eventsig2[+...]]]") - firehoseClientCmd.Flags().String("log-filters", "", "log filters (format: '[address1[+address2[+...]]]:[eventsig1[+eventsig2[+...]]]") - firehoseClientCmd.Flags().Bool("send-all-block-headers", false, "ask for all the blocks to be sent (header-only if there is no match)") - Cmd.AddCommand(firehoseClientCmd) - - firehoseSingleBlockClientCmd := sftools.GetFirehoseSingleBlockClientCmd(zlog, tracer) - Cmd.AddCommand(firehoseSingleBlockClientCmd) -} - -var transformsSetter = func(cmd *cobra.Command) (transforms []*anypb.Any, err error) { - filters, err := parseFilters(mustGetString(cmd, "call-filters"), mustGetString(cmd, "log-filters"), mustGetBool(cmd, "send-all-block-headers")) - if err != nil { - return nil, err - } - - headerOnly := mustGetBool(cmd, "header-only") - if filters != nil && headerOnly { - return nil, fmt.Errorf("'header-only' flag is exclusive with 'call-filters', 'log-filters' and 'send-all-block-headers' choose either 'header-only' or a combination of the others") - } - - if headerOnly { - t, err := anypb.New(&pbtransform.HeaderOnly{}) - if err != nil { - return nil, err - } - - return []*anypb.Any{t}, nil - } - - if filters != nil { - t, err := anypb.New(filters) - if err != nil { - return nil, err - } - - return []*anypb.Any{t}, nil - } - - return -} - -func parseFilters(callFilters, logFilters string, sendAllBlockHeaders bool) (*pbtransform.CombinedFilter, error) { - mf := &pbtransform.CombinedFilter{} - - if callFilters == "" && logFilters == "" && !sendAllBlockHeaders { - return nil, nil - } - if callFilters != "" { - for _, filter := range strings.Split(callFilters, ",") { - if filter == "" { - continue - } - parts := strings.Split(filter, ":") - if len(parts) != 2 { - return nil, fmt.Errorf("option --call-filters must be of type address_hash+address_hash+address_hash:event_sig_hash+event_sig_hash (repeated, separated by comma)") - } - var addrs []eth.Address - for _, a := range strings.Split(parts[0], "+") { - if a != "" { - addr := eth.MustNewAddressLoose(a) - addrs = append(addrs, addr) - } - } - var sigs []eth.Hash - for _, s := range strings.Split(parts[1], "+") { - if s != "" { - sig := eth.MustNewHash(s) - sigs = append(sigs, sig) - } - } - - mf.CallFilters = append(mf.CallFilters, basicCallToFilter(addrs, sigs)) - } - } - - if logFilters != "" { - for _, filter := range strings.Split(logFilters, ",") { - if filter == "" { - continue - } - parts := strings.Split(filter, ":") - if len(parts) != 2 { - return nil, fmt.Errorf("option --log-filters must be of type address_hash+address_hash+address_hash:event_sig_hash+event_sig_hash (repeated, separated by comma)") - } - var addrs []eth.Address - for _, a := range strings.Split(parts[0], "+") { - if a != "" { - addr := eth.MustNewAddress(a) - addrs = append(addrs, addr) - } - } - var sigs []eth.Hash - for _, s := range strings.Split(parts[1], "+") { - if s != "" { - sig := eth.MustNewHash(s) - sigs = append(sigs, sig) - } - } - - mf.LogFilters = append(mf.LogFilters, basicLogFilter(addrs, sigs)) - } - } - - if sendAllBlockHeaders { - mf.SendAllBlockHeaders = true - } - return mf, nil -} - -func basicCallToFilter(addrs []eth.Address, sigs []eth.Hash) *pbtransform.CallToFilter { - var addrBytes [][]byte - var sigsBytes [][]byte - - for _, addr := range addrs { - b := addr.Bytes() - addrBytes = append(addrBytes, b) - } - - for _, sig := range sigs { - b := sig.Bytes() - sigsBytes = append(sigsBytes, b) - } - - return &pbtransform.CallToFilter{ - Addresses: addrBytes, - Signatures: sigsBytes, - } -} - -func basicLogFilter(addrs []eth.Address, sigs []eth.Hash) *pbtransform.LogFilter { - var addrBytes [][]byte - var sigsBytes [][]byte - - for _, addr := range addrs { - b := addr.Bytes() - addrBytes = append(addrBytes, b) - } - - for _, sig := range sigs { - b := sig.Bytes() - sigsBytes = append(sigsBytes, b) - } - - return &pbtransform.LogFilter{ - Addresses: addrBytes, - EventSignatures: sigsBytes, - } -} diff --git a/tools/fix-polygon-index.go b/tools/fix-polygon-index.go index e5eb8069..8985aadd 100644 --- a/tools/fix-polygon-index.go +++ b/tools/fix-polygon-index.go @@ -4,14 +4,14 @@ import ( "context" "fmt" "io" - - "go.uber.org/zap" + "strconv" "github.com/spf13/cobra" "github.com/streamingfast/bstream" "github.com/streamingfast/dstore" "github.com/streamingfast/firehose-ethereum/types" pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" + "go.uber.org/zap" ) var fixPolygonIndexCmd = &cobra.Command{ @@ -120,7 +120,7 @@ func fixPolygonIndexE(cmd *cobra.Command, args []string) error { func writeMergedBlocks(lowBlockNum uint64, store dstore.Store, blocks []*bstream.Block) error { file := filename(lowBlockNum) - fmt.Printf("writing merged file %s.dbin.zst\n", file) + fmt.Printf("writing merged file %s.dbin.zst\n", file) if len(blocks) == 0 { return fmt.Errorf("no blocks to write to bundle") @@ -142,7 +142,7 @@ func writeMergedBlocks(lowBlockNum uint64, store dstore.Store, blocks []*bstream for _, blk := range blocks { err = blockWriter.Write(blk) if err != nil { - return + return } } }() @@ -153,3 +153,12 @@ func writeMergedBlocks(lowBlockNum uint64, store dstore.Store, blocks []*bstream func filename(num uint64) string { return fmt.Sprintf("%010d", num) } + +func mustParseUint64(in string) uint64 { + out, err := strconv.ParseUint(in, 0, 64) + if err != nil { + panic(fmt.Errorf("unable to parse %q as uint64: %w", in, err)) + } + + return out +} diff --git a/tools/unmerge.go b/tools/unmerge.go deleted file mode 100644 index 88579db9..00000000 --- a/tools/unmerge.go +++ /dev/null @@ -1,143 +0,0 @@ -package tools - -import ( - "fmt" - "io" - "strconv" - - "go.uber.org/zap" - - "github.com/spf13/cobra" - "github.com/streamingfast/bstream" - "github.com/streamingfast/dstore" -) - -var unmergeBlocksCmd = &cobra.Command{ - Use: "unmerge ", - Short: "unmerges merged block files into one-block-files", - Args: cobra.ExactArgs(4), - RunE: unmergeBlocksE, -} - -func init() { - Cmd.AddCommand(unmergeBlocksCmd) -} - -func mustParseUint64(s string) uint64 { - i, err := strconv.Atoi(s) - if err != nil { - panic(err) - } - return uint64(i) -} - -func unmergeBlocksE(cmd *cobra.Command, args []string) error { - ctx := cmd.Context() - - srcStore, err := dstore.NewDBinStore(args[0]) - if err != nil { - return fmt.Errorf("unable to create source store: %w", err) - } - - destStore, err := dstore.NewDBinStore(args[1]) - if err != nil { - return fmt.Errorf("unable to create destination store: %w", err) - } - - start := mustParseUint64(args[2]) - stop := mustParseUint64(args[3]) - - if stop <= start { - return fmt.Errorf("stop block must be greater than start block") - } - - startWalkFrom := fmt.Sprintf("%010d", start-(start%100)) - err = srcStore.WalkFrom(ctx, "", startWalkFrom, func(filename string) error { - zlog.Debug("checking merged block file", zap.String("filename", filename)) - - startBlock := mustParseUint64(filename) - - if startBlock > stop { - zlog.Debug("skipping merged block file", zap.String("reason", "past stop block"), zap.String("filename", filename)) - return io.EOF - } - - if startBlock+100 < start { - zlog.Debug("skipping merged block file", zap.String("reason", "before start block"), zap.String("filename", filename)) - return nil - } - - rc, err := srcStore.OpenObject(ctx, filename) - if err != nil { - return fmt.Errorf("failed to open %s: %w", filename, err) - } - defer rc.Close() - - br, err := bstream.GetBlockReaderFactory.New(rc) - if err != nil { - return fmt.Errorf("creating block reader: %w", err) - } - - // iterate through the blocks in the file - for { - block, err := br.Read() - if err == io.EOF { - break - } - - if block.Number < start { - continue - } - - if block.Number > stop { - break - } - - oneblockFilename := bstream.BlockFileNameWithSuffix(block, "extracted") - zlog.Debug("writing block", zap.Uint64("block_num", block.Number), zap.String("filename", oneblockFilename)) - - pr, pw := io.Pipe() - - //write block data to pipe, and then close to signal end of data - go func(block *bstream.Block) { - var err error - defer func() { - pw.CloseWithError(err) - }() - - var bw bstream.BlockWriter - bw, err = bstream.GetBlockWriterFactory.New(pw) - if err != nil { - zlog.Error("creating block writer", zap.Error(err)) - return - } - - err = bw.Write(block) - if err != nil { - zlog.Error("writing block", zap.Error(err)) - return - } - }(block) - - //read block data from pipe and write block data to dest store - err = destStore.WriteObject(ctx, oneblockFilename, pr) - if err != nil { - return fmt.Errorf("writing block %d to %s: %w", block.Number, oneblockFilename, err) - } - - zlog.Info("wrote block", zap.Uint64("block_num", block.Number), zap.String("filename", oneblockFilename)) - } - - return nil - }) - - if err == io.EOF { - return nil - } - - if err != nil { - return err - } - - return nil -}