Machine: ignore attached Volumes referred by pods ignored during drain
chrischdi committed Oct 1, 2024
1 parent 274d7e2 commit 151fd64
Showing 4 changed files with 204 additions and 5 deletions.
11 changes: 11 additions & 0 deletions internal/controllers/machine/drain/filters.go
@@ -62,6 +62,17 @@ func (l *PodDeleteList) Pods() []*corev1.Pod {
return pods
}

// IgnoredPods returns a list of Pods that have to be ignored when checking whether the Node is completely drained.
func (l *PodDeleteList) IgnoredPods() []*corev1.Pod {
pods := []*corev1.Pod{}
for _, i := range l.items {
if !i.Status.Delete {
pods = append(pods, i.Pod)
}
}
return pods
}

func (l *PodDeleteList) errors() []error {
failedPods := make(map[string][]string)
for _, i := range l.items {
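Together with Pods(), the new IgnoredPods() simply partitions the same list on Status.Delete. A minimal, self-contained sketch of that partition (simplified stand-in types, not the real drain package):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// podDelete is a simplified stand-in for the drain package's PodDelete item.
type podDelete struct {
	pod             *corev1.Pod
	markedForDelete bool
}

func main() {
	items := []podDelete{
		{pod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "app-pod"}}, markedForDelete: true},
		{pod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "daemonset-pod"}}, markedForDelete: false},
	}

	var toEvict, toIgnore []*corev1.Pod
	for _, i := range items {
		if i.markedForDelete {
			toEvict = append(toEvict, i.pod) // what Pods() returns
		} else {
			toIgnore = append(toIgnore, i.pod) // what IgnoredPods() returns
		}
	}
	fmt.Println(len(toEvict), len(toIgnore)) // 1 1: every item lands in exactly one list
}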
182 changes: 179 additions & 3 deletions internal/controllers/machine/machine_controller.go
@@ -24,11 +24,13 @@ import (

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
@@ -808,12 +810,52 @@ func (r *Reconciler) shouldWaitForNodeVolumes(ctx context.Context, cluster *clus
return false, nil
}

- if len(node.Status.VolumesAttached) != 0 {
- log.Info("Waiting for Node volumes to be detached")
waiter, err := newAttachedVolumeHelper(ctx, remoteClient, node)
if err != nil {
return true, err
}

if !waiter.hasAttachedVolumes() {
// No attached volumes to wait for.
return false, nil
}

// Get all PVCs which are okay to ignore because the Pods mounting them got skipped during drain.
pvcsToIgnoreFromPods, err := getPersistentVolumeClaimsFromIgnoredPods(ctx, remoteClient, nodeName)
if err != nil {
return true, err
}

if len(pvcsToIgnoreFromPods) == 0 {
// There are attached volumes to wait for and none to ignore.
return true, nil
}

- return false, nil
// Get all PersistentVolumes which are referred to by Pods ignored during drain via their PersistentVolumeClaims.
pvs, err := getPersistentVolumes(ctx, remoteClient, func(pv *corev1.PersistentVolume) bool {
if pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Kind == "PersistentVolumeClaim" {
if pvcsToIgnoreFromPods.Has(types.NamespacedName{Namespace: pv.Spec.ClaimRef.Namespace, Name: pv.Spec.ClaimRef.Name}) {
return true
}
}
return false
})
if err != nil {
return true, err
}

// Add all of these PVs to the ignore list.
for _, pv := range pvs {
waiter.ignore(pv)
}

if !waiter.hasAttachedVolumes() {
// No attached volumes to wait for.
return false, nil
}

log.Info("Waiting for Node volumes to be detached")
return true, nil
}
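Summarized, the rewritten check has four outcomes instead of the old length test (comment summary only, mirroring the code above):

// 1. Nothing attached (per Node status and VolumeAttachments) -> return false: done.
// 2. Something attached, but no PVCs from drain-ignored Pods  -> return true: keep waiting.
// 3. Otherwise, mark every PV bound to one of those PVCs as ignored.
// 4. If anything is still attached after subtracting the ignored PVs
//    -> return true: keep waiting; else -> return false: done.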

func (r *Reconciler) deleteNode(ctx context.Context, cluster *clusterv1.Cluster, name string) error {
@@ -1016,3 +1058,137 @@ func (r *Reconciler) nodeToMachine(ctx context.Context, o client.Object) []recon

return nil
}

type attachedVolumeHelper struct {
// attachedVolumeHandles contains VolumeHandles extracted from Node.Status.VolumesAttached.
attachedVolumeHandles sets.Set[string]
// attachedPVNames contains names of PersistentVolumes mapped from VolumeAttachments.
attachedPVNames sets.Set[string]

ignoredVolumeHandles sets.Set[string]
ignoredPVNames sets.Set[string]
}

func newAttachedVolumeHelper(ctx context.Context, remoteClient client.Client, node *corev1.Node) (*attachedVolumeHelper, error) {
w := &attachedVolumeHelper{
attachedVolumeHandles: sets.Set[string]{},
attachedPVNames: sets.Set[string]{},
ignoredVolumeHandles: sets.Set[string]{},
ignoredPVNames: sets.Set[string]{},
}

for _, attachedVolume := range node.Status.VolumesAttached {
// AttachedVolume entries from CSI have a Name like `kubernetes.io/csi/<driver>^<volumeHandle>`.
splitted := strings.Split(string(attachedVolume.Name), "^")
if len(splitted) != 2 {
return nil, errors.Errorf("unable to extract VolumeHandle from node's VolumesAttached name %q", attachedVolume.Name)
}

w.attachedVolumeHandles.Insert(splitted[1])
}

volumeAttachments, err := getVolumeAttachments(ctx, remoteClient, func(va *storagev1.VolumeAttachment) bool {
return va.Status.Attached && va.Spec.NodeName == node.GetName()
})
if err != nil {
return nil, err
}

for _, va := range volumeAttachments {
if va.Spec.Source.PersistentVolumeName == nil {
return nil, errors.Errorf("PersistentVolumeName for VolumeAttachment %s is nil", va.GetName())
}
w.attachedPVNames.Insert(*va.Spec.Source.PersistentVolumeName)
}

return w, nil
}
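The `^` separator convention the constructor relies on, shown with a made-up driver and handle:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// CSI entries in Node.Status.VolumesAttached are named
	// kubernetes.io/csi/<driver>^<volumeHandle>; both values below are made up.
	name := "kubernetes.io/csi/csi.example.com^vol-0123456789abcdef"
	parts := strings.Split(name, "^")
	fmt.Println(parts[0]) // kubernetes.io/csi/csi.example.com
	fmt.Println(parts[1]) // vol-0123456789abcdef: the VolumeHandle the helper records
}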

func (w *attachedVolumeHelper) hasAttachedVolumes() bool {
return len(w.attachedVolumeHandles.Difference(w.ignoredVolumeHandles)) != 0 || len(w.attachedPVNames.Difference(w.ignoredPVNames)) != 0
}

func (w *attachedVolumeHelper) ignore(pv *corev1.PersistentVolume) {
w.ignoredPVNames.Insert(pv.GetName())
w.ignoredVolumeHandles.Insert(pv.Spec.CSI.VolumeHandle)
}
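hasAttachedVolumes is a plain set difference: a volume still blocks deletion only if it is attached and not ignored. A runnable illustration with made-up PV names:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	attached := sets.New("pv-app-data", "pv-ignored-pod")
	ignored := sets.New("pv-ignored-pod")

	// Mirrors hasAttachedVolumes: wait only while the difference is non-empty.
	fmt.Println(attached.Difference(ignored).Len() != 0) // true: pv-app-data still blocks

	ignored.Insert("pv-app-data")
	fmt.Println(attached.Difference(ignored).Len() != 0) // false: nothing left to wait for
}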

func getPersistentVolumeClaimsFromIgnoredPods(ctx context.Context, remoteClient client.Client, nodeName string) (sets.Set[types.NamespacedName], error) {
drainHelper := drain.Helper{
Client: remoteClient,
}

pods, err := drainHelper.GetPodsForEviction(ctx, nodeName)
if err != nil {
return nil, err
}

ignoredPods := pods.IgnoredPods()

pvcsToIgnore := sets.Set[types.NamespacedName]{}

for _, pod := range ignoredPods {
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}

key := types.NamespacedName{Namespace: pod.GetNamespace(), Name: volume.PersistentVolumeClaim.ClaimName}
pvcsToIgnore.Insert(key)
}
}

return pvcsToIgnore, nil
}
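The Pod-volume-to-PVC-key mapping the function performs, as a self-contained example (object names are made up):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

func main() {
	pod := corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "app-0"},
		Spec: corev1.PodSpec{
			Volumes: []corev1.Volume{{
				Name: "data",
				VolumeSource: corev1.VolumeSource{
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
						ClaimName: "data-app-0",
					},
				},
			}},
		},
	}

	// Only PVC-backed volumes contribute a key; all other volume types are skipped.
	for _, v := range pod.Spec.Volumes {
		if v.PersistentVolumeClaim == nil {
			continue
		}
		fmt.Println(types.NamespacedName{Namespace: pod.Namespace, Name: v.PersistentVolumeClaim.ClaimName})
		// prints: default/data-app-0
	}
}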

func getVolumeAttachments(ctx context.Context, c client.Client, filter func(*storagev1.VolumeAttachment) bool) ([]*storagev1.VolumeAttachment, error) {
volumeAttachments := []*storagev1.VolumeAttachment{}
volumeAttachmentList := &storagev1.VolumeAttachmentList{}
for {
listOpts := []client.ListOption{
client.Continue(volumeAttachmentList.GetContinue()),
client.Limit(100),
}
if err := c.List(ctx, volumeAttachmentList, listOpts...); err != nil {
return nil, errors.Wrapf(err, "failed to list %T", volumeAttachmentList)
}

for _, volumeAttachment := range volumeAttachmentList.Items {
if filter != nil && !filter(&volumeAttachment) {
continue
}
volumeAttachments = append(volumeAttachments, &volumeAttachment)
}

if volumeAttachmentList.GetContinue() == "" {
break
}
}

return volumeAttachments, nil
}

func getPersistentVolumes(ctx context.Context, c client.Client, filter func(*corev1.PersistentVolume) bool) ([]*corev1.PersistentVolume, error) {
persistentVolumes := []*corev1.PersistentVolume{}
persistentVolumeList := &corev1.PersistentVolumeList{}
for {
listOpts := []client.ListOption{
client.Continue(persistentVolumeList.GetContinue()),
client.Limit(100),
}
if err := c.List(ctx, persistentVolumeList, listOpts...); err != nil {
return nil, errors.Wrapf(err, "failed to list %T", persistentVolumeList)
}

for _, persistentVolume := range persistentVolumeList.Items {
if filter != nil && !filter(&persistentVolume) {
continue
}
persistentVolumes = append(persistentVolumes, &persistentVolume)
}

if persistentVolumeList.GetContinue() == "" {
break
}
}

return persistentVolumes, nil
}
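getVolumeAttachments and getPersistentVolumes share the same pagination loop; a generic variant could factor it out. A sketch only (listAllPages is hypothetical, not part of this commit):

package util

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listAllPages lists in pages of 100, calling visit after each page,
// until the server returns no continue token.
func listAllPages(ctx context.Context, c client.Client, list client.ObjectList, visit func() error) error {
	for {
		listOpts := []client.ListOption{
			client.Continue(list.GetContinue()),
			client.Limit(100),
		}
		if err := c.List(ctx, list, listOpts...); err != nil {
			return err
		}
		if err := visit(); err != nil {
			return err
		}
		if list.GetContinue() == "" {
			return nil
		}
	}
}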
11 changes: 9 additions & 2 deletions internal/controllers/machine/machine_controller_test.go
@@ -29,6 +29,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
@@ -1985,14 +1986,15 @@ func TestShouldWaitForNodeVolumes(t *testing.T) {

attachedVolumes := []corev1.AttachedVolume{
{
Name: "test-volume",
Name: "test-volume^foo",
DevicePath: "test-path",
},
}

tests := []struct {
name string
node *corev1.Node
objs []runtime.Object
expected bool
}{
{
@@ -2073,7 +2075,8 @@
var objs []client.Object
objs = append(objs, testCluster, tt.node)

- c := fake.NewClientBuilder().WithObjects(objs...).Build()
c := fake.NewClientBuilder().WithIndex(&corev1.Pod{}, "spec.nodeName", nodeNameIndex).
WithObjects(objs...).Build()
tracker := remote.NewTestClusterCacheTracker(ctrl.Log, c, c, fakeScheme, client.ObjectKeyFromObject(testCluster))
r := &Reconciler{
Client: c,
@@ -2087,6 +2090,10 @@
}
}

func nodeNameIndex(o client.Object) []string {
return []string{o.(*corev1.Pod).Spec.NodeName}
}
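The test needs this index because GetPodsForEviction lists Pods with a spec.nodeName field selector, and controller-runtime's fake client can only serve field selectors for fields registered via WithIndex. A self-contained illustration (pod and node names are made up):

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func main() {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "app-0"},
		Spec:       corev1.PodSpec{NodeName: "node-1"},
	}

	c := fake.NewClientBuilder().
		WithIndex(&corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
			return []string{o.(*corev1.Pod).Spec.NodeName}
		}).
		WithObjects(pod).
		Build()

	podList := &corev1.PodList{}
	// Without the WithIndex call above, this List would return an error.
	if err := c.List(context.Background(), podList, client.MatchingFields{"spec.nodeName": "node-1"}); err != nil {
		panic(err)
	}
	fmt.Println(len(podList.Items)) // 1
}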

func TestIsDeleteNodeAllowed(t *testing.T) {
deletionts := metav1.Now()

5 changes: 5 additions & 0 deletions main.go
@@ -28,6 +28,7 @@ import (
"github.com/spf13/pflag"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -122,6 +123,7 @@
func init() {
_ = clientgoscheme.AddToScheme(scheme)
_ = apiextensionsv1.AddToScheme(scheme)
_ = storagev1.AddToScheme(scheme)

_ = clusterv1alpha3.AddToScheme(scheme)
_ = clusterv1alpha4.AddToScheme(scheme)
@@ -412,6 +414,9 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager, watchNamespaces map
// Don't cache Pods & DaemonSets (we get/list them e.g. during drain).
&corev1.Pod{},
&appsv1.DaemonSet{},
// Don't cache PersistentVolumes and VolumeAttachments (we get/list them e.g. while waiting for volumes to detach).
&storagev1.VolumeAttachment{},
&corev1.PersistentVolume{},
},
Indexes: []remote.Index{remote.NodeProviderIDIndex},
ClientQPS: clusterCacheTrackerClientQPS,
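The storagev1.AddToScheme call above matters because a client can only decode types its scheme knows; without it, listing VolumeAttachments would fail at runtime. A minimal illustration:

package main

import (
	"fmt"

	storagev1 "k8s.io/api/storage/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

func main() {
	s := runtime.NewScheme()
	_ = storagev1.AddToScheme(s)

	// After registration the scheme can resolve the type to its GroupVersionKind.
	gvks, _, err := s.ObjectKinds(&storagev1.VolumeAttachment{})
	fmt.Println(gvks, err) // [storage.k8s.io/v1, Kind=VolumeAttachment] <nil>
}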
