Skip to content

Commit

Permalink
Merge pull request #879 from bhandras/notification-manager-backoff-fixup
Browse files Browse the repository at this point in the history
notifications: implement incremental backoff for invalid L402 tokens
  • Loading branch information
bhandras authored Jan 29, 2025
2 parents bb859c5 + 61e1528 commit 2b83f45
Show file tree
Hide file tree
Showing 2 changed files with 328 additions and 22 deletions.
91 changes: 70 additions & 21 deletions notifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/lightninglabs/aperture/l402"
"github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lntypes"
"google.golang.org/grpc"
)

Expand All @@ -26,6 +27,13 @@ const (
NotificationTypeStaticLoopInSweepRequest
)

const (
// defaultMinAliveConnTime is the default minimum time that the
// connection to the server needs to be alive before we consider it a
// successful connection.
defaultMinAliveConnTime = time.Minute
)

// Client is the interface that the notification manager needs to implement in
// order to be able to subscribe to notifications.
type Client interface {
Expand All @@ -45,6 +53,10 @@ type Config struct {
// CurrentToken returns the token that is currently contained in the
// store or an l402.ErrNoToken error if there is none.
CurrentToken func() (*l402.Token, error)

// MinAliveConnTime is the minimum time that the connection to the
// server needs to be alive before we consider it a successful.
MinAliveConnTime time.Duration
}

// Manager is a manager for notifications that the swap server sends to the
Expand All @@ -60,6 +72,11 @@ type Manager struct {

// NewManager creates a new notification manager.
func NewManager(cfg *Config) *Manager {
// Set the default minimum alive connection time if it's not set.
if cfg.MinAliveConnTime == 0 {
cfg.MinAliveConnTime = defaultMinAliveConnTime
}

return &Manager{
cfg: cfg,
subscribers: make(map[NotificationType][]subscriber),
Expand Down Expand Up @@ -128,13 +145,18 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
// close the readyChan to signal that the manager is ready.
func (m *Manager) Run(ctx context.Context) error {
// Initially we want to immediately try to connect to the server.
waitTime := time.Duration(0)
var (
waitTime time.Duration
backoff time.Duration
attempts int
)

// Start the notification runloop.
for {
timer := time.NewTimer(waitTime)
// Increase the wait time for the next iteration.
waitTime += time.Second * 1
backoff = waitTime + time.Duration(attempts)*time.Second
waitTime = 0
timer := time.NewTimer(backoff)

// Return if the context has been canceled.
select {
Expand All @@ -145,37 +167,66 @@ func (m *Manager) Run(ctx context.Context) error {
}

// In order to create a valid l402 we first are going to call
// the FetchL402 method. As a client might not have outbound capacity
// yet, we'll retry until we get a valid response.
// the FetchL402 method. As a client might not have outbound
// capacity yet, we'll retry until we get a valid response.
if !m.hasL402 {
_, err := m.cfg.CurrentToken()
token, err := m.cfg.CurrentToken()
if err != nil {
// We only log the error if it's not the case that we
// don't have a token yet to avoid spamming the logs.
// We only log the error if it's not the case
// that we don't have a token yet to avoid
// spamming the logs.
if err != l402.ErrNoToken {
log.Errorf("Error getting L402 from store: %v", err)
log.Errorf("Error getting L402 from "+
"the store: %v", err)
}
continue
}
m.hasL402 = true
}

connectedFunc := func() {
// Reset the wait time to 10 seconds.
waitTime = time.Second * 10
// If the preimage is empty, we don't have a valid L402
// yet so we'll continue to retry with the incremental
// backoff.
emptyPreimage := lntypes.Preimage{}
if token.Preimage == emptyPreimage {
attempts++
continue
}

attempts = 0
m.hasL402 = true
}

err := m.subscribeNotifications(ctx, connectedFunc)
connectAttempted := time.Now()
err := m.subscribeNotifications(ctx)
if err != nil {
log.Errorf("Error subscribing to notifications: %v", err)
log.Errorf("Error subscribing to notifications: %v",
err)
}
connectionAliveTime := time.Since(connectAttempted)

// Note that we may be able to connet to the stream but not
// able to use it if the client is unable to pay for their
// L402. In this case the subscription will fail on the first
// read immediately after connecting. We'll therefore only
// consider the connection successful if we were able to use
// the stream for at least the minimum alive connection time
// (which defaults to 1 minute).
if connectionAliveTime > m.cfg.MinAliveConnTime {
// Reset the backoff to 10 seconds and the connect
// attempts to zero if we were really connected for a
// considerable amount of time (1 minute).
waitTime = time.Second * 10
attempts = 0
} else {
// We either failed to connect or the stream
// disconnected immediately, so we just increase the
// backoff.
attempts++
}
}
}

// subscribeNotifications subscribes to the notifications from the server.
func (m *Manager) subscribeNotifications(ctx context.Context,
connectedFunc func()) error {

func (m *Manager) subscribeNotifications(ctx context.Context) error {
callCtx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -186,8 +237,6 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
return err
}

// Signal that we're connected to the server.
connectedFunc()
log.Debugf("Successfully subscribed to server notifications")

for {
Expand Down
Loading

0 comments on commit 2b83f45

Please sign in to comment.