Skip to content

Commit

Permalink
enable self-service ops collections via special case
Browse files Browse the repository at this point in the history
This is a medium-term ... hack, which enables users to read the ops logs
and stats of their own tasks. When reading the especially-enumerated
collections, we bypass traditional prefix authorization and instead
authorize over the `name` partition of the logs or stats collection.

Testing:
 * Verified this enables the new ops view of the entity details page.
 * Verified that slightly perturbing how build up allowed prefixes
   causes unauthorized errors to be returned.
 * Verified that a locally-modified `flowctl` is able to list journals
   and fragments of task logs, as well as read them, as a regular user
   of a local stack.
  • Loading branch information
jgraettinger committed Jan 19, 2024
1 parent ca10b52 commit d086acb
Showing 1 changed file with 68 additions and 6 deletions.
74 changes: 68 additions & 6 deletions journal_server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package main

import (
"bytes"
context "context"
"fmt"
"slices"

"github.com/estuary/data-plane-gateway/auth"
"github.com/estuary/flow/go/labels"
log "github.com/sirupsen/logrus"
pb "go.gazette.dev/core/broker/protocol"
)
Expand Down Expand Up @@ -46,13 +49,39 @@ func (s *JournalAuthServer) List(ctx context.Context, req *pb.ListRequest) (*pb.
if err != nil {
return nil, err
}
ctx = pb.WithDispatchDefault(ctx)

// Is the user listing (only) ops collections?
var requested = req.Selector.Include.ValuesOf(labels.Collection)
var isOpsListing = len(requested) != 0
for _, r := range requested {
isOpsListing = isOpsListing && slices.Contains(allOpsCollections, r)
}

// Special-case listings of ops collections.
// We list all journals, and then filter to those that the user may access.
if isOpsListing {
var resp, err = s.journalClient.List(ctx, req)
if err != nil {
return nil, err
}

// Filter journals to those the user has access to.
var filtered []pb.ListResponse_Journal
for _, j := range resp.Journals {
if isAllowedOpsJournal(claims, j.Spec.Name) {
filtered = append(filtered, j)
}
}
resp.Journals = filtered
return resp, nil
}

err = auth.EnforceSelectorPrefix(claims, req.Selector)
if err != nil {
return nil, fmt.Errorf("Unauthorized: %w", err)
}

ctx = pb.WithDispatchDefault(ctx)
return s.journalClient.List(ctx, req)
}

Expand All @@ -65,27 +94,31 @@ func (s *JournalAuthServer) ListFragments(ctx context.Context, req *pb.Fragments

err = auth.EnforcePrefix(claims, req.Journal.String())
if err != nil {
return nil, fmt.Errorf("Unauthorized: %w", err)
if !isAllowedOpsJournal(claims, req.Journal) {
return nil, fmt.Errorf("Unauthorized: %w", err)
}
}

return s.journalClient.ListFragments(ctx, req)
}

// Read implements protocol.JournalServer
func (s *JournalAuthServer) Read(readReq *pb.ReadRequest, readServer pb.Journal_ReadServer) error {
func (s *JournalAuthServer) Read(req *pb.ReadRequest, readServer pb.Journal_ReadServer) error {
ctx := readServer.Context()

claims, err := auth.AuthenticateGrpcReq(ctx, s.jwtVerificationKey)
if err != nil {
return err
}

err = auth.EnforcePrefix(claims, readReq.Journal.String())
err = auth.EnforcePrefix(claims, req.Journal.String())
if err != nil {
return fmt.Errorf("Unauthorized: %w", err)
if !isAllowedOpsJournal(claims, req.Journal) {
return fmt.Errorf("Unauthorized: %w", err)
}
}

readClient, err := s.journalClient.Read(ctx, readReq)
readClient, err := s.journalClient.Read(ctx, req)
if err != nil {
return err
}
Expand All @@ -104,4 +137,33 @@ func (s *JournalAuthServer) Replicate(pb.Journal_ReplicateServer) error {
return fmt.Errorf("Unsupported operation: `Replicate`")
}

// TODO(johnny): This authorization check is an encapsulated hack that allows
// ops logs and stats to be read-able by end users.
// It's a placeholder for a missing partition-level authorization feature.
func isAllowedOpsJournal(claims *auth.AuthorizedClaims, journal pb.Journal) bool {
var b = make([]byte, 256)

for _, oc := range allOpsCollections {
for _, kind := range []string{"capture", "derivation", "materialization"} {
for _, prefix := range claims.Prefixes {
b = append(b[:0], oc...)
b = append(b, "/kind="...)
b = append(b, kind...)
b = append(b, "/name="...)
b = labels.EncodePartitionValue(b, prefix)

if bytes.HasPrefix([]byte(journal), b) {
return true
}
}
}
}
return false
}

var allOpsCollections = []string{
"ops.us-central1.v1/logs",
"ops.us-central1.v1/stats",
}

var _ pb.JournalServer = &JournalAuthServer{}

0 comments on commit d086acb

Please sign in to comment.