diff --git a/api/json-schema/schema.json b/api/json-schema/schema.json
index 27796003ba..44e18ef85a 100644
--- a/api/json-schema/schema.json
+++ b/api/json-schema/schema.json
@@ -19691,7 +19691,7 @@
"x-kubernetes-patch-strategy": "merge"
},
"drainedOnPause": {
- "description": "Field to indicate if a pipeline drain successfully occurred, or it timed out. Set to true when the Pipeline is in Paused state, and after it has successfully been drained. defaults to false",
+ "description": "Field to indicate whether a pipeline drain successfully occurred; it is only meaningful when the pipeline is paused. True means it has been successfully drained.",
"type": "boolean"
},
"lastUpdated": {
diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json
index bb918bac66..7067416291 100644
--- a/api/openapi-spec/swagger.json
+++ b/api/openapi-spec/swagger.json
@@ -19678,7 +19678,7 @@
"x-kubernetes-patch-strategy": "merge"
},
"drainedOnPause": {
- "description": "Field to indicate if a pipeline drain successfully occurred, or it timed out. Set to true when the Pipeline is in Paused state, and after it has successfully been drained. defaults to false",
+ "description": "Field to indicate whether a pipeline drain successfully occurred; it is only meaningful when the pipeline is paused. True means it has been successfully drained.",
"type": "boolean"
},
"lastUpdated": {
diff --git a/config/base/crds/full/numaflow.numaproj.io_pipelines.yaml b/config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
index 9670b018e0..6fc509dc96 100644
--- a/config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
+++ b/config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
@@ -9822,7 +9822,6 @@ spec:
type: object
type: array
drainedOnPause:
- default: false
type: boolean
lastUpdated:
format: date-time
diff --git a/config/install.yaml b/config/install.yaml
index ac272ddf19..d19f2fa2f2 100644
--- a/config/install.yaml
+++ b/config/install.yaml
@@ -18095,7 +18095,6 @@ spec:
type: object
type: array
drainedOnPause:
- default: false
type: boolean
lastUpdated:
format: date-time
diff --git a/config/namespace-install.yaml b/config/namespace-install.yaml
index 12579dc36f..5919b0fcf8 100644
--- a/config/namespace-install.yaml
+++ b/config/namespace-install.yaml
@@ -18095,7 +18095,6 @@ spec:
type: object
type: array
drainedOnPause:
- default: false
type: boolean
lastUpdated:
format: date-time
diff --git a/docs/APIs.md b/docs/APIs.md
index 5fd26ad505..bd71cf0592 100644
--- a/docs/APIs.md
+++ b/docs/APIs.md
@@ -8011,11 +8011,12 @@ The generation observed by the Pipeline controller.
+(Optional)
-Field to indicate if a pipeline drain successfully occurred, or it timed
-out. Set to true when the Pipeline is in Paused state, and after it has
-successfully been drained. defaults to false
+Field to indicate whether a pipeline drain successfully occurred; it is
+only meaningful when the pipeline is paused. True means it has been
+successfully drained.
|
diff --git a/examples/1-simple-pipeline.yaml b/examples/1-simple-pipeline.yaml
index 42e9d9e095..e790fa3150 100644
--- a/examples/1-simple-pipeline.yaml
+++ b/examples/1-simple-pipeline.yaml
@@ -27,4 +27,4 @@ spec:
- from: in
to: cat
- from: cat
- to: out
\ No newline at end of file
+ to: out
diff --git a/pkg/apis/numaflow/v1alpha1/generated.proto b/pkg/apis/numaflow/v1alpha1/generated.proto
index 1e6e6bd35a..78e10457a0 100644
--- a/pkg/apis/numaflow/v1alpha1/generated.proto
+++ b/pkg/apis/numaflow/v1alpha1/generated.proto
@@ -1229,10 +1229,9 @@ message PipelineStatus {
// +optional
optional int64 observedGeneration = 11;
- // Field to indicate if a pipeline drain successfully occurred, or it timed out.
- // Set to true when the Pipeline is in Paused state, and after it has successfully been drained.
- // defaults to false
- // +kubebuilder:default=false
+ // Field to indicate whether a pipeline drain successfully occurred; it is only meaningful when the pipeline is paused.
+ // True means it has been successfully drained.
+ // +optional
optional bool drainedOnPause = 12;
}
diff --git a/pkg/apis/numaflow/v1alpha1/openapi_generated.go b/pkg/apis/numaflow/v1alpha1/openapi_generated.go
index a339fdfb7a..765dd96681 100644
--- a/pkg/apis/numaflow/v1alpha1/openapi_generated.go
+++ b/pkg/apis/numaflow/v1alpha1/openapi_generated.go
@@ -4151,7 +4151,7 @@ func schema_pkg_apis_numaflow_v1alpha1_PipelineStatus(ref common.ReferenceCallba
},
"drainedOnPause": {
SchemaProps: spec.SchemaProps{
- Description: "Field to indicate if a pipeline drain successfully occurred, or it timed out. Set to true when the Pipeline is in Paused state, and after it has successfully been drained. defaults to false",
+ Description: "Field to indicate whether a pipeline drain successfully occurred; it is only meaningful when the pipeline is paused. True means it has been successfully drained.",
Type: []string{"boolean"},
Format: "",
},
diff --git a/pkg/apis/numaflow/v1alpha1/pipeline_types.go b/pkg/apis/numaflow/v1alpha1/pipeline_types.go
index c68a7d647c..ff8dfaf5e5 100644
--- a/pkg/apis/numaflow/v1alpha1/pipeline_types.go
+++ b/pkg/apis/numaflow/v1alpha1/pipeline_types.go
@@ -633,10 +633,9 @@ type PipelineStatus struct {
// The generation observed by the Pipeline controller.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty" protobuf:"varint,11,opt,name=observedGeneration"`
- // Field to indicate if a pipeline drain successfully occurred, or it timed out.
- // Set to true when the Pipeline is in Paused state, and after it has successfully been drained.
- // defaults to false
- // +kubebuilder:default=false
+ // Field to indicate whether a pipeline drain successfully occurred; it is only meaningful when the pipeline is paused.
+ // True means it has been successfully drained.
+ // +optional
DrainedOnPause bool `json:"drainedOnPause,omitempty" protobuf:"bytes,12,opt,name=drainedOnPause"`
}
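Reviewer note on the drainedOnPause contract above: with the +kubebuilder:default=false marker dropped and omitempty kept, the field is simply absent until the controller sets it, and it only carries meaning while the pipeline is paused. A minimal sketch of how a consumer might read it under that contract; the isSafelyPaused helper is hypothetical (not part of this change) and assumes the existing PipelinePhasePaused constant and the usual dfv1 alias for pkg/apis/numaflow/v1alpha1:

// Hypothetical helper, not introduced by this PR.
// dfv1 is assumed to be "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1".
func isSafelyPaused(pl *dfv1.Pipeline) bool {
	// drainedOnPause is only meaningful while the pipeline is Paused; while
	// Paused, false (or absent) means the drain did not complete or timed out.
	return pl.Status.Phase == dfv1.PipelinePhasePaused && pl.Status.DrainedOnPause
}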
diff --git a/pkg/reconciler/monovertex/controller.go b/pkg/reconciler/monovertex/controller.go
index a0de7e6d6a..a7b7a90c40 100644
--- a/pkg/reconciler/monovertex/controller.go
+++ b/pkg/reconciler/monovertex/controller.go
@@ -110,6 +110,7 @@ func (mr *monoVertexReconciler) reconcile(ctx context.Context, monoVtx *dfv1.Mon
}
}()
+ monoVtx.Status.InitializeConditions()
monoVtx.Status.SetObservedGeneration(monoVtx.Generation)
if monoVtx.Scalable() {
mr.scaler.StartWatching(mVtxKey)
@@ -251,7 +252,7 @@ func (mr *monoVertexReconciler) orchestratePods(ctx context.Context, monoVtx *df
if updatedReplicas+toBeUpdated > desiredReplicas {
toBeUpdated = desiredReplicas - updatedReplicas
}
- log.Infof("Rolling update %d replicas, [%d, %d)\n", toBeUpdated, updatedReplicas, updatedReplicas+toBeUpdated)
+ log.Infof("Rolling update %d replicas, [%d, %d)", toBeUpdated, updatedReplicas, updatedReplicas+toBeUpdated)
// Create pods [updatedReplicas, updatedReplicas+toBeUpdated), and clean up any pods in that range that has a different hash
if err := mr.orchestratePodsFromTo(ctx, monoVtx, *podSpec, updatedReplicas, updatedReplicas+toBeUpdated, monoVtx.Status.UpdateHash); err != nil {
@@ -310,7 +311,7 @@ func (mr *monoVertexReconciler) cleanUpPodsFromTo(ctx context.Context, monoVtx *
if err := mr.client.Delete(ctx, &pod); err != nil {
return fmt.Errorf("failed to delete pod %s: %w", pod.Name, err)
}
- log.Infof("Deleted MonoVertx pod %s\n", pod.Name)
+ log.Infof("Deleted MonoVertex pod %q", pod.Name)
mr.recorder.Eventf(monoVtx, corev1.EventTypeNormal, "DeletePodSuccess", "Succeeded to delete a mono vertex pod %s", pod.Name)
}
return nil
diff --git a/pkg/reconciler/monovertex/scaling/scaling.go b/pkg/reconciler/monovertex/scaling/scaling.go
index 2ae99fcf1b..d523800cdd 100644
--- a/pkg/reconciler/monovertex/scaling/scaling.go
+++ b/pkg/reconciler/monovertex/scaling/scaling.go
@@ -233,11 +233,11 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int)
min := monoVtx.Spec.Scale.GetMinReplicas()
if desired > max {
desired = max
- log.Infof("Calculated desired replica number %d of MonoVertex %q is greater than max, using max %d.", monoVtxName, desired, max)
+ log.Infof("Calculated desired replica number %d of MonoVertex %q is greater than max, using max %d.", desired, monoVtxName, max)
}
if desired < min {
desired = min
- log.Infof("Calculated desired replica number %d of MonoVertex %q is smaller than min, using min %d.", monoVtxName, desired, min)
+ log.Infof("Calculated desired replica number %d of MonoVertex %q is smaller than min, using min %d.", desired, monoVtxName, min)
}
current := int32(monoVtx.Status.Replicas)
if current > max || current < min { // Someone might have manually scaled up/down the MonoVertex
diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go
index 96d9fa2266..955344a8c1 100644
--- a/pkg/reconciler/pipeline/controller.go
+++ b/pkg/reconciler/pipeline/controller.go
@@ -143,15 +143,25 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
}
}()
+ pl.Status.InitConditions()
pl.Status.SetObservedGeneration(pl.Generation)
- // Regular pipeline change
- // This should be happening in all cases to ensure a clean initialization regardless of the lifecycle phase
- // Eg: even for a pipeline started with desiredPhase = Pause, we should still create the resources for the pipeline
- result, err := r.reconcileNonLifecycleChanges(ctx, pl)
- if err != nil {
+ // Orchestrate the pipeline's sub-resources.
+ // This should happen in all cases to ensure a clean initialization regardless of the lifecycle phase.
+ // E.g., even for a pipeline started with desiredPhase = Pause, we should still create the resources for the pipeline.
+ if err := r.reconcileFixedResources(ctx, pl); err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "ReconcilePipelineFailed", "Failed to reconcile pipeline: %v", err.Error())
- return result, err
+ return ctrl.Result{}, err
+ }
+ // If the pipeline has a lifecycle change, then do not update the phase as
+ // this should happen only after the required configs for the lifecycle changes
+ // have been applied.
+ if !isLifecycleChange(pl) {
+ pl.Status.SetPhase(pl.Spec.Lifecycle.GetDesiredPhase(), "")
}
+ if err := r.checkChildrenResourceStatus(ctx, pl); err != nil {
+ return ctrl.Result{}, fmt.Errorf("failed to check pipeline children resource status, %w", err)
+ }
+
// check if any changes related to pause/resume lifecycle for the pipeline
if isLifecycleChange(pl) {
oldPhase := pl.Status.Phase
@@ -171,7 +181,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
}
return ctrl.Result{}, nil
}
- return result, nil
+ return ctrl.Result{}, nil
}
// isLifecycleChange determines whether there has been a change requested in the lifecycle
@@ -190,17 +200,16 @@ func isLifecycleChange(pl *dfv1.Pipeline) bool {
return false
}
-// reconcileNonLifecycleChanges do the jobs not related to pipeline lifecycle changes.
-func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, pl *dfv1.Pipeline) (ctrl.Result, error) {
+// reconcileFixedResources creates the fixed resources, such as the daemon service, vertex objects, and ISB management jobs.
+func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *dfv1.Pipeline) error {
log := logging.FromContext(ctx)
if !controllerutil.ContainsFinalizer(pl, finalizerName) {
controllerutil.AddFinalizer(pl, finalizerName)
}
- pl.Status.InitConditions()
if err := ValidatePipeline(pl); err != nil {
log.Errorw("Validation failed", zap.Error(err))
pl.Status.MarkNotConfigured("InvalidSpec", err.Error())
- return ctrl.Result{}, err
+ return err
}
pl.Status.SetVertexCounts(pl.Spec.Vertices)
pl.Status.MarkConfigured()
@@ -215,16 +224,16 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
if apierrors.IsNotFound(err) {
pl.Status.MarkDeployFailed("ISBSvcNotFound", "ISB Service not found.")
log.Errorw("ISB Service not found", zap.String("isbsvc", isbSvcName), zap.Error(err))
- return ctrl.Result{}, fmt.Errorf("isbsvc %s not found", isbSvcName)
+ return fmt.Errorf("isbsvc %s not found", isbSvcName)
}
pl.Status.MarkDeployFailed("GetISBSvcFailed", err.Error())
log.Errorw("Failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err))
- return ctrl.Result{}, err
+ return err
}
if !isbSvc.Status.IsHealthy() {
pl.Status.MarkDeployFailed("ISBSvcNotHealthy", "ISB Service not healthy.")
log.Errorw("ISB Service is not in healthy status", zap.String("isbsvc", isbSvcName), zap.Error(err))
- return ctrl.Result{}, fmt.Errorf("isbsvc not healthy")
+ return fmt.Errorf("isbsvc not healthy")
}
// Create or update the Side Inputs Manager deployments
@@ -232,14 +241,14 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
log.Errorw("Failed to create or update Side Inputs Manager deployments", zap.Error(err))
pl.Status.MarkDeployFailed("CreateOrUpdateSIMDeploymentsFailed", err.Error())
r.recorder.Eventf(pl, corev1.EventTypeWarning, "CreateOrUpdateSIMDeploymentsFailed", "Failed to create or update Side Inputs Manager deployments: %w", err.Error())
- return ctrl.Result{}, err
+ return err
}
existingObjs, err := r.findExistingVertices(ctx, pl)
if err != nil {
log.Errorw("Failed to find existing vertices", zap.Error(err))
pl.Status.MarkDeployFailed("ListVerticesFailed", err.Error())
- return ctrl.Result{}, err
+ return err
}
oldBuffers := make(map[string]string)
newBuffers := make(map[string]string)
@@ -279,7 +288,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
} else {
pl.Status.MarkDeployFailed("CreateVertexFailed", err.Error())
r.recorder.Eventf(pl, corev1.EventTypeWarning, "CreateVertexFailed", "Failed to create vertex: %w", err.Error())
- return ctrl.Result{}, fmt.Errorf("failed to create vertex, err: %w", err)
+ return fmt.Errorf("failed to create vertex, err: %w", err)
}
}
log.Infow("Created vertex successfully", zap.String("vertex", vertexName))
@@ -291,7 +300,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
if err := r.client.Update(ctx, &oldObj); err != nil {
pl.Status.MarkDeployFailed("UpdateVertexFailed", err.Error())
r.recorder.Eventf(pl, corev1.EventTypeWarning, "UpdateVertexFailed", "Failed to update vertex: %w", err.Error())
- return ctrl.Result{}, fmt.Errorf("failed to update vertex, err: %w", err)
+ return fmt.Errorf("failed to update vertex, err: %w", err)
}
log.Infow("Updated vertex successfully", zap.String("vertex", vertexName))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "UpdateVertexSuccess", "Updated vertex %s successfully", vertexName)
@@ -303,7 +312,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
if err := r.client.Delete(ctx, &v); err != nil {
pl.Status.MarkDeployFailed("DeleteStaleVertexFailed", err.Error())
r.recorder.Eventf(pl, corev1.EventTypeWarning, "DeleteStaleVertexFailed", "Failed to delete vertex: %w", err.Error())
- return ctrl.Result{}, fmt.Errorf("failed to delete vertex, err: %w", err)
+ return fmt.Errorf("failed to delete vertex, err: %w", err)
}
log.Infow("Deleted stale vertex successfully", zap.String("vertex", v.Name))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "DeleteStaleVertexSuccess", "Deleted stale vertex %s successfully", v.Name)
@@ -328,7 +337,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
batchJob := buildISBBatchJob(pl, r.image, isbSvc.Status.Config, "isbsvc-create", args, "cre")
if err := r.client.Create(ctx, batchJob); err != nil && !apierrors.IsAlreadyExists(err) {
pl.Status.MarkDeployFailed("CreateISBSvcCreatingJobFailed", err.Error())
- return ctrl.Result{}, fmt.Errorf("failed to create ISB Svc creating job, err: %w", err)
+ return fmt.Errorf("failed to create ISB Svc creating job, err: %w", err)
}
log.Infow("Created a job successfully for ISB Svc creating", zap.Any("buffers", bfs), zap.Any("buckets", bks), zap.Any("servingStreams", pl.GetServingSourceStreamNames()))
}
@@ -346,31 +355,22 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
batchJob := buildISBBatchJob(pl, r.image, isbSvc.Status.Config, "isbsvc-delete", args, "del")
if err := r.client.Create(ctx, batchJob); err != nil && !apierrors.IsAlreadyExists(err) {
pl.Status.MarkDeployFailed("CreateISBSvcDeletingJobFailed", err.Error())
- return ctrl.Result{}, fmt.Errorf("failed to create ISB Svc deleting job, err: %w", err)
+ return fmt.Errorf("failed to create ISB Svc deleting job, err: %w", err)
}
log.Infow("Created ISB Svc deleting job successfully", zap.Any("buffers", bfs), zap.Any("buckets", bks))
}
// Daemon service
if err := r.createOrUpdateDaemonService(ctx, pl); err != nil {
- return ctrl.Result{}, err
+ return err
}
// Daemon deployment
if err := r.createOrUpdateDaemonDeployment(ctx, pl, isbSvc.Status.Config); err != nil {
- return ctrl.Result{}, err
+ return err
}
pl.Status.MarkDeployed()
- // If the pipeline has a lifecycle change, then do not update the phase as
- // this should happen only after the required configs for the lifecycle changes
- // have been applied.
- if !isLifecycleChange(pl) {
- pl.Status.SetPhase(pl.Spec.Lifecycle.GetDesiredPhase(), "")
- }
- if err := r.checkChildrenResourceStatus(ctx, pl); err != nil {
- return ctrl.Result{}, fmt.Errorf("failed to check pipeline children resource status, %w", err)
- }
- return ctrl.Result{}, nil
+ return nil
}
func (r *pipelineReconciler) createOrUpdateDaemonService(ctx context.Context, pl *dfv1.Pipeline) error {
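For reviewers, the net effect of the controller.go hunks above on the top of reconcile is sketched below; this is a condensed view of the final state under this diff, assuming the helpers shown elsewhere in it (reconcileFixedResources, isLifecycleChange, checkChildrenResourceStatus), not additional behavior:

pl.Status.InitConditions()
pl.Status.SetObservedGeneration(pl.Generation)
// Always orchestrate the fixed sub-resources (vertices, daemon, ISB jobs),
// even when the desired phase is Paused.
if err := r.reconcileFixedResources(ctx, pl); err != nil {
	r.recorder.Eventf(pl, corev1.EventTypeWarning, "ReconcilePipelineFailed", "Failed to reconcile pipeline: %v", err.Error())
	return ctrl.Result{}, err
}
// Advance the phase only when no pause/resume lifecycle change is pending;
// a lifecycle change updates the phase after its required configs are applied.
if !isLifecycleChange(pl) {
	pl.Status.SetPhase(pl.Spec.Lifecycle.GetDesiredPhase(), "")
}
if err := r.checkChildrenResourceStatus(ctx, pl); err != nil {
	return ctrl.Result{}, fmt.Errorf("failed to check pipeline children resource status, %w", err)
}
// Pause/resume handling via isLifecycleChange(pl) follows, structurally unchanged.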
diff --git a/pkg/reconciler/pipeline/controller_test.go b/pkg/reconciler/pipeline/controller_test.go
index 0cf9205f0a..2a1762aa4b 100644
--- a/pkg/reconciler/pipeline/controller_test.go
+++ b/pkg/reconciler/pipeline/controller_test.go
@@ -154,6 +154,18 @@ func init() {
_ = batchv1.AddToScheme(scheme.Scheme)
}
+func fakeReconciler(t *testing.T, cl client.WithWatch) *pipelineReconciler {
+ t.Helper()
+ return &pipelineReconciler{
+ client: cl,
+ scheme: scheme.Scheme,
+ config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
+ image: testFlowImage,
+ logger: zaptest.NewLogger(t).Sugar(),
+ recorder: record.NewFakeRecorder(64),
+ }
+}
+
func Test_NewReconciler(t *testing.T) {
cl := fake.NewClientBuilder().Build()
r := NewReconciler(cl, scheme.Scheme, reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig), testFlowImage, zaptest.NewLogger(t).Sugar(), record.NewFakeRecorder(64))
@@ -162,23 +174,17 @@ func Test_NewReconciler(t *testing.T) {
}
func Test_reconcile(t *testing.T) {
+ ctx := context.TODO()
+
t.Run("test reconcile", func(t *testing.T) {
- cl := fake.NewClientBuilder().Build()
- ctx := context.TODO()
testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
testIsbSvc.Status.MarkConfigured()
testIsbSvc.Status.MarkDeployed()
+ cl := fake.NewClientBuilder().Build()
err := cl.Create(ctx, testIsbSvc)
assert.Nil(t, err)
- r := &pipelineReconciler{
- client: cl,
- scheme: scheme.Scheme,
- config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
- image: testFlowImage,
- logger: zaptest.NewLogger(t).Sugar(),
- recorder: record.NewFakeRecorder(64),
- }
testObj := testPipeline.DeepCopy()
+ r := fakeReconciler(t, cl)
_, err = r.reconcile(ctx, testObj)
assert.NoError(t, err)
vertices := &dfv1.VertexList{}
@@ -191,27 +197,50 @@ func Test_reconcile(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(jobs.Items))
})
-}
-func Test_reconcileEvents(t *testing.T) {
+ t.Run("test reconcile deleting", func(t *testing.T) {
+ testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
+ testIsbSvc.Status.MarkConfigured()
+ testIsbSvc.Status.MarkDeployed()
+ cl := fake.NewClientBuilder().Build()
+ err := cl.Create(ctx, testIsbSvc)
+ assert.Nil(t, err)
+ testObj := testPipeline.DeepCopy()
+ testObj.DeletionTimestamp = &metav1.Time{Time: time.Now()}
+ r := fakeReconciler(t, cl)
+ _, err = r.reconcile(ctx, testObj)
+ assert.NoError(t, err)
+ })
- fakeConfig := reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig)
- t.Run("test reconcile - invalid name", func(t *testing.T) {
+ t.Run("test reconcile - no isbsvc", func(t *testing.T) {
+ testObj := testPipeline.DeepCopy()
cl := fake.NewClientBuilder().Build()
- ctx := context.TODO()
+ r := fakeReconciler(t, cl)
+ _, err := r.reconcile(ctx, testObj)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "not found")
+ })
+
+ t.Run("test reconcile - isbsvc unhealthy", func(t *testing.T) {
+ testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
+ testIsbSvc.Status.MarkConfigured()
+ cl := fake.NewClientBuilder().Build()
+ _ = cl.Create(ctx, testIsbSvc)
+ testObj := testPipeline.DeepCopy()
+ r := fakeReconciler(t, cl)
+ _, err := r.reconcile(ctx, testObj)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "not healthy")
+ })
+
+ t.Run("test reconcile - invalid name", func(t *testing.T) {
testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
testIsbSvc.Status.MarkConfigured()
testIsbSvc.Status.MarkDeployed()
+ cl := fake.NewClientBuilder().Build()
+ r := fakeReconciler(t, cl)
err := cl.Create(ctx, testIsbSvc)
assert.Nil(t, err)
- r := &pipelineReconciler{
- client: cl,
- scheme: scheme.Scheme,
- config: fakeConfig,
- image: testFlowImage,
- logger: zaptest.NewLogger(t).Sugar(),
- recorder: record.NewFakeRecorder(64),
- }
testObj := testPipeline.DeepCopy()
testObj.Status.Phase = "Paused"
_, err = r.reconcile(ctx, testObj)
@@ -224,22 +253,14 @@ func Test_reconcileEvents(t *testing.T) {
})
t.Run("test reconcile - duplicate vertex", func(t *testing.T) {
- cl := fake.NewClientBuilder().Build()
- ctx := context.TODO()
testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
testIsbSvc.Status.MarkConfigured()
testIsbSvc.Status.MarkDeployed()
+ cl := fake.NewClientBuilder().Build()
err := cl.Create(ctx, testIsbSvc)
assert.Nil(t, err)
- r := &pipelineReconciler{
- client: cl,
- scheme: scheme.Scheme,
- config: fakeConfig,
- image: testFlowImage,
- logger: zaptest.NewLogger(t).Sugar(),
- recorder: record.NewFakeRecorder(64),
- }
testObj := testPipeline.DeepCopy()
+ r := fakeReconciler(t, cl)
_, err = r.reconcile(ctx, testObj)
assert.NoError(t, err)
testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "input", Source: &dfv1.Source{}})
@@ -279,14 +300,7 @@ func Test_pauseAndResumePipeline(t *testing.T) {
testIsbSvc.Status.MarkDeployed()
err := cl.Create(ctx, testIsbSvc)
assert.Nil(t, err)
- r := &pipelineReconciler{
- client: cl,
- scheme: scheme.Scheme,
- config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
- image: testFlowImage,
- logger: zaptest.NewLogger(t).Sugar(),
- recorder: record.NewFakeRecorder(64),
- }
+ r := fakeReconciler(t, cl)
testObj := testPipeline.DeepCopy()
testObj.Spec.Vertices[0].Scale.Min = ptr.To[int32](3)
_, err = r.reconcile(ctx, testObj)
@@ -316,14 +330,7 @@ func Test_pauseAndResumePipeline(t *testing.T) {
testIsbSvc.Status.MarkDeployed()
err := cl.Create(ctx, testIsbSvc)
assert.Nil(t, err)
- r := &pipelineReconciler{
- client: cl,
- scheme: scheme.Scheme,
- config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
- image: testFlowImage,
- logger: zaptest.NewLogger(t).Sugar(),
- recorder: record.NewFakeRecorder(64),
- }
+ r := fakeReconciler(t, cl)
testObj := testReducePipeline.DeepCopy()
_, err = r.reconcile(ctx, testObj)
assert.NoError(t, err)
@@ -566,14 +573,7 @@ func Test_cleanupBuffers(t *testing.T) {
func TestCreateOrUpdateDaemon(t *testing.T) {
cl := fake.NewClientBuilder().Build()
ctx := context.TODO()
- r := &pipelineReconciler{
- client: cl,
- scheme: scheme.Scheme,
- config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
- image: testFlowImage,
- logger: zaptest.NewLogger(t).Sugar(),
- recorder: record.NewFakeRecorder(64),
- }
+ r := fakeReconciler(t, cl)
t.Run("test create or update service", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
@@ -601,14 +601,7 @@ func TestCreateOrUpdateDaemon(t *testing.T) {
func Test_createOrUpdateSIMDeployments(t *testing.T) {
cl := fake.NewClientBuilder().Build()
ctx := context.TODO()
- r := &pipelineReconciler{
- client: cl,
- scheme: scheme.Scheme,
- config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
- image: testFlowImage,
- logger: zaptest.NewLogger(t).Sugar(),
- recorder: record.NewFakeRecorder(64),
- }
+ r := fakeReconciler(t, cl)
t.Run("no side inputs", func(t *testing.T) {
err := r.createOrUpdateSIMDeployments(ctx, testPipeline, fakeIsbSvcConfig)
@@ -920,14 +913,7 @@ func Test_checkChildrenResourceStatus(t *testing.T) {
testIsbSvc.Status.MarkDeployed()
err := cl.Create(ctx, testIsbSvc)
assert.Nil(t, err)
- r := &pipelineReconciler{
- client: cl,
- scheme: scheme.Scheme,
- config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
- image: testFlowImage,
- logger: zaptest.NewLogger(t).Sugar(),
- recorder: record.NewFakeRecorder(64),
- }
+ r := fakeReconciler(t, cl)
testObj := testPipelineWithSideinput.DeepCopy()
_, err = r.reconcile(ctx, testObj)
assert.NoError(t, err)
diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go
index 03f581e6d7..5ea6b7e6d5 100644
--- a/pkg/reconciler/vertex/scaling/scaling.go
+++ b/pkg/reconciler/vertex/scaling/scaling.go
@@ -302,11 +302,11 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
min := vertex.Spec.Scale.GetMinReplicas()
if desired > max {
desired = max
- log.Infof("Calculated desired replica number %d of vertex %q is greater than max, using max %d.", vertex.Name, desired, max)
+ log.Infof("Calculated desired replica number %d of vertex %q is greater than max, using max %d.", desired, vertex.Name, max)
}
if desired < min {
desired = min
- log.Infof("Calculated desired replica number %d of vertex %q is smaller than min, using min %d.", vertex.Name, desired, min)
+ log.Infof("Calculated desired replica number %d of vertex %q is smaller than min, using min %d.", desired, vertex.Name, min)
}
if current > max || current < min { // Someone might have manually scaled up/down the vertex
return s.patchVertexReplicas(ctx, vertex, desired)
@@ -328,14 +328,14 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
directPressure, downstreamPressure := s.hasBackPressure(*pl, *vertex)
if directPressure {
if current > min && current > 1 { // Scale down but not to 0
- log.Infof("Vertex %s has direct back pressure from connected vertices, decreasing one replica.", key)
+ log.Infof("Vertex %q has direct back pressure from connected vertices, decreasing one replica.", key)
return s.patchVertexReplicas(ctx, vertex, current-1)
} else {
- log.Infof("Vertex %s has direct back pressure from connected vertices, skip scaling.", key)
+ log.Infof("Vertex %q has direct back pressure from connected vertices, skip scaling.", key)
return nil
}
} else if downstreamPressure {
- log.Infof("Vertex %s has back pressure in downstream vertices, skip scaling.", key)
+ log.Infof("Vertex %q has back pressure in downstream vertices, skip scaling.", key)
return nil
}
maxAllowedUp := int32(vertex.Spec.Scale.GetReplicasPerScaleUp())
diff --git a/rust/numaflow-models/src/models/pipeline_status.rs b/rust/numaflow-models/src/models/pipeline_status.rs
index e67205b3cd..2fa64471fd 100644
--- a/rust/numaflow-models/src/models/pipeline_status.rs
+++ b/rust/numaflow-models/src/models/pipeline_status.rs
@@ -21,7 +21,7 @@ pub struct PipelineStatus {
/// Conditions are the latest available observations of a resource's current state.
#[serde(rename = "conditions", skip_serializing_if = "Option::is_none")]
pub conditions: Option<Vec<k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition>>,
- /// Field to indicate if a pipeline drain successfully occurred, or it timed out. Set to true when the Pipeline is in Paused state, and after it has successfully been drained. defaults to false
+ /// Field to indicate whether a pipeline drain successfully occurred; it is only meaningful when the pipeline is paused. True means it has been successfully drained.
#[serde(rename = "drainedOnPause", skip_serializing_if = "Option::is_none")]
pub drained_on_pause: Option<bool>,
#[serde(rename = "lastUpdated", skip_serializing_if = "Option::is_none")]