From 61e1528526762797ef19d0d4e87fe4abb885db91 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Wed, 29 Jan 2025 20:31:58 +0100 Subject: [PATCH] notifications: handle pending L402 payment --- notifications/manager.go | 26 +++++++--- notifications/manager_test.go | 92 +++++++++++++++++++++++++++++++++-- 2 files changed, 107 insertions(+), 11 deletions(-) diff --git a/notifications/manager.go b/notifications/manager.go index d3cf194c1..023fbcafd 100644 --- a/notifications/manager.go +++ b/notifications/manager.go @@ -7,6 +7,7 @@ import ( "github.com/lightninglabs/aperture/l402" "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightningnetwork/lnd/lntypes" "google.golang.org/grpc" ) @@ -145,15 +146,15 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context, func (m *Manager) Run(ctx context.Context) error { // Initially we want to immediately try to connect to the server. var ( - waitTime time.Duration - backoff time.Duration - connAttempts int + waitTime time.Duration + backoff time.Duration + attempts int ) // Start the notification runloop. for { // Increase the wait time for the next iteration. - backoff = waitTime + time.Duration(connAttempts)*time.Second + backoff = waitTime + time.Duration(attempts)*time.Second waitTime = 0 timer := time.NewTimer(backoff) @@ -169,7 +170,7 @@ func (m *Manager) Run(ctx context.Context) error { // the FetchL402 method. As a client might not have outbound // capacity yet, we'll retry until we get a valid response. if !m.hasL402 { - _, err := m.cfg.CurrentToken() + token, err := m.cfg.CurrentToken() if err != nil { // We only log the error if it's not the case // that we don't have a token yet to avoid @@ -180,6 +181,17 @@ func (m *Manager) Run(ctx context.Context) error { } continue } + + // If the preimage is empty, we don't have a valid L402 + // yet so we'll continue to retry with the incremental + // backoff. 
+ emptyPreimage := lntypes.Preimage{} + if token.Preimage == emptyPreimage { + attempts++ + continue + } + + attempts = 0 m.hasL402 = true } @@ -203,12 +215,12 @@ func (m *Manager) Run(ctx context.Context) error { // attempts to zero if we were really connected for a // considerable amount of time (1 minute). waitTime = time.Second * 10 - connAttempts = 0 + attempts = 0 } else { // We either failed to connect or the stream // disconnected immediately, so we just increase the // backoff. - connAttempts++ + attempts++ } } } diff --git a/notifications/manager_test.go b/notifications/manager_test.go index 7584cb0fa..e2f9fbe01 100644 --- a/notifications/manager_test.go +++ b/notifications/manager_test.go @@ -10,6 +10,7 @@ import ( "github.com/lightninglabs/aperture/l402" "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightningnetwork/lnd/lntypes" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -111,7 +112,9 @@ func TestManager_ReservationNotification(t *testing.T) { Client: mockClient, CurrentToken: func() (*l402.Token, error) { // Simulate successful fetching of L402 - return nil, nil + return &l402.Token{ + Preimage: lntypes.Preimage{1, 2, 3}, + }, nil }, }) @@ -208,7 +211,10 @@ func TestManager_Backoff(t *testing.T) { mgr := NewManager(&Config{ Client: mockClient, CurrentToken: func() (*l402.Token, error) { - return &l402.Token{}, nil + // Simulate successful fetching of L402 + return &l402.Token{ + Preimage: lntypes.Preimage{1, 2, 3}, + }, nil }, }) @@ -233,7 +239,7 @@ func TestManager_Backoff(t *testing.T) { // - Attempt #3: ~3 seconds after that etc. time.Sleep(5 * time.Second) - // Cancel the contedt to stop the manager. + // Cancel the context to stop the manager. 
cancel() wg.Wait() @@ -297,7 +303,10 @@ Client: mockClient, MinAliveConnTime: minAlive, CurrentToken: func() (*l402.Token, error) { - return &l402.Token{}, nil + // Simulate successful fetching of L402 + return &l402.Token{ + Preimage: lntypes.Preimage{1, 2, 3}, + }, nil }, }) @@ -345,3 +354,78 @@ + +// TestManager_Backoff_Pending_Token verifies that the Manager backs off when +// the token is pending. +func TestManager_Backoff_Pending_Token(t *testing.T) { + t.Parallel() + + // We'll tolerate a bit of jitter in the timing checks. + const tolerance = 300 * time.Millisecond + + recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse) + recvErrChan := make(chan error) + + mockStream := &mockSubscribeNotificationsClient{ + recvChan: recvChan, + recvErrChan: recvErrChan, + } + + // Create a new mock client whose subscription will succeed. + mockClient := &mockNotificationsClient{ + mockStream: mockStream, + // subscribeErr stays nil => would succeed on each call. + } + + var tokenCalls []time.Time + // Manager with a CurrentToken that returns a pending token for the + // first two calls and a valid L402 afterwards. + mgr := NewManager(&Config{ + Client: mockClient, + CurrentToken: func() (*l402.Token, error) { + tokenCalls = append(tokenCalls, time.Now()) + if len(tokenCalls) < 3 { + // Simulate a pending token. + return &l402.Token{}, nil + } + + // Simulate successful fetching of L402 + return &l402.Token{ + Preimage: lntypes.Preimage{1, 2, 3}, + }, nil + }, + }) + + // Run the manager in a background goroutine. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // We ignore the returned error because the Manager returns + // nil on context cancel. 
+ _ = mgr.Run(ctx) + }() + + // Wait long enough to see at least 3 token calls, so we can see that + // we'll indeed back off when the token is pending. + time.Sleep(5 * time.Second) + + // Signal EOF so the subscription stops. + close(recvChan) + + // Cancel the context to stop the manager. + cancel() + wg.Wait() + + // Expect exactly 3 token calls. + require.Equal(t, 3, len(tokenCalls)) + + require.InDeltaf( + t, 3*time.Second, tokenCalls[2].Sub(tokenCalls[0]), + float64(tolerance), + "Expected to back off for ~3 seconds due to pending token", + ) +}