Skip to content

Commit

Permalink
notifications: handle pending L402 payment
Browse files Browse the repository at this point in the history
  • Loading branch information
bhandras committed Jan 29, 2025
1 parent 43aa566 commit 61e1528
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 11 deletions.
26 changes: 19 additions & 7 deletions notifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/lightninglabs/aperture/l402"
"github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lntypes"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -145,15 +146,15 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
func (m *Manager) Run(ctx context.Context) error {
// Initially we want to immediately try to connect to the server.
var (
waitTime time.Duration
backoff time.Duration
connAttempts int
waitTime time.Duration
backoff time.Duration
attempts int
)

// Start the notification runloop.
for {
// Increase the wait time for the next iteration.
backoff = waitTime + time.Duration(connAttempts)*time.Second
backoff = waitTime + time.Duration(attempts)*time.Second
waitTime = 0
timer := time.NewTimer(backoff)

Expand All @@ -169,7 +170,7 @@ func (m *Manager) Run(ctx context.Context) error {
// the FetchL402 method. As a client might not have outbound
// capacity yet, we'll retry until we get a valid response.
if !m.hasL402 {
_, err := m.cfg.CurrentToken()
token, err := m.cfg.CurrentToken()
if err != nil {
// We only log the error if it's not the case
// that we don't have a token yet to avoid
Expand All @@ -180,6 +181,17 @@ func (m *Manager) Run(ctx context.Context) error {
}
continue
}

// If the preimage is empty, we don't have a valid L402
// yet so we'll continue to retry with the incremental
// backoff.
emptyPreimage := lntypes.Preimage{}
if token.Preimage == emptyPreimage {
attempts++
continue
}

attempts = 0
m.hasL402 = true
}

Expand All @@ -203,12 +215,12 @@ func (m *Manager) Run(ctx context.Context) error {
// attempts to zero if we were really connected for a
// considerable amount of time (1 minute).
waitTime = time.Second * 10
connAttempts = 0
attempts = 0
} else {
// We either failed to connect or the stream
// disconnected immediately, so we just increase the
// backoff.
connAttempts++
attempts++
}
}
}
Expand Down
92 changes: 88 additions & 4 deletions notifications/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/lightninglabs/aperture/l402"
"github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -111,7 +112,9 @@ func TestManager_ReservationNotification(t *testing.T) {
Client: mockClient,
CurrentToken: func() (*l402.Token, error) {
// Simulate successful fetching of L402
return nil, nil
return &l402.Token{
Preimage: lntypes.Preimage{1, 2, 3},
}, nil
},
})

Expand Down Expand Up @@ -208,7 +211,10 @@ func TestManager_Backoff(t *testing.T) {
mgr := NewManager(&Config{
Client: mockClient,
CurrentToken: func() (*l402.Token, error) {
return &l402.Token{}, nil
// Simulate successful fetching of L402
return &l402.Token{
Preimage: lntypes.Preimage{1, 2, 3},
}, nil
},
})

Expand All @@ -233,7 +239,7 @@ func TestManager_Backoff(t *testing.T) {
// - Attempt #3: ~3 seconds after that etc.
time.Sleep(5 * time.Second)

// Cancel the contedt to stop the manager.
// Cancel the context to stop the manager.
cancel()
wg.Wait()

Expand Down Expand Up @@ -297,7 +303,10 @@ func TestManager_MinAliveConnTime(t *testing.T) {
Client: mockClient,
MinAliveConnTime: minAlive,
CurrentToken: func() (*l402.Token, error) {
return &l402.Token{}, nil
// Simulate successful fetching of L402
return &l402.Token{
Preimage: lntypes.Preimage{1, 2, 3},
}, nil
},
})

Expand Down Expand Up @@ -345,3 +354,78 @@ func TestManager_MinAliveConnTime(t *testing.T) {
"Second attempt should occur ~2s after the first",
)
}

// TestManager_Backoff_Pending_Token verifies that the Manager backs off when
// the token is pending.
func TestManager_Backoff_Pending_Token(t *testing.T) {
t.Parallel()

// We'll tolerate a bit of jitter in the timing checks.
const tolerance = 300 * time.Millisecond

recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse)
recvErrChan := make(chan error)

mockStream := &mockSubscribeNotificationsClient{
recvChan: recvChan,
recvErrChan: recvErrChan,
}

// Create a new mock client that will fail to subscribe.
mockClient := &mockNotificationsClient{
mockStream: mockStream,
// subscribeErr stays nil => would succeed on each call.
}

var tokenCalls []time.Time
// Manager with a successful CurrentToken so that it always tries
// to subscribe.
mgr := NewManager(&Config{
Client: mockClient,
CurrentToken: func() (*l402.Token, error) {
tokenCalls = append(tokenCalls, time.Now())
if len(tokenCalls) < 3 {
// Simulate a pending token.
return &l402.Token{}, nil
}

// Simulate successful fetching of L402
return &l402.Token{
Preimage: lntypes.Preimage{1, 2, 3},
}, nil
},
})

// Run the manager in a background goroutine.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// We ignore the returned error because the Manager returns
// nil on context cancel.
_ = mgr.Run(ctx)
}()

// Wait long enough to see at least 3 token calls, so we can see that
// we'll indeed backoff when the token is pending.
time.Sleep(5 * time.Second)

// Signal EOF so the subscription stops.
close(recvChan)

// Cancel the context to stop the manager.
cancel()
wg.Wait()

// Expect exactly 3 token calls.
require.Equal(t, 3, len(tokenCalls))

require.InDeltaf(
t, 3*time.Second, tokenCalls[2].Sub(tokenCalls[0]),
float64(tolerance),
"Expected to backoff for at ~3 seconds due to pending token",
)
}

0 comments on commit 61e1528

Please sign in to comment.