Skip to content

Commit

Permalink
checkpoint: preserve own atx chain (#4612)
Browse files Browse the repository at this point in the history
## Motivation
Closes #4624 

## Changes
code
- if miner doesn't have any atx, check post data dir for metadata and preserve the commitment atx (and its deps) used for initialization
- if miner has an atx that's not part of the snapshot, preserve its chain of deps as far as it can
- if miner has an pending atx in nipost metadata, also preserve its positioning atx
- abort preserving atx if any of its dependency is part of the snapshot of previous checkpoint
- atx dependencies are recursively discovered. the order is important for the validation path
- once node start services after recovering from checkpoint, validate and save the preserved atxs and poet proofs.

systest TestCheckpoint
- publish snapshot that only contains half of the miners atxs. the other miners need to preserve the chain of its own atxs
- using checkpoint url for testing (instead of local snapshot files)

cleanup
- remove recovery from local file
- move checkpoint related data to common/types, as post init dependency is introduced to checkpoint package.

note:
i tried shutting down nodes gracefully in admin::Recover() RPC instead of panicking during TestCheckpoint. but kubernetes doesn't restart the nodes automatically, which means the nodes will not restart to do the recovery.
  • Loading branch information
countvonzero committed Jul 5, 2023
1 parent 52d903d commit f8bd113
Show file tree
Hide file tree
Showing 18 changed files with 934 additions and 662 deletions.
4 changes: 2 additions & 2 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (b *Builder) buildNIPostChallenge(ctx context.Context) (*types.NIPostChalle
challenge.Sequence = prevAtx.Sequence + 1
}

if err = saveNipostChallenge(b.nipostBuilder.DataDir(), challenge); err != nil {
if err = SaveNipostChallenge(b.nipostBuilder.DataDir(), challenge); err != nil {
return nil, err
}
return challenge, nil
Expand Down Expand Up @@ -459,7 +459,7 @@ func (b *Builder) Coinbase() types.Address {
}

func (b *Builder) loadChallenge() (*types.NIPostChallenge, error) {
nipost, err := loadNipostChallenge(b.nipostBuilder.DataDir())
nipost, err := LoadNipostChallenge(b.nipostBuilder.DataDir())
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ func TestBuilder_NIPostPublishRecovery(t *testing.T) {
require.NotNil(t, built)

// the challenge remains
got, err := loadNipostChallenge(tab.nipostBuilder.DataDir())
got, err := LoadNipostChallenge(tab.nipostBuilder.DataDir())
require.NoError(t, err)
require.NotEmpty(t, got)

Expand All @@ -989,7 +989,7 @@ func TestBuilder_NIPostPublishRecovery(t *testing.T) {
})
// This 👇 ensures that handing of the challenge succeeded and the code moved on to the next part
require.ErrorIs(t, tab.PublishActivationTx(context.Background()), ErrATXChallengeExpired)
got, err = loadNipostChallenge(tab.nipostBuilder.DataDir())
got, err = LoadNipostChallenge(tab.nipostBuilder.DataDir())
require.ErrorIs(t, err, os.ErrNotExist)
require.Empty(t, got)

Expand All @@ -1007,7 +1007,7 @@ func TestBuilder_NIPostPublishRecovery(t *testing.T) {
require.NotEqual(t, built.NIPostChallenge, built2.NIPostChallenge)
require.Equal(t, built.TargetEpoch()+1, built2.TargetEpoch())

got, err = loadNipostChallenge(tab.nipostBuilder.DataDir())
got, err = LoadNipostChallenge(tab.nipostBuilder.DataDir())
require.ErrorIs(t, err, os.ErrNotExist)
require.Empty(t, got)
}
Expand Down
4 changes: 2 additions & 2 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (h *Handler) GetPosAtxID() (types.ATXID, error) {
return id, nil
}

// HandleAtxData handles atxs received either by gossip or sync.
// HandleAtxData handles atxs received by sync.
func (h *Handler) HandleAtxData(ctx context.Context, peer p2p.Peer, data []byte) error {
err := h.HandleGossipAtx(ctx, peer, data)
if errors.Is(err, errKnownAtx) {
Expand Down Expand Up @@ -532,7 +532,7 @@ func (h *Handler) handleGossipAtx(ctx context.Context, peer p2p.Peer, msg []byte

h.registerHashes(&atx, peer)
if err := h.fetcher.GetPoetProof(ctx, atx.GetPoetProofRef()); err != nil {
return fmt.Errorf("received atx (%v) with syntactically invalid or missing PoET proof (%x): %w",
return fmt.Errorf("received atx (%v) with syntactically invalid or missing poet proof (%x): %w",
atx.ShortString(), atx.GetPoetProofRef().ShortString(), err,
)
}
Expand Down
4 changes: 2 additions & 2 deletions activation/nipost_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ func save(filename string, src scale.Encodable) error {
return nil
}

func saveNipostChallenge(dir string, ch *types.NIPostChallenge) error {
func SaveNipostChallenge(dir string, ch *types.NIPostChallenge) error {
if err := save(filepath.Join(dir, challengeFilename), ch); err != nil {
return fmt.Errorf("saving nipost challenge: %w", err)
}
return nil
}

func loadNipostChallenge(dir string) (*types.NIPostChallenge, error) {
func LoadNipostChallenge(dir string) (*types.NIPostChallenge, error) {
var ch types.NIPostChallenge
if err := load(filepath.Join(dir, challengeFilename), &ch); err != nil {
return nil, fmt.Errorf("loading nipost challenge: %w", err)
Expand Down
8 changes: 2 additions & 6 deletions api/grpcserver/admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,7 @@ func (a AdminService) CheckpointStream(req *pb.CheckpointStreamRequest, stream p
}
}

func (a AdminService) Recover(ctx context.Context, req *pb.RecoverRequest) (*empty.Empty, error) {
if err := checkpoint.ReadCheckpointAndDie(ctx, a.logger, a.dataDir, req.Uri, types.LayerID(req.RestoreLayer)); err != nil {
a.logger.WithContext(ctx).With().Error("failed to read checkpoint file", log.Err(err))
return nil, status.Errorf(codes.Internal,
fmt.Sprintf("read checkpoint %s and restore %d: %s", req.Uri, req.RestoreLayer, err.Error()))
}
func (a AdminService) Recover(_ context.Context, _ *pb.RecoverRequest) (*empty.Empty, error) {
a.logger.Panic("going to recover from checkpoint")
return &empty.Empty{}, nil
}
23 changes: 1 addition & 22 deletions api/grpcserver/admin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package grpcserver
import (
"context"
"errors"
"fmt"
"io"
"path/filepath"
"testing"
"time"

Expand All @@ -19,10 +17,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)

const (
snapshot uint32 = 15
restore uint32 = 17
)
const snapshot uint32 = 15

func newatx(tb testing.TB, db *sql.Database) {
atx := &types.ActivationTx{
Expand Down Expand Up @@ -111,19 +106,3 @@ func TestAdminService_CheckpointError(t *testing.T) {
_, err = stream.Recv()
require.ErrorContains(t, err, sql.ErrNotFound.Error())
}

func TestAdminService_RecoveryFileMissing(t *testing.T) {
logtest.SetupGlobal(t)
db := sql.InMemory()
svc := NewAdminService(db, t.TempDir(), logtest.New(t))
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conn := dialGrpc(ctx, t, cfg.PublicListener)
c := pb.NewAdminServiceClient(conn)

fname := filepath.Join(t.TempDir(), "snapshot")
_, err := c.Recover(ctx, &pb.RecoverRequest{Uri: fmt.Sprintf("file://%s", fname), RestoreLayer: restore})
require.Error(t, err)
}
Loading

0 comments on commit f8bd113

Please sign in to comment.