Skip to content

Commit

Permalink
test cases
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Sep 9, 2024
1 parent b080ead commit a0d24a7
Showing 1 changed file with 369 additions and 0 deletions.
369 changes: 369 additions & 0 deletions pkg/apis/numaflow/v1alpha1/mono_vertex_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -88,6 +89,31 @@ func TestMonoVertex_MarkPhaseRunning(t *testing.T) {
}
}

func TestMonoVertex_MarkDaemonUnHealthy(t *testing.T) {
mvs := MonoVertexStatus{}
mvs.MarkDaemonUnHealthy("reason", "message")

for _, condition := range mvs.Conditions {
if condition.Type == string(MonoVertexConditionDaemonHealthy) {
if condition.Status != metav1.ConditionFalse {
t.Errorf("MarkDaemonUnHealthy should set the DaemonHealthy condition to false, got %v", condition.Status)
}
if condition.Reason != "reason" {
t.Errorf("MarkDaemonUnHealthy should set the Reason to 'reason', got %s", condition.Reason)
}
if condition.Message != "message" {
t.Errorf("MarkDaemonUnHealthy should set the Message to 'message', got %s", condition.Message)
}
}
}
}

func TestMonoVertex_SetObservedGeneration(t *testing.T) {
mvs := MonoVertexStatus{}
mvs.SetObservedGeneration(1)
assert.Equal(t, int64(1), mvs.ObservedGeneration)
}

func TestMonoVertex_IsHealthy(t *testing.T) {
mvs := MonoVertexStatus{}

Expand Down Expand Up @@ -171,3 +197,346 @@ func TestMonoVertexGetPodSpec(t *testing.T) {
assert.Contains(t, envNames, EnvMonoVertexObject)
})
}

func TestMonoVertexLimits_GetReadBatchSize(t *testing.T) {
t.Run("default value", func(t *testing.T) {
mvl := MonoVertexLimits{}
assert.Equal(t, uint64(DefaultReadBatchSize), mvl.GetReadBatchSize())
})

t.Run("custom value", func(t *testing.T) {
customSize := uint64(1000)
mvl := MonoVertexLimits{ReadBatchSize: &customSize}
assert.Equal(t, customSize, mvl.GetReadBatchSize())
})

}

func TestMonoVertexLimits_GetReadTimeout(t *testing.T) {
t.Run("default value", func(t *testing.T) {
mvl := MonoVertexLimits{}
assert.Equal(t, DefaultReadTimeout, mvl.GetReadTimeout())
})

t.Run("custom value", func(t *testing.T) {
customTimeout := metav1.Duration{Duration: 5 * time.Second}
mvl := MonoVertexLimits{ReadTimeout: &customTimeout}
assert.Equal(t, 5*time.Second, mvl.GetReadTimeout())
})
}

func TestMonoVertex_GetDaemonDeploymentName(t *testing.T) {
mv := MonoVertex{
ObjectMeta: metav1.ObjectMeta{
Name: "test-vertex",
},
}
expected := "test-vertex-mv-daemon"
assert.Equal(t, expected, mv.GetDaemonDeploymentName())
}

func TestMonoVertex_GetDaemonServiceURL(t *testing.T) {
mv := MonoVertex{
ObjectMeta: metav1.ObjectMeta{
Name: "test-vertex",
Namespace: "test-namespace",
},
}
expected := "test-vertex-mv-daemon-svc.test-namespace.svc:4327"
assert.Equal(t, expected, mv.GetDaemonServiceURL())
}

func TestMonoVertex_Scalable(t *testing.T) {
t.Run("scalable when not disabled", func(t *testing.T) {
mv := MonoVertex{
Spec: MonoVertexSpec{
Scale: Scale{
Disabled: false,
},
},
}
assert.True(t, mv.Scalable())
})

t.Run("not scalable when disabled", func(t *testing.T) {
mv := MonoVertex{
Spec: MonoVertexSpec{
Scale: Scale{
Disabled: true,
},
},
}
assert.False(t, mv.Scalable())
})
}

func TestMonoVertex_GetReplicas(t *testing.T) {
t.Run("default replicas", func(t *testing.T) {
mv := MonoVertex{}
assert.Equal(t, 1, mv.getReplicas())
})

t.Run("custom replicas", func(t *testing.T) {
replicas := int32(3)
mv := MonoVertex{
Spec: MonoVertexSpec{
Replicas: &replicas,
},
}
assert.Equal(t, 3, mv.getReplicas())
})
}

