revert(depthmonitor): use rate of chunks as signal (#3772)
istae authored Feb 13, 2023
1 parent 731d6e7 commit 88c1d23
Showing 5 changed files with 40 additions and 29 deletions.
pkg/node/node.go (2 changes: 1 addition & 1 deletion)
@@ -953,7 +953,7 @@ func NewBee(ctx context.Context, addr string, publicKey *ecdsa.PublicKey, signer
pullerService = puller.New(stateStore, kad, batchStore, pullSyncProtocol, p2ps, logger, puller.Options{SyncSleepDur: puller.DefaultSyncErrorSleepDur}, warmupTime)
b.pullerCloser = pullerService

- depthMonitor := depthmonitor.New(kad, pullerService, storer, batchStore, logger, warmupTime, depthmonitor.DefaultWakeupInterval, !batchStoreExists)
+ depthMonitor := depthmonitor.New(kad, pullSyncProtocol, storer, batchStore, logger, warmupTime, depthmonitor.DefaultWakeupInterval, !batchStoreExists)
b.depthMonitorCloser = depthMonitor

if o.EnableStorageIncentives {
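The constructor change above only swaps the value wired into depthmonitor.New: the depth monitor now takes its sync signal from the pullsync protocol instead of the puller. A minimal illustrative sketch of why that substitution type-checks, assuming the package paths shown elsewhere in this diff; the assertion below is not part of the commit:

package node

import (
	"github.com/ethersphere/bee/pkg/pullsync"
	"github.com/ethersphere/bee/pkg/topology/depthmonitor"
)

// Illustrative compile-time check: *pullsync.Syncer satisfies the
// depthmonitor.SyncReporter interface because it now exposes
// Rate() float64, which is why pullSyncProtocol can replace
// pullerService as the second argument to depthmonitor.New.
var _ depthmonitor.SyncReporter = (*pullsync.Syncer)(nil)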
pkg/puller/puller.go (9 changes: 4 additions & 5 deletions)
@@ -270,7 +270,6 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin

sleep := false
loopStart := time.Now()

loggerV2.Debug("histSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur)

for {
@@ -305,18 +304,18 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
}

syncStart := time.Now()

ctx, cancel := context.WithTimeout(ctx, histSyncTimeout)

top, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur)
if err != nil {
cancel()
p.metrics.HistWorkerErrCounter.Inc()
- loggerV2.Debug("histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top, "err", err)
+ loggerV2.Debug("histSyncWorker interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top, "err", err)
if errors.Is(err, context.DeadlineExceeded) {
- p.logger.Debug("peer sync interval timeout, exiting", "total_duration", time.Since(loopStart), "peer_address", peer, "error", err)
+ p.logger.Debug("histSyncWorker interval timeout, exiting", "total_duration", time.Since(loopStart), "peer_address", peer, "error", err)
err = p.blockLister.Blocklist(peer, histSyncTimeoutBlockList, "sync interval timeout")
if err != nil {
- p.logger.Debug("peer sync interval timeout disconnect error", "error", err)
+ p.logger.Debug("histSyncWorker timeout disconnect error", "error", err)
}
return
}
pkg/pullsync/pullsync.go (21 changes: 18 additions & 3 deletions)
@@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/pullsync/pb"
"github.com/ethersphere/bee/pkg/pullsync/pullstorage"
+ "github.com/ethersphere/bee/pkg/rate"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
@@ -39,7 +40,10 @@ const (
cancelStreamName = "cancel"
)

- const MaxCursor = math.MaxUint64
+ const (
+ MaxCursor = math.MaxUint64
+ DefaultRateDuration = time.Minute * 10
+ )

var (
ErrUnsolicitedChunk = errors.New("peer sent unsolicited chunk")
@@ -76,6 +80,8 @@ type Syncer struct {
radius postage.RadiusChecker
overlayAddress swarm.Address

+ rate *rate.Rate

Interface
io.Closer
}
@@ -93,6 +99,7 @@ func New(streamer p2p.Streamer, storage pullstorage.Storer, unwrap func(swarm.Ch
quit: make(chan struct{}),
radius: radius,
overlayAddress: overlayAddress,
+ rate: rate.New(DefaultRateDuration),
}
}

@@ -233,9 +240,12 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
}

if len(chunksToPut) > 0 {

+ if to != MaxCursor { // historical syncing
+ s.rate.Add(len(chunksToPut))
+ }

s.metrics.DbOps.Inc()
ctx, cancel := context.WithTimeout(ctx, storagePutTimeout)
defer cancel()

if err := s.storage.Put(ctx, storage.ModePutSync, chunksToPut...); err != nil {
return topmost, fmt.Errorf("delivery put: %w", err)
@@ -246,6 +256,11 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
return topmost, nil
}

+ // Rate returns chunks per second synced
+ func (s *Syncer) Rate() float64 {
+ return s.rate.Rate()
+ }

// handler handles an incoming request to sync an interval
func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
select {
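The Syncer's new rate field comes from the pkg/rate package, whose internals are not part of this diff. The sketch below is only an assumption about what such a windowed rate meter could look like, matching the surface used above (rate.New(DefaultRateDuration), Add on each historical delivery, Rate returning chunks per second); it is not the actual pkg/rate implementation:

package rate

import (
	"sync"
	"time"
)

type sample struct {
	at    time.Time
	count int
}

// Rate counts events over a sliding time window and reports them per second.
type Rate struct {
	mu      sync.Mutex
	window  time.Duration
	samples []sample
}

// New returns a rate meter that averages over the given window.
func New(window time.Duration) *Rate {
	return &Rate{window: window}
}

// Add records count events at the current time.
func (r *Rate) Add(count int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	now := time.Now()
	r.prune(now)
	r.samples = append(r.samples, sample{at: now, count: count})
}

// Rate returns events per second observed within the window.
func (r *Rate) Rate() float64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.prune(time.Now())
	total := 0
	for _, s := range r.samples {
		total += s.count
	}
	return float64(total) / r.window.Seconds()
}

// prune drops samples that have fallen outside the window.
func (r *Rate) prune(now time.Time) {
	kept := r.samples[:0]
	for _, s := range r.samples {
		if now.Sub(s.at) <= r.window {
			kept = append(kept, s)
		}
	}
	r.samples = kept
}

Dividing by the full window rather than the elapsed time makes the reported rate ramp up gradually after start-up, a conservative choice for a "syncing has stalled" signal; the real package may behave differently.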
pkg/topology/depthmonitor/depthmonitor.go (33 changes: 15 additions & 18 deletions)
@@ -6,6 +6,7 @@ package depthmonitor

import (
"errors"
+ "fmt"
"time"

"github.com/ethersphere/bee/pkg/log"
@@ -38,7 +39,7 @@ type ReserveReporter interface {
// SyncReporter interface needs to be implemented by the syncing component of the node (puller).
type SyncReporter interface {
// Number of active historical syncing jobs.
- ActiveHistoricalSyncing() uint64
+ Rate() float64
}

// Topology interface encapsulates the functionality required by the topology component
@@ -124,27 +125,33 @@ func (s *Service) manage(warmupTime, wakeupInterval time.Duration, freshNode boo

targetSize := s.reserve.ReserveCapacity() * 4 / 10 // 40% of the capacity

- next := func() {
+ for {
+ select {
+ case <-s.quit:
+ return
+ case <-time.After(wakeupInterval):
+ }
+
radius := s.bs.StorageRadius()

currentSize, err := s.reserve.ComputeReserveSize(radius)
if err != nil {
s.logger.Error(err, "depthmonitor: failed reading reserve size")
- return
+ continue
}

// save last calculated reserve size
s.lastRSize.Store(currentSize)

- syncCount := s.syncer.ActiveHistoricalSyncing()
- s.logger.Info("depthmonitor: state", "size", currentSize, "radius", radius, "sync_count", syncCount)
+ rate := s.syncer.Rate()
+ s.logger.Info("depthmonitor: state", "size", currentSize, "radius", radius, "sync_rate", fmt.Sprintf("%.2f ch/s", rate))

if currentSize > targetSize {
- return
+ continue
}

// if historical syncing rate is at zero, we proactively decrease the storage radius to allow nodes to widen their neighbourhoods
- if syncCount == 0 && s.topology.PeersCount(topologyDriver.Filter{}) != 0 {
+ if rate == 0 && s.topology.PeersCount(topologyDriver.Filter{}) != 0 {
err = s.bs.SetStorageRadius(func(radius uint8) uint8 {
if radius > s.minimumRadius {
radius--
@@ -157,20 +164,10 @@ func (s *Service) manage(warmupTime, wakeupInterval time.Duration, freshNode boo
}
}
}
- next()
-
- for {
- select {
- case <-s.quit:
- return
- case <-time.After(wakeupInterval):
- next()
- }
- }
}

func (s *Service) IsFullySynced() bool {
- return s.syncer.ActiveHistoricalSyncing() == 0 && s.lastRSize.Load() > s.reserve.ReserveCapacity()*4/10
+ return s.syncer.Rate() == 0 && s.lastRSize.Load() > s.reserve.ReserveCapacity()*4/10
}

func (s *Service) Close() error {
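Restated outside the loop, the radius decision in the new manage body: skip adjustment while the reserve sits above the 40% target, and only widen the neighbourhood (decrease the storage radius) when the sync rate has dropped to zero despite connected peers. The helper name below is hypothetical and exists only to make the condition readable; the commit keeps this logic inline:

package depthmonitor

// shouldDecreaseRadius mirrors the inlined checks in (*Service).manage:
// the reserve is at or below 40% of capacity, historical syncing has
// stalled (rate of zero), and there are still peers to widen toward.
func shouldDecreaseRadius(currentSize, targetSize uint64, syncRate float64, peerCount int) bool {
	return currentSize <= targetSize && syncRate == 0 && peerCount != 0
}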
pkg/topology/depthmonitor/depthmonitor_test.go (4 changes: 2 additions & 2 deletions)
@@ -216,10 +216,10 @@ func (m *mockTopology) getStorageDepth() uint8 {
}

type mockSyncReporter struct {
- rate uint64
+ rate float64
}

- func (m *mockSyncReporter) ActiveHistoricalSyncing() uint64 {
+ func (m *mockSyncReporter) Rate() float64 {
return m.rate
}
