Skip to content

Commit

Permalink
ethrpc: more succinct stream code
Browse files Browse the repository at this point in the history
  • Loading branch information
pkieltyka committed Oct 7, 2024
1 parent f1a464f commit 0e42f6c
Showing 1 changed file with 19 additions and 56 deletions.
75 changes: 19 additions & 56 deletions ethrpc/ethrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,25 +462,20 @@ func (p *Provider) IsStreamingEnabled() bool {
return p.nodeWSURL != ""
}

// SubscribeFilterLogs is stubbed below so we can adhere to the bind.ContractBackend interface.
// NOTE: the p.nodeWSURL is setup with a wss:// prefix, which tells the gethRPC to use a
// websocket connection.
//
// The connection will be closed and unsubscribed when the context is cancelled.
func (p *Provider) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
func (p *Provider) streamSubscribe(ctx context.Context, label string, subscribeFn func(conn *rpc.Client) (ethereum.Subscription, error)) (ethereum.Subscription, error) {
if !p.IsStreamingEnabled() {
return nil, fmt.Errorf("ethrpc: provider instance has not enabled streaming")
}

gethRPC, err := rpc.DialContext(ctx, p.nodeWSURL)
if err != nil {
return nil, fmt.Errorf("ethrpc: SubscribeFilterLogs failed to connect to websocket: %w", err)
return nil, fmt.Errorf("ethrpc: %s failed to connect to websocket: %w", label, err)
}

sub, err := gethRPC.EthSubscribe(ctx, ch, "logs", query)
sub, err := subscribeFn(gethRPC)
if err != nil {
gethRPC.Close()
return nil, fmt.Errorf("ethrpc: SubscribeFilterLogs failed: %w", err)
return nil, fmt.Errorf("ethrpc: %s failed: %w", label, err)
}

p.mu.Lock()
Expand Down Expand Up @@ -518,59 +513,27 @@ func (p *Provider) SubscribeFilterLogs(ctx context.Context, query ethereum.Filte
return sub, nil
}

// SubscribeFilterLogs is stubbed below so we can adhere to the bind.ContractBackend interface.
// NOTE: the p.nodeWSURL is setup with a wss:// prefix, which tells the gethRPC to use a
// websocket connection.
//
// The connection will be closed and unsubscribed when the context is cancelled.
func (p *Provider) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
fn := func(conn *rpc.Client) (ethereum.Subscription, error) {
return conn.EthSubscribe(ctx, ch, "logs", query)
}
return p.streamSubscribe(ctx, "SubscribeFilterLogs", fn)
}

// SubscribeNewHeads listens for new blocks via websocket client. NOTE: the p.nodeWSURL is setup
// with a wss:// prefix, which tells the gethRPC to use a websocket connection.
//
// The connection will be closed and unsubscribed when the context is cancelled.
func (p *Provider) SubscribeNewHeads(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
if !p.IsStreamingEnabled() {
return nil, fmt.Errorf("ethrpc: provider instance has not enabled streaming")
}

gethRPC, err := rpc.DialContext(ctx, p.nodeWSURL)
if err != nil {
return nil, fmt.Errorf("ethrpc: SubscribeNewHeads failed to connect to websocket: %w", err)
fn := func(conn *rpc.Client) (ethereum.Subscription, error) {
return conn.EthSubscribe(ctx, ch, "newHeads")
}

sub, err := gethRPC.EthSubscribe(ctx, ch, "newHeads")
if err != nil {
gethRPC.Close()
return nil, fmt.Errorf("ethrpc: SubscribeNewHeads failed: %w", err)
}

p.mu.Lock()
p.streamClosers = append(p.streamClosers, gethRPC)
p.streamUnsubscribers = append(p.streamUnsubscribers, sub)
p.mu.Unlock()

go func() {
// close the subscription when the context is cancelled
// or when the subscription is explicitly closed
select {
case <-ctx.Done():
sub.Unsubscribe()
case <-sub.Err():
}

p.mu.Lock()
sub.Unsubscribe()
for i, unsub := range p.streamUnsubscribers {
if unsub == sub {
p.streamUnsubscribers = append(p.streamUnsubscribers[:i], p.streamUnsubscribers[i+1:]...)
break
}
}
gethRPC.Close()
for i, closer := range p.streamClosers {
if closer == gethRPC {
p.streamClosers = append(p.streamClosers[:i], p.streamClosers[i+1:]...)
break
}
}
p.mu.Unlock()
}()

return sub, nil
return p.streamSubscribe(ctx, "SubscribeNewHeads", fn)
}

func (p *Provider) CloseStreamConns() {
Expand Down

0 comments on commit 0e42f6c

Please sign in to comment.