From 4d4328dcd2def6cafbb9b82ab56d540dae5794cf Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Thu, 18 Jan 2024 23:35:39 +0000 Subject: [PATCH 1/3] enable self-service ops collections via special case This is a medium-term ... hack, which enables users to read the ops logs and stats of their own tasks. When reading the especially-enumerated collections, we bypass traditional prefix authorization and instead authorize over the `name` partition of the logs or stats collection. Testing: * Verified this enables the new ops view of the entity details page. * Verified that slightly perturbing how build up allowed prefixes causes unauthorized errors to be returned. * Verified that a locally-modified `flowctl` is able to list journals and fragments of task logs, as well as read them, as a regular user of a local stack. --- journal_server.go | 74 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 68 insertions(+), 6 deletions(-) diff --git a/journal_server.go b/journal_server.go index 49b5955..62a1e9f 100644 --- a/journal_server.go +++ b/journal_server.go @@ -1,10 +1,13 @@ package main import ( + "bytes" context "context" "fmt" + "slices" "github.com/estuary/data-plane-gateway/auth" + "github.com/estuary/flow/go/labels" log "github.com/sirupsen/logrus" pb "go.gazette.dev/core/broker/protocol" ) @@ -46,13 +49,39 @@ func (s *JournalAuthServer) List(ctx context.Context, req *pb.ListRequest) (*pb. if err != nil { return nil, err } + ctx = pb.WithDispatchDefault(ctx) + + // Is the user listing (only) ops collections? + var requested = req.Selector.Include.ValuesOf(labels.Collection) + var isOpsListing = len(requested) != 0 + for _, r := range requested { + isOpsListing = isOpsListing && slices.Contains(allOpsCollections, r) + } + + // Special-case listings of ops collections. + // We list all journals, and then filter to those that the user may access. + if isOpsListing { + var resp, err = s.journalClient.List(ctx, req) + if err != nil { + return nil, err + } + + // Filter journals to those the user has access to. + var filtered []pb.ListResponse_Journal + for _, j := range resp.Journals { + if isAllowedOpsJournal(claims, j.Spec.Name) { + filtered = append(filtered, j) + } + } + resp.Journals = filtered + return resp, nil + } err = auth.EnforceSelectorPrefix(claims, req.Selector) if err != nil { return nil, fmt.Errorf("Unauthorized: %w", err) } - ctx = pb.WithDispatchDefault(ctx) return s.journalClient.List(ctx, req) } @@ -65,14 +94,16 @@ func (s *JournalAuthServer) ListFragments(ctx context.Context, req *pb.Fragments err = auth.EnforcePrefix(claims, req.Journal.String()) if err != nil { - return nil, fmt.Errorf("Unauthorized: %w", err) + if !isAllowedOpsJournal(claims, req.Journal) { + return nil, fmt.Errorf("Unauthorized: %w", err) + } } return s.journalClient.ListFragments(ctx, req) } // Read implements protocol.JournalServer -func (s *JournalAuthServer) Read(readReq *pb.ReadRequest, readServer pb.Journal_ReadServer) error { +func (s *JournalAuthServer) Read(req *pb.ReadRequest, readServer pb.Journal_ReadServer) error { ctx := readServer.Context() claims, err := auth.AuthenticateGrpcReq(ctx, s.jwtVerificationKey) @@ -80,12 +111,14 @@ func (s *JournalAuthServer) Read(readReq *pb.ReadRequest, readServer pb.Journal_ return err } - err = auth.EnforcePrefix(claims, readReq.Journal.String()) + err = auth.EnforcePrefix(claims, req.Journal.String()) if err != nil { - return fmt.Errorf("Unauthorized: %w", err) + if !isAllowedOpsJournal(claims, req.Journal) { + return fmt.Errorf("Unauthorized: %w", err) + } } - readClient, err := s.journalClient.Read(ctx, readReq) + readClient, err := s.journalClient.Read(ctx, req) if err != nil { return err } @@ -104,4 +137,33 @@ func (s *JournalAuthServer) Replicate(pb.Journal_ReplicateServer) error { return fmt.Errorf("Unsupported operation: `Replicate`") } +// TODO(johnny): This authorization check is an encapsulated hack that allows +// ops logs and stats to be read-able by end users. +// It's a placeholder for a missing partition-level authorization feature. +func isAllowedOpsJournal(claims *auth.AuthorizedClaims, journal pb.Journal) bool { + var b = make([]byte, 256) + + for _, oc := range allOpsCollections { + for _, kind := range []string{"capture", "derivation", "materialization"} { + for _, prefix := range claims.Prefixes { + b = append(b[:0], oc...) + b = append(b, "/kind="...) + b = append(b, kind...) + b = append(b, "/name="...) + b = labels.EncodePartitionValue(b, prefix) + + if bytes.HasPrefix([]byte(journal), b) { + return true + } + } + } + } + return false +} + +var allOpsCollections = []string{ + "ops.us-central1.v1/logs", + "ops.us-central1.v1/stats", +} + var _ pb.JournalServer = &JournalAuthServer{} From 21c863b12d11d78e7d11315a93c4561bec7e32ff Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Fri, 19 Jan 2024 00:30:57 +0000 Subject: [PATCH 2/3] go.mod: bump go version to 1.21 --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 5bc19b2..2962b05 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/estuary/data-plane-gateway -go 1.19 +go 1.21 require ( github.com/estuary/flow v0.1.9-0.20230303181027-f65a9d7f1a89 From 9dccd6db0b75327f3d468e48f195c9001d2691a6 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Fri, 19 Jan 2024 00:35:14 +0000 Subject: [PATCH 3/3] Dockerfile: update Go and debian base images --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index fe6ec7c..247ee5f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # Build Stage ################################################################################ -FROM golang:1.19-buster as builder +FROM golang as builder WORKDIR /builder @@ -30,7 +30,7 @@ FROM busybox:1.34-musl as busybox # Runtime Stage ################################################################################ -FROM gcr.io/distroless/base-debian10 +FROM gcr.io/distroless/base-debian11 COPY --from=busybox /bin/sh /bin/sh