Skip to content

Commit

Permalink
lock around monitor while searching for the next block
Browse files Browse the repository at this point in the history
  • Loading branch information
pkieltyka committed Aug 10, 2023
1 parent 19b59d5 commit e178e46
Showing 1 changed file with 32 additions and 11 deletions.
43 changes: 32 additions & 11 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,26 +259,21 @@ func (m *Monitor) monitor() error {
case <-m.ctx.Done():
return nil

case <-time.After(pollInterval):
case <-time.After(pollInterval / 100): // todo adjust poll times,,
// ...
headBlock := m.chain.Head()
if headBlock != nil {
m.nextBlockNumber = big.NewInt(0).Add(headBlock.Number(), big.NewInt(1))
}

nextBlock, err := m.fetchBlockByNumber(ctx, m.nextBlockNumber)
if errors.Is(err, ethereum.NotFound) {
// reset poll interval as by config
pollInterval = m.options.PollingInterval
continue
}
// ...
nextBlock, err := m.fetchNextBlock(ctx)
if err != nil {
m.log.Warnf("ethmonitor: [retrying] failed to fetch next block # %d, due to: %v", m.nextBlockNumber, err)
pollInterval = m.options.PollingInterval // reset poll interval
continue
panic(err)
}

// speed up the poll interval if we found the next block
pollInterval /= 2
pollInterval /= 2 // todo...

// build deterministic set of add/remove events which construct the canonical chain
events, err = m.buildCanonicalChain(ctx, nextBlock, events)
Expand Down Expand Up @@ -485,6 +480,32 @@ func (m *Monitor) backfillChainLogs(ctx context.Context) {
}
}

func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, error) {
getter := func(ctx context.Context, _ string) (*types.Block, error) {
for {
nextBlock, err := m.fetchBlockByNumber(ctx, m.nextBlockNumber)
if errors.Is(err, ethereum.NotFound) {
time.Sleep(m.options.PollingInterval)
continue
}
if err != nil {
m.log.Warnf("ethmonitor: [retrying] failed to fetch next block # %d, due to: %v", m.nextBlockNumber, err)
time.Sleep(m.options.PollingInterval)
continue
}

return nextBlock, nil
}
}

if m.blockCache != nil {
// todo: we need chain id prefix and other places...
key := "NextBlock:" + m.nextBlockNumber.String()
return m.blockCache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
}
return getter(ctx, "")
}

func (m *Monitor) fetchBlockByNumber(ctx context.Context, num *big.Int) (*types.Block, error) {
getter := func(ctx context.Context, _ string) (*types.Block, error) {
m.log.Debugf("ethmonitor: fetchBlockByNumber is calling origin for number %s", num)
Expand Down

0 comments on commit e178e46

Please sign in to comment.