From 43aa5664fe1da02e96416ff150caca5e6f12c4a6 Mon Sep 17 00:00:00 2001
From: Andras Banki-Horvath
Date: Wed, 29 Jan 2025 20:21:52 +0100
Subject: [PATCH] notifications: implement incremental backoff for invalid
 L402 tokens

---
 notifications/manager.go      |  77 +++++++++++----
 notifications/manager_test.go | 173 ++++++++++++++++++++++++++++++++++
 2 files changed, 230 insertions(+), 20 deletions(-)

diff --git a/notifications/manager.go b/notifications/manager.go
index edbfd306f..d3cf194c1 100644
--- a/notifications/manager.go
+++ b/notifications/manager.go
@@ -26,6 +26,13 @@ const (
 	NotificationTypeStaticLoopInSweepRequest
 )
 
+const (
+	// defaultMinAliveConnTime is the default minimum time that the
+	// connection to the server needs to be alive before we consider it a
+	// successful connection.
+	defaultMinAliveConnTime = time.Minute
+)
+
 // Client is the interface that the notification manager needs to implement in
 // order to be able to subscribe to notifications.
 type Client interface {
@@ -45,6 +52,10 @@ type Config struct {
 	// CurrentToken returns the token that is currently contained in the
 	// store or an l402.ErrNoToken error if there is none.
 	CurrentToken func() (*l402.Token, error)
+
+	// MinAliveConnTime is the minimum time that the connection to the
+	// server needs to be alive before we consider it successful.
+	MinAliveConnTime time.Duration
 }
 
 // Manager is a manager for notifications that the swap server sends to the
@@ -60,6 +71,11 @@ type Manager struct {
 
 // NewManager creates a new notification manager.
 func NewManager(cfg *Config) *Manager {
+	// Set the default minimum alive connection time if it's not set.
+	if cfg.MinAliveConnTime == 0 {
+		cfg.MinAliveConnTime = defaultMinAliveConnTime
+	}
+
 	return &Manager{
 		cfg:         cfg,
 		subscribers: make(map[NotificationType][]subscriber),
@@ -128,13 +144,18 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
 // close the readyChan to signal that the manager is ready.
 func (m *Manager) Run(ctx context.Context) error {
 	// Initially we want to immediately try to connect to the server.
-	waitTime := time.Duration(0)
+	var (
+		waitTime     time.Duration
+		backoff      time.Duration
+		connAttempts int
+	)
 
 	// Start the notification runloop.
 	for {
-		timer := time.NewTimer(waitTime)
 		// Increase the wait time for the next iteration.
-		waitTime += time.Second * 1
+		backoff = waitTime + time.Duration(connAttempts)*time.Second
+		waitTime = 0
+		timer := time.NewTimer(backoff)
 
 		// Return if the context has been canceled.
 		select {
@@ -145,37 +166,55 @@ func (m *Manager) Run(ctx context.Context) error {
 		}
 
 		// In order to create a valid l402 we first are going to call
-		// the FetchL402 method. As a client might not have outbound capacity
-		// yet, we'll retry until we get a valid response.
+		// the FetchL402 method. As a client might not have outbound
+		// capacity yet, we'll retry until we get a valid response.
 		if !m.hasL402 {
 			_, err := m.cfg.CurrentToken()
 			if err != nil {
-				// We only log the error if it's not the case that we
-				// don't have a token yet to avoid spamming the logs.
+				// We only log the error if it's not the case
+				// that we don't have a token yet to avoid
+				// spamming the logs.
 				if err != l402.ErrNoToken {
-					log.Errorf("Error getting L402 from store: %v", err)
+					log.Errorf("Error getting L402 from "+
+						"the store: %v", err)
 				}
 				continue
 			}
 			m.hasL402 = true
 		}
 
-		connectedFunc := func() {
-			// Reset the wait time to 10 seconds.
-			waitTime = time.Second * 10
-		}
-
-		err := m.subscribeNotifications(ctx, connectedFunc)
+		connectAttempted := time.Now()
+		err := m.subscribeNotifications(ctx)
 		if err != nil {
-			log.Errorf("Error subscribing to notifications: %v", err)
+			log.Errorf("Error subscribing to notifications: %v",
+				err)
 		}
+		connectionAliveTime := time.Since(connectAttempted)
+
+		// Note that we may be able to connect to the stream but not
+		// able to use it if the client is unable to pay for their
+		// L402. In this case the subscription will fail on the first
+		// read immediately after connecting. We'll therefore only
+		// consider the connection successful if we were able to use
+		// the stream for at least the minimum alive connection time
+		// (which defaults to 1 minute).
+		if connectionAliveTime > m.cfg.MinAliveConnTime {
+			// Reset the backoff to 10 seconds and the connect
+			// attempts to zero if we were really connected for a
+			// considerable amount of time (1 minute).
+			waitTime = time.Second * 10
+			connAttempts = 0
+		} else {
+			// We either failed to connect or the stream
+			// disconnected immediately, so we just increase the
+			// backoff.
+			connAttempts++
+		}
 	}
 }
 
 // subscribeNotifications subscribes to the notifications from the server.
-func (m *Manager) subscribeNotifications(ctx context.Context,
-	connectedFunc func()) error {
-
+func (m *Manager) subscribeNotifications(ctx context.Context) error {
 	callCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
@@ -186,8 +225,6 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
 		return err
 	}
 
-	// Signal that we're connected to the server.
-	connectedFunc()
 	log.Debugf("Successfully subscribed to server notifications")
 
 	for {
diff --git a/notifications/manager_test.go b/notifications/manager_test.go
index 82af5e672..7584cb0fa 100644
--- a/notifications/manager_test.go
+++ b/notifications/manager_test.go
@@ -2,6 +2,7 @@ package notifications
 
 import (
 	"context"
+	"errors"
 	"io"
 	"sync"
 	"testing"
@@ -23,6 +24,7 @@ var (
 type mockNotificationsClient struct {
 	mockStream   swapserverrpc.SwapServer_SubscribeNotificationsClient
 	subscribeErr error
+	attemptTimes []time.Time
 	timesCalled  int
 	sync.Mutex
 }
@@ -36,6 +38,7 @@ func (m *mockNotificationsClient) SubscribeNotifications(ctx context.Context,
 	defer m.Unlock()
 
 	m.timesCalled++
+	m.attemptTimes = append(m.attemptTimes, time.Now())
 	if m.subscribeErr != nil {
 		return nil, m.subscribeErr
 	}
@@ -87,7 +90,11 @@ func (m *mockSubscribeNotificationsClient) RecvMsg(interface{}) error {
 	return nil
 }
 
+// TestManager_ReservationNotification tests that the Manager correctly
+// forwards reservation notifications to subscribers.
 func TestManager_ReservationNotification(t *testing.T) {
+	t.Parallel()
+
 	// Create a mock notification client
 	recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse, 1)
 	errChan := make(chan error, 1)
@@ -172,3 +179,169 @@ func getTestNotification(resId []byte) *swapserverrpc.SubscribeNotificationsResp
 		},
 	}
 }
+
+// TestManager_Backoff verifies that repeated failures in
+// subscribeNotifications cause the Manager to space out subscription attempts
+// via a predictable incremental backoff.
+func TestManager_Backoff(t *testing.T) {
+	t.Parallel()
+
+	// We'll tolerate a bit of jitter in the timing checks.
+	const tolerance = 300 * time.Millisecond
+
+	recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse)
+	recvErrChan := make(chan error)
+
+	mockStream := &mockSubscribeNotificationsClient{
+		recvChan:    recvChan,
+		recvErrChan: recvErrChan,
+	}
+
+	// Create a new mock client that will fail to subscribe.
+	mockClient := &mockNotificationsClient{
+		mockStream:   mockStream,
+		subscribeErr: errors.New("failing on purpose"),
+	}
+
+	// Manager with a successful CurrentToken so that it always tries
+	// to subscribe.
+	mgr := NewManager(&Config{
+		Client: mockClient,
+		CurrentToken: func() (*l402.Token, error) {
+			return &l402.Token{}, nil
+		},
+	})
+
+	// Run the manager in a background goroutine.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// We ignore the returned error because the Manager returns
+		// nil on context cancel.
+		_ = mgr.Run(ctx)
+	}()
+
+	// Wait long enough to see at least 3 subscription attempts using
+	// the Manager's default pattern.
+	// We'll wait ~5 seconds total so we capture at least 3 attempts:
+	// - Attempt #1: immediate
+	// - Attempt #2: ~1 second later
+	// - Attempt #3: ~2 seconds after that (~3 seconds in), etc.
+	time.Sleep(5 * time.Second)
+
+	// Cancel the context to stop the manager.
+	cancel()
+	wg.Wait()
+
+	// Check how many attempts we made.
+	require.GreaterOrEqual(t, len(mockClient.attemptTimes), 3,
+		"expected at least 3 attempts within 5 seconds",
+	)
+
+	expectedDelay := time.Second
+	for i := 1; i < len(mockClient.attemptTimes); i++ {
+		// The expected delay for the i-th gap (comparing attempt i to
+		// attempt i-1) is i seconds (because the manager increments
+		// the backoff by 1 second each time).
+		actualDelay := mockClient.attemptTimes[i].Sub(
+			mockClient.attemptTimes[i-1],
+		)
+
+		require.InDeltaf(
+			t, expectedDelay, actualDelay, float64(tolerance),
+			"Attempt %d -> Attempt %d delay should be ~%v, got %v",
+			i, i+1, expectedDelay, actualDelay,
+		)
+
+		expectedDelay += time.Second
+	}
+}
+
+// TestManager_MinAliveConnTime verifies that the Manager enforces the minimum
+// alive connection time before considering a subscription successful.
+func TestManager_MinAliveConnTime(t *testing.T) {
+	t.Parallel()
+
+	// Tolerance to allow for scheduling jitter.
+	const tolerance = 300 * time.Millisecond
+
+	// Set a small MinAliveConnTime so the test doesn't run too long.
+	// Once a subscription stays alive longer than minAlive, the manager
+	// resets its backoff to 10s on the next loop iteration.
+	const minAlive = 1 * time.Second
+
+	// We'll provide a channel for incoming notifications
+	// and another for forcing errors to close the subscription.
+	recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse)
+	recvErrChan := make(chan error)
+
+	mockStream := &mockSubscribeNotificationsClient{
+		recvChan:    recvChan,
+		recvErrChan: recvErrChan,
+	}
+
+	// No immediate error from SubscribeNotifications, so it "succeeds".
+	// We trigger subscription closure by sending an error to recvErrChan.
+	mockClient := &mockNotificationsClient{
+		mockStream: mockStream,
+		// subscribeErr stays nil => success on each call.
+	}
+
+	// Create a Manager that uses our mock client and enforces
+	// MinAliveConnTime=1s.
+	mgr := NewManager(&Config{
+		Client:           mockClient,
+		MinAliveConnTime: minAlive,
+		CurrentToken: func() (*l402.Token, error) {
+			return &l402.Token{}, nil
+		},
+	})
+
+	ctx, cancel := context.WithCancel(context.Background())
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_ = mgr.Run(ctx)
+	}()
+
+	// Let the subscription stay alive for 2s, which is >1s (minAlive).
+	// Then force an error to end the subscription. The manager sees
+	// it stayed connected ~2s and resets its backoff to 10s.
+	go func() {
+		time.Sleep(2 * time.Second)
+		recvErrChan <- errors.New("mock subscription closed")
+	}()
+
+	// Wait enough time (~13s) to see:
+	// - First subscription (2s)
+	// - Manager resets to 10s
+	// - Second subscription attempt starts ~10s later.
+	time.Sleep(13 * time.Second)
+
+	// Signal EOF so the subscription stops.
+	close(recvChan)
+
+	// Stop the manager and wait for cleanup.
+	cancel()
+	wg.Wait()
+
+	// Expect at least 2 attempts in attemptTimes:
+	// 1) The one that stayed alive for 2s,
+	// 2) The next attempt ~10s after that.
+	require.GreaterOrEqual(
+		t, len(mockClient.attemptTimes), 2,
+		"expected at least 2 attempts with a successful subscription",
+	)
+
+	require.InDeltaf(
+		t, 12*time.Second,
+		mockClient.attemptTimes[1].Sub(mockClient.attemptTimes[0]),
+		float64(tolerance),
+		"Second attempt should occur ~12s after the first",
+	)
+}
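
Reviewer note (not part of the patch): the sketch below replays the backoff arithmetic from Manager.Run outside the manager so the resulting schedule is easy to eyeball. The simulate helper and the hard-coded alive times are invented for illustration; only the one-second increment, the 10-second reset and the MinAliveConnTime threshold mirror the diff above.

package main

import (
	"fmt"
	"time"
)

// simulate mirrors the bookkeeping in Manager.Run: the wait before the next
// attempt is waitTime plus connAttempts seconds, waitTime is consumed once
// used, and the outcome of each attempt decides whether the counters reset
// or keep growing. minAlive plays the role of Config.MinAliveConnTime.
func simulate(aliveTimes []time.Duration, minAlive time.Duration) {
	var (
		waitTime     time.Duration
		connAttempts int
	)

	for i, alive := range aliveTimes {
		backoff := waitTime + time.Duration(connAttempts)*time.Second
		waitTime = 0

		fmt.Printf("attempt %d: wait %v, stream alive for %v\n",
			i+1, backoff, alive)

		if alive > minAlive {
			// Healthy connection: retry from a flat 10 seconds.
			waitTime = 10 * time.Second
			connAttempts = 0
		} else {
			// Failed or short-lived connection: grow the backoff
			// by one second per consecutive failure.
			connAttempts++
		}
	}
}

func main() {
	// Three immediate failures, one long-lived connection, then another
	// failure: prints waits of 0s, 1s, 2s, 3s and finally 10s.
	simulate(
		[]time.Duration{0, 0, 0, 2 * time.Minute, 0},
		time.Minute,
	)
}

This is also why TestManager_Backoff asserts on the growing gaps between attempts (1s, 2s, ...) and TestManager_MinAliveConnTime on the ~12s gap after a healthy connection: both follow directly from this arithmetic.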