Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable self-service ops collections via special case #39

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Build Stage
################################################################################
FROM golang:1.19-buster as builder
FROM golang as builder

WORKDIR /builder

Expand Down Expand Up @@ -30,7 +30,7 @@ FROM busybox:1.34-musl as busybox

# Runtime Stage
################################################################################
FROM gcr.io/distroless/base-debian10
FROM gcr.io/distroless/base-debian11

COPY --from=busybox /bin/sh /bin/sh

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/estuary/data-plane-gateway

go 1.19
go 1.21

require (
github.com/estuary/flow v0.1.9-0.20230303181027-f65a9d7f1a89
Expand Down
74 changes: 68 additions & 6 deletions journal_server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package main

import (
"bytes"
context "context"
"fmt"
"slices"

"github.com/estuary/data-plane-gateway/auth"
"github.com/estuary/flow/go/labels"
log "github.com/sirupsen/logrus"
pb "go.gazette.dev/core/broker/protocol"
)
Expand Down Expand Up @@ -46,13 +49,39 @@ func (s *JournalAuthServer) List(ctx context.Context, req *pb.ListRequest) (*pb.
if err != nil {
return nil, err
}
ctx = pb.WithDispatchDefault(ctx)

// Is the user listing (only) ops collections?
var requested = req.Selector.Include.ValuesOf(labels.Collection)
var isOpsListing = len(requested) != 0
for _, r := range requested {
isOpsListing = isOpsListing && slices.Contains(allOpsCollections, r)
}

// Special-case listings of ops collections.
// We list all journals, and then filter to those that the user may access.
if isOpsListing {
var resp, err = s.journalClient.List(ctx, req)
if err != nil {
return nil, err
}

// Filter journals to those the user has access to.
var filtered []pb.ListResponse_Journal
for _, j := range resp.Journals {
if isAllowedOpsJournal(claims, j.Spec.Name) {
filtered = append(filtered, j)
}
}
resp.Journals = filtered
return resp, nil
}

err = auth.EnforceSelectorPrefix(claims, req.Selector)
if err != nil {
return nil, fmt.Errorf("Unauthorized: %w", err)
}

ctx = pb.WithDispatchDefault(ctx)
return s.journalClient.List(ctx, req)
}

Expand All @@ -65,27 +94,31 @@ func (s *JournalAuthServer) ListFragments(ctx context.Context, req *pb.Fragments

err = auth.EnforcePrefix(claims, req.Journal.String())
if err != nil {
return nil, fmt.Errorf("Unauthorized: %w", err)
if !isAllowedOpsJournal(claims, req.Journal) {
return nil, fmt.Errorf("Unauthorized: %w", err)
}
}

return s.journalClient.ListFragments(ctx, req)
}

// Read implements protocol.JournalServer
func (s *JournalAuthServer) Read(readReq *pb.ReadRequest, readServer pb.Journal_ReadServer) error {
func (s *JournalAuthServer) Read(req *pb.ReadRequest, readServer pb.Journal_ReadServer) error {
ctx := readServer.Context()

claims, err := auth.AuthenticateGrpcReq(ctx, s.jwtVerificationKey)
if err != nil {
return err
}

err = auth.EnforcePrefix(claims, readReq.Journal.String())
err = auth.EnforcePrefix(claims, req.Journal.String())
if err != nil {
return fmt.Errorf("Unauthorized: %w", err)
if !isAllowedOpsJournal(claims, req.Journal) {
return fmt.Errorf("Unauthorized: %w", err)
}
}

readClient, err := s.journalClient.Read(ctx, readReq)
readClient, err := s.journalClient.Read(ctx, req)
if err != nil {
return err
}
Expand All @@ -104,4 +137,33 @@ func (s *JournalAuthServer) Replicate(pb.Journal_ReplicateServer) error {
return fmt.Errorf("Unsupported operation: `Replicate`")
}

// TODO(johnny): This authorization check is an encapsulated hack that allows
// ops logs and stats to be read-able by end users.
// It's a placeholder for a missing partition-level authorization feature.
func isAllowedOpsJournal(claims *auth.AuthorizedClaims, journal pb.Journal) bool {
var b = make([]byte, 256)

for _, oc := range allOpsCollections {
for _, kind := range []string{"capture", "derivation", "materialization"} {
for _, prefix := range claims.Prefixes {
b = append(b[:0], oc...)
b = append(b, "/kind="...)
b = append(b, kind...)
b = append(b, "/name="...)
b = labels.EncodePartitionValue(b, prefix)

if bytes.HasPrefix([]byte(journal), b) {
return true
}
}
}
}
return false
}

var allOpsCollections = []string{
"ops.us-central1.v1/logs",
"ops.us-central1.v1/stats",
}

var _ pb.JournalServer = &JournalAuthServer{}
Loading