Skip to content

Commit

Permalink
Move latest block fetch out of store callback (#1253)
Browse files Browse the repository at this point in the history
store callback is on the hot path of sync process, a network
request is not suitable to be here since it blocks the entire sync
process for arbitrarily long
  • Loading branch information
omerfirmak authored Sep 20, 2023
1 parent 80eea85 commit 0bfac9d
Showing 1 changed file with 53 additions and 20 deletions.
73 changes: 53 additions & 20 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,17 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat
return
}
highestBlockHeader := s.highestBlockHeader.Load()
if highestBlockHeader == nil || highestBlockHeader.Number <= block.Number {
highestBlock, err := s.starknetData.BlockLatest(ctx)
if err != nil {
s.log.Warnw("Failed fetching latest block", "err", err)
} else {
s.highestBlockHeader.Store(highestBlock.Header)
isBehind := highestBlock.Number > block.Number+uint64(maxWorkers())
if s.catchUpMode != isBehind {
resetStreams()
}
s.catchUpMode = isBehind
if highestBlockHeader != nil {
isBehind := highestBlockHeader.Number > block.Number+uint64(maxWorkers())
if s.catchUpMode != isBehind {
resetStreams()
}
s.catchUpMode = isBehind
}

if highestBlockHeader == nil || highestBlockHeader.Number < block.Number {
if s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header) {
s.bestBlockGauge.Set(float64(block.Header.Number))
}
}

Expand Down Expand Up @@ -266,6 +266,8 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) {

pendingSem := make(chan struct{}, 1)
go s.pollPending(syncCtx, pendingSem)
latestSem := make(chan struct{}, 1)
go s.pollLatest(syncCtx, latestSem)

for {
select {
Expand All @@ -277,6 +279,7 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) {
select {
case <-syncCtx.Done():
pendingSem <- struct{}{}
latestSem <- struct{}{}
return
default:
streamCtx, streamCancel = context.WithCancel(syncCtx)
Expand Down Expand Up @@ -346,18 +349,54 @@ func (s *Synchronizer) pollPending(ctx context.Context, sem chan struct{}) {
select {
case sem <- struct{}{}:
go func() {
defer func() {
<-sem
}()
err := s.fetchAndStorePending(ctx)
if err != nil {
s.log.Debugw("Error while trying to poll pending block", "err", err)
}
<-sem
}()
default:
}
}
}
}

func (s *Synchronizer) pollLatest(ctx context.Context, sem chan struct{}) {
poll := func() {
select {
case sem <- struct{}{}:
go func() {
defer func() {
<-sem
}()
highestBlock, err := s.starknetData.BlockLatest(ctx)
if err != nil {
s.log.Warnw("Failed fetching latest block", "err", err)
} else {
s.highestBlockHeader.Store(highestBlock.Header)
}
s.bestBlockGauge.Set(float64(highestBlock.Header.Number))
}()
default:
}
}

ticker := time.NewTicker(time.Minute)
poll()

for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
poll()
}
}
}

func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error {
highestBlockHeader := s.highestBlockHeader.Load()
if highestBlockHeader == nil {
Expand Down Expand Up @@ -394,18 +433,12 @@ func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error {

func (s *Synchronizer) updateStats(block *core.Block) {
var (
transactions = block.TransactionCount
currentHeight = block.Number
highestKnownHeight uint64 = 0
transactions = block.TransactionCount
currentHeight = block.Number
)
highestBlockHeader := s.highestBlockHeader.Load()
if highestBlockHeader != nil {
highestKnownHeight = highestBlockHeader.Number
}

s.blockCount.Inc()
s.chainHeightGauge.Set(float64(currentHeight))
s.bestBlockGauge.Set(float64(highestKnownHeight))
s.transactionCount.Add(float64(transactions))
}

Expand Down

0 comments on commit 0bfac9d

Please sign in to comment.