Skip to content

Commit

Permalink
Problem: rpc stream overhead even if not used
Browse files Browse the repository at this point in the history
Solution:
- init the underlying subscriptions lazily.
  • Loading branch information
yihuang committed Nov 1, 2024
1 parent 885530b commit 68b1496
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#533](https://github.com/crypto-org-chain/ethermint/pull/533) Bump cosmos-sdk to v0.50.10, cometbft to v0.38.13 and ibc-go to v8.5.1.
* [#546](https://github.com/crypto-org-chain/ethermint/pull/546) Introduce `--async-check-tx` flag to run check-tx async with consensus.
* [#549](https://github.com/crypto-org-chain/ethermint/pull/549) Support build without cgo.
* [#]() Start event stream on demand.

## v0.21.x-cronos

Expand Down
42 changes: 28 additions & 14 deletions rpc/stream/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type validatorAccountFunc func(
) (*evmtypes.QueryValidatorAccountResponse, error)

// RPCStream provides data streams for newHeads, logs, and pendingTransactions.
// it's only started on demand, so there's no overhead if the filter apis are not called at all.
type RPCStream struct {
evtClient rpcclient.EventsClient
logger log.Logger
Expand All @@ -69,39 +70,49 @@ func NewRPCStreams(
logger log.Logger,
txDecoder sdk.TxDecoder,
validatorAccount validatorAccountFunc,
) (*RPCStream, error) {
s := &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,

headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity),
pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity),
) *RPCStream {
return &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,
validatorAccount: validatorAccount,
}
}

func (s *RPCStream) init() {
if s.headerStream != nil {
// already initialized
return
}

s.headerStream = NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity)
s.pendingTxStream = NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity)
s.logStream = NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity)

ctx := context.Background()

chBlocks, err := s.evtClient.Subscribe(ctx, streamSubscriberName, blockEvents, subscribBufferSize)
if err != nil {
return nil, err
panic(err)
}

chLogs, err := s.evtClient.Subscribe(ctx, streamSubscriberName, evmEvents, subscribBufferSize)
if err != nil {
if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil {
s.logger.Error("failed to unsubscribe", "err", err)
}
return nil, err
panic(err)
}

go s.start(&s.wg, chBlocks, chLogs)

return s, nil
}

func (s *RPCStream) Close() error {
if s.headerStream == nil {
// not initialized
return nil
}

if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil {
return err
}
Expand All @@ -110,20 +121,23 @@ func (s *RPCStream) Close() error {
}

func (s *RPCStream) HeaderStream() *Stream[RPCHeader] {
s.init()
return s.headerStream
}

func (s *RPCStream) PendingTxStream() *Stream[common.Hash] {
s.init()
return s.pendingTxStream
}

func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] {
s.init()
return s.logStream
}

// ListenPendingTx is a callback passed to application to listen for pending transactions in CheckTx.
func (s *RPCStream) ListenPendingTx(hash common.Hash) {
s.pendingTxStream.Add(hash)
s.PendingTxStream().Add(hash)
}

func (s *RPCStream) start(
Expand Down

0 comments on commit 68b1496

Please sign in to comment.