From ae02243b3f30de8da407b148bbac7cb2e48a68c4 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Fri, 23 Aug 2024 19:14:20 -0400 Subject: [PATCH] fix: e2e testing isbsvc deletion timeout issue (#1997) Signed-off-by: Keran Yang --- pkg/apis/numaflow/v1alpha1/const.go | 17 +++++---- pkg/apis/numaflow/v1alpha1/pipeline_types.go | 2 +- pkg/reconciler/isbsvc/installer/jetstream.go | 2 +- test/fixtures/e2e_suite.go | 24 ++++++++---- test/fixtures/expect.go | 2 +- test/fixtures/given.go | 6 +-- test/fixtures/when.go | 40 +++++++++++++++++++- 7 files changed, 70 insertions(+), 23 deletions(-) diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go index 177e6f73a1..8677101378 100644 --- a/pkg/apis/numaflow/v1alpha1/const.go +++ b/pkg/apis/numaflow/v1alpha1/const.go @@ -87,14 +87,15 @@ const ( ServingSourceContainer = "serving" // components - ComponentISBSvc = "isbsvc" - ComponentDaemon = "daemon" - ComponentVertex = "vertex" - ComponentMonoVertex = "mono-vertex" - ComponentMonoVertexDaemon = "mono-vertex-daemon" - ComponentJob = "job" - ComponentSideInputManager = "side-inputs-manager" - ComponentUXServer = "numaflow-ux" + ComponentISBSvc = "isbsvc" + ComponentDaemon = "daemon" + ComponentVertex = "vertex" + ComponentMonoVertex = "mono-vertex" + ComponentMonoVertexDaemon = "mono-vertex-daemon" + ComponentJob = "job" + ComponentSideInputManager = "side-inputs-manager" + ComponentUXServer = "numaflow-ux" + ComponentControllerManager = "controller-manager" // controllers ControllerISBSvc = "isbsvc-controller" diff --git a/pkg/apis/numaflow/v1alpha1/pipeline_types.go b/pkg/apis/numaflow/v1alpha1/pipeline_types.go index 010b53bf20..07d62f673d 100644 --- a/pkg/apis/numaflow/v1alpha1/pipeline_types.go +++ b/pkg/apis/numaflow/v1alpha1/pipeline_types.go @@ -690,7 +690,7 @@ func (pls *PipelineStatus) MarkDeployFailed(reason, message string) { pls.SetPhase(PipelinePhaseFailed, message) } -// MarkVerticesHealthy set the daemon service of the pipeline is 
healthy. +// MarkDaemonServiceHealthy marks the daemon service of the pipeline as healthy. func (pls *PipelineStatus) MarkDaemonServiceHealthy() { pls.MarkTrue(PipelineConditionDaemonServiceHealthy) } diff --git a/pkg/reconciler/isbsvc/installer/jetstream.go b/pkg/reconciler/isbsvc/installer/jetstream.go index d887f79e38..f0faa09caa 100644 --- a/pkg/reconciler/isbsvc/installer/jetstream.go +++ b/pkg/reconciler/isbsvc/installer/jetstream.go @@ -411,7 +411,7 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error { svcName := generateJetStreamServiceName(r.isbSvc) ssName := generateJetStreamStatefulSetName(r.isbSvc) replicas := r.isbSvc.Spec.JetStream.GetReplicas() - routes := []string{} + var routes []string for j := 0; j < replicas; j++ { routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc:%s", ssName, strconv.Itoa(j), svcName, r.isbSvc.Namespace, strconv.Itoa(int(clusterPort)))) } diff --git a/test/fixtures/e2e_suite.go b/test/fixtures/e2e_suite.go index 64b27e3eb9..be1255a037 100644 --- a/test/fixtures/e2e_suite.go +++ b/test/fixtures/e2e_suite.go @@ -38,10 +38,13 @@ import ( ) const ( - Namespace = "numaflow-system" - Label = "numaflow-e2e" - LabelValue = "true" - ISBSvcName = "numaflow-e2e" + Namespace = "numaflow-system" + Label = "numaflow-e2e" + LabelValue = "true" + ISBSvcName = "numaflow-e2e" + // the number 90 is carefully chosen to ensure the test suite can finish within a reasonable time without timing out. + // please exercise caution when updating this value, as it may cause e2e tests to be flaky. + // if updated, consider running the entire e2e test suite multiple times to ensure stability. defaultTimeout = 90 * time.Second LogSourceVertexStarted = "Start processing source messages" @@ -139,13 +142,18 @@ func (s *E2ESuite) TearDownSuite() { When(). Wait(5 * time.Second). DeleteISBSvc(). - Wait(3 * time.Second). 
+ Wait(3 * time.Second) + // force deleting the ISB svc pods because we have seen pods stuck in terminating state after CRD deletion, + // which causes e2e tests to time out; this is a workaround to avoid the issue. + deleteISBPodsCMD := fmt.Sprintf("kubectl delete pods -n %s -l %s=%s,%s=%s --ignore-not-found=true --grace-period=0 --force", Namespace, dfv1.KeyComponent, dfv1.ComponentISBSvc, dfv1.KeyISBSvcName, ISBSvcName) + s.Given().When().Exec("sh", []string{"-c", deleteISBPodsCMD}, OutputRegexp("")) + s.Given().ISBSvc(getISBSvcSpec()). + When(). Expect(). ISBSvcDeleted(defaultTimeout) - s.T().Log("ISB svc is deleted") - deleteCMD := fmt.Sprintf("kubectl delete -k ../../config/apps/redis -n %s --ignore-not-found=true", Namespace) - s.Given().When().Exec("sh", []string{"-c", deleteCMD}, OutputRegexp(`service "redis" deleted`)) + deleteRedisCMD := fmt.Sprintf("kubectl delete -k ../../config/apps/redis -n %s --ignore-not-found=true", Namespace) + s.Given().When().Exec("sh", []string{"-c", deleteRedisCMD}, OutputRegexp(`service "redis" deleted`)) s.T().Log("Redis resources are deleted") close(s.stopch) } diff --git a/test/fixtures/expect.go b/test/fixtures/expect.go index 2e550653fe..afa30447d9 100644 --- a/test/fixtures/expect.go +++ b/test/fixtures/expect.go @@ -76,7 +76,7 @@ func (t *Expect) ISBSvcDeleted(timeout time.Duration) *Expect { t.t.Fatalf("Expected ISB svc to be deleted: %v", err) } - labelSelector := fmt.Sprintf("%s=isbsvc-controller,%s=%s", dfv1.KeyManagedBy, dfv1.KeyISBSvcName, ISBSvcName) + labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyManagedBy, dfv1.ControllerISBSvc, dfv1.KeyISBSvcName, ISBSvcName) opts := metav1.ListOptions{LabelSelector: labelSelector} timeoutCh := make(chan bool, 1) go func() { diff --git a/test/fixtures/given.go b/test/fixtures/given.go index 30a4ab01a7..e6f2d0dbef 100644 --- a/test/fixtures/given.go +++ b/test/fixtures/given.go @@ -43,7 +43,7 @@ type Given struct { kubeClient kubernetes.Interface } -// creates an ISBSvc 
based on the parameter, this may be: +// ISBSvc creates an ISBSvc based on the parameter, this may be: // // 1. A file name if it starts with "@" // 2. Raw YAML. @@ -61,7 +61,7 @@ func (g *Given) ISBSvc(text string) *Given { return g } -// creates a Pipeline based on the parameter, this may be: +// Pipeline creates a Pipeline based on the parameter, this may be: // // 1. A file name if it starts with "@" // 2. Raw YAML. @@ -79,7 +79,7 @@ func (g *Given) Pipeline(text string) *Given { return g } -// / creates a MonoVertex based on the parameter, this may be: +// MonoVertex creates a MonoVertex based on the parameter, this may be: // // 1. A file name if it starts with "@" // 2. Raw YAML. diff --git a/test/fixtures/when.go b/test/fixtures/when.go index 7ab85ae772..986085bdd6 100644 --- a/test/fixtures/when.go +++ b/test/fixtures/when.go @@ -276,7 +276,7 @@ func (w *When) TerminateAllPodPortForwards() *When { return w } -func (w *When) StreamVertexPodlogs(vertexName, containerName string) *When { +func (w *When) StreamVertexPodLogs(vertexName, containerName string) *When { w.t.Helper() ctx := context.Background() labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyPipelineName, w.pipeline.Name, dfv1.KeyVertexName, vertexName) @@ -295,6 +295,44 @@ func (w *When) StreamVertexPodlogs(vertexName, containerName string) *When { return w } +func (w *When) StreamISBLogs() *When { + w.t.Helper() + ctx := context.Background() + labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyComponent, dfv1.ComponentISBSvc, dfv1.KeyManagedBy, dfv1.ControllerISBSvc) + podList, err := w.kubeClient.CoreV1().Pods(Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"}) + if err != nil { + w.t.Fatalf("Error getting ISB service pods: %v", err) + } + for _, pod := range podList.Items { + stopCh := make(chan struct{}, 1) + streamPodLogs(ctx, w.kubeClient, Namespace, pod.Name, "main", stopCh) + if w.streamLogsStopChannels == nil { + 
w.streamLogsStopChannels = make(map[string]chan struct{}) + } + w.streamLogsStopChannels[pod.Name+":main"] = stopCh + } + return w +} + +func (w *When) StreamControllerLogs() *When { + w.t.Helper() + ctx := context.Background() + labelSelector := fmt.Sprintf("%s=%s", dfv1.KeyComponent, dfv1.ComponentControllerManager) + podList, err := w.kubeClient.CoreV1().Pods(Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"}) + if err != nil { + w.t.Fatalf("Error getting the controller pods: %v", err) + } + for _, pod := range podList.Items { + stopCh := make(chan struct{}, 1) + streamPodLogs(ctx, w.kubeClient, Namespace, pod.Name, "controller-manager", stopCh) + if w.streamLogsStopChannels == nil { + w.streamLogsStopChannels = make(map[string]chan struct{}) + } + w.streamLogsStopChannels[pod.Name+":controller-manager"] = stopCh + } + return w +} + func (w *When) TerminateAllPodLogs() *When { w.t.Helper() if len(w.streamLogsStopChannels) > 0 {