Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
ellemouton committed Nov 21, 2024
1 parent a2cb894 commit a58f9ff
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 14 deletions.
8 changes: 5 additions & 3 deletions chanbackup/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chanbackup

import (
"bytes"
"context"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -81,7 +82,7 @@ type ChannelNotifier interface {
// synchronization point to ensure that the chanbackup.SubSwapper does
// not miss any channel open or close events in the period between when
// it's created, and when it requests the channel subscription.
SubscribeChans(map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
SubscribeChans(context.Context, map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
}

// SubSwapper subscribes to new updates to the open channel state, and then
Expand Down Expand Up @@ -119,7 +120,8 @@ type SubSwapper struct {
// set of channels, and the required interfaces to be notified of new channel
// updates, pack a multi backup, and swap the current best backup from its
// storage location.
func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
func NewSubSwapper(ctx context.Context, startingChans []Single,
chanNotifier ChannelNotifier,
keyRing keychain.KeyRing, backupSwapper Swapper) (*SubSwapper, error) {

// First, we'll subscribe to the latest set of channel updates given
Expand All @@ -128,7 +130,7 @@ func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
for _, chanBackup := range startingChans {
knownChans[chanBackup.FundingOutpoint] = struct{}{}
}
chanEvents, err := chanNotifier.SubscribeChans(knownChans)
chanEvents, err := chanNotifier.SubscribeChans(ctx, knownChans)
if err != nil {
return nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions channel_notifier.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lnd

import (
"context"
"fmt"
"net"

Expand Down Expand Up @@ -42,7 +43,8 @@ type channelNotifier struct {
// the channel subscription.
//
// NOTE: This is part of the chanbackup.ChannelNotifier interface.
func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{}) (
func (c *channelNotifier) SubscribeChans(ctx context.Context,
startingChans map[wire.OutPoint]struct{}) (
*chanbackup.ChannelSubscription, error) {

ltndLog.Infof("Channel backup proxy channel notifier starting")
Expand Down Expand Up @@ -86,7 +88,9 @@ func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{
go func() {
// First, we'll subscribe to the primary channel notifier so we can
// obtain events for new opened/closed channels.
chanSubscription, err := c.chanNotifier.SubscribeChannelEvents()
chanSubscription, err := c.chanNotifier.SubscribeChannelEvents(
ctx,
)
if err != nil {
panic(fmt.Sprintf("unable to subscribe to chans: %v",
err))
Expand Down
6 changes: 4 additions & 2 deletions channelnotifier/channelnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ func (c *ChannelNotifier) Stop() error {
//
// TODO(carlaKC): update to allow subscriptions to specify a block height from
// which we would like to subscribe to events.
func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
return c.ntfnServer.Subscribe(c.ctx)
func (c *ChannelNotifier) SubscribeChannelEvents(ctx context.Context) (
*subscribe.Client, error) {

return c.ntfnServer.Subscribe(ctx)
}

// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine
Expand Down
1 change: 1 addition & 0 deletions lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
}()

defer func() {
ltndLog.Infof("ELLE: calling cancel & Stop")
cancel()
err := server.Stop()
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions peer/brontide.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package peer
import (
"bytes"
"container/list"
"context"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -1748,7 +1749,7 @@ func waitUntilLinkActive(p *Brontide,
// we will get an ActiveLinkEvent notification and retrieve the link. If
// the call to GetLink is before SubscribeChannelEvents, however, there
// will be a race condition.
sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents(context.TODO())
if err != nil {
// If we have a non-nil error, then the server is shutting down and we
// can exit here and return nil. This means no message will be delivered
Expand Down Expand Up @@ -4185,7 +4186,7 @@ func (p *Brontide) attachChannelEventSubscription() error {
// we'll give it a second chance by subscribing to the channel update
// events. Upon receiving the `ActiveLinkEvent`, we'll then request
// enabling the channel again.
sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents(context.TODO())
if err != nil {
return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions rpcperms/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func (r *InterceptorChain) Start(ctx context.Context) error {
r.started.Do(func() {
ctx, cancel := context.WithCancel(ctx)
r.cancel = fn.Some(cancel)
r.ctx = ctx

err = r.ntfnServer.Start(ctx)
})
Expand Down
8 changes: 6 additions & 2 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5132,7 +5132,9 @@ func (r *rpcServer) getInitiators(chanPoint *wire.OutPoint) (
func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
updateStream lnrpc.Lightning_SubscribeChannelEventsServer) error {

channelEventSub, err := r.server.channelNotifier.SubscribeChannelEvents()
channelEventSub, err := r.server.channelNotifier.SubscribeChannelEvents(
updateStream.Context(),
)
if err != nil {
return err
}
Expand Down Expand Up @@ -8238,7 +8240,9 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription

// First, we'll subscribe to the primary channel notifier so we can
// obtain events for new pending/opened/closed channels.
chanSubscription, err := r.server.channelNotifier.SubscribeChannelEvents()
chanSubscription, err := r.server.channelNotifier.SubscribeChannelEvents(
updateStream.Context(),
)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,7 +1615,7 @@ func newServer(ctx context.Context, cfg *Config,
return nil, err
}
s.chanSubSwapper, err = chanbackup.NewSubSwapper(
startingChans, chanNotifier, s.cc.KeyRing, backupFile,
ctx, startingChans, chanNotifier, s.cc.KeyRing, backupFile,
)
if err != nil {
return nil, err
Expand All @@ -1628,7 +1628,7 @@ func newServer(ctx context.Context, cfg *Config,
// Create a channel event store which monitors all open channels.
s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
SubscribeChannelEvents: func() (subscribe.Subscription, error) {
return s.channelNotifier.SubscribeChannelEvents()
return s.channelNotifier.SubscribeChannelEvents(ctx)
},
SubscribePeerEvents: func() (subscribe.Subscription, error) {
return s.peerNotifier.SubscribePeerEvents()
Expand Down Expand Up @@ -1716,7 +1716,7 @@ func newServer(ctx context.Context, cfg *Config,
error) {

return s.channelNotifier.
SubscribeChannelEvents()
SubscribeChannelEvents(context.TODO())
},
Signer: cc.Wallet.Cfg.Signer,
NewAddress: func() ([]byte, error) {
Expand Down

0 comments on commit a58f9ff

Please sign in to comment.