diff --git a/pkg/node/node.go b/pkg/node/node.go
index bb29a504161..6bdc0c85738 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -953,7 +953,7 @@ func NewBee(ctx context.Context, addr string, publicKey *ecdsa.PublicKey, signer
         pullerService = puller.New(stateStore, kad, batchStore, pullSyncProtocol, p2ps, logger, puller.Options{SyncSleepDur: puller.DefaultSyncErrorSleepDur}, warmupTime)
         b.pullerCloser = pullerService
 
-        depthMonitor := depthmonitor.New(kad, pullerService, storer, batchStore, logger, warmupTime, depthmonitor.DefaultWakeupInterval, !batchStoreExists)
+        depthMonitor := depthmonitor.New(kad, pullSyncProtocol, storer, batchStore, logger, warmupTime, depthmonitor.DefaultWakeupInterval, !batchStoreExists)
         b.depthMonitorCloser = depthMonitor
 
         if o.EnableStorageIncentives {
diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go
index 9ce0a9bd5a3..1557cc86412 100644
--- a/pkg/puller/puller.go
+++ b/pkg/puller/puller.go
@@ -270,7 +270,6 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
     sleep := false
     loopStart := time.Now()
 
-    loggerV2.Debug("histSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur)
 
     for {
@@ -305,18 +304,18 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
         }
 
         syncStart := time.Now()
-        ctx, cancel := context.WithTimeout(ctx, histSyncTimeout)
+
         top, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur)
         if err != nil {
             cancel()
             p.metrics.HistWorkerErrCounter.Inc()
-            loggerV2.Debug("histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top, "err", err)
+            loggerV2.Debug("histSyncWorker interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top, "err", err)
             if errors.Is(err, context.DeadlineExceeded) {
-                p.logger.Debug("peer sync interval timeout, exiting", "total_duration", time.Since(loopStart), "peer_address", peer, "error", err)
+                p.logger.Debug("histSyncWorker interval timeout, exiting", "total_duration", time.Since(loopStart), "peer_address", peer, "error", err)
                 err = p.blockLister.Blocklist(peer, histSyncTimeoutBlockList, "sync interval timeout")
                 if err != nil {
-                    p.logger.Debug("peer sync interval timeout disconnect error", "error", err)
+                    p.logger.Debug("histSyncWorker timeout disconnect error", "error", err)
                 }
                 return
             }
diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go
index 5d080dd00e8..71a16cbf2a8 100644
--- a/pkg/pullsync/pullsync.go
+++ b/pkg/pullsync/pullsync.go
@@ -23,6 +23,7 @@ import (
     "github.com/ethersphere/bee/pkg/postage"
     "github.com/ethersphere/bee/pkg/pullsync/pb"
     "github.com/ethersphere/bee/pkg/pullsync/pullstorage"
+    "github.com/ethersphere/bee/pkg/rate"
     "github.com/ethersphere/bee/pkg/soc"
     "github.com/ethersphere/bee/pkg/storage"
     "github.com/ethersphere/bee/pkg/swarm"
@@ -39,7 +40,10 @@ const (
     cancelStreamName = "cancel"
 )
 
-const MaxCursor = math.MaxUint64
+const (
+    MaxCursor           = math.MaxUint64
+    DefaultRateDuration = time.Minute * 10
+)
 
 var (
     ErrUnsolicitedChunk = errors.New("peer sent unsolicited chunk")
@@ -76,6 +80,8 @@ type Syncer struct {
     radius         postage.RadiusChecker
     overlayAddress swarm.Address
 
+    rate *rate.Rate
+
     Interface
     io.Closer
 }
@@ -93,6 +99,7 @@ func New(streamer p2p.Streamer, storage pullstorage.Storer, unwrap func(swarm.Ch
         quit:           make(chan struct{}),
         radius:         radius,
         overlayAddress: overlayAddress,
+        rate:           rate.New(DefaultRateDuration),
     }
 }
@@ -233,9 +240,12 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
     }
 
     if len(chunksToPut) > 0 {
+
+        if to != MaxCursor { // historical syncing
+            s.rate.Add(len(chunksToPut))
+        }
+
         s.metrics.DbOps.Inc()
-        ctx, cancel := context.WithTimeout(ctx, storagePutTimeout)
-        defer cancel()
 
         if err := s.storage.Put(ctx, storage.ModePutSync, chunksToPut...); err != nil {
             return topmost, fmt.Errorf("delivery put: %w", err)
@@ -246,6 +256,11 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
     return topmost, nil
 }
 
+// Rate returns chunks per second synced
+func (s *Syncer) Rate() float64 {
+    return s.rate.Rate()
+}
+
 // handler handles an incoming request to sync an interval
 func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
     select {
diff --git a/pkg/topology/depthmonitor/depthmonitor.go b/pkg/topology/depthmonitor/depthmonitor.go
index 99c01e68d67..4fca15d6e37 100644
--- a/pkg/topology/depthmonitor/depthmonitor.go
+++ b/pkg/topology/depthmonitor/depthmonitor.go
@@ -6,6 +6,7 @@ package depthmonitor
 
 import (
     "errors"
+    "fmt"
     "time"
 
     "github.com/ethersphere/bee/pkg/log"
@@ -38,7 +39,7 @@ type ReserveReporter interface {
 // SyncReporter interface needs to be implemented by the syncing component of the node (puller).
 type SyncReporter interface {
     // Number of active historical syncing jobs.
-    ActiveHistoricalSyncing() uint64
+    Rate() float64
 }
 
 // Topology interface encapsulates the functionality required by the topology component
@@ -124,27 +125,33 @@ func (s *Service) manage(warmupTime, wakeupInterval time.Duration, freshNode boo
     targetSize := s.reserve.ReserveCapacity() * 4 / 10 // 40% of the capacity
 
-    next := func() {
+    for {
+        select {
+        case <-s.quit:
+            return
+        case <-time.After(wakeupInterval):
+        }
+
         radius := s.bs.StorageRadius()
 
         currentSize, err := s.reserve.ComputeReserveSize(radius)
         if err != nil {
             s.logger.Error(err, "depthmonitor: failed reading reserve size")
-            return
+            continue
         }
 
         // save last calculated reserve size
         s.lastRSize.Store(currentSize)
 
-        syncCount := s.syncer.ActiveHistoricalSyncing()
-        s.logger.Info("depthmonitor: state", "size", currentSize, "radius", radius, "sync_count", syncCount)
+        rate := s.syncer.Rate()
+        s.logger.Info("depthmonitor: state", "size", currentSize, "radius", radius, "sync_rate", fmt.Sprintf("%.2f ch/s", rate))
 
         if currentSize > targetSize {
-            return
+            continue
         }
 
         // if historical syncing rate is at zero, we proactively decrease the storage radius to allow nodes to widen their neighbourhoods
-        if syncCount == 0 && s.topology.PeersCount(topologyDriver.Filter{}) != 0 {
+        if rate == 0 && s.topology.PeersCount(topologyDriver.Filter{}) != 0 {
             err = s.bs.SetStorageRadius(func(radius uint8) uint8 {
                 if radius > s.minimumRadius {
                     radius--
@@ -157,20 +164,10 @@ func (s *Service) manage(warmupTime, wakeupInterval time.Duration, freshNode boo
                 }
             }
         }
-    next()
-
-    for {
-        select {
-        case <-s.quit:
-            return
-        case <-time.After(wakeupInterval):
-            next()
-        }
-    }
 }
 
 func (s *Service) IsFullySynced() bool {
-    return s.syncer.ActiveHistoricalSyncing() == 0 && s.lastRSize.Load() > s.reserve.ReserveCapacity()*4/10
+    return s.syncer.Rate() == 0 && s.lastRSize.Load() > s.reserve.ReserveCapacity()*4/10
 }
 
 func (s *Service) Close() error {
diff --git a/pkg/topology/depthmonitor/depthmonitor_test.go b/pkg/topology/depthmonitor/depthmonitor_test.go
index a026790c22c..96632655fc2 100644
--- a/pkg/topology/depthmonitor/depthmonitor_test.go
+++ b/pkg/topology/depthmonitor/depthmonitor_test.go
@@ -216,10 +216,10 @@ func (m *mockTopology) getStorageDepth() uint8 {
 }
 
 type mockSyncReporter struct {
-    rate uint64
+    rate float64
 }
 
-func (m *mockSyncReporter) ActiveHistoricalSyncing() uint64 {
+func (m *mockSyncReporter) Rate() float64 {
     return m.rate
 }
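
Note: the diff above wires in the pkg/rate package (rate.New, (*Rate).Add, (*Rate).Rate) without showing its implementation. Purely as an illustrative stand-in, and not the actual pkg/rate code, a sliding-window counter with the semantics the depth monitor relies on (a chunks-per-second figure that decays to zero once historical syncing stalls) could be sketched roughly like this:

// Illustrative stand-in for pkg/rate (assumption, not the real implementation):
// a two-bucket sliding-window counter reporting events per second.
package rate

import (
    "sync"
    "time"
)

// Rate counts events over a sliding time window.
type Rate struct {
    mtx       sync.Mutex
    window    time.Duration // length of one bucket window
    current   int           // events recorded in the current window
    previous  int           // events recorded in the previous window
    windowEnd time.Time     // when the current window closes
}

// New returns a Rate measured over the given window duration.
func New(window time.Duration) *Rate {
    return &Rate{
        window:    window,
        windowEnd: time.Now().Add(window),
    }
}

// Add records n events, e.g. chunks stored during historical syncing.
func (r *Rate) Add(n int) {
    r.mtx.Lock()
    defer r.mtx.Unlock()
    r.rotate(time.Now())
    r.current += n
}

// Rate returns the approximate number of events per second over the window.
// It decays to 0 when nothing has been recorded recently, which is what the
// depth monitor uses to detect stalled historical syncing.
func (r *Rate) Rate() float64 {
    r.mtx.Lock()
    defer r.mtx.Unlock()
    now := time.Now()
    r.rotate(now)
    // Weight the previous bucket by how much of it still overlaps the sliding window.
    overlap := r.windowEnd.Sub(now).Seconds() / r.window.Seconds()
    return (float64(r.current) + float64(r.previous)*overlap) / r.window.Seconds()
}

// rotate shifts the buckets forward once the current window has expired.
func (r *Rate) rotate(now time.Time) {
    for now.After(r.windowEnd) {
        r.previous = r.current
        r.current = 0
        r.windowEnd = r.windowEnd.Add(r.window)
    }
}

Under that reading, SyncInterval feeds the counter via s.rate.Add(len(chunksToPut)) only for historical intervals (to != MaxCursor), and the rate == 0 checks in manage() and IsFullySynced() simply mean no historical chunks arrived within the configured window.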