Skip to content

Commit

Permalink
Update Recovery for multi-smesher
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Feb 22, 2024
1 parent 196c9bb commit 80c1796
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 41 deletions.
72 changes: 47 additions & 25 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type RecoverConfig struct {
DbFile string
LocalDbFile string
PreserveOwnAtx bool
NodeID types.NodeID
NodeIDs []types.NodeID
Uri string
Restore types.LayerID
}
Expand Down Expand Up @@ -176,15 +176,41 @@ func recoverFromLocalFile(
log.Int("num accounts", len(data.accounts)),
log.Int("num atxs", len(data.atxs)),
)
deps, proofs, err := collectOwnAtxDeps(logger, db, localDB, cfg, data)
if err != nil {
logger.With().Error("failed to collect deps for own atx", log.Err(err))
// continue to recover from checkpoint despite failure to preserve own atx
} else if len(deps) > 0 {
logger.With().Info("collected own atx deps",
log.Context(ctx),
log.Int("own atx deps", len(deps)),
)
allDeps := make([]*types.VerifiedActivationTx, 0)
allProofs := make([]*types.PoetProofMessage, 0)
if cfg.PreserveOwnAtx {
for _, nodeID := range cfg.NodeIDs {
deps, proofs, err := collectOwnAtxDeps(logger, db, localDB, nodeID, cfg.GoldenAtx, data)
if err != nil {
logger.With().Error("failed to collect deps for own atx",
nodeID,
log.Err(err),
)
// continue to recover from checkpoint despite failure to preserve own atx
continue
}

logger.With().Info("collected own atx deps",
log.Context(ctx),
nodeID,
log.Int("own atx deps", len(deps)),
)
allDeps = append(allDeps, deps...)
allProofs = append(allProofs, proofs...)

// // deduplicate allDeps and allProofs by sorting and compacting
// // TODO: for some reason this causes existing tests to fail - need to investigate
// slices.SortFunc(allDeps, func(i, j *types.VerifiedActivationTx) int {
// return bytes.Compare(i.ID().Bytes(), j.ID().Bytes())
// })
// allDeps = slices.Compact(allDeps)
// slices.SortFunc(allProofs, func(i, j *types.PoetProofMessage) int {
// iRef, _ := i.Ref()
// jRef, _ := j.Ref()
// return bytes.Compare(iRef[:], jRef[:])
// })
// allProofs = slices.Compact(allProofs)
}
}
if err := db.Close(); err != nil {
return nil, fmt.Errorf("close old db: %w", err)
Expand Down Expand Up @@ -248,8 +274,8 @@ func recoverFromLocalFile(
types.GetEffectiveGenesis(),
)
var preserve *PreservedData
if len(deps) > 0 {
preserve = &PreservedData{Deps: deps, Proofs: proofs}
if len(allDeps) > 0 {
preserve = &PreservedData{Deps: allDeps, Proofs: allProofs}
}
return preserve, nil
}
Expand Down Expand Up @@ -311,13 +337,11 @@ func collectOwnAtxDeps(
logger log.Log,
db *sql.Database,
localDB *localsql.Database,
cfg *RecoverConfig,
nodeID types.NodeID,
goldenATX types.ATXID,
data *recoverydata,
) ([]*types.VerifiedActivationTx, []*types.PoetProofMessage, error) {
if !cfg.PreserveOwnAtx {
return nil, nil, nil
}
atxid, err := atxs.GetLastIDByNodeID(db, cfg.NodeID)
atxid, err := atxs.GetLastIDByNodeID(db, nodeID)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return nil, nil, fmt.Errorf("query own last atx id: %w", err)
}
Expand All @@ -329,8 +353,8 @@ func collectOwnAtxDeps(
own = true
}

// check for if miner is building any atx
nipostCh, _ := nipost.Challenge(localDB, cfg.NodeID)
// check for if smesher is building any atx
nipostCh, _ := nipost.Challenge(localDB, nodeID)
if ref == types.EmptyATXID {
if nipostCh == nil {
return nil, nil, nil
Expand All @@ -340,7 +364,7 @@ func collectOwnAtxDeps(
}
}

all := map[types.ATXID]struct{}{cfg.GoldenAtx: {}, types.EmptyATXID: {}}
all := map[types.ATXID]struct{}{goldenATX: {}, types.EmptyATXID: {}}
for _, catx := range data.atxs {
all[catx.ID] = struct{}{}
}
Expand All @@ -353,18 +377,16 @@ func collectOwnAtxDeps(
ref,
log.Bool("own", own),
)
deps, proofs, err = collectDeps(db, cfg.GoldenAtx, ref, all)
deps, proofs, err = collectDeps(db, goldenATX, ref, all)
if err != nil {
return nil, nil, err
}
}
if nipostCh != nil {
logger.With().Info("collecting pending atx and deps",
log.Object("nipost", nipostCh),
)
logger.With().Info("collecting pending atx and deps", log.Object("nipost", nipostCh))
// any previous atx in nipost should already be captured earlier
// we only care about positioning atx here
deps2, proofs2, err := collectDeps(db, cfg.GoldenAtx, nipostCh.PositioningATX, all)
deps2, proofs2, err := collectDeps(db, goldenATX, nipostCh.PositioningATX, all)
if err != nil {
return nil, nil, fmt.Errorf("deps from nipost positioning atx (%v): %w", nipostCh.PositioningATX, err)
}
Expand Down
19 changes: 11 additions & 8 deletions checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestRecover(t *testing.T) {
DbFile: "test.sql",
LocalDbFile: "local.sql",
PreserveOwnAtx: true,
NodeID: types.NodeID{2, 3, 4},
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: tc.uri,
Restore: types.LayerID(recoverLayer),
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestRecover_SameRecoveryInfo(t *testing.T) {
DataDir: t.TempDir(),
DbFile: "test.sql",
PreserveOwnAtx: true,
NodeID: types.NodeID{2, 3, 4},
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: fmt.Sprintf("%s/snapshot-15", ts.URL),
Restore: types.LayerID(recoverLayer),
}
Expand Down Expand Up @@ -254,6 +254,9 @@ func validateAndPreserveData(
lg,
)
mfetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
for _, vatx := range deps {
fmt.Println("deps", vatx.ID())
}
for i, vatx := range deps {
encoded, err := codec.Encode(vatx)
require.NoError(tb, err)
Expand Down Expand Up @@ -435,7 +438,7 @@ func TestRecover_OwnAtxNotInCheckpoint_Preserve(t *testing.T) {
DbFile: "test.sql",
LocalDbFile: "local.sql",
PreserveOwnAtx: true,
NodeID: sig.NodeID(),
NodeIDs: []types.NodeID{sig.NodeID()},
Uri: fmt.Sprintf("%s/snapshot-15", ts.URL),
Restore: types.LayerID(recoverLayer),
}
Expand Down Expand Up @@ -507,7 +510,7 @@ func TestRecover_OwnAtxNotInCheckpoint_Preserve_IncludePending(t *testing.T) {
DbFile: "test.sql",
LocalDbFile: "local.sql",
PreserveOwnAtx: true,
NodeID: sig.NodeID(),
NodeIDs: []types.NodeID{sig.NodeID()},
Uri: fmt.Sprintf("%s/snapshot-15", ts.URL),
Restore: types.LayerID(recoverLayer),
}
Expand Down Expand Up @@ -596,7 +599,7 @@ func TestRecover_OwnAtxNotInCheckpoint_Preserve_Still_Initializing(t *testing.T)
DbFile: "test.sql",
LocalDbFile: "local.sql",
PreserveOwnAtx: true,
NodeID: sig.NodeID(),
NodeIDs: []types.NodeID{sig.NodeID()},
Uri: fmt.Sprintf("%s/snapshot-15", ts.URL),
Restore: types.LayerID(recoverLayer),
}
Expand Down Expand Up @@ -682,7 +685,7 @@ func TestRecover_OwnAtxNotInCheckpoint_Preserve_DepIsGolden(t *testing.T) {
DbFile: "test.sql",
LocalDbFile: "local.sql",
PreserveOwnAtx: true,
NodeID: sig.NodeID(),
NodeIDs: []types.NodeID{sig.NodeID()},
Uri: fmt.Sprintf("%s/snapshot-15", ts.URL),
Restore: types.LayerID(recoverLayer),
}
Expand Down Expand Up @@ -764,7 +767,7 @@ func TestRecover_OwnAtxNotInCheckpoint_DontPreserve(t *testing.T) {
DbFile: "test.sql",
LocalDbFile: "local.sql",
PreserveOwnAtx: false,
NodeID: sig.NodeID(),
NodeIDs: []types.NodeID{sig.NodeID()},
Uri: fmt.Sprintf("%s/snapshot-15", ts.URL),
Restore: types.LayerID(recoverLayer),
}
Expand Down Expand Up @@ -834,7 +837,7 @@ func TestRecover_OwnAtxInCheckpoint(t *testing.T) {
DbFile: "test.sql",
LocalDbFile: "local.sql",
PreserveOwnAtx: true,
NodeID: nid,
NodeIDs: []types.NodeID{nid},
Uri: fmt.Sprintf("%s/snapshot-15", ts.URL),
Restore: types.LayerID(recoverLayer),
}
Expand Down
16 changes: 8 additions & 8 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,16 +429,16 @@ func (app *App) LoadCheckpoint(ctx context.Context) (*checkpoint.PreservedData,
if restore == 0 {
return nil, fmt.Errorf("restore layer not set")
}
nodeIDs := make([]types.NodeID, 0, len(app.signers))

Check warning on line 432 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L432

Added line #L432 was not covered by tests
cfg := &checkpoint.RecoverConfig{
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
PreserveOwnAtx: app.Config.Recovery.PreserveOwnAtx,
// TODO(mafa): FIXXME!
// NodeID: app.edSgn.NodeID(),
Uri: checkpointFile,
Restore: restore,
NodeIDs: nodeIDs,

Check warning on line 439 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L439

Added line #L439 was not covered by tests
Uri: checkpointFile,
Restore: restore,
}
app.log.WithContext(ctx).With().Info("recover from checkpoint",
log.String("url", checkpointFile),
Expand All @@ -453,11 +453,11 @@ func (app *App) Started() <-chan struct{} {

// Lock locks the app for exclusive use. It returns an error if the app is already locked.
func (app *App) Lock() error {
lockdir := filepath.Dir(app.Config.FileLock)
if _, err := os.Stat(lockdir); errors.Is(err, os.ErrNotExist) {
err := os.Mkdir(lockdir, os.ModePerm)
lockDir := filepath.Dir(app.Config.FileLock)
if _, err := os.Stat(lockDir); errors.Is(err, os.ErrNotExist) {
err := os.Mkdir(lockDir, os.ModePerm)
if err != nil {
return fmt.Errorf("creating dir %s for lock %s: %w", lockdir, app.Config.FileLock, err)
return fmt.Errorf("creating dir %s for lock %s: %w", lockDir, app.Config.FileLock, err)

Check warning on line 460 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L460

Added line #L460 was not covered by tests
}
}
fl := flock.New(app.Config.FileLock)
Expand Down

0 comments on commit 80c1796

Please sign in to comment.