Skip to content

Commit

Permalink
Vdisk allocation fix (#1060)
Browse files Browse the repository at this point in the history
* We need to mark failed reservation as deleted

We also need to skip cache

* Support utilizing all disks for vdisks allocation

* run make vet

* if reservation in cache already in error no decrement of resources

* add size to error message
  • Loading branch information
Muhamad Azmy authored Nov 16, 2020
1 parent d020662 commit 34cd098
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 236 deletions.
40 changes: 29 additions & 11 deletions pkg/provision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,25 +235,31 @@ func (e *Engine) provision(ctx context.Context, r *Reservation) error {
return errors.Wrapf(err, "failed to build result object for reservation: %s", result.ID)
}

// we make sure we store the reservation in cache first before
// returning the reply back to the grid, this is to make sure
// if the reply failed for any reason, the node still doesn't
// try to redeploy that reservation.
r.ID = realID
r.Result = *result
if err := e.cache.Add(r); err != nil {
return errors.Wrapf(err, "failed to cache reservation %s locally", r.ID)
}

// send response to explorer
if err := e.reply(ctx, result); err != nil {
log.Error().Err(err).Msg("failed to send result to BCDB")
}

// we skip the counting.
// if we fail to decomission the reservation then must be marked
// as deleted so it's never tried again. we also skip caching
// the reservation object. this is similar to what decomission does
// since on a decomission we also clear up the cache.
if provisionError != nil {
// we need to mark the reservation as deleted as well
if err := e.feedback.Deleted(e.nodeID, realID); err != nil {
log.Error().Err(err).Msg("failed to mark failed reservation as deleted")
}

return provisionError
}

// we only cache successful reservations
r.ID = realID
r.Result = *result
if err := e.cache.Add(r); err != nil {
return errors.Wrapf(err, "failed to cache reservation %s locally", r.ID)
}

// If an update occurs on the network we don't increment the counter
if r.Type == "network_resource" {
nr := pkg.NetResource{}
Expand Down Expand Up @@ -304,6 +310,18 @@ func (e *Engine) decommission(ctx context.Context, r *Reservation) error {
r.ID = r.Reference
}

if r.Result.State == StateError {
// this reservation already failed to deploy
// this code here shouldn't be executing because if
// the reservation has error-ed, it means is should
// not be in cache.
// BUT
// that was not always the case, so instead we
// will just return. here
log.Warn().Str("id", realID).Msg("skipping reservation because it is not provisioned")
return nil
}

err = fn(ctx, r)
if err != nil {
return errors.Wrap(err, "decommissioning of reservation failed")
Expand Down
8 changes: 0 additions & 8 deletions pkg/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,6 @@ type VolumeAllocater interface {

// GetCacheFS return the special filesystem used by 0-OS to store internal state and flist cache
GetCacheFS() (Filesystem, error)

// GetVdiskFS return the filesystem used to store the vdisk file for the VM module
GetVdiskFS() (Filesystem, error)

// CanAllocate checks if the given subvolume name can use this given size
// it checks against physical disk space first if there is enough capacity
// to support this, and if okay, it checks against the subvolume quota limit if set
CanAllocate(name string, size uint64) (bool, error)
}

// VDisk info returned by a call to inspect
Expand Down
101 changes: 55 additions & 46 deletions pkg/storage/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,50 @@ const (
)

type vdiskModule struct {
v pkg.VolumeAllocater
path string
module *Module
}

// NewVDiskModule creates a new disk allocator
func NewVDiskModule(v pkg.VolumeAllocater) (pkg.VDiskModule, error) {
fs, err := v.Path(vdiskVolumeName)
if errors.Is(err, os.ErrNotExist) {
fs, err = v.CreateFilesystem(vdiskVolumeName, 0, pkg.SSDDevice)
}
func NewVDiskModule(module *Module) (pkg.VDiskModule, error) {
return &vdiskModule{module: module}, nil
}

func (d *vdiskModule) findDisk(id string) (string, error) {
pools, err := d.module.VDiskPools()
if err != nil {
return nil, err
return "", errors.Wrapf(err, "failed to find disk with id '%s'", id)
}

for _, pool := range pools {
path, err := d.safePath(pool, id)
if err != nil {
return "", err
}

if _, err := os.Stat(path); err == nil {
// file exists
return path, nil
}
}

return &vdiskModule{v: v, path: filepath.Clean(fs.Path)}, nil
return "", os.ErrNotExist
}

// AllocateDisk with given size, return path to virtual disk (size in MB)
func (d *vdiskModule) Allocate(id string, size int64) (string, error) {
path, err := d.safePath(id)
if err != nil {
return "", err
}

if _, err := os.Stat(path); err == nil {
// file exists
path, err := d.findDisk(id)
if err == nil {
return path, errors.Wrapf(os.ErrExist, "disk with id '%s' already exists", id)
}

supported, err := d.v.CanAllocate(vdiskVolumeName, uint64(size))
base, err := d.module.VDiskFindCandidate(uint64(size))
if err != nil {
return path, errors.Wrap(err, "failed to check capacity for this disk allocation")
return "", errors.Wrapf(err, "failed to find a candidate to host vdisk of size '%d'", size)
}

if !supported {
// TODO: we need to find another disk on this node if possible
return path, fmt.Errorf("not enough space available for this disk size")
path, err = d.safePath(base, id)
if err != nil {
return "", err
}

file, err := os.Create(path)
Expand All @@ -69,13 +75,13 @@ func (d *vdiskModule) Allocate(id string, size int64) (string, error) {
return path, syscall.Fallocate(int(file.Fd()), 0, 0, size*mib)
}

func (d *vdiskModule) safePath(id string) (string, error) {
path := filepath.Join(d.path, id)
func (d *vdiskModule) safePath(base, id string) (string, error) {
path := filepath.Join(base, id)
// this to avoid passing an `injection` id like '../name'
// and end up deleting a file on the system. so only delete
// allocated disks
location := filepath.Dir(path)
if filepath.Clean(location) != d.path {
if filepath.Clean(location) != base {
return "", fmt.Errorf("invalid disk id: '%s'", id)
}

Expand All @@ -84,8 +90,10 @@ func (d *vdiskModule) safePath(id string) (string, error) {

// DeallocateVDisk removes a virtual disk
func (d *vdiskModule) Deallocate(id string) error {
path, err := d.safePath(id)
if err != nil {
path, err := d.findDisk(id)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}

Expand All @@ -98,24 +106,16 @@ func (d *vdiskModule) Deallocate(id string) error {

// DeallocateVDisk removes a virtual disk
func (d *vdiskModule) Exists(id string) bool {
path, err := d.safePath(id)

if err != nil {
// invalid ID
return false
}

_, err = os.Stat(path)
_, err := d.findDisk(id)

return err == nil
}

// Inspect return info about the disk
func (d *vdiskModule) Inspect(id string) (disk pkg.VDisk, err error) {
path, err := d.safePath(id)
path, err := d.findDisk(id)

if err != nil {
// invalid ID
return disk, err
}

Expand All @@ -130,21 +130,30 @@ func (d *vdiskModule) Inspect(id string) (disk pkg.VDisk, err error) {
}

func (d *vdiskModule) List() ([]pkg.VDisk, error) {
items, err := ioutil.ReadDir(d.path)
pools, err := d.module.VDiskPools()
if err != nil {
return nil, errors.Wrap(err, "failed to list virtual disks")
return nil, err
}
var disks []pkg.VDisk
for _, pool := range pools {

items, err := ioutil.ReadDir(pool)
if err != nil {
return nil, errors.Wrap(err, "failed to list virtual disks")
}

for _, item := range items {
if item.IsDir() {
continue
}

disks := make([]pkg.VDisk, 0, len(items))
for _, item := range items {
if item.IsDir() {
continue
disks = append(disks, pkg.VDisk{
Path: filepath.Join(pool, item.Name()),
Size: item.Size(),
})
}

disks = append(disks, pkg.VDisk{
Path: filepath.Join(d.path, item.Name()),
Size: item.Size(),
})
return disks, nil
}

return disks, nil
Expand Down
Loading

0 comments on commit 34cd098

Please sign in to comment.