Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: patch instead of update and bugfix #2059

Merged
merged 3 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ const (
KeyPauseTimestamp = "numaflow.numaproj.io/pause-timestamp"
KeyDefaultContainer = "kubectl.kubernetes.io/default-container"

RemovePauseTimestampPatch = `[{"op": "remove", "path": "/metadata/annotations/numaflow.numaproj.io~1pause-timestamp"}]`

// ID key in the header of sources like http
KeyMetaID = "X-Numaflow-Id"
KeyMetaEventTime = "X-Numaflow-Event-Time"
Expand Down
23 changes: 14 additions & 9 deletions pkg/reconciler/isbsvc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ package isbsvc

import (
"context"
"strings"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/yaml"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/reconciler"
Expand Down Expand Up @@ -58,7 +61,7 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct
isbSvc := &dfv1.InterStepBufferService{}
if err := r.client.Get(ctx, req.NamespacedName, isbSvc); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
return ctrl.Result{}, nil
}
r.logger.Errorw("Unable to get ISB Service", zap.Any("request", req), zap.Error(err))
return ctrl.Result{}, err
Expand All @@ -69,14 +72,15 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct
if reconcileErr != nil {
log.Errorw("Reconcile error", zap.Error(reconcileErr))
}
if r.needsUpdate(isbSvc, isbSvcCopy) {
// Update with a DeepCopy because .Status will be cleaned up.
if err := r.client.Update(ctx, isbSvcCopy.DeepCopy()); err != nil {
return reconcile.Result{}, err
if needsToPatchFinalizers(isbSvc, isbSvcCopy) {
patchYaml := "metadata:\n finalizers: [" + strings.Join(isbSvcCopy.Finalizers, ",") + "]"
patchJson, _ := yaml.YAMLToJSON([]byte(patchYaml))
if err := r.client.Patch(ctx, isbSvc, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil {
return ctrl.Result{}, err
}
}
if err := r.client.Status().Update(ctx, isbSvcCopy); err != nil {
return reconcile.Result{}, err
return ctrl.Result{}, err
}
return ctrl.Result{}, reconcileErr
}
Expand Down Expand Up @@ -122,13 +126,14 @@ func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc
return installer.Install(ctx, isbSvc, r.client, r.kubeClient, r.config, log, r.recorder)
}

func (r *interStepBufferServiceReconciler) needsUpdate(old, new *dfv1.InterStepBufferService) bool {
if old == nil {
return true
func needsToPatchFinalizers(old, new *dfv1.InterStepBufferService) bool {
if old == nil { // This is a weird scenario, nothing we can do. Theoretically it will never happen.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it is a weird scenario should be log so it might help whoever is debugging?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned it up since there's no need to add this function.

return false
}
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) {
return true
}

return false
}

Expand Down
30 changes: 8 additions & 22 deletions pkg/reconciler/isbsvc/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,42 +202,28 @@ func TestReconcileJetStream(t *testing.T) {
func TestNeedsUpdate(t *testing.T) {
t.Run("needs redis update", func(t *testing.T) {
testIsbs := nativeRedisIsbs.DeepCopy()
cl := fake.NewClientBuilder().Build()
r := &interStepBufferServiceReconciler{
client: cl,
scheme: scheme.Scheme,
config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
logger: zaptest.NewLogger(t).Sugar(),
}
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
assert.False(t, needsToPatchFinalizers(nativeRedisIsbs, testIsbs))
controllerutil.AddFinalizer(testIsbs, finalizerName)
assert.True(t, contains(testIsbs.Finalizers, finalizerName))
assert.True(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
assert.True(t, needsToPatchFinalizers(nativeRedisIsbs, testIsbs))
controllerutil.RemoveFinalizer(testIsbs, finalizerName)
assert.False(t, contains(testIsbs.Finalizers, finalizerName))
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
assert.False(t, needsToPatchFinalizers(nativeRedisIsbs, testIsbs))
testIsbs.Status.MarkConfigured()
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
assert.False(t, needsToPatchFinalizers(nativeRedisIsbs, testIsbs))
})

t.Run("needs jetstream update", func(t *testing.T) {
testIsbs := jetStreamIsbs.DeepCopy()
cl := fake.NewClientBuilder().Build()
r := &interStepBufferServiceReconciler{
client: cl,
scheme: scheme.Scheme,
config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig),
logger: zaptest.NewLogger(t).Sugar(),
}
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
assert.False(t, needsToPatchFinalizers(nativeRedisIsbs, testIsbs))
controllerutil.AddFinalizer(testIsbs, finalizerName)
assert.True(t, contains(testIsbs.Finalizers, finalizerName))
assert.True(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
assert.True(t, needsToPatchFinalizers(nativeRedisIsbs, testIsbs))
controllerutil.RemoveFinalizer(testIsbs, finalizerName)
assert.False(t, contains(testIsbs.Finalizers, finalizerName))
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
assert.False(t, needsToPatchFinalizers(nativeRedisIsbs, testIsbs))
testIsbs.Status.MarkConfigured()
assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs))
assert.False(t, needsToPatchFinalizers(nativeRedisIsbs, testIsbs))
})
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/reconciler/monovertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (mr *monoVertexReconciler) orchestratePods(ctx context.Context, monoVtx *df
monoVtx.Status.CurrentHash = monoVtx.Status.UpdateHash
} else { // Update scenario
if updatedReplicas >= desiredReplicas {
monoVtx.Status.UpdatedReplicas = uint32(desiredReplicas)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing 4.)

monoVtx.Status.CurrentHash = monoVtx.Status.UpdateHash
return nil
}

Expand Down
10 changes: 2 additions & 8 deletions pkg/reconciler/monovertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package scaling
import (
"container/list"
"context"
"encoding/json"
"fmt"
"math"
"strings"
Expand All @@ -30,7 +29,6 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
Expand Down Expand Up @@ -359,12 +357,8 @@ func (s *Scaler) Start(ctx context.Context) error {
func (s *Scaler) patchMonoVertexReplicas(ctx context.Context, monoVtx *dfv1.MonoVertex, desiredReplicas int32) error {
log := logging.FromContext(ctx)
origin := monoVtx.Spec.Replicas
monoVtx.Spec.Replicas = ptr.To[int32](desiredReplicas)
body, err := json.Marshal(monoVtx)
if err != nil {
return fmt.Errorf("failed to marshal MonoVertex object to json, %w", err)
}
if err := s.client.Patch(ctx, monoVtx, client.RawPatch(types.MergePatchType, body)); err != nil && !apierrors.IsNotFound(err) {
patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desiredReplicas)
if err := s.client.Patch(ctx, monoVtx, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to patch MonoVertex replicas, %w", err)
}
log.Infow("Auto scaling - mono vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("namespace", monoVtx.Namespace), zap.String("vertex", monoVtx.Name))
Expand Down
38 changes: 17 additions & 21 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package pipeline

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
Expand All @@ -40,6 +39,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/yaml"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
daemonclient "github.com/numaproj/numaflow/pkg/daemon/client"
Expand All @@ -51,6 +51,8 @@ import (

const (
finalizerName = dfv1.ControllerPipeline

pauseTimestampPath = `/metadata/annotations/numaflow.numaproj.io~1pause-timestamp`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

~1 what does that mean?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)

// pipelineReconciler reconciles a pipeline object.
Expand Down Expand Up @@ -85,9 +87,10 @@ func (r *pipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
log.Errorw("Reconcile error", zap.Error(reconcileErr))
}
plCopy.Status.LastUpdated = metav1.Now()
if needsUpdate(pl, plCopy) {
// Update with a DeepCopy because .Status will be cleaned up.
if err := r.client.Update(ctx, plCopy.DeepCopy()); err != nil {
if needsToPatchFinalizers(pl, plCopy) {
patchYaml := "metadata:\n finalizers: [" + strings.Join(plCopy.Finalizers, ",") + "]"
patchJson, _ := yaml.YAMLToJSON([]byte(patchYaml))
if err := r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil {
return result, err
}
}
Expand Down Expand Up @@ -292,7 +295,9 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df
r.recorder.Eventf(pl, corev1.EventTypeNormal, "CreateVertexSuccess", "Created vertex %s successfully", vertexName)
} else {
if oldObj.GetAnnotations()[dfv1.KeyHash] != newObj.GetAnnotations()[dfv1.KeyHash] { // need to update
originReplicas := oldObj.Spec.Replicas
oldObj.Spec = newObj.Spec
oldObj.Spec.Replicas = originReplicas
oldObj.Annotations[dfv1.KeyHash] = newObj.GetAnnotations()[dfv1.KeyHash]
if err := r.client.Update(ctx, &oldObj); err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "UpdateVertexFailed", "Failed to update vertex: %w", err.Error())
Expand Down Expand Up @@ -588,9 +593,9 @@ func (r *pipelineReconciler) cleanUpBuffers(ctx context.Context, pl *dfv1.Pipeli
return nil
}

func needsUpdate(old, new *dfv1.Pipeline) bool {
if old == nil {
return true
func needsToPatchFinalizers(old, new *dfv1.Pipeline) bool {
if old == nil { // This is a weird scenario, nothing we can do. Theoretically it will never happen.
return false
}
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) {
return true
Expand Down Expand Up @@ -814,7 +819,7 @@ func (r *pipelineReconciler) updateDesiredState(ctx context.Context, pl *dfv1.Pi
func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
// reset pause timestamp
if pl.GetAnnotations()[dfv1.KeyPauseTimestamp] != "" {
err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(dfv1.RemovePauseTimestampPatch)))
err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(`[{"op": "remove", "path": "`+pauseTimestampPath+`"}]`)))
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil // skip pipeline if it can't be found
Expand All @@ -837,13 +842,8 @@ func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeli
func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
// check that annotations / pause timestamp annotation exist
if pl.GetAnnotations() == nil || pl.GetAnnotations()[dfv1.KeyPauseTimestamp] == "" {
pl.SetAnnotations(map[string]string{dfv1.KeyPauseTimestamp: time.Now().Format(time.RFC3339)})
body, err := json.Marshal(pl)
if err != nil {
return false, err
}
err = r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, body))
if err != nil && !apierrors.IsNotFound(err) {
patchJson := `[{"op": "add", "path": "` + pauseTimestampPath + `", "value": "` + time.Now().Format(time.RFC3339) + `"}]`
if err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
return true, err
}
}
Expand Down Expand Up @@ -924,12 +924,8 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline,
}
}
}
vertex.Spec.Replicas = ptr.To[int32](scaleTo)
body, err := json.Marshal(vertex)
if err != nil {
return false, err
}
err = r.client.Patch(ctx, &vertex, client.RawPatch(types.MergePatchType, body))
patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, scaleTo)
err = r.client.Patch(ctx, &vertex, client.RawPatch(types.MergePatchType, []byte(patchJson)))
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/reconciler/pipeline/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ func Test_pauseAndResumePipeline(t *testing.T) {
v, err := r.findExistingVertices(ctx, testObj)
assert.NoError(t, err)
assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas)
assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp])
testObj.Annotations[dfv1.KeyPauseTimestamp] = ""
_, err = r.resumePipeline(ctx, testObj)
assert.NoError(t, err)
v, err = r.findExistingVertices(ctx, testObj)
Expand All @@ -380,8 +378,6 @@ func Test_pauseAndResumePipeline(t *testing.T) {
assert.NoError(t, err)
_, err = r.findExistingVertices(ctx, testObj)
assert.NoError(t, err)
assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp])
testObj.Annotations[dfv1.KeyPauseTimestamp] = ""
_, err = r.resumePipeline(ctx, testObj)
assert.NoError(t, err)
v, err := r.findExistingVertices(ctx, testObj)
Expand Down Expand Up @@ -562,12 +558,12 @@ func Test_buildISBBatchJob(t *testing.T) {

func Test_needsUpdate(t *testing.T) {
testObj := testPipeline.DeepCopy()
assert.True(t, needsUpdate(nil, testObj))
assert.False(t, needsUpdate(testPipeline, testObj))
assert.False(t, needsToPatchFinalizers(nil, testObj))
assert.False(t, needsToPatchFinalizers(testPipeline, testObj))
controllerutil.AddFinalizer(testObj, finalizerName)
assert.True(t, needsUpdate(testPipeline, testObj))
assert.True(t, needsToPatchFinalizers(testPipeline, testObj))
testobj1 := testObj.DeepCopy()
assert.False(t, needsUpdate(testObj, testobj1))
assert.False(t, needsToPatchFinalizers(testObj, testobj1))
}

func Test_cleanupBuffers(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver
vertex.Status.CurrentHash = vertex.Status.UpdateHash
} else { // Update scenario
if updatedReplicas >= desiredReplicas {
vertex.Status.UpdatedReplicas = uint32(desiredReplicas)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing 4.)

vertex.Status.CurrentHash = vertex.Status.UpdateHash
return nil
}

Expand Down
10 changes: 2 additions & 8 deletions pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package scaling
import (
"container/list"
"context"
"encoding/json"
"fmt"
"math"
"strings"
Expand All @@ -30,7 +29,6 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
Expand Down Expand Up @@ -499,12 +497,8 @@ loop:
func (s *Scaler) patchVertexReplicas(ctx context.Context, vertex *dfv1.Vertex, desiredReplicas int32) error {
log := logging.FromContext(ctx)
origin := vertex.Spec.Replicas
vertex.Spec.Replicas = ptr.To[int32](desiredReplicas)
body, err := json.Marshal(vertex)
if err != nil {
return fmt.Errorf("failed to marshal vertex object to json, %w", err)
}
if err := s.client.Patch(ctx, vertex, client.RawPatch(types.MergePatchType, body)); err != nil && !apierrors.IsNotFound(err) {
patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desiredReplicas)
if err := s.client.Patch(ctx, vertex, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to patch vertex replicas, %w", err)
}
log.Infow("Auto scaling - vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("namespace", vertex.Namespace), zap.String("pipeline", vertex.Spec.PipelineName), zap.String("vertex", vertex.Spec.Name))
Expand Down
Loading