Skip to content

Commit

Permalink
ethrpc/ethmonitor: return raw blocks and logs
Browse files Browse the repository at this point in the history
  • Loading branch information
pkieltyka committed Aug 25, 2023
1 parent 86a8569 commit ce75f6e
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 126 deletions.
117 changes: 88 additions & 29 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ethmonitor

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -99,6 +100,8 @@ type Monitor struct {
blockCache cachestore.Store[*types.Block]
logCache cachestore.Store[[]types.Log]

cache cachestore.Store[[]byte]

publishCh chan Blocks
publishQueue *queue
subscribers []*subscriber
Expand Down Expand Up @@ -139,6 +142,7 @@ func NewMonitor(provider ethrpc.Interface, options ...Options) (*Monitor, error)
}

var (
cache cachestore.Store[[]byte]
blockCache cachestore.Store[*types.Block]
logCache cachestore.Store[[]types.Log]
)
Expand All @@ -151,6 +155,12 @@ func NewMonitor(provider ethrpc.Interface, options ...Options) (*Monitor, error)
if err != nil {
return nil, fmt.Errorf("ethmonitor: open log cache: %w", err)
}

cache, err = cachestorectl.Open[[]byte](opts.CacheBackend, cachestore.WithLockExpiry(5*time.Second))
if err != nil {
return nil, fmt.Errorf("ethmonitor: open cache: %w", err)
}

