From 59e2750cd7a5515c288df93e18aef3003a0a6c7d Mon Sep 17 00:00:00 2001 From: Jan Grant Date: Mon, 14 Oct 2024 10:55:52 +0100 Subject: [PATCH 1/3] Add a GRPC HealthCheck endpoint This can be used by kubelet to monitor the pod for readiness, and as a backup by the operator to catch pipelines that've run to completion. This means that -cloud and -ai pods that conclude a pipeline run will not exit unless explicitly terminated. --- cmd/redpanda-connect-ai/main.go | 27 +++++++ cmd/redpanda-connect-cloud/main.go | 27 +++++++ go.mod | 2 +- internal/protohealth/endpoint.go | 120 +++++++++++++++++++++++++++++ 4 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 internal/protohealth/endpoint.go diff --git a/cmd/redpanda-connect-ai/main.go b/cmd/redpanda-connect-ai/main.go index af38bd67c2..84fff634f6 100644 --- a/cmd/redpanda-connect-ai/main.go +++ b/cmd/redpanda-connect-ai/main.go @@ -15,7 +15,14 @@ package main import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "github.com/redpanda-data/connect/v4/internal/cli" + "github.com/redpanda-data/connect/v4/internal/protohealth" "github.com/redpanda-data/connect/v4/public/schema" // Only import a subset of components for execution. 
@@ -35,5 +42,25 @@ var (
 
 func main() {
 	schema := schema.CloudAI(Version, DateBuilt)
+	status := protohealth.NewEndpoint(2999)
+	errC := make(chan error)
+	sigC := make(chan os.Signal, 1)
+	signal.Notify(sigC, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		errC <- status.Run(context.Background())
+	}()
 	cli.InitEnterpriseCLI(BinaryName, Version, DateBuilt, schema)
+	select {
+	case <-sigC:
+		// Terminated by a signal: return before MarkDone so this run is not marked as complete.
+		fmt.Println("received interrupt signal, not marking as complete")
+		return
+	default:
+	}
+	fmt.Println("exited without interrupt signal, marking as complete")
+	status.MarkDone()
+	select {
+	case <-errC:
+	case <-sigC:
+	}
 }
diff --git a/cmd/redpanda-connect-cloud/main.go b/cmd/redpanda-connect-cloud/main.go
index 54b327774c..583ecb724f 100644
--- a/cmd/redpanda-connect-cloud/main.go
+++ b/cmd/redpanda-connect-cloud/main.go
@@ -15,7 +15,14 @@
 package main
 
 import (
+	"context"
+	"fmt"
+	"os"
+	"os/signal"
+	"syscall"
+
 	"github.com/redpanda-data/connect/v4/internal/cli"
+	"github.com/redpanda-data/connect/v4/internal/protohealth"
 	"github.com/redpanda-data/connect/v4/public/schema"
 
 	// Only import a subset of components for execution.
@@ -33,5 +40,25 @@ var (
 
 func main() {
 	schema := schema.Cloud(Version, DateBuilt)
+	status := protohealth.NewEndpoint(2999)
+	errC := make(chan error)
+	sigC := make(chan os.Signal, 1)
+	signal.Notify(sigC, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		errC <- status.Run(context.Background())
+	}()
 	cli.InitEnterpriseCLI(BinaryName, Version, DateBuilt, schema)
+	select {
+	case <-sigC:
+		// Terminated by a signal: return before MarkDone so this run is not marked as complete.
+		fmt.Println("received interrupt signal, not marking as complete")
+		return
+	default:
+	}
+	fmt.Println("exited without interrupt signal, marking as complete")
+	status.MarkDone()
+	select {
+	case <-errC:
+	case <-sigC:
+	}
 }
diff --git a/go.mod b/go.mod
index d44ce1248a..cb104cbfd2 100644
--- a/go.mod
+++ b/go.mod
@@ -386,7 +386,7 @@ require (
 	google.golang.org/genproto v0.0.0-20240708141625-4ad9e859172b // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20240709173604-40e1e62336c5 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
-	google.golang.org/grpc v1.66.0 // indirect
+	google.golang.org/grpc v1.66.0
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
 	gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
diff --git a/internal/protohealth/endpoint.go b/internal/protohealth/endpoint.go
new file mode 100644
index 0000000000..39833d1cec
--- /dev/null
+++ b/internal/protohealth/endpoint.go
@@ -0,0 +1,120 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package protohealth
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"sync/atomic"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/grpc/reflection"
+)
+
+// Endpoint hosts a grpc health endpoint at the specified port.
+// No TLS is wrapped around this; it's for k8s consumption.
+type Endpoint struct {
+	port    int16
+	srv     *grpc.Server
+	running atomic.Bool
+	signal  chan struct{}
+	grpc_health_v1.UnimplementedHealthServer
+}
+
+// NewEndpoint constructs an Endpoint for port that reports SERVING until MarkDone is called.
+func NewEndpoint(port int16) *Endpoint {
+	srv := grpc.NewServer()
+	reflection.Register(srv)
+	e := &Endpoint{
+		port:   port,
+		srv:    srv,
+		signal: make(chan struct{}),
+	}
+	// Set the flag here rather than in Run so that a MarkDone racing ahead of Run still latches.
+	e.running.Store(true)
+	grpc_health_v1.RegisterHealthServer(srv, e)
+	return e
+}
+
+// Run serves the health service over plaintext TCP on the configured port until ctx is cancelled or serving fails.
+func (e *Endpoint) Run(ctx context.Context) error {
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", e.port))
+	if err != nil {
+		return fmt.Errorf("failed to listen: %w", err)
+	}
+	errC := make(chan error, 1)
+	go func() {
+		errC <- e.srv.Serve(lis)
+	}()
+	select {
+	case <-ctx.Done():
+		e.srv.Stop()
+		return ctx.Err()
+	case err := <-errC:
+		return err
+	}
+}
+
+// MarkDone latches the Endpoint into "not ready" (NOT_SERVING) status and
+// wakes every Watch stream. It cannot be reversed; repeat calls are no-ops.
+func (e *Endpoint) MarkDone() {
+	if e.running.Swap(false) {
+		close(e.signal)
+	}
+}
+
+// Check is the one-shot GRPC test endpoint: it reports SERVING until MarkDone has been called.
+func (e *Endpoint) Check(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
+	status := grpc_health_v1.HealthCheckResponse_NOT_SERVING
+	if e.running.Load() {
+		status = grpc_health_v1.HealthCheckResponse_SERVING
+	}
+	return &grpc_health_v1.HealthCheckResponse{
+		Status: status,
+	}, nil
+}
+
+// Watch is the streaming GRPC endpoint.
+func (e *Endpoint) Watch(_ *grpc_health_v1.HealthCheckRequest, server grpc_health_v1.Health_WatchServer) error {
+	// Arm the done signal only while still serving: once closed it fires at once and would resend NOT_SERVING.
+	var watcher <-chan struct{}
+	status := grpc_health_v1.HealthCheckResponse_NOT_SERVING
+	if e.running.Load() {
+		status, watcher = grpc_health_v1.HealthCheckResponse_SERVING, e.signal
+	}
+	err := server.Send(&grpc_health_v1.HealthCheckResponse{
+		Status: status,
+	})
+	if err != nil {
+		return err
+	}
+	for {
+		select {
+		case <-server.Context().Done():
+			return server.Context().Err()
+		case <-watcher:
+			// A nil channel never becomes ready, so the transition is reported exactly once.
+			watcher = nil
+			err := server.Send(&grpc_health_v1.HealthCheckResponse{
+				Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
+			})
+			if err != nil {
+				return err
+			}
+		}
+	}
+}
From 8140bd6ceec39cee0ef21d3a111ff8c75baf2209 Mon Sep 17 00:00:00 2001
From: Ashley Jeffs
Date: Mon, 4 Nov 2024 15:35:00 +0000
Subject: [PATCH 2/3] Only spin up grpc server on the run command

---
 cmd/redpanda-connect-ai/main.go    | 5 +++++
 cmd/redpanda-connect-cloud/main.go | 5 +++++
 2 files changed, 10 insertions(+)

diff --git a/cmd/redpanda-connect-ai/main.go b/cmd/redpanda-connect-ai/main.go
index 84fff634f6..56732efe4f 100644
--- a/cmd/redpanda-connect-ai/main.go
+++ b/cmd/redpanda-connect-ai/main.go
@@ -42,6 +42,11 @@ var (
 
 func main() {
 	schema := schema.CloudAI(Version, DateBuilt)
+	if os.Args[1] != "run" {
+		cli.InitEnterpriseCLI(BinaryName, Version, DateBuilt, schema)
+		return
+	}
+
 	status := protohealth.NewEndpoint(2999)
 	errC := make(chan error)
 	sigC := make(chan os.Signal, 1)
diff --git a/cmd/redpanda-connect-cloud/main.go b/cmd/redpanda-connect-cloud/main.go
index 583ecb724f..9dd2a32b2d 100644
--- a/cmd/redpanda-connect-cloud/main.go
+++ b/cmd/redpanda-connect-cloud/main.go
@@ -40,6 +40,11 @@ var (
 
 func main() {
 	schema := schema.Cloud(Version, DateBuilt)
+	if os.Args[1] != "run" {
+		cli.InitEnterpriseCLI(BinaryName, Version, DateBuilt, schema)
+		return
+	}
+
 	status := protohealth.NewEndpoint(2999)
 	errC := make(chan error)
 	sigC := make(chan os.Signal, 1)
From 
f020aa5a09cd8120938d01354f06f92cb96f2238 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Mon, 4 Nov 2024 15:38:05 +0000 Subject: [PATCH 3/3] Add check for skinny-run --- cmd/redpanda-connect-ai/main.go | 2 +- cmd/redpanda-connect-cloud/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/redpanda-connect-ai/main.go b/cmd/redpanda-connect-ai/main.go index 56732efe4f..01275945c6 100644 --- a/cmd/redpanda-connect-ai/main.go +++ b/cmd/redpanda-connect-ai/main.go @@ -42,7 +42,7 @@ var ( func main() { schema := schema.CloudAI(Version, DateBuilt) - if os.Args[1] != "run" { + if len(os.Args) > 1 && os.Args[1] != "run" { cli.InitEnterpriseCLI(BinaryName, Version, DateBuilt, schema) return } diff --git a/cmd/redpanda-connect-cloud/main.go b/cmd/redpanda-connect-cloud/main.go index 9dd2a32b2d..e5c9c4a18f 100644 --- a/cmd/redpanda-connect-cloud/main.go +++ b/cmd/redpanda-connect-cloud/main.go @@ -40,7 +40,7 @@ var ( func main() { schema := schema.Cloud(Version, DateBuilt) - if os.Args[1] != "run" { + if len(os.Args) > 1 && os.Args[1] != "run" { cli.InitEnterpriseCLI(BinaryName, Version, DateBuilt, schema) return }