diff --git a/.gitignore b/.gitignore index c82988c6..6473b7e1 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ .release_notes.md *.spkg /sf-data +/firehose-data /sf.yaml /fireeth /dist diff --git a/CHANGELOG.md b/CHANGELOG.md index ac9208f7..414488d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,15 +10,24 @@ for instructions to keep up to date. This releases refactor `firehose-ethereum` repository to use the common shared Firehose Core library (https://github.com/streamingfast/firehose-core) that every single Firehose supported chain should use and follow. -At the data level and gRPC level, there is no changes in behavior to all core components which are `reader-node`, `merger`, `relayer`, `firehose`, `substreams-tier1` and `substreams-tier2`. +Both at the data level and gRPC level, there are no changes in behavior to all core components which are `reader-node`, `merger`, `relayer`, `firehose`, `substreams-tier1` and `substreams-tier2`. A lot of changes happened at the operators level however and some superflous mode have been removed, especially around the `reader-node` application. The full changes is listed below, operators should review thoroughly the changelog. -> **Important** It's important to emphasis that at the data level, nothing changed, so reverting to 1.4.12 in case of a problem is quite easy and no special data migration is required. +> [!IMPORTANT] +> It's important to emphasize that at the data level, nothing changed, so reverting to 1.4.19 in case of a problem is quite easy and no special data migration is required outside of changing back to the old set of flags that was used before. #### Operators -You will find below the detailed upgrade procedure for the configuration file providers usually use. If you are using the flags based approach, simply update the corresponding flags. +You will find below the detailed upgrade procedure for the configuration file operators usually use. 
If you are using the flags based approach, simply update the corresponding flags. + +#### Common Changes + +* The `{sf-data-dir}` templating argument used in various flags to resolve to the `--data-dir=` value has been deprecated and should now be simply `{data-dir}`. The older replacement is still going to work but you should replace any occurrences of `{sf-data-dir}` in your flag definition by `{data-dir}`. + +* The default value for `common-blocks-cache-dir` changed from `{sf-data-dir}/blocks-cache` to `file://{data-dir}/storage/blocks-cache`. If you didn't have this flag defined and you had `common-blocks-cache-enabled: true`, you should define `common-blocks-cache-dir: file://{data-dir}/blocks-cache`. + +* The default value for `common-live-blocks-addr` changed from `:15011` to `:10014`. If you didn't have this flag defined and wish to keep the old default, define `common-live-blocks-addr: 15011` and ensure you also modify `relayer-grpc-listen-addr: :15011` (see next entry for details). #### App `reader-node` changes @@ -37,7 +46,20 @@ Before this release, the `reader-node` app was managing for you a portion of the - `--http.vhosts=* ` - `--firehose-enabled` -We have now removed those magical additions and operators are now responsible of providing the flags they required to properly run a Firehose-enabled native `geth` node. The `+` sign that was used to append/override the flags has been removed also since no default additions is performed, the `+` was now useless. We also removed the following `fireeth` configuration value: +We have now removed those magical additions and operators are now responsible for providing the flags they require to properly run a Firehose-enabled native `geth` node. The `+` sign that was used to append/override the flags has been removed also since no default additions are performed, the `+` was now useless. 
To make some flags easier to define and avoid repetition, a few templating variables can be used within the `reader-node-arguments` value: + +- `{data-dir}` The current data-dir path defined by the config value `data-dir` +- `{node-data-dir}` The node data dir path defined by the flag `reader-node-data-dir` +- `{hostname}` The machine's hostname +- `{start-block-num}` The resolved start block number defined by the flag `reader-node-start-block-num` (can be overwritten) +- `{stop-block-num}` The stop block number defined by the flag `reader-node-stop-block-num` + +As an example, if you provide the config value `reader-node-data-dir=/var/geth`, then you could use `reader-node-arguments: --datadir={node-data-dir}` and that would resolve to `reader-node-arguments: --datadir=/var/geth` for you. + +> [!NOTE] +> The `reader-node-arguments` is a string that is parsed using Shell word splitting rules which means for example that double quotes are supported like `--datadir="/var/with space/path"` and the argument will be correctly accepted. We use https://github.com/kballard/go-shellquote as our parsing library. + +We also removed the following `reader-node` configuration values: - `reader-node-type` (No replacement needed, just remove it) - `reader-node-ipc-path` (If you were using that, define it manually using `geth` flag `--ipcpath=...`) @@ -45,6 +67,55 @@ We have now removed those magical additions and operators are now responsible of Default listening addresses changed also to be the same on all `firehose-<...>` project, meaning consistent ports across all chains for operators. The `reader-node-grpc-listen-addr` default listen address went from `:13010` to `:10010` and `reader-node-manager-api-addr` from `:13009` to `:10011`. If you have no occurrences of `13010` or `13009` in your config file or your scripts, there is nothing to do. 
Otherwise, feel free to adjust the default port to fit your needs, if you do change `reader-node-grpc-listen-addr`, ensure `--relayer-source` is also updated as by default it points to `:10010`. +Here an example of the required changes. + +Change: + +```yaml +start: + args: + - ... + - reader-node + - ... + flags: + ... + reader-node-bootstrap-data-url: ./reader/genesis.json + reader-node-enforce-peers: localhost:13041 + reader-node-arguments: +--firehose-genesis-file=./reader/genesis.json --authrpc.port=8552 + reader-node-log-to-zap: false + ... +``` + +To: + +```yaml +start: + args: + - ... + - reader-node + - ... + flags: + ... + reader-node-bootstrap-data-url: ./reader/genesis.json + reader-node-arguments: + --networkid=1515 + --datadir={node-data-dir} + --ipcpath={data-dir}/reader/ipc + --port=30305 + --http + --http.api=eth,net,web3 + --http.port=8547 + --http.addr=0.0.0.0 + --http.vhosts=* + --firehose-enabled + --firehose-genesis-file=./reader/genesis.json + --authrpc.port=8552 + ... +``` + +> [!NOTE] +> Adjust the `--networkid=1515` value to fit your targeted chain, see https://chainlist.org/ for a list of Ethereum chain and their `network-id` value. + #### App `node` removed In previous version of `firehose-ethereum`, it was possible to use the `node` app to launch managed "peering/backup/whatever" Ethereum node, this is not possible anymore. If you were using the `node` app previously, like in this config: @@ -68,7 +139,9 @@ We have completely drop support to concentrate on the core mission of Firehose w #### Rename of `combined-index-builder` to `index-builder` -The app has been renamed to simply `index-builder` and the flags has been completely renamed removing the prefix `combined-` in front of them. So change: +The app has been renamed to simply `index-builder` and the flags has been completely renamed removing the prefix `combined-` in front of them. + +Change: ```yaml start: @@ -85,7 +158,7 @@ start: ... 
``` -To +To: ```yaml start: @@ -102,7 +175,8 @@ start: ... ``` -> **Note** Rename only configuration item you had previously defined, do not copy paste verbatim example aboe +> [!NOTE] +> Rename only configuration item you had previously defined, do not copy paste verbatim example above. * Removed support for `archive-node` app, if you were using this, please use a standard NEAR Archive node to do the same job. @@ -110,19 +184,18 @@ start: * String variable `{sf-data-dir}` which interpolates at runtime to Firehose data directory is now `{data-dir}`. If any of your parameter value has `{sf-data-dir}` in its value, change it to `{data-dir}`. - > **Note** This is an important change, forgetting to change it will change expected locations of data leading to errors or wrong data. + > [!NOTE] + > This is an important change, forgetting to change it will change expected locations of data leading to errors or wrong data. * The default value for `config-file` changed from `sf.yaml` to `firehose.yaml`. If you didn't had this flag defined and wish to keep the old default, define `config-file: sf.yaml`. * The default value for `data-dir` changed from `sf-data` to `firehose-data`. If you didn't had this flag defined before, you should either move `sf-data` to `firehose-data` or define `data-dir: sf-data`. - > **Note** This is an important change, forgetting to change it will change expected locations of data leading to errors or wrong data. + > [!NOTE] + > This is an important change, forgetting to change it will change expected locations of data leading to errors or wrong data. * The flag `verbose` has been renamed to `log-verbosity`. -* The default value for `common-blocks-cache-dir` changed from `{sf-data-dir}/blocks-cache` to `file://{data-dir}/storage/blocks-cache`. If you didn't had this flag defined and you had `common-blocks-cache-enabled: true`, you should define `common-blocks-cache-dir: {data-dir}/blocks-cache`. 
- -* The default value for `common-live-blocks-addr` changed from `:15011` to `:10014`. If you didn't had this flag defined and wish to keep the old default, define `common-live-blocks-addr: 15011` and ensure you also modify `relayer-grpc-listen-addr: :15011` (see next entry for details). * The default value for `relayer-grpc-listen-addr` changed from `:15011` to `:10014`. If you didn't had this flag defined and wish to keep the old default, define `relayer-grpc-listen-addr: 15011` and ensure you also modify `common-live-blocks-addr: :15011` (see previous entry for details). @@ -134,7 +207,8 @@ start: * The `reader-node-arguments` is not populated anymore with default `--home={node-data-dir} run` which means you must now specify those manually. The variables `{data-dir}`, `{node-data-dir}` and `{hostname}` are interpolated respectively to Firehose absolute `data-dir` value, to Firehose absolute `reader-node-data-dir` value and to current hostname. To upgrade, if you had no `reader-node-arguments` defined, you must now define `reader-node-arguments: --home="{node-data-dir}" run`, if you had a `+` in your `reader-node-arguments: +--some-flag`, you must now define it like `reader-node-arguments: --home="{node-data-dir}" --some-flag run`. - > **Note** This is an important change, forgetting to change it will change expected locations of data leading to errors or wrong data. + > [!NOTE] + > This is an important change, forgetting to change it will change expected locations of data leading to errors or wrong data. * The `reader-node-boot-nodes` flag has been removed entirely, if you have boot nodes to specify, specify them in `reader-node-arguments` using `--boot-nodes=...` instead. diff --git a/cmd/fireeth/main.go b/cmd/fireeth/main.go index 935a9c07..468e8112 100644 --- a/cmd/fireeth/main.go +++ b/cmd/fireeth/main.go @@ -1,13 +1,17 @@ package main import ( + // Forced imported to convey they fact that this is a required import (for its side-effect!) 
+ _ "github.com/streamingfast/firehose-ethereum/types" + "github.com/spf13/cobra" + "github.com/spf13/pflag" firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-ethereum/codec" "github.com/streamingfast/firehose-ethereum/transform" + "github.com/streamingfast/firehose-ethereum/types" pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" "github.com/streamingfast/logging" - "github.com/streamingfast/node-manager/mindreader" pbbstream "github.com/streamingfast/pbgo/sf/bstream/v1" "go.uber.org/zap" "google.golang.org/protobuf/reflect/protoreflect" @@ -18,57 +22,64 @@ func init() { } func main() { - firecore.Main(&firecore.Chain[*pbeth.Block]{ - ShortName: "eth", - LongName: "Ethereum", - ExecutableName: "geth", - FullyQualifiedModule: "github.com/streamingfast/firehose-ethereum", - Version: version, + firecore.Main(Chain) +} - Protocol: "ETH", - ProtocolVersion: 1, +var Chain = &firecore.Chain[*pbeth.Block]{ + ShortName: "eth", + LongName: "Ethereum", + ExecutableName: "geth", + FullyQualifiedModule: "github.com/streamingfast/firehose-ethereum", + Version: version, - BlockFactory: func() firecore.Block { return new(pbeth.Block) }, + // Ensure that if you ever modify test, modify also `types/init.go#init` so that the `bstream.InitGeneric` there fits us + Protocol: "ETH", + ProtocolVersion: 1, - BlockIndexerFactories: map[string]firecore.BlockIndexerFactory[*pbeth.Block]{ - transform.CombinedIndexerShortName: transform.NewEthCombinedIndexer, - }, + BlockFactory: func() firecore.Block { return new(pbeth.Block) }, + BlockAcceptedVersions: types.BlockAcceptedVersions, - BlockTransformerFactories: map[protoreflect.FullName]firecore.BlockTransformerFactory{ - transform.HeaderOnlyMessageName: transform.NewHeaderOnlyTransformFactory, - transform.CombinedFilterMessageName: transform.NewCombinedFilterTransformFactory, + BlockIndexerFactories: map[string]firecore.BlockIndexerFactory[*pbeth.Block]{ + 
transform.CombinedIndexerShortName: transform.NewEthCombinedIndexer, + }, - // Still needed? - transform.MultiCallToFilterMessageName: transform.NewMultiCallToFilterTransformFactory, - transform.MultiLogFilterMessageName: transform.NewMultiLogFilterTransformFactory, - }, + BlockTransformerFactories: map[protoreflect.FullName]firecore.BlockTransformerFactory{ + transform.HeaderOnlyMessageName: transform.NewHeaderOnlyTransformFactory, + transform.CombinedFilterMessageName: transform.NewCombinedFilterTransformFactory, - ConsoleReaderFactory: func(lines chan string, blockEncoder firecore.BlockEncoder, logger *zap.Logger, tracer logging.Tracer) (mindreader.ConsolerReader, error) { - // FIXME: This was hardcoded also in the previouse firehose-near version, Firehose will break if this is not available - // blockEncoder - return codec.NewConsoleReader(logger, lines) - }, + transform.MultiCallToFilterMessageName: transform.NewMultiCallToFilterTransformFactory, + transform.MultiLogFilterMessageName: transform.NewMultiLogFilterTransformFactory, + }, - // ReaderNodeBootstrapperFactory: newReaderNodeBootstrapper, + ConsoleReaderFactory: codec.NewConsoleReader, - Tools: &firecore.ToolsConfig[*pbeth.Block]{ - BlockPrinter: printBlock, + RegisterExtraStartFlags: func(flags *pflag.FlagSet) { + flags.String("reader-node-bootstrap-data-url", "", "URL (file or gs) to either a genesis.json file or a .tar.zst archive to decompress in the datadir. 
Only used when bootstrapping (no prior data)") + }, - RegisterExtraCmd: func(chain *firecore.Chain[*pbeth.Block], toolsCmd *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error { - // toolsCmd.AddCommand(newToolsGenerateNodeKeyCmd(chain)) - // toolsCmd.AddCommand(newToolsBackfillCmd(zlog)) + ReaderNodeBootstrapperFactory: newReaderNodeBootstrapper, - return nil - }, + Tools: &firecore.ToolsConfig[*pbeth.Block]{ + BlockPrinter: printBlock, + + RegisterExtraCmd: func(chain *firecore.Chain[*pbeth.Block], toolsCmd *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error { + // toolsCmd.AddCommand(newToolsGenerateNodeKeyCmd(chain)) + // toolsCmd.AddCommand(newToolsBackfillCmd(zlog)) - TransformFlags: map[string]*firecore.TransformFlag{ - // "receipt-account-filters": { - // Description: "Comma-separated accounts to use as filter/index. If it contains a colon (:), it will be interpreted as : (each of which can be empty, ex: 'hello:' or ':world')", - // Parser: parseReceiptAccountFilters, - // }, + return nil + }, + + TransformFlags: &firecore.TransformFlags{ + Register: func(flags *pflag.FlagSet) { + flags.Bool("header-only", false, "Apply the HeaderOnly transform sending back Block's header only (with few top-level fields), exclusive option") + flags.String("call-filters", "", "call filters (format: '[address1[+address2[+...]]]:[eventsig1[+eventsig2[+...]]]") + flags.String("log-filters", "", "log filters (format: '[address1[+address2[+...]]]:[eventsig1[+eventsig2[+...]]]") + flags.Bool("send-all-block-headers", false, "ask for all the blocks to be sent (header-only if there is no match)") }, + + Parse: parseTransformFlags, }, - }) + }, } // Version value, injected via go build `ldflags` at build time, **must** not be removed or inlined diff --git a/cmd/fireeth/main_test.go b/cmd/fireeth/main_test.go new file mode 100644 index 00000000..4fecc806 --- /dev/null +++ b/cmd/fireeth/main_test.go @@ -0,0 +1,37 @@ +package main + +import ( + "bytes" + "testing" + 
+ "github.com/streamingfast/bstream" + pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" + "github.com/test-go/testify/require" +) + +func Test_Encode_Decode_Block(t *testing.T) { + Chain.Validate() + Chain.Init() + + original, err := Chain.BlockEncoder.Encode(&pbeth.Block{ + Number: 1, + Header: &pbeth.BlockHeader{}, + }) + require.NoError(t, err) + + require.Equal(t, uint64(1), original.ToProtocol().(*pbeth.Block).Number) + + buffer := bytes.NewBuffer(nil) + writer, err := bstream.GetBlockWriterFactory.New(buffer) + require.NoError(t, err) + + require.NoError(t, writer.Write(original)) + + reader, err := bstream.GetBlockReaderFactory.New(buffer) + require.NoError(t, err) + + hydrated, err := reader.Read() + require.NoError(t, err) + + require.Equal(t, uint64(1), hydrated.ToProtocol().(*pbeth.Block).Number) +} diff --git a/cmd/fireeth/reader_node_bootstraper.go b/cmd/fireeth/reader_node_bootstraper.go new file mode 100644 index 00000000..52017e2a --- /dev/null +++ b/cmd/fireeth/reader_node_bootstraper.go @@ -0,0 +1,245 @@ +package main + +import ( + "archive/tar" + "context" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/spf13/cobra" + "github.com/streamingfast/cli" + "github.com/streamingfast/cli/sflags" + "github.com/streamingfast/dstore" + firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/node-manager/operator" + "go.uber.org/zap" +) + +func newReaderNodeBootstrapper(ctx context.Context, logger *zap.Logger, cmd *cobra.Command, resolvedNodeArguments []string, resolver firecore.ReaderNodeArgumentResolver) (operator.Bootstrapper, error) { + nodePath := sflags.MustGetString(cmd, "reader-node-path") + bootstrapDataURL := sflags.MustGetString(cmd, "reader-node-bootstrap-data-url") + nodeDataDir := resolver("{node-data-dir}") + + switch { + case strings.HasSuffix(bootstrapDataURL, "tar.zst") || strings.HasSuffix(bootstrapDataURL, "tar.zstd"): + // There could be a 
mismatch here if the user overrides `--datadir` manually, we leave it for now + return NewTarballBootstrapper(bootstrapDataURL, nodeDataDir, logger), nil + + case strings.HasSuffix(bootstrapDataURL, "json"): + var args []string + if dataDirArgument := findDataDirArgument(resolvedNodeArguments); dataDirArgument != "" { + args = append(args, dataDirArgument) + } + + return NewGenesisBootstrapper(nodeDataDir, bootstrapDataURL, nodePath, append(args, "init"), logger), nil + default: + return nil, fmt.Errorf("'reader-node-bootstrap-data-url' config should point to either an archive ending in '.tar.zstd' or a genesis file ending in '.json', not %s", bootstrapDataURL) + } +} + +func findDataDirArgument(resolvedNodeArguments []string) string { + for i, arg := range resolvedNodeArguments { + if strings.HasPrefix(arg, "--datadir") { + // If the argument is in 2 parts (e.g. [--datadir, ]), we try to re-combine them + if arg == "--datadir" { + if len(resolvedNodeArguments) > i+1 { + return "--datadir=" + resolvedNodeArguments[i+1] + } + + // The arguments are invalid, we'll let the node fail later on + return arg + } + + return arg + } + } + + return "" +} + +// GenesisBootstrapper needs to write genesis file, static node file, then run a command like 'geth init' +type GenesisBootstrapper struct { + dataDir string + genesisFileURL string + cmdArgs []string + nodePath string + // staticNodesFilepath string + logger *zap.Logger +} + +func NewGenesisBootstrapper(dataDir string, genesisFileURL string, nodePath string, cmdArgs []string, logger *zap.Logger) *GenesisBootstrapper { + return &GenesisBootstrapper{ + dataDir: dataDir, + genesisFileURL: genesisFileURL, + nodePath: nodePath, + cmdArgs: cmdArgs, + logger: logger, + } +} + +func downloadDstoreObject(url string, destpath string) error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + reader, _, _, err := dstore.OpenObject(ctx, url) + if err != nil { + return fmt.Errorf("cannot 
get file from store: %w", err) + } + defer reader.Close() + data, err := io.ReadAll(reader) + if err != nil { + return err + } + + return os.WriteFile(destpath, data, 0644) +} + +func (b *GenesisBootstrapper) Bootstrap() error { + if b.genesisFileURL == "" || isBootstrapped(b.dataDir, b.logger) { + return nil + } + + genesisFilePath := filepath.Join(b.dataDir, "genesis.json") + + b.logger.Info("running bootstrap sequence", zap.String("data_dir", b.dataDir), zap.String("genesis_file_path", genesisFilePath)) + if err := os.MkdirAll(b.dataDir, 0755); err != nil { + return fmt.Errorf("cannot create folder %s to bootstrap node: %w", b.dataDir, err) + } + + if !cli.FileExists(genesisFilePath) { + b.logger.Info("fetching genesis file", zap.String("source_url", b.genesisFileURL)) + if err := downloadDstoreObject(b.genesisFileURL, genesisFilePath); err != nil { + return err + } + } + + cmd := exec.Command(b.nodePath, append(b.cmdArgs, genesisFilePath)...) + b.logger.Info("running node init command (creating genesis block from genesis.json)", zap.Stringer("cmd", cmd)) + if output, err := runCmd(cmd); err != nil { + return fmt.Errorf("failed to init node (output %s): %w", output, err) + } + + return nil +} + +func NewTarballBootstrapper( + url string, + dataDir string, + logger *zap.Logger, +) *TarballBootstrapper { + return &TarballBootstrapper{ + url: url, + dataDir: dataDir, + logger: logger, + } +} + +type TarballBootstrapper struct { + url string + dataDir string + logger *zap.Logger +} + +func isBootstrapped(dataDir string, logger *zap.Logger) bool { + var foundCURRENT bool + err := filepath.Walk(dataDir, + func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + if filepath.Base(path) == "CURRENT" { + foundCURRENT = true + return io.EOF + } + return nil + }) + if err != nil && !os.IsNotExist(err) && err != io.EOF { + logger.Warn("error while checking for bootstrapped status", zap.Error(err)) + } + 
+ return foundCURRENT +} + +func (b *TarballBootstrapper) isBootstrapped() bool { + return isBootstrapped(b.dataDir, b.logger) +} + +func (b *TarballBootstrapper) Bootstrap() error { + if b.isBootstrapped() { + return nil + } + + b.logger.Info("bootstrapping geth chain data from pre-built data", zap.String("bootstrap_data_url", b.url)) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + defer cancel() + + reader, _, _, err := dstore.OpenObject(ctx, b.url, dstore.Compression("zstd")) + if err != nil { + return fmt.Errorf("cannot get snapshot from gstore: %w", err) + } + defer reader.Close() + + b.createChainData(reader) + return nil +} + +func (b *TarballBootstrapper) createChainData(reader io.Reader) error { + err := os.MkdirAll(b.dataDir, os.ModePerm) + if err != nil { + return fmt.Errorf("unable to create blocks log file: %w", err) + } + + b.logger.Info("extracting bootstrapping data into node data directory", zap.String("data_dir", b.dataDir)) + tr := tar.NewReader(reader) + for { + header, err := tr.Next() + if err != nil { + if err == io.EOF { + return nil + } + + return err + } + + path := filepath.Join(b.dataDir, header.Name) + b.logger.Debug("about to write content of entry", zap.String("name", header.Name), zap.String("path", path), zap.Bool("is_dir", header.FileInfo().IsDir())) + if header.FileInfo().IsDir() { + err = os.MkdirAll(path, os.ModePerm) + if err != nil { + return fmt.Errorf("unable to create directory: %w", err) + } + + continue + } + + file, err := os.Create(path) + if err != nil { + return fmt.Errorf("unable to create file: %w", err) + } + + if _, err := io.Copy(file, tr); err != nil { + file.Close() + return err + } + file.Close() + } +} + +func runCmd(cmd *exec.Cmd) (string, error) { + // This runs (and wait) the command, combines both stdout and stderr in a single stream and return everything + out, err := cmd.CombinedOutput() + if err == nil { + return "", nil + } + + return string(out), err +} diff --git 
a/cmd/fireeth/tools_transform_flags.go b/cmd/fireeth/tools_transform_flags.go new file mode 100644 index 00000000..91b36886 --- /dev/null +++ b/cmd/fireeth/tools_transform_flags.go @@ -0,0 +1,153 @@ +package main + +import ( + "fmt" + "strings" + + "github.com/spf13/cobra" + "github.com/streamingfast/cli/sflags" + "github.com/streamingfast/eth-go" + pbtransform "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/transform/v1" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/anypb" +) + +func parseTransformFlags(cmd *cobra.Command, logger *zap.Logger) (transforms []*anypb.Any, err error) { + filters, err := parseFilters(sflags.MustGetString(cmd, "call-filters"), sflags.MustGetString(cmd, "log-filters"), sflags.MustGetBool(cmd, "send-all-block-headers")) + if err != nil { + return nil, err + } + + headerOnly := sflags.MustGetBool(cmd, "header-only") + if filters != nil && headerOnly { + return nil, fmt.Errorf("'header-only' flag is exclusive with 'call-filters', 'log-filters' and 'send-all-block-headers' choose either 'header-only' or a combination of the others") + } + + if headerOnly { + t, err := anypb.New(&pbtransform.HeaderOnly{}) + if err != nil { + return nil, err + } + + return []*anypb.Any{t}, nil + } + + if filters != nil { + t, err := anypb.New(filters) + if err != nil { + return nil, err + } + + return []*anypb.Any{t}, nil + } + + return +} + +func parseFilters(callFilters, logFilters string, sendAllBlockHeaders bool) (*pbtransform.CombinedFilter, error) { + mf := &pbtransform.CombinedFilter{} + + if callFilters == "" && logFilters == "" && !sendAllBlockHeaders { + return nil, nil + } + if callFilters != "" { + for _, filter := range strings.Split(callFilters, ",") { + if filter == "" { + continue + } + parts := strings.Split(filter, ":") + if len(parts) != 2 { + return nil, fmt.Errorf("option --call-filters must be of type address_hash+address_hash+address_hash:event_sig_hash+event_sig_hash (repeated, separated by comma)") + } + 
var addrs []eth.Address + for _, a := range strings.Split(parts[0], "+") { + if a != "" { + addr := eth.MustNewAddressLoose(a) + addrs = append(addrs, addr) + } + } + var sigs []eth.Hash + for _, s := range strings.Split(parts[1], "+") { + if s != "" { + sig := eth.MustNewHash(s) + sigs = append(sigs, sig) + } + } + + mf.CallFilters = append(mf.CallFilters, basicCallToFilter(addrs, sigs)) + } + } + + if logFilters != "" { + for _, filter := range strings.Split(logFilters, ",") { + if filter == "" { + continue + } + parts := strings.Split(filter, ":") + if len(parts) != 2 { + return nil, fmt.Errorf("option --log-filters must be of type address_hash+address_hash+address_hash:event_sig_hash+event_sig_hash (repeated, separated by comma)") + } + var addrs []eth.Address + for _, a := range strings.Split(parts[0], "+") { + if a != "" { + addr := eth.MustNewAddress(a) + addrs = append(addrs, addr) + } + } + var sigs []eth.Hash + for _, s := range strings.Split(parts[1], "+") { + if s != "" { + sig := eth.MustNewHash(s) + sigs = append(sigs, sig) + } + } + + mf.LogFilters = append(mf.LogFilters, basicLogFilter(addrs, sigs)) + } + } + + if sendAllBlockHeaders { + mf.SendAllBlockHeaders = true + } + return mf, nil +} + +func basicCallToFilter(addrs []eth.Address, sigs []eth.Hash) *pbtransform.CallToFilter { + var addrBytes [][]byte + var sigsBytes [][]byte + + for _, addr := range addrs { + b := addr.Bytes() + addrBytes = append(addrBytes, b) + } + + for _, sig := range sigs { + b := sig.Bytes() + sigsBytes = append(sigsBytes, b) + } + + return &pbtransform.CallToFilter{ + Addresses: addrBytes, + Signatures: sigsBytes, + } +} + +func basicLogFilter(addrs []eth.Address, sigs []eth.Hash) *pbtransform.LogFilter { + var addrBytes [][]byte + var sigsBytes [][]byte + + for _, addr := range addrs { + b := addr.Bytes() + addrBytes = append(addrBytes, b) + } + + for _, sig := range sigs { + b := sig.Bytes() + sigsBytes = append(sigsBytes, b) + } + + return &pbtransform.LogFilter{ + 
Addresses: addrBytes, + EventSignatures: sigsBytes, + } +} diff --git a/codec/console_reader.go b/codec/console_reader.go index 671fbe9e..8eb420ca 100644 --- a/codec/console_reader.go +++ b/codec/console_reader.go @@ -31,8 +31,10 @@ import ( "github.com/streamingfast/bstream" "github.com/streamingfast/dmetrics" "github.com/streamingfast/eth-go" - "github.com/streamingfast/firehose-ethereum/types" + firecore "github.com/streamingfast/firehose-core" pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" + "github.com/streamingfast/logging" + "github.com/streamingfast/node-manager/mindreader" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) @@ -50,7 +52,7 @@ type ConsoleReader struct { logger *zap.Logger } -func NewConsoleReader(logger *zap.Logger, lines chan string) (*ConsoleReader, error) { +func NewConsoleReader(lines chan string, blockEncoder firecore.BlockEncoder, logger *zap.Logger, tracer logging.Tracer) (mindreader.ConsolerReader, error) { globalStats := newConsoleReaderStats() globalStats.StartPeriodicLogToZap(context.Background(), logger, 30*time.Second) @@ -58,7 +60,7 @@ func NewConsoleReader(logger *zap.Logger, lines chan string) (*ConsoleReader, er lines: lines, close: func() {}, - ctx: &parseCtx{logger: logger, globalStats: globalStats, normalizationFeatures: &normalizationFeatures{}}, + ctx: &parseCtx{logger: logger, globalStats: globalStats, normalizationFeatures: &normalizationFeatures{}, encoder: blockEncoder}, done: make(chan interface{}), stats: globalStats, @@ -177,6 +179,7 @@ type parseCtx struct { transactionTraces []*pbeth.TransactionTrace evmCallStackIndexes []int32 + encoder firecore.BlockEncoder stats *parsingStats globalStats *consoleReaderStats @@ -1134,7 +1137,7 @@ func (ctx *parseCtx) readBlock(line string) (*bstream.Block, error) { libNum = computeProofOfWorkLIBNum(block.Number, bstream.GetProtocolFirstStreamableBlock) } - bstreamBlock, err := types.BlockFromProto(block, libNum) + bstreamBlock, err := 
ctx.encoder.Encode(firecore.BlockEnveloppe{Block: block, LIBNum: libNum}) if err != nil { return nil, err } @@ -1241,7 +1244,7 @@ func (ctx *parseCtx) readEndBlock(line string) (*bstream.Block, error) { libNum = computeProofOfWorkLIBNum(block.Number, bstream.GetProtocolFirstStreamableBlock) } - bstreamBlock, err := types.BlockFromProto(block, libNum) + bstreamBlock, err := ctx.encoder.Encode(firecore.BlockEnveloppe{Block: block, LIBNum: libNum}) if err != nil { return nil, err } diff --git a/codec/console_reader_test.go b/codec/console_reader_test.go index 18ce8b01..5d289313 100644 --- a/codec/console_reader_test.go +++ b/codec/console_reader_test.go @@ -31,7 +31,7 @@ import ( "testing" "github.com/golang/protobuf/proto" - "github.com/streamingfast/firehose-ethereum/types" + firecore "github.com/streamingfast/firehose-core" pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" "github.com/streamingfast/jsonpb" "github.com/stretchr/testify/assert" @@ -167,13 +167,14 @@ func TestGeneratePBBlocks(t *testing.T) { t.Skip("generate only when firehose-logs.dmlog changes") cr := testFileConsoleReader(t, "testdata/firehose-logs.dmlog") + encoder := firecore.NewBlockEncoder() for { out, err := cr.ReadBlock() if out != nil { block := out.ToProtocol().(*pbeth.Block) - bstreamBlock, err := types.BlockFromProto(block, out.LibNum) + bstreamBlock, err := encoder.Encode(firecore.BlockEnveloppe{Block: block, LIBNum: out.LibNum}) require.NoError(t, err) pbBlock, err := bstreamBlock.ToProto() @@ -222,10 +223,12 @@ func testFileConsoleReader(t *testing.T, filename string) *ConsoleReader { } func testReaderConsoleReader(helperFunc func(), lines chan string, closer func()) *ConsoleReader { + encoder := firecore.NewBlockEncoder() + l := &ConsoleReader{ lines: lines, close: closer, - ctx: &parseCtx{logger: zlog, stats: newParsingStats(zlog, 0), globalStats: newConsoleReaderStats(), normalizationFeatures: &normalizationFeatures{UpgradeBlockV2ToV3: true}}, + ctx: 
&parseCtx{logger: zlog, stats: newParsingStats(zlog, 0), globalStats: newConsoleReaderStats(), normalizationFeatures: &normalizationFeatures{UpgradeBlockV2ToV3: true}, encoder: encoder}, logger: zlog, } diff --git a/codec/init_test.go b/codec/init_test.go index cda1f14e..5d594e1c 100644 --- a/codec/init_test.go +++ b/codec/init_test.go @@ -1,8 +1,12 @@ package codec -import "github.com/streamingfast/logging" +import ( + // Import for its side-effect (registering necessary bstream factories) + _ "github.com/streamingfast/firehose-ethereum/types" + "github.com/streamingfast/logging" +) -var zlog, _ = logging.PackageLogger("fireeth", "github.com/streamingfast/firehose-ethereum/node-mananager/codec") +var zlog, _ = logging.PackageLogger("fireeth", "github.com/streamingfast/firehose-ethereum/codec") func init() { logging.InstantiateLoggers() diff --git a/devel/.gitignore b/devel/.gitignore index ead192c1..af9c8b2d 100644 --- a/devel/.gitignore +++ b/devel/.gitignore @@ -1,5 +1,6 @@ geth/ sf-data/ cs-data/ +firehose-data/ perso /bsc-local/localdata/ diff --git a/devel/firehose-mainnet/firehose-mainnet.yaml b/devel/firehose-mainnet/firehose-mainnet.yaml index e25325bd..e7200820 100644 --- a/devel/firehose-mainnet/firehose-mainnet.yaml +++ b/devel/firehose-mainnet/firehose-mainnet.yaml @@ -7,6 +7,10 @@ start: # - $FIREETH_COMMON_FORKED_BLOCKS_STORE_URL (defines common-forked-blocks-store-url) # - $FIREETH_COMMON_MERGED_BLOCKS_STORE_URL (defines common-merged-blocks-store-url) # - $FIREETH_SUBSTREAMS_RPC_ENDPOINTS (defines substreams-rpc-endpoint) + # + # Assuming `BLOCK_STORE_URL` is defined, one can use these configs + # - Mainnet: ./devel/firehose-mainnet/start.sh -c -- --common-one-block-store-url="${BLOCK_STORE_URL}/eth-mainnet/v4-oneblock" --common-forked-blocks-store-url="${BLOCK_STORE_URL}/eth-mainnet/v4-forked" --common-merged-blocks-store-url="${BLOCK_STORE_URL}/eth-mainnet/v4" --common-live-blocks-addr="" + # # Comment out 'common-live-blocks-addr' to only use historical
which improves the start up speed of 'fireeth' common-live-blocks-addr: localhost:9001 diff --git a/devel/firehose-mainnet/start.sh b/devel/firehose-mainnet/start.sh index 79c6b652..ea15ab90 100755 --- a/devel/firehose-mainnet/start.sh +++ b/devel/firehose-mainnet/start.sh @@ -20,20 +20,6 @@ main() { set -e - if [[ - -z "$FIREETH_COMMON_ONE_BLOCK_STORE_URL" || - -z "$FIREETH_COMMON_FORKED_BLOCKS_STORE_URL" || - -z "$FIREETH_COMMON_MERGED_BLOCKS_STORE_URL" || - -z "$FIREETH_SUBSTREAMS_RPC_ENDPOINTS" - ]]; then - echo 'To use this config, you must define:' - echo '- FIREETH_COMMON_ONE_BLOCK_STORE_URL (defines common-one-block-store-url)' - echo '- FIREETH_COMMON_FORKED_BLOCKS_STORE_URL (defines common-forked-blocks-store-url)' - echo '- FIREETH_COMMON_MERGED_BLOCKS_STORE_URL (defines common-merged-blocks-store-url)' - echo '- FIREETH_SUBSTREAMS_RPC_ENDPOINTS (defines substreams-rpc-endpoint)' - exit 1 - fi - if [[ $clean == "true" ]]; then rm -rf sf-data &> /dev/null || true fi diff --git a/devel/standard/miner/bootstrap.tar.gz b/devel/standard/miner/bootstrap.tar.gz new file mode 100644 index 00000000..3a3d84e8 Binary files /dev/null and b/devel/standard/miner/bootstrap.tar.gz differ diff --git a/devel/standard/miner/bootstrap.tar.zst b/devel/standard/miner/bootstrap.tar.zst deleted file mode 100644 index ecbb910c..00000000 Binary files a/devel/standard/miner/bootstrap.tar.zst and /dev/null differ diff --git a/devel/standard/miner/config.toml b/devel/standard/miner/config.toml new file mode 100644 index 00000000..594ffb79 --- /dev/null +++ b/devel/standard/miner/config.toml @@ -0,0 +1,15 @@ +[Eth] +NetworkId = 1515 +SyncMode = "full" +# Activate archive mode +NoPruning = true + +[Node] +HTTPHost = "" +HTTPPort = 8545 +HTTPModules = ["personal", "net", "web3", "eth"] +AuthPort = 8551 + +[Node.P2P] +ListenAddr = ":30303" +NoDiscovery = true diff --git a/devel/standard/reader/config.toml b/devel/standard/reader/config.toml new file mode 100644 index 00000000..a8645ebb 
--- /dev/null +++ b/devel/standard/reader/config.toml @@ -0,0 +1,4 @@ +[Node.P2P] +StaticNodes = [ + "enode://e4d5433fd9e84930cd38028f2fdb1ca8d55bdb7b6a749da57e8aa7fc5d3c146c44c0d129a14cecc7b0f13bb98700bf392dd4fd7c31bf2fe26038d4ba8f5a8e32@127.0.0.1:30303", +] diff --git a/devel/standard/reader/static-nodes.json b/devel/standard/reader/static-nodes.json deleted file mode 100644 index 8e3dfab0..00000000 --- a/devel/standard/reader/static-nodes.json +++ /dev/null @@ -1,3 +0,0 @@ -[ - "enode://444aadceb03f6dad618842399a1e5908de4bdd7bf3fa64b3dc1ac6dfe5843a99302b86049b2b5c3e16ee3a3279cb09465d98741a8f791b0b0f7a48c309d41dcf@127.0.0.1:13090" -] \ No newline at end of file diff --git a/devel/standard/standard.yaml b/devel/standard/standard.yaml index 77e81252..3a4042e9 100644 --- a/devel/standard/standard.yaml +++ b/devel/standard/standard.yaml @@ -1,23 +1,31 @@ start: args: - - node - reader-node - merger - relayer - - combined-index-builder + - index-builder - firehose - substreams-tier1 - substreams-tier2 flags: - node-role: dev-miner - node-bootstrap-data-url: ./miner/bootstrap.tar.zst - node-log-to-zap: false - reader-node-bootstrap-data-url: ./reader/genesis.json - reader-node-enforce-peers: localhost:13041 # App `node` manager API port - reader-node-arguments: +--firehose-genesis-file=./reader/genesis.json --authrpc.port=8552 - reader-node-log-to-zap: false - merger-time-between-store-pruning: 10s firehose-grpc-listen-addr: :8089 + merger-time-between-store-pruning: 10s + reader-node-bootstrap-data-url: ./reader/genesis.json + reader-node-arguments: + --config=./reader/config.toml + --networkid=1515 + --datadir={node-data-dir} + --ipcpath={data-dir}/reader/ipc + --port=30305 + --nodiscover + --authrpc.port=8552 + --http + --http.api=eth,net,web3 + --http.port=8547 + --http.addr=0.0.0.0 + --http.vhosts=* + --firehose-enabled + --firehose-genesis-file=./reader/genesis.json #substreams-rpc-endpoints: $ETH_MAINNET_RPC # replace with eth mainnet rpc endpoint 
substreams-rpc-cache-chunk-size: 100 substreams-state-bundle-size: 100 diff --git a/devel/standard/start.sh b/devel/standard/start.sh index 4964b96c..456960b5 100755 --- a/devel/standard/start.sh +++ b/devel/standard/start.sh @@ -18,13 +18,36 @@ main() { shift $((OPTIND-1)) [[ $1 = "--" ]] && shift + fh_data_dir="$ROOT/firehose-data" + miner_data_dir="$fh_data_dir/miner" + + # If nodekey is changed, the enode must be updated too because it's generated from the nodekey (put here for reference purposes, do not remove) + miner_enode="enode://e4d5433fd9e84930cd38028f2fdb1ca8d55bdb7b6a749da57e8aa7fc5d3c146c44c0d129a14cecc7b0f13bb98700bf392dd4fd7c31bf2fe26038d4ba8f5a8e32@[127.0.0.1]:30303" + miner_nodekey="f5f0aadf436e6b35c5fc00a1b0dbc181113ce4f3c448b73b954fe932c00a1b0d" + set -e if [[ $clean == "true" ]]; then - rm -rf sf-data &> /dev/null || true + rm -rf "$fh_data_dir" &> /dev/null || true + + echo "Creating miner Geth directory from 'bootstrap.tar.gz'" + mkdir -p "$miner_data_dir" + tar -C "$miner_data_dir" -xzf "$ROOT/miner/bootstrap.tar.gz" fi - exec $fireeth -c $(basename $ROOT).yaml start "$@" + # The sleep is here to give 1s for the process to exist, too fast and the 'kill' is a noop + trap "trap - SIGTERM && sleep 1 && kill -- -$$" SIGINT SIGTERM EXIT + geth \
    --config="$ROOT/miner/config.toml" \
    --datadir="$miner_data_dir" \
    --mine \
    --miner.etherbase=0x821b55d8abe79bc98f05eb675fdc50dfe796b7ab \
    --nodekeyhex="$miner_nodekey" \
    --allow-insecure-unlock \
    --password=/dev/null \
    --unlock=0x821b55d8abe79bc98f05eb675fdc50dfe796b7ab & + + $fireeth -c $(basename $ROOT).yaml start "$@" } usage_error() { @@ -46,7 +69,7 @@ usage() { echo " -c Clean actual data directory first" echo "" echo "Examples" - echo " Stream blocks grpcurl -plaintext -import-path ../proto -import-path ./proto -proto sf/ethereum/type/v2/type.proto -proto sf/firehose/v1/firehose.proto -d '{\"start_block_num\": -1}' localhost:13042 sf.firehose.v1.Stream.Blocks" + echo " Stream
blocks grpcurl -plaintext -d '{\"start_block_num\": -1}' localhost:8089 sf.firehose.v2.Stream/Blocks" } main "$@" diff --git a/tools/download-blocks-from-firehose.go b/tools/download-blocks-from-firehose.go deleted file mode 100644 index 8a5e58c7..00000000 --- a/tools/download-blocks-from-firehose.go +++ /dev/null @@ -1,92 +0,0 @@ -package tools - -import ( - "context" - "fmt" - "os" - "strconv" - - "github.com/spf13/cobra" - "github.com/streamingfast/bstream" - "github.com/streamingfast/firehose-ethereum/types" - pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" - sftools "github.com/streamingfast/sf-tools" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" -) - -func init() { - Cmd.AddCommand(DownloadFromFirehoseCmd) - DownloadFromFirehoseCmd.Flags().StringP("api-token-env-var", "a", "FIREHOSE_API_TOKEN", "Look for a JWT in this environment variable to authenticate against endpoint") - DownloadFromFirehoseCmd.Flags().BoolP("plaintext", "p", false, "Use plaintext connection to firehose") - DownloadFromFirehoseCmd.Flags().BoolP("insecure", "k", false, "Skip SSL certificate validation when connecting to firehose") - DownloadFromFirehoseCmd.Flags().Bool("fix-ordinals", false, "Decode the eth blocks to fix the ordinals in the receipt logs") -} - -var DownloadFromFirehoseCmd = &cobra.Command{ - Use: "download-from-firehose ", - Short: "download blocks from firehose and save them to merged-blocks", - Args: cobra.ExactArgs(4), - RunE: downloadFromFirehoseE, - Example: ExamplePrefixed("fireeth tools download-from-firehose", ` - api.streamingfast.io 1000 2000 ./outputdir - `), -} - -func downloadFromFirehoseE(cmd *cobra.Command, args []string) error { - ctx := context.Background() - - endpoint := args[0] - start, err := strconv.ParseUint(args[1], 10, 64) - if err != nil { - return fmt.Errorf("parsing start block num: %w", err) - } - stop, err := strconv.ParseUint(args[2], 10, 64) - if err != nil { - return 
fmt.Errorf("parsing stop block num: %w", err) - } - destFolder := args[3] - - apiTokenEnvVar := mustGetString(cmd, "api-token-env-var") - apiToken := os.Getenv(apiTokenEnvVar) - - plaintext := mustGetBool(cmd, "plaintext") - insecure := mustGetBool(cmd, "insecure") - var fixerFunc func(*bstream.Block) (*bstream.Block, error) - if mustGetBool(cmd, "fix-ordinals") { - fixerFunc = func(in *bstream.Block) (*bstream.Block, error) { - block := in.ToProtocol().(*pbeth.Block) - return types.BlockFromProto(block, in.LibNum) - } - } else { - fixerFunc = func(in *bstream.Block) (*bstream.Block, error) { - return in, nil - } - } - - return sftools.DownloadFirehoseBlocks( - ctx, - endpoint, - apiToken, - insecure, - plaintext, - start, - stop, - destFolder, - decodeAnyPB, - fixerFunc, - zlog, - ) -} - -func decodeAnyPB(in *anypb.Any) (*bstream.Block, error) { - block := &pbeth.Block{} - if err := anypb.UnmarshalTo(in, block, proto.UnmarshalOptions{}); err != nil { - return nil, fmt.Errorf("unmarshal anypb: %w", err) - } - - // We are downloading only final blocks from the Firehose connection which means the LIB for them - // can be set to themself (althought we use `- 1` to ensure problem would occur if codde don't like - // `LIBNum == self.BlockNum`). 
- return types.BlockFromProto(block, block.Number-1) -} diff --git a/tools/firehose-prometheus-exporter.go b/tools/firehose-prometheus-exporter.go deleted file mode 100644 index 28f9560c..00000000 --- a/tools/firehose-prometheus-exporter.go +++ /dev/null @@ -1,14 +0,0 @@ -package tools - -import ( - sftools "github.com/streamingfast/sf-tools" -) - -func init() { - prometheusExporterCmd := sftools.GetFirehosePrometheusExporterCmd(zlog, tracer, transformsSetter) - prometheusExporterCmd.Flags().Bool("header-only", false, "Apply the HeaderOnly transform sending back Block's header only (with few top-level fields), exclusive option") - prometheusExporterCmd.Flags().String("call-filters", "", "call filters (format: '[address1[+address2[+...]]]:[eventsig1[+eventsig2[+...]]]") - prometheusExporterCmd.Flags().String("log-filters", "", "log filters (format: '[address1[+address2[+...]]]:[eventsig1[+eventsig2[+...]]]") - prometheusExporterCmd.Flags().Bool("send-all-block-headers", false, "ask for all the blocks to be sent (header-only if there is no match)") - Cmd.AddCommand(prometheusExporterCmd) -} diff --git a/types/block.go b/types/block.go deleted file mode 100644 index ca4fee6b..00000000 --- a/types/block.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2021 dfuse Platform Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package types - -import ( - "fmt" - - "github.com/streamingfast/bstream" - firecore "github.com/streamingfast/firehose-core" - pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" - pbbstream "github.com/streamingfast/pbgo/sf/bstream/v1" - "google.golang.org/protobuf/proto" -) - -var _ firecore.Block = (*pbeth.Block)(nil) - -func BlockFromProto(b *pbeth.Block, libNum uint64) (*bstream.Block, error) { - blockTime, err := b.Time() - if err != nil { - return nil, err - } - - content, err := proto.Marshal(b) - if err != nil { - return nil, fmt.Errorf("unable to marshal to binary form: %s", err) - } - - blk := &bstream.Block{ - Id: b.ID(), - Number: b.Number, - PreviousId: b.PreviousID(), - Timestamp: blockTime, - LibNum: libNum, - PayloadKind: pbbstream.Protocol_ETH, - PayloadVersion: b.Ver, - } - - return bstream.GetBlockPayloadSetter(blk, content) -} diff --git a/types/decoder.go b/types/decoder.go deleted file mode 100644 index 3afef621..00000000 --- a/types/decoder.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2021 dfuse Platform Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package types - -import ( - "fmt" - - "github.com/streamingfast/bstream" - pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" - pbbstream "github.com/streamingfast/pbgo/sf/bstream/v1" - "google.golang.org/protobuf/proto" -) - -func BlockDecoder(blk *bstream.Block) (interface{}, error) { - if blk.Kind() != pbbstream.Protocol_ETH { - return nil, fmt.Errorf("expected kind %s, got %s", pbbstream.Protocol_ETH, blk.Kind()) - } - - if blk.Version() > 3 || blk.Version() < 1 { - return nil, fmt.Errorf("this decoder only knows about version 1, 2 and 3, got %d", blk.Version()) - } - - block := new(pbeth.Block) - pl, err := blk.Payload.Get() - if err != nil { - return nil, fmt.Errorf("unable to get payload: %s", err) - } - - err = proto.Unmarshal(pl, block) - if err != nil { - return nil, fmt.Errorf("unable to decode payload: %s", err) - } - - return block, nil -} diff --git a/types/init.go b/types/init.go index 8350471c..7b3d533a 100644 --- a/types/init.go +++ b/types/init.go @@ -15,38 +15,32 @@ package types import ( - "fmt" - "io" "strings" - "time" "github.com/streamingfast/bstream" + firecore "github.com/streamingfast/firehose-core" + pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2" pbbstream "github.com/streamingfast/pbgo/sf/bstream/v1" + "google.golang.org/protobuf/proto" ) +var _ firecore.Block = (*pbeth.Block)(nil) + +var encoder = firecore.NewBlockEncoder() + +var BlockAcceptedVersions = []int32{1, 2, 3} + func init() { - bstream.GetBlockWriterFactory = bstream.BlockWriterFactoryFunc(blockWriterFactory) - bstream.GetBlockReaderFactory = bstream.BlockReaderFactoryFunc(blockReaderFactory) - bstream.GetBlockDecoder = bstream.BlockDecoderFunc(BlockDecoder) - bstream.GetBlockWriterHeaderLen = 10 - bstream.GetBlockPayloadSetter = bstream.MemoryBlockPayloadSetter - bstream.GetMemoizeMaxAge = 20 * time.Second + // Doing it in `types` ensures that packages that depend only on us are properly initialized + 
firecore.UnsafePayloadKind = pbbstream.Protocol_ETH + + // Must fit what is defined in `cmd/fireeth/main.go` in regards to `protocol` and `protocolVersion` and `blockAcceptedVersions` + bstream.InitGeneric("ETH", 1, BlockAcceptedVersions, func() proto.Message { return &pbeth.Block{} }) bstream.NormalizeBlockID = func(in string) string { return strings.TrimPrefix(strings.ToLower(in), "0x") } } -func blockReaderFactory(reader io.Reader) (bstream.BlockReader, error) { - return bstream.NewDBinBlockReader(reader, func(contentType string, version int32) error { - protocol := pbbstream.Protocol(pbbstream.Protocol_value[contentType]) - if protocol != pbbstream.Protocol_ETH && version != 1 { - return fmt.Errorf("reader only knows about %s block kind at version 1, got %s at version %d", protocol, contentType, version) - } - - return nil - }) -} - -func blockWriterFactory(writer io.Writer) (bstream.BlockWriter, error) { - return bstream.NewDBinBlockWriter(writer, pbbstream.Protocol_ETH.String(), 1) +func BlockFromProto(b *pbeth.Block, libNum uint64) (*bstream.Block, error) { + return encoder.Encode(firecore.BlockEnveloppe{Block: b, LIBNum: libNum}) } diff --git a/types/logging.go b/types/logging.go deleted file mode 100644 index ab471b35..00000000 --- a/types/logging.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2021 dfuse Platform Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package types - -import ( - "github.com/streamingfast/logging" -) - -var zlog, _ = logging.PackageLogger("fireeth", "github.com/streamingfast/firehose-ethereum/types") diff --git a/types/pb/sf/ethereum/type/v2/type.go b/types/pb/sf/ethereum/type/v2/type.go index 84c88dc9..c22ca4c2 100644 --- a/types/pb/sf/ethereum/type/v2/type.go +++ b/types/pb/sf/ethereum/type/v2/type.go @@ -293,13 +293,6 @@ func (b *Block) GetFirehoseBlockID() string { return hex.EncodeToString(b.Hash) } -// GetFirehoseBlockLIBNum implements firecore.Block. -func (b *Block) GetFirehoseBlockLIBNum() uint64 { - // FIXME: Wrong, LIBNum is incorrect and can only be implemented by console reader! - // Need to review firehose-core to see how GetFirehoseBlockLIBNum is used - return b.LIBNum() -} - // GetFirehoseBlockNumber implements firecore.Block. func (b *Block) GetFirehoseBlockNumber() uint64 { return b.Number @@ -314,3 +307,8 @@ func (b *Block) GetFirehoseBlockParentID() string { func (b *Block) GetFirehoseBlockTime() time.Time { return b.Header.Timestamp.AsTime() } + +// GetFirehoseBlockVersion implements firecore.Block. +func (b *Block) GetFirehoseBlockVersion() int32 { + return b.Ver +} diff --git a/types/testing/types.go b/types/testing/types.go index ea158626..42ad6243 100644 --- a/types/testing/types.go +++ b/types/testing/types.go @@ -555,7 +555,3 @@ func failInvalidComponent(t testing.T, tag string, component interface{}, option require.FailNowf(t, "invalid component", "Invalid %s component of type %T", tag, component) } - -func logInvalidComponent(tag string, component interface{}) { - zlog.Info(fmt.Sprintf("invalid %s component of type %T", tag, component)) -}