🌱 clustercache: do not use RequeueAfter when hitting ErrClusterNotConnected #11736

Merged
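All of the controller changes in this PR follow one pattern: instead of mapping clustercache.ErrClusterNotConnected to a fixed ctrl.Result{RequeueAfter: time.Minute}, the reconcilers now return the error and let the controller's workqueue rate limiter schedule the retry. A minimal before/after sketch of that pattern (the exampleReconciler type, the reconcileWorkload helper, and the local sentinel error are illustrative stand-ins, not code from this PR; the real controllers use clustercache.ErrClusterNotConnected):

package example

import (
	"context"
	"errors"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// errClusterNotConnected stands in for clustercache.ErrClusterNotConnected.
var errClusterNotConnected = errors.New("connection to the workload cluster is down")

type exampleReconciler struct{}

// reconcileWorkload stands in for the per-controller helpers touched by this
// PR (r.reconcile, r.reconcileDelete, doReconcile, ...): it needs a connection
// to the workload cluster and fails while that connection is down.
func (r *exampleReconciler) reconcileWorkload(_ context.Context) (ctrl.Result, error) {
	return ctrl.Result{}, errClusterNotConnected
}

// Before this PR: the sentinel error was swallowed and replaced by a fixed
// one-minute requeue.
func (r *exampleReconciler) reconcileBefore(ctx context.Context) (ctrl.Result, error) {
	res, err := r.reconcileWorkload(ctx)
	if err != nil && errors.Is(err, errClusterNotConnected) {
		return ctrl.Result{RequeueAfter: time.Minute}, nil
	}
	return res, err
}

// After this PR: the error is returned unchanged, so the workqueue's rate
// limiter decides when to retry (see the backoff sketch further down).
func (r *exampleReconciler) reconcileAfter(ctx context.Context) (ctrl.Result, error) {
	return r.reconcileWorkload(ctx)
}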
@@ -279,12 +279,7 @@ func (r *KubeadmConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
}

res, err := r.reconcile(ctx, scope, cluster, config, configOwner)
if err != nil && errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
return res, err
return r.reconcile(ctx, scope, cluster, config, configOwner)
}

func (r *KubeadmConfigReconciler) reconcile(ctx context.Context, scope *Scope, cluster *clusterv1.Cluster, config *bootstrapv1.KubeadmConfig, configOwner *bsutil.ConfigOwner) (ctrl.Result, error) {
14 changes: 2 additions & 12 deletions controlplane/kubeadm/internal/controllers/controller.go
@@ -255,21 +255,11 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.

if !kcp.ObjectMeta.DeletionTimestamp.IsZero() {
// Handle deletion reconciliation loop.
res, err = r.reconcileDelete(ctx, controlPlane)
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
return res, err
return r.reconcileDelete(ctx, controlPlane)
}

// Handle normal reconciliation loop.
res, err = r.reconcile(ctx, controlPlane)
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
return res, err
return r.reconcile(ctx, controlPlane)
}

// initControlPlaneScope initializes the control plane scope; this includes also checking for orphan machines and
17 changes: 1 addition & 16 deletions exp/addons/internal/controllers/clusterresourceset_controller.go
@@ -183,17 +183,9 @@ func (r *ClusterResourceSetReconciler) Reconcile(ctx context.Context, req ctrl.R
}

errs := []error{}
errClusterNotConnectedOccurred := false
for _, cluster := range clusters {
if err := r.ApplyClusterResourceSet(ctx, cluster, clusterResourceSet); err != nil {
// Requeue if the reconcile failed because connection to workload cluster was down.
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
errClusterNotConnectedOccurred = true
} else {
// Append the error if the error is not ErrClusterLocked.
errs = append(errs, err)
}
errs = append(errs, err)
}
}

@@ -218,13 +210,6 @@ func (r *ClusterResourceSetReconciler) Reconcile(ctx context.Context, req ctrl.R
return ctrl.Result{}, kerrors.NewAggregate(errs)
}

// Requeue if ErrClusterNotConnected was returned for one of the clusters.
if errClusterNotConnectedOccurred {
// Requeue after a minute to not end up in exponential delayed requeue which
// could take up to 16m40s.
return ctrl.Result{RequeueAfter: time.Minute}, nil
}

return ctrl.Result{}, nil
}

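The removed comment above notes that returning the error leads to an "exponential delayed requeue which could take up to 16m40s". That figure matches controller-runtime's default per-item rate limiter, which in recent client-go/controller-runtime releases backs off exponentially from a 5ms base to a 1000s (16m40s) cap. The sketch below only illustrates where that cap comes from; it is not code from this PR:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// The default controller rate limiter combines a token bucket with this
	// per-item exponential limiter: 5ms base delay, capped at 1000s (16m40s).
	limiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)

	item := "cluster/with-broken-connection"
	for i := 1; i <= 20; i++ {
		// Each consecutive failure doubles the delay until the 1000s cap
		// is reached (around the 19th failure).
		fmt.Printf("failure %2d -> next retry in %v\n", i, limiter.When(item))
	}
}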
17 changes: 2 additions & 15 deletions exp/internal/controllers/machinepool_controller.go
@@ -228,28 +228,15 @@ func (r *MachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// Handle deletion reconciliation loop.
if !mp.ObjectMeta.DeletionTimestamp.IsZero() {
err := r.reconcileDelete(ctx, cluster, mp)
// Requeue if the reconcile failed because connection to workload cluster was down.
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}

return ctrl.Result{}, err
return ctrl.Result{}, r.reconcileDelete(ctx, cluster, mp)
}

// Handle normal reconciliation loop.
scope := &scope{
cluster: cluster,
machinePool: mp,
}
res, err := r.reconcile(ctx, scope)
// Requeue if the reconcile failed because connection to workload cluster was down.
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
return res, err
return r.reconcile(ctx, scope)
}

func (r *MachinePoolReconciler) reconcile(ctx context.Context, s *scope) (ctrl.Result, error) {
38 changes: 6 additions & 32 deletions internal/controllers/machine/machine_controller.go
@@ -198,8 +198,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
return ctrl.Result{}, err
}

log := ctrl.LoggerFrom(ctx).WithValues("Cluster", klog.KRef(m.Namespace, m.Spec.ClusterName))
ctx = ctrl.LoggerInto(ctx, log)
ctx = ctrl.LoggerInto(ctx, ctrl.LoggerFrom(ctx).WithValues("Cluster", klog.KRef(m.Namespace, m.Spec.ClusterName)))

// Add finalizer first if not set to avoid the race condition between init and delete.
if finalizerAdded, err := finalizers.EnsureFinalizer(ctx, r.Client, m, clusterv1.MachineFinalizer); err != nil || finalizerAdded {
@@ -208,7 +207,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re

// AddOwners adds the owners of Machine as k/v pairs to the logger.
// Specifically, it will add KubeadmControlPlane, MachineSet and MachineDeployment.
ctx, log, err := clog.AddOwners(ctx, r.Client, m)
ctx, _, err := clog.AddOwners(ctx, r.Client, m)
if err != nil {
return ctrl.Result{}, err
}
@@ -273,23 +272,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
r.reconcileDelete,
)

res, err := doReconcile(ctx, reconcileDelete, s)
// Requeue if the reconcile failed because connection to workload cluster was down.
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
return res, err
return doReconcile(ctx, reconcileDelete, s)
}

// Handle normal reconciliation loop.
res, err := doReconcile(ctx, alwaysReconcile, s)
// Requeue if the reconcile failed because connection to workload cluster was down.
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
return res, err
return doReconcile(ctx, alwaysReconcile, s)
}

func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clusterv1.Machine, options ...patch.Option) error {
@@ -816,14 +803,7 @@ func (r *Reconciler) drainNode(ctx context.Context, s *scope) (ctrl.Result, erro

remoteClient, err := r.ClusterCache.GetClient(ctx, util.ObjectKey(cluster))
if err != nil {
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing drain Node because connection to the workload cluster is down")
s.deletingReason = clusterv1.MachineDeletingDrainingNodeV1Beta2Reason
s.deletingMessage = "Requeuing drain Node because connection to the workload cluster is down"
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
log.Error(err, "Error creating a remote client for cluster while draining Node, won't retry")
return ctrl.Result{}, nil
return ctrl.Result{}, errors.Wrapf(err, "failed to drain Node %s", nodeName)
}

node := &corev1.Node{}
@@ -989,15 +969,9 @@ func (r *Reconciler) shouldWaitForNodeVolumes(ctx context.Context, s *scope) (ct
}

func (r *Reconciler) deleteNode(ctx context.Context, cluster *clusterv1.Cluster, name string) error {
log := ctrl.LoggerFrom(ctx)

remoteClient, err := r.ClusterCache.GetClient(ctx, util.ObjectKey(cluster))
if err != nil {
if errors.Is(err, clustercache.ErrClusterNotConnected) {
return errors.Wrapf(err, "failed deleting Node because connection to the workload cluster is down")
}
log.Error(err, "Error creating a remote client for cluster while deleting Node, won't retry")
return nil
return errors.Wrapf(err, "failed deleting Node because connection to the workload cluster is down")
}

node := &corev1.Node{
@@ -188,19 +188,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
}
m.Labels[clusterv1.ClusterNameLabel] = m.Spec.ClusterName

result, err := r.reconcile(ctx, log, cluster, m)
if err != nil {
// Requeue if the reconcile failed because connection to workload cluster was down.
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}

// Requeue immediately if any errors occurred
return ctrl.Result{}, err
}

return result, nil
return r.reconcile(ctx, log, cluster, m)
}

func (r *Reconciler) reconcile(ctx context.Context, logger logr.Logger, cluster *clusterv1.Cluster, m *clusterv1.MachineHealthCheck) (ctrl.Result, error) {
20 changes: 3 additions & 17 deletions internal/controllers/machineset/machineset_controller.go
@@ -168,8 +168,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (retres ct
return ctrl.Result{}, err
}

log := ctrl.LoggerFrom(ctx).WithValues("Cluster", klog.KRef(machineSet.Namespace, machineSet.Spec.ClusterName))
ctx = ctrl.LoggerInto(ctx, log)
ctx = ctrl.LoggerInto(ctx, ctrl.LoggerFrom(ctx).WithValues("Cluster", klog.KRef(machineSet.Namespace, machineSet.Spec.ClusterName)))

// Add finalizer first if not set to avoid the race condition between init and delete.
if finalizerAdded, err := finalizers.EnsureFinalizer(ctx, r.Client, machineSet, clusterv1.MachineSetFinalizer); err != nil || finalizerAdded {
@@ -178,7 +177,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (retres ct

// AddOwners adds the owners of MachineSet as k/v pairs to the logger.
// Specifically, it will add MachineDeployment.
ctx, log, err := clog.AddOwners(ctx, r.Client, machineSet)
ctx, _, err := clog.AddOwners(ctx, r.Client, machineSet)
if err != nil {
return ctrl.Result{}, err
}
@@ -257,20 +256,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (retres ct
wrapErrMachineSetReconcileFunc(r.syncReplicas, "failed to sync replicas"),
)

result, kerr := doReconcile(ctx, s, reconcileNormal)
if kerr != nil {
// Requeue if the reconcile failed because connection to workload cluster was down.
if errors.Is(kerr, clustercache.ErrClusterNotConnected) {
if len(kerr.Errors()) > 1 {
log.Error(kerr, "Requeuing because connection to the workload cluster is down")
} else {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
}
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
err = kerr
}
return result, err
return doReconcile(ctx, s, reconcileNormal)
}

type scope struct {
12 changes: 1 addition & 11 deletions internal/controllers/topology/cluster/cluster_controller.go
@@ -278,8 +278,6 @@ func (r *Reconciler) SetupForDryRun(recorder record.EventRecorder) {
}

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
log := ctrl.LoggerFrom(ctx)

// Fetch the Cluster instance.
cluster := &clusterv1.Cluster{}
if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
@@ -340,15 +338,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
}

// Handle normal reconciliation loop.
result, err := r.reconcile(ctx, s)
if err != nil {
// Requeue if the reconcile failed because connection to workload cluster was down.
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
}
return result, err
return r.reconcile(ctx, s)
}

// reconcile handles cluster reconciliation.
@@ -27,6 +27,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -70,8 +71,7 @@ type DockerMachineReconciler struct {
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch

// Reconcile handles DockerMachine events.
func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, rerr error) {
log := ctrl.LoggerFrom(ctx)
func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
ctx = container.RuntimeInto(ctx, r.ContainerRuntime)

// Fetch the DockerMachine instance.
@@ -150,10 +150,7 @@ func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// Always attempt to Patch the DockerMachine object and status after each reconciliation.
defer func() {
if err := patchDockerMachine(ctx, patchHelper, dockerMachine); err != nil {
log.Error(err, "Failed to patch DockerMachine")
if rerr == nil {
rerr = err
}
reterr = kerrors.NewAggregate([]error{reterr, err})
}
}()

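A related cleanup in the hunk above: the deferred patch of the DockerMachine no longer drops the patch error when a reconcile error is already set; both are combined with kerrors.NewAggregate. A small self-contained sketch of that deferred-patch pattern (patchObject and the surrounding Reconcile are hypothetical stand-ins for patchDockerMachine and the real reconciler):

package example

import (
	"context"
	"errors"

	kerrors "k8s.io/apimachinery/pkg/util/errors"
	ctrl "sigs.k8s.io/controller-runtime"
)

// patchObject stands in for patchDockerMachine.
func patchObject(_ context.Context) error {
	return errors.New("patch failed")
}

// Reconcile surfaces both the reconcile error and the patch error:
// kerrors.NewAggregate skips nil entries, so a failed patch is returned on its
// own when reconciliation succeeded, and neither error is lost when both fail
// (previously the patch error was only logged if a reconcile error was already set).
func Reconcile(ctx context.Context, _ ctrl.Request) (_ ctrl.Result, reterr error) {
	defer func() {
		if err := patchObject(ctx); err != nil {
			reterr = kerrors.NewAggregate([]error{reterr, err})
		}
	}()

	// ... reconcile logic ...
	return ctrl.Result{}, nil
}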
@@ -188,14 +185,7 @@ func (r *DockerMachineReconciler) Reconcile(ctx context.Reques
}

// Handle non-deleted machines
res, err := r.reconcileNormal(ctx, cluster, dockerCluster, machine, dockerMachine, externalMachine, externalLoadBalancer)
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
return res, err
return r.reconcileNormal(ctx, cluster, dockerCluster, machine, dockerMachine, externalMachine, externalLoadBalancer)
}

func patchDockerMachine(ctx context.Context, patchHelper *patch.Helper, dockerMachine *infrav1.DockerMachine) error {