fix: e2e testing isbsvc deletion timeout issue (#1997)
Signed-off-by: Keran Yang <[email protected]>
KeranYang authored Aug 23, 2024
1 parent 04b259a commit ae02243
Showing 7 changed files with 70 additions and 23 deletions.
17 changes: 9 additions & 8 deletions pkg/apis/numaflow/v1alpha1/const.go
@@ -87,14 +87,15 @@ const (
ServingSourceContainer = "serving"

// components
-ComponentISBSvc = "isbsvc"
-ComponentDaemon = "daemon"
-ComponentVertex = "vertex"
-ComponentMonoVertex = "mono-vertex"
-ComponentMonoVertexDaemon = "mono-vertex-daemon"
-ComponentJob = "job"
-ComponentSideInputManager = "side-inputs-manager"
-ComponentUXServer = "numaflow-ux"
+ComponentISBSvc = "isbsvc"
+ComponentDaemon = "daemon"
+ComponentVertex = "vertex"
+ComponentMonoVertex = "mono-vertex"
+ComponentMonoVertexDaemon = "mono-vertex-daemon"
+ComponentJob = "job"
+ComponentSideInputManager = "side-inputs-manager"
+ComponentUXServer = "numaflow-ux"
+ComponentControllerManager = "controller-manager"

// controllers
ControllerISBSvc = "isbsvc-controller"
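
The new ComponentControllerManager constant gives the e2e fixtures a label value for locating controller pods (used by StreamControllerLogs in when.go below); the neighboring constants are unchanged apart from realignment. A minimal, hypothetical sketch of how such a component constant combines with KeyComponent into a pod label selector; the printed value assumes KeyComponent is the conventional app.kubernetes.io/component key:

package main

import (
    "fmt"

    dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

func main() {
    // Build the label selector used to find controller pods. KeyComponent's
    // literal value is assumed here to be "app.kubernetes.io/component".
    selector := fmt.Sprintf("%s=%s", dfv1.KeyComponent, dfv1.ComponentControllerManager)
    fmt.Println(selector) // app.kubernetes.io/component=controller-manager (assumed)
}
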
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/pipeline_types.go
@@ -690,7 +690,7 @@ func (pls *PipelineStatus) MarkDeployFailed(reason, message string) {
pls.SetPhase(PipelinePhaseFailed, message)
}

-// MarkVerticesHealthy set the daemon service of the pipeline is healthy.
+// MarkDaemonServiceHealthy set the daemon service of the pipeline is healthy.
func (pls *PipelineStatus) MarkDaemonServiceHealthy() {
pls.MarkTrue(PipelineConditionDaemonServiceHealthy)
}
2 changes: 1 addition & 1 deletion pkg/reconciler/isbsvc/installer/jetstream.go
@@ -411,7 +411,7 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
svcName := generateJetStreamServiceName(r.isbSvc)
ssName := generateJetStreamStatefulSetName(r.isbSvc)
replicas := r.isbSvc.Spec.JetStream.GetReplicas()
-routes := []string{}
+var routes []string
for j := 0; j < replicas; j++ {
routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc:%s", ssName, strconv.Itoa(j), svcName, r.isbSvc.Namespace, strconv.Itoa(int(clusterPort))))
}
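
The change from routes := []string{} to var routes []string swaps an empty slice for a nil slice; append treats both identically (it allocates on first use), so behavior is unchanged and the declaration becomes idiomatic Go. A self-contained sketch of the route URLs the loop builds, with illustrative values: the real names come from generateJetStreamStatefulSetName and generateJetStreamServiceName, and 6222 merely stands in for the installer's internal clusterPort constant:

package main

import (
    "fmt"
    "strconv"
)

func main() {
    // Illustrative values; the installer derives these from the ISB svc object.
    ssName, svcName, namespace := "isbsvc-default-js", "isbsvc-default-js-svc", "numaflow-system"
    clusterPort := 6222 // assumed NATS cluster port; the real constant is internal
    replicas := 3

    var routes []string // nil slice: append allocates on first use
    for j := 0; j < replicas; j++ {
        routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc:%s",
            ssName, strconv.Itoa(j), svcName, namespace, strconv.Itoa(clusterPort)))
    }
    fmt.Println(routes)
    // [nats://isbsvc-default-js-0.isbsvc-default-js-svc.numaflow-system.svc:6222 ...]
}
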
24 changes: 16 additions & 8 deletions test/fixtures/e2e_suite.go
@@ -38,10 +38,13 @@ import (
)

const (
-Namespace = "numaflow-system"
-Label = "numaflow-e2e"
-LabelValue = "true"
-ISBSvcName = "numaflow-e2e"
+Namespace = "numaflow-system"
+Label = "numaflow-e2e"
+LabelValue = "true"
+ISBSvcName = "numaflow-e2e"
+// the number 90 is carefully chosen to ensure the test suite can finish within a reasonable time without timing out.
+// please exercise caution when updating this value, as it may cause e2e tests to be flaky.
+// if updated, consider running the entire e2e test suite multiple times to ensure stability.
+defaultTimeout = 90 * time.Second

LogSourceVertexStarted = "Start processing source messages"
@@ -139,13 +142,18 @@ func (s *E2ESuite) TearDownSuite() {
When().
Wait(5 * time.Second).
DeleteISBSvc().
-Wait(3 * time.Second).
+Wait(3 * time.Second)
+// force deleting the ISB svc pods because we have seen pods stuck in terminating state after CRD deletion,
+// which causes e2e tests to timeout, this is a workaround to avoid the issue.
+deleteISBPodsCMD := fmt.Sprintf("kubectl delete pods -n %s -l %s=%s,%s=%s --ignore-not-found=true --grace-period=0 --force", Namespace, dfv1.KeyComponent, dfv1.ComponentISBSvc, dfv1.KeyISBSvcName, ISBSvcName)
+s.Given().When().Exec("sh", []string{"-c", deleteISBPodsCMD}, OutputRegexp(""))
+s.Given().ISBSvc(getISBSvcSpec()).
+When().
Expect().
ISBSvcDeleted(defaultTimeout)

s.T().Log("ISB svc is deleted")
-deleteCMD := fmt.Sprintf("kubectl delete -k ../../config/apps/redis -n %s --ignore-not-found=true", Namespace)
-s.Given().When().Exec("sh", []string{"-c", deleteCMD}, OutputRegexp(`service "redis" deleted`))
+deleteRedisCMD := fmt.Sprintf("kubectl delete -k ../../config/apps/redis -n %s --ignore-not-found=true", Namespace)
+s.Given().When().Exec("sh", []string{"-c", deleteRedisCMD}, OutputRegexp(`service "redis" deleted`))
s.T().Log("Redis resources are deleted")
close(s.stopch)
}
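
For context, the kubectl force delete that TearDownSuite shells out to maps onto a single client-go DeleteCollection call. A hedged sketch, not what the fixture actually does; the helper name is invented, and note that the CLI's --force additionally skips waiting for kubelet confirmation, so a zero grace period is only the closest API-level analogue:

package fixtures // illustrative placement; not part of this commit

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// forceDeleteISBPods deletes all pods matching the ISB svc label selector
// with no grace period, mirroring the kubectl command in TearDownSuite.
func forceDeleteISBPods(ctx context.Context, client kubernetes.Interface, namespace, labelSelector string) error {
    grace := int64(0) // --grace-period=0
    return client.CoreV1().Pods(namespace).DeleteCollection(ctx,
        metav1.DeleteOptions{GracePeriodSeconds: &grace},
        metav1.ListOptions{LabelSelector: labelSelector},
    )
}
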
2 changes: 1 addition & 1 deletion test/fixtures/expect.go
@@ -76,7 +76,7 @@ func (t *Expect) ISBSvcDeleted(timeout time.Duration) *Expect {
t.t.Fatalf("Expected ISB svc to be deleted: %v", err)
}

-labelSelector := fmt.Sprintf("%s=isbsvc-controller,%s=%s", dfv1.KeyManagedBy, dfv1.KeyISBSvcName, ISBSvcName)
+labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyManagedBy, dfv1.ControllerISBSvc, dfv1.KeyISBSvcName, ISBSvcName)
opts := metav1.ListOptions{LabelSelector: labelSelector}
timeoutCh := make(chan bool, 1)
go func() {
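
With the fix, the selector is built from dfv1.ControllerISBSvc instead of a string literal that could drift from the constant. The waiting pattern ISBSvcDeleted implements, polling until no labeled pods remain and failing at the timeout, can be sketched with apimachinery's wait helpers; the helper name and two-second interval are illustrative, and PollUntilContextTimeout assumes a reasonably recent apimachinery:

package fixtures // illustrative placement; not part of this commit

import (
    "context"
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"

    dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

// waitForISBPodsGone polls until no ISB svc pods match the selector, or
// fails once the timeout elapses.
func waitForISBPodsGone(ctx context.Context, client kubernetes.Interface, namespace, isbSvcName string, timeout time.Duration) error {
    selector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyManagedBy, dfv1.ControllerISBSvc, dfv1.KeyISBSvcName, isbSvcName)
    return wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
        pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
        if err != nil {
            return false, err
        }
        return len(pods.Items) == 0, nil // done once every ISB svc pod is gone
    })
}
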
6 changes: 3 additions & 3 deletions test/fixtures/given.go
@@ -43,7 +43,7 @@ type Given struct {
kubeClient kubernetes.Interface
}

-// creates an ISBSvc based on the parameter, this may be:
+// ISBSvc creates an ISBSvc based on the parameter, this may be:
//
// 1. A file name if it starts with "@"
// 2. Raw YAML.
@@ -61,7 +61,7 @@ func (g *Given) ISBSvc(text string) *Given {
return g
}

-// creates a Pipeline based on the parameter, this may be:
+// Pipeline creates a Pipeline based on the parameter, this may be:
//
// 1. A file name if it starts with "@"
// 2. Raw YAML.
@@ -79,7 +79,7 @@ func (g *Given) Pipeline(text string) *Given {
return g
}

-// / creates a MonoVertex based on the parameter, this may be:
+// MonoVertex creates a MonoVertex based on the parameter, this may be:
//
// 1. A file name if it starts with "@"
// 2. Raw YAML.
40 changes: 39 additions & 1 deletion test/fixtures/when.go
@@ -276,7 +276,7 @@ func (w *When) TerminateAllPodPortForwards() *When {
return w
}

-func (w *When) StreamVertexPodlogs(vertexName, containerName string) *When {
+func (w *When) StreamVertexPodLogs(vertexName, containerName string) *When {
w.t.Helper()
ctx := context.Background()
labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyPipelineName, w.pipeline.Name, dfv1.KeyVertexName, vertexName)
@@ -295,6 +295,44 @@ func (w *When) StreamVertexPodlogs(vertexName, containerName string) *When {
return w
}

+func (w *When) StreamISBLogs() *When {
+w.t.Helper()
+ctx := context.Background()
+labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyComponent, dfv1.ComponentISBSvc, dfv1.KeyManagedBy, dfv1.ControllerISBSvc)
+podList, err := w.kubeClient.CoreV1().Pods(Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"})
+if err != nil {
+w.t.Fatalf("Error getting ISB service pods: %v", err)
+}
+for _, pod := range podList.Items {
+stopCh := make(chan struct{}, 1)
+streamPodLogs(ctx, w.kubeClient, Namespace, pod.Name, "main", stopCh)
+if w.streamLogsStopChannels == nil {
+w.streamLogsStopChannels = make(map[string]chan struct{})
+}
+w.streamLogsStopChannels[pod.Name+":main"] = stopCh
+}
+return w
+}
+
+func (w *When) StreamControllerLogs() *When {
+w.t.Helper()
+ctx := context.Background()
+labelSelector := fmt.Sprintf("%s=%s", dfv1.KeyComponent, dfv1.ComponentControllerManager)
+podList, err := w.kubeClient.CoreV1().Pods(Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"})
+if err != nil {
+w.t.Fatalf("Error getting the controller pods: %v", err)
+}
+for _, pod := range podList.Items {
+stopCh := make(chan struct{}, 1)
+streamPodLogs(ctx, w.kubeClient, Namespace, pod.Name, "controller-manager", stopCh)
+if w.streamLogsStopChannels == nil {
+w.streamLogsStopChannels = make(map[string]chan struct{})
+}
+w.streamLogsStopChannels[pod.Name+":controller-manager"] = stopCh
+}
+return w
+}
+
func (w *When) TerminateAllPodLogs() *When {
w.t.Helper()
if len(w.streamLogsStopChannels) > 0 {
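
Both new methods follow the existing fluent-fixture pattern: list running pods by label, start a log stream per pod, and register a stop channel keyed by pod:container so TerminateAllPodLogs can shut every stream down. A hedged usage sketch; the test name and manifest path are made up, and CreatePipelineAndWait is assumed from the existing fixture API:

func (s *E2ESuite) TestWithStreamedLogs() {
    w := s.Given().Pipeline("@testdata/simple-pipeline.yaml"). // hypothetical manifest
        When().
        CreatePipelineAndWait().
        StreamISBLogs().       // "main" container of each running ISB svc pod
        StreamControllerLogs() // "controller-manager" container
    defer w.TerminateAllPodLogs() // closes every stop channel registered above
    // ... drive the pipeline and assert on its behavior ...
}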
