diff --git a/pkg/apis/numaflow/v1alpha1/mono_vertex_types_test.go b/pkg/apis/numaflow/v1alpha1/mono_vertex_types_test.go index aeb809db05..665acd8b32 100644 --- a/pkg/apis/numaflow/v1alpha1/mono_vertex_types_test.go +++ b/pkg/apis/numaflow/v1alpha1/mono_vertex_types_test.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "testing" + "time" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" @@ -88,6 +89,31 @@ func TestMonoVertex_MarkPhaseRunning(t *testing.T) { } } +func TestMonoVertex_MarkDaemonUnHealthy(t *testing.T) { + mvs := MonoVertexStatus{} + mvs.MarkDaemonUnHealthy("reason", "message") + + for _, condition := range mvs.Conditions { + if condition.Type == string(MonoVertexConditionDaemonHealthy) { + if condition.Status != metav1.ConditionFalse { + t.Errorf("MarkDaemonUnHealthy should set the DaemonHealthy condition to false, got %v", condition.Status) + } + if condition.Reason != "reason" { + t.Errorf("MarkDaemonUnHealthy should set the Reason to 'reason', got %s", condition.Reason) + } + if condition.Message != "message" { + t.Errorf("MarkDaemonUnHealthy should set the Message to 'message', got %s", condition.Message) + } + } + } +} + +func TestMonoVertex_SetObservedGeneration(t *testing.T) { + mvs := MonoVertexStatus{} + mvs.SetObservedGeneration(1) + assert.Equal(t, int64(1), mvs.ObservedGeneration) +} + func TestMonoVertex_IsHealthy(t *testing.T) { mvs := MonoVertexStatus{} @@ -171,3 +197,346 @@ func TestMonoVertexGetPodSpec(t *testing.T) { assert.Contains(t, envNames, EnvMonoVertexObject) }) } + +func TestMonoVertexLimits_GetReadBatchSize(t *testing.T) { + t.Run("default value", func(t *testing.T) { + mvl := MonoVertexLimits{} + assert.Equal(t, uint64(DefaultReadBatchSize), mvl.GetReadBatchSize()) + }) + + t.Run("custom value", func(t *testing.T) { + customSize := uint64(1000) + mvl := MonoVertexLimits{ReadBatchSize: &customSize} + assert.Equal(t, customSize, mvl.GetReadBatchSize()) + }) + +} + +func TestMonoVertexLimits_GetReadTimeout(t *testing.T) { + t.Run("default value", func(t *testing.T) { + mvl := MonoVertexLimits{} + assert.Equal(t, DefaultReadTimeout, mvl.GetReadTimeout()) + }) + + t.Run("custom value", func(t *testing.T) { + customTimeout := metav1.Duration{Duration: 5 * time.Second} + mvl := MonoVertexLimits{ReadTimeout: &customTimeout} + assert.Equal(t, 5*time.Second, mvl.GetReadTimeout()) + }) +} + +func TestMonoVertex_GetDaemonDeploymentName(t *testing.T) { + mv := MonoVertex{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-vertex", + }, + } + expected := "test-vertex-mv-daemon" + assert.Equal(t, expected, mv.GetDaemonDeploymentName()) +} + +func TestMonoVertex_GetDaemonServiceURL(t *testing.T) { + mv := MonoVertex{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-vertex", + Namespace: "test-namespace", + }, + } + expected := "test-vertex-mv-daemon-svc.test-namespace.svc:4327" + assert.Equal(t, expected, mv.GetDaemonServiceURL()) +} + +func TestMonoVertex_Scalable(t *testing.T) { + t.Run("scalable when not disabled", func(t *testing.T) { + mv := MonoVertex{ + Spec: MonoVertexSpec{ + Scale: Scale{ + Disabled: false, + }, + }, + } + assert.True(t, mv.Scalable()) + }) + + t.Run("not scalable when disabled", func(t *testing.T) { + mv := MonoVertex{ + Spec: MonoVertexSpec{ + Scale: Scale{ + Disabled: true, + }, + }, + } + assert.False(t, mv.Scalable()) + }) +} + +func TestMonoVertex_GetReplicas(t *testing.T) { + t.Run("default replicas", func(t *testing.T) { + mv := MonoVertex{} + assert.Equal(t, 1, mv.getReplicas()) + }) + + t.Run("custom replicas", func(t *testing.T) { + replicas := int32(3) + mv := MonoVertex{ + Spec: MonoVertexSpec{ + Replicas: &replicas, + }, + } + assert.Equal(t, 3, mv.getReplicas()) + }) +} + +func TestMonoVertex_CalculateReplicas(t *testing.T) { + t.Run("auto scaling disabled", func(t *testing.T) { + replicas := int32(5) + mv := MonoVertex{ + Spec: MonoVertexSpec{ + Replicas: &replicas, + Scale: Scale{ + Disabled: true, + }, + }, + } + assert.Equal(t, 5, mv.CalculateReplicas()) + }) + + t.Run("auto scaling enabled, within range", func(t *testing.T) { + replicas := int32(3) + mv := MonoVertex{ + Spec: MonoVertexSpec{ + Replicas: &replicas, + Scale: Scale{ + Disabled: false, + Min: ptr.To[int32](1), + Max: ptr.To[int32](5), + }, + }, + } + assert.Equal(t, 3, mv.CalculateReplicas()) + }) + + t.Run("auto scaling enabled, below min", func(t *testing.T) { + replicas := int32(0) + mv := MonoVertex{ + Spec: MonoVertexSpec{ + Replicas: &replicas, + Scale: Scale{ + Disabled: false, + Min: ptr.To[int32](2), + Max: ptr.To[int32](5), + }, + }, + } + assert.Equal(t, 2, mv.CalculateReplicas()) + }) + + t.Run("auto scaling enabled, above max", func(t *testing.T) { + replicas := int32(10) + mv := MonoVertex{ + Spec: MonoVertexSpec{ + Replicas: &replicas, + Scale: Scale{ + Disabled: false, + Min: ptr.To[int32](2), + Max: ptr.To[int32](5), + }, + }, + } + assert.Equal(t, 5, mv.CalculateReplicas()) + }) +} + +func TestMonoVertex_GetServiceObj(t *testing.T) { + mv := MonoVertex{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-vertex", + Namespace: "test-namespace", + }, + } + + t.Run("non-headless service", func(t *testing.T) { + svc := mv.getServiceObj("test-service", false, 8080, "http") + assert.Equal(t, "test-service", svc.Name) + assert.Equal(t, "test-namespace", svc.Namespace) + assert.Equal(t, 1, len(svc.Spec.Ports)) + assert.Equal(t, int32(8080), svc.Spec.Ports[0].Port) + assert.Equal(t, "http", svc.Spec.Ports[0].Name) + assert.NotEqual(t, "None", svc.Spec.ClusterIP) + }) + + t.Run("headless service", func(t *testing.T) { + svc := mv.getServiceObj("test-headless-service", true, 9090, "grpc") + assert.Equal(t, "test-headless-service", svc.Name) + assert.Equal(t, "test-namespace", svc.Namespace) + assert.Equal(t, 1, len(svc.Spec.Ports)) + assert.Equal(t, int32(9090), svc.Spec.Ports[0].Port) + assert.Equal(t, "grpc", svc.Spec.Ports[0].Name) + assert.Equal(t, "None", svc.Spec.ClusterIP) + }) + + t.Run("verify labels", func(t *testing.T) { + svc := mv.getServiceObj("test-label-service", false, 7070, "metrics") + expectedLabels := map[string]string{ + KeyPartOf: Project, + KeyManagedBy: ControllerMonoVertex, + KeyComponent: ComponentMonoVertex, + KeyMonoVertexName: "test-vertex", + } + assert.Equal(t, expectedLabels, svc.Labels) + }) + + t.Run("verify selector", func(t *testing.T) { + svc := mv.getServiceObj("test-selector-service", false, 6060, "admin") + expectedSelector := map[string]string{ + KeyPartOf: Project, + KeyManagedBy: ControllerMonoVertex, + KeyComponent: ComponentMonoVertex, + KeyMonoVertexName: "test-vertex", + } + assert.Equal(t, expectedSelector, svc.Spec.Selector) + }) +} + +func TestMonoVertex_GetServiceObjs(t *testing.T) { + mv := MonoVertex{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-vertex", + Namespace: "test-namespace", + }, + } + + t.Run("verify service objects", func(t *testing.T) { + services := mv.GetServiceObjs() + assert.Equal(t, 1, len(services), "Expected 1 service object") + + headlessService := services[0] + assert.Equal(t, mv.GetHeadlessServiceName(), headlessService.Name) + assert.Equal(t, "test-namespace", headlessService.Namespace) + assert.Equal(t, "None", headlessService.Spec.ClusterIP) + assert.Equal(t, 1, len(headlessService.Spec.Ports)) + assert.Equal(t, int32(MonoVertexMetricsPort), headlessService.Spec.Ports[0].Port) + assert.Equal(t, MonoVertexMetricsPortName, headlessService.Spec.Ports[0].Name) + }) +} + +func TestMonoVertex_GetDaemonDeploymentObj(t *testing.T) { + mv := MonoVertex{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-vertex", + Namespace: "test-namespace", + }, + Spec: MonoVertexSpec{}, + } + + t.Run("basic deployment object", func(t *testing.T) { + req := GetMonoVertexDaemonDeploymentReq{ + Image: "test-image:latest", + PullPolicy: corev1.PullAlways, + DefaultResources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("128Mi"), + }, + }, + } + + deployment, err := mv.GetDaemonDeploymentObj(req) + assert.NoError(t, err) + assert.NotNil(t, deployment) + assert.Equal(t, mv.GetDaemonDeploymentName(), deployment.Name) + assert.Equal(t, mv.Namespace, deployment.Namespace) + assert.Equal(t, "test-image:latest", deployment.Spec.Template.Spec.Containers[0].Image) + assert.Equal(t, corev1.PullAlways, deployment.Spec.Template.Spec.Containers[0].ImagePullPolicy) + assert.Equal(t, resource.MustParse("100m"), deployment.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU]) + assert.Equal(t, resource.MustParse("128Mi"), deployment.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceMemory]) + }) + + t.Run("with custom environment variables", func(t *testing.T) { + req := GetMonoVertexDaemonDeploymentReq{ + Image: "test-image:v1", + Env: []corev1.EnvVar{ + {Name: "CUSTOM_ENV", Value: "custom_value"}, + }, + } + + deployment, err := mv.GetDaemonDeploymentObj(req) + assert.NoError(t, err) + assert.NotNil(t, deployment) + + envVars := deployment.Spec.Template.Spec.Containers[0].Env + assert.Contains(t, envVars, corev1.EnvVar{Name: "CUSTOM_ENV", Value: "custom_value"}) + }) + + t.Run("with daemon template", func(t *testing.T) { + mv.Spec.DaemonTemplate = &DaemonTemplate{ + Replicas: ptr.To[int32](3), + AbstractPodTemplate: AbstractPodTemplate{ + NodeSelector: map[string]string{"node": "special"}, + }, + ContainerTemplate: &ContainerTemplate{ + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("200m"), + }, + }, + }, + } + + req := GetMonoVertexDaemonDeploymentReq{ + Image: "test-image:v2", + } + + deployment, err := mv.GetDaemonDeploymentObj(req) + assert.NoError(t, err) + assert.NotNil(t, deployment) + assert.Equal(t, int32(3), *deployment.Spec.Replicas) + assert.Equal(t, "special", deployment.Spec.Template.Spec.NodeSelector["node"]) + assert.Equal(t, resource.MustParse("200m"), deployment.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU]) + }) + + t.Run("verify probes", func(t *testing.T) { + req := GetMonoVertexDaemonDeploymentReq{ + Image: "test-image:v3", + } + + deployment, err := mv.GetDaemonDeploymentObj(req) + assert.NoError(t, err) + assert.NotNil(t, deployment) + + container := deployment.Spec.Template.Spec.Containers[0] + assert.NotNil(t, container.ReadinessProbe) + assert.NotNil(t, container.LivenessProbe) + + assert.Equal(t, int32(MonoVertexDaemonServicePort), container.ReadinessProbe.HTTPGet.Port.IntVal) + assert.Equal(t, "/readyz", container.ReadinessProbe.HTTPGet.Path) + assert.Equal(t, corev1.URISchemeHTTPS, container.ReadinessProbe.HTTPGet.Scheme) + + assert.Equal(t, int32(MonoVertexDaemonServicePort), container.LivenessProbe.HTTPGet.Port.IntVal) + assert.Equal(t, "/livez", container.LivenessProbe.HTTPGet.Path) + assert.Equal(t, corev1.URISchemeHTTPS, container.LivenessProbe.HTTPGet.Scheme) + }) + + t.Run("verify labels and owner references", func(t *testing.T) { + req := GetMonoVertexDaemonDeploymentReq{ + Image: "test-image:v4", + } + + deployment, err := mv.GetDaemonDeploymentObj(req) + assert.NoError(t, err) + assert.NotNil(t, deployment) + + expectedLabels := map[string]string{ + KeyPartOf: Project, + KeyManagedBy: ControllerMonoVertex, + KeyComponent: ComponentMonoVertexDaemon, + KeyAppName: mv.GetDaemonDeploymentName(), + KeyMonoVertexName: mv.Name, + } + assert.Equal(t, expectedLabels, deployment.Labels) + + assert.Len(t, deployment.OwnerReferences, 1) + assert.Equal(t, mv.Name, deployment.OwnerReferences[0].Name) + assert.Equal(t, MonoVertexGroupVersionKind.Kind, deployment.OwnerReferences[0].Kind) + }) +}