Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
millken committed Jul 13, 2023
1 parent 8231b5b commit ea492f5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 26 deletions.
25 changes: 13 additions & 12 deletions blocksync/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ type (

startingHeight uint64 // block number this node started to synchronise from
lastTip uint64
lastTipUpdateTime time.Time
targetHeight uint64 // block number of the highest block header this node has received from peers
requestMaxHeight uint64
lastRequestHeight uint64
mu sync.RWMutex
}

Expand Down Expand Up @@ -140,7 +139,6 @@ func NewBlockSyncer(
) (BlockSync, error) {
bs := &blockSyncer{
cfg: cfg,
lastTipUpdateTime: time.Now(),
buf: newBlockBuffer(cfg.BufferSize, cfg.IntervalSize),
tipHeightHandler: tipHeightHandler,
blockByHeightHandler: blockByHeightHandler,
Expand All @@ -151,7 +149,7 @@ func NewBlockSyncer(
targetHeight: 0,
}
if bs.cfg.Interval != 0 {
bs.syncTask = routine.NewTriggerTask(bs.sync, routine.WithTriggerTaskInterval(bs.cfg.RateLimitInterval))
bs.syncTask = routine.NewTriggerTask(bs.sync, routine.DelayTimeBeforeTrigger(bs.cfg.RateLimitInterval))
bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval)
}
atomic.StoreUint64(&bs.syncBlockIncrease, 0)
Expand All @@ -173,15 +171,15 @@ func (bs *blockSyncer) commitBlocks(blks []*peerBlock) bool {
return false
}

func (bs *blockSyncer) flushInfo() (time.Time, uint64) {
func (bs *blockSyncer) flushInfo() uint64 {
bs.mu.RLock()
defer bs.mu.RUnlock()

return bs.lastTipUpdateTime, bs.targetHeight
return bs.targetHeight
}

func (bs *blockSyncer) sync() {
_, targetHeight := bs.flushInfo()
targetHeight := bs.flushInfo()
intervals := bs.buf.GetBlocksIntervalsToSync(bs.tipHeightHandler(), targetHeight)
// no sync
if len(intervals) == 0 {
Expand All @@ -192,7 +190,7 @@ func (bs *blockSyncer) sync() {
log.L().Info("block sync intervals.",
zap.Any("intervals", intervals),
zap.Uint64("targetHeight", targetHeight))
atomic.StoreUint64(&bs.requestMaxHeight, intervals[len(intervals)-1].End)
atomic.StoreUint64(&bs.lastRequestHeight, intervals[len(intervals)-1].End)
for i, interval := range intervals {
bs.requestBlock(context.Background(), interval.Start, interval.End, bs.cfg.MaxRepeat-i/bs.cfg.RepeatDecayStep)
}
Expand Down Expand Up @@ -293,14 +291,17 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block
log.L().Debug("flush blocks", zap.Uint64("start", tip), zap.Uint64("end", syncedHeight))
if syncedHeight > bs.lastTip {
bs.lastTip = syncedHeight
bs.lastTipUpdateTime = time.Now()
}
requestMaxHeight := atomic.LoadUint64(&bs.requestMaxHeight)
bs.checkSync(syncedHeight)
return nil
}

func (bs *blockSyncer) checkSync(syncedHeight uint64) {
requestMaxHeight := atomic.LoadUint64(&bs.lastRequestHeight)
if requestMaxHeight > 0 && syncedHeight >= requestMaxHeight {
bs.syncTask.Trigger()
atomic.SwapUint64(&bs.requestMaxHeight, 0)
atomic.SwapUint64(&bs.lastRequestHeight, 0)
}
return nil
}

// ProcessSyncRequest processes a sync request
Expand Down
26 changes: 13 additions & 13 deletions pkg/routine/triggertask.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package routine

import (
"context"
"log"
"time"

"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
)

var _ lifecycle.StartStopper = (*TriggerTask)(nil)
Expand All @@ -23,29 +23,29 @@ func (o triggerTaskOption) SetTriggerTaskOption(t *TriggerTask) {
o.setTriggerTaskOption(t)
}

// WithTriggerTaskInterval sets the interval of the task
func WithTriggerTaskInterval(d time.Duration) TriggerTaskOption {
// DelayTimeBeforeTrigger sets the delay time before trigger
func DelayTimeBeforeTrigger(d time.Duration) TriggerTaskOption {
return triggerTaskOption{
setTriggerTaskOption: func(t *TriggerTask) {
t.duration = d
t.delay = d
},
}
}

// TriggerTask represents a task that can be triggered
type TriggerTask struct {
lifecycle.Readiness
duration time.Duration
cb Task
ch chan struct{}
delay time.Duration
cb Task
ch chan struct{}
}

// NewTriggerTask creates an instance of TriggerTask
func NewTriggerTask(cb Task, ops ...TriggerTaskOption) *TriggerTask {
tt := &TriggerTask{
cb: cb,
duration: 0,
ch: make(chan struct{}),
cb: cb,
delay: 0,
ch: make(chan struct{}),
}
for _, opt := range ops {
opt.SetTriggerTaskOption(tt)
Expand All @@ -59,8 +59,8 @@ func (t *TriggerTask) Start(_ context.Context) error {
go func() {
close(ready)
for range t.ch {
if t.duration > 0 {
time.Sleep(t.duration)
if t.delay > 0 {
time.Sleep(t.delay)
}
t.cb()
}
Expand All @@ -73,7 +73,7 @@ func (t *TriggerTask) Start(_ context.Context) error {
// Trigger triggers the task
func (t *TriggerTask) Trigger() {
if !t.IsReady() {
log.Println("[WARN] trigger task is not ready")
log.S().Warnf("trigger task is not ready")
return
}
t.ch <- struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/routine/triggertask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestTriggerTask(t *testing.T) {
require := require.New(t)
h := &MockHandler{Count: 0}
ctx := context.Background()
task := routine.NewTriggerTask(h.Do, routine.WithTriggerTaskInterval(200*time.Millisecond))
task := routine.NewTriggerTask(h.Do, routine.DelayTimeBeforeTrigger(200*time.Millisecond))
require.NoError(task.Start(ctx))
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
Expand Down

0 comments on commit ea492f5

Please sign in to comment.