
Commit

clustercache: do not use RequeueAfter when hitting ErrClusterNotConnected and return error instead
chrischdi committed Jan 23, 2025
1 parent a090145 commit 5d109d8
Showing 9 changed files with 24 additions and 136 deletions.
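
Context for the diffs below: every touched reconciler used the same special case, catching clustercache.ErrClusterNotConnected, logging at V(5), and requeuing after a fixed minute with a nil error. The commit deletes that branch and returns the error instead, so retries go through the controller's rate-limited workqueue with exponential backoff, and the failure is visible as a reconcile error rather than a silent requeue. A minimal before/after sketch (the example package and reconcileFn wrapper are illustrative, not code from the repository; clustercache is assumed to be sigs.k8s.io/cluster-api/controllers/clustercache):

package example // illustrative wrapper, not part of the commit

import (
	"context"
	"errors"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"

	"sigs.k8s.io/cluster-api/controllers/clustercache"
)

// reconcileOld shows the deleted pattern: swallow the connection error and
// requeue after a fixed minute, bypassing the workqueue's backoff and the
// controller's error accounting.
func reconcileOld(ctx context.Context, reconcileFn func(context.Context) (ctrl.Result, error)) (ctrl.Result, error) {
	res, err := reconcileFn(ctx)
	if errors.Is(err, clustercache.ErrClusterNotConnected) {
		return ctrl.Result{RequeueAfter: time.Minute}, nil
	}
	return res, err
}

// reconcileNew shows the adopted pattern: return the error unchanged and let
// controller-runtime requeue the request with exponential backoff.
func reconcileNew(ctx context.Context, reconcileFn func(context.Context) (ctrl.Result, error)) (ctrl.Result, error) {
	return reconcileFn(ctx)
}
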
@@ -279,12 +279,7 @@ func (r *KubeadmConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
}

- res, err := r.reconcile(ctx, scope, cluster, config, configOwner)
- if err != nil && errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
- return res, err
+ return r.reconcile(ctx, scope, cluster, config, configOwner)
}

func (r *KubeadmConfigReconciler) reconcile(ctx context.Context, scope *Scope, cluster *clusterv1.Cluster, config *bootstrapv1.KubeadmConfig, configOwner *bsutil.ConfigOwner) (ctrl.Result, error) {
14 changes: 2 additions & 12 deletions controlplane/kubeadm/internal/controllers/controller.go
@@ -255,21 +255,11 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.

if !kcp.ObjectMeta.DeletionTimestamp.IsZero() {
// Handle deletion reconciliation loop.
- res, err = r.reconcileDelete(ctx, controlPlane)
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
- return res, err
+ return r.reconcileDelete(ctx, controlPlane)
}

// Handle normal reconciliation loop.
- res, err = r.reconcile(ctx, controlPlane)
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
- return res, err
+ return r.reconcile(ctx, controlPlane)
}

// initControlPlaneScope initializes the control plane scope; this includes also checking for orphan machines and
17 changes: 1 addition & 16 deletions exp/addons/internal/controllers/clusterresourceset_controller.go
@@ -183,17 +183,9 @@ func (r *ClusterResourceSetReconciler) Reconcile(ctx context.Context, req ctrl.R
}

errs := []error{}
- errClusterNotConnectedOccurred := false
for _, cluster := range clusters {
if err := r.ApplyClusterResourceSet(ctx, cluster, clusterResourceSet); err != nil {
- // Requeue if the reconcile failed because connection to workload cluster was down.
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- errClusterNotConnectedOccurred = true
- } else {
- // Append the error if the error is not ErrClusterLocked.
- errs = append(errs, err)
- }
+ errs = append(errs, err)
}
}

@@ -218,13 +210,6 @@ func (r *ClusterResourceSetReconciler) Reconcile(ctx context.Context, req ctrl.R
return ctrl.Result{}, kerrors.NewAggregate(errs)
}

- // Requeue if ErrClusterNotConnected was returned for one of the clusters.
- if errClusterNotConnectedOccurred {
- // Requeue after a minute to not end up in exponential delayed requeue which
- // could take up to 16m40s.
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
-
return ctrl.Result{}, nil
}

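
The deleted comment above pegs the worst-case requeue delay at 16m40s. That figure matches client-go's default per-item exponential failure rate limiter, which controller-runtime controllers use unless configured otherwise: delays start at 5ms, double per consecutive failure, and are capped at 1000s, i.e. 16m40s. A runnable sketch of that arithmetic (assuming client-go's typed workqueue API):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func main() {
	// The defaults behind the deleted comment: 5ms base delay, doubled per
	// failure, capped at 1000s (= 16m40s).
	limiter := workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](
		5*time.Millisecond, 1000*time.Second)

	var req reconcile.Request
	for i := 0; i < 18; i++ {
		_ = limiter.When(req) // each call records one more failure for req
	}
	fmt.Println(limiter.When(req)) // 5ms * 2^18 exceeds the cap: prints 16m40s
}
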
17 changes: 2 additions & 15 deletions exp/internal/controllers/machinepool_controller.go
@@ -228,28 +228,15 @@ func (r *MachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// Handle deletion reconciliation loop.
if !mp.ObjectMeta.DeletionTimestamp.IsZero() {
- err := r.reconcileDelete(ctx, cluster, mp)
- // Requeue if the reconcile failed because connection to workload cluster was down.
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
-
- return ctrl.Result{}, err
+ return ctrl.Result{}, r.reconcileDelete(ctx, cluster, mp)
}

// Handle normal reconciliation loop.
scope := &scope{
cluster: cluster,
machinePool: mp,
}
- res, err := r.reconcile(ctx, scope)
- // Requeue if the reconcile failed because connection to workload cluster was down.
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
- return res, err
+ return r.reconcile(ctx, scope)
}

func (r *MachinePoolReconciler) reconcile(ctx context.Context, s *scope) (ctrl.Result, error) {
41 changes: 9 additions & 32 deletions internal/controllers/machine/machine_controller.go
@@ -198,8 +198,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
return ctrl.Result{}, err
}

- log := ctrl.LoggerFrom(ctx).WithValues("Cluster", klog.KRef(m.Namespace, m.Spec.ClusterName))
- ctx = ctrl.LoggerInto(ctx, log)
+ ctx = ctrl.LoggerInto(ctx, ctrl.LoggerFrom(ctx).WithValues("Cluster", klog.KRef(m.Namespace, m.Spec.ClusterName)))

// Add finalizer first if not set to avoid the race condition between init and delete.
if finalizerAdded, err := finalizers.EnsureFinalizer(ctx, r.Client, m, clusterv1.MachineFinalizer); err != nil || finalizerAdded {
@@ -208,7 +207,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re

// AddOwners adds the owners of Machine as k/v pairs to the logger.
// Specifically, it will add KubeadmControlPlane, MachineSet and MachineDeployment.
- ctx, log, err := clog.AddOwners(ctx, r.Client, m)
+ ctx, _, err := clog.AddOwners(ctx, r.Client, m)
if err != nil {
return ctrl.Result{}, err
}
@@ -273,23 +272,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
r.reconcileDelete,
)

- res, err := doReconcile(ctx, reconcileDelete, s)
- // Requeue if the reconcile failed because connection to workload cluster was down.
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
- return res, err
+ return doReconcile(ctx, reconcileDelete, s)
}

// Handle normal reconciliation loop.
- res, err := doReconcile(ctx, alwaysReconcile, s)
- // Requeue if the reconcile failed because connection to workload cluster was down.
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
- return res, err
+ return doReconcile(ctx, alwaysReconcile, s)
}

func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clusterv1.Machine, options ...patch.Option) error {
@@ -816,14 +803,10 @@ func (r *Reconciler) drainNode(ctx context.Context, s *scope) (ctrl.Result, erro

remoteClient, err := r.ClusterCache.GetClient(ctx, util.ObjectKey(cluster))
if err != nil {
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing drain Node because connection to the workload cluster is down")
- s.deletingReason = clusterv1.MachineDeletingDrainingNodeV1Beta2Reason
- s.deletingMessage = "Requeuing drain Node because connection to the workload cluster is down"
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
- log.Error(err, "Error creating a remote client for cluster while draining Node, won't retry")
- return ctrl.Result{}, nil
+ log.V(5).Info("Waiting for Cluster connection to come up to drain the Node")
+ s.deletingReason = clusterv1.MachineDeletingDrainingNodeV1Beta2Reason
+ s.deletingMessage = "Waiting for Cluster connection to come up to drain the Node"
+ return ctrl.Result{}, err
}

node := &corev1.Node{}
@@ -989,15 +972,9 @@ func (r *Reconciler) shouldWaitForNodeVolumes(ctx context.Context, s *scope) (ct
}

func (r *Reconciler) deleteNode(ctx context.Context, cluster *clusterv1.Cluster, name string) error {
- log := ctrl.LoggerFrom(ctx)
-
remoteClient, err := r.ClusterCache.GetClient(ctx, util.ObjectKey(cluster))
if err != nil {
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- return errors.Wrapf(err, "failed deleting Node because connection to the workload cluster is down")
- }
- log.Error(err, "Error creating a remote client for cluster while deleting Node, won't retry")
- return nil
+ return errors.Wrapf(err, "failed deleting Node because connection to the workload cluster is down")
}

node := &corev1.Node{
@@ -188,19 +188,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
}
m.Labels[clusterv1.ClusterNameLabel] = m.Spec.ClusterName

- result, err := r.reconcile(ctx, log, cluster, m)
- if err != nil {
- // Requeue if the reconcile failed because connection to workload cluster was down.
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
-
- // Requeue immediately if any errors occurred
- return ctrl.Result{}, err
- }
-
- return result, nil
+ return r.reconcile(ctx, log, cluster, m)
}

func (r *Reconciler) reconcile(ctx context.Context, logger logr.Logger, cluster *clusterv1.Cluster, m *clusterv1.MachineHealthCheck) (ctrl.Result, error) {
20 changes: 3 additions & 17 deletions internal/controllers/machineset/machineset_controller.go
@@ -168,8 +168,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (retres ct
return ctrl.Result{}, err
}

- log := ctrl.LoggerFrom(ctx).WithValues("Cluster", klog.KRef(machineSet.Namespace, machineSet.Spec.ClusterName))
- ctx = ctrl.LoggerInto(ctx, log)
+ ctx = ctrl.LoggerInto(ctx, ctrl.LoggerFrom(ctx).WithValues("Cluster", klog.KRef(machineSet.Namespace, machineSet.Spec.ClusterName)))

// Add finalizer first if not set to avoid the race condition between init and delete.
if finalizerAdded, err := finalizers.EnsureFinalizer(ctx, r.Client, machineSet, clusterv1.MachineSetFinalizer); err != nil || finalizerAdded {
@@ -178,7 +177,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (retres ct

// AddOwners adds the owners of MachineSet as k/v pairs to the logger.
// Specifically, it will add MachineDeployment.
- ctx, log, err := clog.AddOwners(ctx, r.Client, machineSet)
+ ctx, _, err := clog.AddOwners(ctx, r.Client, machineSet)
if err != nil {
return ctrl.Result{}, err
}
@@ -257,20 +256,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (retres ct
wrapErrMachineSetReconcileFunc(r.syncReplicas, "failed to sync replicas"),
)

- result, kerr := doReconcile(ctx, s, reconcileNormal)
- if kerr != nil {
- // Requeue if the reconcile failed because connection to workload cluster was down.
- if errors.Is(kerr, clustercache.ErrClusterNotConnected) {
- if len(kerr.Errors()) > 1 {
- log.Error(kerr, "Requeuing because connection to the workload cluster is down")
- } else {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- }
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
- err = kerr
- }
- return result, err
+ return doReconcile(ctx, s, reconcileNormal)
}

type scope struct {
12 changes: 1 addition & 11 deletions internal/controllers/topology/cluster/cluster_controller.go
@@ -278,8 +278,6 @@ func (r *Reconciler) SetupForDryRun(recorder record.EventRecorder) {
}

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
- log := ctrl.LoggerFrom(ctx)
-
// Fetch the Cluster instance.
cluster := &clusterv1.Cluster{}
if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
@@ -340,15 +338,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
}

// Handle normal reconciliation loop.
- result, err := r.reconcile(ctx, s)
- if err != nil {
- // Requeue if the reconcile failed because connection to workload cluster was down.
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
- }
- return result, err
+ return r.reconcile(ctx, s)
}

// reconcile handles cluster reconciliation.
@@ -27,6 +27,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
+ kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -70,8 +71,7 @@ type DockerMachineReconciler struct
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch

// Reconcile handles DockerMachine events.
- func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, rerr error) {
- log := ctrl.LoggerFrom(ctx)
+ func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
ctx = container.RuntimeInto(ctx, r.ContainerRuntime)

// Fetch the DockerMachine instance.
@@ -150,10 +150,7 @@ func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// Always attempt to Patch the DockerMachine object and status after each reconciliation.
defer func() {
if err := patchDockerMachine(ctx, patchHelper, dockerMachine); err != nil {
log.Error(err, "Failed to patch DockerMachine")
if rerr == nil {
rerr = err
}
reterr = kerrors.NewAggregate([]error{reterr, err})
}
}()

@@ -188,14 +185,7 @@ func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

// Handle non-deleted machines
- res, err := r.reconcileNormal(ctx, cluster, dockerCluster, machine, dockerMachine, externalMachine, externalLoadBalancer)
- // Requeue if the reconcile failed because the ClusterCacheTracker was locked for
- // the current cluster because of concurrent access.
- if errors.Is(err, clustercache.ErrClusterNotConnected) {
- log.V(5).Info("Requeuing because connection to the workload cluster is down")
- return ctrl.Result{RequeueAfter: time.Minute}, nil
- }
- return res, err
+ return r.reconcileNormal(ctx, cluster, dockerCluster, machine, dockerMachine, externalMachine, externalLoadBalancer)
}

func patchDockerMachine(ctx context.Context, patchHelper *patch.Helper, dockerMachine *infrav1.DockerMachine) error {
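
An incidental cleanup in the DockerMachine reconciler above: the deferred patch no longer keeps just the first error but combines the reconcile error and the patch error via kerrors.NewAggregate, which drops nil entries. A standalone sketch of that behavior (error messages are made up for illustration):

package main

import (
	"errors"
	"fmt"

	kerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	patchErr := errors.New("failed to patch DockerMachine")

	// reterr == nil: the nil entry is dropped, only the patch error is reported.
	fmt.Println(kerrors.NewAggregate([]error{nil, patchErr}))

	// Both reconcile and patch failed: the aggregate reports both.
	reconcileErr := errors.New("failed to reconcile DockerMachine")
	fmt.Println(kerrors.NewAggregate([]error{reconcileErr, patchErr}))
}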
