diff --git a/core/processor/abci.go b/core/processor/abci.go index 4eb7d72d8d..a18547adf7 100644 --- a/core/processor/abci.go +++ b/core/processor/abci.go @@ -131,6 +131,9 @@ type SnapshotEngine interface { ReceiveSnapshotChunk(context.Context, *types.RawChunk, string) tmtypes.ResponseApplySnapshotChunk RetrieveSnapshotChunk(uint64, uint32, uint32) (*types.RawChunk, error) HasRestoredStateAlready() bool + + // debug snapshot issues/hash mismatch problems + SnapshotDump(ctx context.Context, path string) ([]byte, error) } type StateVarEngine interface { @@ -1115,6 +1118,15 @@ func (app *App) Finalize() []byte { if err == nil { app.protocolUpgradeService.SetCoreReadyForUpgrade() } + } else if app.cfg.SnapshotDebug.DevEnabled { + if height, _ := vgcontext.BlockHeightFromContext(app.blockCtx); height == app.cfg.SnapshotDebug.CrashAtHeight { + hash, err := app.snapshotEngine.SnapshotDump(app.blockCtx, app.cfg.SnapshotDebug.DebugCrashFile) + if err != nil { + app.log.Panic("Failed to dump snapshot file", logging.Error(err), logging.String("snapshot-hash", string(hash))) + } else { + app.log.Panic("Dumped snapshot file successfully", logging.String("snapshot-hash", string(hash)), logging.String("dump-file", app.cfg.SnapshotDebug.DebugCrashFile)) + } + } } else { snapHash, _, err = app.snapshotEngine.Snapshot(app.blockCtx) } diff --git a/core/processor/config.go b/core/processor/config.go index 83f80265ea..093025c86e 100644 --- a/core/processor/config.go +++ b/core/processor/config.go @@ -25,6 +25,12 @@ const ( namedLogger = "processor" ) +type Snapshot struct { + DevEnabled encoding.Bool + CrashAtHeight uint64 `long:"crash-with-snapshot-at"` + DebugCrashFile string `long:"snapshot-dump-path"` +} + // Config represent the configuration of the processor package. type Config struct { Level encoding.LogLevel `long:"log-level"` @@ -33,6 +39,7 @@ type Config struct { LogOrderCancelDebug encoding.Bool `long:"log-order-cancel-debug"` Ratelimit ratelimit.Config `group:"Ratelimit" namespace:"ratelimit"` KeepCheckpointsMax uint `long:"keep-checkpoints-max"` + SnapshotDebug Snapshot `group:"SnapshotDebug" namespace:"snapshotdebug"` } // NewDefaultConfig creates an instance of the package specific configuration, given a @@ -43,5 +50,10 @@ func NewDefaultConfig() Config { LogOrderSubmitDebug: true, Ratelimit: ratelimit.NewDefaultConfig(), KeepCheckpointsMax: 20, + SnapshotDebug: Snapshot{ + DevEnabled: false, + CrashAtHeight: 0, + DebugCrashFile: "/tmp/snapshot.json", + }, } } diff --git a/core/snapshot/engine.go b/core/snapshot/engine.go index 4eee100fce..b9830595d2 100644 --- a/core/snapshot/engine.go +++ b/core/snapshot/engine.go @@ -16,9 +16,11 @@ package snapshot import ( + "bufio" "context" "errors" "fmt" + "os" "reflect" "sort" "sync" @@ -34,9 +36,12 @@ import ( "code.vegaprotocol.io/vega/libs/proto" "code.vegaprotocol.io/vega/logging" "code.vegaprotocol.io/vega/paths" + snapshotpb "code.vegaprotocol.io/vega/protos/vega/snapshot/v1" "code.vegaprotocol.io/vega/version" tmtypes "github.com/cometbft/cometbft/abci/types" + "github.com/gogo/protobuf/jsonpb" + "github.com/libp2p/go-libp2p/p2p/host/autonat/pb" "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -512,6 +517,52 @@ func (e *Engine) Snapshot(ctx context.Context) ([]byte, DoneCh, error) { return e.snapshotNow(ctx, true) } +// SnapshotDump takes a snapshot on demand, without persisting it to the underlying DB +// it's meant to just dump to a file for debugging. +func (e *Engine) SnapshotDump(ctx context.Context, path string) ([]byte, error) { + e.ensureEngineIsStarted() + // dump file + f, err := os.Create(path) + if err != nil { + return nil, err + } + defer func() { _ = f.Close() }() + hash, ch, err := e.snapshotNow(ctx, true) + if err != nil { + return nil, err + } + <-ch + payloads, err := e.snapshotTree.AsProtoPayloads() + if err != nil { + return hash, err + } + + w := bufio.NewWriter(f) + m := jsonpb.Marshaler{Indent: " "} + + payloadData := struct { + Data []*snapshotpb.Payload `json:"data,omitempty" protobuf:"bytes,1,rep,name=data"` + pb.Message + }{ + Data: payloads, + } + + s, err := m.MarshalToString(&payloadData) + if err != nil { + return hash, err + } + + if _, err = w.WriteString(s); err != nil { + return hash, err + } + + if err = w.Flush(); err != nil { + return hash, err + } + + return hash, nil +} + // SnapshotNow triggers the snapshot process right now, ignoring the defined // interval. func (e *Engine) SnapshotNow(ctx context.Context) ([]byte, error) { diff --git a/core/snapshot/tree/tree.go b/core/snapshot/tree/tree.go index d918b88683..3c791b49fe 100644 --- a/core/snapshot/tree/tree.go +++ b/core/snapshot/tree/tree.go @@ -134,6 +134,47 @@ func (t *Tree) AsPayloads() ([]*types.Payload, error) { return payloads, nil } +func (t *Tree) AsProtoPayloads() ([]*snapshotpb.Payload, error) { + lastSnapshotTree, err := t.innerTree.GetImmutable(t.innerTree.Version()) + if err != nil { + return nil, fmt.Errorf("could not generate the immutable AVL tree: %w", err) + } + + exporter, err := lastSnapshotTree.Export() + if err != nil { + return nil, fmt.Errorf("could not export the AVL tree: %w", err) + } + defer exporter.Close() + + payloads := []*snapshotpb.Payload{} + + exportedNode, err := exporter.Next() + for err == nil { + // If there is no value, it means the node is an intermediary node and + // not a leaf. Only leaves hold the data we are looking for. + if exportedNode.Value == nil { + exportedNode, err = exporter.Next() + continue + } + + // sort out the payload for this node + payloadProto := &snapshotpb.Payload{} + if perr := proto.Unmarshal(exportedNode.Value, payloadProto); perr != nil { + return nil, perr + } + + payloads = append(payloads, payloadProto) + + exportedNode, err = exporter.Next() + } + + if !errors.Is(err, iavl.ErrorExportDone) { + return nil, fmt.Errorf("failed to export AVL tree: %w", err) + } + + return payloads, nil +} + func (t *Tree) FindImmutableTreeByHeight(blockHeight uint64) (*iavl.ImmutableTree, error) { version, err := t.metadataDB.FindVersionByBlockHeight(blockHeight) if err != nil {