Skip to content

Commit

Permalink
tt
Browse files Browse the repository at this point in the history
  • Loading branch information
0xavi0 committed Jan 24, 2025
1 parent 4aada7e commit 4ba5b4b
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions internal/cmd/controller/gitops/reconciler/gitjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,8 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
// From this point onwards we have to take into account if the poller
// task was executed.
// If so, we need to return a Result with EnqueueAfter set.
result := reconcile.Result{}
if repoPolled {
result = reconcile.Result{RequeueAfter: getPollingIntervalDuration(gitrepo)}
result.RequeueAfter = addJitter(result.RequeueAfter)
}

res, err := r.manageGitJob(ctx, logger, gitrepo, oldCommit, repoPolled, result)
res, err := r.manageGitJob(ctx, logger, gitrepo, oldCommit, repoPolled)
if err != nil || res.Requeue {
return res, err
}
Expand All @@ -205,10 +200,10 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
if err != nil {
logger.Error(err, "Reconcile failed final update to git repo status", "status", gitrepo.Status)

return result, err
return r.result(gitrepo), err
}

return result, nil
return r.result(gitrepo), nil
}

// addJitter to the requeue time to avoid thundering herd
Expand All @@ -222,7 +217,7 @@ func addJitter(d time.Duration) time.Duration {
}

// manageGitJob is responsible for creating, updating and deleting the GitJob and setting the GitRepo's status accordingly
func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo, oldCommit string, repoPolled bool, oldResult reconcile.Result) (reconcile.Result, error) {
func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo, oldCommit string, repoPolled bool) (reconcile.Result, error) {
name := types.NamespacedName{Namespace: gitrepo.Namespace, Name: gitrepo.Name}
var job batchv1.Job
err := r.Get(ctx, types.NamespacedName{
Expand All @@ -232,7 +227,7 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger,
if err != nil && !errors.IsNotFound(err) {
err = fmt.Errorf("error retrieving git job: %w", err)
r.Recorder.Event(gitrepo, fleetevent.Warning, "FailedToGetGitJob", err.Error())
return oldResult, err
return r.result(gitrepo), err
}

if errors.IsNotFound(err) {
Expand All @@ -255,16 +250,16 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger,
r.updateGenerationValuesIfNeeded(gitrepo)
if err := r.validateExternalSecretExist(ctx, gitrepo); err != nil {
r.Recorder.Event(gitrepo, fleetevent.Warning, "FailedValidatingSecret", err.Error())
return oldResult, updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err)
return r.result(gitrepo), updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err)
}
if err := r.createJobAndResources(ctx, gitrepo, logger); err != nil {
return oldResult, err
return r.result(gitrepo), err
}
}
} else if gitrepo.Status.Commit != "" && gitrepo.Status.Commit == oldCommit {
err, recreateGitJob := r.deleteJobIfNeeded(ctx, gitrepo, &job)
if err != nil {
return oldResult, fmt.Errorf("error deleting git job: %w", err)
return r.result(gitrepo), fmt.Errorf("error deleting git job: %w", err)
}
// job was deleted and we need to recreate it
// Requeue so the reconciler creates the job again
Expand All @@ -276,7 +271,7 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger,
gitrepo.Status.ObservedGeneration = gitrepo.Generation

if err = setStatusFromGitjob(ctx, r.Client, gitrepo, &job); err != nil {
return oldResult, updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err)
return r.result(gitrepo), updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err)
}

return reconcile.Result{}, nil
Expand Down Expand Up @@ -1157,11 +1152,35 @@ func getPollingIntervalDuration(gitrepo *v1alpha1.GitRepo) time.Duration {
return gitrepo.Spec.PollingInterval.Duration
}

func result(repoPolled bool, gitrepo *v1alpha1.GitRepo) reconcile.Result {
if repoPolled {
return reconcile.Result{RequeueAfter: getPollingIntervalDuration(gitrepo)}
}
return reconcile.Result{}
func (r *GitJobReconciler) result(gitrepo *v1alpha1.GitRepo) reconcile.Result {
// We always return a reconcile Result with RequeueAfter set to the polling interval
// unless polling is disabled.
// This is done to ensure the polling cycle is never broken due to race conditions
// between regular events and RequeueAfter events.
// Requeuing more events when there is already an event in the queue is not a problem
// because controller-runtime ignores events with higher timestamp
// For example, if we have an event in the queue that should be executed at time X
// and we try to enqueue another event that should be executed at time X+10 it will be
// dropped.
// If we try to enqueue an event at time X-10, it will replace the one in the queue.
// The queue will always keep the event that should be triggered earlier.
if gitrepo.Spec.DisablePolling {
return reconcile.Result{}
}

// Calculate next reconciliation schedule based on the elapsed time since the last polling
// so it matches the configured polling interval.
// A fixed value may lead to drifts due to out-of-schedule reconciliations.
requeueAfter := getPollingIntervalDuration(gitrepo) - r.Clock.Since(gitrepo.Status.LastPollingTime.Time)
if requeueAfter <= 0 {
// This is a protection for cases in which the calculation above is 0 or less.
// In those cases controller-runtime does not call AddAfter for this object and
// the RequeueAfter cycle is lost.
// To ensure that this cycle is not broken we force the object to be requeued.
return reconcile.Result{Requeue: true}
}
requeueAfter = addJitter(requeueAfter)
return reconcile.Result{RequeueAfter: requeueAfter}
}

func webhookCommitChangedPredicate() predicate.Predicate {
Expand Down

0 comments on commit 4ba5b4b

Please sign in to comment.