From ca2967ca048d235ba30c6bd9c28b8bbfbc52339b Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Sun, 12 Jan 2025 11:14:05 -0800 Subject: [PATCH] fix: lastScaledAt not updated during autoscaling Signed-off-by: Derek Wang --- pkg/reconciler/monovertex/controller.go | 4 ++-- pkg/reconciler/vertex/controller.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/reconciler/monovertex/controller.go b/pkg/reconciler/monovertex/controller.go index dbe63c6ca..e0ba7cac4 100644 --- a/pkg/reconciler/monovertex/controller.go +++ b/pkg/reconciler/monovertex/controller.go @@ -206,7 +206,8 @@ func (mr *monoVertexReconciler) orchestratePods(ctx context.Context, monoVtx *df if err := mr.cleanUpPodsFromTo(ctx, monoVtx, desiredReplicas, math.MaxInt); err != nil { return fmt.Errorf("failed to clean up mono vertex pods [%v, ∞): %w", desiredReplicas, err) } - if currentReplicas := int(monoVtx.Status.Replicas); currentReplicas > desiredReplicas { + currentReplicas := int(monoVtx.Status.Replicas) + if currentReplicas > desiredReplicas { monoVtx.Status.Replicas = uint32(desiredReplicas) } updatedReplicas := int(monoVtx.Status.UpdatedReplicas) @@ -286,7 +287,6 @@ func (mr *monoVertexReconciler) orchestratePods(ctx context.Context, monoVtx *df } } - currentReplicas := int(monoVtx.Status.Replicas) if currentReplicas != desiredReplicas { log.Infow("MonoVertex replicas changed", "currentReplicas", currentReplicas, "desiredReplicas", desiredReplicas) mr.recorder.Eventf(monoVtx, corev1.EventTypeNormal, "ReplicasScaled", "Replicas changed from %d to %d", currentReplicas, desiredReplicas) diff --git a/pkg/reconciler/vertex/controller.go b/pkg/reconciler/vertex/controller.go index c5e7fdcfe..393a57395 100644 --- a/pkg/reconciler/vertex/controller.go +++ b/pkg/reconciler/vertex/controller.go @@ -220,7 +220,8 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver if err := r.cleanUpPodsFromTo(ctx, vertex, desiredReplicas, math.MaxInt); err != nil { return fmt.Errorf("failed to clean up vertex pods [%v, ∞): %w", desiredReplicas, err) } - if currentReplicas := int(vertex.Status.Replicas); currentReplicas > desiredReplicas { + currentReplicas := int(vertex.Status.Replicas) + if currentReplicas > desiredReplicas { vertex.Status.Replicas = uint32(desiredReplicas) } updatedReplicas := int(vertex.Status.UpdatedReplicas) @@ -300,7 +301,6 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver } } - currentReplicas := int(vertex.Status.Replicas) if currentReplicas != desiredReplicas { log.Infow("Pipeline Vertex replicas changed", "currentReplicas", currentReplicas, "desiredReplicas", desiredReplicas) r.recorder.Eventf(vertex, corev1.EventTypeNormal, "ReplicasScaled", "Replicas changed from %d to %d", currentReplicas, desiredReplicas)