diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go index d0e9eb62f0..e36ec9bd34 100644 --- a/pkg/apis/numaflow/v1alpha1/const.go +++ b/pkg/apis/numaflow/v1alpha1/const.go @@ -41,8 +41,6 @@ const ( KeyPauseTimestamp = "numaflow.numaproj.io/pause-timestamp" KeyDefaultContainer = "kubectl.kubernetes.io/default-container" - RemovePauseTimestampPatch = `[{"op": "remove", "path": "/metadata/annotations/numaflow.numaproj.io~1pause-timestamp"}]` - // ID key in the header of sources like http KeyMetaID = "X-Numaflow-Id" KeyMetaEventTime = "X-Numaflow-Event-Time" diff --git a/pkg/reconciler/isbsvc/controller.go b/pkg/reconciler/isbsvc/controller.go index 1ab7d4e79b..d94e14424d 100644 --- a/pkg/reconciler/isbsvc/controller.go +++ b/pkg/reconciler/isbsvc/controller.go @@ -18,17 +18,20 @@ package isbsvc import ( "context" + "strings" "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/yaml" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/reconciler" @@ -58,7 +61,7 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct isbSvc := &dfv1.InterStepBufferService{} if err := r.client.Get(ctx, req.NamespacedName, isbSvc); err != nil { if apierrors.IsNotFound(err) { - return reconcile.Result{}, nil + return ctrl.Result{}, nil } r.logger.Errorw("Unable to get ISB Service", zap.Any("request", req), zap.Error(err)) return ctrl.Result{}, err @@ -69,14 +72,15 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct if reconcileErr != nil { log.Errorw("Reconcile error", zap.Error(reconcileErr)) } - if r.needsUpdate(isbSvc, isbSvcCopy) { - // Update with a DeepCopy because .Status will be cleaned up. - if err := r.client.Update(ctx, isbSvcCopy.DeepCopy()); err != nil { - return reconcile.Result{}, err + if !equality.Semantic.DeepEqual(isbSvc.Finalizers, isbSvcCopy.Finalizers) { + patchYaml := "metadata:\n finalizers: [" + strings.Join(isbSvcCopy.Finalizers, ",") + "]" + patchJson, _ := yaml.YAMLToJSON([]byte(patchYaml)) + if err := r.client.Patch(ctx, isbSvc, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil { + return ctrl.Result{}, err } } if err := r.client.Status().Update(ctx, isbSvcCopy); err != nil { - return reconcile.Result{}, err + return ctrl.Result{}, err } return ctrl.Result{}, reconcileErr } @@ -122,16 +126,6 @@ func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc return installer.Install(ctx, isbSvc, r.client, r.kubeClient, r.config, log, r.recorder) } -func (r *interStepBufferServiceReconciler) needsUpdate(old, new *dfv1.InterStepBufferService) bool { - if old == nil { - return true - } - if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) { - return true - } - return false -} - func needsFinalizer(isbSvc *dfv1.InterStepBufferService) bool { if isbSvc.Spec.Redis != nil && isbSvc.Spec.Redis.Native != nil && isbSvc.Spec.Redis.Native.Persistence != nil { return true diff --git a/pkg/reconciler/isbsvc/controller_test.go b/pkg/reconciler/isbsvc/controller_test.go index 82aee9d90f..2a24ec7c69 100644 --- a/pkg/reconciler/isbsvc/controller_test.go +++ b/pkg/reconciler/isbsvc/controller_test.go @@ -202,42 +202,18 @@ func TestReconcileJetStream(t *testing.T) { func TestNeedsUpdate(t *testing.T) { t.Run("needs redis update", func(t *testing.T) { testIsbs := nativeRedisIsbs.DeepCopy() - cl := fake.NewClientBuilder().Build() - r := &interStepBufferServiceReconciler{ - client: cl, - scheme: scheme.Scheme, - config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig), - logger: zaptest.NewLogger(t).Sugar(), - } - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) controllerutil.AddFinalizer(testIsbs, finalizerName) assert.True(t, contains(testIsbs.Finalizers, finalizerName)) - assert.True(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) controllerutil.RemoveFinalizer(testIsbs, finalizerName) assert.False(t, contains(testIsbs.Finalizers, finalizerName)) - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) - testIsbs.Status.MarkConfigured() - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) }) t.Run("needs jetstream update", func(t *testing.T) { testIsbs := jetStreamIsbs.DeepCopy() - cl := fake.NewClientBuilder().Build() - r := &interStepBufferServiceReconciler{ - client: cl, - scheme: scheme.Scheme, - config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig), - logger: zaptest.NewLogger(t).Sugar(), - } - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) controllerutil.AddFinalizer(testIsbs, finalizerName) assert.True(t, contains(testIsbs.Finalizers, finalizerName)) - assert.True(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) controllerutil.RemoveFinalizer(testIsbs, finalizerName) assert.False(t, contains(testIsbs.Finalizers, finalizerName)) - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) - testIsbs.Status.MarkConfigured() - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) }) } diff --git a/pkg/reconciler/monovertex/controller.go b/pkg/reconciler/monovertex/controller.go index 9aca247a05..3fbfb3c1ab 100644 --- a/pkg/reconciler/monovertex/controller.go +++ b/pkg/reconciler/monovertex/controller.go @@ -236,6 +236,8 @@ func (mr *monoVertexReconciler) orchestratePods(ctx context.Context, monoVtx *df monoVtx.Status.CurrentHash = monoVtx.Status.UpdateHash } else { // Update scenario if updatedReplicas >= desiredReplicas { + monoVtx.Status.UpdatedReplicas = uint32(desiredReplicas) + monoVtx.Status.CurrentHash = monoVtx.Status.UpdateHash return nil } diff --git a/pkg/reconciler/monovertex/scaling/scaling.go b/pkg/reconciler/monovertex/scaling/scaling.go index 481408672c..0b35265190 100644 --- a/pkg/reconciler/monovertex/scaling/scaling.go +++ b/pkg/reconciler/monovertex/scaling/scaling.go @@ -19,7 +19,6 @@ package scaling import ( "container/list" "context" - "encoding/json" "fmt" "math" "strings" @@ -30,7 +29,6 @@ import ( "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -359,12 +357,8 @@ func (s *Scaler) Start(ctx context.Context) error { func (s *Scaler) patchMonoVertexReplicas(ctx context.Context, monoVtx *dfv1.MonoVertex, desiredReplicas int32) error { log := logging.FromContext(ctx) origin := monoVtx.Spec.Replicas - monoVtx.Spec.Replicas = ptr.To[int32](desiredReplicas) - body, err := json.Marshal(monoVtx) - if err != nil { - return fmt.Errorf("failed to marshal MonoVertex object to json, %w", err) - } - if err := s.client.Patch(ctx, monoVtx, client.RawPatch(types.MergePatchType, body)); err != nil && !apierrors.IsNotFound(err) { + patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desiredReplicas) + if err := s.client.Patch(ctx, monoVtx, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("failed to patch MonoVertex replicas, %w", err) } log.Infow("Auto scaling - mono vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("namespace", monoVtx.Namespace), zap.String("vertex", monoVtx.Name)) diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index d8b989f2d6..b2f99e7b1d 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -18,7 +18,6 @@ package pipeline import ( "context" - "encoding/json" "fmt" "strings" "time" @@ -40,6 +39,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/yaml" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" daemonclient "github.com/numaproj/numaflow/pkg/daemon/client" @@ -51,6 +51,8 @@ import ( const ( finalizerName = dfv1.ControllerPipeline + + pauseTimestampPath = `/metadata/annotations/numaflow.numaproj.io~1pause-timestamp` ) // pipelineReconciler reconciles a pipeline object. @@ -85,9 +87,10 @@ func (r *pipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c log.Errorw("Reconcile error", zap.Error(reconcileErr)) } plCopy.Status.LastUpdated = metav1.Now() - if needsUpdate(pl, plCopy) { - // Update with a DeepCopy because .Status will be cleaned up. - if err := r.client.Update(ctx, plCopy.DeepCopy()); err != nil { + if !equality.Semantic.DeepEqual(pl.Finalizers, plCopy.Finalizers) { + patchYaml := "metadata:\n finalizers: [" + strings.Join(plCopy.Finalizers, ",") + "]" + patchJson, _ := yaml.YAMLToJSON([]byte(patchYaml)) + if err := r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil { return result, err } } @@ -292,7 +295,9 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df r.recorder.Eventf(pl, corev1.EventTypeNormal, "CreateVertexSuccess", "Created vertex %s successfully", vertexName) } else { if oldObj.GetAnnotations()[dfv1.KeyHash] != newObj.GetAnnotations()[dfv1.KeyHash] { // need to update + originalReplicas := oldObj.Spec.Replicas oldObj.Spec = newObj.Spec + oldObj.Spec.Replicas = originalReplicas oldObj.Annotations[dfv1.KeyHash] = newObj.GetAnnotations()[dfv1.KeyHash] if err := r.client.Update(ctx, &oldObj); err != nil { r.recorder.Eventf(pl, corev1.EventTypeWarning, "UpdateVertexFailed", "Failed to update vertex: %w", err.Error()) @@ -588,17 +593,6 @@ func (r *pipelineReconciler) cleanUpBuffers(ctx context.Context, pl *dfv1.Pipeli return nil } -func needsUpdate(old, new *dfv1.Pipeline) bool { - if old == nil { - return true - } - if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) { - return true - } - - return false -} - func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex { result := make(map[string]dfv1.Vertex) for _, v := range pl.Spec.Vertices { @@ -814,7 +808,7 @@ func (r *pipelineReconciler) updateDesiredState(ctx context.Context, pl *dfv1.Pi func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) { // reset pause timestamp if pl.GetAnnotations()[dfv1.KeyPauseTimestamp] != "" { - err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(dfv1.RemovePauseTimestampPatch))) + err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(`[{"op": "remove", "path": "`+pauseTimestampPath+`"}]`))) if err != nil { if apierrors.IsNotFound(err) { return false, nil // skip pipeline if it can't be found @@ -837,13 +831,8 @@ func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeli func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) { // check that annotations / pause timestamp annotation exist if pl.GetAnnotations() == nil || pl.GetAnnotations()[dfv1.KeyPauseTimestamp] == "" { - pl.SetAnnotations(map[string]string{dfv1.KeyPauseTimestamp: time.Now().Format(time.RFC3339)}) - body, err := json.Marshal(pl) - if err != nil { - return false, err - } - err = r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, body)) - if err != nil && !apierrors.IsNotFound(err) { + patchJson := `[{"op": "add", "path": "` + pauseTimestampPath + `", "value": "` + time.Now().Format(time.RFC3339) + `"}]` + if err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) { return true, err } } @@ -924,12 +913,8 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline, } } } - vertex.Spec.Replicas = ptr.To[int32](scaleTo) - body, err := json.Marshal(vertex) - if err != nil { - return false, err - } - err = r.client.Patch(ctx, &vertex, client.RawPatch(types.MergePatchType, body)) + patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, scaleTo) + err = r.client.Patch(ctx, &vertex, client.RawPatch(types.MergePatchType, []byte(patchJson))) if err != nil && !apierrors.IsNotFound(err) { return false, err } diff --git a/pkg/reconciler/pipeline/controller_test.go b/pkg/reconciler/pipeline/controller_test.go index e130f49656..aafff27cc3 100644 --- a/pkg/reconciler/pipeline/controller_test.go +++ b/pkg/reconciler/pipeline/controller_test.go @@ -37,7 +37,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/reconciler" @@ -352,8 +351,6 @@ func Test_pauseAndResumePipeline(t *testing.T) { v, err := r.findExistingVertices(ctx, testObj) assert.NoError(t, err) assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) - assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) - testObj.Annotations[dfv1.KeyPauseTimestamp] = "" _, err = r.resumePipeline(ctx, testObj) assert.NoError(t, err) v, err = r.findExistingVertices(ctx, testObj) @@ -380,8 +377,6 @@ func Test_pauseAndResumePipeline(t *testing.T) { assert.NoError(t, err) _, err = r.findExistingVertices(ctx, testObj) assert.NoError(t, err) - assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) - testObj.Annotations[dfv1.KeyPauseTimestamp] = "" _, err = r.resumePipeline(ctx, testObj) assert.NoError(t, err) v, err := r.findExistingVertices(ctx, testObj) @@ -560,16 +555,6 @@ func Test_buildISBBatchJob(t *testing.T) { }) } -func Test_needsUpdate(t *testing.T) { - testObj := testPipeline.DeepCopy() - assert.True(t, needsUpdate(nil, testObj)) - assert.False(t, needsUpdate(testPipeline, testObj)) - controllerutil.AddFinalizer(testObj, finalizerName) - assert.True(t, needsUpdate(testPipeline, testObj)) - testobj1 := testObj.DeepCopy() - assert.False(t, needsUpdate(testObj, testobj1)) -} - func Test_cleanupBuffers(t *testing.T) { cl := fake.NewClientBuilder().Build() ctx := context.TODO() diff --git a/pkg/reconciler/vertex/controller.go b/pkg/reconciler/vertex/controller.go index 8789b5d89a..20945639ab 100644 --- a/pkg/reconciler/vertex/controller.go +++ b/pkg/reconciler/vertex/controller.go @@ -259,6 +259,8 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver vertex.Status.CurrentHash = vertex.Status.UpdateHash } else { // Update scenario if updatedReplicas >= desiredReplicas { + vertex.Status.UpdatedReplicas = uint32(desiredReplicas) + vertex.Status.CurrentHash = vertex.Status.UpdateHash return nil } diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go index eed5981e89..139e189f5f 100644 --- a/pkg/reconciler/vertex/scaling/scaling.go +++ b/pkg/reconciler/vertex/scaling/scaling.go @@ -19,7 +19,6 @@ package scaling import ( "container/list" "context" - "encoding/json" "fmt" "math" "strings" @@ -30,7 +29,6 @@ import ( "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -499,12 +497,8 @@ loop: func (s *Scaler) patchVertexReplicas(ctx context.Context, vertex *dfv1.Vertex, desiredReplicas int32) error { log := logging.FromContext(ctx) origin := vertex.Spec.Replicas - vertex.Spec.Replicas = ptr.To[int32](desiredReplicas) - body, err := json.Marshal(vertex) - if err != nil { - return fmt.Errorf("failed to marshal vertex object to json, %w", err) - } - if err := s.client.Patch(ctx, vertex, client.RawPatch(types.MergePatchType, body)); err != nil && !apierrors.IsNotFound(err) { + patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desiredReplicas) + if err := s.client.Patch(ctx, vertex, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("failed to patch vertex replicas, %w", err) } log.Infow("Auto scaling - vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("namespace", vertex.Namespace), zap.String("pipeline", vertex.Spec.PipelineName), zap.String("vertex", vertex.Spec.Name))