Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add replay functionality #7

Merged
merged 6 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions cmd/blobstream-ops/common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package common

import (
"context"
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"

"github.com/cosmos/cosmos-sdk/server"
"github.com/rs/zerolog"
tmconfig "github.com/tendermint/tendermint/config"
tmlog "github.com/tendermint/tendermint/libs/log"
)

// GetLogger creates a new logger and returns
func GetLogger(level string, format string) (tmlog.Logger, error) {
logLvl, err := zerolog.ParseLevel(level)
if err != nil {
return nil, fmt.Errorf("failed to parse log level (%s): %w", level, err)
}
var logWriter io.Writer
if strings.ToLower(format) == tmconfig.LogFormatPlain {
logWriter = zerolog.ConsoleWriter{Out: os.Stderr}
} else {
logWriter = os.Stderr
}

return server.ZeroLogWrapper{Logger: zerolog.New(logWriter).Level(logLvl).With().Timestamp().Logger()}, nil
}

// TrapSignal will listen for any OS signal and cancel the context to exit gracefully.
func TrapSignal(logger tmlog.Logger, cancel context.CancelFunc) {
sigCh := make(chan os.Signal, 1)

signal.Notify(sigCh, syscall.SIGTERM)
signal.Notify(sigCh, syscall.SIGINT)

sig := <-sigCh
logger.Info("caught signal; shutting down...", "signal", sig.String())
cancel()
}
165 changes: 165 additions & 0 deletions cmd/blobstream-ops/replay/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package replay

import (
"context"

"github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/common"
"github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version"
"github.com/celestiaorg/blobstream-ops/replay"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcmn "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/spf13/cobra"
blobstreamxwrapper "github.com/succinctlabs/blobstreamx/bindings"
"github.com/tendermint/tendermint/rpc/client/http"
)

// Command the replay command
func Command() *cobra.Command {
cmd := &cobra.Command{
Use: "replay",
Short: "BlobstreamX deployment verification",
Long: "verifies that a BlobstreamX contract is committing to valid data",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, _ []string) error {
config, err := parseFlags(cmd)
if err != nil {
return err
}
if err := config.ValidateBasics(); err != nil {
return err
}

logger, err := common.GetLogger(config.LogLevel, config.LogFormat)
if err != nil {
return err
}

buildInfo := version.GetBuildInfo()
logger.Info("initializing replay service", "version", buildInfo.SemanticVersion, "build_date", buildInfo.BuildTime)

ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

// Listen for and trap any OS signal to graceful shutdown and exit
go common.TrapSignal(logger, cancel)

// connecting to the source BlobstreamX contract
sourceEVMClient, err := ethclient.Dial(config.SourceEVMRPC)
if err != nil {
return err
}
defer sourceEVMClient.Close()

sourceBlobstreamReader, err := blobstreamxwrapper.NewBlobstreamXCaller(
ethcmn.HexToAddress(config.SourceContractAddress),
sourceEVMClient,
)
if err != nil {
return err
}

// connecting to the target BlobstreamX contract
targetEVMClient, err := ethclient.Dial(config.TargetEVMRPC)
if err != nil {
return err
}
defer targetEVMClient.Close()

targetBlobstreamReader, err := blobstreamxwrapper.NewBlobstreamXCaller(
ethcmn.HexToAddress(config.TargetContractAddress),
targetEVMClient,
)
if err != nil {
return err
}

logger.Info(
"starting replay service",
"evm.source.rpc",
config.SourceEVMRPC,
"evm.source.contract-address",
config.SourceContractAddress,
"evm.target.rpc",
config.TargetEVMRPC,
"evm.target.contract-address",
config.TargetContractAddress,
"core.rpc",
config.CoreRPC,
)

latestSourceBlock, err := sourceBlobstreamReader.LatestBlock(&bind.CallOpts{})
if err != nil {
return err
}
logger.Info("found source blobstreamX contract", "latest_block", latestSourceBlock)

latestTargetBlock, err := targetBlobstreamReader.LatestBlock(&bind.CallOpts{})
if err != nil {
return err
}
logger.Info("found target blobstreamX contract", "latest_block", latestTargetBlock)

var trpc *http.HTTP
if config.Verify {
trpc, err = http.New(config.CoreRPC, "/websocket")
if err != nil {
return err
}
err = trpc.Start()
if err != nil {
return err
}
defer func(trpc *http.HTTP) {
err := trpc.Stop()
if err != nil {
logger.Error("error stopping tendermint RPC", "err", err.Error())
}
}(trpc)
}

if latestSourceBlock > latestTargetBlock {
err = replay.Catchup(
ctx,
logger,
config.Verify,
trpc,
sourceEVMClient,
targetEVMClient,
config.SourceContractAddress,
config.TargetContractAddress,
config.TargetChainGateway,
config.PrivateKey,
config.HeaderRangeFunctionID,
config.NextHeaderFunctionID,
config.FilterRange,
)
if err != nil {
return err
}
} else {
logger.Info("target contract is already up to date")
}

return replay.Follow(
ctx,
logger,
config.Verify,
trpc,
sourceEVMClient,
targetEVMClient,
config.SourceContractAddress,
config.TargetContractAddress,
config.TargetChainGateway,
config.PrivateKey,
config.HeaderRangeFunctionID,
config.NextHeaderFunctionID,
config.FilterRange,
)
},
}

cmd.SetHelpCommand(&cobra.Command{})

return addFlags(cmd)
}
Loading
Loading