Skip to content

Commit

Permalink
Merge pull request #73 from sei-protocol/yzang/add-latest-offset
Browse files Browse the repository at this point in the history
Enhance replay chagnelog tool to replay till the end
  • Loading branch information
yzang2019 authored Aug 19, 2024
2 parents 422a4fa + 6831f4a commit cc7801b
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions tools/cmd/seidb/operations/replay_changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

var ssStore types.StateStore
var dryRun = true

func ReplayChangelogCmd() *cobra.Command {
dumpDbCmd := &cobra.Command{
Expand All @@ -23,33 +24,49 @@ func ReplayChangelogCmd() *cobra.Command {
}

dumpDbCmd.PersistentFlags().StringP("db-dir", "d", "", "Database Directory")
dumpDbCmd.PersistentFlags().Int64P("start-offset", "s", 0, "From offset")
dumpDbCmd.PersistentFlags().Int64P("end-offset", "e", 1, "End offset")
dumpDbCmd.PersistentFlags().Int64P("start-offset", "s", 0, "Start offset, default to earliest offset")
dumpDbCmd.PersistentFlags().Int64P("end-offset", "e", 0, "End offset, default to latest offset")
dumpDbCmd.PersistentFlags().Bool("no-dry-run", false, "Whether to dry run or re-apply the changelog to DB")

return dumpDbCmd
}

func executeReplayChangelog(cmd *cobra.Command, _ []string) {
dbDir, _ := cmd.Flags().GetString("db-dir")
start, _ := cmd.Flags().GetInt64("start-offset")
end, _ := cmd.Flags().GetInt64("end-offset")
start, _ := cmd.Flags().GetUint64("start-offset")
end, _ := cmd.Flags().GetUint64("end-offset")
noDryRun, _ := cmd.Flags().GetBool("no-dry-run")
if dbDir == "" {
panic("Must provide database dir")
}

if start > end || start < 0 {
panic("Must provide a valid start/end offset")
}
logDir := filepath.Join(dbDir, "changelog")
stream, err := changelog.NewStream(logger.NewNopLogger(), logDir, changelog.Config{})
if err != nil {
panic(err)
}

// use first available offset
if start <= 0 {
startOffset, err := stream.FirstOffset()
if err != nil {
panic(err)
}
start = startOffset
}

if end <= 0 {
// use latest offset
endOffset, err := stream.LastOffset()
if err != nil {
panic(err)
}
end = endOffset
}

// open the database if this is not a dry run
if noDryRun {
dryRun = false
ssConfig := config.DefaultStateStoreConfig()
ssConfig.KeepRecent = 0
ssConfig.DBDirectory = dbDir
Expand All @@ -60,7 +77,7 @@ func executeReplayChangelog(cmd *cobra.Command, _ []string) {
}

// replay the changelog
err = stream.Replay(uint64(start), uint64(end), processChangelogEntry)
err = stream.Replay(start, end, processChangelogEntry)
if err != nil {
panic(err)
}
Expand All @@ -77,7 +94,9 @@ func processChangelogEntry(index uint64, entry proto.ChangelogEntry) error {
for _, changeset := range entry.Changesets {
storeName := changeset.Name
for _, kv := range changeset.Changeset.Pairs {
fmt.Printf("store: %s, key: %X\n", storeName, kv.Key)
if dryRun {
fmt.Printf("store: %s, key: %X\n", storeName, kv.Key)
}
}
if ssStore != nil {
fmt.Printf("Re-applied changeset for height %d\n", entry.Version)
Expand Down

0 comments on commit cc7801b

Please sign in to comment.