Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Sep 10, 2024
1 parent 94abeed commit 00626fa
Showing 1 changed file with 50 additions and 48 deletions.
98 changes: 50 additions & 48 deletions pkg/reconciler/vertex/scaling/scaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@ import (
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

var (
fakeVertex = &dfv1.Vertex{
Spec: dfv1.VertexSpec{
Replicas: ptr.To[int32](3),
AbstractVertex: dfv1.AbstractVertex{
Scale: dfv1.Scale{
TargetProcessingSeconds: ptr.To[uint32](1),
},
},
},
Status: dfv1.VertexStatus{
Replicas: uint32(3),
ReadyReplicas: uint32(2),
},
}
)

func Test_BasicOperations(t *testing.T) {
cl := fake.NewClientBuilder().Build()
s := NewScaler(cl)
Expand All @@ -39,55 +56,40 @@ func Test_BasicOperations(t *testing.T) {
}

func Test_desiredReplicasSinglePartition(t *testing.T) {
cl := fake.NewClientBuilder().Build()
s := NewScaler(cl)
one := uint32(1)
src := &dfv1.Vertex{
Spec: dfv1.VertexSpec{
Replicas: ptr.To[int32](2),
AbstractVertex: dfv1.AbstractVertex{
Source: &dfv1.Source{
Kafka: &dfv1.KafkaSource{},
},
Scale: dfv1.Scale{
TargetProcessingSeconds: &one,
},
},
},
Status: dfv1.VertexStatus{
Replicas: uint32(2),
ReadyReplicas: uint32(2),
},
}
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))

udf := &dfv1.Vertex{
Spec: dfv1.VertexSpec{
Replicas: ptr.To[int32](2),
AbstractVertex: dfv1.AbstractVertex{
UDF: &dfv1.UDF{},
},
},
Status: dfv1.VertexStatus{
Replicas: uint32(2),
ReadyReplicas: uint32(2),
},
}
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
t.Run("test src", func(t *testing.T) {
cl := fake.NewClientBuilder().Build()
s := NewScaler(cl)
src := fakeVertex.DeepCopy()
src.Spec.Source = &dfv1.Source{
Kafka: &dfv1.KafkaSource{},
}
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))

})

t.Run("test udf", func(t *testing.T) {
cl := fake.NewClientBuilder().Build()
s := NewScaler(cl)
udf := fakeVertex.DeepCopy()
udf.Spec.UDF = &dfv1.UDF{}
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
})

}

func Test_desiredReplicasMultiplePartitions(t *testing.T) {
Expand Down

0 comments on commit 00626fa

Please sign in to comment.