diff --git a/cmd/blobstream-ops/common/utils.go b/cmd/blobstream-ops/common/utils.go new file mode 100644 index 0000000..0613007 --- /dev/null +++ b/cmd/blobstream-ops/common/utils.go @@ -0,0 +1,44 @@ +package common + +import ( + "context" + "fmt" + "io" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/cosmos/cosmos-sdk/server" + "github.com/rs/zerolog" + tmconfig "github.com/tendermint/tendermint/config" + tmlog "github.com/tendermint/tendermint/libs/log" +) + +// GetLogger creates a new logger and returns +func GetLogger(level string, format string) (tmlog.Logger, error) { + logLvl, err := zerolog.ParseLevel(level) + if err != nil { + return nil, fmt.Errorf("failed to parse log level (%s): %w", level, err) + } + var logWriter io.Writer + if strings.ToLower(format) == tmconfig.LogFormatPlain { + logWriter = zerolog.ConsoleWriter{Out: os.Stderr} + } else { + logWriter = os.Stderr + } + + return server.ZeroLogWrapper{Logger: zerolog.New(logWriter).Level(logLvl).With().Timestamp().Logger()}, nil +} + +// TrapSignal will listen for any OS signal and cancel the context to exit gracefully. +func TrapSignal(logger tmlog.Logger, cancel context.CancelFunc) { + sigCh := make(chan os.Signal, 1) + + signal.Notify(sigCh, syscall.SIGTERM) + signal.Notify(sigCh, syscall.SIGINT) + + sig := <-sigCh + logger.Info("caught signal; shutting down...", "signal", sig.String()) + cancel() +} diff --git a/cmd/blobstream-ops/replay/cmd.go b/cmd/blobstream-ops/replay/cmd.go new file mode 100644 index 0000000..3825032 --- /dev/null +++ b/cmd/blobstream-ops/replay/cmd.go @@ -0,0 +1,165 @@ +package replay + +import ( + "context" + + "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/common" + "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version" + "github.com/celestiaorg/blobstream-ops/replay" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + ethcmn "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/spf13/cobra" + blobstreamxwrapper "github.com/succinctlabs/blobstreamx/bindings" + "github.com/tendermint/tendermint/rpc/client/http" +) + +// Command the replay command +func Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "replay", + Short: "BlobstreamX deployment verification", + Long: "verifies that a BlobstreamX contract is committing to valid data", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, _ []string) error { + config, err := parseFlags(cmd) + if err != nil { + return err + } + if err := config.ValidateBasics(); err != nil { + return err + } + + logger, err := common.GetLogger(config.LogLevel, config.LogFormat) + if err != nil { + return err + } + + buildInfo := version.GetBuildInfo() + logger.Info("initializing replay service", "version", buildInfo.SemanticVersion, "build_date", buildInfo.BuildTime) + + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + // Listen for and trap any OS signal to graceful shutdown and exit + go common.TrapSignal(logger, cancel) + + // connecting to the source BlobstreamX contract + sourceEVMClient, err := ethclient.Dial(config.SourceEVMRPC) + if err != nil { + return err + } + defer sourceEVMClient.Close() + + sourceBlobstreamReader, err := blobstreamxwrapper.NewBlobstreamXCaller( + ethcmn.HexToAddress(config.SourceContractAddress), + sourceEVMClient, + ) + if err != nil { + return err + } + + // connecting to the target BlobstreamX contract + targetEVMClient, err := ethclient.Dial(config.TargetEVMRPC) + if err != nil { + return err + } + defer targetEVMClient.Close() + + targetBlobstreamReader, err := blobstreamxwrapper.NewBlobstreamXCaller( + ethcmn.HexToAddress(config.TargetContractAddress), + targetEVMClient, + ) + if err != nil { + return err + } + + logger.Info( + "starting replay service", + "evm.source.rpc", + config.SourceEVMRPC, + "evm.source.contract-address", + config.SourceContractAddress, + "evm.target.rpc", + config.TargetEVMRPC, + "evm.target.contract-address", + config.TargetContractAddress, + "core.rpc", + config.CoreRPC, + ) + + latestSourceBlock, err := sourceBlobstreamReader.LatestBlock(&bind.CallOpts{}) + if err != nil { + return err + } + logger.Info("found source blobstreamX contract", "latest_block", latestSourceBlock) + + latestTargetBlock, err := targetBlobstreamReader.LatestBlock(&bind.CallOpts{}) + if err != nil { + return err + } + logger.Info("found target blobstreamX contract", "latest_block", latestTargetBlock) + + var trpc *http.HTTP + if config.Verify { + trpc, err = http.New(config.CoreRPC, "/websocket") + if err != nil { + return err + } + err = trpc.Start() + if err != nil { + return err + } + defer func(trpc *http.HTTP) { + err := trpc.Stop() + if err != nil { + logger.Error("error stopping tendermint RPC", "err", err.Error()) + } + }(trpc) + } + + if latestSourceBlock > latestTargetBlock { + err = replay.Catchup( + ctx, + logger, + config.Verify, + trpc, + sourceEVMClient, + targetEVMClient, + config.SourceContractAddress, + config.TargetContractAddress, + config.TargetChainGateway, + config.PrivateKey, + config.HeaderRangeFunctionID, + config.NextHeaderFunctionID, + config.FilterRange, + ) + if err != nil { + return err + } + } else { + logger.Info("target contract is already up to date") + } + + return replay.Follow( + ctx, + logger, + config.Verify, + trpc, + sourceEVMClient, + targetEVMClient, + config.SourceContractAddress, + config.TargetContractAddress, + config.TargetChainGateway, + config.PrivateKey, + config.HeaderRangeFunctionID, + config.NextHeaderFunctionID, + config.FilterRange, + ) + }, + } + + cmd.SetHelpCommand(&cobra.Command{}) + + return addFlags(cmd) +} diff --git a/cmd/blobstream-ops/replay/config.go b/cmd/blobstream-ops/replay/config.go new file mode 100644 index 0000000..d200aab --- /dev/null +++ b/cmd/blobstream-ops/replay/config.go @@ -0,0 +1,215 @@ +package replay + +import ( + "crypto/ecdsa" + "encoding/hex" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/crypto" + + ethcmn "github.com/ethereum/go-ethereum/common" + "github.com/spf13/cobra" +) + +const ( + FlagSourceEVMRPC = "evm.source.rpc" + FlagSourceEVMContractAddress = "evm.source.contract-address" + FlagTargetEVMRPC = "evm.target.rpc" + FlagTargetEVMContractAddress = "evm.target.contract-address" + FlagTargetChainGateway = "evm.target.gateway" + FlagEVMPrivateKey = "evm.private-key" + FlagEVMFilterRange = "evm.filter-range" + + FlagHeaderRangeFunctionID = "circuits.header-range.functionID" + FlagNextHeaderFunctionID = "circuits.next-header.functionID" + + FlagVerify = "verify" + + FlagLogLevel = "log.level" + FlagLogFormat = "log.format" + + FlagCoreRPC = "core.rpc" +) + +func addFlags(cmd *cobra.Command) *cobra.Command { + cmd.Flags().String(FlagSourceEVMRPC, "http://localhost:8545", "Specify the Ethereum rpc address of the source EVM chain") + cmd.Flags().String(FlagTargetEVMRPC, "http://localhost:8545", "Specify the Ethereum rpc address of the target EVM chain") + cmd.Flags().String(FlagSourceEVMContractAddress, "", "Specify the source contract at which the source BlobstreamX contract is deployed") + cmd.Flags().String(FlagTargetEVMContractAddress, "", "Specify the target contract at which the target BlobstreamX contract is deployed") + cmd.Flags().String(FlagTargetChainGateway, "", "Specify the target chain succinct gateway contract address") + cmd.Flags().String( + FlagLogLevel, + "info", + "The logging level (trace|debug|info|warn|error|fatal|panic)", + ) + cmd.Flags().String( + FlagLogFormat, + "plain", + "The logging format (json|plain)", + ) + cmd.Flags().String( + FlagCoreRPC, + "tcp://localhost:26657", + "The celestia app rpc address", + ) + cmd.Flags().Bool(FlagVerify, false, "Set to verify the commitments before replaying their proofs. Require the core rpc flag to be set") + cmd.Flags().String(FlagEVMPrivateKey, "", "Specify the EVM private key, in hex format without the leading 0x, to use for replaying transaction in the target chain. Corresponding account should be funded") + cmd.Flags().String(FlagHeaderRangeFunctionID, "", "Specify the function ID of the header range circuit in the target BlobstreamX contract, in hex format without the leading 0x") + cmd.Flags().String(FlagNextHeaderFunctionID, "", "Specify the function ID of the next header circuit in the target BlobstreamX contract, in hex format without the leading 0x") + cmd.Flags().Int64(FlagEVMFilterRange, 5000, "Specify the eth_getLogs filter range") + return cmd +} + +type Config struct { + SourceEVMRPC string + TargetEVMRPC string + SourceContractAddress string + TargetContractAddress string + TargetChainGateway string + LogLevel string + LogFormat string + CoreRPC string + Verify bool + PrivateKey *ecdsa.PrivateKey + HeaderRangeFunctionID [32]byte + NextHeaderFunctionID [32]byte + FilterRange int64 +} + +func (cfg Config) ValidateBasics() error { + if err := ValidateEVMAddress(cfg.SourceContractAddress); err != nil { + return fmt.Errorf("%s: flag --%s", err.Error(), FlagSourceEVMContractAddress) + } + if err := ValidateEVMAddress(cfg.TargetContractAddress); err != nil { + return fmt.Errorf("%s: flag --%s", err.Error(), FlagTargetEVMContractAddress) + } + if err := ValidateEVMAddress(cfg.TargetChainGateway); err != nil { + return fmt.Errorf("%s: flag --%s", err.Error(), FlagTargetChainGateway) + } + if cfg.Verify && cfg.CoreRPC == "" { + return fmt.Errorf("flag --%s is set but the core RPC flag --%s is not set", FlagVerify, FlagCoreRPC) + } + return nil +} + +func ValidateEVMAddress(addr string) error { + if addr == "" { + return fmt.Errorf("the EVM address cannot be empty") + } + if !ethcmn.IsHexAddress(addr) { + return errors.New("valid EVM address is required") + } + return nil +} + +func parseFlags(cmd *cobra.Command) (Config, error) { + // TODO add support for env variables + sourceContractAddress, err := cmd.Flags().GetString(FlagSourceEVMContractAddress) + if err != nil { + return Config{}, err + } + + targetContractAddress, err := cmd.Flags().GetString(FlagTargetEVMContractAddress) + if err != nil { + return Config{}, err + } + + targetChainGateway, err := cmd.Flags().GetString(FlagTargetChainGateway) + if err != nil { + return Config{}, err + } + + sourceEVMRPC, err := cmd.Flags().GetString(FlagSourceEVMRPC) + if err != nil { + return Config{}, err + } + + targetEVMRPC, err := cmd.Flags().GetString(FlagTargetEVMRPC) + if err != nil { + return Config{}, err + } + + coreRPC, err := cmd.Flags().GetString(FlagCoreRPC) + if err != nil { + return Config{}, err + } + + logLevel, err := cmd.Flags().GetString(FlagLogLevel) + if err != nil { + return Config{}, err + } + + logFormat, err := cmd.Flags().GetString(FlagLogFormat) + if err != nil { + return Config{}, err + } + + rawPrivateKey, err := cmd.Flags().GetString(FlagEVMPrivateKey) + if err != nil { + return Config{}, err + } + if rawPrivateKey == "" { + return Config{}, fmt.Errorf("please set the private key --%s", FlagEVMPrivateKey) + } + privateKey, err := crypto.HexToECDSA(rawPrivateKey) + if err != nil { + return Config{}, fmt.Errorf("failed to hex-decode Ethereum ECDSA Private Key: %w", err) + } + + strHeaderRange, err := cmd.Flags().GetString(FlagHeaderRangeFunctionID) + if err != nil { + return Config{}, err + } + if strHeaderRange == "" { + return Config{}, fmt.Errorf("please set the header range function ID --%s", FlagHeaderRangeFunctionID) + } + decodedHeaderRange, err := hex.DecodeString(strHeaderRange) + if err != nil { + return Config{}, err + } + var bzHeaderRange [32]byte + copy(bzHeaderRange[:], decodedHeaderRange) + + strNextHeader, err := cmd.Flags().GetString(FlagNextHeaderFunctionID) + if err != nil { + return Config{}, err + } + if strNextHeader == "" { + return Config{}, fmt.Errorf("please set the header range function ID --%s", FlagHeaderRangeFunctionID) + } + decodedNextHeader, err := hex.DecodeString(strNextHeader) + if err != nil { + return Config{}, err + } + var bzNextHeader [32]byte + copy(bzNextHeader[:], decodedNextHeader) + + filterRange, err := cmd.Flags().GetInt64(FlagEVMFilterRange) + if err != nil { + return Config{}, err + } + + verify, err := cmd.Flags().GetBool(FlagVerify) + if err != nil { + return Config{}, err + } + + // TODO add rate limiting flag + // TODO add gas price multiplier flag + return Config{ + SourceEVMRPC: sourceEVMRPC, + TargetEVMRPC: targetEVMRPC, + SourceContractAddress: sourceContractAddress, + TargetContractAddress: targetContractAddress, + TargetChainGateway: targetChainGateway, + CoreRPC: coreRPC, + LogLevel: logLevel, + LogFormat: logFormat, + PrivateKey: privateKey, + NextHeaderFunctionID: bzNextHeader, + HeaderRangeFunctionID: bzHeaderRange, + FilterRange: filterRange, + Verify: verify, + }, nil +} diff --git a/cmd/blobstream-ops/root/cmd.go b/cmd/blobstream-ops/root/cmd.go index fe44046..cb6a917 100644 --- a/cmd/blobstream-ops/root/cmd.go +++ b/cmd/blobstream-ops/root/cmd.go @@ -1,6 +1,7 @@ package root import ( + "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/replay" "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/verify" "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version" "github.com/spf13/cobra" @@ -18,6 +19,7 @@ func Cmd() *cobra.Command { rootCmd.AddCommand( version.Cmd, verify.Command(), + replay.Command(), ) rootCmd.SetHelpCommand(&cobra.Command{}) diff --git a/cmd/blobstream-ops/verify/cmd.go b/cmd/blobstream-ops/verify/cmd.go index d2bd9f3..5fe2c80 100644 --- a/cmd/blobstream-ops/verify/cmd.go +++ b/cmd/blobstream-ops/verify/cmd.go @@ -4,22 +4,14 @@ import ( "bytes" "context" "fmt" - "io" - "os" - "os/signal" - "strings" - "syscall" + "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/common" "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version" - "github.com/cosmos/cosmos-sdk/server" "github.com/ethereum/go-ethereum/accounts/abi/bind" ethcmn "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" - "github.com/rs/zerolog" "github.com/spf13/cobra" blobstreamxwrapper "github.com/succinctlabs/blobstreamx/bindings" - tmconfig "github.com/tendermint/tendermint/config" - tmlog "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/rpc/client/http" ) @@ -58,7 +50,7 @@ func VerifyContractCommand() *cobra.Command { return err } - logger, err := GetLogger(config.LogLevel, config.LogFormat) + logger, err := common.GetLogger(config.LogLevel, config.LogFormat) if err != nil { return err } @@ -85,7 +77,7 @@ func VerifyContractCommand() *cobra.Command { } // Listen for and trap any OS signal to graceful shutdown and exit - go TrapSignal(logger, cancel) + go common.TrapSignal(logger, cancel) logger.Info( "starting verifier", @@ -162,7 +154,7 @@ func VerifyContractCommand() *cobra.Command { defer func(trpc *http.HTTP) { err := trpc.Stop() if err != nil { - fmt.Println(err.Error()) + logger.Error("error stopping tendermint RPC", "err", err.Error()) } }(trpc) @@ -189,31 +181,3 @@ func VerifyContractCommand() *cobra.Command { } return addStartFlags(command) } - -// GetLogger creates a new logger and returns -func GetLogger(level string, format string) (tmlog.Logger, error) { - logLvl, err := zerolog.ParseLevel(level) - if err != nil { - return nil, fmt.Errorf("failed to parse log level (%s): %w", level, err) - } - var logWriter io.Writer - if strings.ToLower(format) == tmconfig.LogFormatPlain { - logWriter = zerolog.ConsoleWriter{Out: os.Stderr} - } else { - logWriter = os.Stderr - } - - return server.ZeroLogWrapper{Logger: zerolog.New(logWriter).Level(logLvl).With().Timestamp().Logger()}, nil -} - -// TrapSignal will listen for any OS signal and cancel the context to exit gracefully. -func TrapSignal(logger tmlog.Logger, cancel context.CancelFunc) { - sigCh := make(chan os.Signal, 1) - - signal.Notify(sigCh, syscall.SIGTERM) - signal.Notify(sigCh, syscall.SIGINT) - - sig := <-sigCh - logger.Info("caught signal; shutting down...", "signal", sig.String()) - cancel() -} diff --git a/go.mod b/go.mod index 2535d78..6b134be 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/rs/zerolog v1.33.0 github.com/spf13/cobra v1.8.1 github.com/succinctlabs/blobstreamx v0.0.0-20240115194141-5649c689a7fe + github.com/succinctlabs/succinctx v1.1.0 github.com/tendermint/tendermint v0.35.9 ) @@ -35,7 +36,7 @@ require ( github.com/cometbft/cometbft-db v0.9.1 // indirect github.com/confio/ics23/go v0.9.0 // indirect github.com/consensys/bavard v0.1.13 // indirect - github.com/consensys/gnark-crypto v0.12.1 // indirect + github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb // indirect github.com/cosmos/btcutil v1.0.5 // indirect github.com/cosmos/cosmos-proto v1.0.0-beta.3 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect diff --git a/go.sum b/go.sum index 3e821b5..71fe92d 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,8 @@ github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/Yj github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= github.com/consensys/gnark-crypto v0.4.1-0.20210426202927-39ac3d4b3f1f/go.mod h1:815PAHg3wvysy0SyIqanF8gZ0Y1wjk/hrDHD/iT88+Q= github.com/consensys/gnark-crypto v0.5.3/go.mod h1:hOdPlWQV1gDLp7faZVeg8Y0iEPFaOUnCc4XeCCk96p0= -github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M= -github.com/consensys/gnark-crypto v0.12.1/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= +github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb h1:f0BMgIjhZy4lSRHCXFbQst85f5agZAjtDMixQqBWNpc= +github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg= github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= @@ -994,6 +994,8 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/succinctlabs/blobstreamx v0.0.0-20240115194141-5649c689a7fe h1:lRAlEnfW3/+9ZZuD41TASapvcJZKy8FHMkH9U9Wc7aY= github.com/succinctlabs/blobstreamx v0.0.0-20240115194141-5649c689a7fe/go.mod h1:8ZvZV7KHR9olj1/Hdf5wJYlYjzmLms3ue/P1gzqGxTg= +github.com/succinctlabs/succinctx v1.1.0 h1:8Mz2Ig1eDV8zEq5y7V5JfIftSE5TPjPpOvSTJ1i0lW8= +github.com/succinctlabs/succinctx v1.1.0/go.mod h1:9/IDJr415PLecNQNTMg1BLxWJvrD44uA7ra+C9g/QAs= github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4= github.com/supranational/blst v0.3.11/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= diff --git a/replay/evm.go b/replay/evm.go new file mode 100644 index 0000000..0ae91b5 --- /dev/null +++ b/replay/evm.go @@ -0,0 +1,184 @@ +package replay + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + ethcmn "github.com/ethereum/go-ethereum/common" + coregethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + bindings2 "github.com/succinctlabs/blobstreamx/bindings" + "github.com/succinctlabs/succinctx/bindings" + tmlog "github.com/tendermint/tendermint/libs/log" +) + +type fulfillCallArgs struct { + FunctionID [32]byte `json:"_functionId"` + Input []byte `json:"_input"` + Output []byte `json:"_output"` + Proof []byte `json:"_proof"` + CallbackAddress ethcmn.Address `json:"_callbackAddress"` + CallbackData []byte `json:"_callbackData"` +} + +func toFulfillCallArgs(args map[string]interface{}) (fulfillCallArgs, error) { + fID, ok := args["_functionId"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _functionId in map") + } + input, ok := args["_input"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _input in map") + } + output, ok := args["_output"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _output in map") + } + proof, ok := args["_proof"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _proof in map") + } + callbackAddress, ok := args["_callbackAddress"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _callbackAddress in map") + } + callbackData, ok := args["_callbackData"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _callbackData in map") + } + + return fulfillCallArgs{ + FunctionID: fID.([32]byte), + Input: input.([]byte), + Output: output.([]byte), + Proof: proof.([]byte), + CallbackAddress: callbackAddress.(ethcmn.Address), + CallbackData: callbackData.([]byte), + }, nil +} + +type transactOpsBuilder func(ctx context.Context, client *ethclient.Client, gasLim uint64) (*bind.TransactOpts, error) + +func newTransactOptsBuilder(privKey *ecdsa.PrivateKey) transactOpsBuilder { + publicKey := privKey.Public() + publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) + if !ok { + panic(fmt.Errorf("invalid public key; expected: %T, got: %T", &ecdsa.PublicKey{}, publicKey)) + } + + evmAddress := crypto.PubkeyToAddress(*publicKeyECDSA) + return func(ctx context.Context, client *ethclient.Client, gasLim uint64) (*bind.TransactOpts, error) { + nonce, err := client.PendingNonceAt(ctx, evmAddress) + if err != nil { + return nil, err + } + + ethChainID, err := client.ChainID(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get Ethereum chain ID: %w", err) + } + + auth, err := bind.NewKeyedTransactorWithChainID(privKey, ethChainID) + if err != nil { + return nil, fmt.Errorf("failed to create Ethereum transactor: %w", err) + } + + bigGasPrice, err := client.SuggestGasPrice(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get Ethereum gas estimate: %w", err) + } + + auth.Nonce = new(big.Int).SetUint64(nonce) + auth.Value = big.NewInt(0) // in wei + auth.GasLimit = gasLim // in units + auth.GasPrice = bigGasPrice + + return auth, nil + } +} + +func submitProof( + ctx context.Context, + logger tmlog.Logger, + client *ethclient.Client, + opts *bind.TransactOpts, + succinctGateway *bindings.SuccinctGateway, + targetBlobstreamXContract *bindings2.BlobstreamX, + args fulfillCallArgs, + proofNonce int64, + waitTimeout time.Duration, +) error { + for i := 0; i < 10; i++ { + logger.Info("submitting transaction for proof", "nonce", proofNonce, "gas_price", opts.GasPrice.Int64()) + tx, err := succinctGateway.FulfillCall( + opts, + args.FunctionID, + args.Input, + args.Output, + args.Proof, + args.CallbackAddress, + args.CallbackData, + ) + if err != nil { + return err + } + logger.Info("transaction submitted", "hash", tx.Hash().Hex()) + _, err = waitForTransaction(ctx, logger, client, tx, waitTimeout) + if err != nil { + actualNonce, err2 := targetBlobstreamXContract.StateProofNonce(&bind.CallOpts{}) + if err2 != nil { + return err2 + } + if actualNonce.Int64() > proofNonce { + logger.Info("no need to replay this nonce, the contract has already committed to it", "nonce", actualNonce) + return nil + } + + if errors.Is(err, context.DeadlineExceeded) { + logger.Debug("transaction still not included, accelerating...") + // we need to speed up the transaction by increasing the gas price + bigGasPrice, err := client.SuggestGasPrice(ctx) + if err != nil { + return fmt.Errorf("failed to get Ethereum gas estimate: %w", err) + } + + // 20% increase of the suggested gas price + opts.GasPrice = big.NewInt(bigGasPrice.Int64() + bigGasPrice.Int64()/5) + logger.Debug("transaction still not included, accelerating...", "new_gas_price", opts.GasPrice.Int64()) + continue + } + logger.Error("transaction failed", "err", err.Error()) + logger.Debug("retrying...") + return err + } + return nil + } + return fmt.Errorf("failed to submit proof nonce %d", proofNonce) +} + +func waitForTransaction( + ctx context.Context, + logger tmlog.Logger, + backend bind.DeployBackend, + tx *coregethtypes.Transaction, + timeout time.Duration, +) (*coregethtypes.Receipt, error) { + logger.Debug("waiting for transaction to be confirmed", "hash", tx.Hash().String()) + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + receipt, err := bind.WaitMined(ctx, backend, tx) + if err == nil && receipt != nil && receipt.Status == 1 { + logger.Info("transaction confirmed", "hash", tx.Hash().String(), "block", receipt.BlockNumber.Uint64()) + return receipt, nil + } + + return receipt, err +} diff --git a/replay/replayer.go b/replay/replayer.go new file mode 100644 index 0000000..eb442c7 --- /dev/null +++ b/replay/replayer.go @@ -0,0 +1,400 @@ +package replay + +import ( + "bytes" + "context" + "crypto/ecdsa" + "encoding/hex" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + ethcmn "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + blobstreamxwrapper "github.com/succinctlabs/blobstreamx/bindings" + "github.com/succinctlabs/succinctx/bindings" + tmlog "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/rpc/client/http" +) + +func Follow( + ctx context.Context, + logger tmlog.Logger, + verify bool, + trpc *http.HTTP, + sourceEVMClient *ethclient.Client, + targetEVMClient *ethclient.Client, + sourceBlobstreamContractAddress string, + targetBlobstreamContractAddress string, + targetChainGatewayAddress string, + privateKey *ecdsa.PrivateKey, + headerRangeFunctionID [32]byte, + nextHeaderFunctionID [32]byte, + filterRange int64, +) error { + logger.Info("listening for new proofs on the source chain") + sourceBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(sourceBlobstreamContractAddress), sourceEVMClient) + if err != nil { + return err + } + + targetBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(targetBlobstreamContractAddress), sourceEVMClient) + if err != nil { + return err + } + + newEvents := make(chan *blobstreamxwrapper.BlobstreamXDataCommitmentStored) + subscription, err := sourceBlobstreamX.WatchDataCommitmentStored(&bind.WatchOpts{Context: ctx}, newEvents, nil, nil, nil) + if err != nil { + return err + } + defer subscription.Unsubscribe() + + gateway, err := bindings.NewSuccinctGateway(ethcmn.HexToAddress(targetChainGatewayAddress), targetEVMClient) + if err != nil { + return err + } + abi, err := bindings.SuccinctGatewayMetaData.GetAbi() + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return nil + case event := <-newEvents: + latestTargetContractBlock, err := targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + if event.StartBlock < latestTargetContractBlock { + logger.Info("the target contract is at a higher block, waiting for new events", "event_start_block", event.StartBlock, "target_contract_latest_block", latestTargetContractBlock) + continue + } else if event.StartBlock > latestTargetContractBlock { + logger.Info("the target contract needs to catchup", "event_start_block", event.StartBlock, "target_contract_latest_block", latestTargetContractBlock) + err = Catchup( + ctx, + logger, + verify, + trpc, + sourceEVMClient, + targetEVMClient, + sourceBlobstreamContractAddress, + targetBlobstreamContractAddress, + targetChainGatewayAddress, + privateKey, + headerRangeFunctionID, + nextHeaderFunctionID, + filterRange, + ) + if err != nil { + return err + } + latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + if event.EndBlock == latestTargetContractBlock { + // the contract is already up to date + logger.Info("contract up to date", "target_contract_latest_block", event.EndBlock) + continue + } + } + logger.Debug("getting transaction containing the proof", "nonce", event.ProofNonce.Int64(), "hash", event.Raw.TxHash.Hex(), "start_block", event.StartBlock) + tx, _, err := sourceEVMClient.TransactionByHash(ctx, event.Raw.TxHash) + if err != nil { + return err + } + + logger.Debug("decoding the proof") + rawMap := make(map[string]interface{}) + inputArgs := abi.Methods["fulfillCall"].Inputs + err = inputArgs.UnpackIntoMap(rawMap, tx.Data()[4:]) + if err != nil { + return err + } + + decodedArgs, err := toFulfillCallArgs(rawMap) + if err != nil { + return err + } + + // update the address to be the target blobstreamX contract for the callback + decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress) + if event.EndBlock-event.StartBlock > 1 { + // this is a header range proof + decodedArgs.FunctionID = headerRangeFunctionID + } else { + // this is a next header proof + decodedArgs.FunctionID = nextHeaderFunctionID + } + + logger.Info("replaying the proof", "nonce", event.ProofNonce.Int64()) + opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000) + if err != nil { + return err + } + err = submitProof( + ctx, + logger, + targetEVMClient, + opts, + gateway, + targetBlobstreamX, + decodedArgs, + event.ProofNonce.Int64(), + 3*time.Minute, + ) + if err != nil { + return err + } + logger.Info("successfully replayed proof", "nonce", event.ProofNonce.Int64()) + } + } +} + +func Catchup( + ctx context.Context, + logger tmlog.Logger, + verify bool, + trpc *http.HTTP, + sourceEVMClient *ethclient.Client, + targetEVMClient *ethclient.Client, + sourceBlobstreamContractAddress string, + targetBlobstreamContractAddress string, + targetChainGatewayAddress string, + privateKey *ecdsa.PrivateKey, + headerRangeFunctionID [32]byte, + nextHeaderFunctionID [32]byte, + filterRange int64, +) error { + lookupStartHeight, err := sourceEVMClient.BlockNumber(ctx) + if err != nil { + return err + } + + sourceBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(sourceBlobstreamContractAddress), sourceEVMClient) + if err != nil { + return err + } + + targetBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(targetBlobstreamContractAddress), sourceEVMClient) + if err != nil { + return err + } + + latestSourceContractBlock, err := sourceBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + + latestTargetContractBlock, err := targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + + logger.Info("catching up", "latest_source_contract_block", latestSourceContractBlock, "latest_target_contract_block", latestTargetContractBlock) + + latestSourceContractNonce, err := sourceBlobstreamX.StateProofNonce(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + + dataCommitmentEvents, err := getAllDataCommitmentStoredEvents( + ctx, + logger, + &sourceBlobstreamX.BlobstreamXFilterer, + int64(lookupStartHeight), + filterRange, + latestSourceContractNonce.Int64(), + int64(latestTargetContractBlock), + ) + if err != nil { + return err + } + + gateway, err := bindings.NewSuccinctGateway(ethcmn.HexToAddress(targetChainGatewayAddress), targetEVMClient) + if err != nil { + return err + } + abi, err := bindings.SuccinctGatewayMetaData.GetAbi() + if err != nil { + return err + } + + for startHeight := latestTargetContractBlock; startHeight < latestSourceContractBlock; { + event, exists := dataCommitmentEvents[int64(startHeight)] + if !exists { + return fmt.Errorf("couldn't find a proof that starts at height %d in events", startHeight) + } + + if verify { + logger.Info("verifying data root tuple root", "proof_nonce_in_source_contract", event.ProofNonce, "start_block", event.StartBlock, "end_block", event.EndBlock) + coreDataCommitment, err := trpc.DataCommitment(ctx, event.StartBlock, event.EndBlock) + if err != nil { + return err + } + if bytes.Equal(coreDataCommitment.DataCommitment.Bytes(), event.DataCommitment[:]) { + logger.Info("data commitment verified") + } else { + logger.Error( + "data commitment mismatch!! quitting", + "proof_nonce_in_source_contract", + event.ProofNonce, + "start_block", + event.StartBlock, + "end_block", + event.EndBlock, + "expected_data_commitment", + hex.EncodeToString(coreDataCommitment.DataCommitment.Bytes()), + "actual_data_commitment", + hex.EncodeToString(event.DataCommitment[:]), + ) + return fmt.Errorf("data commitment mistmatch. start height %d end height %d", event.StartBlock, event.EndBlock) + } + } + + latestSourceBlock, err := sourceBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + + latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + if latestTargetContractBlock >= latestSourceBlock { + // contract already up to date + return nil + } + + logger.Debug("getting transaction containing the proof", "startHeight", startHeight, "hash", event.Raw.TxHash.Hex()) + tx, _, err := sourceEVMClient.TransactionByHash(ctx, event.Raw.TxHash) + if err != nil { + return err + } + + logger.Debug("decoding the proof") + rawMap := make(map[string]interface{}) + inputArgs := abi.Methods["fulfillCall"].Inputs + err = inputArgs.UnpackIntoMap(rawMap, tx.Data()[4:]) + if err != nil { + return err + } + + decodedArgs, err := toFulfillCallArgs(rawMap) + if err != nil { + return err + } + + // update the address to be the target blobstreamX contract for the callback + decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress) + + if event.EndBlock-event.StartBlock > 1 { + // this is a header range proof + decodedArgs.FunctionID = headerRangeFunctionID + } else { + // this is a next header proof + decodedArgs.FunctionID = nextHeaderFunctionID + } + + logger.Info("replaying the proof", "startHeight", startHeight) + opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000) + if err != nil { + return err + } + err = submitProof( + ctx, + logger, + targetEVMClient, + opts, + gateway, + targetBlobstreamX, + decodedArgs, + int64(startHeight), + 3*time.Minute, + ) + if err != nil { + return err + } + // make sure the contract was updated + latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + if latestTargetContractBlock == event.EndBlock { + // contract updated successfully, we can advance + startHeight = event.EndBlock + } else { + logger.Error("contract did not update successfully, retrying the same proof", "expected_target_height", event.EndBlock, "actual_target_height", latestTargetContractBlock) + } + } + + latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + + logger.Info("contract up to date", "latest_target_contract_block", latestTargetContractBlock) + return nil +} + +func getAllDataCommitmentStoredEvents( + ctx context.Context, + logger tmlog.Logger, + blobstreamLogFilterer *blobstreamxwrapper.BlobstreamXFilterer, + lookupStartHeight int64, + filterRange int64, + latestSourceContractNonce int64, + latestTargetContractBlock int64, +) (map[int64]blobstreamxwrapper.BlobstreamXDataCommitmentStored, error) { + logger.Info("querying all the data commitment stored events in the source contract...") + dataCommitmentEvents := make(map[int64]blobstreamxwrapper.BlobstreamXDataCommitmentStored) + for eventLookupEnd := lookupStartHeight; eventLookupEnd > 0; eventLookupEnd -= filterRange { + logger.Debug("querying all the data commitment stored events", "evm_block_start", eventLookupEnd, "evm_block_end", eventLookupEnd-filterRange) + rangeStart := eventLookupEnd - filterRange + rangeEnd := uint64(eventLookupEnd) + events, err := blobstreamLogFilterer.FilterDataCommitmentStored( + &bind.FilterOpts{ + Context: ctx, + Start: uint64(rangeStart), + End: &rangeEnd, + }, + nil, + nil, + nil, + ) + if err != nil { + return nil, err + } + + gatheredTheNecessaryEvents := false + for { + if events.Event != nil { + _, exists := dataCommitmentEvents[int64(events.Event.StartBlock)] + if exists { + continue + } + dataCommitmentEvents[int64(events.Event.StartBlock)] = *events.Event + if int64(events.Event.StartBlock) < latestTargetContractBlock { + gatheredTheNecessaryEvents = true + } + } + if !events.Next() { + break + } + } + if int64(len(dataCommitmentEvents)) >= latestSourceContractNonce-1 { + // found all the events + logger.Info("found all events", "count", len(dataCommitmentEvents)) + break + } + if gatheredTheNecessaryEvents { + logger.Info("found enough events to cover the needed range", "count", len(dataCommitmentEvents)) + break + } + logger.Info("found events", "count", len(dataCommitmentEvents)) + } + return dataCommitmentEvents, nil +}