Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a read-only data api for external and cluster.local tools #79

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ plugins:
opt: paths=source_relative
- name: go
out: .
opt: paths=source_relative
opt: paths=source_relative
- name: grpc-gateway
out: .
opt: paths=source_relative
4 changes: 3 additions & 1 deletion buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ lint:
- PACKAGE_SAME_DIRECTORY
breaking:
use:
- WIRE_JSON
- WIRE_JSON
deps:
- buf.build/googleapis/googleapis
37 changes: 34 additions & 3 deletions daemon/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package main

import (
"context"
"flag"
"net/http"
"os"
"strconv"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/networkop/meshnet-cni/daemon/cni"
"github.com/networkop/meshnet-cni/daemon/grpcwire"
"github.com/networkop/meshnet-cni/daemon/meshnet"
mpb "github.com/networkop/meshnet-cni/daemon/proto/meshnet/v1beta1"
"github.com/networkop/meshnet-cni/daemon/vxlan"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
defaultPort = 51111
httpPort = "51112"
)

func main() {
Expand Down Expand Up @@ -49,8 +56,32 @@ func main() {
}
log.Info("Starting meshnet daemon...with grpc support")

if err := m.Serve(); err != nil {
log.Errorf("Daemon exited badly: %v", err)
os.Exit(1)
// Start meshnet in a goroutine b/c we have more to do.
go m.Serve()
log.Info("Server started.")

// create a grcp client
conn, err := grpc.DialContext(
context.Background(),
"0.0.0.0:"+strconv.Itoa(grpcPort),
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalln("Failed to dail gRPC server:", err)
}
log.Infof("Connected to grpc server on %d", grpcPort)

// Create an HTTP service for the Grafana service handler and fire it up
gwmux := runtime.NewServeMux()
err = mpb.RegisterGrafanaHandler(context.Background(), gwmux, conn)
if err != nil {
log.Fatalln("Failed to register HTTP gateway:", err)
}
gwServer := &http.Server{
Addr: ":" + httpPort,
Handler: gwmux,
}
log.Infof("Serving gRPC-Gateway on :%s", httpPort)
log.Fatalln(gwServer.ListenAndServe())
}
94 changes: 54 additions & 40 deletions daemon/meshnet/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,68 +15,62 @@ import (
"k8s.io/client-go/util/retry"

"github.com/google/gopacket/pcap"
topologyv1 "github.com/networkop/meshnet-cni/api/types/v1beta1"
mpb "github.com/networkop/meshnet-cni/daemon/proto/meshnet/v1beta1"
)

// TODO: Migrate from getPod to getTopo b/c simpler.
func (m *Meshnet) getPod(ctx context.Context, name, ns string) (*unstructured.Unstructured, error) {
mnetdLogger.Infof("Reading pod %s from K8s", name)
return m.tClient.Topology(ns).Unstructured(ctx, name, metav1.GetOptions{})
}

func (m *Meshnet) getTopo(ctx context.Context, name, ns string) (*topologyv1.Topology, error) {
mnetdLogger.Infof("Reading topology %s/%s from K8s", ns, name)
return m.tClient.Topology(ns).Get(ctx, name, metav1.GetOptions{})
}

func (m *Meshnet) updateStatus(ctx context.Context, obj *unstructured.Unstructured, ns string) error {
mnetdLogger.Infof("Update pod status %s from K8s", obj.GetName())
_, err := m.tClient.Topology(ns).Update(ctx, obj, metav1.UpdateOptions{})
return err
}

// You know, this is really unfortunate choice of names in meshnet.proto.
func (m *Meshnet) makePod(topo *topologyv1.Topology) *mpb.Pod {
links := make([]*mpb.Link, len(topo.Spec.Links))
for i := range links {
links[i] = &mpb.Link{
PeerPod: topo.Spec.Links[i].PeerPod,
LocalIntf: topo.Spec.Links[i].LocalIntf,
PeerIntf: topo.Spec.Links[i].PeerIntf,
LocalIp: topo.Spec.Links[i].LocalIP,
PeerIp: topo.Spec.Links[i].PeerIP,
Uid: int64(topo.Spec.Links[i].UID),
}
}
return &mpb.Pod{
Name: topo.Name,
SrcIp: topo.Status.SrcIP,
NetNs: topo.Status.NetNS,
KubeNs: topo.Namespace,
Links: links,
NodeIp: os.Getenv("HOST_IP"),
NodeIntf: os.Getenv("HOST_INTF"),
ContainerId: topo.Status.ContainerID,
}
}

func (m *Meshnet) Get(ctx context.Context, pod *mpb.PodQuery) (*mpb.Pod, error) {
mnetdLogger.Infof("Retrieving %s's metadata from K8s...", pod.Name)

result, err := m.getPod(ctx, pod.Name, pod.KubeNs)
result, err := m.getTopo(ctx, pod.Name, pod.KubeNs)
if err != nil {
mnetdLogger.Errorf("Failed to read pod %s from K8s", pod.Name)
return nil, err
}

remoteLinks, found, err := unstructured.NestedSlice(result.Object, "spec", "links")
if err != nil || !found || remoteLinks == nil {
mnetdLogger.Errorf("Could not find 'Link' array in pod's spec")
return nil, err
}

links := make([]*mpb.Link, len(remoteLinks))
for i := range links {
remoteLink, ok := remoteLinks[i].(map[string]interface{})
if !ok {
mnetdLogger.Errorf("Unrecognised 'Link' structure")
return nil, err
}
newLink := &mpb.Link{}
newLink.PeerPod, _, _ = unstructured.NestedString(remoteLink, "peer_pod")
newLink.PeerIntf, _, _ = unstructured.NestedString(remoteLink, "peer_intf")
newLink.LocalIntf, _, _ = unstructured.NestedString(remoteLink, "local_intf")
newLink.LocalIp, _, _ = unstructured.NestedString(remoteLink, "local_ip")
newLink.PeerIp, _, _ = unstructured.NestedString(remoteLink, "peer_ip")
newLink.Uid, _, _ = unstructured.NestedInt64(remoteLink, "uid")
links[i] = newLink
}

srcIP, _, _ := unstructured.NestedString(result.Object, "status", "src_ip")
netNs, _, _ := unstructured.NestedString(result.Object, "status", "net_ns")
containerId, _, _ := unstructured.NestedString(result.Object, "status", "container_id")
nodeIP := os.Getenv("HOST_IP")
nodeIntf := os.Getenv("HOST_INTF")

return &mpb.Pod{
Name: pod.Name,
SrcIp: srcIP,
NetNs: netNs,
KubeNs: pod.KubeNs,
Links: links,
NodeIp: nodeIP,
NodeIntf: nodeIntf,
ContainerId: containerId,
}, nil
return m.makePod(result), nil
}

func (m *Meshnet) SetAlive(ctx context.Context, pod *mpb.Pod) (*mpb.BoolResponse, error) {
Expand Down Expand Up @@ -462,3 +456,23 @@ func (m *Meshnet) GenerateNodeInterfaceName(ctx context.Context, in *mpb.Generat
}
return &mpb.GenerateNodeInterfaceNameResponse{Ok: true, NodeIntfName: locIfNm}, nil
}

// API calls for grafana
func (m *Meshnet) getTopoList(ctx context.Context, ns string) (*topologyv1.TopologyList, error) {
mnetdLogger.Infof("Reading topologies from %s", ns)
return m.tClient.Topology(ns).List(ctx, metav1.ListOptions{})
}

func (m *Meshnet) ListTopos(ctx context.Context, ns *mpb.TopoReq) (*mpb.TopoList, error) {
mnetdLogger.Infof("Retrieving topology from %s", ns.KubeNs)
result, err := m.getTopoList(ctx, ns.KubeNs)
if err != nil {
mnetdLogger.Errorf("Failed to read topologies from %s", ns.KubeNs)
return nil, err
}
topos := make([]*mpb.Pod, len(result.Items))
for i, obj := range result.Items {
topos[i] = m.makePod(&obj)
}
return &mpb.TopoList{Pods: topos}, nil
}
2 changes: 2 additions & 0 deletions daemon/meshnet/meshnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Meshnet struct {
mpb.UnimplementedLocalServer
mpb.UnimplementedRemoteServer
mpb.UnimplementedWireProtocolServer
mpb.UnimplementedGrafanaServer
config Config
kClient kubernetes.Interface
tClient topologyclientv1.Interface
Expand Down Expand Up @@ -104,6 +105,7 @@ func New(cfg Config) (*Meshnet, error) {
mpb.RegisterLocalServer(m.s, m)
mpb.RegisterRemoteServer(m.s, m)
mpb.RegisterWireProtocolServer(m.s, m)
mpb.RegisterGrafanaServer(m.s, m)
reflection.Register(m.s)

// After server is registered, reduce logging if link type is GRPC
Expand Down
Loading