diff --git a/rpc/stream/rpc.go b/rpc/stream/rpc.go index f3a46da642..5e4d1bb78c 100644 --- a/rpc/stream/rpc.go +++ b/rpc/stream/rpc.go @@ -57,9 +57,12 @@ type RPCStream struct { logger log.Logger txDecoder sdk.TxDecoder - headerStream *Stream[RPCHeader] + // headerStream/logStream are backed by cometbft event subscription + headerStream *Stream[RPCHeader] + logStream *Stream[*ethtypes.Log] + + // pendingTxStream is backed by check-tx ante handler pendingTxStream *Stream[common.Hash] - logStream *Stream[*ethtypes.Log] wg sync.WaitGroup validatorAccount validatorAccountFunc @@ -76,17 +79,17 @@ func NewRPCStreams( logger: logger, txDecoder: txDecoder, validatorAccount: validatorAccount, + pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity), } } -func (s *RPCStream) init() { +func (s *RPCStream) initSubscriptions() { if s.headerStream != nil { // already initialized return } s.headerStream = NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity) - s.pendingTxStream = NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity) s.logStream = NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity) ctx := context.Background() @@ -121,17 +124,16 @@ func (s *RPCStream) Close() error { } func (s *RPCStream) HeaderStream() *Stream[RPCHeader] { - s.init() + s.initSubscriptions() return s.headerStream } func (s *RPCStream) PendingTxStream() *Stream[common.Hash] { - s.init() return s.pendingTxStream } func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] { - s.init() + s.initSubscriptions() return s.logStream }