Skip to content

Commit

Permalink
Use fake client in mock controller
Browse files Browse the repository at this point in the history
  • Loading branch information
NickCao committed Aug 13, 2024
1 parent 742ecb5 commit 8dcdb3f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 232 deletions.
249 changes: 19 additions & 230 deletions cmd/mock/main.go
Original file line number Diff line number Diff line change
@@ -1,251 +1,40 @@
package main

import (
"context"
"log"
"net"
"sync"

"github.com/google/uuid"
jumpstarterdevv1alpha1 "github.com/jumpstarter-dev/jumpstarter-controller/api/v1alpha1"
"github.com/jumpstarter-dev/jumpstarter-controller/internal/service"
pb "github.com/jumpstarter-dev/jumpstarter-protocol/go/jumpstarter/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

type ControllerService struct {
pb.UnimplementedControllerServiceServer
exportersLock sync.RWMutex
exporters map[string]Exporter
listenersLock sync.RWMutex
listeners map[string]listenContext
}

type RouterService struct {
pb.UnimplementedRouterServiceServer
pendingLock sync.RWMutex
pending map[string]streamContext
}

type listenContext struct {
cancel context.CancelFunc
stream pb.ControllerService_ListenServer
}

type streamContext struct {
cancel context.CancelFunc
stream pb.RouterService_StreamServer
}

type Exporter struct {
Labels map[string]string
DriverInstanceReports []*pb.DriverInstanceReport
}

func (c *ControllerService) Register(
ctx context.Context,
req *pb.RegisterRequest,
) (*pb.RegisterResponse, error) {
c.exportersLock.Lock()
defer c.exportersLock.Unlock()
c.exporters[req.GetUuid()] = Exporter{
Labels: req.GetLabels(),
DriverInstanceReports: req.GetReports(),
}
return &pb.RegisterResponse{}, nil
}

func (c *ControllerService) Unregister(
ctx context.Context,
req *pb.UnregisterRequest,
) (*pb.UnregisterResponse, error) {
c.exportersLock.Lock()
defer c.exportersLock.Unlock()
delete(c.exporters, req.GetUuid())
return &pb.UnregisterResponse{}, nil
}

func (c *ControllerService) Listen(req *pb.ListenRequest, stream pb.ControllerService_ListenServer) error {
ctx := stream.Context()

exporter, err := service.BearerTokenFromContext(ctx)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

lctx := listenContext{
cancel: cancel,
stream: stream,
}

c.listenersLock.Lock()
if _, ok := c.listeners[exporter]; ok {
return status.Errorf(codes.AlreadyExists, "exporter is already listening")
}
c.listeners[exporter] = lctx
c.listenersLock.Unlock()

defer func() {
c.listenersLock.Lock()
delete(c.listeners, exporter)
c.listenersLock.Unlock()
}()

select {
case <-ctx.Done():
return nil
}
}

func (c *ControllerService) Dial(ctx context.Context, req *pb.DialRequest) (*pb.DialResponse, error) {
c.listenersLock.RLock()
listener, ok := c.listeners[req.GetUuid()]
c.listenersLock.RUnlock()

if !ok {
return nil, status.Errorf(codes.Unavailable, "no matching listener")
}

stream, _ := uuid.NewUUID()

// TODO: find best router from list
endpoint := "127.0.0.1:8083"

if err := listener.stream.Send(&pb.ListenResponse{
RouterEndpoint: endpoint,
RouterToken: stream.String(),
}); err != nil {
return nil, err
}

return &pb.DialResponse{
RouterEndpoint: endpoint,
RouterToken: stream.String(),
}, nil
}

func (c *ControllerService) ListExporters(
ctx context.Context,
req *pb.ListExportersRequest,
) (*pb.ListExportersResponse, error) {
c.exportersLock.RLock()
defer c.exportersLock.RUnlock()
exporters := []*pb.GetReportResponse{}

for uuid, exporter := range c.exporters {
mismatch := false
for key, value := range req.GetLabels() {
if v, ok := exporter.Labels[key]; !ok || v != value {
mismatch = true
break
}
}
if !mismatch {
exporters = append(exporters, &pb.GetReportResponse{
Uuid: uuid,
Labels: exporter.Labels,
Reports: exporter.DriverInstanceReports,
})
}
}

return &pb.ListExportersResponse{
Exporters: exporters,
}, nil

}

func (c *ControllerService) GetExporter(
ctx context.Context,
req *pb.GetExporterRequest,
) (*pb.GetExporterResponse, error) {
c.exportersLock.RLock()
defer c.exportersLock.RUnlock()
if exporter, ok := c.exporters[req.GetUuid()]; ok {
return &pb.GetExporterResponse{
Exporter: &pb.GetReportResponse{
Uuid: req.GetUuid(),
Labels: exporter.Labels,
Reports: exporter.DriverInstanceReports,
},
}, nil
} else {
return nil, status.Errorf(codes.NotFound, "no such device")
}
}

func (c *ControllerService) LeaseExporter(
ctx context.Context,
req *pb.LeaseExporterRequest,
) (*pb.LeaseExporterResponse, error) {
return &pb.LeaseExporterResponse{
LeaseExporterResponseOneof: &pb.LeaseExporterResponse_Success{
Success: &pb.LeaseExporterResponseSuccess{
Duration: req.Duration,
},
},
}, nil
}

func (c *ControllerService) ReleaseExporter(
ctx context.Context,
req *pb.ReleaseExporterRequest,
) (*pb.ReleaseExporterResponse, error) {
return &pb.ReleaseExporterResponse{
ReleaseExporterResponseOneof: &pb.ReleaseExporterResponse_Success{
Success: &pb.ReleaseExporterResponseSuccess{},
},
}, nil
}

func (r *RouterService) Stream(stream pb.RouterService_StreamServer) error {
ctx := stream.Context()

streamName, err := service.BearerTokenFromContext(ctx)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

sctx := streamContext{
cancel: cancel,
stream: stream,
}

r.pendingLock.RLock()
s, ok := r.pending[streamName]
r.pendingLock.RUnlock()

if ok {
defer s.cancel()
return service.Forward(ctx, stream, s.stream)
} else {
r.pendingLock.Lock()
r.pending[streamName] = sctx
r.pendingLock.Unlock()
}
var (
scheme = runtime.NewScheme()
client = fake.NewFakeClient()
)

select {
case <-ctx.Done():
return nil
}
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(jumpstarterdevv1alpha1.AddToScheme(scheme))
}

func main() {
server := grpc.NewServer()

pb.RegisterControllerServiceServer(server, &ControllerService{
exporters: make(map[string]Exporter),
listeners: make(map[string]listenContext),
pb.RegisterControllerServiceServer(server, &service.ControllerService{
Client: client,
Scheme: scheme,
})
pb.RegisterRouterServiceServer(server, &RouterService{
pending: make(map[string]streamContext),

pb.RegisterRouterServiceServer(server, &service.RouterService{
Client: client,
Scheme: scheme,
})

listener, err := net.Listen("tcp", ":8083")
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ go 1.22.3

require (
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/google/uuid v1.6.0
github.com/jumpstarter-dev/jumpstarter-protocol/go v0.0.0-20240801151533-1669eb0a23a7
github.com/onsi/ginkgo/v2 v2.17.1
github.com/onsi/gomega v1.32.0
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e
golang.org/x/sync v0.6.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.2
k8s.io/api v0.30.0
k8s.io/apimachinery v0.30.0
k8s.io/client-go v0.30.0
Expand All @@ -22,6 +22,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand All @@ -37,6 +38,7 @@ require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand All @@ -63,7 +65,6 @@ require (
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down

0 comments on commit 8dcdb3f

Please sign in to comment.