Skip to content

Commit

Permalink
Use self signed tokens for stream
Browse files Browse the repository at this point in the history
  • Loading branch information
NickCao committed Aug 13, 2024
1 parent bca34e1 commit 742ecb5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 100 deletions.
61 changes: 14 additions & 47 deletions internal/service/controller_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,19 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net"
"net/url"
"strings"
"sync"
"time"

"github.com/golang-jwt/jwt/v5"
pb "github.com/jumpstarter-dev/jumpstarter-protocol/go/jumpstarter/v1"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
authv1 "k8s.io/api/authentication/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -49,10 +47,6 @@ import (
jumpstarterdevv1alpha1 "github.com/jumpstarter-dev/jumpstarter-controller/api/v1alpha1"
)

// Reference: config/default/kustomization.yaml
const nameSpace = "jumpstarter-router-system"
const namePrefix = "jumpstarter-router-"

// ControlerService exposes a gRPC service
type ControllerService struct {
pb.UnimplementedControllerServiceServer
Expand Down Expand Up @@ -386,60 +380,33 @@ func (s *ControllerService) Dial(ctx context.Context, req *pb.DialRequest) (*pb.

stream := uuid.NewUUID()

audience := (&url.URL{
// TODO should we use grpc scheme?
Scheme: "https",
Host: routerEndpoint(),
Path: fmt.Sprintf("/stream/%s", stream),
}).String()

// TODO: make this configurable and requestable (with limits)
expsecs := int64(3600)

var tokenholder corev1.ServiceAccount

tokenholderName := types.NamespacedName{
Namespace: nameSpace,
Name: namePrefix + "tokenholder",
}

if err := s.Client.Get(ctx, tokenholderName, &tokenholder); err != nil {
logger.Error(err, "failed to get tokenholder service account", "name", tokenholderName)
return nil, status.Errorf(codes.Internal, "failed to get tokenholder service account")
}

tokenRequest := authv1.TokenRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: tokenholderName.Namespace,
Name: tokenholderName.Name,
},
Spec: authv1.TokenRequestSpec{
Audiences: []string{audience},
ExpirationSeconds: &expsecs,
},
}

if err := s.SubResource("token").Create(ctx, &tokenholder, &tokenRequest); err != nil {
logger.Error(err, "failed to issue stream token")
return nil, status.Errorf(codes.Internal, "failed to issue stream token: %s", err)
}
token, err := jwt.NewWithClaims(jwt.SigningMethodNone, jwt.RegisteredClaims{
Issuer: "https://jumpstarter.dev/stream",
Subject: string(stream),
Audience: []string{"https://jumpstarter.dev/router"},
ExpiresAt: jwt.NewNumericDate(time.Now().Add(time.Minute * 30)),
NotBefore: jwt.NewNumericDate(time.Now()),
IssuedAt: jwt.NewNumericDate(time.Now()),
ID: string(uuid.NewUUID()),
}).SignedString(jwt.UnsafeAllowNoneSignatureType)

This comment has been minimized.

Copy link
@mangelajo

mangelajo Aug 13, 2024

Member

But this is not self-signed, it's un-signed

This comment has been minimized.

Copy link
@NickCao

NickCao Aug 13, 2024

Author Collaborator

Indeed, have to implement a shared PSK for signing. Ideally it can be generated by helm at installation time.


// TODO: find best router from list
endpoint := routerEndpoint()

response := &pb.ListenResponse{
RouterEndpoint: endpoint,
RouterToken: tokenRequest.Status.Token,
RouterToken: token,
}

if err := value.(listenContext).stream.Send(response); err != nil {
logger.Error(err, "failed to send listen response", "response", response)
return nil, err
}

logger.Info("Client dial assigned stream ", "client", identity.GetName(), "stream", audience)
logger.Info("Client dial assigned stream ", "client", identity.GetName(), "stream", stream)
return &pb.DialResponse{
RouterEndpoint: endpoint,
RouterToken: tokenRequest.Status.Token,
RouterToken: token,
}, nil
}

Expand Down
65 changes: 12 additions & 53 deletions internal/service/router_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@ package service
import (
"context"
"net"
"net/url"
"strings"
"sync"

"github.com/golang-jwt/jwt/v5"
pb "github.com/jumpstarter-dev/jumpstarter-protocol/go/jumpstarter/v1"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
authv1 "k8s.io/api/authentication/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -56,58 +52,21 @@ func (s *RouterService) authenticate(ctx context.Context) (string, error) {
return "", err
}

parser := jwt.NewParser()

parsed, _, err := parser.ParseUnverified(token, &jwt.RegisteredClaims{})
if err != nil {
parsed, err := jwt.ParseWithClaims(
token,
&jwt.RegisteredClaims{},
func(t *jwt.Token) (interface{}, error) { return jwt.UnsafeAllowNoneSignatureType, nil },
jwt.WithIssuer("https://jumpstarter.dev/stream"),
jwt.WithAudience("https://jumpstarter.dev/router"),
jwt.WithIssuedAt(),
jwt.WithExpirationRequired(),
)

if err != nil || !parsed.Valid {
return "", status.Errorf(codes.InvalidArgument, "invalid jwt token")
}

audiences, err := parsed.Claims.GetAudience()
if err != nil {
return "", status.Errorf(codes.InvalidArgument, "invalid jwt audience")
}

var matched []*url.URL

for _, audience := range audiences {
aud, err := url.Parse(audience)
// skip unrecognized audiences
if err != nil {
continue
}
// skip non local audiences
if aud.Scheme != "https" || aud.Host != routerEndpoint() || !strings.HasPrefix(aud.Path, "/stream/") {
continue
}
// add local audience to matched list
matched = append(matched, aud)
}

if len(matched) != 1 {
return "", status.Errorf(codes.InvalidArgument, "invalid number of local jwt audience")
}

// Invariant: len(matched) == 1
audience := matched[0]

tokenReview := authv1.TokenReview{
Spec: authv1.TokenReviewSpec{
Token: token,
Audiences: []string{audience.String()},
},
}
if err := s.Client.Create(ctx, &tokenReview); err != nil {
return "", status.Errorf(codes.Unauthenticated, "failed to create token review")
}

if !tokenReview.Status.Authenticated ||
tokenReview.Status.User.Username != "system:serviceaccount:"+nameSpace+":"+namePrefix+"tokenholder" ||
!slices.Contains(tokenReview.Status.Audiences, audience.String()) {
return "", status.Errorf(codes.Unauthenticated, "unauthenticated jwt token")
}

return strings.TrimPrefix(audience.Path, "/stream/"), nil
return parsed.Claims.GetSubject()
}

func (s *RouterService) Stream(stream pb.RouterService_StreamServer) error {
Expand Down

0 comments on commit 742ecb5

Please sign in to comment.