diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index e3796b6611d..b39ad1d9925 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -559,7 +559,9 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut rbdVol.Monitors, rbdVol.RbdImageName, cr) - if err != nil { + if errors.Is(err, ErrFlattenInProgress) { + return status.Error(codes.Aborted, err.Error()) + } else if err != nil { return status.Error(codes.Internal, err.Error()) } @@ -995,7 +997,7 @@ func cleanupRBDImage(ctx context.Context, if inUse { log.ErrorLog(ctx, "rbd %s is still being used", rbdVol) - return nil, status.Errorf(codes.Internal, "rbd %s is still being used", rbdVol.RbdImageName) + return nil, status.Errorf(codes.FailedPrecondition, "rbd %s is still being used", rbdVol.RbdImageName) } // delete the temporary rbd image created as part of volume clone during @@ -1017,9 +1019,10 @@ func cleanupRBDImage(ctx context.Context, } } - // Deleting rbd image + // Deleting rbd image, it isn't a failure if the image was deleted already. log.DebugLog(ctx, "deleting image %s", rbdVol.RbdImageName) - if err = rbdVol.deleteImage(ctx); err != nil { + err = rbdVol.deleteImage(ctx) + if err != nil && !errors.Is(err, librbd.ErrNotFound) { log.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v", rbdVol, err) @@ -1166,13 +1169,18 @@ func (cs *ControllerServer) CreateSnapshot( } }() + readyToUse := true + vol, err := cs.doSnapshotClone(ctx, rbdVol, rbdSnap, cr) - if err != nil { + if errors.Is(err, ErrFlattenInProgress) { + readyToUse = false + } else if err != nil { return nil, status.Error(codes.Internal, err.Error()) } // Update the metadata on snapshot not on the original image rbdVol.RbdImageName = rbdSnap.RbdSnapName + rbdVol.ImageID = vol.ImageID rbdVol.ClusterName = cs.ClusterName defer func() { @@ -1203,7 +1211,7 @@ func (cs *ControllerServer) CreateSnapshot( SnapshotId: vol.VolID, SourceVolumeId: req.GetSourceVolumeId(), CreationTime: vol.CreatedAt, - ReadyToUse: true, + ReadyToUse: readyToUse, }, }, nil } @@ -1234,10 +1242,12 @@ func cloneFromSnapshot( return nil, status.Error(codes.Internal, err.Error()) } + readyToUse := true + err = vol.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) if errors.Is(err, ErrFlattenInProgress) { // if flattening is in progress, return error and do not cleanup - return nil, status.Errorf(codes.Internal, err.Error()) + readyToUse = false } else if err != nil { uErr := undoSnapshotCloning(ctx, rbdVol, rbdSnap, vol, cr) if uErr != nil { @@ -1263,7 +1273,7 @@ func cloneFromSnapshot( SnapshotId: rbdSnap.VolID, SourceVolumeId: rbdSnap.SourceVolumeID, CreationTime: rbdSnap.CreatedAt, - ReadyToUse: true, + ReadyToUse: readyToUse, }, }, nil } diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 773557e0c1d..2a0963b85b1 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -595,7 +595,7 @@ func flattenImageBeforeMapping( if err != nil { return err } - depth, err = volOptions.getCloneDepth(ctx) + depth, err = volOptions.getCloneDepth(rbdHardMaxCloneDepth + 1) if err != nil { return err } diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 08f9f759889..6aa9ec9fdff 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -114,6 +114,8 @@ type rbdImage struct { NamePrefix string // ParentName represents the parent image name of the image. ParentName string + // ParentID represents the parent image ID of the image. + ParentID string // Parent Pool is the pool that contains the parent image. ParentPool string // Cluster name @@ -505,7 +507,14 @@ func (ri *rbdImage) open() (*librbd.Image, error) { return nil, err } - image, err := librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot) + var image *librbd.Image + + // try to open by id, that works for images in trash too + if ri.ImageID != "" { + image, err = librbd.OpenImageById(ri.ioctx, ri.ImageID, librbd.NoSnapshot) + } else { + image, err = librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot) + } if err != nil { if errors.Is(err, librbd.ErrNotFound) { err = util.JoinErrors(ErrImageNotFound, err) @@ -697,47 +706,80 @@ func (ri *rbdImage) trashRemoveImage(ctx context.Context) error { return nil } -func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) { - var depth uint - vol := rbdVolume{} +// getCloneDepth walks the parents of the image and returns the number of +// images in the chain. The `maxDepth` argument to the function is used to +// specify a limit of the depth to check. When `maxDepth` depth is reached, no +// further traversing of the depth is required. +// +// This function re-uses the ioctx of the image to open all images in the +// chain. There is no need to open new ioctx's for every image. +func (ri *rbdImage) getCloneDepth(maxDepth uint) (uint, error) { + var ( + depth uint + info *librbd.ParentInfo + ) - vol.Pool = ri.Pool - vol.Monitors = ri.Monitors - vol.RbdImageName = ri.RbdImageName - vol.RadosNamespace = ri.RadosNamespace - vol.conn = ri.conn.Copy() + image, err := ri.open() + if err != nil { + return 0, fmt.Errorf("failed to open image %s: %w", ri, err) + } + + // get the librbd.ImageSpec of the parent + info, err = image.GetParent() + + // Close this image, it is not used anymore. Using defer to close it + // and replacing the image with an other image can result in resource + // leaks according to golangci-lint. + image.Close() for { - if vol.RbdImageName == "" { - return depth, nil + if errors.Is(err, librbd.ErrNotFound) { + // image does not have a parent + break + } else if err != nil { + return 0, fmt.Errorf("failed to get parent of image %s at depth %d: %w", ri, depth, err) } - err := vol.openIoctx() - if err != nil { - return depth, err + + // if the parent is in trash, return maxDepth to trigger + // flattening + if info.Image.Trash { + return maxDepth, nil } - err = vol.getImageInfo() - // FIXME: create and destroy the vol inside the loop. - // see https://github.com/ceph/ceph-csi/pull/1838#discussion_r598530807 - vol.ioctx.Destroy() - vol.ioctx = nil - if err != nil { - // if the parent image is moved to trash the name will be present - // in rbd image info but the image will be in trash, in that case - // return the found depth - if errors.Is(err, ErrImageNotFound) { - return depth, nil - } - log.ErrorLog(ctx, "failed to check depth on image %s: %s", &vol, err) + // if there is a parent, count it to the depth + depth++ - return depth, err + // if maxDepth (usually hard-limit) is reached, further + // traversing parents is not needed, some action (flattening) + // will be triggered regardless of deeper depth + if depth == maxDepth { + break } - if vol.ParentName != "" { - depth++ + + // open the parent image, so that the for-loop can continue + // with checking for the parent of the parent + image, err = librbd.OpenImageByIdReadOnly(ri.ioctx, info.Image.ImageID, librbd.NoSnapshot) + if err != nil && errors.Is(err, librbd.ErrNotFound) { + // parent image does not exist, no parent after all + break + } else if err != nil { + imageSpec := info.Image.ImageID + if info.Snap.SnapName != librbd.NoSnapshot { + imageSpec += "@" + info.Snap.SnapName + } + + return 0, fmt.Errorf("failed to open parent image ID %q, at depth %d: %w", imageSpec, depth, err) } - vol.RbdImageName = vol.ParentName - vol.Pool = vol.ParentPool + + info, err = image.GetParent() + // info and err checking is done in the top of the for loop + + // Using defer in a for loop seems to be problematic. Just + // always close the image. + image.Close() } + + return depth, nil } type trashSnapInfo struct { @@ -803,7 +845,7 @@ func (ri *rbdImage) flattenRbdImage( // skip clone depth check if request is for force flatten if !forceFlatten { - depth, err = ri.getCloneDepth(ctx) + depth, err = ri.getCloneDepth(hardlimit) if err != nil { return err } @@ -830,7 +872,9 @@ func (ri *rbdImage) flattenRbdImage( _, err = ta.AddFlatten(admin.NewImageSpec(ri.Pool, ri.RadosNamespace, ri.RbdImageName)) rbdCephMgrSupported := isCephMgrSupported(ctx, ri.ClusterID, err) if rbdCephMgrSupported { - if err != nil { + // it is not possible to flatten an image that is in trash (ErrNotFound) + switch { + case err != nil && !errors.Is(err, rados.ErrNotFound): // discard flattening error if the image does not have any parent rbdFlattenNoParent := fmt.Sprintf("Image %s/%s does not have a parent", ri.Pool, ri.RbdImageName) if strings.Contains(err.Error(), rbdFlattenNoParent) { @@ -839,11 +883,11 @@ func (ri *rbdImage) flattenRbdImage( log.ErrorLog(ctx, "failed to add task flatten for %s : %v", ri, err) return err - } - if forceFlatten || depth >= hardlimit { + case forceFlatten || depth >= hardlimit: return fmt.Errorf("%w: flatten is in progress for image %s", ErrFlattenInProgress, ri.RbdImageName) + default: + log.DebugLog(ctx, "successfully added task to flatten image %q", ri) } - log.DebugLog(ctx, "successfully added task to flatten image %q", ri) } if !rbdCephMgrSupported { log.ErrorLog( @@ -875,6 +919,9 @@ func (ri *rbdImage) getParentName() (string, error) { return "", err } + ri.ParentName = parentInfo.Image.ImageName + ri.ParentID = parentInfo.Image.ImageID + return parentInfo.Image.ImageName, nil } @@ -1588,6 +1635,11 @@ func (ri *rbdImage) getImageInfo() error { // TODO: can rv.VolSize not be a uint64? Or initialize it to -1? ri.VolSize = int64(imageInfo.Size) + ri.ImageID, err = image.GetId() + if err != nil { + return err + } + features, err := image.GetFeatures() if err != nil { return err @@ -1601,11 +1653,13 @@ func (ri *rbdImage) getImageInfo() error { // the parent is an error or not. if errors.Is(err, librbd.ErrNotFound) { ri.ParentName = "" + ri.ParentID = "" } else { return err } } else { ri.ParentName = parentInfo.Image.ImageName + ri.ParentID = parentInfo.Image.ImageID ri.ParentPool = parentInfo.Image.PoolName } // Get image creation time @@ -1636,6 +1690,7 @@ func (ri *rbdImage) getParent() (*rbdImage, error) { parentImage.Pool = ri.ParentPool parentImage.RadosNamespace = ri.RadosNamespace parentImage.RbdImageName = ri.ParentName + parentImage.ImageID = ri.ParentID err = parentImage.getImageInfo() if err != nil { @@ -1692,6 +1747,7 @@ type rbdImageMetadataStash struct { Pool string `json:"pool"` RadosNamespace string `json:"radosNamespace"` ImageName string `json:"image"` + ImageID string `json:"imageID"` UnmapOptions string `json:"unmapOptions"` NbdAccess bool `json:"accessType"` Encrypted bool `json:"encrypted"` @@ -1721,6 +1777,7 @@ func stashRBDImageMetadata(volOptions *rbdVolume, metaDataPath string) error { Pool: volOptions.Pool, RadosNamespace: volOptions.RadosNamespace, ImageName: volOptions.RbdImageName, + ImageID: volOptions.ImageID, Encrypted: volOptions.isBlockEncrypted(), UnmapOptions: volOptions.UnmapOptions, } diff --git a/internal/rbd/snapshot.go b/internal/rbd/snapshot.go index bb420475c3d..85e5c08b00e 100644 --- a/internal/rbd/snapshot.go +++ b/internal/rbd/snapshot.go @@ -107,7 +107,17 @@ func generateVolFromSnap(rbdSnap *rbdSnapshot) *rbdVolume { vol.JournalPool = rbdSnap.JournalPool vol.RadosNamespace = rbdSnap.RadosNamespace vol.RbdImageName = rbdSnap.RbdSnapName - vol.ImageID = rbdSnap.ImageID + vol.ParentName = rbdSnap.ParentName + vol.ParentID = rbdSnap.ParentID + + // /!\ WARNING /!\ + // + // Do not set the ImageID to the ID of the snapshot, a new image will + // be created based on the returned rbdVolume. If the ImageID is set to + // the ID of the snapshot, accessing the new image by ID will actually + // access the snapshot! + // vol.ImageID = rbdSnap.ImageID + // copyEncryptionConfig cannot be used here because the volume and the // snapshot will have the same volumeID which cases the panic in // copyEncryptionConfig function.