Allow configuration of old style warmup routine
ZenGround0 committed Dec 26, 2024
1 parent cc0acca commit a4ac610
Showing 6 changed files with 134 additions and 104 deletions.
4 changes: 4 additions & 0 deletions blockstore/splitstore/splitstore.go
@@ -102,6 +102,10 @@ type Config struct {
// from the hotstore should be written to the cold store
UniversalColdBlocks bool

// FullWarmup indicates whether to do a full chain traversal on splitstore init,
// copying state from the cold store to the hot store
FullWarmup bool

// HotstoreMessageRetention indicates the hotstore retention policy for messages.
// It has the following semantics:
// - a value of 0 will only retain messages within the compaction boundary (4 finalities)
217 changes: 117 additions & 100 deletions blockstore/splitstore/splitstore_warmup.go
@@ -1,18 +1,25 @@
package splitstore

import (
"sync"
"sync/atomic"
"time"

"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)

var (
// WarmupBoundary is the number of epochs to load state during warmup.
WarmupBoundary = policy.ChainFinality
// Empirically taken from December 2024
MarkSetEstimate int64 = 10_000_000_000
)

// warmup acquires the compaction lock and spawns a goroutine to warm up the hotstore;
@@ -29,7 +36,12 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
log.Info("warming up hotstore")
start := time.Now()

err := s.doWarmup2(curTs)
var err error
if s.cfg.FullWarmup {
err = s.doWarmup(curTs)
} else {
err = s.doWarmup2(curTs)
}
if err != nil {
log.Errorf("error warming up hotstore: %s", err)
return
Expand All @@ -41,12 +53,14 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
return nil
}

// doWarmup2 is the lightweight warmup path: chain state is assumed to already be in the
// hotstore (e.g. written directly from a snapshot import), so no chain walk is needed.
func (s *SplitStore) doWarmup2(curTs *types.TipSet) error {
log.Infow("warmup starting")

epoch := curTs.Height()
count := new(int64)
*count = 10_000_000_000
// Empirically taken from December 2024
*count = MarkSetEstimate
s.markSetSize = *count + *count>>2 // overestimate a bit
err := s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
@@ -68,101 +82,104 @@ func (s *SplitStore) doWarmup2(curTs *types.TipSet) error {
return nil
}
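
Note on the sizing above (illustrative sketch, not part of this change): in Go the shift operator binds tighter than addition, so count + count>>2 works out to 1.25x the estimate.

// Sketch: the mark set overestimate in doWarmup2, worked out numerically.
package main

import "fmt"

func main() {
	var count int64 = 10_000_000_000 // MarkSetEstimate, empirical as of December 2024
	markSetSize := count + count>>2  // >> binds tighter than +, so this is count * 1.25
	fmt.Println(markSetSize)         // 12500000000
}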

// the legacy warmup procedure before we wrote snapshots directly to the hotstore
// func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
// var boundaryEpoch abi.ChainEpoch
// epoch := curTs.Height()
// if WarmupBoundary < epoch {
// boundaryEpoch = epoch - WarmupBoundary
// }
// var mx sync.Mutex
// batchHot := make([]blocks.Block, 0, batchSize)
// count := new(int64)
// xcount := new(int64)
// missing := new(int64)

// visitor, err := s.markSetEnv.New("warmup", 0)
// if err != nil {
// return xerrors.Errorf("error creating visitor: %w", err)
// }
// defer visitor.Close() //nolint

// err = s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
// visitor,
// func(c cid.Cid) error {
// if isUnitaryObject(c) {
// return errStopWalk
// }

// atomic.AddInt64(count, 1)

// has, err := s.hot.Has(s.ctx, c)
// if err != nil {
// return err
// }

// if has {
// return nil
// }

// blk, err := s.cold.Get(s.ctx, c)
// if err != nil {
// if ipld.IsNotFound(err) {
// atomic.AddInt64(missing, 1)
// return errStopWalk
// }
// return err
// }

// atomic.AddInt64(xcount, 1)

// mx.Lock()
// batchHot = append(batchHot, blk)
// if len(batchHot) == batchSize {
// err = s.hot.PutMany(s.ctx, batchHot)
// if err != nil {
// mx.Unlock()
// return err
// }
// batchHot = batchHot[:0]
// }
// mx.Unlock()

// return nil
// }, func(cid.Cid) error { return nil })

// if err != nil {
// return err
// }

// if len(batchHot) > 0 {
// err = s.hot.PutMany(s.ctx, batchHot)
// if err != nil {
// return err
// }
// }

// log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing)

// s.markSetSize = *count + *count>>2 // overestimate a bit
// err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
// if err != nil {
// log.Warnf("error saving mark set size: %s", err)
// }

// // save the warmup epoch
// err = s.ds.Put(s.ctx, warmupEpochKey, epochToBytes(epoch))
// if err != nil {
// return xerrors.Errorf("error saving warm up epoch: %w", err)
// }

// s.warmupEpoch.Store(int64(epoch))

// // also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
// err = s.ds.Put(s.ctx, compactionIndexKey, int64ToBytes(s.compactionIndex))
// if err != nil {
// return xerrors.Errorf("error saving compaction index: %w", err)
// }

// return nil
// }
// doWarmup is the full warmup procedure. This was the standard warmup before we wrote
// snapshots directly to the hotstore; now it is used only if explicitly configured.
// A good use case is starting splitstore off of an unpruned full node.
func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
var boundaryEpoch abi.ChainEpoch
epoch := curTs.Height()
if WarmupBoundary < epoch {
boundaryEpoch = epoch - WarmupBoundary
}
var mx sync.Mutex
batchHot := make([]blocks.Block, 0, batchSize)
count := new(int64)
xcount := new(int64)
missing := new(int64)

visitor, err := s.markSetEnv.New("warmup", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}
defer visitor.Close() //nolint

err = s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}

atomic.AddInt64(count, 1)

has, err := s.hot.Has(s.ctx, c)
if err != nil {
return err
}

if has {
return nil
}

blk, err := s.cold.Get(s.ctx, c)
if err != nil {
if ipld.IsNotFound(err) {
atomic.AddInt64(missing, 1)
return errStopWalk
}
return err
}

atomic.AddInt64(xcount, 1)

mx.Lock()
batchHot = append(batchHot, blk)
if len(batchHot) == batchSize {
err = s.hot.PutMany(s.ctx, batchHot)
if err != nil {
mx.Unlock()
return err
}
batchHot = batchHot[:0]
}
mx.Unlock()

return nil
}, func(cid.Cid) error { return nil })

if err != nil {
return err
}

if len(batchHot) > 0 {
err = s.hot.PutMany(s.ctx, batchHot)
if err != nil {
return err
}
}

log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing)

s.markSetSize = *count + *count>>2 // overestimate a bit
err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
log.Warnf("error saving mark set size: %s", err)
}

// save the warmup epoch
err = s.ds.Put(s.ctx, warmupEpochKey, epochToBytes(epoch))
if err != nil {
return xerrors.Errorf("error saving warm up epoch: %w", err)
}

s.warmupEpoch.Store(int64(epoch))

// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
err = s.ds.Put(s.ctx, compactionIndexKey, int64ToBytes(s.compactionIndex))
if err != nil {
return xerrors.Errorf("error saving compaction index: %w", err)
}

return nil
}
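
For reference, a minimal sketch of the boundary computation at the top of doWarmup, assuming WarmupBoundary equals Filecoin's chain finality of 900 epochs (the head height below is hypothetical):

package main

import "fmt"

func main() {
	const warmupBoundary int64 = 900 // assumed value of policy.ChainFinality
	epoch := int64(4_400_000)        // hypothetical current head height
	var boundaryEpoch int64          // stays 0 (genesis) if the chain is shorter than the boundary
	if warmupBoundary < epoch {
		boundaryEpoch = epoch - warmupBoundary // only state at or above this epoch is walked
	}
	fmt.Println(boundaryEpoch) // 4399100
}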
2 changes: 1 addition & 1 deletion cmd/lotus-sim/simulation/node.go
@@ -51,7 +51,7 @@ func NewNode(ctx context.Context, r repo.Repo) (nd *Node, _err error) {
}
}()

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return nil, err
}
1 change: 1 addition & 0 deletions node/config/def.go
@@ -75,6 +75,7 @@ func DefaultFullNode() *FullNode {
ColdStoreType: "discard",
HotStoreType: "badger",
MarkSetType: "badger",
FullWarmup: false,

HotStoreFullGCFrequency: 20,
HotStoreMaxSpaceTarget: 650_000_000_000,
6 changes: 6 additions & 0 deletions node/config/types.go
@@ -523,6 +523,12 @@ type Splitstore struct {
// is set. Moving GC will not occur when total moving size exceeds
// HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
HotstoreMaxSpaceSafetyBuffer uint64

// Perform a full warmup from the coldstore to the hotstore on splitstore startup.
// This is useful when migrating a node from a non-splitstore setup to splitstore.
// It should be false in the common case where a node is initialized from a snapshot,
// since snapshots are loaded directly into the hotstore.
FullWarmup bool
}

// Full Node
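
For operators, turning this on would look roughly like the following in the node's config.toml. This is a sketch: the surrounding section names (Chainstore, EnableSplitstore) follow the existing Lotus config layout and are not introduced by this change.

[Chainstore]
  EnableSplitstore = true

  [Chainstore.Splitstore]
    ColdStoreType = "universal"
    HotStoreType = "badger"
    MarkSetType = "badger"
    # New option: do a full cold-to-hot chain traversal on startup.
    # Leave at the default (false) when the node was initialized from a snapshot.
    FullWarmup = true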
8 changes: 5 additions & 3 deletions node/modules/blockstore.go
@@ -83,9 +83,11 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
}

cfg := &splitstore.Config{
MarkSetType: cfg.Splitstore.MarkSetType,
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal",
MarkSetType: cfg.Splitstore.MarkSetType,
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal",
FullWarmup: cfg.Splitstore.FullWarmup,

HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency,
HotstoreMaxSpaceTarget: cfg.Splitstore.HotStoreMaxSpaceTarget,
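
A hypothetical wiring sketch for code that constructs the splitstore directly (for example in tooling or tests) rather than through SplitBlockstore; the splitstore.Open signature here is assumed from the existing codebase and should be checked against the actual API.

package example

import (
	bstore "github.com/filecoin-project/lotus/blockstore"
	"github.com/filecoin-project/lotus/blockstore/splitstore"
	dstore "github.com/ipfs/go-datastore"
)

// openWithFullWarmup opens a splitstore configured to run the old-style full
// warmup (cold-to-hot chain traversal) on startup. Hypothetical helper, not
// part of this change.
func openWithFullWarmup(path string, ds dstore.Datastore, hot, cold bstore.Blockstore) (*splitstore.SplitStore, error) {
	cfg := &splitstore.Config{
		MarkSetType:         "badger",
		UniversalColdBlocks: true,
		FullWarmup:          true,
	}
	return splitstore.Open(path, ds, hot, cold, cfg)
}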
