diff --git a/internal/controllers/machine/drain/filters.go b/internal/controllers/machine/drain/filters.go
index 1851bd453104..5fff8679f427 100644
--- a/internal/controllers/machine/drain/filters.go
+++ b/internal/controllers/machine/drain/filters.go
@@ -62,6 +62,17 @@ func (l *PodDeleteList) Pods() []*corev1.Pod {
     return pods
 }
 
+// IgnoredPods returns the list of Pods that have to be ignored before the Node can be considered completely drained.
+func (l *PodDeleteList) IgnoredPods() []*corev1.Pod {
+    pods := []*corev1.Pod{}
+    for _, i := range l.items {
+        if !i.Status.Delete {
+            pods = append(pods, i.Pod)
+        }
+    }
+    return pods
+}
+
 func (l *PodDeleteList) errors() []error {
     failedPods := make(map[string][]string)
     for _, i := range l.items {
diff --git a/internal/controllers/machine/machine_controller.go b/internal/controllers/machine/machine_controller.go
index 777f4f9053c9..52a95301642c 100644
--- a/internal/controllers/machine/machine_controller.go
+++ b/internal/controllers/machine/machine_controller.go
@@ -24,11 +24,13 @@ import (
 
     "github.com/pkg/errors"
     corev1 "k8s.io/api/core/v1"
+    storagev1 "k8s.io/api/storage/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/apimachinery/pkg/types"
     kerrors "k8s.io/apimachinery/pkg/util/errors"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
@@ -808,12 +810,52 @@ func (r *Reconciler) shouldWaitForNodeVolumes(ctx context.Context, cluster *clus
         return false, nil
     }
 
-    if len(node.Status.VolumesAttached) != 0 {
-        log.Info("Waiting for Node volumes to be detached")
+    waiter, err := newAttachedVolumeHelper(ctx, remoteClient, node)
+    if err != nil {
+        return true, err
+    }
+
+    if !waiter.hasAttachedVolumes() {
+        // No attached volumes to wait for.
+        return false, nil
+    }
+
+    // Get all PVCs which are okay to ignore because they got skipped during drain.
+    pvcsToIgnoreFromPods, err := getPersistentVolumeClaimsFromIgnoredPods(ctx, remoteClient, nodeName)
+    if err != nil {
+        return true, err
+    }
+
+    if len(pvcsToIgnoreFromPods) == 0 {
+        // There are attached volumes to wait for and none to ignore.
         return true, nil
     }
 
-    return false, nil
+    // Get all PersistentVolumes which are referred to by Pods ignored during drain via PersistentVolumeClaims.
+    pvs, err := getPersistentVolumes(ctx, remoteClient, func(pv *corev1.PersistentVolume) bool {
+        if pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Kind == "PersistentVolumeClaim" {
+            if pvcsToIgnoreFromPods.Has(types.NamespacedName{Namespace: pv.Spec.ClaimRef.Namespace, Name: pv.Spec.ClaimRef.Name}) {
+                return true
+            }
+        }
+        return false
+    })
+    if err != nil {
+        return true, err
+    }
+
+    // Add all of these PVs to the ignore list.
+    for _, pv := range pvs {
+        waiter.ignore(pv)
+    }
+
+    if !waiter.hasAttachedVolumes() {
+        // No attached volumes left to wait for.
+        return false, nil
+    }
+
+    log.Info("Waiting for Node volumes to be detached")
+    return true, nil
 }
 
 func (r *Reconciler) deleteNode(ctx context.Context, cluster *clusterv1.Cluster, name string) error {
@@ -1016,3 +1058,137 @@ func (r *Reconciler) nodeToMachine(ctx context.Context, o client.Object) []recon
 
     return nil
 }
+
+type attachedVolumeHelper struct {
+    // attachedVolumeHandles contains VolumeHandles extracted from Node.Status.VolumesAttached.
+    attachedVolumeHandles sets.Set[string]
+    // attachedPVNames contains the names of PersistentVolumes referenced by VolumeAttachments for the Node.
+    attachedPVNames sets.Set[string]
+
+    ignoredVolumeHandles sets.Set[string]
+    ignoredPVNames       sets.Set[string]
+}
+
+func newAttachedVolumeHelper(ctx context.Context, remoteClient client.Client, node *corev1.Node) (*attachedVolumeHelper, error) {
+    w := &attachedVolumeHelper{
+        attachedVolumeHandles: sets.Set[string]{},
+        attachedPVNames:       sets.Set[string]{},
+    }
+
+    for _, attachedVolume := range node.Status.VolumesAttached {
+        // Attached volumes provisioned via CSI have a name of the form `kubernetes.io/csi/<driver name>^<volume handle>`.
+        splitted := strings.Split(string(attachedVolume.Name), "^")
+        if len(splitted) != 2 {
+            return nil, errors.Errorf("unable to extract VolumeHandle from node's VolumesAttached name %q", attachedVolume.Name)
+        }
+
+        w.attachedVolumeHandles.Insert(splitted[1])
+    }
+
+    volumeAttachments, err := getVolumeAttachments(ctx, remoteClient, func(va *storagev1.VolumeAttachment) bool {
+        return va.Status.Attached && va.Spec.NodeName == node.GetName()
+    })
+    if err != nil {
+        return nil, err
+    }
+
+    for _, va := range volumeAttachments {
+        if va.Spec.Source.PersistentVolumeName == nil {
+            return nil, errors.Errorf("PersistentVolumeName for VolumeAttachment %s is nil", va.GetName())
+        }
+        w.attachedPVNames.Insert(*va.Spec.Source.PersistentVolumeName)
+    }
+
+    return w, nil
+}
+
+func (w *attachedVolumeHelper) hasAttachedVolumes() bool {
+    return len(w.attachedVolumeHandles.Difference(w.ignoredVolumeHandles)) != 0 || len(w.attachedPVNames.Difference(w.ignoredPVNames)) != 0
+}
+
+func (w *attachedVolumeHelper) ignore(pv *corev1.PersistentVolume) {
+    w.ignoredPVNames.Insert(pv.GetName())
+    w.ignoredVolumeHandles.Insert(pv.Spec.CSI.VolumeHandle)
+}
+
+func getPersistentVolumeClaimsFromIgnoredPods(ctx context.Context, remoteClient client.Client, nodeName string) (sets.Set[types.NamespacedName], error) {
+    drainHelper := drain.Helper{
+        Client: remoteClient,
+    }
+
+    pods, err := drainHelper.GetPodsForEviction(ctx, nodeName)
+    if err != nil {
+        return nil, err
+    }
+
+    ignoredPods := pods.IgnoredPods()
+
+    pvcsToIgnore := sets.Set[types.NamespacedName]{}
+
+    for _, pod := range ignoredPods {
+        for _, volume := range pod.Spec.Volumes {
+            if volume.PersistentVolumeClaim == nil {
+                continue
+            }
+
+            key := types.NamespacedName{Namespace: pod.GetNamespace(), Name: volume.PersistentVolumeClaim.ClaimName}
+            pvcsToIgnore.Insert(key)
+        }
+    }
+
+    return pvcsToIgnore, nil
+}
+
+func getVolumeAttachments(ctx context.Context, c client.Client, filter func(*storagev1.VolumeAttachment) bool) ([]*storagev1.VolumeAttachment, error) {
+    volumeAttachments := []*storagev1.VolumeAttachment{}
+    volumeAttachmentList := &storagev1.VolumeAttachmentList{}
+    for {
+        listOpts := []client.ListOption{
+            client.Continue(volumeAttachmentList.GetContinue()),
+            client.Limit(100),
+        }
+        if err := c.List(ctx, volumeAttachmentList, listOpts...); err != nil {
+            return nil, errors.Wrapf(err, "failed to list %T", volumeAttachmentList)
+        }
+
+        for _, volumeAttachment := range volumeAttachmentList.Items {
+            if filter != nil && !filter(&volumeAttachment) {
+                continue
+            }
+            volumeAttachments = append(volumeAttachments, &volumeAttachment)
+        }
+
+        if volumeAttachmentList.GetContinue() == "" {
+            break
+        }
+    }
+
+    return volumeAttachments, nil
+}
+
+func getPersistentVolumes(ctx context.Context, c client.Client, filter func(*corev1.PersistentVolume) bool) ([]*corev1.PersistentVolume, error) {
+    persistentVolumes := []*corev1.PersistentVolume{}
+    persistentVolumeList := &corev1.PersistentVolumeList{}
+    for {
+        listOpts := []client.ListOption{
+            client.Continue(persistentVolumeList.GetContinue()),
+            client.Limit(100),
+        }
+        if err := c.List(ctx, persistentVolumeList, listOpts...); err != nil {
+            return nil, errors.Wrapf(err, "failed to list %T", persistentVolumeList)
+        }
+
+        for _, persistentVolume := range persistentVolumeList.Items {
+            if filter != nil && !filter(&persistentVolume) {
+                continue
+            }
+            persistentVolumes = append(persistentVolumes, &persistentVolume)
+        }
+
+        if persistentVolumeList.GetContinue() == "" {
+            break
+        }
+    }
+
+    return persistentVolumes, nil
+}
diff --git a/internal/controllers/machine/machine_controller_test.go b/internal/controllers/machine/machine_controller_test.go
index ec72dee7cc50..7f84744d50ed 100644
--- a/internal/controllers/machine/machine_controller_test.go
+++ b/internal/controllers/machine/machine_controller_test.go
@@ -29,6 +29,7 @@ import (
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/client-go/kubernetes/scheme"
     "k8s.io/client-go/tools/record"
     "k8s.io/utils/ptr"
@@ -1985,7 +1986,7 @@ func TestShouldWaitForNodeVolumes(t *testing.T) {
 
     attachedVolumes := []corev1.AttachedVolume{
         {
-            Name:       "test-volume",
+            Name:       "test-volume^foo",
             DevicePath: "test-path",
         },
     }
@@ -1993,6 +1994,7 @@ func TestShouldWaitForNodeVolumes(t *testing.T) {
     tests := []struct {
         name     string
         node     *corev1.Node
+        objs     []runtime.Object
         expected bool
     }{
         {
@@ -2073,7 +2075,8 @@ func TestShouldWaitForNodeVolumes(t *testing.T) {
 
             var objs []client.Object
             objs = append(objs, testCluster, tt.node)
-            c := fake.NewClientBuilder().WithObjects(objs...).Build()
+            c := fake.NewClientBuilder().WithIndex(&corev1.Pod{}, "spec.nodeName", nodeNameIndex).
+                WithObjects(objs...).Build()
             tracker := remote.NewTestClusterCacheTracker(ctrl.Log, c, c, fakeScheme, client.ObjectKeyFromObject(testCluster))
             r := &Reconciler{
                 Client: c,
@@ -2087,6 +2090,10 @@
     }
 }
 
+func nodeNameIndex(o client.Object) []string {
+    return []string{o.(*corev1.Pod).Spec.NodeName}
+}
+
 func TestIsDeleteNodeAllowed(t *testing.T) {
     deletionts := metav1.Now()
diff --git a/main.go b/main.go
index cc0c7a2bb428..5d302a84dc79 100644
--- a/main.go
+++ b/main.go
@@ -28,6 +28,7 @@ import (
     "github.com/spf13/pflag"
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
+    storagev1 "k8s.io/api/storage/v1"
     apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
@@ -122,6 +123,7 @@ var (
 func init() {
     _ = clientgoscheme.AddToScheme(scheme)
     _ = apiextensionsv1.AddToScheme(scheme)
+    _ = storagev1.AddToScheme(scheme)
     _ = clusterv1alpha3.AddToScheme(scheme)
     _ = clusterv1alpha4.AddToScheme(scheme)
 
@@ -412,6 +414,9 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager, watchNamespaces map
                 // Don't cache Pods & DaemonSets (we get/list them e.g. during drain).
                 &corev1.Pod{},
                 &appsv1.DaemonSet{},
+                // Don't cache PersistentVolumes and VolumeAttachments (we get/list them e.g. during wait for volumes to detach).
+                &storagev1.VolumeAttachment{},
+                &corev1.PersistentVolume{},
             },
             Indexes:               []remote.Index{remote.NodeProviderIDIndex},
             ClientQPS:             clusterCacheTrackerClientQPS,