Skip to content

Commit

Permalink
chore: patch instead of update and bugfix (#2059)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Sep 26, 2024
1 parent 782872f commit 0be7b11
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 102 deletions.
2 changes: 0 additions & 2 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ const (
KeyPauseTimestamp = "numaflow.numaproj.io/pause-timestamp"
KeyDefaultContainer = "kubectl.kubernetes.io/default-container"

RemovePauseTimestampPatch = `[{"op": "remove", "path": "/metadata/annotations/numaflow.numaproj.io~1pause-timestamp"}]`

// ID key in the header of sources like http
KeyMetaID = "X-Numaflow-Id"
KeyMetaEventTime = "X-Numaflow-Event-Time"
Expand Down
26 changes: 10 additions & 16 deletions pkg/reconciler/isbsvc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ package isbsvc

import (
"context"
"strings"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/yaml"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/reconciler"
Expand Down Expand Up @@ -58,7 +61,7 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct
isbSvc := &dfv1.InterStepBufferService{}
if err := r.client.Get(ctx, req.NamespacedName, isbSvc); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
return ctrl.Result{}, nil
}
r.logger.Errorw("Unable to get ISB Service", zap.Any("request", req), zap.Error(err))
return ctrl.Result{}, err
Expand All @@ -69,14 +72,15 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct
if reconcileErr != nil {
log.Errorw("Reconcile error", zap.Error(reconcileErr))
}
if r.needsUpdate(isbSvc, isbSvcCopy) {
// Update with a DeepCopy because .Status will be cleaned up.
if err := r.client.Update(ctx, isbSvcCopy.DeepCopy()); err != nil {
return reconcile.Result{}, err
if !equality.Semantic.DeepEqual(isbSvc.Finalizers, isbSvcCopy.Finalizers) {
patchYaml := "metadata:\n finalizers: [" + strings.Join(isbSvcCopy.Finalizers, ",") + "]"
patchJson, _ := yaml.YAMLToJSON([]byte(patchYaml))
if err := r.client.Patch(ctx, isbSvc, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil {
return ctrl.Result{}, err
}
}
if err := r.client.Status().Update(ctx, isbSvcCopy); err != nil {
return reconcile.Result{}, err
return ctrl.Result{}, err
}
return ctrl.Result{}, reconcileErr
}
Expand Down Expand Up @@ -122,16 +126,6 @@ func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc
return installer.Install(ctx, isbSvc, r.client, r.kubeClient, r.config, log, r.recorder)
}

func (r *interStepBufferServiceReconciler) needsUpdate(old, new *dfv1.InterStepBufferService) bool {
if old == nil {
return true
}
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) {
return true
}
return false
}

func needsFinalizer(isbSvc *dfv1.InterStepBufferService) bool {
if isbSvc.Spec.Redis != nil && isbSvc.Spec.Redis.Native != nil && isbSvc.Spec.Redis.Native.Persistence != nil {
return true
Expand Down
24 changes: 0 additions & 24 deletions pkg/reconciler/isbsvc/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,42 +202,18 @@ func TestReconcileJetStream(t *testing.T) {
func TestNeedsUpdate(t *testing.T) {
t.Run("needs redis update", func(t *testing.T) {
testIsbs := nativeRedisIsbs.DeepCopy()
cl := fake.NewClientBuilder().Build()
r := &interStepBufferServiceReconciler{
client: cl,
scheme: scheme.Scheme,
config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
logger: zaptest.NewLogger(t).Sugar(),
}
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
controllerutil.AddFinalizer(testIsbs, finalizerName)
assert.True(t, contains(testIsbs.Finalizers, finalizerName))
assert.True(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
controllerutil.RemoveFinalizer(testIsbs, finalizerName)
assert.False(t, contains(testIsbs.Finalizers, finalizerName))
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
testIsbs.Status.MarkConfigured()
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
})

t.Run("needs jetstream update", func(t *testing.T) {
testIsbs := jetStreamIsbs.DeepCopy()
cl := fake.NewClientBuilder().Build()
r := &interStepBufferServiceReconciler{
client: cl,
scheme: scheme.Scheme,
config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
logger: zaptest.NewLogger(t).Sugar(),
}
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
controllerutil.AddFinalizer(testIsbs, finalizerName)
assert.True(t, contains(testIsbs.Finalizers, finalizerName))
assert.True(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
controllerutil.RemoveFinalizer(testIsbs, finalizerName)
assert.False(t, contains(testIsbs.Finalizers, finalizerName))
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
testIsbs.Status.MarkConfigured()
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
})
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/reconciler/monovertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (mr *monoVertexReconciler) orchestratePods(ctx context.Context, monoVtx *df
monoVtx.Status.CurrentHash = monoVtx.Status.UpdateHash
} else { // Update scenario
if updatedReplicas >= desiredReplicas {
monoVtx.Status.UpdatedReplicas = uint32(desiredReplicas)
monoVtx.Status.CurrentHash = monoVtx.Status.UpdateHash
return nil
}

Expand Down
10 changes: 2 additions & 8 deletions pkg/reconciler/monovertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package scaling
import (
"container/list"
"context"
"encoding/json"
"fmt"
"math"
"strings"
Expand All @@ -30,7 +29,6 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
Expand Down Expand Up @@ -359,12 +357,8 @@ func (s *Scaler) Start(ctx context.Context) error {
func (s *Scaler) patchMonoVertexReplicas(ctx context.Context, monoVtx *dfv1.MonoVertex, desiredReplicas int32) error {
log := logging.FromContext(ctx)
origin := monoVtx.Spec.Replicas
monoVtx.Spec.Replicas = ptr.To[int32](desiredReplicas)
body, err := json.Marshal(monoVtx)
if err != nil {
return fmt.Errorf("failed to marshal MonoVertex object to json, %w", err)
}
if err := s.client.Patch(ctx, monoVtx, client.RawPatch(types.MergePatchType, body)); err != nil && !apierrors.IsNotFound(err) {
patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desiredReplicas)
if err := s.client.Patch(ctx, monoVtx, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to patch MonoVertex replicas, %w", err)
}
log.Infow("Auto scaling - mono vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("namespace", monoVtx.Namespace), zap.String("vertex", monoVtx.Name))
Expand Down
43 changes: 14 additions & 29 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package pipeline

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
Expand All @@ -40,6 +39,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/yaml"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
daemonclient "github.com/numaproj/numaflow/pkg/daemon/client"
Expand All @@ -51,6 +51,8 @@ import (

const (
finalizerName = dfv1.ControllerPipeline

pauseTimestampPath = `/metadata/annotations/numaflow.numaproj.io~1pause-timestamp`
)

// pipelineReconciler reconciles a pipeline object.
Expand Down Expand Up @@ -85,9 +87,10 @@ func (r *pipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
log.Errorw("Reconcile error", zap.Error(reconcileErr))
}
plCopy.Status.LastUpdated = metav1.Now()
if needsUpdate(pl, plCopy) {
// Update with a DeepCopy because .Status will be cleaned up.
if err := r.client.Update(ctx, plCopy.DeepCopy()); err != nil {
if !equality.Semantic.DeepEqual(pl.Finalizers, plCopy.Finalizers) {
patchYaml := "metadata:\n finalizers: [" + strings.Join(plCopy.Finalizers, ",") + "]"
patchJson, _ := yaml.YAMLToJSON([]byte(patchYaml))
if err := r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil {
return result, err
}
}
Expand Down Expand Up @@ -292,7 +295,9 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df
r.recorder.Eventf(pl, corev1.EventTypeNormal, "CreateVertexSuccess", "Created vertex %s successfully", vertexName)
} else {
if oldObj.GetAnnotations()[dfv1.KeyHash] != newObj.GetAnnotations()[dfv1.KeyHash] { // need to update
originalReplicas := oldObj.Spec.Replicas
oldObj.Spec = newObj.Spec
oldObj.Spec.Replicas = originalReplicas
oldObj.Annotations[dfv1.KeyHash] = newObj.GetAnnotations()[dfv1.KeyHash]
if err := r.client.Update(ctx, &oldObj); err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "UpdateVertexFailed", "Failed to update vertex: %w", err.Error())
Expand Down Expand Up @@ -588,17 +593,6 @@ func (r *pipelineReconciler) cleanUpBuffers(ctx context.Context, pl *dfv1.Pipeli
return nil
}

func needsUpdate(old, new *dfv1.Pipeline) bool {
if old == nil {
return true
}
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) {
return true
}

return false
}

func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
result := make(map[string]dfv1.Vertex)
for _, v := range pl.Spec.Vertices {
Expand Down Expand Up @@ -814,7 +808,7 @@ func (r *pipelineReconciler) updateDesiredState(ctx context.Context, pl *dfv1.Pi
func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
// reset pause timestamp
if pl.GetAnnotations()[dfv1.KeyPauseTimestamp] != "" {
err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(dfv1.RemovePauseTimestampPatch)))
err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(`[{"op": "remove", "path": "`+pauseTimestampPath+`"}]`)))
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil // skip pipeline if it can't be found
Expand All @@ -837,13 +831,8 @@ func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeli
func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
// check that annotations / pause timestamp annotation exist
if pl.GetAnnotations() == nil || pl.GetAnnotations()[dfv1.KeyPauseTimestamp] == "" {
pl.SetAnnotations(map[string]string{dfv1.KeyPauseTimestamp: time.Now().Format(time.RFC3339)})
body, err := json.Marshal(pl)
if err != nil {
return false, err
}
err = r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, body))
if err != nil && !apierrors.IsNotFound(err) {
patchJson := `[{"op": "add", "path": "` + pauseTimestampPath + `", "value": "` + time.Now().Format(time.RFC3339) + `"}]`
if err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
return true, err
}
}
Expand Down Expand Up @@ -924,12 +913,8 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline,
}
}
}
vertex.Spec.Replicas = ptr.To[int32](scaleTo)
body, err := json.Marshal(vertex)
if err != nil {
return false, err
}
err = r.client.Patch(ctx, &vertex, client.RawPatch(types.MergePatchType, body))
patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, scaleTo)
err = r.client.Patch(ctx, &vertex, client.RawPatch(types.MergePatchType, []byte(patchJson)))
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
Expand Down
15 changes: 0 additions & 15 deletions pkg/reconciler/pipeline/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/reconciler"
Expand Down Expand Up @@ -352,8 +351,6 @@ func Test_pauseAndResumePipeline(t *testing.T) {
v, err := r.findExistingVertices(ctx, testObj)
assert.NoError(t, err)
assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas)
assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp])
testObj.Annotations[dfv1.KeyPauseTimestamp] = ""
_, err = r.resumePipeline(ctx, testObj)
assert.NoError(t, err)
v, err = r.findExistingVertices(ctx, testObj)
Expand All @@ -380,8 +377,6 @@ func Test_pauseAndResumePipeline(t *testing.T) {
assert.NoError(t, err)
_, err = r.findExistingVertices(ctx, testObj)
assert.NoError(t, err)
assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp])
testObj.Annotations[dfv1.KeyPauseTimestamp] = ""
_, err = r.resumePipeline(ctx, testObj)
assert.NoError(t, err)
v, err := r.findExistingVertices(ctx, testObj)
Expand Down Expand Up @@ -560,16 +555,6 @@ func Test_buildISBBatchJob(t *testing.T) {
})
}

func Test_needsUpdate(t *testing.T) {
testObj := testPipeline.DeepCopy()
assert.True(t, needsUpdate(nil, testObj))
assert.False(t, needsUpdate(testPipeline, testObj))
controllerutil.AddFinalizer(testObj, finalizerName)
assert.True(t, needsUpdate(testPipeline, testObj))
testobj1 := testObj.DeepCopy()
assert.False(t, needsUpdate(testObj, testobj1))
}

func Test_cleanupBuffers(t *testing.T) {
cl := fake.NewClientBuilder().Build()
ctx := context.TODO()
Expand Down
2 changes: 2 additions & 0 deletions pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver
vertex.Status.CurrentHash = vertex.Status.UpdateHash
} else { // Update scenario
if updatedReplicas >= desiredReplicas {
vertex.Status.UpdatedReplicas = uint32(desiredReplicas)
vertex.Status.CurrentHash = vertex.Status.UpdateHash
return nil
}

Expand Down
10 changes: 2 additions & 8 deletions pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package scaling
import (
"container/list"
"context"
"encoding/json"
"fmt"
"math"
"strings"
Expand All @@ -30,7 +29,6 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
Expand Down Expand Up @@ -499,12 +497,8 @@ loop:
func (s *Scaler) patchVertexReplicas(ctx context.Context, vertex *dfv1.Vertex, desiredReplicas int32) error {
log := logging.FromContext(ctx)
origin := vertex.Spec.Replicas
vertex.Spec.Replicas = ptr.To[int32](desiredReplicas)
body, err := json.Marshal(vertex)
if err != nil {
return fmt.Errorf("failed to marshal vertex object to json, %w", err)
}
if err := s.client.Patch(ctx, vertex, client.RawPatch(types.MergePatchType, body)); err != nil && !apierrors.IsNotFound(err) {
patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desiredReplicas)
if err := s.client.Patch(ctx, vertex, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to patch vertex replicas, %w", err)
}
log.Infow("Auto scaling - vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("namespace", vertex.Namespace), zap.String("pipeline", vertex.Spec.PipelineName), zap.String("vertex", vertex.Spec.Name))
Expand Down

0 comments on commit 0be7b11

Please sign in to comment.