From 8c9800eb312f75aecd203e81c300d4793a6576d6 Mon Sep 17 00:00:00 2001 From: Xavi Garcia Date: Wed, 22 Jan 2025 17:10:02 +0100 Subject: [PATCH 1/2] [v0.10] - Always returns result with RequeueAfter set Changes the gitops reconciler result to always have the polling interval in RequeueAfter. This way we take advantage of the fact that if there is already an event in the RequeueAfter queue the next ones will be dropped and we won't miss the polling cycle if there is a race condition between RequeueAfter events and any other event in the reconciler. This is only added when `DisablePolling` is not set. Signed-off-by: Xavi Garcia --- .../gitops/reconciler/gitjob_controller.go | 52 +++++++++++++------ 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go index 51c7d9c338..533d3fcf00 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go @@ -211,16 +211,16 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr err = SetStatusFromBundleDeployments(ctx, r.Client, gitrepo) if err != nil { - return result(repoPolled, gitrepo), updateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return r.result(gitrepo), updateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } err = SetStatusFromBundles(ctx, r.Client, gitrepo) if err != nil { - return result(repoPolled, gitrepo), updateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return r.result(gitrepo), updateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } if err = UpdateDisplayState(gitrepo); err != nil { - return result(repoPolled, gitrepo), updateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return r.result(gitrepo), updateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } gitrepo.Status.Display.ReadyBundleDeployments = fmt.Sprintf("%d/%d", @@ -249,17 +249,17 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // in a ready status condition on the bundle. err = r.setReadyStatusFromBundle(ctx, gitrepo) if err != nil { - return result(repoPolled, gitrepo), err + return r.result(gitrepo), err } err = updateStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status) if err != nil { logger.Error(err, "Reconcile failed final update to git repo status", "status", gitrepo.Status) - return result(repoPolled, gitrepo), err + return r.result(gitrepo), err } - return result(repoPolled, gitrepo), nil + return r.result(gitrepo), nil } // manageGitJob is responsible for creating, updating and deleting the GitJob and setting the GitRepo's status accordingly @@ -273,7 +273,7 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, if err != nil && !errors.IsNotFound(err) { err = fmt.Errorf("error retrieving git job: %w", err) r.Recorder.Event(gitrepo, fleetevent.Warning, "FailedToGetGitJob", err.Error()) - return result(repoPolled, gitrepo), err + return r.result(gitrepo), err } if errors.IsNotFound(err) { @@ -296,16 +296,16 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, r.updateGenerationValuesIfNeeded(gitrepo) if err := r.validateExternalSecretExist(ctx, gitrepo); err != nil { r.Recorder.Event(gitrepo, fleetevent.Warning, "FailedValidatingSecret", err.Error()) - return result(repoPolled, gitrepo), updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err) + return r.result(gitrepo), updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err) } if err := r.createJobAndResources(ctx, gitrepo, logger); err != nil { - return result(repoPolled, gitrepo), err + return r.result(gitrepo), err } } } else if gitrepo.Status.Commit != "" && gitrepo.Status.Commit == oldCommit { err, recreateGitJob := r.deleteJobIfNeeded(ctx, gitrepo, &job) if err != nil { - return result(repoPolled, gitrepo), fmt.Errorf("error deleting git job: %w", err) + return r.result(gitrepo), fmt.Errorf("error deleting git job: %w", err) } // job was deleted and we need to recreate it // Requeue so the reconciler creates the job again @@ -317,7 +317,7 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, gitrepo.Status.ObservedGeneration = gitrepo.Generation if err = setStatusFromGitjob(ctx, r.Client, gitrepo, &job); err != nil { - return result(repoPolled, gitrepo), updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err) + return r.result(gitrepo), updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err) } return reconcile.Result{}, nil @@ -1222,11 +1222,33 @@ func getPollingIntervalDuration(gitrepo *v1alpha1.GitRepo) time.Duration { return gitrepo.Spec.PollingInterval.Duration } -func result(repoPolled bool, gitrepo *v1alpha1.GitRepo) reconcile.Result { - if repoPolled { - return reconcile.Result{RequeueAfter: getPollingIntervalDuration(gitrepo)} +func (r *GitJobReconciler) result(gitrepo *v1alpha1.GitRepo) reconcile.Result { + // We always reeturn a reconcile Result with RequeueAfter set to the polling interval + // unless polling is disabled. + // This is done to ensure the polling cycle is never broken due to race conditions + // between regular events and RequeueAfter events. + // Requeuing more events when there is already an event in the queue is not a problem + // because controller-runtime ignores events with higher timestamp + // For example, if we have an event in the queue that should be executed at time X + // and we try to enqueue another event that should be executed at time X+10 it will be + // dropped. + // If we try to enqueue an event at time X-10, it will replace the one in the queue. + // The queue will always keep the event that should be triggered earlier. + if gitrepo.Spec.DisablePolling { + return reconcile.Result{} + } + + // calculate the new polling interval substracting the time elapsed since last polling time happened + // That adjusts possible drifts due to heavy workload, etc. + pollingDuration := getPollingIntervalDuration(gitrepo) - (r.Clock.Now().Sub(gitrepo.Status.LastPollingTime.Time)) + if pollingDuration <= 0 { + // This is a protection for cases in which the calculation above is 0 or less. + // In those cases controller-runtime does not call AddAfter for this object and + // the RequeueAfter cycle is lost. + // To ensure that this cycle is not broken we force the object to be requeued. + return reconcile.Result{Requeue: true} } - return reconcile.Result{} + return reconcile.Result{RequeueAfter: pollingDuration} } func webhookCommitChangedPredicate() predicate.Predicate { From 32e7cf5e8133a302b1170a3e3fa33e18e58236ce Mon Sep 17 00:00:00 2001 From: Xavi Garcia Date: Thu, 23 Jan 2025 13:59:19 +0100 Subject: [PATCH 2/2] Changes after review Signed-off-by: Xavi Garcia --- .../gitops/reconciler/gitjob_controller.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go index 533d3fcf00..c98a59fbaa 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go @@ -1223,7 +1223,7 @@ func getPollingIntervalDuration(gitrepo *v1alpha1.GitRepo) time.Duration { } func (r *GitJobReconciler) result(gitrepo *v1alpha1.GitRepo) reconcile.Result { - // We always reeturn a reconcile Result with RequeueAfter set to the polling interval + // We always return a reconcile Result with RequeueAfter set to the polling interval // unless polling is disabled. // This is done to ensure the polling cycle is never broken due to race conditions // between regular events and RequeueAfter events. @@ -1238,17 +1238,18 @@ func (r *GitJobReconciler) result(gitrepo *v1alpha1.GitRepo) reconcile.Result { return reconcile.Result{} } - // calculate the new polling interval substracting the time elapsed since last polling time happened - // That adjusts possible drifts due to heavy workload, etc. - pollingDuration := getPollingIntervalDuration(gitrepo) - (r.Clock.Now().Sub(gitrepo.Status.LastPollingTime.Time)) - if pollingDuration <= 0 { + // Calculate next reconciliation schedule based on the elapsed time since the last polling + // so it matches the configured polling interval. + // A fixed value may lead to drifts due to out-of-schedule reconciliations. + requeueAfter := getPollingIntervalDuration(gitrepo) - r.Clock.Since(gitrepo.Status.LastPollingTime.Time) + if requeueAfter <= 0 { // This is a protection for cases in which the calculation above is 0 or less. // In those cases controller-runtime does not call AddAfter for this object and // the RequeueAfter cycle is lost. // To ensure that this cycle is not broken we force the object to be requeued. return reconcile.Result{Requeue: true} } - return reconcile.Result{RequeueAfter: pollingDuration} + return reconcile.Result{RequeueAfter: requeueAfter} } func webhookCommitChangedPredicate() predicate.Predicate {