Skip to content

Commit

Permalink
Fix logs publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
pustovalov committed Dec 13, 2023
1 parent 6124a1e commit 9286557
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions endpoint/eventsEth.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ type EventsEth struct {
*endpoint.Endpoint
eventBroker broker.Broker
newHeadsCh chan event.Block
logsCh chan event.Logs
logsChMap map[rpc.ID]chan event.Logs
}

func NewEventsEth(ep *endpoint.Endpoint, eb broker.Broker) *EventsEth {
return &EventsEth{
Endpoint: ep,
eventBroker: eb,
newHeadsCh: make(chan event.Block, events.NewHeadsChSize),
logsCh: make(chan event.Logs, events.LogsChSize),
logsChMap: make(map[rpc.ID]chan event.Logs),
}
}

Expand Down Expand Up @@ -65,15 +65,19 @@ func (e *EventsEth) Logs(ctx context.Context, subOpts request.LogSubscriptionOpt
rpcSub := notifier.CreateSubscription()

go func() {
logsSubs := e.eventBroker.SubscribeLogs(subOpts, e.logsCh)
logsCh := make(chan event.Logs, events.LogsChSize)
e.logsChMap[rpcSub.ID] = logsCh

logsSubs := e.eventBroker.SubscribeLogs(subOpts, logsCh)
for {
select {
case logs := <-e.logsCh:
case logs := <-logsCh:
for _, log := range logs {
notifier.Notify(rpcSub.ID, &log)
}
case <-rpcSub.Err():
e.eventBroker.UnsubscribeFromLogs(logsSubs)
delete(e.logsChMap, rpcSub.ID)
return
}
}
Expand Down

0 comments on commit 9286557

Please sign in to comment.