From 77c5fc944a9e5e3da24cee6fe64da7b73acee76e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20G=C3=B3rzy=C5=84ski?= Date: Fri, 5 Apr 2024 22:43:27 +0200 Subject: [PATCH] Lazy ethmonitor & ethreceipts initialization (#117) --- ethmonitor/ethmonitor.go | 27 +++++++++++++------- ethreceipts/ethreceipts.go | 50 ++++++++++++++++++++++++-------------- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index ebd75be5..b9b22af8 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -150,11 +150,7 @@ func NewMonitor(provider ethrpc.RawInterface, options ...Options) (*Monitor, err } } - chainID, err := getChainID(provider) - if err != nil { - return nil, err - } - + var err error var cache cachestore.Store[[]byte] if opts.CacheBackend != nil { cache, err = cachestorectl.Open[[]byte](opts.CacheBackend, cachestore.WithLockExpiry(5*time.Second)) @@ -173,7 +169,7 @@ func NewMonitor(provider ethrpc.RawInterface, options ...Options) (*Monitor, err alert: opts.Alerter, provider: provider, chain: newChain(opts.BlockRetentionLimit, opts.Bootstrap), - chainID: chainID, + chainID: nil, cache: cache, publishCh: make(chan Blocks), publishQueue: newQueue(opts.BlockRetentionLimit * 2), @@ -181,6 +177,15 @@ func NewMonitor(provider ethrpc.RawInterface, options ...Options) (*Monitor, err }, nil } +func (m *Monitor) lazyInit(ctx context.Context) error { + var err error + m.chainID, err = getChainID(ctx, m.provider) + if err != nil { + return err + } + return nil +} + func (m *Monitor) Run(ctx context.Context) error { if m.IsRunning() { return fmt.Errorf("ethmonitor: already running") @@ -191,6 +196,10 @@ func (m *Monitor) Run(ctx context.Context) error { atomic.StoreInt32(&m.running, 1) defer atomic.StoreInt32(&m.running, 0) + if err := m.lazyInit(ctx); err != nil { + return err + } + // Check if in bootstrap mode -- in which case we expect nextBlockNumber // to already be set. if m.options.Bootstrap && m.chain.blocks == nil { @@ -1022,10 +1031,10 @@ func (m *Monitor) setPayload(value []byte) []byte { } } -func getChainID(provider ethrpc.Interface) (*big.Int, error) { +func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) { var chainID *big.Int - err := breaker.Do(context.Background(), func() error { - ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + err := breaker.Do(ctx, func() error { + ctx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() id, err := provider.ChainID(ctx) diff --git a/ethreceipts/ethreceipts.go b/ethreceipts/ethreceipts.go index e0d740a5..61137656 100644 --- a/ethreceipts/ethreceipts.go +++ b/ethreceipts/ethreceipts.go @@ -134,20 +134,6 @@ func NewReceiptsListener(log logger.Logger, provider ethrpc.Interface, monitor * return nil, err } - if opts.NumBlocksToFinality <= 0 { - chainID, err := getChainID(provider) - if err != nil { - chainID = big.NewInt(1) // assume mainnet in case of unlikely error - } - network, ok := ethrpc.Networks[chainID.Uint64()] - if ok { - opts.NumBlocksToFinality = network.NumBlocksToFinality - } - } - if opts.NumBlocksToFinality <= 0 { - opts.NumBlocksToFinality = 1 // absolute min is 1 - } - return &ReceiptsListener{ options: opts, log: log, @@ -164,6 +150,25 @@ func NewReceiptsListener(log logger.Logger, provider ethrpc.Interface, monitor * }, nil } +func (l *ReceiptsListener) lazyInit(ctx context.Context) error { + if l.options.NumBlocksToFinality <= 0 { + chainID, err := getChainID(ctx, l.provider) + if err != nil { + chainID = big.NewInt(1) // assume mainnet in case of unlikely error + } + network, ok := ethrpc.Networks[chainID.Uint64()] + if ok { + l.options.NumBlocksToFinality = network.NumBlocksToFinality + } + } + + if l.options.NumBlocksToFinality <= 0 { + l.options.NumBlocksToFinality = 1 // absolute min is 1 + } + + return nil +} + func (l *ReceiptsListener) Run(ctx context.Context) error { if l.IsRunning() { return fmt.Errorf("ethreceipts: already running") @@ -174,6 +179,10 @@ func (l *ReceiptsListener) Run(ctx context.Context) error { atomic.StoreInt32(&l.running, 1) defer atomic.StoreInt32(&l.running, 0) + if err := l.lazyInit(ctx); err != nil { + return err + } + l.log.Info("ethreceipts: running") return l.listener() @@ -801,19 +810,24 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int { return latestBlockNum } -func getChainID(provider ethrpc.Interface) (*big.Int, error) { +func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) { var chainID *big.Int - err := breaker.Do(context.Background(), func() error { - id, err := provider.ChainID(context.Background()) + err := breaker.Do(ctx, func() error { + ctx, cancel := context.WithTimeout(ctx, 4*time.Second) + defer cancel() + + id, err := provider.ChainID(ctx) if err != nil { return err } chainID = id return nil - }, nil, 1*time.Second, 2, 20) + }, nil, 1*time.Second, 2, 3) + if err != nil { return nil, err } + return chainID, nil }