Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

harden ethmonitor for block header streaming errors #118

Merged
merged 6 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/chain-watch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func main() {
monitorOptions.PollingInterval = time.Duration(2000 * time.Millisecond)
monitorOptions.WithLogs = true
monitorOptions.BlockRetentionLimit = 64
monitorOptions.StreamingRetryAfter = 1 * time.Minute
// monitorOptions.StartBlockNumber = nil // track the head

latestBlock, err := provider.BlockByNumber(context.Background(), nil)
Expand Down
116 changes: 91 additions & 25 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,22 @@ import (
)

var DefaultOptions = Options{
Logger: logger.NewLogger(logger.LogLevel_WARN),
PollingInterval: 1500 * time.Millisecond,
UnsubscribeOnStop: false,
Timeout: 20 * time.Second,
StartBlockNumber: nil, // latest
TrailNumBlocksBehindHead: 0, // latest
BlockRetentionLimit: 200,
WithLogs: false,
LogTopics: []common.Hash{}, // all logs
DebugLogging: false,
CacheExpiry: 300 * time.Second,
Alerter: util.NoopAlerter(),
Logger: logger.NewLogger(logger.LogLevel_WARN),
PollingInterval: 1500 * time.Millisecond,
ExpectedBlockInterval: 15 * time.Second,
StreamingErrorResetInterval: 15 * time.Minute,
StreamingRetryAfter: 20 * time.Minute,
StreamingErrNumToSwitchToPolling: 3,
UnsubscribeOnStop: false,
Timeout: 20 * time.Second,
StartBlockNumber: nil, // latest
TrailNumBlocksBehindHead: 0, // latest
BlockRetentionLimit: 200,
WithLogs: false,
LogTopics: []common.Hash{}, // all logs
DebugLogging: false,
CacheExpiry: 300 * time.Second,
Alerter: util.NoopAlerter(),
}

type Options struct {
Expand All @@ -47,6 +51,18 @@ type Options struct {
// PollingInterval to query the chain for new blocks
PollingInterval time.Duration

// ExpectedBlockInterval is the expected time between blocks
ExpectedBlockInterval time.Duration

// StreamingErrorResetInterval is the interval after which the streaming error count is reset
StreamingErrorResetInterval time.Duration

// StreamingRetryAfter is the time to wait before retrying streaming
StreamingRetryAfter time.Duration

// StreamingErrNumToSwitchToPolling is the number of errors before switching to polling
StreamingErrNumToSwitchToPolling int

// Auto-unsubscribe on monitor stop or error
UnsubscribeOnStop bool

Expand Down Expand Up @@ -287,33 +303,66 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
var latestHeadBlock atomic.Uint64
nextBlock := make(chan uint64)

if m.provider.IsStreamingEnabled() {
// Streaming mode if available, where we listen for new heads
// and push the new block number to the nextBlock channel.
go func() {
reconnect:
go func() {
var streamingErrorCount int
var streamingErrorLastTime time.Time

reconnect:
// reset the latest head block
latestHeadBlock.Store(0)

// if we have too many streaming errors, we'll switch to polling
streamingErrorCount++
if time.Since(streamingErrorLastTime) > m.options.StreamingErrorResetInterval {
streamingErrorCount = 0
}

// listen for new heads either via streaming or polling
if m.provider.IsStreamingEnabled() && streamingErrorCount < m.options.StreamingErrNumToSwitchToPolling {
// Streaming mode if available, where we listen for new heads
// and push the new block number to the nextBlock channel.
m.log.Info("ethmonitor: starting stream head listener")

newHeads := make(chan *types.Header)
sub, err := m.provider.SubscribeNewHeads(m.ctx, newHeads)
if err != nil {
m.log.Warnf("ethmonitor: websocket connect failed: %v", err)
m.alert.Alert(context.Background(), "ethmonitor: websocket connect failed", err)
time.Sleep(1500 * time.Millisecond)
time.Sleep(2000 * time.Millisecond)

streamingErrorLastTime = time.Now()
goto reconnect
}

blockTimer := time.NewTimer(3 * m.options.ExpectedBlockInterval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: defer blockTimer.Stop()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource leak, argh... defer will not help as it never goes out of function scope. I will get that fixed asap.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#119 -- hows this..?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it is good. The timer will close itself after it fires and will be garbage collected. So there is no need to close it. It will also not cause the process to hang. There will be at most a couple of them.

// After waits for the duration to elapse and then sends the current time
// on the returned channel.
// It is equivalent to NewTimer(d).C.
// The underlying Timer is not recovered by the garbage collector
// until the timer fires. If efficiency is a concern, use NewTimer
// instead and call Timer.Stop if the timer is no longer needed.
func After(d Duration) <-chan Time {
	return NewTimer(d).C
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

“a little temporary timer leak” is fine imho

it’d probably be better to close it, though, to be accurate
(a leaked timeout might cause an unnecessary OS interrupt)

for {
select {
case <-m.ctx.Done():
// if we're done, we'll unsubscribe and close the nextBlock channel
sub.Unsubscribe()
close(nextBlock)
return

case err := <-sub.Err():
// if we have an error, we'll reconnect
m.log.Warnf("ethmonitor: websocket subscription error: %v", err)
m.alert.Alert(context.Background(), "ethmonitor: websocket subscription error", err)
sub.Unsubscribe()

streamingErrorLastTime = time.Now()
goto reconnect

case <-blockTimer.C:
// if we haven't received a new block in a while, we'll reconnect.
m.log.Warnf("ethmonitor: haven't received block in expected time, reconnecting..")
sub.Unsubscribe()

streamingErrorLastTime = time.Now()
goto reconnect

case newHead := <-newHeads:
blockTimer.Reset(3 * m.options.ExpectedBlockInterval)

latestHeadBlock.Store(newHead.Number.Uint64())
select {
case nextBlock <- newHead.Number.Uint64():
Expand All @@ -322,21 +371,38 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
}
}
}
}()
} else {
// We default to polling if streaming is not enabled
go func() {
} else {
// We default to polling if streaming is not enabled
m.log.Info("ethmonitor: starting poll head listener")

retryStreamingTimer := time.NewTimer(m.options.StreamingRetryAfter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer retryStreamingTimer.Stop()

for {
// if streaming is enabled, we'll retry streaming
if m.provider.IsStreamingEnabled() {
select {
case <-retryStreamingTimer.C:
// retry streaming
m.log.Info("ethmonitor: retrying streaming...")
streamingErrorLastTime = time.Now().Add(-m.options.StreamingErrorResetInterval * 2)
goto reconnect
default:
// non-blocking
}
}

// Polling mode, where we poll for the latest block number
select {
case <-m.ctx.Done():
// if we're done, we'll close the nextBlock channel
close(nextBlock)
return

case <-time.After(time.Duration(m.pollInterval.Load())):
nextBlock <- 0
}
}
}()
}
}
}()

// The main loop which notifies the monitor to continue to the next block
go func() {
Expand Down Expand Up @@ -391,7 +457,7 @@ func (m *Monitor) monitor() error {
return nil

case newHeadNum := <-listenNewHead:
// ensure we
// ensure we have a new head number
m.nextBlockNumberMu.Lock()
if m.nextBlockNumber != nil && newHeadNum > 0 && m.nextBlockNumber.Uint64() > newHeadNum {
m.nextBlockNumberMu.Unlock()
Expand Down
Loading