func TestMonoVertex_CalculateReplicas(t *testing.T) {
t.Run("auto scaling disabled", func(t *testing.T) {
replicas := int32(5)
mv := MonoVertex{
Spec: MonoVertexSpec{
Replicas: &replicas,
Scale: Scale{
Disabled: true,
},
},
}
assert.Equal(t, 5, mv.CalculateReplicas())
})

t.Run("auto scaling enabled, within range", func(t *testing.T) {
replicas := int32(3)
mv := MonoVertex{
Spec: MonoVertexSpec{
Replicas: &replicas,
Scale: Scale{
Disabled: false,
Min: ptr.To[int32](1),
Max: ptr.To[int32](5),
},
},
}
assert.Equal(t, 3, mv.CalculateReplicas())
})

t.Run("auto scaling enabled, below min", func(t *testing.T) {
replicas := int32(0)
mv := MonoVertex{
Spec: MonoVertexSpec{
Replicas: &replicas,
Scale: Scale{
Disabled: false,
Min: ptr.To[int32](2),
Max: ptr.To[int32](5),
},
},
}
assert.Equal(t, 2, mv.CalculateReplicas())
})

t.Run("auto scaling enabled, above max", func(t *testing.T) {
replicas := int32(10)
mv := MonoVertex{
Spec: MonoVertexSpec{
Replicas: &replicas,
Scale: Scale{
Disabled: false,
Min: ptr.To[int32](2),
Max: ptr.To[int32](5),
},
},
}
assert.Equal(t, 5, mv.CalculateReplicas())
})
}

func TestMonoVertex_GetServiceObj(t *testing.T) {
mv := MonoVertex{
ObjectMeta: metav1.ObjectMeta{
Name: "test-vertex",
Namespace: "test-namespace",
},
}

t.Run("non-headless service", func(t *testing.T) {
svc := mv.getServiceObj("test-service", false, 8080, "http")
assert.Equal(t, "test-service", svc.Name)
assert.Equal(t, "test-namespace", svc.Namespace)
assert.Equal(t, 1, len(svc.Spec.Ports))
assert.Equal(t, int32(8080), svc.Spec.Ports[0].Port)
assert.Equal(t, "http", svc.Spec.Ports[0].Name)
assert.NotEqual(t, "None", svc.Spec.ClusterIP)
})

t.Run("headless service", func(t *testing.T) {
svc := mv.getServiceObj("test-headless-service", true, 9090, "grpc")
assert.Equal(t, "test-headless-service", svc.Name)
assert.Equal(t, "test-namespace", svc.Namespace)
assert.Equal(t, 1, len(svc.Spec.Ports))
assert.Equal(t, int32(9090), svc.Spec.Ports[0].Port)
assert.Equal(t, "grpc", svc.Spec.Ports[0].Name)
assert.Equal(t, "None", svc.Spec.ClusterIP)
})

t.Run("verify labels", func(t *testing.T) {
svc := mv.getServiceObj("test-label-service", false, 7070, "metrics")
expectedLabels := map[string]string{
KeyPartOf: Project,
KeyManagedBy: ControllerMonoVertex,
KeyComponent: ComponentMonoVertex,
KeyMonoVertexName: "test-vertex",
}
assert.Equal(t, expectedLabels, svc.Labels)
})

t.Run("verify selector", func(t *testing.T) {
svc := mv.getServiceObj("test-selector-service", false, 6060, "admin")
expectedSelector := map[string]string{
KeyPartOf: Project,
KeyManagedBy: ControllerMonoVertex,
KeyComponent: ComponentMonoVertex,
KeyMonoVertexName: "test-vertex",
}
assert.Equal(t, expectedSelector, svc.Spec.Selector)
})
}

func TestMonoVertex_GetServiceObjs(t *testing.T) {
mv := MonoVertex{
ObjectMeta: metav1.ObjectMeta{
Name: "test-vertex",
Namespace: "test-namespace",
},
}

t.Run("verify service objects", func(t *testing.T) {
services := mv.GetServiceObjs()
assert.Equal(t, 1, len(services), "Expected 1 service object")

headlessService := services[0]
assert.Equal(t, mv.GetHeadlessServiceName(), headlessService.Name)
assert.Equal(t, "test-namespace", headlessService.Namespace)
assert.Equal(t, "None", headlessService.Spec.ClusterIP)
assert.Equal(t, 1, len(headlessService.Spec.Ports))
assert.Equal(t, int32(MonoVertexMetricsPort), headlessService.Spec.Ports[0].Port)
assert.Equal(t, MonoVertexMetricsPortName, headlessService.Spec.Ports[0].Name)
})
}

