diff --git a/pkg/shared/health-status-code/code_map.go b/pkg/shared/health-status-code/code_map.go
index 3dc408e8a6..44af395645 100644
--- a/pkg/shared/health-status-code/code_map.go
+++ b/pkg/shared/health-status-code/code_map.go
@@ -26,7 +26,6 @@ func newHealthCodeInfo(status string, criticality string) *HealthCodeInfo {
 // 2. Warning: The pipeline is in a warning state
 // 3. Healthy: The pipeline is healthy
 // 4. Unknown: The pipeline is in an unknown state
-
 var vertexHealthMap = map[string]*HealthCodeInfo{
 	"V1": newHealthCodeInfo(
 		"All pods are running",
diff --git a/server/apis/v1/health.go b/server/apis/v1/health.go
index c4c4e6c42e..23142fc9d7 100644
--- a/server/apis/v1/health.go
+++ b/server/apis/v1/health.go
@@ -153,9 +153,7 @@ func checkVertexLevelHealth(ctx context.Context, h *handler, ns string,
 // We first check if the vertex is in running state, if not, return the error message from the status
 func isVertexHealthy(h *handler, ns string, pipeline string, vertex *dfv1.Vertex,
 	vertexName string) (bool, *resourceHealthResponse, error) {
-	log := logging.FromContext(context.Background())
 	// check if the vertex is in running state
-	log.Info("vertex status: ", vertex.Name, vertex.Status.Phase)
 	if vertex.Status.Phase != dfv1.VertexPhaseRunning {
 		// check if the number of replicas running in the vertex
 		// are equal to the number of desired replicas
@@ -184,7 +182,6 @@ func isVertexHealthy(h *handler, ns string, pipeline string, vertex *dfv1.Vertex
 				Code: "V6",
 			}, err
 		}
-		log.Info("number of pods: ", len(pods.Items))
 		// Iterate over all the pods, and verify if all the containers in the pod are in running state
 		for _, pod := range pods.Items {
 			// Iterate over all the containers in the pod
diff --git a/server/apis/v1/health_test.go b/server/apis/v1/health_test.go
new file mode 100644
index 0000000000..a2bdab718e
--- /dev/null
+++ b/server/apis/v1/health_test.go
@@ -0,0 +1,158 @@
+package v1
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/utils/pointer"
+
+	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
+)
+
+const (
+	RunningPod = "running"
+	WaitingPod = "waiting"
+)
+
+// Client is the struct to hold the Kubernetes Clientset
+type Client struct {
+	Clientset kubernetes.Interface
+}
+
+func (c Client) CreatePod(pod *v1.Pod) (*v1.Pod, error) {
+	_, err := c.Clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+	if err != nil {
+		fmt.Printf("Error occurred while creating pod %s: %s\n", pod.Name, err.Error())
+		return nil, err
+	}
+
+	fmt.Printf("Pod %s is successfully created\n", pod.Name)
+	return pod, nil
+}
+
+func createPod(phase string) {
+	client := Client{
+		Clientset: fakeKubeClient,
+	}
+	pod := fakePod("test-pl", "test-vertex", testNamespace, phase)
+	_, err := client.CreatePod(pod)
+	if err != nil {
+		fmt.Print(err.Error())
+	}
+	// Verify that the pod is retrievable from the fake clientset.
+	if _, err := client.Clientset.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, metav1.GetOptions{}); err != nil {
+		fmt.Printf("Error occurred while getting pod %s: %s\n", pod.Name, err.Error())
+	}
+}
+
+func removePod() {
+	client := Client{
+		Clientset: fakeKubeClient,
+	}
+	pod := fakePod("test-pl", "test-vertex", testNamespace, "running")
+	err := client.Clientset.CoreV1().Pods(pod.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
+	if err != nil {
+		fmt.Printf("Error occurred while deleting pod %s: %s\n", pod.Name, err.Error())
+	}
+}
+
+// TestIsVertexHealthy verifies the functionality of the vertex level health check.
+// It creates a fake pipeline and checks the health of each vertex.
+// We test multiple scenarios:
+// 1. All vertices are healthy
+// 2. One or more vertices are unhealthy
+// 3. The number of replicas running in the vertex is less than the expected number of replicas
+// 4. Vertex is not in running state
+func TestIsVertexHealthy(t *testing.T) {
+	// Test vertex is in running phase and the pods are in running state
+	t.Run("test all pods are healthy", func(t *testing.T) {
+		pipeline := fakePipeline()
+		vertexName := "test-vertex"
+		vertex := fakeVertex(vertexName, dfv1.VertexPhaseRunning)
+
+		// Create fake handler
+		h := &handler{
+			kubeClient:     fakeKubeClient,
+			numaflowClient: &fakeNumaClient,
+		}
+
+		// Create fake pod in running state
+		createPod(RunningPod)
+		defer removePod()
+		healthy, _, err := isVertexHealthy(h, testNamespace, pipeline.GetName(), vertex, vertexName)
+		if err != nil {
+			t.Fatal(err)
+		}
+		assert.True(t, healthy)
+	})
+
+	// Test vertex is in running phase and the pods are in waiting state
+	t.Run("test pod not running", func(t *testing.T) {
+		pipeline := fakePipeline()
+		vertexName := "test-vertex"
+		vertex := fakeVertex(vertexName, dfv1.VertexPhaseRunning)
+
+		// Create fake handler
+		h := &handler{
+			kubeClient:     fakeKubeClient,
+			numaflowClient: &fakeNumaClient,
+		}
+		createPod(WaitingPod)
+		defer removePod()
+		healthy, r, err := isVertexHealthy(h, testNamespace, pipeline.GetName(), vertex, vertexName)
+		if err != nil {
+			t.Fatal(err)
+		}
+		assert.False(t, healthy)
+		assert.Equal(t, "V3", r.Code)
+	})
+
+	// Test vertex is not in running state
+	t.Run("test vertex not in running phase", func(t *testing.T) {
+		pipeline := fakePipeline()
+		vertexName := "test-vertex"
+		// Create a fake vertex in failed state
+		vertex := fakeVertex(vertexName, dfv1.VertexPhaseFailed)
+
+		// Create fake handler
+		h := &handler{
+			kubeClient:     fakeKubeClient,
+			numaflowClient: &fakeNumaClient,
+		}
+		healthy, r, err := isVertexHealthy(h, testNamespace, pipeline.GetName(), vertex, vertexName)
+		if err != nil {
+			t.Fatal(err)
+		}
+		assert.False(t, healthy)
+		// Refer: pkg/shared/health-status-code
+		assert.Equal(t, "V2", r.Code)
+	})
+
+	// Test vertex replica count is not equal to the desired replica count
+	t.Run("test vertex replica count not equal to desired replica count", func(t *testing.T) {
+		pipeline := fakePipeline()
+		vertexName := "test-vertex"
+		// Create a fake vertex in failed state
+		vertex := fakeVertex(vertexName, dfv1.VertexPhaseFailed)
+		// Update the replica count to 2
+		vertex.Spec.Replicas = pointer.Int32(2)
+
+		// Create fake handler
+		h := &handler{
+			kubeClient:     fakeKubeClient,
+			numaflowClient: &fakeNumaClient,
+		}
+		healthy, r, err := isVertexHealthy(h, testNamespace, pipeline.GetName(), vertex, vertexName)
+		if err != nil {
+			t.Fatal(err)
+		}
+		assert.False(t, healthy)
+		// Refer: pkg/shared/health-status-code
+		assert.Equal(t, "V9", r.Code)
+	})
+}
diff --git a/server/apis/v1/test_utils.go b/server/apis/v1/test_utils.go
new file mode 100644
index 0000000000..e4e0a8d4e9
--- /dev/null
+++ b/server/apis/v1/test_utils.go
@@ -0,0 +1,153 @@
+package v1
+
+import (
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	fakeclientset "k8s.io/client-go/kubernetes/fake"
+	"k8s.io/utils/pointer"
+
+	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
+	"github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1/fake"
+)
+
+const testNamespace = "test-ns"
+
+var (
+	fakeNumaClient    = fake.FakeNumaflowV1alpha1{}
+	fakeKubeClient    = fakeclientset.NewSimpleClientset()
+	testContainerName = "test-container"
+)
+
+// getContainerStatus returns a container status with the given phase.
+func getContainerStatus(phase string) corev1.ContainerStatus {
+	switch phase {
+	case "running":
+		return corev1.ContainerStatus{
+			Name:  testContainerName,
+			Ready: true,
+			State: corev1.ContainerState{
+				Running: &corev1.ContainerStateRunning{
+					StartedAt: metav1.Time{Time: time.Now()},
+				},
+			},
+		}
+	case "waiting":
+		return corev1.ContainerStatus{
+			Name:  testContainerName,
+			Ready: false,
+			State: corev1.ContainerState{
+				Waiting: &corev1.ContainerStateWaiting{
+					Reason:  "test-reason",
+					Message: "test-message",
+				},
+			},
+		}
+	}
+	return corev1.ContainerStatus{}
+}
+
+// fakePod returns a fake pod with the given pipeline name, vertex name, namespace and phase.
+func fakePod(pipelineName string, vertexName string, namespace string, phase string) *corev1.Pod {
+	containerStatus := getContainerStatus(phase)
+	pod := &corev1.Pod{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pod",
+			Namespace: namespace,
+			Labels: map[string]string{
+				dfv1.KeyPipelineName: pipelineName,
+				dfv1.KeyVertexName:   vertexName,
+			},
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
+				{
+					Name: testContainerName,
+				},
+			},
+		},
+		Status: corev1.PodStatus{
+			Phase:             corev1.PodRunning,
+			ContainerStatuses: []corev1.ContainerStatus{containerStatus},
+		},
+	}
+	return pod
+}
+
+// fakePipeline returns a fake pipeline for testing.
+func fakePipeline() *dfv1.Pipeline {
+	return &dfv1.Pipeline{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pl",
+			Namespace: testNamespace,
+		},
+		Spec: dfv1.PipelineSpec{
+			Vertices: []dfv1.AbstractVertex{
+				{
+					Name: "input",
+					Source: &dfv1.Source{
+						UDTransformer: &dfv1.UDTransformer{
+							Builtin: &dfv1.Transformer{Name: "filter"},
+						}},
+				},
+				{
+					Name: "map",
+					UDF: &dfv1.UDF{
+						Builtin: &dfv1.Function{Name: "cat"},
+					},
+				},
+				{
+					Name: "reduce",
+					UDF: &dfv1.UDF{
+						Container: &dfv1.Container{
+							Image: "test-image",
+						},
+						GroupBy: &dfv1.GroupBy{
+							Window: dfv1.Window{
+								Fixed: &dfv1.FixedWindow{Length: &metav1.Duration{
+									Duration: 60 * time.Second,
+								}},
+							},
+							Keyed: true,
+							Storage: &dfv1.PBQStorage{
+								EmptyDir: &corev1.EmptyDirVolumeSource{},
+							},
+						},
+					},
+				},
+				{
+					Name: "output",
+					Sink: &dfv1.Sink{},
+				},
+			},
+			Edges: []dfv1.Edge{
+				{From: "input", To: "map"},
+				{From: "map", To: "reduce"},
+				{From: "reduce", To: "output"},
+			},
+		},
+	}
+}
+
+// fakeVertex returns a fake vertex with the given name and phase.
+func fakeVertex(name string, phase dfv1.VertexPhase) *dfv1.Vertex {
+	v := &dfv1.Vertex{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: testNamespace,
+		},
+		Status: dfv1.VertexStatus{
+			Phase:    phase,
+			Replicas: 1,
+		},
+		Spec: dfv1.VertexSpec{
+			Replicas: pointer.Int32(1),
+		},
+	}
+	return v
+}