From 52ba6098157125472e53948390f528a4bfa607cd Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Fri, 12 Apr 2024 15:19:41 +0200 Subject: [PATCH 1/6] harden ethmonitor for block header streaming errors --- ethmonitor/ethmonitor.go | 82 +++++++++++++++++++++++++++++----------- 1 file changed, 59 insertions(+), 23 deletions(-) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index b9b22af8..21d6ad7a 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -26,18 +26,21 @@ import ( ) var DefaultOptions = Options{ - Logger: logger.NewLogger(logger.LogLevel_WARN), - PollingInterval: 1500 * time.Millisecond, - UnsubscribeOnStop: false, - Timeout: 20 * time.Second, - StartBlockNumber: nil, // latest - TrailNumBlocksBehindHead: 0, // latest - BlockRetentionLimit: 200, - WithLogs: false, - LogTopics: []common.Hash{}, // all logs - DebugLogging: false, - CacheExpiry: 300 * time.Second, - Alerter: util.NoopAlerter(), + Logger: logger.NewLogger(logger.LogLevel_WARN), + PollingInterval: 1500 * time.Millisecond, + ExpectedBlockInterval: 15 * time.Second, + StreamingErrorResetInterval: 15 * time.Minute, + ErrorNumToSwitchToPolling: 6, + UnsubscribeOnStop: false, + Timeout: 20 * time.Second, + StartBlockNumber: nil, // latest + TrailNumBlocksBehindHead: 0, // latest + BlockRetentionLimit: 200, + WithLogs: false, + LogTopics: []common.Hash{}, // all logs + DebugLogging: false, + CacheExpiry: 300 * time.Second, + Alerter: util.NoopAlerter(), } type Options struct { @@ -47,6 +50,15 @@ type Options struct { // PollingInterval to query the chain for new blocks PollingInterval time.Duration + // ExpectedBlockInterval is the expected time between blocks + ExpectedBlockInterval time.Duration + + // StreamingErrorResetInterval is the time to reset the streaming error count + StreamingErrorResetInterval time.Duration + + // ErrorNumToSwitchToPolling is the number of errors before switching to polling + ErrorNumToSwitchToPolling int + // Auto-unsubscribe on monitor stop or error UnsubscribeOnStop bool @@ -287,33 +299,59 @@ func (m *Monitor) listenNewHead() <-chan uint64 { var latestHeadBlock atomic.Uint64 nextBlock := make(chan uint64) - if m.provider.IsStreamingEnabled() { - // Streaming mode if available, where we listen for new heads - // and push the new block number to the nextBlock channel. - go func() { - reconnect: + go func() { + var streamingErrorCount int + var streamingErrorLastTime time.Time + + reconnect: + streamingErrorCount++ + if time.Since(streamingErrorLastTime) > m.options.StreamingErrorResetInterval { + streamingErrorCount = 0 + } + + if m.provider.IsStreamingEnabled() && streamingErrorCount < m.options.ErrorNumToSwitchToPolling { + // Streaming mode if available, where we listen for new heads + // and push the new block number to the nextBlock channel. newHeads := make(chan *types.Header) sub, err := m.provider.SubscribeNewHeads(m.ctx, newHeads) if err != nil { m.log.Warnf("ethmonitor: websocket connect failed: %v", err) m.alert.Alert(context.Background(), "ethmonitor: websocket connect failed", err) time.Sleep(1500 * time.Millisecond) + + streamingErrorLastTime = time.Now() goto reconnect } + blockTimer := time.NewTimer(2 * m.options.ExpectedBlockInterval) for { select { case <-m.ctx.Done(): + // if we're done, we'll unsubscribe and close the nextBlock channel sub.Unsubscribe() close(nextBlock) return + case err := <-sub.Err(): + // if we have an error, we'll reconnect m.log.Warnf("ethmonitor: websocket subscription error: %v", err) m.alert.Alert(context.Background(), "ethmonitor: websocket subscription error", err) sub.Unsubscribe() + + streamingErrorLastTime = time.Now() + goto reconnect + + case <-blockTimer.C: + // if we haven't received a new block in a while, we'll reconnect. + m.log.Warnf("ethmonitor: haven't received block in expected time, reconnecting..") + sub.Unsubscribe() + + streamingErrorLastTime = time.Now() goto reconnect case newHead := <-newHeads: + blockTimer.Reset(2 * m.options.ExpectedBlockInterval) + latestHeadBlock.Store(newHead.Number.Uint64()) select { case nextBlock <- newHead.Number.Uint64(): @@ -322,10 +360,8 @@ func (m *Monitor) listenNewHead() <-chan uint64 { } } } - }() - } else { - // We default to polling if streaming is not enabled - go func() { + } else { + // We default to polling if streaming is not enabled for { select { case <-m.ctx.Done(): @@ -335,8 +371,8 @@ func (m *Monitor) listenNewHead() <-chan uint64 { nextBlock <- 0 } } - }() - } + } + }() // The main loop which notifies the monitor to continue to the next block go func() { From 4998f7f1a89fe864ac25762890e7f5f33b2ce6ff Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Fri, 12 Apr 2024 16:07:45 +0200 Subject: [PATCH 2/6] add head listener log & adjust expected block interval multiplier --- ethmonitor/ethmonitor.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index 21d6ad7a..2b4266d3 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -312,6 +312,8 @@ func (m *Monitor) listenNewHead() <-chan uint64 { if m.provider.IsStreamingEnabled() && streamingErrorCount < m.options.ErrorNumToSwitchToPolling { // Streaming mode if available, where we listen for new heads // and push the new block number to the nextBlock channel. + m.log.Info("ethmonitor: starting stream head listener") + newHeads := make(chan *types.Header) sub, err := m.provider.SubscribeNewHeads(m.ctx, newHeads) if err != nil { @@ -323,7 +325,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 { goto reconnect } - blockTimer := time.NewTimer(2 * m.options.ExpectedBlockInterval) + blockTimer := time.NewTimer(3 * m.options.ExpectedBlockInterval) for { select { case <-m.ctx.Done(): @@ -350,7 +352,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 { goto reconnect case newHead := <-newHeads: - blockTimer.Reset(2 * m.options.ExpectedBlockInterval) + blockTimer.Reset(3 * m.options.ExpectedBlockInterval) latestHeadBlock.Store(newHead.Number.Uint64()) select { @@ -362,6 +364,8 @@ func (m *Monitor) listenNewHead() <-chan uint64 { } } else { // We default to polling if streaming is not enabled + m.log.Info("ethmonitor: starting poll head listener") + for { select { case <-m.ctx.Done(): From 21173acd41d0e35989a306a4c029502e985d7cd7 Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Mon, 15 Apr 2024 10:24:19 +0200 Subject: [PATCH 3/6] retry streaming after given time --- ethmonitor/ethmonitor.go | 53 ++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index 2b4266d3..8a325582 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -26,21 +26,22 @@ import ( ) var DefaultOptions = Options{ - Logger: logger.NewLogger(logger.LogLevel_WARN), - PollingInterval: 1500 * time.Millisecond, - ExpectedBlockInterval: 15 * time.Second, - StreamingErrorResetInterval: 15 * time.Minute, - ErrorNumToSwitchToPolling: 6, - UnsubscribeOnStop: false, - Timeout: 20 * time.Second, - StartBlockNumber: nil, // latest - TrailNumBlocksBehindHead: 0, // latest - BlockRetentionLimit: 200, - WithLogs: false, - LogTopics: []common.Hash{}, // all logs - DebugLogging: false, - CacheExpiry: 300 * time.Second, - Alerter: util.NoopAlerter(), + Logger: logger.NewLogger(logger.LogLevel_WARN), + PollingInterval: 1500 * time.Millisecond, + ExpectedBlockInterval: 15 * time.Second, + StreamingErrorResetInterval: 15 * time.Minute, + StreamingRetryAfter: 1 * time.Hour, + StreamingErrNumToSwitchToPolling: 3, + UnsubscribeOnStop: false, + Timeout: 20 * time.Second, + StartBlockNumber: nil, // latest + TrailNumBlocksBehindHead: 0, // latest + BlockRetentionLimit: 200, + WithLogs: false, + LogTopics: []common.Hash{}, // all logs + DebugLogging: false, + CacheExpiry: 300 * time.Second, + Alerter: util.NoopAlerter(), } type Options struct { @@ -56,8 +57,11 @@ type Options struct { // StreamingErrorResetInterval is the time to reset the streaming error count StreamingErrorResetInterval time.Duration - // ErrorNumToSwitchToPolling is the number of errors before switching to polling - ErrorNumToSwitchToPolling int + // StreamingRetryAfter is the time to wait before retrying the streaming again + StreamingRetryAfter time.Duration + + // StreamingErrNumToSwitchToPolling is the number of errors before switching to polling + StreamingErrNumToSwitchToPolling int // Auto-unsubscribe on monitor stop or error UnsubscribeOnStop bool @@ -304,12 +308,17 @@ func (m *Monitor) listenNewHead() <-chan uint64 { var streamingErrorLastTime time.Time reconnect: + // reset the latest head block + latestHeadBlock.Store(0) + + // if we have too many streaming errors, we'll switch to polling streamingErrorCount++ if time.Since(streamingErrorLastTime) > m.options.StreamingErrorResetInterval { streamingErrorCount = 0 } - if m.provider.IsStreamingEnabled() && streamingErrorCount < m.options.ErrorNumToSwitchToPolling { + // listen for new heads either via streaming or polling + if m.provider.IsStreamingEnabled() && streamingErrorCount < m.options.StreamingErrNumToSwitchToPolling { // Streaming mode if available, where we listen for new heads // and push the new block number to the nextBlock channel. m.log.Info("ethmonitor: starting stream head listener") @@ -366,11 +375,19 @@ func (m *Monitor) listenNewHead() <-chan uint64 { // We default to polling if streaming is not enabled m.log.Info("ethmonitor: starting poll head listener") + retryStreamingTimer := time.NewTimer(m.options.StreamingRetryAfter) for { select { case <-m.ctx.Done(): + // if we're done, we'll close the nextBlock channel close(nextBlock) return + + case <-retryStreamingTimer.C: + // retry streaming + m.log.Info("ethmonitor: retrying streaming...") + goto reconnect + case <-time.After(time.Duration(m.pollInterval.Load())): nextBlock <- 0 } From 081b055553e43384653fac79e839cf21d580cde0 Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Mon, 15 Apr 2024 17:02:34 +0200 Subject: [PATCH 4/6] do not retry streaming is streaming is not enabled --- ethmonitor/ethmonitor.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index 8a325582..0d82db5a 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -377,17 +377,25 @@ func (m *Monitor) listenNewHead() <-chan uint64 { retryStreamingTimer := time.NewTimer(m.options.StreamingRetryAfter) for { + // if streaming is enabled, we'll retry streaming + if m.provider.IsStreamingEnabled() { + select { + case <-retryStreamingTimer.C: + // retry streaming + m.log.Info("ethmonitor: retrying streaming...") + goto reconnect + default: + // non-blocking + } + } + + // Polling mode, where we poll for the latest block number select { case <-m.ctx.Done(): // if we're done, we'll close the nextBlock channel close(nextBlock) return - case <-retryStreamingTimer.C: - // retry streaming - m.log.Info("ethmonitor: retrying streaming...") - goto reconnect - case <-time.After(time.Duration(m.pollInterval.Load())): nextBlock <- 0 } From 6712e9cbcb13d1d7507a07cccd557c498415c14c Mon Sep 17 00:00:00 2001 From: Peter Kieltyka Date: Mon, 15 Apr 2024 14:56:33 -0400 Subject: [PATCH 5/6] update --- ethmonitor/ethmonitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index 0d82db5a..128296a7 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -456,7 +456,7 @@ func (m *Monitor) monitor() error { return nil case newHeadNum := <-listenNewHead: - // ensure we + // ensure we have a new head number m.nextBlockNumberMu.Lock() if m.nextBlockNumber != nil && newHeadNum > 0 && m.nextBlockNumber.Uint64() > newHeadNum { m.nextBlockNumberMu.Unlock() From 338e721b588705a0535fcd8ce0bd3e9d0c52efdf Mon Sep 17 00:00:00 2001 From: Peter Kieltyka Date: Mon, 15 Apr 2024 15:15:27 -0400 Subject: [PATCH 6/6] update --- cmd/chain-watch/main.go | 1 + ethmonitor/ethmonitor.go | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/chain-watch/main.go b/cmd/chain-watch/main.go index 7160a7e4..5b688f71 100644 --- a/cmd/chain-watch/main.go +++ b/cmd/chain-watch/main.go @@ -65,6 +65,7 @@ func main() { monitorOptions.PollingInterval = time.Duration(2000 * time.Millisecond) monitorOptions.WithLogs = true monitorOptions.BlockRetentionLimit = 64 + monitorOptions.StreamingRetryAfter = 1 * time.Minute // monitorOptions.StartBlockNumber = nil // track the head latestBlock, err := provider.BlockByNumber(context.Background(), nil) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index 128296a7..8450bdf5 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -30,7 +30,7 @@ var DefaultOptions = Options{ PollingInterval: 1500 * time.Millisecond, ExpectedBlockInterval: 15 * time.Second, StreamingErrorResetInterval: 15 * time.Minute, - StreamingRetryAfter: 1 * time.Hour, + StreamingRetryAfter: 20 * time.Minute, StreamingErrNumToSwitchToPolling: 3, UnsubscribeOnStop: false, Timeout: 20 * time.Second, @@ -328,7 +328,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 { if err != nil { m.log.Warnf("ethmonitor: websocket connect failed: %v", err) m.alert.Alert(context.Background(), "ethmonitor: websocket connect failed", err) - time.Sleep(1500 * time.Millisecond) + time.Sleep(2000 * time.Millisecond) streamingErrorLastTime = time.Now() goto reconnect @@ -383,6 +383,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 { case <-retryStreamingTimer.C: // retry streaming m.log.Info("ethmonitor: retrying streaming...") + streamingErrorLastTime = time.Now().Add(-m.options.StreamingErrorResetInterval * 2) goto reconnect default: // non-blocking