From 9954f8477aaa64030616cf35c4e51ffdd66efda1 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar <38965141+dpadhiar@users.noreply.github.com> Date: Fri, 26 Jan 2024 16:35:17 -0800 Subject: [PATCH] feat: terminate reduce vertex pods when pausing pipeline (#1481) Signed-off-by: Dillen Padhiar --- pkg/apis/numaflow/v1alpha1/vertex_types.go | 4 + .../numaflow/v1alpha1/vertex_types_test.go | 1 + pkg/reconciler/pipeline/controller.go | 13 ++- pkg/reconciler/pipeline/controller_test.go | 105 ++++++++++++------ 4 files changed, 86 insertions(+), 37 deletions(-) diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types.go b/pkg/apis/numaflow/v1alpha1/vertex_types.go index 7fb9393565..47dd72c9ec 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types.go @@ -395,6 +395,10 @@ func (v Vertex) GetToBuffers() []string { func (v Vertex) GetReplicas() int { if v.IsReduceUDF() { + // Replicas will be 0 only when pausing a pipeline + if v.Spec.Replicas != nil && int(*v.Spec.Replicas) == 0 { + return 0 + } // Replica of a reduce vertex is determined by the partitions. return v.GetPartitionCount() } diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types_test.go b/pkg/apis/numaflow/v1alpha1/vertex_types_test.go index 96232dd307..34f3c267be 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types_test.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types_test.go @@ -143,6 +143,7 @@ func TestGetVertexReplicas(t *testing.T) { v.Spec.FromEdges = []CombinedEdge{ {Edge: Edge{From: "a", To: "b"}}, } + v.Spec.Replicas = pointer.Int32(5) assert.Equal(t, 1, v.GetReplicas()) v.Spec.Replicas = pointer.Int32(1000) assert.Equal(t, 1, v.GetReplicas()) diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index 7444198d47..e6c64ce83d 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -842,10 +842,17 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline, for _, vertex := range existingVertices { if origin := *vertex.Spec.Replicas; origin != replicas && filter(vertex) { scaleTo := replicas - // if vtx does not support autoscaling and min is set, scale up to min + // if replicas equals to 1, it means we are resuming a paused pipeline + // in this case, if a vertex doesn't support auto-scaling, we scale up based on the vertex's configuration: + // for a reducer, we scale up to the partition count + // for a non-reducer, if min is set, we scale up to min if replicas == 1 { - if !vertex.Scalable() && vertex.Spec.Scale.Min != nil && *vertex.Spec.Scale.Min > 1 { - scaleTo = *vertex.Spec.Scale.Min + if !vertex.Scalable() { + if vertex.IsReduceUDF() { + scaleTo = int32(vertex.GetPartitionCount()) + } else if vertex.Spec.Scale.Min != nil && *vertex.Spec.Scale.Min > 1 { + scaleTo = *vertex.Spec.Scale.Min + } } } vertex.Spec.Replicas = pointer.Int32(scaleTo) diff --git a/pkg/reconciler/pipeline/controller_test.go b/pkg/reconciler/pipeline/controller_test.go index 46d8752eab..d0745a0d3e 100644 --- a/pkg/reconciler/pipeline/controller_test.go +++ b/pkg/reconciler/pipeline/controller_test.go @@ -223,40 +223,77 @@ func Test_buildReducesVertices(t *testing.T) { } func Test_pauseAndResumePipeline(t *testing.T) { - cl := fake.NewClientBuilder().Build() - ctx := context.TODO() - testIsbSvc := testNativeRedisIsbSvc.DeepCopy() - testIsbSvc.Status.MarkConfigured() - testIsbSvc.Status.MarkDeployed() - err := cl.Create(ctx, testIsbSvc) - assert.Nil(t, err) - r := &pipelineReconciler{ - client: cl, - scheme: scheme.Scheme, - config: fakeConfig, - image: testFlowImage, - logger: zaptest.NewLogger(t).Sugar(), - recorder: record.NewFakeRecorder(64), - } - testObj := testPipeline.DeepCopy() - testObj.Spec.Vertices[0].Scale.Min = pointer.Int32(3) - _, err = r.reconcile(ctx, testObj) - assert.NoError(t, err) - _, err = r.pausePipeline(ctx, testObj) - assert.NoError(t, err) - v, err := r.findExistingVertices(ctx, testObj) - assert.NoError(t, err) - assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) - assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) - testObj.Annotations[dfv1.KeyPauseTimestamp] = "" - _, err = r.resumePipeline(ctx, testObj) - assert.NoError(t, err) - v, err = r.findExistingVertices(ctx, testObj) - assert.NoError(t, err) - // when auto-scaling is enabled, while resuming the pipeline, instead of setting the replicas to Scale.Min, - // we set it to one and let auto-scaling to scale up - assert.Equal(t, int32(1), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) - assert.NoError(t, err) + + t.Run("test normal pipeline", func(t *testing.T) { + cl := fake.NewClientBuilder().Build() + ctx := context.TODO() + testIsbSvc := testNativeRedisIsbSvc.DeepCopy() + testIsbSvc.Status.MarkConfigured() + testIsbSvc.Status.MarkDeployed() + err := cl.Create(ctx, testIsbSvc) + assert.Nil(t, err) + r := &pipelineReconciler{ + client: cl, + scheme: scheme.Scheme, + config: fakeConfig, + image: testFlowImage, + logger: zaptest.NewLogger(t).Sugar(), + recorder: record.NewFakeRecorder(64), + } + testObj := testPipeline.DeepCopy() + testObj.Spec.Vertices[0].Scale.Min = pointer.Int32(3) + _, err = r.reconcile(ctx, testObj) + assert.NoError(t, err) + _, err = r.pausePipeline(ctx, testObj) + assert.NoError(t, err) + v, err := r.findExistingVertices(ctx, testObj) + assert.NoError(t, err) + assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) + assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) + testObj.Annotations[dfv1.KeyPauseTimestamp] = "" + _, err = r.resumePipeline(ctx, testObj) + assert.NoError(t, err) + v, err = r.findExistingVertices(ctx, testObj) + assert.NoError(t, err) + // when auto-scaling is enabled, while resuming the pipeline, instead of setting the replicas to Scale.Min, + // we set it to one and let auto-scaling to scale up + assert.Equal(t, int32(1), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) + assert.NoError(t, err) + }) + + t.Run("test reduce pipeline", func(t *testing.T) { + cl := fake.NewClientBuilder().Build() + ctx := context.TODO() + testIsbSvc := testNativeRedisIsbSvc.DeepCopy() + testIsbSvc.Status.MarkConfigured() + testIsbSvc.Status.MarkDeployed() + err := cl.Create(ctx, testIsbSvc) + assert.Nil(t, err) + r := &pipelineReconciler{ + client: cl, + scheme: scheme.Scheme, + config: fakeConfig, + image: testFlowImage, + logger: zaptest.NewLogger(t).Sugar(), + recorder: record.NewFakeRecorder(64), + } + testObj := testReducePipeline.DeepCopy() + _, err = r.reconcile(ctx, testObj) + assert.NoError(t, err) + _, err = r.pausePipeline(ctx, testObj) + assert.NoError(t, err) + _, err = r.findExistingVertices(ctx, testObj) + assert.NoError(t, err) + assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) + testObj.Annotations[dfv1.KeyPauseTimestamp] = "" + _, err = r.resumePipeline(ctx, testObj) + assert.NoError(t, err) + v, err := r.findExistingVertices(ctx, testObj) + assert.NoError(t, err) + // reduce UDFs are not autoscalable thus they are scaled manually back to their partition count + assert.Equal(t, int32(2), *v[testObj.Name+"-"+testObj.Spec.Vertices[2].Name].Spec.Replicas) + assert.NoError(t, err) + }) } func Test_copyVertexLimits(t *testing.T) {