notifications: implement incremental backoff for invalid L402 tokens
bhandras committed Jan 29, 2025
1 parent 7cfceb7 commit 43aa566
Showing 2 changed files with 230 additions and 20 deletions.
77 changes: 57 additions & 20 deletions notifications/manager.go
@@ -26,6 +26,13 @@ const (
NotificationTypeStaticLoopInSweepRequest
)

const (
// defaultMinAliveConnTime is the default minimum time that the
// connection to the server needs to be alive before we consider it a
// successful connection.
defaultMinAliveConnTime = time.Minute
)

// Client is the interface that the notification manager needs to implement in
// order to be able to subscribe to notifications.
type Client interface {
@@ -45,6 +52,10 @@ type Config struct {
// CurrentToken returns the token that is currently contained in the
// store or an l402.ErrNoToken error if there is none.
CurrentToken func() (*l402.Token, error)

// MinAliveConnTime is the minimum time that the connection to the
	// server needs to be alive before we consider it a successful
	// connection.
MinAliveConnTime time.Duration
}

// Manager is a manager for notifications that the swap server sends to the
@@ -60,6 +71,11 @@ type Manager struct {

// NewManager creates a new notification manager.
func NewManager(cfg *Config) *Manager {
// Set the default minimum alive connection time if it's not set.
if cfg.MinAliveConnTime == 0 {
cfg.MinAliveConnTime = defaultMinAliveConnTime
}

return &Manager{
cfg: cfg,
subscribers: make(map[NotificationType][]subscriber),
@@ -128,13 +144,18 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
// close the readyChan to signal that the manager is ready.
func (m *Manager) Run(ctx context.Context) error {
// Initially we want to immediately try to connect to the server.
waitTime := time.Duration(0)
var (
waitTime time.Duration
backoff time.Duration
connAttempts int
)

// Start the notification runloop.
for {
timer := time.NewTimer(waitTime)
// Increase the wait time for the next iteration.
waitTime += time.Second * 1
backoff = waitTime + time.Duration(connAttempts)*time.Second
waitTime = 0
timer := time.NewTimer(backoff)

// Return if the context has been canceled.
select {
@@ -145,37 +166,55 @@ func (m *Manager) Run(ctx context.Context) error {
}

// In order to create a valid l402 we first are going to call
// the FetchL402 method. As a client might not have outbound capacity
// yet, we'll retry until we get a valid response.
// the FetchL402 method. As a client might not have outbound
// capacity yet, we'll retry until we get a valid response.
if !m.hasL402 {
_, err := m.cfg.CurrentToken()
if err != nil {
// We only log the error if it's not the case that we
// don't have a token yet to avoid spamming the logs.
// We only log the error if it's not the case
// that we don't have a token yet to avoid
// spamming the logs.
if err != l402.ErrNoToken {
log.Errorf("Error getting L402 from store: %v", err)
log.Errorf("Error getting L402 from "+
"the store: %v", err)
}
continue
}
m.hasL402 = true
}

connectedFunc := func() {
// Reset the wait time to 10 seconds.
waitTime = time.Second * 10
}

err := m.subscribeNotifications(ctx, connectedFunc)
connectAttempted := time.Now()
err := m.subscribeNotifications(ctx)
if err != nil {
log.Errorf("Error subscribing to notifications: %v", err)
log.Errorf("Error subscribing to notifications: %v",
err)
}
connectionAliveTime := time.Since(connectAttempted)

	// Note that we may be able to connect to the stream but not
// able to use it if the client is unable to pay for their
// L402. In this case the subscription will fail on the first
// read immediately after connecting. We'll therefore only
// consider the connection successful if we were able to use
// the stream for at least the minimum alive connection time
// (which defaults to 1 minute).
if connectionAliveTime > m.cfg.MinAliveConnTime {
// Reset the backoff to 10 seconds and the connect
// attempts to zero if we were really connected for a
// considerable amount of time (1 minute).
waitTime = time.Second * 10
connAttempts = 0
} else {
// We either failed to connect or the stream
// disconnected immediately, so we just increase the
// backoff.
connAttempts++
}
}
}

// subscribeNotifications subscribes to the notifications from the server.
func (m *Manager) subscribeNotifications(ctx context.Context,
connectedFunc func()) error {

func (m *Manager) subscribeNotifications(ctx context.Context) error {
callCtx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -186,8 +225,6 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
return err
}

// Signal that we're connected to the server.
connectedFunc()
log.Debugf("Successfully subscribed to server notifications")

for {
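Taken together, the new Run loop yields a simple schedule: the first attempt fires immediately, every failed or short-lived attempt adds one second to the next delay, and a connection that stays alive past MinAliveConnTime resets the delay to a flat 10 seconds with the failure counter cleared. Below is a minimal standalone sketch of that schedule (not part of the commit); the aliveTimes values are invented purely for illustration:

package main

import (
	"fmt"
	"time"
)

// minAliveConnTime mirrors defaultMinAliveConnTime from manager.go.
const minAliveConnTime = time.Minute

func main() {
	var (
		waitTime     time.Duration
		connAttempts int
	)

	// Hypothetical connection lifetimes: four quick failures, one
	// long-lived connection, then another failure.
	aliveTimes := []time.Duration{
		0, 0, 0, 0, 2 * time.Minute, 0,
	}

	for i, aliveFor := range aliveTimes {
		backoff := waitTime + time.Duration(connAttempts)*time.Second
		waitTime = 0
		fmt.Printf("attempt %d fires after %v\n", i+1, backoff)

		if aliveFor > minAliveConnTime {
			// Healthy connection: flat 10s delay next time and
			// a cleared failure counter.
			waitTime = 10 * time.Second
			connAttempts = 0
		} else {
			connAttempts++
		}
	}
}

Running it prints delays of 0s, 1s, 2s, 3s and 4s for the failing attempts, then 10s once a connection has stayed up past the threshold.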
173 changes: 173 additions & 0 deletions notifications/manager_test.go
@@ -2,6 +2,7 @@ package notifications

import (
"context"
"errors"
"io"
"sync"
"testing"
@@ -23,6 +24,7 @@ var (
type mockNotificationsClient struct {
mockStream swapserverrpc.SwapServer_SubscribeNotificationsClient
subscribeErr error
attemptTimes []time.Time
timesCalled int
sync.Mutex
}
@@ -36,6 +38,7 @@ func (m *mockNotificationsClient) SubscribeNotifications(ctx context.Context,
defer m.Unlock()

m.timesCalled++
m.attemptTimes = append(m.attemptTimes, time.Now())
if m.subscribeErr != nil {
return nil, m.subscribeErr
}
@@ -87,7 +90,11 @@ func (m *mockSubscribeNotificationsClient) RecvMsg(interface{}) error {
return nil
}

// TestManager_ReservationNotification tests that the Manager correctly
// forwards reservation notifications to subscribers.
func TestManager_ReservationNotification(t *testing.T) {
t.Parallel()

// Create a mock notification client
recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse, 1)
errChan := make(chan error, 1)
@@ -172,3 +179,169 @@ func getTestNotification(resId []byte) *swapserverrpc.SubscribeNotificationsResp
},
}
}

// TestManager_Backoff verifies that repeated failures in
// subscribeNotifications cause the Manager to space out subscription attempts
// via a predictable incremental backoff.
func TestManager_Backoff(t *testing.T) {
t.Parallel()

// We'll tolerate a bit of jitter in the timing checks.
const tolerance = 300 * time.Millisecond

recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse)
recvErrChan := make(chan error)

mockStream := &mockSubscribeNotificationsClient{
recvChan: recvChan,
recvErrChan: recvErrChan,
}

// Create a new mock client that will fail to subscribe.
mockClient := &mockNotificationsClient{
mockStream: mockStream,
subscribeErr: errors.New("failing on purpose"),
}

// Manager with a successful CurrentToken so that it always tries
// to subscribe.
mgr := NewManager(&Config{
Client: mockClient,
CurrentToken: func() (*l402.Token, error) {
return &l402.Token{}, nil
},
})

// Run the manager in a background goroutine.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// We ignore the returned error because the Manager returns
// nil on context cancel.
_ = mgr.Run(ctx)
}()

	// Wait long enough to see at least 3 subscription attempts using
	// the Manager's default pattern. We'll wait ~5 seconds total so we
	// capture at least 3 attempts:
	// - Attempt #1: immediate
	// - Attempt #2: ~1 second after the first
	// - Attempt #3: ~2 seconds after that, and so on.
time.Sleep(5 * time.Second)

	// Cancel the context to stop the manager.
cancel()
wg.Wait()

// Check how many attempts we made.
require.GreaterOrEqual(t, len(mockClient.attemptTimes), 3,
"expected at least 3 attempts within 5 seconds",
)

expectedDelay := time.Second
for i := 1; i < len(mockClient.attemptTimes); i++ {
// The expected delay for the i-th gap (comparing attempt i to
// attempt i-1) is i seconds (because the manager increments
// the backoff by 1 second each time).
actualDelay := mockClient.attemptTimes[i].Sub(
mockClient.attemptTimes[i-1],
)

require.InDeltaf(
t, expectedDelay, actualDelay, float64(tolerance),
"Attempt %d -> Attempt %d delay should be ~%v, got %v",
i, i+1, expectedDelay, actualDelay,
)

expectedDelay += time.Second
}
}

// TestManager_MinAliveConnTime verifies that the Manager enforces the minimum
// alive connection time before considering a subscription successful.
func TestManager_MinAliveConnTime(t *testing.T) {
t.Parallel()

// Tolerance to allow for scheduling jitter.
const tolerance = 300 * time.Millisecond

	// Set a small MinAliveConnTime so the test doesn't run too long.
	// Once a subscription stays alive longer than this, the manager
	// resets its backoff to 10s on the next loop iteration.
const minAlive = 1 * time.Second

// We'll provide a channel for incoming notifications
// and another for forcing errors to close the subscription.
recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse)
recvErrChan := make(chan error)

mockStream := &mockSubscribeNotificationsClient{
recvChan: recvChan,
recvErrChan: recvErrChan,
}

// No immediate error from SubscribeNotifications, so it "succeeds".
// We trigger subscription closure by sending an error to recvErrChan.
mockClient := &mockNotificationsClient{
mockStream: mockStream,
// subscribeErr stays nil => success on each call.
}

	// Create a Manager that uses our mock client and enforces
	// MinAliveConnTime=1s.
mgr := NewManager(&Config{
Client: mockClient,
MinAliveConnTime: minAlive,
CurrentToken: func() (*l402.Token, error) {
return &l402.Token{}, nil
},
})

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_ = mgr.Run(ctx)
}()

// Let the subscription stay alive for 2s, which is >1s (minAlive).
// Then force an error to end the subscription. The manager sees
// it stayed connected ~2s and resets its backoff to 10s.
go func() {
time.Sleep(2 * time.Second)
recvErrChan <- errors.New("mock subscription closed")
}()

// Wait enough time (~13s) to see:
// - First subscription (2s)
// - Manager resets to 10s
// - Second subscription attempt starts ~10s later.
time.Sleep(13 * time.Second)

// Signal EOF so the subscription stops.
close(recvChan)

// Stop the manager and wait for cleanup.
cancel()
wg.Wait()

// Expect at least 2 attempts in attemptTimes:
// 1) The one that stayed alive for 2s,
// 2) The next attempt ~10s after that.
require.GreaterOrEqual(
t, len(mockClient.attemptTimes), 2,
"expected at least 2 attempts with a successful subscription",
)

require.InDeltaf(
t, 12*time.Second,
mockClient.attemptTimes[1].Sub(mockClient.attemptTimes[0]),
float64(tolerance),
"Second attempt should occur ~2s after the first",
)
}
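As a usage note, a caller would wire the manager up roughly as follows. This is a sketch, not code from the commit: client, tokenStore, and log are hypothetical stand-ins for whatever satisfies the Client interface, the token store, and the logger in the calling code.

// A minimal sketch of configuring and running the manager with a
// custom MinAliveConnTime; client, tokenStore and log are assumed
// to exist in the caller's scope.
mgr := NewManager(&Config{
	Client:           client,
	CurrentToken:     tokenStore.CurrentToken,
	MinAliveConnTime: 30 * time.Second,
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Run blocks until the context is canceled, reconnecting with the
// incremental backoff described above.
if err := mgr.Run(ctx); err != nil {
	log.Errorf("notification manager exited: %v", err)
}

Leaving MinAliveConnTime at zero falls back to the one-minute default applied in NewManager.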