func TestMonoVertex_GetDaemonDeploymentObj(t *testing.T) {
mv := MonoVertex{
ObjectMeta: metav1.ObjectMeta{
Name: "test-vertex",
Namespace: "test-namespace",
},
Spec: MonoVertexSpec{},
}

t.Run("basic deployment object", func(t *testing.T) {
req := GetMonoVertexDaemonDeploymentReq{
Image: "test-image:latest",
PullPolicy: corev1.PullAlways,
DefaultResources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
corev1.ResourceMemory: resource.MustParse("128Mi"),
},
},
}

deployment, err := mv.GetDaemonDeploymentObj(req)
assert.NoError(t, err)
assert.NotNil(t, deployment)
assert.Equal(t, mv.GetDaemonDeploymentName(), deployment.Name)
assert.Equal(t, mv.Namespace, deployment.Namespace)
assert.Equal(t, "test-image:latest", deployment.Spec.Template.Spec.Containers[0].Image)
assert.Equal(t, corev1.PullAlways, deployment.Spec.Template.Spec.Containers[0].ImagePullPolicy)
assert.Equal(t, resource.MustParse("100m"), deployment.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU])
assert.Equal(t, resource.MustParse("128Mi"), deployment.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceMemory])
})

t.Run("with custom environment variables", func(t *testing.T) {
req := GetMonoVertexDaemonDeploymentReq{
Image: "test-image:v1",
Env: []corev1.EnvVar{
{Name: "CUSTOM_ENV", Value: "custom_value"},
},
}

deployment, err := mv.GetDaemonDeploymentObj(req)
assert.NoError(t, err)
assert.NotNil(t, deployment)

envVars := deployment.Spec.Template.Spec.Containers[0].Env
assert.Contains(t, envVars, corev1.EnvVar{Name: "CUSTOM_ENV", Value: "custom_value"})
})

t.Run("with daemon template", func(t *testing.T) {
mv.Spec.DaemonTemplate = &DaemonTemplate{
Replicas: ptr.To[int32](3),
AbstractPodTemplate: AbstractPodTemplate{
NodeSelector: map[string]string{"node": "special"},
},
ContainerTemplate: &ContainerTemplate{
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("200m"),
},
},
},
}

req := GetMonoVertexDaemonDeploymentReq{
Image: "test-image:v2",
}

deployment, err := mv.GetDaemonDeploymentObj(req)
assert.NoError(t, err)
assert.NotNil(t, deployment)
assert.Equal(t, int32(3), *deployment.Spec.Replicas)
assert.Equal(t, "special", deployment.Spec.Template.Spec.NodeSelector["node"])
assert.Equal(t, resource.MustParse("200m"), deployment.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU])
})

t.Run("verify probes", func(t *testing.T) {
req := GetMonoVertexDaemonDeploymentReq{
Image: "test-image:v3",
}

deployment, err := mv.GetDaemonDeploymentObj(req)
assert.NoError(t, err)
assert.NotNil(t, deployment)

container := deployment.Spec.Template.Spec.Containers[0]
assert.NotNil(t, container.ReadinessProbe)
assert.NotNil(t, container.LivenessProbe)

assert.Equal(t, int32(MonoVertexDaemonServicePort), container.ReadinessProbe.HTTPGet.Port.IntVal)
assert.Equal(t, "/readyz", container.ReadinessProbe.HTTPGet.Path)
assert.Equal(t, corev1.URISchemeHTTPS, container.ReadinessProbe.HTTPGet.Scheme)

assert.Equal(t, int32(MonoVertexDaemonServicePort), container.LivenessProbe.HTTPGet.Port.IntVal)
assert.Equal(t, "/livez", container.LivenessProbe.HTTPGet.Path)
assert.Equal(t, corev1.URISchemeHTTPS, container.LivenessProbe.HTTPGet.Scheme)
})

t.Run("verify labels and owner references", func(t *testing.T) {
req := GetMonoVertexDaemonDeploymentReq{
Image: "test-image:v4",
}

deployment, err := mv.GetDaemonDeploymentObj(req)
assert.NoError(t, err)
assert.NotNil(t, deployment)

expectedLabels := map[string]string{
KeyPartOf: Project,
KeyManagedBy: ControllerMonoVertex,
KeyComponent: ComponentMonoVertexDaemon,
KeyAppName: mv.GetDaemonDeploymentName(),
KeyMonoVertexName: mv.Name,
}
assert.Equal(t, expectedLabels, deployment.Labels)

assert.Len(t, deployment.OwnerReferences, 1)
assert.Equal(t, mv.Name, deployment.OwnerReferences[0].Name)
assert.Equal(t, MonoVertexGroupVersionKind.Kind, deployment.OwnerReferences[0].Kind)
})
}

0 comments on commit a0d24a7

Please sign in to comment.