From a2cf39de566c4112b42d1caa03bf9baa3f1b1d50 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 Nov 2024 10:41:27 +0200 Subject: [PATCH] separate binary --- Makefile | 2 + cmd/multinode3/graph_mux.go | 513 +++++++++++++++++++++++++++++++ cmd/multinode3/main.go | 130 ++++++++ cmd/multinode3/wrapper.go | 591 ++++++++++++++++++++++++++++++++++++ 4 files changed, 1236 insertions(+) create mode 100644 cmd/multinode3/graph_mux.go create mode 100644 cmd/multinode3/main.go create mode 100644 cmd/multinode3/wrapper.go diff --git a/Makefile b/Makefile index c794fac7d0..e97ba8c043 100644 --- a/Makefile +++ b/Makefile @@ -114,6 +114,7 @@ build: $(GOBUILD) -tags="$(DEV_TAGS)" -o lncli-debug $(DEV_GCFLAGS) $(DEV_LDFLAGS) $(PKG)/cmd/lncli $(GOBUILD) -tags="$(DEV_TAGS)" -o multinode-debug $(DEV_GCFLAGS) $(DEV_LDFLAGS) $(PKG)/cmd/multinode $(GOBUILD) -tags="$(DEV_TAGS)" -o multinode2-debug $(DEV_GCFLAGS) $(DEV_LDFLAGS) $(PKG)/cmd/multinode2 + $(GOBUILD) -tags="$(DEV_TAGS)" -o multinode3-debug $(DEV_GCFLAGS) $(DEV_LDFLAGS) $(PKG)/cmd/multinode3 #? build-itest: Build integration test binaries, place them in itest directory build-itest: @@ -140,6 +141,7 @@ install-binaries: $(GOINSTALL) -tags="${tags}" -ldflags="$(RELEASE_LDFLAGS)" $(PKG)/cmd/lncli $(GOINSTALL) -tags="${tags}" -ldflags="$(RELEASE_LDFLAGS)" $(PKG)/cmd/multinode $(GOINSTALL) -tags="${tags}" -ldflags="$(RELEASE_LDFLAGS)" $(PKG)/cmd/multinode2 + $(GOINSTALL) -tags="${tags}" -ldflags="$(RELEASE_LDFLAGS)" $(PKG)/cmd/multinode3 #? manpages: generate and install man pages manpages: diff --git a/cmd/multinode3/graph_mux.go b/cmd/multinode3/graph_mux.go new file mode 100644 index 0000000000..8de36fa788 --- /dev/null +++ b/cmd/multinode3/graph_mux.go @@ -0,0 +1,513 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "net" + "sync" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/wire" + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd" + "github.com/lightningnetwork/lnd/autopilot" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/discovery" + graphdb "github.com/lightningnetwork/lnd/graph/db" + "github.com/lightningnetwork/lnd/graph/db/models" + "github.com/lightningnetwork/lnd/graph/stats" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" +) + +type GraphSourceMux struct { + remote lnd.GraphSource + local *graphdb.ChannelGraph + + // srcPub is a cached version of the local nodes own pub key bytes. + srcPub *route.Vertex + mu sync.Mutex +} + +func (g *GraphSourceMux) NetworkStats(ctx context.Context, excludeNodes map[route.Vertex]struct{}, excludeChannels map[uint64]struct{}) (*models.NetworkStats, error) { + // TODO(elle): need to call local first & build exclude lists to send to + // remote. + return g.remote.NetworkStats(ctx, excludeNodes, excludeChannels) +} + +func (g *GraphSourceMux) GraphBootstrapper(ctx context.Context) (discovery.NetworkPeerBootstrapper, error) { + return g.remote.GraphBootstrapper(ctx) +} + +func (g *GraphSourceMux) BetweenessCentrality(ctx context.Context) (map[autopilot.NodeID]*stats.BetweenessCentrality, error) { + return g.remote.BetweenessCentrality(ctx) +} + +// A compile-time check to ensure that GraphSourceMux implements GraphSource. +var _ lnd.GraphSource = (*GraphSourceMux)(nil) + +func NewGraphBackend(local *graphdb.ChannelGraph, + remote lnd.GraphSource) *GraphSourceMux { + + return &GraphSourceMux{ + local: local, + remote: remote, + } +} + +// NewReadTx returns a new read transaction that can be used other read calls to +// the backing graph. +// +// NOTE: this is part of the graphsession.ReadOnlyGraph interface. +func (g *GraphSourceMux) NewReadTx(ctx context.Context) (graphdb.RTx, error) { + return newRTxSet(ctx, g.remote, g.local) +} + +// ForEachNodeDirectedChannel iterates through all channels of a given +// node, executing the passed callback on the directed edge representing +// the channel and its incoming policy. +// +// If the node in question is the local node, then only the local node is +// queried since it will know all channels that it owns. +// +// Otherwise, we still query the local node in case the node in question is a +// peer with whom the local node has a private channel to. In that case we want +// to make sure to run the call-back on these directed channels since the remote +// node may not know of this channel. Finally, we call the remote node but skip +// any channels we have already handled. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) ForEachNodeDirectedChannel(ctx context.Context, + tx graphdb.RTx, node route.Vertex, + cb func(channel *graphdb.DirectedChannel) error) error { + + srcPub, err := g.selfNodePub() + if err != nil { + return err + } + + lTx, rTx, err := extractRTxSet(tx) + if err != nil { + return err + } + + // If we are the source node, we know all our channels, so just use + // local DB. + if bytes.Equal(srcPub[:], node[:]) { + return g.local.ForEachNodeDirectedChannel(ctx, lTx, node, cb) + } + + // Call our local DB to collect any private channels we have. + handledPeerChans := make(map[uint64]bool) + err = g.local.ForEachNodeDirectedChannel(ctx, lTx, node, + func(channel *graphdb.DirectedChannel) error { + + // If the other node is not us, we don't need to handle + // it here since the remote node will handle it later. + if !bytes.Equal(channel.OtherNode[:], srcPub[:]) { + return nil + } + + // Else, we call the call back ourselves on this + // channel and mark that we have handled it. + handledPeerChans[channel.ChannelID] = true + + return cb(channel) + }) + if err != nil { + return err + } + + return g.remote.ForEachNodeDirectedChannel(ctx, rTx, node, + func(channel *graphdb.DirectedChannel) error { + + // Skip any we have already handled. + if handledPeerChans[channel.ChannelID] { + return nil + } + + return cb(channel) + }, + ) +} + +// FetchNodeFeatures returns the features of a given node. If no features are +// known for the node, an empty feature vector is returned. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) FetchNodeFeatures(ctx context.Context, tx graphdb.RTx, + node route.Vertex) (*lnwire.FeatureVector, error) { + + // Query the local DB first. If a non-empty set of features is returned, + // we use these. Otherwise, the remote DB is checked. + feats, err := g.local.FetchNodeFeatures(ctx, tx, node) + if err != nil { + return nil, err + } + + if !feats.IsEmpty() { + return feats, nil + } + + return g.remote.FetchNodeFeatures(ctx, tx, node) +} + +// ForEachNode iterates through all the stored vertices/nodes in the graph, +// executing the passed callback with each node encountered. If the callback +// returns an error, then the transaction is aborted and the iteration stops +// early. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) ForEachNode(ctx context.Context, tx graphdb.RTx, + cb func(graphdb.RTx, *models.LightningNode) error) error { + + source, err := g.local.SourceNode() + if err != nil { + return err + } + + err = cb(tx, source) + if err != nil { + return err + } + + _, rTx, err := extractRTxSet(tx) + if err != nil { + return err + } + + return g.remote.ForEachNode(ctx, rTx, + func(tx graphdb.RTx, node *models.LightningNode) error { + + if bytes.Equal( + node.PubKeyBytes[:], source.PubKeyBytes[:], + ) { + return nil + } + + return cb(tx, node) + }, + ) +} + +// FetchLightningNode attempts to look up a target node by its identity public +// key. If the node isn't found in the database, then ErrGraphNodeNotFound is +// returned. An optional transaction may be provided. If none is provided, then +// a new one will be created. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) FetchLightningNode(ctx context.Context, tx graphdb.RTx, + nodePub route.Vertex) (*models.LightningNode, error) { + + srcPub, err := g.selfNodePub() + if err != nil { + return nil, err + } + + lTx, rTx, err := extractRTxSet(tx) + if err != nil { + return nil, err + } + + if bytes.Equal(srcPub[:], nodePub[:]) { + return g.local.FetchLightningNode(ctx, lTx, nodePub) + } + + return g.remote.FetchLightningNode(ctx, rTx, nodePub) +} + +// ForEachNodeChannel iterates through all channels of the given node, +// executing the passed callback with an edge info structure and the policies +// of each end of the channel. The first edge policy is the outgoing edge *to* +// the connecting node, while the second is the incoming edge *from* the +// connecting node. If the callback returns an error, then the iteration is +// halted with the error propagated back up to the caller. +// +// Unknown policies are passed into the callback as nil values. +// +// If the caller wishes to re-use an existing boltdb transaction, then it +// should be passed as the first argument. Otherwise, the first argument should +// be nil and a fresh transaction will be created to execute the graph +// traversal. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) ForEachNodeChannel(ctx context.Context, tx graphdb.RTx, + nodePub route.Vertex, cb func(graphdb.RTx, *models.ChannelEdgeInfo, + *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy) error) error { + + lTx, rTx, err := extractRTxSet(tx) + if err != nil { + return err + } + + // First query our own db since we may have chan info that our remote + // does not know of (regarding our selves or our channel peers). + var found bool + err = g.local.ForEachNodeChannel(ctx, lTx, nodePub, func(tx graphdb.RTx, + info *models.ChannelEdgeInfo, policy *models.ChannelEdgePolicy, + policy2 *models.ChannelEdgePolicy) error { + + found = true + + return cb(tx, info, policy, policy2) + }) + // Only return the error if it was found. + if err != nil && found { + return err + } + + if found { + return nil + } + + return g.remote.ForEachNodeChannel(ctx, rTx, nodePub, cb) +} + +// FetchChannelEdgesByID attempts to look up the two directed edges for the +// channel identified by the channel ID. If the channel can't be found, then +// graphdb.ErrEdgeNotFound is returned. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) FetchChannelEdgesByID(ctx context.Context, chanID uint64) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + info, p1, p2, err := g.local.FetchChannelEdgesByID(ctx, chanID) + if err == nil { + return info, p1, p2, nil + } + + return g.remote.FetchChannelEdgesByID(ctx, chanID) +} + +// IsPublicNode is a helper method that determines whether the node with the +// given public key is seen as a public node in the graph from the graph's +// source node's point of view. This first checks the local node and then the +// remote if the node is not seen as public by the loca node. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) IsPublicNode(ctx context.Context, pubKey [33]byte) (bool, error) { + isPublic, err := g.local.IsPublicNode(ctx, pubKey) + if err != nil && !errors.Is(err, graphdb.ErrGraphNodeNotFound) { + return false, err + } + if isPublic { + return true, nil + } + + return g.remote.IsPublicNode(ctx, pubKey) +} + +// FetchChannelEdgesByOutpoint returns the channel edge info and most recent +// channel edge policies for a given outpoint. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) FetchChannelEdgesByOutpoint(ctx context.Context, point *wire.OutPoint) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + edge, p1, p2, err := g.local.FetchChannelEdgesByOutpoint(ctx, point) + if err == nil { + return edge, p1, p2, nil + } + + return g.remote.FetchChannelEdgesByOutpoint(ctx, point) +} + +// AddrsForNode returns all known addresses for the target node public key. The +// returned boolean must indicate if the given node is unknown to the backing +// source. This merges the results from both the local and remote source. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) AddrsForNode(ctx context.Context, nodePub *btcec.PublicKey) (bool, + []net.Addr, error) { + + // Check both the local and remote sources and merge the results. + return channeldb.NewMultiAddrSource( + g.local, g.remote, + ).AddrsForNode(ctx, nodePub) +} + +// ForEachChannel iterates through all the channel edges stored within the graph +// and invokes the passed callback for each edge. If the callback returns an +// error, then the transaction is aborted and the iteration stops early. An +// edge's policy structs may be nil if the ChannelUpdate in question has not yet +// been received for the channel. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) ForEachChannel(ctx context.Context, + cb func(*models.ChannelEdgeInfo, + *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { + + srcPub, err := g.selfNodePub() + if err != nil { + return err + } + + ourChans := make(map[uint64]bool) + err = g.local.ForEachNodeChannel(context.TODO(), nil, srcPub, func(_ graphdb.RTx, + info *models.ChannelEdgeInfo, policy *models.ChannelEdgePolicy, + policy2 *models.ChannelEdgePolicy) error { + + ourChans[info.ChannelID] = true + + return cb(info, policy, policy2) + }) + if err != nil { + return err + } + + return g.remote.ForEachChannel(ctx, func(info *models.ChannelEdgeInfo, + policy *models.ChannelEdgePolicy, + policy2 *models.ChannelEdgePolicy) error { + + if ourChans[info.ChannelID] { + return nil + } + + return cb(info, policy, policy2) + }) +} + +// HasLightningNode determines if the graph has a vertex identified by the +// target node identity public key. If the node exists in the database, a +// timestamp of when the data for the node was lasted updated is returned along +// with a true boolean. Otherwise, an empty time.Time is returned with a false +// boolean. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) HasLightningNode(ctx context.Context, nodePub [33]byte) (time.Time, bool, error) { + timeStamp, localHas, err := g.local.HasLightningNode(ctx, nodePub) + if err != nil { + return timeStamp, false, err + } + if localHas { + return timeStamp, true, nil + } + + return g.remote.HasLightningNode(ctx, nodePub) +} + +// LookupAlias attempts to return the alias as advertised by the target node. +// graphdb.ErrNodeAliasNotFound is returned if the alias is not found. +// +// NOTE: this is part of the GraphSource interface. +func (g *GraphSourceMux) LookupAlias(ctx context.Context, pub *btcec.PublicKey) (string, error) { + // First check locally. + alias, err := g.local.LookupAlias(ctx, pub) + if err == nil { + return alias, nil + } + if !errors.Is(err, graphdb.ErrNodeAliasNotFound) { + return "", err + } + + return g.remote.LookupAlias(ctx, pub) +} + +// selfNodePub fetches the local nodes pub key. It first checks the cached value +// and if non exists, it queries the database. +func (g *GraphSourceMux) selfNodePub() (route.Vertex, error) { + g.mu.Lock() + defer g.mu.Unlock() + + if g.srcPub != nil { + return *g.srcPub, nil + } + + source, err := g.local.SourceNode() + if err != nil { + return route.Vertex{}, err + } + + pub, err := route.NewVertexFromBytes(source.PubKeyBytes[:]) + if err != nil { + return route.Vertex{}, err + } + + g.srcPub = &pub + + return *g.srcPub, nil +} + +type rTxConstructor interface { + NewReadTx(ctx context.Context) (graphdb.RTx, error) +} + +// rTxSet is an implementation of graphdb.RTx which is backed a read transaction +// for the local graph and one for a remote graph. +type rTxSet struct { + lRTx graphdb.RTx + rRTx graphdb.RTx +} + +// newMultiRTx uses the given rTxConstructors to begin a read transaction for +// each and returns a multiRTx that represents this open set of transactions. +func newRTxSet(ctx context.Context, localConstructor, remoteConstructor rTxConstructor) (*rTxSet, + error) { + + localRTx, err := localConstructor.NewReadTx(ctx) + if err != nil { + return nil, err + } + + remoteRTx, err := remoteConstructor.NewReadTx(ctx) + if err != nil { + _ = localRTx.Close() + + return nil, err + } + + return &rTxSet{ + lRTx: localRTx, + rRTx: remoteRTx, + }, nil +} + +// Close closes all the transactions held by multiRTx. +// +// NOTE: this is part of the graphdb.RTx interface. +func (s *rTxSet) Close() error { + var returnErr error + + if s.lRTx != nil { + if err := s.lRTx.Close(); err != nil { + returnErr = err + } + } + + if s.rRTx != nil { + if err := s.rRTx.Close(); err != nil { + returnErr = err + } + } + + return returnErr +} + +// MustImplementRTx is a helper method that ensures that the rTxSet type +// implements the RTx interface. +// +// NOTE: this is part of the graphdb.RTx interface. +func (s *rTxSet) MustImplementRTx() {} + +// A compile-time check to ensure that multiRTx implements graphdb.RTx. +var _ graphdb.RTx = (*rTxSet)(nil) + +// extractRTxSet is a helper function that casts an RTx into a rTxSet returns +// the local and remote RTxs respectively. +func extractRTxSet(tx graphdb.RTx) (graphdb.RTx, graphdb.RTx, error) { + if tx == nil { + return nil, nil, nil + } + + set, ok := tx.(*rTxSet) + if !ok { + return nil, nil, fmt.Errorf("expected a rTxSet, got %T", tx) + } + + return set.lRTx, set.rRTx, nil +} diff --git a/cmd/multinode3/main.go b/cmd/multinode3/main.go new file mode 100644 index 0000000000..47de9fafba --- /dev/null +++ b/cmd/multinode3/main.go @@ -0,0 +1,130 @@ +package main + +import ( + "context" + "crypto/x509" + "fmt" + "os" + "time" + + "github.com/jessevdk/go-flags" + "github.com/lightningnetwork/lnd" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/graphrpc" + "github.com/lightningnetwork/lnd/macaroons" + "github.com/lightningnetwork/lnd/signal" + "github.com/lightningnetwork/lnd/tor" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "gopkg.in/macaroon.v2" +) + +func main() { + // Hook interceptor for os signals. + shutdownInterceptor, err := signal.Intercept() + if err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + conn, err := connectToGraphNode() + if err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + provider := &graphProvider{ + remoteGraphConn: graphrpc.NewGraphClient(conn), + remoteLNConn: lnrpc.NewLightningClient(conn), + } + graphrpc.NewGraphClient(conn) + + setupDependentNode(shutdownInterceptor, provider) + <-shutdownInterceptor.ShutdownChannel() +} + +func connectToGraphNode() (*grpc.ClientConn, error) { + macPath := "/Users/elle/.lnd-dev-sam/data/chain/bitcoin/regtest/admin.macaroon" + tlsCertPath := "/Users/elle/.lnd-dev-sam/tls.cert" + + tlsCert, err := os.ReadFile(tlsCertPath) + if err != nil { + return nil, fmt.Errorf("could not load TLS cert file: %v", err) + } + + cp := x509.NewCertPool() + if !cp.AppendCertsFromPEM(tlsCert) { + return nil, fmt.Errorf("credentials: failed to append certificate") + } + creds := credentials.NewClientTLSFromCert(cp, "") + + macBytes, err := os.ReadFile(macPath) + if err != nil { + return nil, fmt.Errorf("unable to read macaroon path (check "+ + "the network setting!): %v", err) + } + mac := &macaroon.Macaroon{} + if err = mac.UnmarshalBinary(macBytes); err != nil { + return nil, fmt.Errorf("unable to decode macaroon: %w", err) + } + // Now we append the macaroon credentials to the dial options. + cred, err := macaroons.NewMacaroonCredential(mac) + if err != nil { + return nil, err + } + + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(creds), + grpc.WithPerRPCCredentials(cred), + } + + return grpc.Dial("localhost:10020", opts...) +} + +func setupDependentNode(interceptor signal.Interceptor, + graphProvider *graphProvider) { + + // Load the configuration, and parse any command line options. This + // function will also set up logging properly. + loadedConfig, err := lnd.LoadConfig(interceptor) + if err != nil { + if e, ok := err.(*flags.Error); !ok || e.Type != flags.ErrHelp { + // Print error if not due to help request. + err = fmt.Errorf("failed to load config: %w", err) + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + // Help was requested, exit normally. + os.Exit(0) + } + loadedConfig.Caches.RPCGraphCacheDuration = time.Second * 30 + implCfg := loadedConfig.ImplementationConfig(interceptor) + implCfg.GraphProvider = graphProvider + + // Call the "real" main in a nested manner so the defers will properly + // be executed in the case of a graceful shutdown. + + if err = lnd.Main( + loadedConfig, lnd.ListenerCfg{}, implCfg, interceptor, nil, + ); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +type graphProvider struct { + remoteGraphConn graphrpc.GraphClient + remoteLNConn lnrpc.LightningClient +} + +func (g *graphProvider) Graph(_ context.Context, dbs *lnd.DatabaseInstances) ( + lnd.GraphSource, error) { + + return NewGraphBackend(dbs.GraphDB, &remoteWrapper{ + graphConn: g.remoteGraphConn, + lnConn: g.remoteLNConn, + local: dbs.GraphDB, + net: &tor.ClearNet{}, + }), nil +} diff --git a/cmd/multinode3/wrapper.go b/cmd/multinode3/wrapper.go new file mode 100644 index 0000000000..d8b370eff9 --- /dev/null +++ b/cmd/multinode3/wrapper.go @@ -0,0 +1,591 @@ +package main + +import ( + "bytes" + "context" + "encoding/hex" + "image/color" + "net" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd" + "github.com/lightningnetwork/lnd/autopilot" + "github.com/lightningnetwork/lnd/discovery" + graphdb "github.com/lightningnetwork/lnd/graph/db" + "github.com/lightningnetwork/lnd/graph/db/models" + "github.com/lightningnetwork/lnd/graph/stats" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/graphrpc" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" + "github.com/lightningnetwork/lnd/tor" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type remoteWrapper struct { + graphConn graphrpc.GraphClient + lnConn lnrpc.LightningClient + + // export from LND Server struct. + net tor.Net + + local *graphdb.ChannelGraph +} + +func (r *remoteWrapper) BetweenessCentrality(ctx context.Context) (map[autopilot.NodeID]*stats.BetweenessCentrality, error) { + resp, err := r.graphConn.BetweennessCentrality(ctx, &graphrpc.BetweennessCentralityReq{}) + if err != nil { + return nil, err + } + + centrality := make(map[autopilot.NodeID]*stats.BetweenessCentrality) + for _, node := range resp.NodeBetweeness { + var id autopilot.NodeID + copy(id[:], node.Node) + centrality[id] = &stats.BetweenessCentrality{ + Normalized: float64(node.Normalized), + NonNormalized: float64(node.NonNormalized), + } + } + + return centrality, nil +} + +func (r *remoteWrapper) GraphBootstrapper(ctx context.Context) (discovery.NetworkPeerBootstrapper, error) { + resp, err := r.graphConn.BoostrapperName(ctx, &graphrpc.BoostrapperNameReq{}) + if err != nil { + return nil, err + } + + return &bootstrapper{ + name: resp.Name, + remoteWrapper: r, + }, nil +} + +type bootstrapper struct { + name string + *remoteWrapper +} + +func (r *bootstrapper) SampleNodeAddrs(numAddrs uint32, + ignore map[autopilot.NodeID]struct{}) ([]*lnwire.NetAddress, error) { + + var toIgnore [][]byte + for id := range ignore { + toIgnore = append(toIgnore, id[:]) + } + + resp, err := r.graphConn.BootstrapAddrs( + context.Background(), &graphrpc.BootstrapAddrsReq{ + NumAddrs: numAddrs, + IgnoreNodes: toIgnore, + }, + ) + if err != nil { + return nil, err + } + + netAddrs := make([]*lnwire.NetAddress, 0, len(resp.Addrs)) + for _, addr := range resp.Addrs { + netAddr, err := r.net.ResolveTCPAddr(addr.Address.Network, addr.Address.Addr) + if err != nil { + return nil, err + } + idKey, err := btcec.ParsePubKey(addr.NodeId) + if err != nil { + return nil, err + } + netAddrs = append(netAddrs, &lnwire.NetAddress{ + IdentityKey: idKey, + Address: netAddr, + }) + } + + return netAddrs, nil + +} + +func (r *bootstrapper) Name() string { + return r.name +} + +func (r *remoteWrapper) NetworkStats(ctx context.Context, excludeNodes map[route.Vertex]struct{}, excludeChannels map[uint64]struct{}) (*models.NetworkStats, error) { + var ( + exNodes [][]byte + exChans []uint64 + ) + + for node := range excludeNodes { + exNodes = append(exNodes, node[:]) + } + + for chanID := range excludeChannels { + exChans = append(exChans, chanID) + } + + info, err := r.lnConn.GetNetworkInfo(ctx, &lnrpc.NetworkInfoRequest{ + ExcludeNodes: exNodes, + ExcludeChans: exChans, + }) + if err != nil { + return nil, err + } + + return &models.NetworkStats{ + Diameter: info.GraphDiameter, + MaxChanOut: info.MaxOutDegree, + NumNodes: info.NumNodes, + NumChannels: info.NumChannels, + TotalNetworkCapacity: btcutil.Amount(info.TotalNetworkCapacity), + MinChanSize: btcutil.Amount(info.MinChannelSize), + MaxChanSize: btcutil.Amount(info.MaxChannelSize), + MedianChanSize: btcutil.Amount(info.MedianChannelSizeSat), + NumZombies: info.NumZombieChans, + }, nil +} + +// Pathfinding. +func (r *remoteWrapper) NewReadTx(ctx context.Context) (graphdb.RTx, error) { + // TODO(elle): ?? + return graphdb.NewKVDBRTx(nil), nil +} + +// Pathfinding. +func (r *remoteWrapper) ForEachNodeDirectedChannel(ctx context.Context, + _ graphdb.RTx, node route.Vertex, + cb func(channel *graphdb.DirectedChannel) error) error { + + info, err := r.lnConn.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{ + PubKey: hex.EncodeToString(node[:]), + IncludeChannels: true, + }) + if err != nil { + return err + } + + toNodeCallback := func() route.Vertex { + return node + } + toNodeFeatures := unmarshalFeatures(info.Node.Features) + + for _, channel := range info.Channels { + e, p1, p2, err := unmarshalChannelInfo(channel) + if err != nil { + return err + } + + var cachedInPolicy *models.CachedEdgePolicy + if p2 != nil { + cachedInPolicy = models.NewCachedPolicy(p2) + cachedInPolicy.ToNodePubKey = toNodeCallback + cachedInPolicy.ToNodeFeatures = toNodeFeatures + } + + var inboundFee lnwire.Fee + if p1 != nil { + // Extract inbound fee. If there is a decoding error, + // skip this edge. + _, err := p1.ExtraOpaqueData.ExtractRecords(&inboundFee) + if err != nil { + return nil + } + } + + directedChannel := &graphdb.DirectedChannel{ + ChannelID: e.ChannelID, + IsNode1: node == e.NodeKey1Bytes, + OtherNode: e.NodeKey2Bytes, + Capacity: e.Capacity, + OutPolicySet: p1 != nil, + InPolicy: cachedInPolicy, + InboundFee: inboundFee, + } + + if node == e.NodeKey2Bytes { + directedChannel.OtherNode = e.NodeKey1Bytes + } + + if err := cb(directedChannel); err != nil { + return err + } + } + + return nil +} + +// DescribeGraph. NB: use --caches.rpc-graph-cache-duration +func (r *remoteWrapper) ForEachNode(ctx context.Context, tx graphdb.RTx, + cb func(graphdb.RTx, *models.LightningNode) error) error { + + graph, err := r.lnConn.DescribeGraph(ctx, &lnrpc.ChannelGraphRequest{ + IncludeUnannounced: true, + }) + if err != nil { + return err + } + + selfNode, err := r.local.SourceNode() + if err != nil { + return err + } + + for _, node := range graph.Nodes { + pubKey, err := hex.DecodeString(node.PubKey) + if err != nil { + return err + } + var pubKeyBytes [33]byte + copy(pubKeyBytes[:], pubKey) + + extra, err := lnwire.CustomRecords(node.CustomRecords).Serialize() + if err != nil { + return err + } + + addrs, err := r.unmarshalAddrs(node.Addresses) + if err != nil { + return err + } + + var haveNodeAnnouncement bool + if bytes.Equal(selfNode.PubKeyBytes[:], pubKeyBytes[:]) { + haveNodeAnnouncement = true + } else { + haveNodeAnnouncement = len(addrs) > 0 || + node.Alias != "" || len(extra) > 0 || + len(node.Features) > 0 + } + + n := &models.LightningNode{ + PubKeyBytes: pubKeyBytes, + HaveNodeAnnouncement: haveNodeAnnouncement, + LastUpdate: time.Unix(int64(node.LastUpdate), 0), + Addresses: addrs, + Color: color.RGBA{}, + Alias: node.Alias, + Features: unmarshalFeatures(node.Features), + ExtraOpaqueData: extra, + } + + err = cb(tx, n) + if err != nil { + return err + } + } + + return nil +} + +// DescribeGraph. NB: use --caches.rpc-graph-cache-duration +func (r *remoteWrapper) ForEachChannel(ctx context.Context, + cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { + + graph, err := r.lnConn.DescribeGraph(ctx, &lnrpc.ChannelGraphRequest{ + IncludeUnannounced: true, + }) + if err != nil { + return err + } + + for _, edge := range graph.Edges { + edgeInfo, policy1, policy2, err := unmarshalChannelInfo(edge) + if err != nil { + return err + } + + // To ensure that Describe graph doesnt filter it out. + edgeInfo.AuthProof = &models.ChannelAuthProof{} + + if err := cb(edgeInfo, policy1, policy2); err != nil { + return err + } + } + + return nil +} + +// GetNodeInfo. +func (r *remoteWrapper) ForEachNodeChannel(ctx context.Context, tx graphdb.RTx, + nodePub route.Vertex, cb func(graphdb.RTx, *models.ChannelEdgeInfo, + *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { + + info, err := r.lnConn.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{ + PubKey: hex.EncodeToString(nodePub[:]), + IncludeChannels: true, + }) + if err != nil { + return err + } + + for _, channel := range info.Channels { + edge, policy1, policy2, err := unmarshalChannelInfo(channel) + if err != nil { + return err + } + + if err := cb(tx, edge, policy1, policy2); err != nil { + return err + } + } + + return nil +} + +func (r *remoteWrapper) FetchNodeFeatures(ctx context.Context, tx graphdb.RTx, node route.Vertex) (*lnwire.FeatureVector, error) { + resp, err := r.lnConn.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{ + PubKey: hex.EncodeToString(node[:]), + IncludeChannels: false, + }) + if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { + return lnwire.EmptyFeatureVector(), nil + } else if err != nil { + return nil, err + } + + return unmarshalFeatures(resp.Node.Features), nil +} + +func (r *remoteWrapper) FetchLightningNode(ctx context.Context, tx graphdb.RTx, + nodePub route.Vertex) (*models.LightningNode, error) { + + resp, err := r.lnConn.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{ + PubKey: hex.EncodeToString(nodePub[:]), + IncludeChannels: false, + }) + if err != nil { + return nil, err + } + node := resp.Node + + pubKey, err := hex.DecodeString(node.PubKey) + if err != nil { + return nil, err + } + var pubKeyBytes [33]byte + copy(pubKeyBytes[:], pubKey) + + extra, err := lnwire.CustomRecords(node.CustomRecords).Serialize() + if err != nil { + return nil, err + } + + addrs, err := r.unmarshalAddrs(resp.Node.Addresses) + if err != nil { + return nil, err + } + + return &models.LightningNode{ + PubKeyBytes: pubKeyBytes, + HaveNodeAnnouncement: resp.IsPublic, + LastUpdate: time.Unix(int64(node.LastUpdate), 0), + Addresses: addrs, + Color: color.RGBA{}, + Alias: node.Alias, + Features: unmarshalFeatures(node.Features), + ExtraOpaqueData: extra, + }, nil +} + +func unmarshalFeatures(features map[uint32]*lnrpc.Feature) *lnwire.FeatureVector { + featureBits := make([]lnwire.FeatureBit, 0, len(features)) + featureNames := make(map[lnwire.FeatureBit]string) + for featureBit, feature := range features { + featureBits = append( + featureBits, lnwire.FeatureBit(featureBit), + ) + + featureNames[lnwire.FeatureBit(featureBit)] = feature.Name + } + + return lnwire.NewFeatureVector( + lnwire.NewRawFeatureVector(featureBits...), featureNames, + ) +} + +func (r *remoteWrapper) FetchChannelEdgesByOutpoint(ctx context.Context, point *wire.OutPoint) (*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { + info, err := r.lnConn.GetChanInfo(ctx, &lnrpc.ChanInfoRequest{ + ChanPoint: point.String(), + }) + if err != nil { + return nil, nil, nil, err + } + + return unmarshalChannelInfo(info) +} + +func (r *remoteWrapper) FetchChannelEdgesByID(ctx context.Context, chanID uint64) (*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { + info, err := r.lnConn.GetChanInfo(ctx, &lnrpc.ChanInfoRequest{ + ChanId: chanID, + }) + if err != nil { + return nil, nil, nil, err + } + + return unmarshalChannelInfo(info) +} + +func unmarshalChannelInfo(info *lnrpc.ChannelEdge) (*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { + chanPoint, err := wire.NewOutPointFromString(info.ChanPoint) + if err != nil { + return nil, nil, nil, err + } + + var ( + node1Bytes [33]byte + node2Bytes [33]byte + ) + node1, err := hex.DecodeString(info.Node1Pub) + if err != nil { + return nil, nil, nil, err + } + copy(node1Bytes[:], node1) + + node2, err := hex.DecodeString(info.Node2Pub) + if err != nil { + return nil, nil, nil, err + } + copy(node2Bytes[:], node2) + + extra, err := lnwire.CustomRecords(info.CustomRecords).Serialize() + if err != nil { + return nil, nil, nil, err + } + + edge := &models.ChannelEdgeInfo{ + ChannelID: info.ChannelId, + ChannelPoint: *chanPoint, + NodeKey1Bytes: node1Bytes, + NodeKey2Bytes: node2Bytes, + Capacity: btcutil.Amount(info.Capacity), + ExtraOpaqueData: extra, + } + + var ( + policy1 *models.ChannelEdgePolicy + policy2 *models.ChannelEdgePolicy + ) + if info.Node1Policy != nil { + policy1, err = unmarshalPolicy(info.ChannelId, info.Node1Policy, true) + if err != nil { + return nil, nil, nil, err + } + } + if info.Node2Policy != nil { + policy2, err = unmarshalPolicy(info.ChannelId, info.Node2Policy, false) + if err != nil { + return nil, nil, nil, err + } + } + + return edge, policy1, policy2, nil +} + +func unmarshalPolicy(channelID uint64, rpcPolicy *lnrpc.RoutingPolicy, + node1 bool) (*models.ChannelEdgePolicy, error) { + + var chanFlags lnwire.ChanUpdateChanFlags + if !node1 { + chanFlags |= lnwire.ChanUpdateDirection + } + if rpcPolicy.Disabled { + chanFlags |= lnwire.ChanUpdateDisabled + } + + extra, err := lnwire.CustomRecords(rpcPolicy.CustomRecords).Serialize() + if err != nil { + return nil, err + } + + return &models.ChannelEdgePolicy{ + ChannelID: channelID, + TimeLockDelta: uint16(rpcPolicy.TimeLockDelta), + MinHTLC: lnwire.MilliSatoshi(rpcPolicy.MinHtlc), + MaxHTLC: lnwire.MilliSatoshi(rpcPolicy.MaxHtlcMsat), + FeeBaseMSat: lnwire.MilliSatoshi(rpcPolicy.FeeBaseMsat), + FeeProportionalMillionths: lnwire.MilliSatoshi(rpcPolicy.FeeRateMilliMsat), + LastUpdate: time.Unix(int64(rpcPolicy.LastUpdate), 0), + ChannelFlags: chanFlags, + ExtraOpaqueData: extra, + }, nil +} + +func (r *remoteWrapper) IsPublicNode(ctx context.Context, pubKey [33]byte) (bool, error) { + resp, err := r.lnConn.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{ + PubKey: hex.EncodeToString(pubKey[:]), + IncludeChannels: false, + }) + if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { + return false, nil + } else if err != nil { + return false, err + } + + return resp.IsPublic, nil +} + +func (r *remoteWrapper) AddrsForNode(ctx context.Context, nodePub *btcec.PublicKey) (bool, []net.Addr, error) { + resp, err := r.lnConn.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{ + PubKey: hex.EncodeToString(nodePub.SerializeCompressed()), + IncludeChannels: false, + }) + if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { + return false, nil, nil + } else if err != nil { + return false, nil, err + } + + addrs, err := r.unmarshalAddrs(resp.Node.Addresses) + if err != nil { + return false, nil, err + } + + return true, addrs, nil +} + +func (r *remoteWrapper) unmarshalAddrs(addrs []*lnrpc.NodeAddress) ([]net.Addr, error) { + netAddrs := make([]net.Addr, 0, len(addrs)) + for _, addr := range addrs { + netAddr, err := r.net.ResolveTCPAddr(addr.Network, addr.Addr) + if err != nil { + return nil, err + } + netAddrs = append(netAddrs, netAddr) + } + + return netAddrs, nil +} + +func (r *remoteWrapper) HasLightningNode(ctx context.Context, nodePub [33]byte) (time.Time, bool, error) { + resp, err := r.lnConn.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{ + PubKey: hex.EncodeToString(nodePub[:]), + IncludeChannels: false, + }) + if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { + return time.Time{}, false, nil + } else if err != nil { + return time.Time{}, false, err + } + + return time.Unix(int64(resp.Node.LastUpdate), 0), true, nil +} + +func (r *remoteWrapper) LookupAlias(ctx context.Context, pub *btcec.PublicKey) ( + string, error) { + + resp, err := r.lnConn.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{ + PubKey: hex.EncodeToString(pub.SerializeCompressed()), + IncludeChannels: false, + }) + if err != nil { + return "", err + } + + return resp.Node.Alias, nil +} + +var _ lnd.GraphSource = (*remoteWrapper)(nil)