Skip to content

Commit

Permalink
harden ethmonitor for block header streaming errors (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
marino39 committed Apr 15, 2024
1 parent 77c5fc9 commit b69b303
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 25 deletions.
1 change: 1 addition & 0 deletions cmd/chain-watch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func main() {
monitorOptions.PollingInterval = time.Duration(2000 * time.Millisecond)
monitorOptions.WithLogs = true
monitorOptions.BlockRetentionLimit = 64
monitorOptions.StreamingRetryAfter = 1 * time.Minute
// monitorOptions.StartBlockNumber = nil // track the head

latestBlock, err := provider.BlockByNumber(context.Background(), nil)
Expand Down
116 changes: 91 additions & 25 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,22 @@ import (
)

var DefaultOptions = Options{
Logger: logger.NewLogger(logger.LogLevel_WARN),
PollingInterval: 1500 * time.Millisecond,
UnsubscribeOnStop: false,
Timeout: 20 * time.Second,
StartBlockNumber: nil, // latest
TrailNumBlocksBehindHead: 0, // latest
BlockRetentionLimit: 200,
WithLogs: false,
LogTopics: []common.Hash{}, // all logs
DebugLogging: false,
CacheExpiry: 300 * time.Second,
Alerter: util.NoopAlerter(),
Logger: logger.NewLogger(logger.LogLevel_WARN),
PollingInterval: 1500 * time.Millisecond,
ExpectedBlockInterval: 15 * time.Second,
StreamingErrorResetInterval: 15 * time.Minute,
StreamingRetryAfter: 20 * time.Minute,
StreamingErrNumToSwitchToPolling: 3,
UnsubscribeOnStop: false,
Timeout: 20 * time.Second,
StartBlockNumber: nil, // latest
TrailNumBlocksBehindHead: 0, // latest
BlockRetentionLimit: 200,
WithLogs: false,
LogTopics: []common.Hash{}, // all logs
DebugLogging: false,
CacheExpiry: 300 * time.Second,
Alerter: util.NoopAlerter(),
}

type Options struct {
Expand All @@ -47,6 +51,18 @@ type Options struct {
// PollingInterval to query the chain for new blocks
PollingInterval time.Duration

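// ExpectedBlockInterval is the expected time between blocks. In streaming mode the
// head subscription is re-established when no new head arrives within three times
// this interval.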
ExpectedBlockInterval time.Duration

// StreamingErrorResetInterval is the time to reset the streaming error count
StreamingErrorResetInterval time.Duration

// StreamingRetryAfter is the minimum time the monitor stays in polling mode (after
// falling back from streaming) before it attempts streaming again
StreamingRetryAfter time.Duration

// StreamingErrNumToSwitchToPolling is the number of errors before switching to polling
StreamingErrNumToSwitchToPolling int

// Auto-unsubscribe on monitor stop or error
UnsubscribeOnStop bool

Expand Down Expand Up @@ -287,33 +303,66 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
var latestHeadBlock atomic.Uint64
nextBlock := make(chan uint64)

if m.provider.IsStreamingEnabled() {
// Streaming mode if available, where we listen for new heads
// and push the new block number to the nextBlock channel.
go func() {
reconnect:
go func() {
var streamingErrorCount int
var streamingErrorLastTime time.Time

reconnect:
// reset the latest head block
latestHeadBlock.Store(0)

// if we have too many streaming errors, we'll switch to polling
streamingErrorCount++
if time.Since(streamingErrorLastTime) > m.options.StreamingErrorResetInterval {
streamingErrorCount = 0
}

// listen for new heads either via streaming or polling
if m.provider.IsStreamingEnabled() && streamingErrorCount < m.options.StreamingErrNumToSwitchToPolling {
// Streaming mode if available, where we listen for new heads
// and push the new block number to the nextBlock channel.
m.log.Info("ethmonitor: starting stream head listener")

newHeads := make(chan *types.Header)
sub, err := m.provider.SubscribeNewHeads(m.ctx, newHeads)
if err != nil {
m.log.Warnf("ethmonitor: websocket connect failed: %v", err)
m.alert.Alert(context.Background(), "ethmonitor: websocket connect failed", err)
time.Sleep(1500 * time.Millisecond)
time.Sleep(2000 * time.Millisecond)

streamingErrorLastTime = time.Now()
goto reconnect
}

blockTimer := time.NewTimer(3 * m.options.ExpectedBlockInterval)
for {
select {
case <-m.ctx.Done():
// if we're done, we'll unsubscribe and close the nextBlock channel
sub.Unsubscribe()
close(nextBlock)
return

case err := <-sub.Err():
// if we have an error, we'll reconnect
m.log.Warnf("ethmonitor: websocket subscription error: %v", err)
m.alert.Alert(context.Background(), "ethmonitor: websocket subscription error", err)
sub.Unsubscribe()

streamingErrorLastTime = time.Now()
goto reconnect

case <-blockTimer.C:
// if we haven't received a new block in a while, we'll reconnect.
m.log.Warnf("ethmonitor: haven't received block in expected time, reconnecting..")
sub.Unsubscribe()

streamingErrorLastTime = time.Now()
goto reconnect

case newHead := <-newHeads:
blockTimer.Reset(3 * m.options.ExpectedBlockInterval)

latestHeadBlock.Store(newHead.Number.Uint64())
select {
case nextBlock <- newHead.Number.Uint64():
Expand All @@ -322,21 +371,38 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
}
}
}
}()
} else {
// We default to polling if streaming is not enabled
go func() {
} else {
// We default to polling if streaming is not enabled
m.log.Info("ethmonitor: starting poll head listener")

retryStreamingTimer := time.NewTimer(m.options.StreamingRetryAfter)
for {
// if streaming is enabled, we'll retry streaming
if m.provider.IsStreamingEnabled() {
select {
case <-retryStreamingTimer.C:
// retry streaming
m.log.Info("ethmonitor: retrying streaming...")
streamingErrorLastTime = time.Now().Add(-m.options.StreamingErrorResetInterval * 2)
goto reconnect
default:
// non-blocking
}
}

// Polling mode, where we poll for the latest block number
select {
case <-m.ctx.Done():
// if we're done, we'll close the nextBlock channel
close(nextBlock)
return

case <-time.After(time.Duration(m.pollInterval.Load())):
nextBlock <- 0
}
}
}()
}
}
}()

// The main loop which notifies the monitor to continue to the next block
go func() {
Expand Down Expand Up @@ -391,7 +457,7 @@ func (m *Monitor) monitor() error {
return nil

case newHeadNum := <-listenNewHead:
// ensure we
// ensure we have a new head number
m.nextBlockNumberMu.Lock()
if m.nextBlockNumber != nil && newHeadNum > 0 && m.nextBlockNumber.Uint64() > newHeadNum {
m.nextBlockNumberMu.Unlock()
Expand Down

0 comments on commit b69b303

Please sign in to comment.