From 7e86d5179db8e7299e3ba3c58a8c60863f9c80ce Mon Sep 17 00:00:00 2001 From: wwestgarth Date: Tue, 9 Jan 2024 15:11:14 +0000 Subject: [PATCH] feat: add config option to limit the about of blocks worth of logs to pull from Ethereum --- core/evtforward/ethereum/config.go | 3 +++ core/evtforward/ethereum/engine.go | 40 ++++++++++++++++-------------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/core/evtforward/ethereum/config.go b/core/evtforward/ethereum/config.go index 218172ae279..14421ab092a 100644 --- a/core/evtforward/ethereum/config.go +++ b/core/evtforward/ethereum/config.go @@ -24,12 +24,14 @@ import ( const ( defaultDurationBetweenTwoRetry = 20 * time.Second + maxEthereumBlocks = 10000 // chosen because one of the validators wanted to use quicknode, and this is their limit ) type Config struct { // Level specifies the logging level of the Ethereum implementation of the // Event Forwarder. Level encoding.LogLevel `long:"log-level"` + MaxEthereumBlocks uint64 `long:"max-ethereum-blocks"` PollEventRetryDuration encoding.Duration } @@ -37,5 +39,6 @@ func NewDefaultConfig() Config { return Config{ Level: encoding.LogLevel{Level: logging.InfoLevel}, PollEventRetryDuration: encoding.Duration{Duration: defaultDurationBetweenTwoRetry}, + MaxEthereumBlocks: maxEthereumBlocks, } } diff --git a/core/evtforward/ethereum/engine.go b/core/evtforward/ethereum/engine.go index b3dda5fae1a..78a09c7f1df 100644 --- a/core/evtforward/ethereum/engine.go +++ b/core/evtforward/ethereum/engine.go @@ -25,8 +25,7 @@ import ( ) const ( - engineLogger = "engine" - maxEthereumBlocks = 1000 // 3+ hour worth of blocks? + engineLogger = "engine" ) //go:generate go run github.com/golang/mock/mockgen -destination mocks/forwarder_mock.go -package mocks code.vegaprotocol.io/vega/core/evtforward/ethereum Forwarder @@ -136,11 +135,11 @@ func (e *Engine) Start() { }) } -func issueFilteringRequest(from, to uint64) (ok bool, actualTo uint64) { +func issueFilteringRequest(from, to, nBlocks uint64) (ok bool, actualTo uint64) { if from > to { return false, 0 } - return true, min(from+maxEthereumBlocks, to) + return true, min(from+nBlocks, to) } func min(a, b uint64) uint64 { @@ -151,10 +150,11 @@ func min(a, b uint64) uint64 { } func (e *Engine) gatherEvents(ctx context.Context) { + nBlocks := e.cfg.MaxEthereumBlocks currentHeight := e.filterer.CurrentHeight(ctx) // Ensure we are not issuing a filtering request for non-existing block. - if ok, nextHeight := issueFilteringRequest(e.nextCollateralBlockNumber, currentHeight); ok { + if ok, nextHeight := issueFilteringRequest(e.nextCollateralBlockNumber, currentHeight, nBlocks); ok { e.filterer.FilterCollateralEvents(ctx, e.nextCollateralBlockNumber, nextHeight, func(event *commandspb.ChainEvent) { e.forwarder.ForwardFromSelf(event) }) @@ -162,27 +162,31 @@ func (e *Engine) gatherEvents(ctx context.Context) { } // Ensure we are not issuing a filtering request for non-existing block. - if e.shouldFilterStakingBridge && e.nextStakingBlockNumber <= currentHeight { - e.filterer.FilterStakingEvents(ctx, e.nextStakingBlockNumber, currentHeight, func(event *commandspb.ChainEvent) { - e.forwarder.ForwardFromSelf(event) - }) - e.nextStakingBlockNumber = currentHeight + 1 + if e.shouldFilterStakingBridge { + if ok, nextHeight := issueFilteringRequest(e.nextStakingBlockNumber, currentHeight, nBlocks); ok { + e.filterer.FilterStakingEvents(ctx, e.nextStakingBlockNumber, nextHeight, func(event *commandspb.ChainEvent) { + e.forwarder.ForwardFromSelf(event) + }) + e.nextStakingBlockNumber = nextHeight + 1 + } } // Ensure we are not issuing a filtering request for non-existing block. - if e.shouldFilterVestingBridge && e.nextVestingBlockNumber <= currentHeight { - e.filterer.FilterVestingEvents(ctx, e.nextVestingBlockNumber, currentHeight, func(event *commandspb.ChainEvent) { - e.forwarder.ForwardFromSelf(event) - }) - e.nextVestingBlockNumber = currentHeight + 1 + if e.shouldFilterVestingBridge { + if ok, nextHeight := issueFilteringRequest(e.nextVestingBlockNumber, currentHeight, nBlocks); ok { + e.filterer.FilterVestingEvents(ctx, e.nextVestingBlockNumber, nextHeight, func(event *commandspb.ChainEvent) { + e.forwarder.ForwardFromSelf(event) + }) + e.nextVestingBlockNumber = nextHeight + 1 + } } // Ensure we are not issuing a filtering request for non-existing block. - if e.nextMultiSigControlBlockNumber <= currentHeight { - e.filterer.FilterMultisigControlEvents(ctx, e.nextMultiSigControlBlockNumber, currentHeight, func(event *commandspb.ChainEvent) { + if ok, nextHeight := issueFilteringRequest(e.nextMultiSigControlBlockNumber, currentHeight, nBlocks); ok { + e.filterer.FilterMultisigControlEvents(ctx, e.nextMultiSigControlBlockNumber, nextHeight, func(event *commandspb.ChainEvent) { e.forwarder.ForwardFromSelf(event) }) - e.nextMultiSigControlBlockNumber = currentHeight + 1 + e.nextMultiSigControlBlockNumber = nextHeight + 1 } }