diff --git a/cmd/chain-watch/main.go b/cmd/chain-watch/main.go index 7160a7e..5b688f7 100644 --- a/cmd/chain-watch/main.go +++ b/cmd/chain-watch/main.go @@ -65,6 +65,7 @@ func main() { monitorOptions.PollingInterval = time.Duration(2000 * time.Millisecond) monitorOptions.WithLogs = true monitorOptions.BlockRetentionLimit = 64 + monitorOptions.StreamingRetryAfter = 1 * time.Minute // monitorOptions.StartBlockNumber = nil // track the head latestBlock, err := provider.BlockByNumber(context.Background(), nil) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index b9b22af..8450bdf 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -26,18 +26,22 @@ import ( ) var DefaultOptions = Options{ - Logger: logger.NewLogger(logger.LogLevel_WARN), - PollingInterval: 1500 * time.Millisecond, - UnsubscribeOnStop: false, - Timeout: 20 * time.Second, - StartBlockNumber: nil, // latest - TrailNumBlocksBehindHead: 0, // latest - BlockRetentionLimit: 200, - WithLogs: false, - LogTopics: []common.Hash{}, // all logs - DebugLogging: false, - CacheExpiry: 300 * time.Second, - Alerter: util.NoopAlerter(), + Logger: logger.NewLogger(logger.LogLevel_WARN), + PollingInterval: 1500 * time.Millisecond, + ExpectedBlockInterval: 15 * time.Second, + StreamingErrorResetInterval: 15 * time.Minute, + StreamingRetryAfter: 20 * time.Minute, + StreamingErrNumToSwitchToPolling: 3, + UnsubscribeOnStop: false, + Timeout: 20 * time.Second, + StartBlockNumber: nil, // latest + TrailNumBlocksBehindHead: 0, // latest + BlockRetentionLimit: 200, + WithLogs: false, + LogTopics: []common.Hash{}, // all logs + DebugLogging: false, + CacheExpiry: 300 * time.Second, + Alerter: util.NoopAlerter(), } type Options struct { @@ -47,6 +51,18 @@ type Options struct { // PollingInterval to query the chain for new blocks PollingInterval time.Duration + // ExpectedBlockInterval is the expected time between blocks + ExpectedBlockInterval time.Duration + + // StreamingErrorResetInterval is the time to reset the streaming error count + StreamingErrorResetInterval time.Duration + + // StreamingRetryAfter is the time to wait before retrying the streaming again + StreamingRetryAfter time.Duration + + // StreamingErrNumToSwitchToPolling is the number of errors before switching to polling + StreamingErrNumToSwitchToPolling int + // Auto-unsubscribe on monitor stop or error UnsubscribeOnStop bool @@ -287,33 +303,66 @@ func (m *Monitor) listenNewHead() <-chan uint64 { var latestHeadBlock atomic.Uint64 nextBlock := make(chan uint64) - if m.provider.IsStreamingEnabled() { - // Streaming mode if available, where we listen for new heads - // and push the new block number to the nextBlock channel. - go func() { - reconnect: + go func() { + var streamingErrorCount int + var streamingErrorLastTime time.Time + + reconnect: + // reset the latest head block + latestHeadBlock.Store(0) + + // if we have too many streaming errors, we'll switch to polling + streamingErrorCount++ + if time.Since(streamingErrorLastTime) > m.options.StreamingErrorResetInterval { + streamingErrorCount = 0 + } + + // listen for new heads either via streaming or polling + if m.provider.IsStreamingEnabled() && streamingErrorCount < m.options.StreamingErrNumToSwitchToPolling { + // Streaming mode if available, where we listen for new heads + // and push the new block number to the nextBlock channel. + m.log.Info("ethmonitor: starting stream head listener") + newHeads := make(chan *types.Header) sub, err := m.provider.SubscribeNewHeads(m.ctx, newHeads) if err != nil { m.log.Warnf("ethmonitor: websocket connect failed: %v", err) m.alert.Alert(context.Background(), "ethmonitor: websocket connect failed", err) - time.Sleep(1500 * time.Millisecond) + time.Sleep(2000 * time.Millisecond) + + streamingErrorLastTime = time.Now() goto reconnect } + blockTimer := time.NewTimer(3 * m.options.ExpectedBlockInterval) for { select { case <-m.ctx.Done(): + // if we're done, we'll unsubscribe and close the nextBlock channel sub.Unsubscribe() close(nextBlock) return + case err := <-sub.Err(): + // if we have an error, we'll reconnect m.log.Warnf("ethmonitor: websocket subscription error: %v", err) m.alert.Alert(context.Background(), "ethmonitor: websocket subscription error", err) sub.Unsubscribe() + + streamingErrorLastTime = time.Now() + goto reconnect + + case <-blockTimer.C: + // if we haven't received a new block in a while, we'll reconnect. + m.log.Warnf("ethmonitor: haven't received block in expected time, reconnecting..") + sub.Unsubscribe() + + streamingErrorLastTime = time.Now() goto reconnect case newHead := <-newHeads: + blockTimer.Reset(3 * m.options.ExpectedBlockInterval) + latestHeadBlock.Store(newHead.Number.Uint64()) select { case nextBlock <- newHead.Number.Uint64(): @@ -322,21 +371,38 @@ func (m *Monitor) listenNewHead() <-chan uint64 { } } } - }() - } else { - // We default to polling if streaming is not enabled - go func() { + } else { + // We default to polling if streaming is not enabled + m.log.Info("ethmonitor: starting poll head listener") + + retryStreamingTimer := time.NewTimer(m.options.StreamingRetryAfter) for { + // if streaming is enabled, we'll retry streaming + if m.provider.IsStreamingEnabled() { + select { + case <-retryStreamingTimer.C: + // retry streaming + m.log.Info("ethmonitor: retrying streaming...") + streamingErrorLastTime = time.Now().Add(-m.options.StreamingErrorResetInterval * 2) + goto reconnect + default: + // non-blocking + } + } + + // Polling mode, where we poll for the latest block number select { case <-m.ctx.Done(): + // if we're done, we'll close the nextBlock channel close(nextBlock) return + case <-time.After(time.Duration(m.pollInterval.Load())): nextBlock <- 0 } } - }() - } + } + }() // The main loop which notifies the monitor to continue to the next block go func() { @@ -391,7 +457,7 @@ func (m *Monitor) monitor() error { return nil case newHeadNum := <-listenNewHead: - // ensure we + // ensure we have a new head number m.nextBlockNumberMu.Lock() if m.nextBlockNumber != nil && newHeadNum > 0 && m.nextBlockNumber.Uint64() > newHeadNum { m.nextBlockNumberMu.Unlock()