From 251e84f23eae3c54573bd87799055db750e50314 Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Sun, 14 Jul 2024 21:00:55 -0700 Subject: [PATCH] feat: enable restful daemon client option for UX server (#1826) --- .github/workflows/ci.yaml | 2 +- .github/workflows/release.yml | 2 +- cmd/commands/server.go | 41 +- .../namespaced-numaflow-server.yaml | 6 + config/advanced-install/numaflow-server.yaml | 6 + .../numaflow-server-deployment.yaml | 6 + .../numaflow-cmd-params-config.yaml | 11 +- config/install.yaml | 6 + config/namespace-install.yaml | 6 + pkg/daemon/client/grpc_daemon_client_test.go | 477 ++++++++++ .../client/restful_daemon_client_test.go | 315 +++++++ pkg/daemon/server/service/health_status.go | 2 +- .../server/service/health_status_test.go | 814 ++++++++++++++++++ pkg/shared/clients/nats/client_pool_test.go | 19 +- pkg/shared/clients/nats/nats_client_test.go | 20 +- pkg/shared/clients/nats/options_test.go | 16 + pkg/shared/clients/nats/test/server.go | 3 +- pkg/shared/ewma/simple_ewma_test.go | 43 + pkg/shared/idlehandler/idlehandler_test.go | 16 + server/apis/v1/handler.go | 67 +- server/cmd/server/start.go | 31 +- server/cmd/server/start_test.go | 84 ++ server/routes/routes.go | 21 +- 23 files changed, 1950 insertions(+), 64 deletions(-) create mode 100644 pkg/daemon/client/grpc_daemon_client_test.go create mode 100644 pkg/daemon/client/restful_daemon_client_test.go create mode 100644 pkg/daemon/server/service/health_status_test.go create mode 100644 server/cmd/server/start_test.go diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 6862eccaac..5c476f088e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -81,7 +81,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: '1.22' id: go - name: Check out code diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 534a044730..82e27e4c53 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -101,7 +101,7 @@ jobs: fi - uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: '1.22' - uses: actions/checkout@v4 - run: go install sigs.k8s.io/bom/cmd/bom@v0.2.0 - run: go install github.com/spdx/spdx-sbom-generator/cmd/generator@v0.0.13 diff --git a/cmd/commands/server.go b/cmd/commands/server.go index decd08d33c..baa839cba0 100644 --- a/cmd/commands/server.go +++ b/cmd/commands/server.go @@ -30,15 +30,16 @@ import ( func NewServerCommand() *cobra.Command { var ( - insecure bool - port int - namespaced bool - managedNamespace string - baseHref string - disableAuth bool - serverAddr string - corsAllowedOrigins string - readOnly bool + insecure bool + port int + namespaced bool + managedNamespace string + baseHref string + disableAuth bool + serverAddr string + corsAllowedOrigins string + readOnly bool + daemonClientProtocol string ) command := &cobra.Command{ @@ -52,16 +53,17 @@ func NewServerCommand() *cobra.Command { baseHref = baseHref + "/" } opts := svrcmd.ServerOptions{ - Insecure: insecure, - Port: port, - Namespaced: namespaced, - ManagedNamespace: managedNamespace, - BaseHref: baseHref, - DisableAuth: disableAuth, - DexServerAddr: common.NumaflowDexServerAddr, - ServerAddr: serverAddr, - CorsAllowedOrigins: corsAllowedOrigins, - ReadOnly: readOnly, + Insecure: insecure, + Port: port, + Namespaced: namespaced, + ManagedNamespace: managedNamespace, + BaseHref: baseHref, + DisableAuth: disableAuth, + DexServerAddr: common.NumaflowDexServerAddr, + ServerAddr: serverAddr, + CorsAllowedOrigins: corsAllowedOrigins, + ReadOnly: readOnly, + DaemonClientProtocol: daemonClientProtocol, } server := svrcmd.NewServer(opts) log := logging.NewLogger().Named("server") @@ -77,5 +79,6 @@ func NewServerCommand() *cobra.Command { command.Flags().BoolVar(&disableAuth, "disable-auth", sharedutil.LookupEnvBoolOr("NUMAFLOW_SERVER_DISABLE_AUTH", false), "Whether to disable authentication and authorization, defaults to false.") command.Flags().StringVar(&serverAddr, "server-addr", sharedutil.LookupEnvStringOr("NUMAFLOW_SERVER_ADDRESS", "https://localhost:8443"), "The external address of the Numaflow server.") command.Flags().StringVar(&corsAllowedOrigins, "cors-allowed-origins", sharedutil.LookupEnvStringOr("NUMAFLOW_SERVER_CORS_ALLOWED_ORIGINS", ""), "The values for allowed cors AllowOrigins header field, separated by comma.") + command.Flags().StringVar(&daemonClientProtocol, "daemon-client-protocol", sharedutil.LookupEnvStringOr("NUMAFLOW_SERVER_DAEMON_CLIENT_PROTOCOL", "grpc"), "The protocol used to connect to the Pipeline daemon service from Numaflow UX server, defaults to 'grpc'.") return command } diff --git a/config/advanced-install/namespaced-numaflow-server.yaml b/config/advanced-install/namespaced-numaflow-server.yaml index 915d972e27..f8da8ffffe 100644 --- a/config/advanced-install/namespaced-numaflow-server.yaml +++ b/config/advanced-install/namespaced-numaflow-server.yaml @@ -258,6 +258,12 @@ spec: key: server.cors.allowed.origins name: numaflow-cmd-params-config optional: true + - name: NUMAFLOW_SERVER_DAEMON_CLIENT_PROTOCOL + valueFrom: + configMapKeyRef: + key: server.daemon.client.protocol + name: numaflow-cmd-params-config + optional: true image: quay.io/numaproj/numaflow:latest imagePullPolicy: Always livenessProbe: diff --git a/config/advanced-install/numaflow-server.yaml b/config/advanced-install/numaflow-server.yaml index baaa91cbf6..b92bc89f69 100644 --- a/config/advanced-install/numaflow-server.yaml +++ b/config/advanced-install/numaflow-server.yaml @@ -269,6 +269,12 @@ spec: key: server.cors.allowed.origins name: numaflow-cmd-params-config optional: true + - name: NUMAFLOW_SERVER_DAEMON_CLIENT_PROTOCOL + valueFrom: + configMapKeyRef: + key: server.daemon.client.protocol + name: numaflow-cmd-params-config + optional: true image: quay.io/numaproj/numaflow:latest imagePullPolicy: Always livenessProbe: diff --git a/config/base/numaflow-server/numaflow-server-deployment.yaml b/config/base/numaflow-server/numaflow-server-deployment.yaml index 92d5c57db1..c07228640d 100644 --- a/config/base/numaflow-server/numaflow-server-deployment.yaml +++ b/config/base/numaflow-server/numaflow-server-deployment.yaml @@ -138,6 +138,12 @@ spec: name: numaflow-cmd-params-config key: server.cors.allowed.origins optional: true + - name: NUMAFLOW_SERVER_DAEMON_CLIENT_PROTOCOL + valueFrom: + configMapKeyRef: + name: numaflow-cmd-params-config + key: server.daemon.client.protocol + optional: true resources: limits: cpu: 500m diff --git a/config/base/shared-config/numaflow-cmd-params-config.yaml b/config/base/shared-config/numaflow-cmd-params-config.yaml index c85cce16ba..81e8c2acb7 100644 --- a/config/base/shared-config/numaflow-cmd-params-config.yaml +++ b/config/base/shared-config/numaflow-cmd-params-config.yaml @@ -18,7 +18,7 @@ data: # controller.leader.election.lease.duration: 15s # ### The duration that the acting controlplane will retry refreshing leadership before giving up. - # Default is 10 seconds. + # Default value is 10 seconds. # The configuration has to be: lease.duration > lease.renew.deadline > lease.renew.period # controller.leader.election.lease.renew.deadline: 10s # @@ -45,6 +45,11 @@ data: ### The external address of the Numaflow server. This is needed when using Dex for authentication. # server.address: https://localhost:8443 # - ### The list of allowed origins for CORS on Numaflow server, separated by a comma, defaults to ''. + ### The list of allowed origins for CORS on Numaflow UX server, separated by a comma, defaults to ''. # For example: server.cors.allowed.origins: "http://localhost:3000,http://localhost:3001" - # server.cors.allowed.origins: "" \ No newline at end of file + # server.cors.allowed.origins: "" + # + ### The protocol used to connect to the Pipeline daemon service from Numaflow UX server. + # Could be either 'grpc' or 'http', defaults to 'grpc'. + # + # server.daemon.client.protocol: grpc \ No newline at end of file diff --git a/config/install.yaml b/config/install.yaml index ccbdddcfe5..8baa59fc13 100644 --- a/config/install.yaml +++ b/config/install.yaml @@ -18671,6 +18671,12 @@ spec: key: server.cors.allowed.origins name: numaflow-cmd-params-config optional: true + - name: NUMAFLOW_SERVER_DAEMON_CLIENT_PROTOCOL + valueFrom: + configMapKeyRef: + key: server.daemon.client.protocol + name: numaflow-cmd-params-config + optional: true image: quay.io/numaproj/numaflow:latest imagePullPolicy: Always livenessProbe: diff --git a/config/namespace-install.yaml b/config/namespace-install.yaml index 43d2d66c97..d3c78ef911 100644 --- a/config/namespace-install.yaml +++ b/config/namespace-install.yaml @@ -18563,6 +18563,12 @@ spec: key: server.cors.allowed.origins name: numaflow-cmd-params-config optional: true + - name: NUMAFLOW_SERVER_DAEMON_CLIENT_PROTOCOL + valueFrom: + configMapKeyRef: + key: server.daemon.client.protocol + name: numaflow-cmd-params-config + optional: true image: quay.io/numaproj/numaflow:latest imagePullPolicy: Always livenessProbe: diff --git a/pkg/daemon/client/grpc_daemon_client_test.go b/pkg/daemon/client/grpc_daemon_client_test.go new file mode 100644 index 0000000000..d1bca6e18a --- /dev/null +++ b/pkg/daemon/client/grpc_daemon_client_test.go @@ -0,0 +1,477 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/wrapperspb" + + "github.com/numaproj/numaflow/pkg/apis/proto/daemon" +) + +type mockDaemonServiceClient struct { + mock.Mock +} + +func (m *mockDaemonServiceClient) ListBuffers(ctx context.Context, in *daemon.ListBuffersRequest, opts ...grpc.CallOption) (*daemon.ListBuffersResponse, error) { + args := m.Called(ctx, in, opts) + return args.Get(0).(*daemon.ListBuffersResponse), args.Error(1) +} + +func (m *mockDaemonServiceClient) GetBuffer(ctx context.Context, in *daemon.GetBufferRequest, opts ...grpc.CallOption) (*daemon.GetBufferResponse, error) { + args := m.Called(ctx, in, opts) + return args.Get(0).(*daemon.GetBufferResponse), args.Error(1) +} + +func (m *mockDaemonServiceClient) GetVertexMetrics(ctx context.Context, in *daemon.GetVertexMetricsRequest, opts ...grpc.CallOption) (*daemon.GetVertexMetricsResponse, error) { + args := m.Called(ctx, in, opts) + return args.Get(0).(*daemon.GetVertexMetricsResponse), args.Error(1) +} + +func (m *mockDaemonServiceClient) GetPipelineWatermarks(ctx context.Context, in *daemon.GetPipelineWatermarksRequest, opts ...grpc.CallOption) (*daemon.GetPipelineWatermarksResponse, error) { + args := m.Called(ctx, in, opts) + return args.Get(0).(*daemon.GetPipelineWatermarksResponse), args.Error(1) +} + +func (m *mockDaemonServiceClient) GetPipelineStatus(ctx context.Context, in *daemon.GetPipelineStatusRequest, opts ...grpc.CallOption) (*daemon.GetPipelineStatusResponse, error) { + args := m.Called(ctx, in, opts) + return args.Get(0).(*daemon.GetPipelineStatusResponse), args.Error(1) +} + +func TestGrpcDaemonClient_ListPipelineBuffers(t *testing.T) { + t.Run("successful listing", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + expectedBuffers := []*daemon.BufferInfo{ + {Pipeline: "test-pipeline", BufferName: "buffer1"}, + {Pipeline: "test-pipeline", BufferName: "buffer2"}, + } + + mockClient.On("ListBuffers", mock.Anything, &daemon.ListBuffersRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return(&daemon.ListBuffersResponse{Buffers: expectedBuffers}, nil) + + buffers, err := dc.ListPipelineBuffers(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.Equal(t, expectedBuffers, buffers) + }) + + t.Run("empty buffer list", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + mockClient.On("ListBuffers", mock.Anything, &daemon.ListBuffersRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return(&daemon.ListBuffersResponse{Buffers: []*daemon.BufferInfo{}}, nil) + + buffers, err := dc.ListPipelineBuffers(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.Empty(t, buffers) + }) + + t.Run("error from client", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + expectedError := errors.New("client error") + mockClient.On("ListBuffers", mock.Anything, &daemon.ListBuffersRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return((*daemon.ListBuffersResponse)(nil), expectedError) + + buffers, err := dc.ListPipelineBuffers(context.Background(), "test-pipeline") + assert.Error(t, err) + assert.Equal(t, expectedError, err) + assert.Nil(t, buffers) + }) + + t.Run("context cancellation", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + mockClient.On("ListBuffers", mock.Anything, &daemon.ListBuffersRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return((*daemon.ListBuffersResponse)(nil), context.Canceled) + + buffers, err := dc.ListPipelineBuffers(ctx, "test-pipeline") + assert.Error(t, err) + assert.Equal(t, context.Canceled, err) + assert.Nil(t, buffers) + }) +} + +func TestGrpcDaemonClient_GetPipelineWatermarks(t *testing.T) { + t.Run("successful retrieval", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + expectedWatermarks := []*daemon.EdgeWatermark{ + {Pipeline: "test-pipeline", Edge: "edge1", Watermarks: []*wrapperspb.Int64Value{wrapperspb.Int64(1000), wrapperspb.Int64(2000)}}, + {Pipeline: "test-pipeline", Edge: "edge2", Watermarks: []*wrapperspb.Int64Value{wrapperspb.Int64(1000), wrapperspb.Int64(3000)}}, + } + + mockClient.On("GetPipelineWatermarks", mock.Anything, &daemon.GetPipelineWatermarksRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return(&daemon.GetPipelineWatermarksResponse{PipelineWatermarks: expectedWatermarks}, nil) + + watermarks, err := dc.GetPipelineWatermarks(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.Equal(t, expectedWatermarks, watermarks) + }) + + t.Run("empty watermarks", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + mockClient.On("GetPipelineWatermarks", mock.Anything, &daemon.GetPipelineWatermarksRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return(&daemon.GetPipelineWatermarksResponse{PipelineWatermarks: []*daemon.EdgeWatermark{}}, nil) + + watermarks, err := dc.GetPipelineWatermarks(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.Empty(t, watermarks) + }) + + t.Run("client error", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + expectedError := errors.New("client error") + mockClient.On("GetPipelineWatermarks", mock.Anything, &daemon.GetPipelineWatermarksRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return((*daemon.GetPipelineWatermarksResponse)(nil), expectedError) + + watermarks, err := dc.GetPipelineWatermarks(context.Background(), "test-pipeline") + assert.Error(t, err) + assert.Equal(t, expectedError, err) + assert.Nil(t, watermarks) + }) + + t.Run("context cancellation", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + mockClient.On("GetPipelineWatermarks", mock.Anything, &daemon.GetPipelineWatermarksRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return((*daemon.GetPipelineWatermarksResponse)(nil), context.Canceled) + + watermarks, err := dc.GetPipelineWatermarks(ctx, "test-pipeline") + assert.Error(t, err) + assert.Equal(t, context.Canceled, err) + assert.Nil(t, watermarks) + }) +} + +func TestGrpcDaemonClient_GetPipelineBuffer(t *testing.T) { + t.Run("successful retrieval", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + expectedBuffer := &daemon.BufferInfo{ + Pipeline: "test-pipeline", + BufferName: "test-buffer", + PendingCount: &wrapperspb.Int64Value{Value: 10}, + AckPendingCount: &wrapperspb.Int64Value{Value: 5}, + } + + mockClient.On("GetBuffer", mock.Anything, &daemon.GetBufferRequest{Pipeline: "test-pipeline", Buffer: "test-buffer"}, mock.Anything). + Return(&daemon.GetBufferResponse{Buffer: expectedBuffer}, nil) + + buffer, err := dc.GetPipelineBuffer(context.Background(), "test-pipeline", "test-buffer") + assert.NoError(t, err) + assert.Equal(t, expectedBuffer, buffer) + }) + + t.Run("buffer not found", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + mockClient.On("GetBuffer", mock.Anything, &daemon.GetBufferRequest{Pipeline: "test-pipeline", Buffer: "non-existent-buffer"}, mock.Anything). + Return((*daemon.GetBufferResponse)(nil), errors.New("buffer not found")) + + buffer, err := dc.GetPipelineBuffer(context.Background(), "test-pipeline", "non-existent-buffer") + assert.Error(t, err) + assert.Nil(t, buffer) + assert.Contains(t, err.Error(), "buffer not found") + }) + + t.Run("client error", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + expectedError := errors.New("client error") + mockClient.On("GetBuffer", mock.Anything, &daemon.GetBufferRequest{Pipeline: "test-pipeline", Buffer: "test-buffer"}, mock.Anything). + Return((*daemon.GetBufferResponse)(nil), expectedError) + + buffer, err := dc.GetPipelineBuffer(context.Background(), "test-pipeline", "test-buffer") + assert.Error(t, err) + assert.Equal(t, expectedError, err) + assert.Nil(t, buffer) + }) + + t.Run("context cancellation", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + mockClient.On("GetBuffer", mock.Anything, &daemon.GetBufferRequest{Pipeline: "test-pipeline", Buffer: "test-buffer"}, mock.Anything). + Return((*daemon.GetBufferResponse)(nil), context.Canceled) + + buffer, err := dc.GetPipelineBuffer(ctx, "test-pipeline", "test-buffer") + assert.Error(t, err) + assert.Equal(t, context.Canceled, err) + assert.Nil(t, buffer) + }) +} + +func TestGrpcDaemonClient_GetVertexMetrics(t *testing.T) { + t.Run("successful retrieval", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + expectedMetrics := []*daemon.VertexMetrics{ + {Vertex: "vertex1", ProcessingRates: map[string]*wrapperspb.DoubleValue{"1m": wrapperspb.Double(100)}}, + {Vertex: "vertex1", ProcessingRates: map[string]*wrapperspb.DoubleValue{"1m": wrapperspb.Double(200)}}, + } + + mockClient.On("GetVertexMetrics", mock.Anything, &daemon.GetVertexMetricsRequest{Pipeline: "test-pipeline", Vertex: "vertex1"}, mock.Anything). + Return(&daemon.GetVertexMetricsResponse{VertexMetrics: expectedMetrics}, nil) + + metrics, err := dc.GetVertexMetrics(context.Background(), "test-pipeline", "vertex1") + assert.NoError(t, err) + assert.Equal(t, expectedMetrics, metrics) + }) + + t.Run("empty metrics", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + mockClient.On("GetVertexMetrics", mock.Anything, &daemon.GetVertexMetricsRequest{Pipeline: "test-pipeline", Vertex: "vertex1"}, mock.Anything). + Return(&daemon.GetVertexMetricsResponse{VertexMetrics: []*daemon.VertexMetrics{}}, nil) + + metrics, err := dc.GetVertexMetrics(context.Background(), "test-pipeline", "vertex1") + assert.NoError(t, err) + assert.Empty(t, metrics) + }) + + t.Run("client error", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + expectedError := errors.New("client error") + mockClient.On("GetVertexMetrics", mock.Anything, &daemon.GetVertexMetricsRequest{Pipeline: "test-pipeline", Vertex: "vertex1"}, mock.Anything). + Return((*daemon.GetVertexMetricsResponse)(nil), expectedError) + + metrics, err := dc.GetVertexMetrics(context.Background(), "test-pipeline", "vertex1") + assert.Error(t, err) + assert.Equal(t, expectedError, err) + assert.Nil(t, metrics) + }) + + t.Run("context cancellation", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + mockClient.On("GetVertexMetrics", mock.Anything, &daemon.GetVertexMetricsRequest{Pipeline: "test-pipeline", Vertex: "vertex1"}, mock.Anything). + Return((*daemon.GetVertexMetricsResponse)(nil), context.Canceled) + + metrics, err := dc.GetVertexMetrics(ctx, "test-pipeline", "vertex1") + assert.Error(t, err) + assert.Equal(t, context.Canceled, err) + assert.Nil(t, metrics) + }) +} + +func TestGrpcDaemonClient_GetPipelineStatus(t *testing.T) { + t.Run("successful retrieval", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + expectedStatus := &daemon.PipelineStatus{ + Status: "Running", + Message: "okay", + } + + mockClient.On("GetPipelineStatus", mock.Anything, &daemon.GetPipelineStatusRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return(&daemon.GetPipelineStatusResponse{Status: expectedStatus}, nil) + + status, err := dc.GetPipelineStatus(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.Equal(t, expectedStatus, status) + }) + + t.Run("pipeline not found", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + mockClient.On("GetPipelineStatus", mock.Anything, &daemon.GetPipelineStatusRequest{Pipeline: "non-existent-pipeline"}, mock.Anything). + Return((*daemon.GetPipelineStatusResponse)(nil), errors.New("pipeline not found")) + + status, err := dc.GetPipelineStatus(context.Background(), "non-existent-pipeline") + assert.Error(t, err) + assert.Nil(t, status) + assert.Contains(t, err.Error(), "pipeline not found") + }) + + t.Run("client error", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + expectedError := errors.New("client error") + mockClient.On("GetPipelineStatus", mock.Anything, &daemon.GetPipelineStatusRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return((*daemon.GetPipelineStatusResponse)(nil), expectedError) + + status, err := dc.GetPipelineStatus(context.Background(), "test-pipeline") + assert.Error(t, err) + assert.Equal(t, expectedError, err) + assert.Nil(t, status) + }) + + t.Run("context cancellation", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + mockClient.On("GetPipelineStatus", mock.Anything, &daemon.GetPipelineStatusRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return((*daemon.GetPipelineStatusResponse)(nil), context.Canceled) + + status, err := dc.GetPipelineStatus(ctx, "test-pipeline") + assert.Error(t, err) + assert.Equal(t, context.Canceled, err) + assert.Nil(t, status) + }) +} +func TestGrpcDaemonClient_IsDrained(t *testing.T) { + t.Run("all buffers empty", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + mockClient.On("ListBuffers", mock.Anything, &daemon.ListBuffersRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return(&daemon.ListBuffersResponse{ + Buffers: []*daemon.BufferInfo{ + {PendingCount: &wrapperspb.Int64Value{Value: 0}, AckPendingCount: &wrapperspb.Int64Value{Value: 0}}, + {PendingCount: &wrapperspb.Int64Value{Value: 0}, AckPendingCount: &wrapperspb.Int64Value{Value: 0}}, + }, + }, nil) + + isDrained, err := dc.IsDrained(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.True(t, isDrained) + }) + + t.Run("one buffer with pending count", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + mockClient.On("ListBuffers", mock.Anything, &daemon.ListBuffersRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return(&daemon.ListBuffersResponse{ + Buffers: []*daemon.BufferInfo{ + {PendingCount: &wrapperspb.Int64Value{Value: 1}, AckPendingCount: &wrapperspb.Int64Value{Value: 0}}, + {PendingCount: &wrapperspb.Int64Value{Value: 0}, AckPendingCount: &wrapperspb.Int64Value{Value: 0}}, + }, + }, nil) + + isDrained, err := dc.IsDrained(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.False(t, isDrained) + }) + + t.Run("one buffer with ack pending count", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + mockClient.On("ListBuffers", mock.Anything, &daemon.ListBuffersRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return(&daemon.ListBuffersResponse{ + Buffers: []*daemon.BufferInfo{ + {PendingCount: &wrapperspb.Int64Value{Value: 0}, AckPendingCount: &wrapperspb.Int64Value{Value: 1}}, + {PendingCount: &wrapperspb.Int64Value{Value: 0}, AckPendingCount: &wrapperspb.Int64Value{Value: 0}}, + }, + }, nil) + + isDrained, err := dc.IsDrained(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.False(t, isDrained) + }) + + t.Run("error from ListBuffers", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + expectedError := errors.New("list buffers error") + mockClient.On("ListBuffers", mock.Anything, &daemon.ListBuffersRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return((*daemon.ListBuffersResponse)(nil), expectedError) + + isDrained, err := dc.IsDrained(context.Background(), "test-pipeline") + assert.Error(t, err) + assert.Equal(t, expectedError, err) + assert.False(t, isDrained) + }) + + t.Run("empty buffer list", func(t *testing.T) { + mockClient := new(mockDaemonServiceClient) + dc := &grpcDaemonClient{client: mockClient} + mockClient.On("ListBuffers", mock.Anything, &daemon.ListBuffersRequest{Pipeline: "test-pipeline"}, mock.Anything). + Return(&daemon.ListBuffersResponse{ + Buffers: []*daemon.BufferInfo{}, + }, nil) + + isDrained, err := dc.IsDrained(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.True(t, isDrained) + }) +} + +func TestGrpcDaemonClient_Close(t *testing.T) { + + t.Run("nil connection", func(t *testing.T) { + dc := &grpcDaemonClient{conn: nil} + err := dc.Close() + + assert.NoError(t, err) + }) +} + +func TestNewGRPCDaemonServiceClient(t *testing.T) { + t.Run("successful connection", func(t *testing.T) { + address := "localhost:8080" + client, err := NewGRPCDaemonServiceClient(address) + assert.NoError(t, err) + assert.NotNil(t, client) + + grpcClient, ok := client.(*grpcDaemonClient) + assert.True(t, ok) + assert.NotNil(t, grpcClient.conn) + assert.NotNil(t, grpcClient.client) + + err = client.Close() + assert.NoError(t, err) + }) + + t.Run("empty address", func(t *testing.T) { + address := "" + client, err := NewGRPCDaemonServiceClient(address) + assert.Error(t, err) + assert.Nil(t, client) + }) +} diff --git a/pkg/daemon/client/restful_daemon_client_test.go b/pkg/daemon/client/restful_daemon_client_test.go new file mode 100644 index 0000000000..9aedf3846a --- /dev/null +++ b/pkg/daemon/client/restful_daemon_client_test.go @@ -0,0 +1,315 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRestfulDaemonClient_IsDrained(t *testing.T) { + t.Run("not drained", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"buffers":[{"pipeline":"simple-pipeline","bufferName":"numaflow-system-simple-pipeline-cat-0","pendingCount":"0","ackPendingCount":"5","totalMessages":"5","bufferLength":"30000","bufferUsageLimit":0.8,"bufferUsage":0.00016666666666666666,"isFull":false},{"pipeline":"simple-pipeline","bufferName":"numaflow-system-simple-pipeline-out-0","pendingCount":"0","ackPendingCount":"0","totalMessages":"0","bufferLength":"30000","bufferUsageLimit":0.8,"bufferUsage":0,"isFull":false}]}`)) + })) + defer server.Close() + + client, _ := NewRESTfulDaemonServiceClient(server.URL) + isDrained, err := client.IsDrained(context.Background(), "simple-pipeline") + assert.NoError(t, err) + assert.False(t, isDrained) + }) + + t.Run("drained", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"buffers":[{"pipeline":"simple-pipeline","bufferName":"numaflow-system-simple-pipeline-cat-0","pendingCount":"0","ackPendingCount":"0","totalMessages":"5","bufferLength":"30000","bufferUsageLimit":0.8,"bufferUsage":0,"isFull":false},{"pipeline":"simple-pipeline","bufferName":"numaflow-system-simple-pipeline-out-0","pendingCount":"0","ackPendingCount":"0","totalMessages":"0","bufferLength":"30000","bufferUsageLimit":0.8,"bufferUsage":0,"isFull":false}]}`)) + })) + defer server.Close() + + client, _ := NewRESTfulDaemonServiceClient(server.URL) + isDrained, err := client.IsDrained(context.Background(), "simple-pipeline") + assert.NoError(t, err) + assert.True(t, isDrained) + }) +} + +func TestRestfulDaemonClient_ListPipelineBuffers(t *testing.T) { + t.Run("error case", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`invalid json`)) + })) + defer server.Close() + + client, _ := NewRESTfulDaemonServiceClient(server.URL) + _, err := client.ListPipelineBuffers(context.Background(), "simple-pipeline") + assert.Error(t, err) + }) + + t.Run("okay", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"buffers":[{"pipeline":"simple-pipeline","bufferName":"numaflow-system-simple-pipeline-cat-0","pendingCount":"0","ackPendingCount":"5","totalMessages":"5","bufferLength":"30000","bufferUsageLimit":0.8,"bufferUsage":0.00016666666666666666,"isFull":false},{"pipeline":"simple-pipeline","bufferName":"numaflow-system-simple-pipeline-out-0","pendingCount":"0","ackPendingCount":"0","totalMessages":"0","bufferLength":"30000","bufferUsageLimit":0.8,"bufferUsage":0,"isFull":false}]}`)) + })) + defer server.Close() + + client, _ := NewRESTfulDaemonServiceClient(server.URL) + buffers, err := client.ListPipelineBuffers(context.Background(), "simple-pipeline") + assert.NoError(t, err) + assert.Len(t, buffers, 2) + assert.Equal(t, "numaflow-system-simple-pipeline-cat-0", buffers[0].BufferName) + assert.Equal(t, "numaflow-system-simple-pipeline-out-0", buffers[1].BufferName) + }) +} + +func TestRestfulDaemonClient_GetPipelineBuffer(t *testing.T) { + t.Run("error case", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`invalid json`)) + })) + defer server.Close() + + client, _ := NewRESTfulDaemonServiceClient(server.URL) + _, err := client.GetPipelineBuffer(context.Background(), "simple-pipeline", "numaflow-system-simple-pipeline-cat-0") + assert.Error(t, err) + }) + + t.Run("okay", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"buffer":{"pipeline":"simple-pipeline","bufferName":"numaflow-system-simple-pipeline-cat-0","pendingCount":"0","ackPendingCount":"0","totalMessages":"0","bufferLength":"30000","bufferUsageLimit":0.8,"bufferUsage":0,"isFull":false}}`)) + })) + defer server.Close() + + client, _ := NewRESTfulDaemonServiceClient(server.URL) + buffer, err := client.GetPipelineBuffer(context.Background(), "simple-pipeline", "numaflow-system-simple-pipeline-cat-0") + assert.NoError(t, err) + assert.NotNil(t, buffer) + assert.Equal(t, "numaflow-system-simple-pipeline-cat-0", buffer.BufferName) + assert.Equal(t, int64(0), buffer.PendingCount.GetValue()) + }) +} + +func TestRestfulDaemonClient_GetVertexMetrics(t *testing.T) { + t.Run("error case", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`invalid json`)) + })) + defer server.Close() + + client, _ := NewRESTfulDaemonServiceClient(server.URL) + _, err := client.GetVertexMetrics(context.Background(), "simple-pipeline", "out") + assert.Error(t, err) + }) + + t.Run("okay", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"vertexMetrics":[{"pipeline":"simple-pipeline","vertex":"out","processingRates":{"15m":5.007407407407407,"1m":5.083333333333333,"5m":5.007407407407407,"default":5.033333333333333},"pendings":{"15m":"0","1m":"0","5m":"0","default":"0"}}]}`)) + })) + defer server.Close() + + client, _ := NewRESTfulDaemonServiceClient(server.URL) + metrics, err := client.GetVertexMetrics(context.Background(), "simple-pipeline", "out") + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "simple-pipeline", metrics[0].Pipeline) + assert.Equal(t, "out", metrics[0].Vertex) + }) +} + +func TestRestfulDaemonClient_GetPipelineWatermarks(t *testing.T) { + t.Run("error case", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`invalid json`)) + })) + defer server.Close() + client, _ := NewRESTfulDaemonServiceClient(server.URL) + _, err := client.GetPipelineWatermarks(context.Background(), "test-pipeline") + assert.Error(t, err) + }) + + t.Run("okay", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"pipelineWatermarks":[{"pipeline":"simple-pipeline","edge":"in-cat","watermarks":["1720992797298"],"isWatermarkEnabled":true,"from":"in","to":"cat"},{"pipeline":"simple-pipeline","edge":"cat-out","watermarks":["1720992796298"],"isWatermarkEnabled":true,"from":"cat","to":"out"}]}`)) + })) + defer server.Close() + + client, _ := NewRESTfulDaemonServiceClient(server.URL) + watermarks, err := client.GetPipelineWatermarks(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.Len(t, watermarks, 2) + assert.Equal(t, "in-cat", watermarks[0].Edge) + assert.Equal(t, int64(1720992797298), watermarks[0].Watermarks[0].GetValue()) + assert.Equal(t, "cat-out", watermarks[1].Edge) + assert.Equal(t, int64(1720992796298), watermarks[1].Watermarks[0].GetValue()) + }) +} + +func TestRestfulDaemonClient_GetPipelineStatus(t *testing.T) { + t.Run("error case", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`invalid json`)) + })) + defer server.Close() + client, _ := NewRESTfulDaemonServiceClient(server.URL) + _, err := client.GetPipelineStatus(context.Background(), "test-pipeline") + assert.Error(t, err) + }) + + t.Run("okay", func(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"status":{"status":"healthy","message":"Pipeline data flow is healthy","code":"D1"}}`)) + })) + defer server.Close() + + client, _ := NewRESTfulDaemonServiceClient(server.URL) + status, err := client.GetPipelineStatus(context.Background(), "test-pipeline") + assert.NoError(t, err) + assert.NotNil(t, status) + assert.Equal(t, "healthy", status.Status) + }) +} + +func TestRestfulDaemonClient_Close(t *testing.T) { + t.Run("close without error", func(t *testing.T) { + client := &restfulDaemonClient{} + err := client.Close() + assert.NoError(t, err) + }) +} + +func TestUnmarshalResponse(t *testing.T) { + t.Run("successful unmarshal", func(t *testing.T) { + response := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(`{"name":"test","value":123}`)), + } + type testStruct struct { + Name string `json:"name"` + Value int `json:"value"` + } + result, err := unmarshalResponse[testStruct](response) + assert.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, "test", result.Name) + assert.Equal(t, 123, result.Value) + }) + + t.Run("error status code", func(t *testing.T) { + response := &http.Response{ + StatusCode: http.StatusBadRequest, + Status: "400 Bad Request", + } + result, err := unmarshalResponse[struct{}](response) + assert.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "unexpected response 400") + }) + + t.Run("error reading body", func(t *testing.T) { + response := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(errReader(0)), + } + result, err := unmarshalResponse[struct{}](response) + assert.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "failed to read data from response body") + }) + + t.Run("error unmarshalling", func(t *testing.T) { + response := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(`{"invalid json"`)), + } + result, err := unmarshalResponse[struct{}](response) + assert.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "failed to unmarshal response body") + }) +} + +type errReader int + +func (errReader) Read(p []byte) (n int, err error) { + return 0, fmt.Errorf("test error") +} + +func TestNewRESTfulDaemonServiceClient(t *testing.T) { + t.Run("with https prefix", func(t *testing.T) { + address := "https://example.com" + client, err := NewRESTfulDaemonServiceClient(address) + assert.NoError(t, err) + assert.NotNil(t, client) + + restfulClient, ok := client.(*restfulDaemonClient) + assert.True(t, ok) + assert.Equal(t, address, restfulClient.hostURL) + assert.NotNil(t, restfulClient.httpClient) + assert.Equal(t, time.Second*1, restfulClient.httpClient.Timeout) + }) + + t.Run("without https prefix", func(t *testing.T) { + address := "example.com" + client, err := NewRESTfulDaemonServiceClient(address) + assert.NoError(t, err) + assert.NotNil(t, client) + + restfulClient, ok := client.(*restfulDaemonClient) + assert.True(t, ok) + assert.Equal(t, "https://"+address, restfulClient.hostURL) + }) + + t.Run("with empty address", func(t *testing.T) { + address := "" + client, err := NewRESTfulDaemonServiceClient(address) + assert.NoError(t, err) + assert.NotNil(t, client) + + restfulClient, ok := client.(*restfulDaemonClient) + assert.True(t, ok) + assert.Equal(t, "https://", restfulClient.hostURL) + }) + + t.Run("TLS config", func(t *testing.T) { + address := "example.com" + client, _ := NewRESTfulDaemonServiceClient(address) + restfulClient, _ := client.(*restfulDaemonClient) + + transport, ok := restfulClient.httpClient.Transport.(*http.Transport) + assert.True(t, ok) + assert.NotNil(t, transport.TLSClientConfig) + assert.True(t, transport.TLSClientConfig.InsecureSkipVerify) + }) +} diff --git a/pkg/daemon/server/service/health_status.go b/pkg/daemon/server/service/health_status.go index d8ef23658b..3b00668be8 100644 --- a/pkg/daemon/server/service/health_status.go +++ b/pkg/daemon/server/service/health_status.go @@ -460,7 +460,7 @@ func generateDataHealthResponse(state string, vertex string) *dataHealthResponse case v1alpha1.PipelineStatusWarning: return newDataHealthResponse( v1alpha1.PipelineStatusWarning, - fmt.Sprintf("Pipeline data flow is in a warning state for %s", vertex), + fmt.Sprintf("Pipeline data flow is in a warning state for %s", vertex), "D2") case v1alpha1.PipelineStatusCritical: return newDataHealthResponse( diff --git a/pkg/daemon/server/service/health_status_test.go b/pkg/daemon/server/service/health_status_test.go new file mode 100644 index 0000000000..ec636096be --- /dev/null +++ b/pkg/daemon/server/service/health_status_test.go @@ -0,0 +1,814 @@ +package service + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/wrapperspb" + + "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/apis/proto/daemon" + "github.com/numaproj/numaflow/pkg/isbsvc" + sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue" +) + +func TestNewDataHealthResponse(t *testing.T) { + tests := []struct { + name string + status string + message string + code string + expectedStatus string + expectedMsg string + expectedCode string + }{ + { + name: "Healthy response", + status: "healthy", + message: "All systems operational", + code: "200", + expectedStatus: "healthy", + expectedMsg: "All systems operational", + expectedCode: "200", + }, + { + name: "Unhealthy response", + status: "unhealthy", + message: "Service degradation detected", + code: "503", + expectedStatus: "unhealthy", + expectedMsg: "Service degradation detected", + expectedCode: "503", + }, + { + name: "Empty response", + status: "", + message: "", + code: "", + expectedStatus: "", + expectedMsg: "", + expectedCode: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + response := newDataHealthResponse(tt.status, tt.message, tt.code) + assert.NotNil(t, response) + assert.Equal(t, tt.expectedStatus, response.Status) + assert.Equal(t, tt.expectedMsg, response.Message) + assert.Equal(t, tt.expectedCode, response.Code) + }) + } +} + +func TestNewHealthChecker(t *testing.T) { + tests := []struct { + name string + pipeline *v1alpha1.Pipeline + isbSvc isbsvc.ISBService + }{ + { + name: "With nil pipeline and nil ISBService", + pipeline: nil, + isbSvc: nil, + }, + { + name: "With non-nil pipeline and nil ISBService", + pipeline: &v1alpha1.Pipeline{}, + isbSvc: nil, + }, + { + name: "With nil pipeline and non-nil ISBService", + pipeline: nil, + isbSvc: &mockISBService{}, + }, + { + name: "With non-nil pipeline and non-nil ISBService", + pipeline: &v1alpha1.Pipeline{}, + isbSvc: &mockISBService{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hc := NewHealthChecker(tt.pipeline, tt.isbSvc) + assert.NotNil(t, hc) + assert.Equal(t, defaultDataHealthResponse, hc.currentDataStatus) + assert.Equal(t, tt.isbSvc, hc.isbSvcClient) + assert.Equal(t, tt.pipeline, hc.pipeline) + assert.NotNil(t, hc.timelineData) + assert.Empty(t, hc.timelineData) + assert.NotNil(t, hc.statusLock) + }) + } +} + +type mockISBService struct { + isbsvc.ISBService +} + +func TestNewVertexState(t *testing.T) { + tests := []struct { + name string + vertexName string + vertexState string + expectedName string + expectedState string + }{ + { + name: "Normal vertex state", + vertexName: "vertex1", + vertexState: "running", + expectedName: "vertex1", + expectedState: "running", + }, + { + name: "Empty vertex name", + vertexName: "", + vertexState: "pending", + expectedName: "", + expectedState: "pending", + }, + { + name: "Empty vertex state", + vertexName: "vertex2", + vertexState: "", + expectedName: "vertex2", + expectedState: "", + }, + { + name: "Special characters in vertex name", + vertexName: "vertex-3_@#$", + vertexState: "completed", + expectedName: "vertex-3_@#$", + expectedState: "completed", + }, + { + name: "Long vertex name and state", + vertexName: "very_long_vertex_name_that_exceeds_normal_length", + vertexState: "very_long_state_description_that_exceeds_normal_length", + expectedName: "very_long_vertex_name_that_exceeds_normal_length", + expectedState: "very_long_state_description_that_exceeds_normal_length", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := newVertexState(tt.vertexName, tt.vertexState) + assert.NotNil(t, result) + assert.Equal(t, tt.expectedName, result.Name) + assert.Equal(t, tt.expectedState, result.State) + }) + } +} + +func TestGetCurrentHealth(t *testing.T) { + tests := []struct { + name string + initialStatus *dataHealthResponse + expectedStatus *dataHealthResponse + }{ + { + name: "Default health status", + initialStatus: &dataHealthResponse{ + Status: "healthy", + Message: "All systems operational", + Code: "200", + }, + expectedStatus: &dataHealthResponse{ + Status: "healthy", + Message: "All systems operational", + Code: "200", + }, + }, + { + name: "Custom health status", + initialStatus: &dataHealthResponse{ + Status: "degraded", + Message: "Partial system outage", + Code: "503", + }, + expectedStatus: &dataHealthResponse{ + Status: "degraded", + Message: "Partial system outage", + Code: "503", + }, + }, + { + name: "Nil health status", + initialStatus: nil, + expectedStatus: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hc := &HealthChecker{ + currentDataStatus: tt.initialStatus, + statusLock: &sync.RWMutex{}, + } + + result := hc.getCurrentHealth() + assert.Equal(t, tt.expectedStatus, result) + }) + } +} + +func TestGetCurrentHealthConcurrency(t *testing.T) { + hc := &HealthChecker{ + currentDataStatus: &dataHealthResponse{ + Status: "healthy", + Message: "All systems operational", + Code: "200", + }, + statusLock: &sync.RWMutex{}, + } + + const numGoroutines = 100 + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + result := hc.getCurrentHealth() + assert.NotNil(t, result) + assert.Equal(t, "healthy", result.Status) + assert.Equal(t, "All systems operational", result.Message) + assert.Equal(t, "200", result.Code) + }() + } + + wg.Wait() +} + +func TestGetCurrentHealthWithUpdates(t *testing.T) { + hc := &HealthChecker{ + currentDataStatus: &dataHealthResponse{ + Status: "healthy", + Message: "All systems operational", + Code: "200", + }, + statusLock: &sync.RWMutex{}, + } + + // Initial check + result := hc.getCurrentHealth() + assert.Equal(t, "healthy", result.Status) + + // Update health status + hc.statusLock.Lock() + hc.currentDataStatus = &dataHealthResponse{ + Status: "unhealthy", + Message: "System outage", + Code: "503", + } + hc.statusLock.Unlock() + + // Check updated status + result = hc.getCurrentHealth() + assert.Equal(t, "unhealthy", result.Status) + assert.Equal(t, "System outage", result.Message) + assert.Equal(t, "503", result.Code) +} + +func TestSetCurrentHealth(t *testing.T) { + tests := []struct { + name string + initialStatus *dataHealthResponse + newStatus *dataHealthResponse + expectedStatus *dataHealthResponse + }{ + { + name: "Update from healthy to unhealthy", + initialStatus: &dataHealthResponse{ + Status: "healthy", + Message: "All systems operational", + Code: "200", + }, + newStatus: &dataHealthResponse{ + Status: "unhealthy", + Message: "System failure", + Code: "500", + }, + expectedStatus: &dataHealthResponse{ + Status: "unhealthy", + Message: "System failure", + Code: "500", + }, + }, + { + name: "Update from unhealthy to healthy", + initialStatus: &dataHealthResponse{ + Status: "unhealthy", + Message: "System failure", + Code: "500", + }, + newStatus: &dataHealthResponse{ + Status: "healthy", + Message: "All systems operational", + Code: "200", + }, + expectedStatus: &dataHealthResponse{ + Status: "healthy", + Message: "All systems operational", + Code: "200", + }, + }, + { + name: "Update from nil to valid status", + initialStatus: nil, + newStatus: &dataHealthResponse{ + Status: "degraded", + Message: "Partial system outage", + Code: "503", + }, + expectedStatus: &dataHealthResponse{ + Status: "degraded", + Message: "Partial system outage", + Code: "503", + }, + }, + { + name: "Update to nil status", + initialStatus: &dataHealthResponse{ + Status: "healthy", + Message: "All systems operational", + Code: "200", + }, + newStatus: nil, + expectedStatus: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hc := &HealthChecker{ + currentDataStatus: tt.initialStatus, + statusLock: &sync.RWMutex{}, + } + + hc.setCurrentHealth(tt.newStatus) + + assert.Equal(t, tt.expectedStatus, hc.currentDataStatus) + }) + } +} + +func TestSetCurrentHealthConcurrency(t *testing.T) { + hc := &HealthChecker{ + currentDataStatus: &dataHealthResponse{ + Status: "healthy", + Message: "All systems operational", + Code: "200", + }, + statusLock: &sync.RWMutex{}, + } + + const numGoroutines = 100 + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(i int) { + defer wg.Done() + newStatus := &dataHealthResponse{ + Status: fmt.Sprintf("status_%d", i), + Message: fmt.Sprintf("message_%d", i), + Code: fmt.Sprintf("%d", i), + } + hc.setCurrentHealth(newStatus) + }(i) + } + + wg.Wait() + + finalStatus := hc.getCurrentHealth() + assert.NotNil(t, finalStatus) + assert.Contains(t, finalStatus.Status, "status_") + assert.Contains(t, finalStatus.Message, "message_") + assert.NotEqual(t, "", finalStatus.Code) +} + +func TestSetCurrentHealthWithGetCurrentHealth(t *testing.T) { + hc := &HealthChecker{ + currentDataStatus: &dataHealthResponse{ + Status: "healthy", + Message: "All systems operational", + Code: "200", + }, + statusLock: &sync.RWMutex{}, + } + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + newStatus := &dataHealthResponse{ + Status: fmt.Sprintf("status_%d", i), + Message: fmt.Sprintf("message_%d", i), + Code: fmt.Sprintf("%d", i), + } + hc.setCurrentHealth(newStatus) + time.Sleep(time.Millisecond) + } + }() + + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + status := hc.getCurrentHealth() + assert.NotNil(t, status) + time.Sleep(time.Millisecond) + } + }() + + wg.Wait() + + finalStatus := hc.getCurrentHealth() + assert.NotNil(t, finalStatus) + assert.Contains(t, finalStatus.Status, "status_") + assert.Contains(t, finalStatus.Message, "message_") + assert.NotEqual(t, "", finalStatus.Code) +} + +func TestUpdateUsageTimeline(t *testing.T) { + tests := []struct { + name string + bufferList []*daemon.BufferInfo + expected map[string]int + }{ + { + name: "Single buffer update", + bufferList: []*daemon.BufferInfo{ + { + BufferName: "buffer1", + BufferUsage: wrapperspb.Double(0.5), + }, + }, + expected: map[string]int{ + "buffer1": 1, + }, + }, + { + name: "Multiple buffer update", + bufferList: []*daemon.BufferInfo{ + { + BufferName: "buffer1", + BufferUsage: wrapperspb.Double(0.3), + }, + { + BufferName: "buffer2", + BufferUsage: wrapperspb.Double(0.7), + }, + }, + expected: map[string]int{ + "buffer1": 1, + "buffer2": 1, + }, + }, + { + name: "Empty buffer list", + bufferList: []*daemon.BufferInfo{}, + expected: map[string]int{}, + }, + { + name: "Buffer with zero usage", + bufferList: []*daemon.BufferInfo{ + { + BufferName: "buffer1", + BufferUsage: wrapperspb.Double(0), + }, + }, + expected: map[string]int{ + "buffer1": 1, + }, + }, + { + name: "Buffer with full usage", + bufferList: []*daemon.BufferInfo{ + { + BufferName: "buffer1", + BufferUsage: wrapperspb.Double(1), + }, + }, + expected: map[string]int{ + "buffer1": 1, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hc := &HealthChecker{ + timelineData: make(map[string]*sharedqueue.OverflowQueue[*timelineEntry]), + } + + hc.updateUsageTimeline(tt.bufferList) + + for bufferName, expectedCount := range tt.expected { + assert.NotNil(t, hc.timelineData[bufferName]) + assert.Equal(t, expectedCount, hc.timelineData[bufferName].Length()) + + if expectedCount > 0 { + lastEntry := hc.timelineData[bufferName].Items()[0] + assert.NotZero(t, lastEntry.Time) + } + } + }) + } +} + +func TestUpdateAverageBufferUsage(t *testing.T) { + tests := []struct { + name string + timeline []*timelineEntry + newEntry float64 + expected float64 + }{ + { + name: "Empty timeline", + timeline: []*timelineEntry{}, + newEntry: 0.5, + expected: 0.5, + }, + { + name: "Timeline with one entry", + timeline: []*timelineEntry{ + {BufferUsage: 0.3, AverageBufferUsage: 0.3}, + }, + newEntry: 0.7, + expected: 0.5, + }, + { + name: "Full timeline", + timeline: []*timelineEntry{ + {BufferUsage: 0.1, AverageBufferUsage: 0.1}, + {BufferUsage: 0.2, AverageBufferUsage: 0.15}, + {BufferUsage: 0.3, AverageBufferUsage: 0.2}, + {BufferUsage: 0.4, AverageBufferUsage: 0.25}, + {BufferUsage: 0.5, AverageBufferUsage: 0.3}, + }, + newEntry: 0.6, + expected: 0.35, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := updateAverageBufferUsage(tt.timeline, tt.newEntry) + assert.InDelta(t, tt.expected, result, 0.001, "Expected average buffer usage to be %v, but got %v", tt.expected, result) + }) + } +} + +func TestAssignStateToBufferUsage(t *testing.T) { + tests := []struct { + name string + ewmaValue float64 + expected string + }{ + { + name: "Critical state", + ewmaValue: 96, + expected: criticalState, + }, + { + name: "Warning state", + ewmaValue: 85, + expected: warningState, + }, + { + name: "Healthy state", + ewmaValue: 30, + expected: healthyState, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := assignStateToBufferUsage(tt.ewmaValue) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAssignStateToTimeline(t *testing.T) { + tests := []struct { + name string + ewmaValues []float64 + lookBack bool + expected string + }{ + { + name: "Single healthy value", + ewmaValues: []float64{30}, + lookBack: false, + expected: healthyState, + }, + { + name: "Single warning value", + ewmaValues: []float64{85}, + lookBack: false, + expected: warningState, + }, + { + name: "Single critical value without lookback", + ewmaValues: []float64{96}, + lookBack: false, + expected: criticalState, + }, + { + name: "Single critical value with lookback", + ewmaValues: []float64{96}, + lookBack: true, + expected: warningState, + }, + { + name: "Multiple values ending with critical, no lookback", + ewmaValues: []float64{30, 85, 96}, + lookBack: false, + expected: criticalState, + }, + { + name: "Multiple values ending with critical, with lookback, insufficient critical count", + ewmaValues: []float64{30, 85, 96, 96}, + lookBack: true, + expected: warningState, + }, + { + name: "Multiple values ending with critical, with lookback, sufficient critical count", + ewmaValues: []float64{96, 96, 96, 96, 96}, + lookBack: true, + expected: criticalState, + }, + { + name: "Values fluctuating between warning and critical", + ewmaValues: []float64{85, 96, 85, 96, 85}, + lookBack: true, + expected: warningState, + }, + { + name: "Values increasing from healthy to critical", + ewmaValues: []float64{30, 50, 70, 90, 96}, + lookBack: true, + expected: warningState, + }, + { + name: "Values decreasing from critical to healthy", + ewmaValues: []float64{96, 90, 70, 50, 30}, + lookBack: true, + expected: healthyState, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := assignStateToTimeline(tt.ewmaValues, tt.lookBack) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestConvertVertexStateToPipelineState(t *testing.T) { + tests := []struct { + name string + vertexStates []*vertexState + expected *dataHealthResponse + }{ + { + name: "All vertices healthy", + vertexStates: []*vertexState{ + {Name: "vertex1", State: v1alpha1.PipelineStatusHealthy}, + {Name: "vertex2", State: v1alpha1.PipelineStatusHealthy}, + }, + expected: newDataHealthResponse(v1alpha1.PipelineStatusHealthy, "Pipeline data flow is healthy", "D1"), + }, + { + name: "One vertex warning", + vertexStates: []*vertexState{ + {Name: "vertex1", State: v1alpha1.PipelineStatusHealthy}, + {Name: "vertex2", State: v1alpha1.PipelineStatusWarning}, + }, + expected: newDataHealthResponse(v1alpha1.PipelineStatusWarning, "Pipeline data flow is in a warning state for vertex2", "D2"), + }, + { + name: "One vertex critical", + vertexStates: []*vertexState{ + {Name: "vertex1", State: v1alpha1.PipelineStatusHealthy}, + {Name: "vertex2", State: v1alpha1.PipelineStatusCritical}, + }, + expected: newDataHealthResponse(v1alpha1.PipelineStatusCritical, "Pipeline data flow is in a critical state for vertex2", "D3"), + }, + { + name: "One vertex unknown", + vertexStates: []*vertexState{ + {Name: "vertex1", State: v1alpha1.PipelineStatusHealthy}, + {Name: "vertex2", State: v1alpha1.PipelineStatusUnknown}, + }, + expected: newDataHealthResponse(v1alpha1.PipelineStatusUnknown, "Pipeline data flow is in an unknown state due to vertex2", "D4"), + }, + { + name: "Empty vertex state list", + vertexStates: []*vertexState{}, + expected: newDataHealthResponse(v1alpha1.PipelineStatusHealthy, "Pipeline data flow is healthy", "D1"), + }, + { + name: "Nil vertex state list", + vertexStates: nil, + expected: newDataHealthResponse(v1alpha1.PipelineStatusHealthy, "Pipeline data flow is healthy", "D1"), + }, + { + name: "Multiple vertices with same highest state", + vertexStates: []*vertexState{ + {Name: "vertex1", State: v1alpha1.PipelineStatusHealthy}, + {Name: "vertex2", State: v1alpha1.PipelineStatusWarning}, + {Name: "vertex3", State: v1alpha1.PipelineStatusWarning}, + }, + expected: newDataHealthResponse(v1alpha1.PipelineStatusWarning, "Pipeline data flow is in a warning state for vertex2", "D2"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := convertVertexStateToPipelineState(tt.vertexStates) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGenerateDataHealthResponse(t *testing.T) { + tests := []struct { + name string + state string + vertex string + expectedStatus string + expectedMsg string + expectedCode string + }{ + { + name: "Healthy state", + state: v1alpha1.PipelineStatusHealthy, + vertex: "vertex1", + expectedStatus: v1alpha1.PipelineStatusHealthy, + expectedMsg: "Pipeline data flow is healthy", + expectedCode: "D1", + }, + { + name: "Warning state", + state: v1alpha1.PipelineStatusWarning, + vertex: "vertex2", + expectedStatus: v1alpha1.PipelineStatusWarning, + expectedMsg: "Pipeline data flow is in a warning state for vertex2", + expectedCode: "D2", + }, + { + name: "Critical state", + state: v1alpha1.PipelineStatusCritical, + vertex: "vertex3", + expectedStatus: v1alpha1.PipelineStatusCritical, + expectedMsg: "Pipeline data flow is in a critical state for vertex3", + expectedCode: "D3", + }, + { + name: "Unknown state", + state: v1alpha1.PipelineStatusUnknown, + vertex: "vertex4", + expectedStatus: v1alpha1.PipelineStatusUnknown, + expectedMsg: "Pipeline data flow is in an unknown state due to vertex4", + expectedCode: "D4", + }, + { + name: "Invalid state", + state: "invalid", + vertex: "vertex5", + expectedStatus: defaultDataHealthResponse.Status, + expectedMsg: defaultDataHealthResponse.Message, + expectedCode: defaultDataHealthResponse.Code, + }, + { + name: "Empty state and vertex", + state: "", + vertex: "", + expectedStatus: defaultDataHealthResponse.Status, + expectedMsg: defaultDataHealthResponse.Message, + expectedCode: defaultDataHealthResponse.Code, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := generateDataHealthResponse(tt.state, tt.vertex) + assert.NotNil(t, result) + assert.Equal(t, tt.expectedStatus, result.Status) + assert.Equal(t, tt.expectedMsg, result.Message) + assert.Equal(t, tt.expectedCode, result.Code) + }) + } +} diff --git a/pkg/shared/clients/nats/client_pool_test.go b/pkg/shared/clients/nats/client_pool_test.go index 8e8be47247..8b88b50850 100644 --- a/pkg/shared/clients/nats/client_pool_test.go +++ b/pkg/shared/clients/nats/client_pool_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package nats import ( @@ -5,8 +21,9 @@ import ( "os" "testing" - dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/stretchr/testify/assert" + + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" ) func TestNewClientPool_Success(t *testing.T) { diff --git a/pkg/shared/clients/nats/nats_client_test.go b/pkg/shared/clients/nats/nats_client_test.go index 7e81277f0e..1162e38d3a 100644 --- a/pkg/shared/clients/nats/nats_client_test.go +++ b/pkg/shared/clients/nats/nats_client_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package nats import ( @@ -6,12 +22,12 @@ import ( "testing" "github.com/nats-io/nats.go" - dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/shared/logging" "github.com/stretchr/testify/assert" "go.uber.org/zap" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test" + "github.com/numaproj/numaflow/pkg/shared/logging" ) func TestNewNATSClient(t *testing.T) { diff --git a/pkg/shared/clients/nats/options_test.go b/pkg/shared/clients/nats/options_test.go index 87aa3f1858..d7dab4dff5 100644 --- a/pkg/shared/clients/nats/options_test.go +++ b/pkg/shared/clients/nats/options_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package nats import ( diff --git a/pkg/shared/clients/nats/test/server.go b/pkg/shared/clients/nats/test/server.go index abe5722374..af3c25036e 100644 --- a/pkg/shared/clients/nats/test/server.go +++ b/pkg/shared/clients/nats/test/server.go @@ -13,14 +13,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -package nats +package test import ( "os" "testing" "github.com/nats-io/nats-server/v2/server" - natstestserver "github.com/nats-io/nats-server/v2/test" ) diff --git a/pkg/shared/ewma/simple_ewma_test.go b/pkg/shared/ewma/simple_ewma_test.go index 5f34c3b8e1..313db5dd08 100644 --- a/pkg/shared/ewma/simple_ewma_test.go +++ b/pkg/shared/ewma/simple_ewma_test.go @@ -53,3 +53,46 @@ func TestSimpleEWMA(t *testing.T) { } // TestSimpleEWMAInit tests the SimpleEWMA initialization. +func TestSimpleEWMAAdd(t *testing.T) { + ewma := NewSimpleEWMA() + + // Test initialization + ewma.Add(10) + assert.Equal(t, 10.0, ewma.Get()) +} + +func TestSimpleEWMAGet(t *testing.T) { + ewma := NewSimpleEWMA() + + // Test get before initialization + assert.Equal(t, 0.0, ewma.Get()) +} + +func TestSimpleEWMAReset(t *testing.T) { + ewma := NewSimpleEWMA() + + ewma.Add(100) + assert.Equal(t, 100.0, ewma.Get()) + + ewma.Reset() + assert.Equal(t, 0.0, ewma.Get()) + + // Test that the next Add after Reset initializes properly + ewma.Add(200) + assert.Equal(t, 200.0, ewma.Get()) +} + +func TestSimpleEWMASet(t *testing.T) { + ewma := NewSimpleEWMA() + + ewma.Set(75) + assert.Equal(t, 75.0, ewma.Get()) +} + +func TestSimpleEWMAWithCustomAlpha(t *testing.T) { + ewma := NewSimpleEWMA(10) // span of 10 gives alpha of 0.18181818... + + ewma.Add(50) + ewma.Add(100) + assert.InDelta(t, 59.09090909, ewma.Get(), 0.00000001) +} diff --git a/pkg/shared/idlehandler/idlehandler_test.go b/pkg/shared/idlehandler/idlehandler_test.go index 8e50685648..90dc1662ef 100644 --- a/pkg/shared/idlehandler/idlehandler_test.go +++ b/pkg/shared/idlehandler/idlehandler_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package idlehandler import ( diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 1e410bbf0e..b57b21a357 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -58,6 +58,36 @@ const ( ValidTypeUpdate = "valid-update" ) +type handlerOptions struct { + // readonly is used to indicate whether the server is in read-only mode + readonly bool + // daemonClientProtocol is used to indicate the protocol of the daemon client, 'grpc' or 'http' + daemonClientProtocol string +} + +func defaultHandlerOptions() *handlerOptions { + return &handlerOptions{ + readonly: false, + daemonClientProtocol: "grpc", + } +} + +type HandlerOption func(*handlerOptions) + +// WithDaemonClientProtocol sets the protocol of the daemon client. +func WithDaemonClientProtocol(protocol string) HandlerOption { + return func(o *handlerOptions) { + o.daemonClientProtocol = protocol + } +} + +// WithReadOnlyMode sets the server to read-only mode. +func WithReadOnlyMode() HandlerOption { + return func(o *handlerOptions) { + o.readonly = true + } +} + type handler struct { kubeClient kubernetes.Interface metricsClient *metricsversiond.Clientset @@ -65,12 +95,12 @@ type handler struct { daemonClientsCache *lru.Cache[string, daemonclient.DaemonClient] dexObj *DexObject localUsersAuthObject *LocalUsersAuthObject - isReadOnly bool healthChecker *HealthChecker + opts *handlerOptions } // NewHandler is used to provide a new instance of the handler type -func NewHandler(ctx context.Context, dexObj *DexObject, localUsersAuthObject *LocalUsersAuthObject, isReadOnly bool) (*handler, error) { +func NewHandler(ctx context.Context, dexObj *DexObject, localUsersAuthObject *LocalUsersAuthObject, opts ...HandlerOption) (*handler, error) { var ( k8sRestConfig *rest.Config err error @@ -88,6 +118,12 @@ func NewHandler(ctx context.Context, dexObj *DexObject, localUsersAuthObject *Lo daemonClientsCache, _ := lru.NewWithEvict[string, daemonclient.DaemonClient](500, func(key string, value daemonclient.DaemonClient) { _ = value.Close() }) + o := defaultHandlerOptions() + for _, opt := range opts { + if opt != nil { + opt(o) + } + } return &handler{ kubeClient: kubeClient, metricsClient: metricsClient, @@ -95,8 +131,8 @@ func NewHandler(ctx context.Context, dexObj *DexObject, localUsersAuthObject *Lo daemonClientsCache: daemonClientsCache, dexObj: dexObj, localUsersAuthObject: localUsersAuthObject, - isReadOnly: isReadOnly, healthChecker: NewHealthChecker(ctx), + opts: o, }, nil } @@ -284,7 +320,7 @@ func (h *handler) GetClusterSummary(c *gin.Context) { // CreatePipeline is used to create a given pipeline func (h *handler) CreatePipeline(c *gin.Context) { - if h.isReadOnly { + if h.opts.readonly { errMsg := "Failed to perform this operation in read only mode" c.JSON(http.StatusForbidden, NewNumaflowAPIResponse(&errMsg, nil)) return @@ -418,7 +454,7 @@ func (h *handler) GetPipeline(c *gin.Context) { // UpdatePipeline is used to update a given pipeline func (h *handler) UpdatePipeline(c *gin.Context) { - if h.isReadOnly { + if h.opts.readonly { errMsg := "Failed to perform this operation in read only mode" c.JSON(http.StatusForbidden, NewNumaflowAPIResponse(&errMsg, nil)) return @@ -474,7 +510,7 @@ func (h *handler) UpdatePipeline(c *gin.Context) { // DeletePipeline is used to delete a given pipeline func (h *handler) DeletePipeline(c *gin.Context) { - if h.isReadOnly { + if h.opts.readonly { errMsg := "Failed to perform this operation in read only mode" c.JSON(http.StatusForbidden, NewNumaflowAPIResponse(&errMsg, nil)) return @@ -495,7 +531,7 @@ func (h *handler) DeletePipeline(c *gin.Context) { // PatchPipeline is used to patch the pipeline spec to achieve operations such as "pause" and "resume" func (h *handler) PatchPipeline(c *gin.Context) { - if h.isReadOnly { + if h.opts.readonly { errMsg := "Failed to perform this operation in read only mode" c.JSON(http.StatusForbidden, NewNumaflowAPIResponse(&errMsg, nil)) return @@ -524,7 +560,7 @@ func (h *handler) PatchPipeline(c *gin.Context) { } func (h *handler) CreateInterStepBufferService(c *gin.Context) { - if h.isReadOnly { + if h.opts.readonly { errMsg := "Failed to perform this operation in read only mode" c.JSON(http.StatusForbidden, NewNumaflowAPIResponse(&errMsg, nil)) return @@ -599,7 +635,7 @@ func (h *handler) GetInterStepBufferService(c *gin.Context) { } func (h *handler) UpdateInterStepBufferService(c *gin.Context) { - if h.isReadOnly { + if h.opts.readonly { errMsg := "Failed to perform this operation in read only mode" c.JSON(http.StatusForbidden, NewNumaflowAPIResponse(&errMsg, nil)) return @@ -641,7 +677,7 @@ func (h *handler) UpdateInterStepBufferService(c *gin.Context) { } func (h *handler) DeleteInterStepBufferService(c *gin.Context) { - if h.isReadOnly { + if h.opts.readonly { errMsg := "Failed to perform this operation in read only mode" c.JSON(http.StatusForbidden, NewNumaflowAPIResponse(&errMsg, nil)) return @@ -715,7 +751,7 @@ func (h *handler) respondWithError(c *gin.Context, message string) { // UpdateVertex is used to update the vertex spec func (h *handler) UpdateVertex(c *gin.Context) { - if h.isReadOnly { + if h.opts.readonly { errMsg := "Failed to perform this operation in read only mode" c.JSON(http.StatusForbidden, NewNumaflowAPIResponse(&errMsg, nil)) return @@ -1147,7 +1183,14 @@ func daemonSvcAddress(ns, pipeline string) string { func (h *handler) getDaemonClient(ns, pipeline string) (daemonclient.DaemonClient, error) { if dClient, ok := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)); !ok { - c, err := daemonclient.NewGRPCDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + var err error + var c daemonclient.DaemonClient + // Default to use gRPC client + if strings.EqualFold(h.opts.daemonClientProtocol, "http") { + c, err = daemonclient.NewRESTfulDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + } else { + c, err = daemonclient.NewGRPCDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + } if err != nil { return nil, err } diff --git a/server/cmd/server/start.go b/server/cmd/server/start.go index cae5fef5a8..24f8e25179 100644 --- a/server/cmd/server/start.go +++ b/server/cmd/server/start.go @@ -44,16 +44,17 @@ var ( ) type ServerOptions struct { - Insecure bool - Port int - Namespaced bool - ManagedNamespace string - BaseHref string - DisableAuth bool - DexServerAddr string - ServerAddr string - CorsAllowedOrigins string - ReadOnly bool + Insecure bool + Port int + Namespaced bool + ManagedNamespace string + BaseHref string + DisableAuth bool + DexServerAddr string + ServerAddr string + CorsAllowedOrigins string + ReadOnly bool + DaemonClientProtocol string } type server struct { @@ -102,10 +103,12 @@ func (s *server) Start(ctx context.Context) { ctx, router, routes.SystemInfo{ - ManagedNamespace: s.options.ManagedNamespace, - Namespaced: s.options.Namespaced, - IsReadOnly: s.options.ReadOnly, - Version: numaflow.GetVersion().String()}, + ManagedNamespace: s.options.ManagedNamespace, + Namespaced: s.options.Namespaced, + IsReadOnly: s.options.ReadOnly, + Version: numaflow.GetVersion().String(), + DaemonClientProtocol: s.options.DaemonClientProtocol, + }, routes.AuthInfo{ DisableAuth: s.options.DisableAuth, DexServerAddr: s.options.DexServerAddr, diff --git a/server/cmd/server/start_test.go b/server/cmd/server/start_test.go new file mode 100644 index 0000000000..645e53a3ff --- /dev/null +++ b/server/cmd/server/start_test.go @@ -0,0 +1,84 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCreateAuthRouteMap(t *testing.T) { + t.Run("empty base", func(t *testing.T) { + got := CreateAuthRouteMap("") + assert.Equal(t, 24, len(got)) + }) + + t.Run("customize base", func(t *testing.T) { + got := CreateAuthRouteMap("abcdefg") + assert.Equal(t, 24, len(got)) + for k := range got { + assert.Contains(t, k, "abcdefg") + } + }) +} + +func TestNewServer(t *testing.T) { + opts := ServerOptions{ + Insecure: true, + Port: 8080, + Namespaced: true, + ManagedNamespace: "default", + BaseHref: "/", + DisableAuth: false, + DexServerAddr: "http://dex:5556", + ServerAddr: "http://server:8080", + CorsAllowedOrigins: "http://localhost:3000,http://example.com", + ReadOnly: false, + DaemonClientProtocol: "http", + } + + s := NewServer(opts) + + assert.NotNil(t, s) + assert.Equal(t, opts, s.options) +} + +func TestNeedToRewrite(t *testing.T) { + tests := []struct { + name string + path string + expected bool + }{ + { + name: "path with rewrite prefix", + path: "/namespaces/abc", + expected: true, + }, + { + name: "path without rewrite prefix", + path: "/static/images/logo.png", + expected: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := needToRewrite(tt.path) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/server/routes/routes.go b/server/routes/routes.go index 59f7006a5b..580a819079 100644 --- a/server/routes/routes.go +++ b/server/routes/routes.go @@ -31,10 +31,11 @@ import ( ) type SystemInfo struct { - ManagedNamespace string `json:"managedNamespace"` - Namespaced bool `json:"namespaced"` - IsReadOnly bool `json:"isReadOnly"` - Version string `json:"version"` + ManagedNamespace string `json:"managedNamespace"` + Namespaced bool `json:"namespaced"` + IsReadOnly bool `json:"isReadOnly"` + Version string `json:"version"` + DaemonClientProtocol string `json:"daemonClientProtocol"` } type AuthInfo struct { @@ -71,9 +72,9 @@ func Routes(ctx context.Context, r *gin.Engine, sysInfo SystemInfo, authInfo Aut } // Add the AuthN/AuthZ middleware to the group. r1Group.Use(authMiddleware(ctx, authorizer, dexObj, localUsersAuthObj, authRouteMap)) - v1Routes(ctx, r1Group, dexObj, localUsersAuthObj, sysInfo.IsReadOnly) + v1Routes(ctx, r1Group, dexObj, localUsersAuthObj, sysInfo.IsReadOnly, sysInfo.DaemonClientProtocol) } else { - v1Routes(ctx, r1Group, nil, nil, sysInfo.IsReadOnly) + v1Routes(ctx, r1Group, nil, nil, sysInfo.IsReadOnly, sysInfo.DaemonClientProtocol) } r1Group.GET("/sysinfo", func(c *gin.Context) { c.JSON(http.StatusOK, v1.NewNumaflowAPIResponse(nil, sysInfo)) @@ -97,8 +98,12 @@ func v1RoutesNoAuth(r gin.IRouter, dexObj *v1.DexObject, localUsersAuthObject *v // v1Routes defines the routes for the v1 API. For adding a new route, add a new handler function // for the route along with an entry in the RouteMap in auth/route_map.go. -func v1Routes(ctx context.Context, r gin.IRouter, dexObj *v1.DexObject, localUsersAuthObject *v1.LocalUsersAuthObject, isReadOnly bool) { - handler, err := v1.NewHandler(ctx, dexObj, localUsersAuthObject, isReadOnly) +func v1Routes(ctx context.Context, r gin.IRouter, dexObj *v1.DexObject, localUsersAuthObject *v1.LocalUsersAuthObject, isReadOnly bool, daemonClientProtocol string) { + handlerOpts := []v1.HandlerOption{v1.WithDaemonClientProtocol(daemonClientProtocol)} + if isReadOnly { + handlerOpts = append(handlerOpts, v1.WithReadOnlyMode()) + } + handler, err := v1.NewHandler(ctx, dexObj, localUsersAuthObject, handlerOpts...) if err != nil { panic(err) }