Skip to content

Commit

Permalink
Lazy ethmonitor & ethreceipts initialization (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
marino39 committed Apr 5, 2024
1 parent 91fd5b3 commit 77c5fc9
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 27 deletions.
27 changes: 18 additions & 9 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,7 @@ func NewMonitor(provider ethrpc.RawInterface, options ...Options) (*Monitor, err
}
}

chainID, err := getChainID(provider)
if err != nil {
return nil, err
}

var err error
var cache cachestore.Store[[]byte]
if opts.CacheBackend != nil {
cache, err = cachestorectl.Open[[]byte](opts.CacheBackend, cachestore.WithLockExpiry(5*time.Second))
Expand All @@ -173,14 +169,23 @@ func NewMonitor(provider ethrpc.RawInterface, options ...Options) (*Monitor, err
alert: opts.Alerter,
provider: provider,
chain: newChain(opts.BlockRetentionLimit, opts.Bootstrap),
chainID: chainID,
chainID: nil,
cache: cache,
publishCh: make(chan Blocks),
publishQueue: newQueue(opts.BlockRetentionLimit * 2),
subscribers: make([]*subscriber, 0),
}, nil
}

func (m *Monitor) lazyInit(ctx context.Context) error {
var err error
m.chainID, err = getChainID(ctx, m.provider)
if err != nil {
return err
}
return nil
}

func (m *Monitor) Run(ctx context.Context) error {
if m.IsRunning() {
return fmt.Errorf("ethmonitor: already running")
Expand All @@ -191,6 +196,10 @@ func (m *Monitor) Run(ctx context.Context) error {
atomic.StoreInt32(&m.running, 1)
defer atomic.StoreInt32(&m.running, 0)

if err := m.lazyInit(ctx); err != nil {
return err
}

// Check if in bootstrap mode -- in which case we expect nextBlockNumber
// to already be set.
if m.options.Bootstrap && m.chain.blocks == nil {
Expand Down Expand Up @@ -1022,10 +1031,10 @@ func (m *Monitor) setPayload(value []byte) []byte {
}
}

func getChainID(provider ethrpc.Interface) (*big.Int, error) {
func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) {
var chainID *big.Int
err := breaker.Do(context.Background(), func() error {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
err := breaker.Do(ctx, func() error {
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

id, err := provider.ChainID(ctx)
Expand Down
50 changes: 32 additions & 18 deletions ethreceipts/ethreceipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,6 @@ func NewReceiptsListener(log logger.Logger, provider ethrpc.Interface, monitor *
return nil, err
}

if opts.NumBlocksToFinality <= 0 {
chainID, err := getChainID(provider)
if err != nil {
chainID = big.NewInt(1) // assume mainnet in case of unlikely error
}
network, ok := ethrpc.Networks[chainID.Uint64()]
if ok {
opts.NumBlocksToFinality = network.NumBlocksToFinality
}
}
if opts.NumBlocksToFinality <= 0 {
opts.NumBlocksToFinality = 1 // absolute min is 1
}

return &ReceiptsListener{
options: opts,
log: log,
Expand All @@ -164,6 +150,25 @@ func NewReceiptsListener(log logger.Logger, provider ethrpc.Interface, monitor *
}, nil
}

func (l *ReceiptsListener) lazyInit(ctx context.Context) error {
if l.options.NumBlocksToFinality <= 0 {
chainID, err := getChainID(ctx, l.provider)
if err != nil {
chainID = big.NewInt(1) // assume mainnet in case of unlikely error
}
network, ok := ethrpc.Networks[chainID.Uint64()]
if ok {
l.options.NumBlocksToFinality = network.NumBlocksToFinality
}
}

if l.options.NumBlocksToFinality <= 0 {
l.options.NumBlocksToFinality = 1 // absolute min is 1
}

return nil
}

func (l *ReceiptsListener) Run(ctx context.Context) error {
if l.IsRunning() {
return fmt.Errorf("ethreceipts: already running")
Expand All @@ -174,6 +179,10 @@ func (l *ReceiptsListener) Run(ctx context.Context) error {
atomic.StoreInt32(&l.running, 1)
defer atomic.StoreInt32(&l.running, 0)

if err := l.lazyInit(ctx); err != nil {
return err
}

l.log.Info("ethreceipts: running")

return l.listener()
Expand Down Expand Up @@ -801,19 +810,24 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int {
return latestBlockNum
}

func getChainID(provider ethrpc.Interface) (*big.Int, error) {
func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) {
var chainID *big.Int
err := breaker.Do(context.Background(), func() error {
id, err := provider.ChainID(context.Background())
err := breaker.Do(ctx, func() error {
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

id, err := provider.ChainID(ctx)
if err != nil {
return err
}
chainID = id
return nil
}, nil, 1*time.Second, 2, 20)
}, nil, 1*time.Second, 2, 3)

if err != nil {
return nil, err
}

return chainID, nil
}

Expand Down

0 comments on commit 77c5fc9

Please sign in to comment.