From 33f07a0a9d9c1f3d44416a4ba6a97ddc3e9d56e7 Mon Sep 17 00:00:00 2001 From: im-adithya Date: Thu, 18 Jul 2024 20:30:20 +0530 Subject: [PATCH] chore: retry on error and add select --- lnclient/lnd/lnd.go | 110 ++++++++++++++++++++++++-------------------- 1 file changed, 59 insertions(+), 51 deletions(-) diff --git a/lnclient/lnd/lnd.go b/lnclient/lnd/lnd.go index c2056b40..e1734359 100644 --- a/lnclient/lnd/lnd.go +++ b/lnclient/lnd/lnd.go @@ -421,72 +421,80 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln // Subscribe to payments go func() { - paymentStream, err := lndClient.SubscribePayments(lndCtx, &routerrpc.TrackPaymentsRequest{ - NoInflightUpdates: true, - }) - if err != nil { - logger.Logger.Errorf("Error subscribing to payments: %v", err) - return - } for { - payment, err := paymentStream.Recv() - if lndCtx.Err() == context.Canceled { - return - } + paymentStream, err := lndClient.SubscribePayments(lndCtx, &routerrpc.TrackPaymentsRequest{ + NoInflightUpdates: true, + }) if err != nil { - logger.Logger.Errorf("Failed to receive payment: %v", err) - time.Sleep(2 * time.Second) + logger.Logger.Errorf("Error subscribing to payments: %v", err) continue } - if payment.Status != lnrpc.Payment_SUCCEEDED { - continue - } - - logger.Logger.WithFields(logrus.Fields{ - "payment": payment, - }).Info("Received new payment") - - transaction, err := lndPaymentToTransaction(payment) - if err != nil { - continue + for { + select { + case <-lndCtx.Done(): + return + default: + payment, err := paymentStream.Recv() + if err != nil { + logger.Logger.Errorf("Failed to receive payment: %v", err) + time.Sleep(2 * time.Second) + continue + } + if payment.Status != lnrpc.Payment_SUCCEEDED { + continue + } + + logger.Logger.WithFields(logrus.Fields{ + "payment": payment, + }).Info("Received new payment") + + transaction, err := lndPaymentToTransaction(payment) + if err != nil { + continue + } + + eventPublisher.Publish(&events.Event{ + Event: "nwc_payment_sent", + Properties: transaction, + }) + } } - - eventPublisher.Publish(&events.Event{ - Event: "nwc_payment_sent", - Properties: transaction, - }) } }() // Subscribe to invoices go func() { - invoiceStream, err := lndClient.SubscribeInvoices(lndCtx, &lnrpc.InvoiceSubscription{}) - if err != nil { - logger.Logger.Errorf("Error subscribing to invoices: %v", err) - return - } for { - invoice, err := invoiceStream.Recv() - if lndCtx.Err() == context.Canceled { - return - } + invoiceStream, err := lndClient.SubscribeInvoices(lndCtx, &lnrpc.InvoiceSubscription{}) if err != nil { - logger.Logger.Errorf("Failed to receive invoice: %v", err) - time.Sleep(2 * time.Second) + logger.Logger.Errorf("Error subscribing to invoices: %v", err) continue } - if invoice.State != lnrpc.Invoice_SETTLED { - continue + for { + select { + case <-lndCtx.Done(): + return + default: + invoice, err := invoiceStream.Recv() + if err != nil { + logger.Logger.Errorf("Failed to receive invoice: %v", err) + time.Sleep(2 * time.Second) + continue + } + if invoice.State != lnrpc.Invoice_SETTLED { + continue + } + + logger.Logger.WithFields(logrus.Fields{ + "invoice": invoice, + }).Info("Received new invoice") + + eventPublisher.Publish(&events.Event{ + Event: "nwc_payment_received", + Properties: lndInvoiceToTransaction(invoice), + }) + } } - - logger.Logger.WithFields(logrus.Fields{ - "invoice": invoice, - }).Info("Received new invoice") - - eventPublisher.Publish(&events.Event{ - Event: "nwc_payment_received", - Properties: lndInvoiceToTransaction(invoice), - }) } }()