From a4ac6106160c1e9b37445e7abc7123c9da83f431 Mon Sep 17 00:00:00 2001
From: zenground0
Date: Thu, 26 Dec 2024 10:42:03 +0530
Subject: [PATCH] Allow configuration of old style warmup routine

---
 blockstore/splitstore/splitstore.go        |   4 +
 blockstore/splitstore/splitstore_warmup.go | 217 +++++++++++----------
 cmd/lotus-sim/simulation/node.go           |   2 +-
 node/config/def.go                         |   1 +
 node/config/types.go                       |   6 +
 node/modules/blockstore.go                 |   8 +-
 6 files changed, 134 insertions(+), 104 deletions(-)

diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go
index a3b93599f00..18aa755bd21 100644
--- a/blockstore/splitstore/splitstore.go
+++ b/blockstore/splitstore/splitstore.go
@@ -102,6 +102,10 @@ type Config struct {
 	// from the hotstore should be written to the cold store
 	UniversalColdBlocks bool
 
+	// FullWarmup indicates that splitstore should do a full chain traversal on init,
+	// copying all reachable blocks from the cold store into the hot store
+	FullWarmup bool
+
 	// HotstoreMessageRetention indicates the hotstore retention policy for messages.
 	// It has the following semantics:
 	// - a value of 0 will only retain messages within the compaction boundary (4 finalities)
diff --git a/blockstore/splitstore/splitstore_warmup.go b/blockstore/splitstore/splitstore_warmup.go
index c8b9df26f30..4319184ace4 100644
--- a/blockstore/splitstore/splitstore_warmup.go
+++ b/blockstore/splitstore/splitstore_warmup.go
@@ -1,18 +1,25 @@
 package splitstore
 
 import (
+	"sync"
 	"sync/atomic"
 	"time"
 
 	"golang.org/x/xerrors"
 
+	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/lotus/chain/actors/policy"
 	"github.com/filecoin-project/lotus/chain/types"
+	blocks "github.com/ipfs/go-block-format"
+	"github.com/ipfs/go-cid"
+	ipld "github.com/ipfs/go-ipld-format"
 )
 
 var (
 	// WarmupBoundary is the number of epochs to load state during warmup.
 	WarmupBoundary = policy.ChainFinality
+	// MarkSetEstimate is an empirical estimate of the mark set size, as of December 2024
+	MarkSetEstimate int64 = 10_000_000_000
 )
 
 // warmup acquires the compaction lock and spawns a goroutine to warm up the hotstore;
@@ -29,7 +36,12 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
 		log.Info("warming up hotstore")
 		start := time.Now()
 
-		err := s.doWarmup2(curTs)
+		var err error
+		if s.cfg.FullWarmup {
+			err = s.doWarmup(curTs)
+		} else {
+			err = s.doWarmup2(curTs)
+		}
 		if err != nil {
 			log.Errorf("error warming up hotstore: %s", err)
 			return
@@ -41,12 +53,14 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
 	return nil
 }
 
+// doWarmup2 seeds the mark set size from a static estimate instead of walking the chain
 func (s *SplitStore) doWarmup2(curTs *types.TipSet) error {
 	log.Infow("warmup starting")
 	epoch := curTs.Height()
 
 	count := new(int64)
-	*count = 10_000_000_000
+	// Empirical estimate, as of December 2024
+	*count = MarkSetEstimate
 	s.markSetSize = *count + *count>>2 // overestimate a bit
 	err := s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
 	if err != nil {
@@ -68,101 +82,104 @@ func (s *SplitStore) doWarmup2(curTs *types.TipSet) error {
 	return nil
 }
 
-// the legacy warmup procedure before we wrote snapshots directly to the hotstore
-// func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
-// 	var boundaryEpoch abi.ChainEpoch
-// 	epoch := curTs.Height()
-// 	if WarmupBoundary < epoch {
-// 		boundaryEpoch = epoch - WarmupBoundary
-// 	}
-// 	var mx sync.Mutex
-// 	batchHot := make([]blocks.Block, 0, batchSize)
-// 	count := new(int64)
-// 	xcount := new(int64)
-// 	missing := new(int64)
-
-// 	visitor, err := s.markSetEnv.New("warmup", 0)
-// 	if err != nil {
-// 		return xerrors.Errorf("error creating visitor: %w", err)
-// 	}
-// 	defer visitor.Close() //nolint
-
-// 	err = s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
-// 		visitor,
-// 		func(c cid.Cid) error {
-// 			if isUnitaryObject(c) {
-// 				return errStopWalk
-// 			}
-
-// 			atomic.AddInt64(count, 1)
-
-// 			has, err := s.hot.Has(s.ctx, c)
-// 			if err != nil {
-// 				return err
-// 			}
-
-// 			if has {
-// 				return nil
-// 			}
-
-// 			blk, err := s.cold.Get(s.ctx, c)
-// 			if err != nil {
-// 				if ipld.IsNotFound(err) {
-// 					atomic.AddInt64(missing, 1)
-// 					return errStopWalk
-// 				}
-// 				return err
-// 			}
-
-// 			atomic.AddInt64(xcount, 1)
-
-// 			mx.Lock()
-// 			batchHot = append(batchHot, blk)
-// 			if len(batchHot) == batchSize {
-// 				err = s.hot.PutMany(s.ctx, batchHot)
-// 				if err != nil {
-// 					mx.Unlock()
-// 					return err
-// 				}
-// 				batchHot = batchHot[:0]
-// 			}
-// 			mx.Unlock()
-
-// 			return nil
-// 		}, func(cid.Cid) error { return nil })
-
-// 	if err != nil {
-// 		return err
-// 	}
-
-// 	if len(batchHot) > 0 {
-// 		err = s.hot.PutMany(s.ctx, batchHot)
-// 		if err != nil {
-// 			return err
-// 		}
-// 	}
-
-// 	log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing)
-
-// 	s.markSetSize = *count + *count>>2 // overestimate a bit
-// 	err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
-// 	if err != nil {
-// 		log.Warnf("error saving mark set size: %s", err)
-// 	}
-
-// 	// save the warmup epoch
-// 	err = s.ds.Put(s.ctx, warmupEpochKey, epochToBytes(epoch))
-// 	if err != nil {
-// 		return xerrors.Errorf("error saving warm up epoch: %w", err)
-// 	}
-
-// 	s.warmupEpoch.Store(int64(epoch))
-
-// 	// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
-// 	err = s.ds.Put(s.ctx, compactionIndexKey, int64ToBytes(s.compactionIndex))
-// 	if err != nil {
-// 		return xerrors.Errorf("error saving compaction index: %w", err)
-// 	}
-
-// 	return nil
-// }
+// doWarmup performs the full warmup procedure.
+// This was the standard warmup before snapshots were written directly to the
+// hotstore; now it runs only when explicitly configured. A good use case is
+// starting splitstore off of an unpruned full node.
+func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
+	var boundaryEpoch abi.ChainEpoch
+	epoch := curTs.Height()
+	if WarmupBoundary < epoch {
+		boundaryEpoch = epoch - WarmupBoundary
+	}
+	var mx sync.Mutex
+	batchHot := make([]blocks.Block, 0, batchSize)
+	count := new(int64)
+	xcount := new(int64)
+	missing := new(int64)
+
+	visitor, err := s.markSetEnv.New("warmup", 0)
+	if err != nil {
+		return xerrors.Errorf("error creating visitor: %w", err)
+	}
+	defer visitor.Close() //nolint
+
+	err = s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
+		visitor,
+		func(c cid.Cid) error {
+			if isUnitaryObject(c) {
+				return errStopWalk
+			}
+
+			atomic.AddInt64(count, 1)
+
+			has, err := s.hot.Has(s.ctx, c)
+			if err != nil {
+				return err
+			}
+
+			if has {
+				return nil
+			}
+
+			blk, err := s.cold.Get(s.ctx, c)
+			if err != nil {
+				if ipld.IsNotFound(err) {
+					atomic.AddInt64(missing, 1)
+					return errStopWalk
+				}
+				return err
+			}
+
+			atomic.AddInt64(xcount, 1)
+
+			mx.Lock()
+			batchHot = append(batchHot, blk)
+			if len(batchHot) == batchSize {
+				err = s.hot.PutMany(s.ctx, batchHot)
+				if err != nil {
+					mx.Unlock()
+					return err
+				}
+				batchHot = batchHot[:0]
+			}
+			mx.Unlock()
+
+			return nil
+		}, func(cid.Cid) error { return nil })
+
+	if err != nil {
+		return err
+	}
+
+	if len(batchHot) > 0 {
+		err = s.hot.PutMany(s.ctx, batchHot)
+		if err != nil {
+			return err
+		}
+	}
+
+	log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing)
+
+	s.markSetSize = *count + *count>>2 // overestimate a bit
+	err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
+	if err != nil {
+		log.Warnf("error saving mark set size: %s", err)
+	}
+
+	// save the warmup epoch
+	err = s.ds.Put(s.ctx, warmupEpochKey, epochToBytes(epoch))
+	if err != nil {
+		return xerrors.Errorf("error saving warm up epoch: %w", err)
+	}
+
+	s.warmupEpoch.Store(int64(epoch))
+
+	// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
+	err = s.ds.Put(s.ctx, compactionIndexKey, int64ToBytes(s.compactionIndex))
+	if err != nil {
+		return xerrors.Errorf("error saving compaction index: %w", err)
+	}
+
+	return nil
+}
diff --git a/cmd/lotus-sim/simulation/node.go b/cmd/lotus-sim/simulation/node.go
index cda3e69d839..f1efb2543cd 100644
--- a/cmd/lotus-sim/simulation/node.go
+++ b/cmd/lotus-sim/simulation/node.go
@@ -51,7 +51,7 @@ func NewNode(ctx context.Context, r repo.Repo) (nd *Node, _err error) {
 		}
 	}()
 
-	bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
+	bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
 	if err != nil {
 		return nil, err
 	}
diff --git a/node/config/def.go b/node/config/def.go
index e6bdc04bdb8..3f0c30272b3 100644
--- a/node/config/def.go
+++ b/node/config/def.go
@@ -75,6 +75,7 @@ func DefaultFullNode() *FullNode {
 			ColdStoreType: "discard",
 			HotStoreType:  "badger",
 			MarkSetType:   "badger",
+			FullWarmup:    false,
 
 			HotStoreFullGCFrequency: 20,
 			HotStoreMaxSpaceTarget:  650_000_000_000,
diff --git a/node/config/types.go b/node/config/types.go
index 3824427b2af..626f74029f2 100644
--- a/node/config/types.go
+++ b/node/config/types.go
@@ -523,6 +523,12 @@ type Splitstore struct {
 	// is set.  Moving GC will not occur when total moving size exceeds
 	// HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
 	HotstoreMaxSpaceSafetyBuffer uint64
+
+	// Perform a full warmup from the coldstore to the hotstore upon splitstore startup.
+	// This is useful when migrating from a non-splitstore setup to splitstore.
+	// It should be false in the common case where a node is initialized from a
+	// snapshot, since snapshots are loaded directly into the hotstore.
+	FullWarmup bool
 }
 
 // Full Node
diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go
index ec0d6fa5c03..4fbc4dfa9b7 100644
--- a/node/modules/blockstore.go
+++ b/node/modules/blockstore.go
@@ -83,9 +83,11 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
 		}
 
 		cfg := &splitstore.Config{
-			MarkSetType:              cfg.Splitstore.MarkSetType,
-			DiscardColdBlocks:        cfg.Splitstore.ColdStoreType == "discard",
-			UniversalColdBlocks:      cfg.Splitstore.ColdStoreType == "universal",
+			MarkSetType:         cfg.Splitstore.MarkSetType,
+			DiscardColdBlocks:   cfg.Splitstore.ColdStoreType == "discard",
+			UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal",
+			FullWarmup:          cfg.Splitstore.FullWarmup,
+
 			HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
 			HotStoreFullGCFrequency:  cfg.Splitstore.HotStoreFullGCFrequency,
 			HotstoreMaxSpaceTarget:   cfg.Splitstore.HotStoreMaxSpaceTarget,
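
For reference, a minimal config.toml sketch for enabling the new option. This is an
illustrative assumption based on the standard Lotus config layout, not text from the
patch itself; only FullWarmup is introduced here, the other keys are pre-existing
splitstore options.

    [Chainstore]
      # splitstore must be enabled for any Splitstore option to take effect
      EnableSplitstore = true

      [Chainstore.Splitstore]
        # retain cold blocks instead of discarding them (the default is "discard"),
        # so the coldstore has a complete copy of the chain to warm up from
        ColdStoreType = "universal"
        HotStoreType = "badger"
        MarkSetType = "badger"

        # new in this patch: walk the chain on first startup and copy all
        # reachable blocks from the cold store into the hot store
        FullWarmup = true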