Skip to content

Commit

Permalink
update sync strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
millken committed Jul 19, 2023
1 parent 5852cce commit 1dddb9b
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 30 deletions.
41 changes: 20 additions & 21 deletions blocksync/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ type (

// blockSyncer implements BlockSync interface
blockSyncer struct {
lifecycle.Readiness
cfg Config
buf *blockBuffer

Expand All @@ -73,12 +72,13 @@ type (
syncStageTask *routine.RecurringTask

syncStageHeight uint64
syncRetryHeight uint64
syncReady int32
syncBlockIncrease uint64

startingHeight uint64 // block number this node started to synchronise from
targetHeight uint64 // block number of the highest block header this node has received from peers
lastRequestHeight uint64
lastTipHeight uint64 //store the last committed block height
mu sync.RWMutex
}

Expand Down Expand Up @@ -150,7 +150,10 @@ func NewBlockSyncer(
targetHeight: 0,
}
if bs.cfg.Interval != 0 {
bs.syncTask = routine.NewTriggerTask(bs.sync, routine.DelayTimeBeforeTrigger(bs.cfg.RateLimitInterval))
bs.syncTask = routine.NewTriggerTask(bs.sync,
routine.DelayTimeBeforeTrigger(bs.cfg.RateLimitInterval),
routine.TriggerBufferSize(bs.cfg.TriggerBufferSize),
)
bs.syncStageTask = routine.NewRecurringTask(bs.syncStageChecker, bs.cfg.Interval)
}
atomic.StoreUint64(&bs.syncBlockIncrease, 0)
Expand Down Expand Up @@ -229,10 +232,6 @@ func (bs *blockSyncer) Start(ctx context.Context) error {
if err := bs.syncTask.Start(ctx); err != nil {
return err
}
//we need to wait for the peer to be ready, and then start the sync task
go time.AfterFunc(bs.cfg.Interval, func() {
bs.syncTask.Trigger()
})
}
if bs.syncStageTask != nil {
return bs.syncStageTask.Start(ctx)
Expand Down Expand Up @@ -268,17 +267,9 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block
atomic.StoreUint64(&bs.targetHeight, targetHeight)
}
// If a block in the requested block is lost, try to resend request block in the next interval
if !bs.IsReady() {
if err := bs.TurnOn(); err == nil {
defer func() {
time.AfterFunc(bs.cfg.Interval, func() {
lastTip := atomic.LoadUint64(&bs.lastTipHeight)
if lastTip == bs.tipHeightHandler() && bs.TurnOff() == nil {
bs.syncTask.Trigger()
}
})
}()
}
if atomic.LoadInt32(&bs.syncReady) == 0 {
atomic.StoreInt32(&bs.syncReady, 1)
time.AfterFunc(bs.cfg.Interval, bs.syncRetryChecker)
}
bs.mu.Lock()
defer bs.mu.Unlock()
Expand All @@ -292,11 +283,10 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block
}
syncedHeight++
}
atomic.StoreUint64(&bs.lastTipHeight, syncedHeight)
bs.buf.Cleanup(syncedHeight)
log.L().Debug("flush blocks", zap.Uint64("start", tip), zap.Uint64("end", syncedHeight))
requestMaxHeight := atomic.LoadUint64(&bs.lastRequestHeight)
if requestMaxHeight > 0 && syncedHeight >= requestMaxHeight {
lastRequestHeight := atomic.LoadUint64(&bs.lastRequestHeight)
if lastRequestHeight > 0 && syncedHeight >= lastRequestHeight {
bs.syncTask.Trigger()
atomic.SwapUint64(&bs.lastRequestHeight, 0)
}
Expand Down Expand Up @@ -331,6 +321,15 @@ func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInf
return nil
}

func (bs *blockSyncer) syncRetryChecker() {
tipHeight := bs.tipHeightHandler()
if bs.syncRetryHeight == 0 || bs.syncRetryHeight == tipHeight {
bs.syncTask.Trigger()
}
bs.syncRetryHeight = tipHeight
atomic.StoreInt32(&bs.syncReady, 0)
}

func (bs *blockSyncer) syncStageChecker() {
tipHeight := bs.tipHeightHandler()
atomic.StoreUint64(&bs.syncBlockIncrease, tipHeight-bs.syncStageHeight)
Expand Down
2 changes: 2 additions & 0 deletions blocksync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
RateLimitInterval time.Duration `yaml:"rateLimitInterval"`
ProcessSyncRequestTTL time.Duration `yaml:"processSyncRequestTTL"`
BufferSize uint64 `yaml:"bufferSize"`
TriggerBufferSize int `yaml:"triggerBufferSize"`
IntervalSize uint64 `yaml:"intervalSize"`
// MaxRepeat is the maximal number of repeat of a block sync request
MaxRepeat int `yaml:"maxRepeat"`
Expand All @@ -26,6 +27,7 @@ var DefaultConfig = Config{
RateLimitInterval: 1 * time.Second,
ProcessSyncRequestTTL: 10 * time.Second,
BufferSize: 200,
TriggerBufferSize: 2,
IntervalSize: 20,
MaxRepeat: 3,
RepeatDecayStep: 1,
Expand Down
22 changes: 18 additions & 4 deletions pkg/routine/triggertask.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,21 @@ func DelayTimeBeforeTrigger(d time.Duration) TriggerTaskOption {
}
}

// TriggerBufferSize sets the buffer size of trigger channel
func TriggerBufferSize(sz int) TriggerTaskOption {
return triggerTaskOption{
setTriggerTaskOption: func(t *TriggerTask) {
t.sz = sz
},
}
}

// TriggerTask represents a task that can be triggered
type TriggerTask struct {
lifecycle.Readiness
delay time.Duration
cb Task
sz int
ch chan struct{}
mu sync.Mutex
}
Expand All @@ -47,11 +57,12 @@ func NewTriggerTask(cb Task, ops ...TriggerTaskOption) *TriggerTask {
tt := &TriggerTask{
cb: cb,
delay: 0,
ch: make(chan struct{}),
sz: 0,
}
for _, opt := range ops {
opt.SetTriggerTaskOption(tt)
}
tt.ch = make(chan struct{}, tt.sz)
return tt
}

Expand All @@ -72,18 +83,21 @@ func (t *TriggerTask) Start(_ context.Context) error {
return t.TurnOn()
}

// Trigger triggers the task
func (t *TriggerTask) Trigger() {
// Trigger triggers the task, return true if the task is triggered successfully
// this function is non-blocking
func (t *TriggerTask) Trigger() bool {
if !t.IsReady() {
log.S().Warnf("trigger task is not ready")
return
return false
}
t.mu.Lock()
defer t.mu.Unlock()
select {
case t.ch <- struct{}{}:
return true
default:
}
return false
}

// Stop stops the task
Expand Down
56 changes: 51 additions & 5 deletions pkg/routine/triggertask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package routine_test

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -17,17 +19,61 @@ func TestTriggerTask(t *testing.T) {
require.NoError(task.Start(ctx))
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
var succ uint32
done:
for {
select {
case <-ctx.Done():
goto done
break done
default:
task.Trigger()
if task.Trigger() {
succ++
}
}
}
done:
require.Equal(uint(5), h.Count)
time.Sleep(200 * time.Millisecond)
require.Equal(uint32(6), succ)
require.Equal(uint(6), h.Count)
require.NoError(task.Stop(ctx))
task.Trigger()
require.Equal(uint(6), h.Count)
}

func TestTriggerTaskWithBufferSize(t *testing.T) {
require := require.New(t)
h := &MockHandler{Count: 0}
ctx := context.Background()
task := routine.NewTriggerTask(h.Do,
routine.DelayTimeBeforeTrigger(180*time.Millisecond),
routine.TriggerBufferSize(2))
require.NoError(task.Start(ctx))
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
wg := sync.WaitGroup{}
wg.Add(5)
var succ uint32
for i := 0; i < 5; i++ {
go func(i int) {
defer wg.Done()
done:
for {
select {
case <-ctx.Done():
//t.Logf("exit %d\n", i)
break done
default:
if task.Trigger() {
atomic.AddUint32(&succ, 1)
}
}
}
}(i)
}
wg.Wait()
time.Sleep(500 * time.Millisecond)
require.Equal(uint32(8), succ)
require.Equal(uint(8), h.Count)
require.NoError(task.Stop(ctx))
task.Trigger()
require.Equal(uint(5), h.Count)
require.Equal(uint(8), h.Count)
}

0 comments on commit 1dddb9b

Please sign in to comment.