Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notifications #76

Closed
wants to merge 14 commits into from
Closed
182 changes: 12 additions & 170 deletions server.go → channel_opener_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,41 @@ package main

import (
"context"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"net"
"strings"

"github.com/breez/lspd/btceclegacy"
"github.com/breez/lspd/cln"
"github.com/breez/lspd/config"
"github.com/breez/lspd/interceptor"
"github.com/breez/lspd/lightning"
"github.com/breez/lspd/lnd"
lspdrpc "github.com/breez/lspd/rpc"
ecies "github.com/ecies/go/v2"
"github.com/golang/protobuf/proto"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/caddyserver/certmagic"
"github.com/lightningnetwork/lnd/lnwire"
)

type server struct {
type channelOpenerServer struct {
lspdrpc.ChannelOpenerServer
address string
certmagicDomain string
lis net.Listener
s *grpc.Server
nodes map[string]*node
store interceptor.InterceptStore
store interceptor.InterceptStore
}

type node struct {
client lightning.Client
nodeConfig *config.NodeConfig
privateKey *btcec.PrivateKey
publicKey *btcec.PublicKey
eciesPrivateKey *ecies.PrivateKey
eciesPublicKey *ecies.PublicKey
openChannelReqGroup singleflight.Group
func NewChannelOpenerServer(
store interceptor.InterceptStore,
) *channelOpenerServer {
return &channelOpenerServer{
store: store,
}
}

func (s *server) ChannelInformation(ctx context.Context, in *lspdrpc.ChannelInformationRequest) (*lspdrpc.ChannelInformationReply, error) {
func (s *channelOpenerServer) ChannelInformation(ctx context.Context, in *lspdrpc.ChannelInformationRequest) (*lspdrpc.ChannelInformationReply, error) {
node, err := getNode(ctx)
if err != nil {
return nil, err
Expand All @@ -76,7 +59,7 @@ func (s *server) ChannelInformation(ctx context.Context, in *lspdrpc.ChannelInfo
}, nil
}

func (s *server) RegisterPayment(ctx context.Context, in *lspdrpc.RegisterPaymentRequest) (*lspdrpc.RegisterPaymentReply, error) {
func (s *channelOpenerServer) RegisterPayment(ctx context.Context, in *lspdrpc.RegisterPaymentRequest) (*lspdrpc.RegisterPaymentReply, error) {
node, err := getNode(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -126,7 +109,7 @@ func (s *server) RegisterPayment(ctx context.Context, in *lspdrpc.RegisterPaymen
return &lspdrpc.RegisterPaymentReply{}, nil
}

func (s *server) OpenChannel(ctx context.Context, in *lspdrpc.OpenChannelRequest) (*lspdrpc.OpenChannelReply, error) {
func (s *channelOpenerServer) OpenChannel(ctx context.Context, in *lspdrpc.OpenChannelRequest) (*lspdrpc.OpenChannelReply, error) {
node, err := getNode(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -210,7 +193,7 @@ func (n *node) getSignedEncryptedData(in *lspdrpc.Encrypted) (string, []byte, bo
return hex.EncodeToString(signed.Pubkey), signed.Data, usedEcies, nil
}

func (s *server) CheckChannels(ctx context.Context, in *lspdrpc.Encrypted) (*lspdrpc.Encrypted, error) {
func (s *channelOpenerServer) CheckChannels(ctx context.Context, in *lspdrpc.Encrypted) (*lspdrpc.Encrypted, error) {
node, err := getNode(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -265,147 +248,6 @@ func (s *server) CheckChannels(ctx context.Context, in *lspdrpc.Encrypted) (*lsp
return &lspdrpc.Encrypted{Data: encrypted}, nil
}

func NewGrpcServer(
configs []*config.NodeConfig,
address string,
certmagicDomain string,
store interceptor.InterceptStore,
) (*server, error) {
if len(configs) == 0 {
return nil, fmt.Errorf("no nodes supplied")
}

nodes := make(map[string]*node)
for _, config := range configs {
pk, err := hex.DecodeString(config.LspdPrivateKey)
if err != nil {
return nil, fmt.Errorf("hex.DecodeString(config.lspdPrivateKey=%v) error: %v", config.LspdPrivateKey, err)
}

eciesPrivateKey := ecies.NewPrivateKeyFromBytes(pk)
eciesPublicKey := eciesPrivateKey.PublicKey
privateKey, publicKey := btcec.PrivKeyFromBytes(pk)

node := &node{
nodeConfig: config,
privateKey: privateKey,
publicKey: publicKey,
eciesPrivateKey: eciesPrivateKey,
eciesPublicKey: eciesPublicKey,
}

if config.Lnd == nil && config.Cln == nil {
return nil, fmt.Errorf("node has to be either cln or lnd")
}

if config.Lnd != nil && config.Cln != nil {
return nil, fmt.Errorf("node cannot be both cln and lnd")
}

if config.Lnd != nil {
node.client, err = lnd.NewLndClient(config.Lnd)
if err != nil {
return nil, err
}
}

if config.Cln != nil {
node.client, err = cln.NewClnClient(config.Cln.SocketPath)
if err != nil {
return nil, err
}
}

_, exists := nodes[config.Token]
if exists {
return nil, fmt.Errorf("cannot have multiple nodes with the same token")
}

nodes[config.Token] = node
}

return &server{
address: address,
certmagicDomain: certmagicDomain,
nodes: nodes,
store: store,
}, nil
}

func (s *server) Start() error {
// Make sure all nodes are available and set name and pubkey if not set
// in config.
for _, n := range s.nodes {
info, err := n.client.GetInfo()
if err != nil {
return fmt.Errorf("failed to get info from host %s", n.nodeConfig.Host)
}

if n.nodeConfig.Name == "" {
n.nodeConfig.Name = info.Alias
}

if n.nodeConfig.NodePubkey == "" {
n.nodeConfig.NodePubkey = info.Pubkey
}
}

var lis net.Listener
if s.certmagicDomain == "" {
var err error
lis, err = net.Listen("tcp", s.address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
} else {
tlsConfig, err := certmagic.TLS([]string{s.certmagicDomain})
if err != nil {
log.Fatalf("failed to run certmagic: %v", err)
}
lis, err = tls.Listen("tcp", s.address, tlsConfig)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
}

srv := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if md, ok := metadata.FromIncomingContext(ctx); ok {
for _, auth := range md.Get("authorization") {
if !strings.HasPrefix(auth, "Bearer ") {
continue
}

token := strings.Replace(auth, "Bearer ", "", 1)
node, ok := s.nodes[token]
if !ok {
continue
}

return handler(context.WithValue(ctx, "node", node), req)
}
}
return nil, status.Errorf(codes.PermissionDenied, "Not authorized")
}),
)
lspdrpc.RegisterChannelOpenerServer(srv, s)

s.s = srv
s.lis = lis
if err := srv.Serve(lis); err != nil {
return fmt.Errorf("failed to serve: %v", err)
}

return nil
}

func (s *server) Stop() {
srv := s.s
if srv != nil {
srv.GracefulStop()
}
}

func getNode(ctx context.Context) (*node, error) {
n := ctx.Value("node")
if n == nil {
Expand Down
51 changes: 50 additions & 1 deletion cln/cln_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"path/filepath"
"time"

"github.com/breez/lspd/basetypes"
"github.com/breez/lspd/lightning"
Expand Down Expand Up @@ -65,7 +66,7 @@ func (c *ClnClient) IsConnected(destination []byte) (bool, error) {
}

for _, peer := range peers {
if pubKey == peer.Id {
if pubKey == peer.Id && peer.Connected {
log.Printf("destination online: %x", destination)
return true, nil
}
Expand Down Expand Up @@ -174,6 +175,32 @@ func (c *ClnClient) GetChannel(peerID []byte, channelPoint wire.OutPoint) (*ligh
return nil, fmt.Errorf("no channel found")
}

func (c *ClnClient) GetPeerId(scid *basetypes.ShortChannelID) ([]byte, error) {
scidStr := scid.ToString()
peers, err := c.client.ListPeers()
if err != nil {
return nil, err
}

var dest *string
for _, p := range peers {
for _, ch := range p.Channels {
if ch.Alias.Local == scidStr ||
ch.Alias.Remote == scidStr ||
ch.ShortChannelId == scidStr {
dest = &p.Id
break
}
}
}

if dest == nil {
return nil, nil
}

return hex.DecodeString(*dest)
}

func (c *ClnClient) GetNodeChannelCount(nodeID []byte) (int, error) {
pubkey := hex.EncodeToString(nodeID)
peer, err := c.client.GetPeer(pubkey)
Expand Down Expand Up @@ -228,3 +255,25 @@ func (c *ClnClient) GetClosedChannels(nodeID string, channelPoints map[string]ui

return r, nil
}

var pollingInterval = 400 * time.Millisecond

func (c *ClnClient) WaitOnline(peerID []byte, timeout time.Time) error {
peerIDStr := hex.EncodeToString(peerID)
for {
peer, err := c.client.GetPeer(peerIDStr)
if err == nil && peer.Connected {
return nil
}

select {
case <-time.After(time.Until(timeout)):
return fmt.Errorf("timeout")
case <-time.After(pollingInterval):
}
}
}

func (c *ClnClient) WaitChannelActive(peerID []byte, timeout time.Time) error {
return nil
}
29 changes: 12 additions & 17 deletions cln/cln_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/breez/lspd/basetypes"
"github.com/breez/lspd/cln_plugin/proto"
"github.com/breez/lspd/config"
"github.com/breez/lspd/interceptor"
Expand Down Expand Up @@ -129,26 +130,11 @@ func (i *ClnHtlcInterceptor) intercept() error {
log.Printf("unexpected error in interceptor.Recv() %v", err)
break
}
nextHop := "<unknown>"
channels, err := i.client.client.GetChannel(request.Onion.ShortChannelId)
if err != nil {
for _, c := range channels {
if c.Source == i.config.NodePubkey {
nextHop = c.Destination
break
}
if c.Destination == i.config.NodePubkey {
nextHop = c.Source
break
}
}
}

log.Printf("correlationid: %v\nhtlc: %v\nchanID: %v\nnextHop: %v\nincoming amount: %v\noutgoing amount: %v\nincoming expiry: %v\noutgoing expiry: %v\npaymentHash: %v\nonionBlob: %v\n\n",
log.Printf("correlationid: %v\nhtlc: %v\nchanID: %v\nincoming amount: %v\noutgoing amount: %v\nincoming expiry: %v\noutgoing expiry: %v\npaymentHash: %v\nonionBlob: %v\n\n",
request.Correlationid,
request.Htlc,
request.Onion.ShortChannelId,
nextHop,
request.Htlc.AmountMsat, //with fees
request.Onion.ForwardMsat,
request.Htlc.CltvExpiryRelative,
Expand All @@ -163,8 +149,17 @@ func (i *ClnHtlcInterceptor) intercept() error {
if err != nil {
interceptorClient.Send(i.defaultResolution(request))
i.doneWg.Done()
return
}
interceptResult := i.interceptor.Intercept(nextHop, paymentHash, request.Onion.ForwardMsat, request.Onion.OutgoingCltvValue, request.Htlc.CltvExpiry)

scid, err := basetypes.NewShortChannelIDFromString(request.Onion.ShortChannelId)
if err != nil {
interceptorClient.Send(i.defaultResolution(request))
i.doneWg.Done()
JssDWt marked this conversation as resolved.
Show resolved Hide resolved
return
}

interceptResult := i.interceptor.Intercept(scid, paymentHash, request.Onion.ForwardMsat, request.Onion.OutgoingCltvValue, request.Htlc.CltvExpiry)
switch interceptResult.Action {
case interceptor.INTERCEPT_RESUME_WITH_ONION:
interceptorClient.Send(i.resumeWithOnion(request, interceptResult))
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type NodeConfig struct {
// The channel can be closed if not used this duration in seconds.
MaxInactiveDuration uint64 `json:"maxInactiveDuration,string"`

// The maximum time to hold a htlc after sending a notification when the
// peer is offline.
NotificationTimeout string `json:"notificationTimeout,string"`

// Set this field to connect to an LND node.
Lnd *LndConfig `json:"lnd,omitempty"`

Expand Down
Loading