
Commit

add unit tests
Signed-off-by: Sidhant Kohli <[email protected]>
kohlisid committed Dec 9, 2023
1 parent b009ed7 commit 43def13
Showing 4 changed files with 312 additions and 4 deletions.
1 change: 0 additions & 1 deletion pkg/shared/health-status-code/code_map.go
@@ -26,7 +26,6 @@ func newHealthCodeInfo(status string, criticality string) *HealthCodeInfo {
// 2. Warning: The pipeline is in a warning state
// 3. Healthy: The pipeline is healthy
// 4. Unknown: The pipeline is in an unknown state

var vertexHealthMap = map[string]*HealthCodeInfo{
"V1": newHealthCodeInfo(
"All pods are running",
3 changes: 0 additions & 3 deletions server/apis/v1/health.go
@@ -153,9 +153,7 @@ func checkVertexLevelHealth(ctx context.Context, h *handler, ns string,
// We first check if the vertex is in running state, if not, return the error message from the status
func isVertexHealthy(h *handler, ns string, pipeline string, vertex *dfv1.Vertex,
vertexName string) (bool, *resourceHealthResponse, error) {
- log := logging.FromContext(context.Background())
// check if the vertex is in running state
log.Info("vertex status: ", vertex.Name, vertex.Status.Phase)
if vertex.Status.Phase != dfv1.VertexPhaseRunning {
// check if the number of replicas running in the vertex
// are equal to the number of desired replicas
@@ -184,7 +182,6 @@ func isVertexHealthy(h *handler, ns string, pipeline string, vertex *dfv1.Vertex
Code: "V6",
}, err
}
log.Info("number of pods: ", len(pods.Items))
// Iterate over all the pods, and verify if all the containers in the pod are in running state
for _, pod := range pods.Items {
// Iterate over all the containers in the pod
160 changes: 160 additions & 0 deletions server/apis/v1/health_test.go
@@ -0,0 +1,160 @@
package v1

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

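// Pod phases used to create fake pods in the tests below.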
const (
RunningPod = "running"
WaitingPod = "waiting"
)

// Client is the struct to hold the Kubernetes Clientset
type Client struct {
Clientset kubernetes.Interface
}

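// CreatePod creates the given pod in its namespace using the client's Clientset.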
func (c Client) CreatePod(pod *v1.Pod) (*v1.Pod, error) {
_, err := c.Clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
fmt.Printf("Error occured while creating pod %s: %s", pod.Name, err.Error())
return nil, err
}

fmt.Printf("Pod %s is succesfully created", pod.Name)
return pod, nil
}

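// createPod creates a fake pod with the given container phase in the test namespace.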
func createPod(phase string) {
client := Client{
Clientset: fakeKubeClient,
}
pod := fakePod("test-pl", "test-vertex", testNamespace, phase)
_, err := client.CreatePod(pod)
if err != nil {
fmt.Print(err.Error())
}
pod, err = client.Clientset.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, metav1.GetOptions{})
if err != nil {
fmt.Printf("Error occured while getting pod %s: %s", pod.Name, err.Error())
}
}

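// removePod deletes the fake test pod from the test namespace.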
func removePod() {
client := Client{
Clientset: fakeKubeClient,
}
pod := fakePod("test-pl", "test-vertex", testNamespace, "running")
err := client.Clientset.CoreV1().Pods(pod.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
if err != nil {
fmt.Printf("Error occured while deleting pod %s: %s", pod.Name, err.Error())
}
}

// TestIsVertexHealthy verifies the functionality of the vertex level health check
// It creates a fake pipeline and checks the health of each vertex
// We test multiple scenarios:
// 1. All vertices are healthy
// 2. One or more vertices are unhealthy
// 3. The number of replicas running in the vertex is less than the expected number of replicas
// 4. Vertex is not in running state
func TestIsVertexHealthy(t *testing.T) {
// Test vertex is in running phase and the pods are in running state
t.Run("test all pods are healthy", func(t *testing.T) {
pipeline := fakePipeline()
vertexName := "test-vertex"
vertex := fakeVertex(vertexName, dfv1.VertexPhaseRunning)

// Create fake handler
h := &handler{
kubeClient: fakeKubeClient,
numaflowClient: &fakeNumaClient,
}

// Create fake pod in running state
createPod(RunningPod)
defer removePod()
healthy, _, err := isVertexHealthy(h, testNamespace, pipeline.GetName(), vertex, vertexName)
if err != nil {
t.Fatal(err)
}
assert.True(t, healthy)
})

// Test vertex is in running phase and the pods are in waiting state
t.Run("test pod not running", func(t *testing.T) {
pipeline := fakePipeline()
vertexName := "test-vertex"
vertex := fakeVertex(vertexName, dfv1.VertexPhaseRunning)

// Create fake handler
h := &handler{
kubeClient: fakeKubeClient,
numaflowClient: &fakeNumaClient,
}
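// Create fake pod in waiting state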
createPod(WaitingPod)
defer removePod()
healthy, r, err := isVertexHealthy(h, testNamespace, pipeline.GetName(), vertex, vertexName)
if err != nil {
t.Fatal(err)
}
assert.False(t, healthy)
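// Refer: pkg/shared/health-status-code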
assert.Equal(t, "V3", r.Code)
})

// Test vertex is not in running state
t.Run("test vertex not in running phase", func(t *testing.T) {
pipeline := fakePipeline()
vertexName := "test-vertex"
// Create a fake vertex in failed state
vertex := fakeVertex(vertexName, dfv1.VertexPhaseFailed)

// Create fake handler
h := &handler{
kubeClient: fakeKubeClient,
numaflowClient: &fakeNumaClient,
}
healthy, r, err := isVertexHealthy(h, testNamespace, pipeline.GetName(), vertex, vertexName)
if err != nil {
t.Fatal(err)
}
assert.False(t, healthy)
// Refer: pkg/shared/health-status-code
assert.Equal(t, "V2", r.Code)
})

// Test vertex replica count is not equal to the desired replica count
t.Run("test vertex replica count not equal to desired replica count", func(t *testing.T) {
pipeline := fakePipeline()
vertexName := "test-vertex"
// Create a fake vertex in failed state
vertex := fakeVertex(vertexName, dfv1.VertexPhaseFailed)
// Update the replica count to 2
vertex.Spec.Replicas = pointer.Int32(2)

// Create fake handler
h := &handler{
kubeClient: fakeKubeClient,
numaflowClient: &fakeNumaClient,
}
healthy, r, err := isVertexHealthy(h, testNamespace, pipeline.GetName(), vertex, vertexName)
if err != nil {
t.Fatal(err)
}
assert.False(t, healthy)
// Refer: pkg/shared/health-status-code
assert.Equal(t, "V9", r.Code)
})
}
152 changes: 152 additions & 0 deletions server/apis/v1/test_utils.go
@@ -0,0 +1,152 @@
package v1

import (
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakeclientset "k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/pointer"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1/fake"
)

const testNamespace = "test-ns"

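// Fake clients and shared fixtures used across the v1 API tests.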
var (
fakeNumaClient = fake.FakeNumaflowV1alpha1{}
fakeKubeClient = fakeclientset.NewSimpleClientset()
testContainerName = "test-container"
)

// getContainerStatus returns a container status with the given phase.
func getContainerStatus(phase string) corev1.ContainerStatus {
switch phase {
case "running":
return corev1.ContainerStatus{
Name: testContainerName,
Ready: true,
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{
StartedAt: metav1.Time{Time: time.Now()},
},
},
}
case "waiting":
return corev1.ContainerStatus{
Name: testContainerName,
Ready: false,
State: corev1.ContainerState{
Waiting: &corev1.ContainerStateWaiting{
Reason: "test-reason",
Message: "test-message",
},
},
}
}
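// Any other phase returns an empty container status.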
return corev1.ContainerStatus{}
}

// fakePod returns a fake pod with the given pipeline name, vertex name, namespace and phase.
func fakePod(pipelineName string, vertexName string, namespace string, phase string) *corev1.Pod {
containerStatus := getContainerStatus(phase)
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: namespace,
Labels: map[string]string{
dfv1.KeyPipelineName: pipelineName,
dfv1.KeyVertexName: vertexName,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: testContainerName,
},
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
ContainerStatuses: []corev1.ContainerStatus{containerStatus},
},
}
return pod
}

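// fakePipeline returns a fake pipeline with source, map, reduce, and sink vertices for testing.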
func fakePipeline() *dfv1.Pipeline {
return &dfv1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pl",
Namespace: testNamespace,
},
Spec: dfv1.PipelineSpec{
Vertices: []dfv1.AbstractVertex{
{
Name: "input",
Source: &dfv1.Source{
UDTransformer: &dfv1.UDTransformer{
Builtin: &dfv1.Transformer{Name: "filter"},
}},
},
{
Name: "map",
UDF: &dfv1.UDF{
Builtin: &dfv1.Function{Name: "cat"},
},
},
{
Name: "reduce",
UDF: &dfv1.UDF{
Container: &dfv1.Container{
Image: "test-image",
},
GroupBy: &dfv1.GroupBy{
Window: dfv1.Window{
Fixed: &dfv1.FixedWindow{Length: &metav1.Duration{
Duration: 60 * time.Second,
}},
},
Keyed: true,
Storage: &dfv1.PBQStorage{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
},
},
{
Name: "output",
Sink: &dfv1.Sink{},
},
},
Edges: []dfv1.Edge{
{From: "input", To: "map"},
{From: "map", To: "reduce"},
{From: "reduce", To: "output"},
},
},
}
}

// fakeVertex returns a fake vertex with the given name and phase.
func fakeVertex(name string, phase dfv1.VertexPhase) *dfv1.Vertex {
v := &dfv1.Vertex{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: testNamespace,
},
Status: dfv1.VertexStatus{
Phase: phase,
Replicas: 1,
},
Spec: dfv1.VertexSpec{
Replicas: pointer.Int32(1),
},
}
return v
}
