Skip to content

Commit

Permalink
chore: retry on error and add select
Browse files Browse the repository at this point in the history
  • Loading branch information
im-adithya committed Jul 18, 2024
1 parent f6d255a commit 33f07a0
Showing 1 changed file with 59 additions and 51 deletions.
110 changes: 59 additions & 51 deletions lnclient/lnd/lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,72 +421,80 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln

// Subscribe to payments
go func() {
paymentStream, err := lndClient.SubscribePayments(lndCtx, &routerrpc.TrackPaymentsRequest{
NoInflightUpdates: true,
})
if err != nil {
logger.Logger.Errorf("Error subscribing to payments: %v", err)
return
}
for {
payment, err := paymentStream.Recv()
if lndCtx.Err() == context.Canceled {
return
}
paymentStream, err := lndClient.SubscribePayments(lndCtx, &routerrpc.TrackPaymentsRequest{
NoInflightUpdates: true,
})
if err != nil {
logger.Logger.Errorf("Failed to receive payment: %v", err)
time.Sleep(2 * time.Second)
logger.Logger.Errorf("Error subscribing to payments: %v", err)
continue
}
if payment.Status != lnrpc.Payment_SUCCEEDED {
continue
}

logger.Logger.WithFields(logrus.Fields{
"payment": payment,
}).Info("Received new payment")

transaction, err := lndPaymentToTransaction(payment)
if err != nil {
continue
for {
select {
case <-lndCtx.Done():
return
default:
payment, err := paymentStream.Recv()
if err != nil {
logger.Logger.Errorf("Failed to receive payment: %v", err)
time.Sleep(2 * time.Second)
continue
}
if payment.Status != lnrpc.Payment_SUCCEEDED {
continue
}

logger.Logger.WithFields(logrus.Fields{
"payment": payment,
}).Info("Received new payment")

transaction, err := lndPaymentToTransaction(payment)
if err != nil {
continue
}

eventPublisher.Publish(&events.Event{
Event: "nwc_payment_sent",
Properties: transaction,
})
}
}

eventPublisher.Publish(&events.Event{
Event: "nwc_payment_sent",
Properties: transaction,
})
}
}()

// Subscribe to invoices
go func() {
invoiceStream, err := lndClient.SubscribeInvoices(lndCtx, &lnrpc.InvoiceSubscription{})
if err != nil {
logger.Logger.Errorf("Error subscribing to invoices: %v", err)
return
}
for {
invoice, err := invoiceStream.Recv()
if lndCtx.Err() == context.Canceled {
return
}
invoiceStream, err := lndClient.SubscribeInvoices(lndCtx, &lnrpc.InvoiceSubscription{})
if err != nil {
logger.Logger.Errorf("Failed to receive invoice: %v", err)
time.Sleep(2 * time.Second)
logger.Logger.Errorf("Error subscribing to invoices: %v", err)
continue
}
if invoice.State != lnrpc.Invoice_SETTLED {
continue
for {
select {
case <-lndCtx.Done():
return
default:
invoice, err := invoiceStream.Recv()
if err != nil {
logger.Logger.Errorf("Failed to receive invoice: %v", err)
time.Sleep(2 * time.Second)
continue
}
if invoice.State != lnrpc.Invoice_SETTLED {
continue
}

logger.Logger.WithFields(logrus.Fields{
"invoice": invoice,
}).Info("Received new invoice")

eventPublisher.Publish(&events.Event{
Event: "nwc_payment_received",
Properties: lndInvoiceToTransaction(invoice),
})
}
}

logger.Logger.WithFields(logrus.Fields{
"invoice": invoice,
}).Info("Received new invoice")

eventPublisher.Publish(&events.Event{
Event: "nwc_payment_received",
Properties: lndInvoiceToTransaction(invoice),
})
}
}()

Expand Down

0 comments on commit 33f07a0

Please sign in to comment.