diff --git a/instantout/reservation/actions_test.go b/instantout/reservation/actions_test.go index 2f989d929..d89e526db 100644 --- a/instantout/reservation/actions_test.go +++ b/instantout/reservation/actions_test.go @@ -47,24 +47,24 @@ type mockReservationClient struct { mock.Mock } -func (m *mockReservationClient) OpenReservation(ctx context.Context, - in *swapserverrpc.ServerOpenReservationRequest, - opts ...grpc.CallOption) (*swapserverrpc.ServerOpenReservationResponse, +func (m *mockReservationClient) ReservationNotificationStream( + ctx context.Context, in *swapserverrpc.ReservationNotificationRequest, + opts ...grpc.CallOption, +) (swapserverrpc.ReservationService_ReservationNotificationStreamClient, error) { args := m.Called(ctx, in, opts) - return args.Get(0).(*swapserverrpc.ServerOpenReservationResponse), + return args.Get(0).(swapserverrpc.ReservationService_ReservationNotificationStreamClient), args.Error(1) } -func (m *mockReservationClient) ReservationNotificationStream( - ctx context.Context, in *swapserverrpc.ReservationNotificationRequest, - opts ...grpc.CallOption, -) (swapserverrpc.ReservationService_ReservationNotificationStreamClient, +func (m *mockReservationClient) OpenReservation(ctx context.Context, + in *swapserverrpc.ServerOpenReservationRequest, + opts ...grpc.CallOption) (*swapserverrpc.ServerOpenReservationResponse, error) { args := m.Called(ctx, in, opts) - return args.Get(0).(swapserverrpc.ReservationService_ReservationNotificationStreamClient), + return args.Get(0).(*swapserverrpc.ServerOpenReservationResponse), args.Error(1) } diff --git a/instantout/reservation/fsm.go b/instantout/reservation/fsm.go index 188fe1fa6..86afbbaa2 100644 --- a/instantout/reservation/fsm.go +++ b/instantout/reservation/fsm.go @@ -28,8 +28,9 @@ type Config struct { // swap server. ReservationClient swapserverrpc.ReservationServiceClient - // FetchL402 is the function used to fetch the l402 token. - FetchL402 func(context.Context) error + // NotificationManager is the manager that handles the notification + // subscriptions. + NotificationManager NotificationManager } // FSM is the state machine that manages the reservation lifecycle. diff --git a/instantout/reservation/interfaces.go b/instantout/reservation/interfaces.go index c999d1b95..04bf830d3 100644 --- a/instantout/reservation/interfaces.go +++ b/instantout/reservation/interfaces.go @@ -3,6 +3,8 @@ package reservation import ( "context" "fmt" + + "github.com/lightninglabs/loop/swapserverrpc" ) var ( @@ -31,3 +33,10 @@ type Store interface { // made. ListReservations(ctx context.Context) ([]*Reservation, error) } + +// NotificationManager handles subscribing to incoming reservation +// subscriptions. +type NotificationManager interface { + SubscribeReservations(context.Context, + ) <-chan *swapserverrpc.ServerReservationNotification +} diff --git a/instantout/reservation/manager.go b/instantout/reservation/manager.go index 35f80ecb1..faffbc594 100644 --- a/instantout/reservation/manager.go +++ b/instantout/reservation/manager.go @@ -22,9 +22,6 @@ type Manager struct { // activeReservations contains all the active reservationsFSMs. activeReservations map[ID]*FSM - // hasL402 is true if the client has a valid L402. - hasL402 bool - runCtx context.Context sync.Mutex @@ -59,14 +56,7 @@ func (m *Manager) Run(ctx context.Context, height int32) error { return err } - reservationResChan := make( - chan *reservationrpc.ServerReservationNotification, - ) - - err = m.RegisterReservationNotifications(reservationResChan) - if err != nil { - return err - } + ntfnChan := m.cfg.NotificationManager.SubscribeReservations(ctx) for { select { @@ -74,7 +64,7 @@ func (m *Manager) Run(ctx context.Context, height int32) error { log.Debugf("Received block %v", height) currentHeight = height - case reservationRes := <-reservationResChan: + case reservationRes := <-ntfnChan: log.Debugf("Received reservation %x", reservationRes.ReservationId) _, err := m.newReservation( @@ -157,101 +147,6 @@ func (m *Manager) newReservation(ctx context.Context, currentHeight uint32, return reservationFSM, nil } -// fetchL402 fetches the L402 from the server. This method will keep on -// retrying until it gets a valid response. -func (m *Manager) fetchL402(ctx context.Context) { - // Add a 0 timer so that we initially fetch the L402 immediately. - timer := time.NewTimer(0) - for { - select { - case <-ctx.Done(): - return - - case <-timer.C: - err := m.cfg.FetchL402(ctx) - if err != nil { - log.Warnf("Error fetching L402: %v", err) - timer.Reset(time.Second * 10) - continue - } - m.hasL402 = true - return - } - } -} - -// RegisterReservationNotifications registers a new reservation notification -// stream. -func (m *Manager) RegisterReservationNotifications( - reservationChan chan *reservationrpc.ServerReservationNotification) error { - - // In order to create a valid l402 we first are going to call - // the FetchL402 method. As a client might not have outbound capacity - // yet, we'll retry until we get a valid response. - if !m.hasL402 { - m.fetchL402(m.runCtx) - } - - ctx, cancel := context.WithCancel(m.runCtx) - - // We'll now subscribe to the reservation notifications. - reservationStream, err := m.cfg.ReservationClient. - ReservationNotificationStream( - ctx, &reservationrpc.ReservationNotificationRequest{}, - ) - if err != nil { - cancel() - return err - } - - log.Debugf("Successfully subscribed to reservation notifications") - - // We'll now start a goroutine that will forward all the reservation - // notifications to the reservationChan. - go func() { - for { - reservationRes, err := reservationStream.Recv() - if err == nil && reservationRes != nil { - log.Debugf("Received reservation %x", - reservationRes.ReservationId) - reservationChan <- reservationRes - continue - } - log.Errorf("Error receiving "+ - "reservation: %v", err) - - cancel() - - // If we encounter an error, we'll - // try to reconnect. - for { - select { - case <-m.runCtx.Done(): - return - - case <-time.After(time.Second * 10): - log.Debugf("Reconnecting to " + - "reservation notifications") - err = m.RegisterReservationNotifications( - reservationChan, - ) - if err != nil { - log.Errorf("Error "+ - "reconnecting: %v", err) - continue - } - - // If we were able to reconnect, we'll - // return. - return - } - } - } - }() - - return nil -} - // RecoverReservations tries to recover all reservations that are still active // from the database. func (m *Manager) RecoverReservations(ctx context.Context) error { diff --git a/instantout/reservation/manager_test.go b/instantout/reservation/manager_test.go index f04d54446..1dbb5a349 100644 --- a/instantout/reservation/manager_test.go +++ b/instantout/reservation/manager_test.go @@ -13,7 +13,6 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "google.golang.org/grpc" ) var ( @@ -118,27 +117,22 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext { sendChan := make(chan *swapserverrpc.ServerReservationNotification) - mockReservationClient.On( - "ReservationNotificationStream", mock.Anything, mock.Anything, - mock.Anything, - ).Return( - &dummyReservationNotificationServer{ - SendChan: sendChan, - }, nil, - ) - mockReservationClient.On( "OpenReservation", mock.Anything, mock.Anything, mock.Anything, ).Return( &swapserverrpc.ServerOpenReservationResponse{}, nil, ) + mockNtfnManager := &mockNtfnManager{ + sendChan: sendChan, + } + cfg := &Config{ - Store: store, - Wallet: mockLnd.WalletKit, - ChainNotifier: mockLnd.ChainNotifier, - FetchL402: func(context.Context) error { return nil }, - ReservationClient: mockReservationClient, + Store: store, + Wallet: mockLnd.WalletKit, + ChainNotifier: mockLnd.ChainNotifier, + ReservationClient: mockReservationClient, + NotificationManager: mockNtfnManager, } manager := NewManager(cfg) @@ -152,17 +146,15 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext { } } -type dummyReservationNotificationServer struct { - grpc.ClientStream - - // SendChan is the channel that is used to send notifications. - SendChan chan *swapserverrpc.ServerReservationNotification +type mockNtfnManager struct { + sendChan chan *swapserverrpc.ServerReservationNotification } -func (d *dummyReservationNotificationServer) Recv() ( - *swapserverrpc.ServerReservationNotification, error) { +func (m *mockNtfnManager) SubscribeReservations( + ctx context.Context, +) <-chan *swapserverrpc.ServerReservationNotification { - return <-d.SendChan, nil + return m.sendChan } func mustDecodeID(id string) ID { diff --git a/loopd/daemon.go b/loopd/daemon.go index fea3905de..291c6e541 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -21,6 +21,7 @@ import ( "github.com/lightninglabs/loop/loopd/perms" "github.com/lightninglabs/loop/loopdb" loop_looprpc "github.com/lightninglabs/loop/looprpc" + "github.com/lightninglabs/loop/notifications" loop_swaprpc "github.com/lightninglabs/loop/swapserverrpc" "github.com/lightninglabs/loop/sweepbatcher" "github.com/lightningnetwork/lnd/clock" @@ -501,21 +502,41 @@ func (d *Daemon) initialize(withMacaroonService bool) error { } } + // Start the notification manager. + notificationCfg := ¬ifications.Config{ + Client: loop_swaprpc.NewSwapServerClient(swapClient.Conn), + FetchL402: swapClient.Server.FetchL402, + } + notificationManager := notifications.NewManager(notificationCfg) + + d.wg.Add(1) + go func() { + defer d.wg.Done() + + log.Info("Starting notification manager") + err := notificationManager.Run(d.mainCtx) + if err != nil { + d.internalErrChan <- err + log.Errorf("Notification manager stopped: %v", err) + } + }() + var ( reservationManager *reservation.Manager instantOutManager *instantout.Manager ) + // Create the reservation and instantout managers. if d.cfg.EnableExperimental { reservationStore := reservation.NewSQLStore( loopdb.NewTypedStore[reservation.Querier](baseDb), ) reservationConfig := &reservation.Config{ - Store: reservationStore, - Wallet: d.lnd.WalletKit, - ChainNotifier: d.lnd.ChainNotifier, - ReservationClient: reservationClient, - FetchL402: swapClient.Server.FetchL402, + Store: reservationStore, + Wallet: d.lnd.WalletKit, + ChainNotifier: d.lnd.ChainNotifier, + ReservationClient: reservationClient, + NotificationManager: notificationManager, } reservationManager = reservation.NewManager( diff --git a/loopd/log.go b/loopd/log.go index 970bd806b..a4f433f01 100644 --- a/loopd/log.go +++ b/loopd/log.go @@ -10,6 +10,7 @@ import ( "github.com/lightninglabs/loop/instantout/reservation" "github.com/lightninglabs/loop/liquidity" "github.com/lightninglabs/loop/loopdb" + "github.com/lightninglabs/loop/notifications" "github.com/lightninglabs/loop/sweepbatcher" "github.com/lightningnetwork/lnd" "github.com/lightningnetwork/lnd/build" @@ -48,6 +49,9 @@ func SetupLoggers(root *build.RotatingLogWriter, intercept signal.Interceptor) { lnd.AddSubLogger( root, instantout.Subsystem, intercept, instantout.UseLogger, ) + lnd.AddSubLogger( + root, notifications.Subsystem, intercept, notifications.UseLogger, + ) } // genSubLogger creates a logger for a subsystem. We provide an instance of diff --git a/notifications/log.go b/notifications/log.go new file mode 100644 index 000000000..5fb30f100 --- /dev/null +++ b/notifications/log.go @@ -0,0 +1,26 @@ +package notifications + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// Subsystem defines the sub system name of this package. +const Subsystem = "NTFNS" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger(Subsystem, nil)) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/notifications/manager.go b/notifications/manager.go new file mode 100644 index 000000000..ac6e16a88 --- /dev/null +++ b/notifications/manager.go @@ -0,0 +1,212 @@ +package notifications + +import ( + "context" + "sync" + "time" + + "github.com/lightninglabs/loop/swapserverrpc" + "google.golang.org/grpc" +) + +// NotificationType is the type of notification that the manager can handle. +type NotificationType int + +const ( + // NotificationTypeUnknown is the default notification type. + NotificationTypeUnknown NotificationType = iota + + // NotificationTypeReservation is the notification type for reservation + // notifications. + NotificationTypeReservation +) + +// Client is the interface that the notification manager needs to implement in +// order to be able to subscribe to notifications. +type Client interface { + // SubscribeNotifications subscribes to the notifications from the server. + SubscribeNotifications(ctx context.Context, + in *swapserverrpc.SubscribeNotificationsRequest, + opts ...grpc.CallOption) ( + swapserverrpc.SwapServer_SubscribeNotificationsClient, error) +} + +// Config contains all the services that the notification manager needs to +// operate. +type Config struct { + // Client is the client used to communicate with the swap server. + Client Client + + // FetchL402 is the function used to fetch the l402 token. + FetchL402 func(context.Context) error +} + +// Manager is a manager for notifications that the swap server sends to the +// client. +type Manager struct { + cfg *Config + + hasL402 bool + + subscribers map[NotificationType][]subscriber + sync.Mutex +} + +// NewManager creates a new notification manager. +func NewManager(cfg *Config) *Manager { + return &Manager{ + cfg: cfg, + subscribers: make(map[NotificationType][]subscriber), + } +} + +type subscriber struct { + subCtx context.Context + recvChan interface{} +} + +// SubscribeReservations subscribes to the reservation notifications. +func (m *Manager) SubscribeReservations(ctx context.Context, +) <-chan *swapserverrpc.ServerReservationNotification { + + notifChan := make(chan *swapserverrpc.ServerReservationNotification, 1) + sub := subscriber{ + subCtx: ctx, + recvChan: notifChan, + } + + m.addSubscriber(NotificationTypeReservation, sub) + + // Start a goroutine to remove the subscriber when the context is canceled + go func() { + <-ctx.Done() + m.removeSubscriber(NotificationTypeReservation, sub) + close(notifChan) + }() + + return notifChan +} + +// Run starts the notification manager. It will keep on running until the +// context is canceled. It will subscribe to notifications and forward them to +// the subscribers. On a first successful connection to the server, it will +// close the readyChan to signal that the manager is ready. +func (m *Manager) Run(ctx context.Context) error { + // Initially we want to immediately try to connect to the server. + waitTime := time.Duration(0) + + // Start the notification runloop. + for { + timer := time.NewTimer(waitTime) + // Increase the wait time for the next iteration. + waitTime += time.Second * 1 + + // Return if the context has been canceled. + select { + case <-ctx.Done(): + return nil + + case <-timer.C: + } + + // In order to create a valid l402 we first are going to call + // the FetchL402 method. As a client might not have outbound capacity + // yet, we'll retry until we get a valid response. + if !m.hasL402 { + err := m.cfg.FetchL402(ctx) + if err != nil { + log.Errorf("Error fetching L402: %v", err) + continue + } + m.hasL402 = true + } + + connectedFunc := func() { + // Reset the wait time to 10 seconds. + waitTime = time.Second * 10 + } + + err := m.subscribeNotifications(ctx, connectedFunc) + if err != nil { + log.Errorf("Error subscribing to notifications: %v", err) + } + } +} + +// subscribeNotifications subscribes to the notifications from the server. +func (m *Manager) subscribeNotifications(ctx context.Context, + connectedFunc func()) error { + + callCtx, cancel := context.WithCancel(ctx) + defer cancel() + + notifStream, err := m.cfg.Client.SubscribeNotifications( + callCtx, &swapserverrpc.SubscribeNotificationsRequest{}, + ) + if err != nil { + return err + } + + // Signal that we're connected to the server. + connectedFunc() + log.Debugf("Successfully subscribed to server notifications") + + for { + notification, err := notifStream.Recv() + if err == nil && notification != nil { + log.Debugf("Received notification: %v", notification) + m.handleNotification(notification) + continue + } + + log.Errorf("Error receiving notification: %v", err) + + return err + } +} + +// handleNotification handles an incoming notification from the server, +// forwarding it to the appropriate subscribers. +func (m *Manager) handleNotification(notification *swapserverrpc. + SubscribeNotificationsResponse) { + + switch notification.Notification.(type) { + case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification: + // We'll forward the reservation notification to all subscribers. + reservationNtfn := notification.GetReservationNotification() + m.Lock() + defer m.Unlock() + + for _, sub := range m.subscribers[NotificationTypeReservation] { + recvChan := sub.recvChan.(chan *swapserverrpc. + ServerReservationNotification) + + recvChan <- reservationNtfn + } + + default: + log.Warnf("Received unknown notification type: %v", + notification) + } +} + +// addSubscriber adds a subscriber to the manager. +func (m *Manager) addSubscriber(notifType NotificationType, sub subscriber) { + m.Lock() + defer m.Unlock() + m.subscribers[notifType] = append(m.subscribers[notifType], sub) +} + +// removeSubscriber removes a subscriber from the manager. +func (m *Manager) removeSubscriber(notifType NotificationType, sub subscriber) { + m.Lock() + defer m.Unlock() + subs := m.subscribers[notifType] + newSubs := make([]subscriber, 0, len(subs)) + for _, s := range subs { + if s != sub { + newSubs = append(newSubs, s) + } + } + m.subscribers[notifType] = newSubs +} diff --git a/notifications/manager_test.go b/notifications/manager_test.go new file mode 100644 index 000000000..ea756ff6a --- /dev/null +++ b/notifications/manager_test.go @@ -0,0 +1,173 @@ +package notifications + +import ( + "context" + "io" + "sync" + "testing" + "time" + + "github.com/lightninglabs/loop/swapserverrpc" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +var ( + testReservationId = []byte{0x01, 0x02} + testReservationId2 = []byte{0x01, 0x02} +) + +// mockNotificationsClient implements the NotificationsClient interface for testing. +type mockNotificationsClient struct { + mockStream swapserverrpc.SwapServer_SubscribeNotificationsClient + subscribeErr error + timesCalled int + sync.Mutex +} + +func (m *mockNotificationsClient) SubscribeNotifications(ctx context.Context, + in *swapserverrpc.SubscribeNotificationsRequest, + opts ...grpc.CallOption) ( + swapserverrpc.SwapServer_SubscribeNotificationsClient, error) { + + m.Lock() + defer m.Unlock() + + m.timesCalled++ + if m.subscribeErr != nil { + return nil, m.subscribeErr + } + return m.mockStream, nil +} + +// mockSubscribeNotificationsClient simulates the server stream. +type mockSubscribeNotificationsClient struct { + grpc.ClientStream + recvChan chan *swapserverrpc.SubscribeNotificationsResponse + recvErrChan chan error +} + +func (m *mockSubscribeNotificationsClient) Recv() ( + *swapserverrpc.SubscribeNotificationsResponse, error) { + + select { + case err := <-m.recvErrChan: + return nil, err + case notif, ok := <-m.recvChan: + if !ok { + return nil, io.EOF + } + return notif, nil + } +} + +func (m *mockSubscribeNotificationsClient) Header() (metadata.MD, error) { + return nil, nil +} + +func (m *mockSubscribeNotificationsClient) Trailer() metadata.MD { + return nil +} + +func (m *mockSubscribeNotificationsClient) CloseSend() error { + return nil +} + +func (m *mockSubscribeNotificationsClient) Context() context.Context { + return context.TODO() +} + +func (m *mockSubscribeNotificationsClient) SendMsg(interface{}) error { + return nil +} + +func (m *mockSubscribeNotificationsClient) RecvMsg(interface{}) error { + return nil +} + +func TestManager_ReservationNotification(t *testing.T) { + // Create a mock notification client + recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse, 1) + errChan := make(chan error, 1) + mockStream := &mockSubscribeNotificationsClient{ + recvChan: recvChan, + recvErrChan: errChan, + } + mockClient := &mockNotificationsClient{ + mockStream: mockStream, + } + + // Create a Manager with the mock client + mgr := NewManager(&Config{ + Client: mockClient, + FetchL402: func(ctx context.Context) error { + // Simulate successful fetching of L402 + return nil + }, + }) + + // Subscribe to reservation notifications. + subCtx, subCancel := context.WithCancel(context.Background()) + subChan := mgr.SubscribeReservations(subCtx) + + // Run the manager. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + err := mgr.Run(ctx) + require.NoError(t, err) + }() + + // Wait a bit to ensure manager is running and has subscribed + require.Eventually(t, func() bool { + mgr.Lock() + defer mgr.Unlock() + return len(mgr.subscribers[NotificationTypeReservation]) > 0 + }, time.Second*5, 10*time.Millisecond) + + mockClient.Lock() + require.Equal(t, 1, mockClient.timesCalled) + mockClient.Unlock() + + // Send a test notification + testNotif := getTestNotification(testReservationId) + + // Send the notification to the recvChan + recvChan <- testNotif + + // Collect the notification in the callback + receivedNotification := <-subChan + + // Now, check that the notification received in the callback matches the one sent + require.NotNil(t, receivedNotification) + require.Equal(t, testReservationId, receivedNotification.ReservationId) + + // Cancel the subscription + subCancel() + + // Send another test notification` + testNotif2 := getTestNotification(testReservationId2) + recvChan <- testNotif2 + + // Check that the subChan is eventually closed. + require.Eventually(t, func() bool { + select { + case _, ok := <-subChan: + return !ok + default: + return false + } + }, time.Second*5, 10*time.Millisecond) +} + +func getTestNotification(resId []byte) *swapserverrpc.SubscribeNotificationsResponse { + return &swapserverrpc.SubscribeNotificationsResponse{ + Notification: &swapserverrpc.SubscribeNotificationsResponse_ReservationNotification{ + ReservationNotification: &swapserverrpc.ServerReservationNotification{ + ReservationId: resId, + }, + }, + } +}