diff --git a/pkg/reconciler/vertex/scaling/scaling_test.go b/pkg/reconciler/vertex/scaling/scaling_test.go
index 4036e751fc..0ea80cef17 100644
--- a/pkg/reconciler/vertex/scaling/scaling_test.go
+++ b/pkg/reconciler/vertex/scaling/scaling_test.go
@@ -27,6 +27,23 @@ import (
 	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
 )
 
+var (
+	fakeVertex = &dfv1.Vertex{
+		Spec: dfv1.VertexSpec{
+			Replicas: ptr.To[int32](3),
+			AbstractVertex: dfv1.AbstractVertex{
+				Scale: dfv1.Scale{
+					TargetProcessingSeconds: ptr.To[uint32](1),
+				},
+			},
+		},
+		Status: dfv1.VertexStatus{
+			Replicas:      uint32(3),
+			ReadyReplicas: uint32(2),
+		},
+	}
+)
+
 func Test_BasicOperations(t *testing.T) {
 	cl := fake.NewClientBuilder().Build()
 	s := NewScaler(cl)
@@ -39,55 +56,40 @@ func Test_BasicOperations(t *testing.T) {
 }
 
 func Test_desiredReplicasSinglePartition(t *testing.T) {
-	cl := fake.NewClientBuilder().Build()
-	s := NewScaler(cl)
-	one := uint32(1)
-	src := &dfv1.Vertex{
-		Spec: dfv1.VertexSpec{
-			Replicas: ptr.To[int32](2),
-			AbstractVertex: dfv1.AbstractVertex{
-				Source: &dfv1.Source{
-					Kafka: &dfv1.KafkaSource{},
-				},
-				Scale: dfv1.Scale{
-					TargetProcessingSeconds: &one,
-				},
-			},
-		},
-		Status: dfv1.VertexStatus{
-			Replicas:      uint32(2),
-			ReadyReplicas: uint32(2),
-		},
-	}
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
-	assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))
-	udf := &dfv1.Vertex{
-		Spec: dfv1.VertexSpec{
-			Replicas: ptr.To[int32](2),
-			AbstractVertex: dfv1.AbstractVertex{
-				UDF: &dfv1.UDF{},
-			},
-		},
-		Status: dfv1.VertexStatus{
-			Replicas:      uint32(2),
-			ReadyReplicas: uint32(2),
-		},
-	}
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
-	assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
-	assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
+	t.Run("test src", func(t *testing.T) {
+		cl := fake.NewClientBuilder().Build()
+		s := NewScaler(cl)
+		src := fakeVertex.DeepCopy()
+		src.Spec.Source = &dfv1.Source{
+			Kafka: &dfv1.KafkaSource{},
+		}
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+		assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))
+
+	})
+
+	t.Run("test udf", func(t *testing.T) {
+		cl := fake.NewClientBuilder().Build()
+		s := NewScaler(cl)
+		udf := fakeVertex.DeepCopy()
+		udf.Spec.UDF = &dfv1.UDF{}
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
+		assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
+		assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
+		assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
+		assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
+		assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
+	})
+
 }
 
 func Test_desiredReplicasMultiplePartitions(t *testing.T) {