if opts.CacheExpiry == 0 {
opts.CacheExpiry = 60 * time.Second
}
Expand All @@ -164,6 +174,7 @@ func NewMonitor(provider ethrpc.Interface, options ...Options) (*Monitor, error)
chainID: chainID,
blockCache: blockCache,
logCache: logCache,
cache: cache,
publishCh: make(chan Blocks),
publishQueue: newQueue(opts.BlockRetentionLimit * 2),
subscribers: make([]*subscriber, 0),
Expand Down Expand Up @@ -282,9 +293,9 @@ func (m *Monitor) monitor() error {
nextBlock, miss, err := m.fetchNextBlock(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
m.log.Infof("ethmonitor: fetchNextBlock timed out: '%v', for blockNum:%d, retrying..", err, m.nextBlockNumber.Uint64())
m.log.Infof("ethmonitor: fetchNextBlock timed out: '%v', for blockNum:%v, retrying..", err, m.nextBlockNumber)
} else {
m.log.Warnf("ethmonitor: fetchNextBlock error reported '%v', for blockNum:%d, retrying..", err, m.nextBlockNumber.Uint64())
m.log.Warnf("ethmonitor: fetchNextBlock error reported '%v', for blockNum:%v, retrying..", err, m.nextBlockNumber)
}

// pause, then retry
Expand Down Expand Up @@ -460,17 +471,22 @@ func (m *Monitor) addLogs(ctx context.Context, blocks Blocks) {
}

func (m *Monitor) filterLogs(ctx context.Context, blockHash common.Hash, topics [][]common.Hash) ([]types.Log, error) {
getter := func(ctx context.Context, _ string) ([]types.Log, error) {
getter := func(ctx context.Context, _ string) ([]byte, error) {
m.log.Debugf("ethmonitor: filterLogs is calling origin for block hash %s", blockHash)

return m.provider.FilterLogs(ctx, ethereum.FilterQuery{
logsPayload, err := m.provider.RawFilterLogs(ctx, ethereum.FilterQuery{
BlockHash: &blockHash,
Topics: topics,
})
return logsPayload, err
}

if m.logCache == nil {
return getter(ctx, "")
if m.cache == nil {
resp, err := getter(ctx, "")
if err != nil {
return nil, err
}
return unmarshalLogs(resp)
}

topicsDigest := xxhash.New()
Expand All @@ -482,7 +498,11 @@ func (m *Monitor) filterLogs(ctx context.Context, blockHash common.Hash, topics
}

key := fmt.Sprintf("ethmonitor:%s:Logs:hash=%s;topics=%d", m.chainID.String(), blockHash.String(), topicsDigest.Sum64())
return m.logCache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
resp, err := m.cache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
if err != nil {
return nil, err
}
return unmarshalLogs(resp)
}

func (m *Monitor) backfillChainLogs(ctx context.Context, newBlocks Blocks) {
Expand Down Expand Up @@ -527,7 +547,7 @@ func (m *Monitor) backfillChainLogs(ctx context.Context, newBlocks Blocks) {
func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, bool, error) {
miss := false

getter := func(ctx context.Context, _ string) (*types.Block, error) {
getter := func(ctx context.Context, _ string) ([]byte, error) {
m.log.Debugf("ethmonitor: fetchNextBlock is calling origin for number %s", m.nextBlockNumber)
for {
select {
Expand All @@ -536,7 +556,7 @@ func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, bool, error
default:
}

nextBlock, err := m.fetchBlockByNumber(ctx, m.nextBlockNumber)
nextBlockPayload, err := m.fetchRawBlockByNumber(ctx, m.nextBlockNumber)
if errors.Is(err, ethereum.NotFound) {
miss = true
time.Sleep(m.options.PollingInterval)
Expand All @@ -549,28 +569,37 @@ func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, bool, error
continue
}

return nextBlock, nil
return nextBlockPayload, nil
}
}

if m.blockCache != nil {
key := cacheKeyBlockNum(m.chainID, m.nextBlockNumber)
nextBlock, err := m.blockCache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
return nextBlock, miss, err
if m.cache == nil {
resp, err := getter(ctx, "")
if err != nil {
return nil, miss, err
}
block, err := unmarshalBlock(resp)
return block, miss, err
}
nextBlock, err := getter(ctx, "")
return nextBlock, miss, err

key := cacheKeyBlockNum(m.chainID, m.nextBlockNumber)
resp, err := m.cache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
if err != nil {
return nil, miss, err
}
block, err := unmarshalBlock(resp)
return block, miss, err
}

func cacheKeyBlockNum(chainID *big.Int, num *big.Int) string {
return fmt.Sprintf("ethmonitor:%s:BlockNum:%s", chainID.String(), num.String())
}

func (m *Monitor) fetchBlockByNumber(ctx context.Context, num *big.Int) (*types.Block, error) {
func (m *Monitor) fetchRawBlockByNumber(ctx context.Context, num *big.Int) ([]byte, error) {
m.log.Debugf("ethmonitor: fetchBlockByNumber is calling origin for number %s", num)
maxErrAttempts, errAttempts := 3, 0 // quick retry in case of short-term node connection failures

var block *types.Block
var blockPayload []byte
var err error

for {
Expand All @@ -588,7 +617,7 @@ func (m *Monitor) fetchBlockByNumber(ctx context.Context, num *big.Int) (*types.
tctx, cancel := context.WithTimeout(ctx, m.options.Timeout)
defer cancel()

block, err = m.provider.BlockByNumber(tctx, num)
blockPayload, err = m.provider.RawBlockByNumber(tctx, num)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
return nil, ethereum.NotFound
Expand All @@ -599,18 +628,19 @@ func (m *Monitor) fetchBlockByNumber(ctx context.Context, num *big.Int) (*types.
continue
}
}
return block, nil
return blockPayload, nil
}
}

func (m *Monitor) fetchBlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
getter := func(ctx context.Context, _ string) (*types.Block, error) {
getter := func(ctx context.Context, _ string) ([]byte, error) {
m.log.Debugf("ethmonitor: fetchBlockByHash is calling origin for hash %s", hash)

maxNotFoundAttempts, notFoundAttempts := 2, 0 // waiting for node to sync
maxErrAttempts, errAttempts := 2, 0 // quick retry in case of short-term node connection failures

var block *types.Block
// var block *types.Block
var blockPayload []byte
var err error

for {
Expand All @@ -628,7 +658,7 @@ func (m *Monitor) fetchBlockByHash(ctx context.Context, hash common.Hash) (*type
return nil, superr.New(ErrMaxAttempts, err)
}

block, err = m.provider.BlockByHash(ctx, hash)
blockPayload, err = m.provider.RawBlockByHash(ctx, hash)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
notFoundAttempts++
Expand All @@ -640,17 +670,28 @@ func (m *Monitor) fetchBlockByHash(ctx context.Context, hash common.Hash) (*type
continue
}
}
if block != nil {
return block, nil
if blockPayload != nil {
return blockPayload, nil
}
}
}

if m.blockCache != nil {
key := fmt.Sprintf("ethmonitor:%s:BlockHash:%s", m.chainID.String(), hash.String())
return m.blockCache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
if m.cache == nil {
resp, err := getter(ctx, "")
if err != nil {
return nil, err
}
block, err := unmarshalBlock(resp)
return block, err
}
return getter(ctx, "")

key := fmt.Sprintf("ethmonitor:%s:BlockHash:%s", m.chainID.String(), hash.String())
resp, err := m.cache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
if err != nil {
return nil, err
}
block, err := unmarshalBlock(resp)
return block, err
}

func (m *Monitor) publish(ctx context.Context, events Blocks) error {
Expand Down Expand Up @@ -828,3 +869,21 @@ func clampDuration(x, y time.Duration) time.Duration {
return y
}
}

func unmarshalBlock(blockPayload []byte) (*types.Block, error) {
var block *types.Block
err := ethrpc.IntoBlock(blockPayload, &block)
if err != nil {
return nil, err
}
return block, nil
}

func unmarshalLogs(logsPload []byte) ([]types.Log, error) {
var logs []types.Log
err := json.Unmarshal(logsPload, &logs)
if err != nil {
return nil, err
}
return logs, nil
}
13 changes: 7 additions & 6 deletions ethrpc/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (tx *rpcTransaction) UnmarshalJSON(msg []byte) error {
return json.Unmarshal(msg, &tx.txExtraInfo)
}

func intoBlock(raw json.RawMessage, ret **types.Block) error {
func IntoBlock(raw json.RawMessage, ret **types.Block) error {
if len(raw) == 0 {
return ethereum.NotFound
}
Expand Down Expand Up @@ -93,7 +93,8 @@ func intoBlock(raw json.RawMessage, ret **types.Block) error {
return nil
}

func intoBlocks(raw json.RawMessage, ret *[]*types.Block) error {
// unused
/*func intoBlocks(raw json.RawMessage, ret *[]*types.Block) error {
var list []json.RawMessage
err := json.Unmarshal(raw, &list)
Expand All @@ -112,13 +113,13 @@ func intoBlocks(raw json.RawMessage, ret *[]*types.Block) error {
*ret = blocks
return nil
}
}*/

func intoTransaction(raw json.RawMessage, tx **types.Transaction) error {
return intoTransactionWithPending(raw, tx, nil)
func IntoTransaction(raw json.RawMessage, tx **types.Transaction) error {
return IntoTransactionWithPending(raw, tx, nil)
}

func intoTransactionWithPending(raw json.RawMessage, tx **types.Transaction, pending *bool) error {
func IntoTransactionWithPending(raw json.RawMessage, tx **types.Transaction, pending *bool) error {
var body *rpcTransaction
if err := json.Unmarshal(raw, &body); err != nil {
return err
Expand Down
Loading

0 comments on commit ce75f6e

Please sign in to comment.