From 43aa5664fe1da02e96416ff150caca5e6f12c4a6 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Wed, 29 Jan 2025 20:21:52 +0100 Subject: [PATCH 1/2] notifications: implement incremental backoff for invalid L402 tokens --- notifications/manager.go | 77 +++++++++++---- notifications/manager_test.go | 173 ++++++++++++++++++++++++++++++++++ 2 files changed, 230 insertions(+), 20 deletions(-) diff --git a/notifications/manager.go b/notifications/manager.go index edbfd306f..d3cf194c1 100644 --- a/notifications/manager.go +++ b/notifications/manager.go @@ -26,6 +26,13 @@ const ( NotificationTypeStaticLoopInSweepRequest ) +const ( + // defaultMinAliveConnTime is the default minimum time that the + // connection to the server needs to be alive before we consider it a + // successful connection. + defaultMinAliveConnTime = time.Minute +) + // Client is the interface that the notification manager needs to implement in // order to be able to subscribe to notifications. type Client interface { @@ -45,6 +52,10 @@ type Config struct { // CurrentToken returns the token that is currently contained in the // store or an l402.ErrNoToken error if there is none. CurrentToken func() (*l402.Token, error) + + // MinAliveConnTime is the minimum time that the connection to the + // server needs to be alive before we consider it a successful. + MinAliveConnTime time.Duration } // Manager is a manager for notifications that the swap server sends to the @@ -60,6 +71,11 @@ type Manager struct { // NewManager creates a new notification manager. func NewManager(cfg *Config) *Manager { + // Set the default minimum alive connection time if it's not set. + if cfg.MinAliveConnTime == 0 { + cfg.MinAliveConnTime = defaultMinAliveConnTime + } + return &Manager{ cfg: cfg, subscribers: make(map[NotificationType][]subscriber), @@ -128,13 +144,18 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context, // close the readyChan to signal that the manager is ready. func (m *Manager) Run(ctx context.Context) error { // Initially we want to immediately try to connect to the server. - waitTime := time.Duration(0) + var ( + waitTime time.Duration + backoff time.Duration + connAttempts int + ) // Start the notification runloop. for { - timer := time.NewTimer(waitTime) // Increase the wait time for the next iteration. - waitTime += time.Second * 1 + backoff = waitTime + time.Duration(connAttempts)*time.Second + waitTime = 0 + timer := time.NewTimer(backoff) // Return if the context has been canceled. select { @@ -145,37 +166,55 @@ func (m *Manager) Run(ctx context.Context) error { } // In order to create a valid l402 we first are going to call - // the FetchL402 method. As a client might not have outbound capacity - // yet, we'll retry until we get a valid response. + // the FetchL402 method. As a client might not have outbound + // capacity yet, we'll retry until we get a valid response. if !m.hasL402 { _, err := m.cfg.CurrentToken() if err != nil { - // We only log the error if it's not the case that we - // don't have a token yet to avoid spamming the logs. + // We only log the error if it's not the case + // that we don't have a token yet to avoid + // spamming the logs. if err != l402.ErrNoToken { - log.Errorf("Error getting L402 from store: %v", err) + log.Errorf("Error getting L402 from "+ + "the store: %v", err) } continue } m.hasL402 = true } - connectedFunc := func() { - // Reset the wait time to 10 seconds. 
- waitTime = time.Second * 10 - } - - err := m.subscribeNotifications(ctx, connectedFunc) + connectAttempted := time.Now() + err := m.subscribeNotifications(ctx) if err != nil { - log.Errorf("Error subscribing to notifications: %v", err) + log.Errorf("Error subscribing to notifications: %v", + err) + } + connectionAliveTime := time.Since(connectAttempted) + + // Note that we may be able to connet to the stream but not + // able to use it if the client is unable to pay for their + // L402. In this case the subscription will fail on the first + // read immediately after connecting. We'll therefore only + // consider the connection successful if we were able to use + // the stream for at least the minimum alive connection time + // (which defaults to 1 minute). + if connectionAliveTime > m.cfg.MinAliveConnTime { + // Reset the backoff to 10 seconds and the connect + // attempts to zero if we were really connected for a + // considerable amount of time (1 minute). + waitTime = time.Second * 10 + connAttempts = 0 + } else { + // We either failed to connect or the stream + // disconnected immediately, so we just increase the + // backoff. + connAttempts++ } } } // subscribeNotifications subscribes to the notifications from the server. -func (m *Manager) subscribeNotifications(ctx context.Context, - connectedFunc func()) error { - +func (m *Manager) subscribeNotifications(ctx context.Context) error { callCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -186,8 +225,6 @@ func (m *Manager) subscribeNotifications(ctx context.Context, return err } - // Signal that we're connected to the server. - connectedFunc() log.Debugf("Successfully subscribed to server notifications") for { diff --git a/notifications/manager_test.go b/notifications/manager_test.go index 82af5e672..7584cb0fa 100644 --- a/notifications/manager_test.go +++ b/notifications/manager_test.go @@ -2,6 +2,7 @@ package notifications import ( "context" + "errors" "io" "sync" "testing" @@ -23,6 +24,7 @@ var ( type mockNotificationsClient struct { mockStream swapserverrpc.SwapServer_SubscribeNotificationsClient subscribeErr error + attemptTimes []time.Time timesCalled int sync.Mutex } @@ -36,6 +38,7 @@ func (m *mockNotificationsClient) SubscribeNotifications(ctx context.Context, defer m.Unlock() m.timesCalled++ + m.attemptTimes = append(m.attemptTimes, time.Now()) if m.subscribeErr != nil { return nil, m.subscribeErr } @@ -87,7 +90,11 @@ func (m *mockSubscribeNotificationsClient) RecvMsg(interface{}) error { return nil } +// TestManager_ReservationNotification tests that the Manager correctly +// forwards reservation notifications to subscribers. func TestManager_ReservationNotification(t *testing.T) { + t.Parallel() + // Create a mock notification client recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse, 1) errChan := make(chan error, 1) @@ -172,3 +179,169 @@ func getTestNotification(resId []byte) *swapserverrpc.SubscribeNotificationsResp }, } } + +// TestManager_Backoff verifies that repeated failures in +// subscribeNotifications cause the Manager to space out subscription attempts +// via a predictable incremental backoff. +func TestManager_Backoff(t *testing.T) { + t.Parallel() + + // We'll tolerate a bit of jitter in the timing checks. 
+	const tolerance = 300 * time.Millisecond
+
+	recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse)
+	recvErrChan := make(chan error)
+
+	mockStream := &mockSubscribeNotificationsClient{
+		recvChan:    recvChan,
+		recvErrChan: recvErrChan,
+	}
+
+	// Create a new mock client that will fail to subscribe.
+	mockClient := &mockNotificationsClient{
+		mockStream:   mockStream,
+		subscribeErr: errors.New("failing on purpose"),
+	}
+
+	// Manager with a successful CurrentToken so that it always tries
+	// to subscribe.
+	mgr := NewManager(&Config{
+		Client: mockClient,
+		CurrentToken: func() (*l402.Token, error) {
+			return &l402.Token{}, nil
+		},
+	})
+
+	// Run the manager in a background goroutine.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// We ignore the returned error because the Manager returns
+		// nil on context cancel.
+		_ = mgr.Run(ctx)
+	}()
+
+	// Wait long enough to see at least 3 subscription attempts using
+	// the Manager's default pattern.
+	// We'll wait ~5 seconds total so we capture at least 3 attempts:
+	//  - Attempt #1: immediate
+	//  - Attempt #2: ~1 second later
+	//  - Attempt #3: ~2 seconds after that (~3s in total), etc.
+	time.Sleep(5 * time.Second)
+
+	// Cancel the contedt to stop the manager.
+	cancel()
+	wg.Wait()
+
+	// Check how many attempts we made.
+	require.GreaterOrEqual(t, len(mockClient.attemptTimes), 3,
+		"expected at least 3 attempts within 5 seconds",
+	)
+
+	expectedDelay := time.Second
+	for i := 1; i < len(mockClient.attemptTimes); i++ {
+		// The expected delay for the i-th gap (comparing attempt i to
+		// attempt i-1) is i seconds (because the manager increments
+		// the backoff by 1 second each time).
+		actualDelay := mockClient.attemptTimes[i].Sub(
+			mockClient.attemptTimes[i-1],
+		)
+
+		require.InDeltaf(
+			t, expectedDelay, actualDelay, float64(tolerance),
+			"Attempt %d -> Attempt %d delay should be ~%v, got %v",
+			i, i+1, expectedDelay, actualDelay,
+		)
+
+		expectedDelay += time.Second
+	}
+}
+
+// TestManager_MinAliveConnTime verifies that the Manager enforces the minimum
+// alive connection time before considering a subscription successful.
+func TestManager_MinAliveConnTime(t *testing.T) {
+	t.Parallel()
+
+	// Tolerance to allow for scheduling jitter.
+	const tolerance = 300 * time.Millisecond
+
+	// Set a small MinAliveConnTime so the test doesn't run too long.
+	// Once a subscription stays alive longer than this (1s), the manager
+	// resets its backoff to 10s on the next loop iteration.
+	const minAlive = 1 * time.Second
+
+	// We'll provide a channel for incoming notifications
+	// and another for forcing errors to close the subscription.
+	recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse)
+	recvErrChan := make(chan error)
+
+	mockStream := &mockSubscribeNotificationsClient{
+		recvChan:    recvChan,
+		recvErrChan: recvErrChan,
+	}
+
+	// No immediate error from SubscribeNotifications, so it "succeeds".
+	// We trigger subscription closure by sending an error to recvErrChan.
+	mockClient := &mockNotificationsClient{
+		mockStream: mockStream,
+		// subscribeErr stays nil => success on each call.
+	}
+
+	// Create a Manager that uses our mock client and enforces
+	// MinAliveConnTime=1s.
+ mgr := NewManager(&Config{ + Client: mockClient, + MinAliveConnTime: minAlive, + CurrentToken: func() (*l402.Token, error) { + return &l402.Token{}, nil + }, + }) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = mgr.Run(ctx) + }() + + // Let the subscription stay alive for 2s, which is >1s (minAlive). + // Then force an error to end the subscription. The manager sees + // it stayed connected ~2s and resets its backoff to 10s. + go func() { + time.Sleep(2 * time.Second) + recvErrChan <- errors.New("mock subscription closed") + }() + + // Wait enough time (~13s) to see: + // - First subscription (2s) + // - Manager resets to 10s + // - Second subscription attempt starts ~10s later. + time.Sleep(13 * time.Second) + + // Signal EOF so the subscription stops. + close(recvChan) + + // Stop the manager and wait for cleanup. + cancel() + wg.Wait() + + // Expect at least 2 attempts in attemptTimes: + // 1) The one that stayed alive for 2s, + // 2) The next attempt ~10s after that. + require.GreaterOrEqual( + t, len(mockClient.attemptTimes), 2, + "expected at least 2 attempts with a successful subscription", + ) + + require.InDeltaf( + t, 12*time.Second, + mockClient.attemptTimes[1].Sub(mockClient.attemptTimes[0]), + float64(tolerance), + "Second attempt should occur ~2s after the first", + ) +} From 61e1528526762797ef19d0d4e87fe4abb885db91 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Wed, 29 Jan 2025 20:31:58 +0100 Subject: [PATCH 2/2] notifications: handle pending L402 payment --- notifications/manager.go | 26 +++++++--- notifications/manager_test.go | 92 +++++++++++++++++++++++++++++++++-- 2 files changed, 107 insertions(+), 11 deletions(-) diff --git a/notifications/manager.go b/notifications/manager.go index d3cf194c1..023fbcafd 100644 --- a/notifications/manager.go +++ b/notifications/manager.go @@ -7,6 +7,7 @@ import ( "github.com/lightninglabs/aperture/l402" "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightningnetwork/lnd/lntypes" "google.golang.org/grpc" ) @@ -145,15 +146,15 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context, func (m *Manager) Run(ctx context.Context) error { // Initially we want to immediately try to connect to the server. var ( - waitTime time.Duration - backoff time.Duration - connAttempts int + waitTime time.Duration + backoff time.Duration + attempts int ) // Start the notification runloop. for { // Increase the wait time for the next iteration. - backoff = waitTime + time.Duration(connAttempts)*time.Second + backoff = waitTime + time.Duration(attempts)*time.Second waitTime = 0 timer := time.NewTimer(backoff) @@ -169,7 +170,7 @@ func (m *Manager) Run(ctx context.Context) error { // the FetchL402 method. As a client might not have outbound // capacity yet, we'll retry until we get a valid response. if !m.hasL402 { - _, err := m.cfg.CurrentToken() + token, err := m.cfg.CurrentToken() if err != nil { // We only log the error if it's not the case // that we don't have a token yet to avoid @@ -180,6 +181,17 @@ func (m *Manager) Run(ctx context.Context) error { } continue } + + // If the preimage is empty, we don't have a valid L402 + // yet so we'll continue to retry with the incremental + // backoff. 
+ emptyPreimage := lntypes.Preimage{} + if token.Preimage == emptyPreimage { + attempts++ + continue + } + + attempts = 0 m.hasL402 = true } @@ -203,12 +215,12 @@ func (m *Manager) Run(ctx context.Context) error { // attempts to zero if we were really connected for a // considerable amount of time (1 minute). waitTime = time.Second * 10 - connAttempts = 0 + attempts = 0 } else { // We either failed to connect or the stream // disconnected immediately, so we just increase the // backoff. - connAttempts++ + attempts++ } } } diff --git a/notifications/manager_test.go b/notifications/manager_test.go index 7584cb0fa..e2f9fbe01 100644 --- a/notifications/manager_test.go +++ b/notifications/manager_test.go @@ -10,6 +10,7 @@ import ( "github.com/lightninglabs/aperture/l402" "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightningnetwork/lnd/lntypes" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -111,7 +112,9 @@ func TestManager_ReservationNotification(t *testing.T) { Client: mockClient, CurrentToken: func() (*l402.Token, error) { // Simulate successful fetching of L402 - return nil, nil + return &l402.Token{ + Preimage: lntypes.Preimage{1, 2, 3}, + }, nil }, }) @@ -208,7 +211,10 @@ func TestManager_Backoff(t *testing.T) { mgr := NewManager(&Config{ Client: mockClient, CurrentToken: func() (*l402.Token, error) { - return &l402.Token{}, nil + // Simulate successful fetching of L402 + return &l402.Token{ + Preimage: lntypes.Preimage{1, 2, 3}, + }, nil }, }) @@ -233,7 +239,7 @@ func TestManager_Backoff(t *testing.T) { // - Attempt #3: ~3 seconds after that etc. time.Sleep(5 * time.Second) - // Cancel the contedt to stop the manager. + // Cancel the context to stop the manager. cancel() wg.Wait() @@ -297,7 +303,10 @@ func TestManager_MinAliveConnTime(t *testing.T) { Client: mockClient, MinAliveConnTime: minAlive, CurrentToken: func() (*l402.Token, error) { - return &l402.Token{}, nil + // Simulate successful fetching of L402 + return &l402.Token{ + Preimage: lntypes.Preimage{1, 2, 3}, + }, nil }, }) @@ -345,3 +354,78 @@ func TestManager_MinAliveConnTime(t *testing.T) { "Second attempt should occur ~2s after the first", ) } + +// TestManager_Backoff_Pending_Token verifies that the Manager backs off when +// the token is pending. +func TestManager_Backoff_Pending_Token(t *testing.T) { + t.Parallel() + + // We'll tolerate a bit of jitter in the timing checks. + const tolerance = 300 * time.Millisecond + + recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse) + recvErrChan := make(chan error) + + mockStream := &mockSubscribeNotificationsClient{ + recvChan: recvChan, + recvErrChan: recvErrChan, + } + + // Create a new mock client that will fail to subscribe. + mockClient := &mockNotificationsClient{ + mockStream: mockStream, + // subscribeErr stays nil => would succeed on each call. + } + + var tokenCalls []time.Time + // Manager with a successful CurrentToken so that it always tries + // to subscribe. + mgr := NewManager(&Config{ + Client: mockClient, + CurrentToken: func() (*l402.Token, error) { + tokenCalls = append(tokenCalls, time.Now()) + if len(tokenCalls) < 3 { + // Simulate a pending token. + return &l402.Token{}, nil + } + + // Simulate successful fetching of L402 + return &l402.Token{ + Preimage: lntypes.Preimage{1, 2, 3}, + }, nil + }, + }) + + // Run the manager in a background goroutine. 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// We ignore the returned error because the Manager returns
+		// nil on context cancel.
+		_ = mgr.Run(ctx)
+	}()
+
+	// Wait long enough to see at least 3 token calls, so we can see that
+	// we'll indeed back off when the token is pending.
+	time.Sleep(5 * time.Second)
+
+	// Signal EOF so the subscription stops.
+	close(recvChan)
+
+	// Cancel the context to stop the manager.
+	cancel()
+	wg.Wait()
+
+	// Expect exactly 3 token calls.
+	require.Equal(t, 3, len(tokenCalls))
+
+	require.InDeltaf(
+		t, 3*time.Second, tokenCalls[2].Sub(tokenCalls[0]),
+		float64(tolerance),
+		"Expected to back off for ~3 seconds due to the pending token",
+	)
+}
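
Reviewer note (not part of either patch): the retry schedule the two commits
implement can be summarized with the standalone sketch below. It reuses the
same formula and constants as Manager.Run after these patches (a 1-second
linear increment per failed attempt, a flat 10-second wait once a connection
stayed alive longer than MinAliveConnTime, which defaults to one minute); the
simulated alive times are made up purely for illustration.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Default from patch 1; the tests override this via
	// Config.MinAliveConnTime.
	const minAliveConnTime = time.Minute

	var (
		waitTime time.Duration
		attempts int
	)

	// Hypothetical outcomes of four subscription attempts: how long the
	// stream stayed alive before it went away.
	aliveTimes := []time.Duration{
		0,               // dial fails immediately
		0,               // dial fails immediately again
		2 * time.Minute, // stream stays up past minAliveConnTime
		0,               // next attempt fails again
	}

	for i, alive := range aliveTimes {
		// Same formula as Manager.Run: linear growth by one second
		// per failed attempt, on top of any flat wait time.
		backoff := waitTime + time.Duration(attempts)*time.Second
		waitTime = 0

		fmt.Printf("attempt %d: wait %v before dialing\n", i+1, backoff)

		if alive > minAliveConnTime {
			// The connection stayed alive long enough to count as
			// a success: retry after a flat 10s and reset the
			// attempt counter.
			waitTime = 10 * time.Second
			attempts = 0
		} else {
			// Failed to connect, or the stream died right away:
			// grow the backoff.
			attempts++
		}
	}
}

Running it prints waits of 0s, 1s and 2s for the first three attempts and a
flat 10s after the long-lived connection: the growing gaps are what
TestManager_Backoff asserts on, and the 10s reset is what produces the ~12s
gap checked in TestManager_MinAliveConnTime.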