Merge pull request #710 from imeoer/nydusify-abort-upload
nydusify: abort oss blob upload if conversion failed
bergwolf authored Sep 20, 2022
2 parents 8aaeca9 + 45d65fb commit c5934c2
Showing 5 changed files with 122 additions and 94 deletions.
1 change: 1 addition & 0 deletions contrib/nydusify/pkg/backend/backend.go
@@ -22,6 +22,7 @@ type Backend interface {
// TODO: Hopefully, we can pass `Layer` struct in, thus to be able to cook both
// file handle and file path.
Upload(ctx context.Context, blobID, blobPath string, blobSize int64, forcePush bool) (*ocispec.Descriptor, error)
Finalize(cancel bool) error
Check(blobID string) (bool, error)
Type() Type
}
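The only interface change is the new `Finalize(cancel bool)` method: a backend may now defer part of an upload (for OSS, the final `CompleteMultipartUpload` call) until the whole conversion is known to have succeeded, and abort it otherwise. Below is a minimal sketch of the intended calling contract; the `pushBlobs` helper and the package wrapper are hypothetical, only the `Finalize(true)`/`Finalize(false)` split mirrors this commit.

package example

import (
	"context"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// Backend mirrors the interface above (unrelated methods omitted).
type Backend interface {
	Finalize(cancel bool) error
}

// pushBlobs stands in for the conversion and Upload() steps.
func pushBlobs(ctx context.Context, backend Backend) error { return nil }

// convertAndPush shows the contract: Finalize(true) on any failure so the
// backend can abort deferred uploads, Finalize(false) on success to complete them.
func convertAndPush(ctx context.Context, backend Backend) (retErr error) {
	defer func() {
		if retErr != nil {
			if err := backend.Finalize(true); err != nil {
				logrus.WithError(err).Warn("cancel backend upload")
			}
		}
	}()

	if err := pushBlobs(ctx, backend); err != nil {
		return errors.Wrap(err, "push blobs")
	}
	return backend.Finalize(false)
}

This is the pattern converter.go adopts below: a deferred `Finalize(true)` on the error path and a `Finalize(false)` before the manifests are pushed.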
200 changes: 107 additions & 93 deletions contrib/nydusify/pkg/backend/oss.go
@@ -13,6 +13,7 @@ import (
"net/http"
"os"
"strconv"
"sync"
"time"

"github.com/aliyun/aliyun-oss-go-sdk/oss"
@@ -23,16 +24,26 @@ import (
)

const (
splitPartsCount = 4
// Blob size bigger than 100MB, apply multiparts upload.
multipartsUploadThreshold = 100 * 1024 * 1024
// For multipart uploads, OSS has a maximum number of 10000 chunks,
// so we can only upload blob size of about 10000 * multipartChunkSize.
multipartChunkSize = 200 * 1024 * 1024 // 200MB
)

type multipartStatus struct {
imur *oss.InitiateMultipartUploadResult
parts []oss.UploadPart
blobObjectKey string
crc64 uint64
crc64ErrChan chan error
}

type OSSBackend struct {
// OSS storage does not support directory. Therefore add a prefix to each object
// to make it a path-like object.
objectPrefix string
bucket *oss.Bucket
ms []multipartStatus
msMutex sync.Mutex
}

func newOSSBackend(rawConfig []byte) (*OSSBackend, error) {
@@ -96,36 +107,22 @@ func calcCrc64ECMA(path string) (uint64, error) {
return blobCrc64, nil
}

// Upload blob as image layer to oss backend.
// Depending on blob's size, upload it by multiparts method or the normal method.
// Verify integrity by calculate CRC64.
// Upload blob as image layer to oss backend and verify
// integrity by calculate CRC64.
func (b *OSSBackend) Upload(ctx context.Context, blobID, blobPath string, size int64, forcePush bool) (*ocispec.Descriptor, error) {
blobObjectKey := b.objectPrefix + blobID

desc := blobDesc(size, blobID)

if !forcePush {
if exist, err := b.bucket.IsObjectExist(blobObjectKey); err != nil {
return nil, err
return nil, errors.Wrap(err, "check object existence")
} else if exist {
logrus.Infof("Skip upload because blob exists: %s", blobID)
logrus.Infof("skip upload because blob exists: %s", blobID)
return &desc, nil
}
}

var stat os.FileInfo
stat, err := os.Stat(blobPath)
if err != nil {
return nil, err
}
blobSize := stat.Size()

var needMultiparts = false
// Blob size bigger than 100MB, apply multiparts upload.
if blobSize >= multipartsUploadThreshold {
needMultiparts = true
}

start := time.Now()
var crc64 uint64
crc64ErrChan := make(chan error, 1)
@@ -134,102 +131,119 @@ func (b *OSSBackend) Upload(ctx context.Context, blobID, blobPath string, size i
crc64, e = calcCrc64ECMA(blobPath)
crc64ErrChan <- e
}()

defer close(crc64ErrChan)

if needMultiparts {
logrus.Debugf("Upload %s using multiparts method", blobObjectKey)
chunks, err := oss.SplitFileByPartNum(blobPath, splitPartsCount)
if err != nil {
return nil, err
}
logrus.Debugf("upload %s using multipart method", blobObjectKey)
chunks, err := oss.SplitFileByPartSize(blobPath, multipartChunkSize)
if err != nil {
return nil, errors.Wrap(err, "split file by part size")
}

imur, err := b.bucket.InitiateMultipartUpload(blobObjectKey)
if err != nil {
return nil, err
}
imur, err := b.bucket.InitiateMultipartUpload(blobObjectKey)
if err != nil {
return nil, errors.Wrap(err, "initiate multipart upload")
}

// It always splits the blob into splitPartsCount=4 parts
partsChan := make(chan oss.UploadPart, splitPartsCount)
eg := new(errgroup.Group)
partsChan := make(chan oss.UploadPart, len(chunks))
for _, chunk := range chunks {
ck := chunk
eg.Go(func() error {
p, err := b.bucket.UploadPartFromFile(imur, blobPath, ck.Offset, ck.Size, ck.Number)
if err != nil {
return errors.Wrap(err, "upload part from file")
}
partsChan <- p
return nil
})
}

g := new(errgroup.Group)
for _, chunk := range chunks {
ck := chunk
g.Go(func() error {
p, err := b.bucket.UploadPartFromFile(imur, blobPath, ck.Offset, ck.Size, ck.Number)
if err != nil {
return err
}
partsChan <- p
return nil
})
if err := eg.Wait(); err != nil {
close(partsChan)
if err := b.bucket.AbortMultipartUpload(imur); err != nil {
return nil, errors.Wrap(err, "abort multipart upload")
}
return nil, errors.Wrap(err, "upload parts")
}
close(partsChan)

if err := g.Wait(); err != nil {
b.bucket.AbortMultipartUpload(imur)
close(partsChan)
return nil, errors.Wrap(err, "Uploading parts failed")
}
var parts []oss.UploadPart
for p := range partsChan {
parts = append(parts, p)
}

close(partsChan)
ms := multipartStatus{
imur: &imur,
parts: parts,
blobObjectKey: blobObjectKey,
crc64: crc64,
crc64ErrChan: crc64ErrChan,
}
b.msMutex.Lock()
defer b.msMutex.Unlock()
b.ms = append(b.ms, ms)

var parts []oss.UploadPart
for p := range partsChan {
parts = append(parts, p)
}
logrus.Debugf("uploaded blob %s, costs %s", blobObjectKey, time.Since(start))

_, err = b.bucket.CompleteMultipartUpload(imur, parts)
if err != nil {
return nil, err
return &desc, nil
}

func (b *OSSBackend) Finalize(cancel bool) error {
b.msMutex.Lock()
defer b.msMutex.Unlock()

for _, ms := range b.ms {
if cancel {
// If there is any failure during conversion process, it will
// cause the uploaded blob to be left on oss, and these blobs
// are hard to be GC-ed, so we need always to use the multipart
// upload, and should call the `AbortMultipartUpload` method to
// prevent blob residue as much as possible once any error happens
// during conversion process.
if err := b.bucket.AbortMultipartUpload(*ms.imur); err != nil {
logrus.WithError(err).Warn("abort multipart upload")
} else {
logrus.Warnf("blob upload has been aborted: %s", ms.blobObjectKey)
}
continue
}
} else {
reader, err := os.Open(blobPath)

_, err := b.bucket.CompleteMultipartUpload(*ms.imur, ms.parts)
if err != nil {
return nil, err
return errors.Wrap(err, "complete multipart upload")
}
defer reader.Close()
err = b.bucket.PutObject(blobObjectKey, reader)

props, err := b.bucket.GetObjectDetailedMeta(ms.blobObjectKey)
if err != nil {
return nil, err
return errors.Wrapf(err, "get object meta")
}
}

props, err := b.bucket.GetObjectDetailedMeta(blobObjectKey)
if err != nil {
return nil, errors.Wrapf(err, "get object meta")
}
// Try to validate blob object integrity if any crc64 value is returned.
if value, ok := props[http.CanonicalHeaderKey("x-oss-hash-crc64ecma")]; ok {
if len(value) == 1 {
uploadedCrc, err := strconv.ParseUint(value[0], 10, 64)
if err != nil {
return errors.Wrapf(err, "parse uploaded crc64")
}

// Try to validate blob object integrity if any crc64 value is returned.
if value, ok := props[http.CanonicalHeaderKey("x-oss-hash-crc64ecma")]; ok {
if len(value) == 1 {
uploadedCrc, err := strconv.ParseUint(value[0], 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "parse uploaded crc64")
}
err = <-ms.crc64ErrChan
if err != nil {
return errors.Wrapf(err, "calculate crc64")
}

err = <-crc64ErrChan
if err != nil {
return nil, errors.Wrapf(err, "calculate crc64")
}
if uploadedCrc != ms.crc64 {
return errors.Errorf("crc64 mismatch, uploaded=%d, expected=%d", uploadedCrc, ms.crc64)
}

if uploadedCrc != crc64 {
return nil, errors.Errorf("CRC64 mismatch. Uploaded=%d, expected=%d", uploadedCrc, crc64)
} else {
logrus.Warnf("too many values, skip crc64 integrity check.")
}

} else {
logrus.Warnf("Too many values, skip crc64 integrity check.")
logrus.Warnf("no crc64 in header, skip crc64 integrity check.")
}
} else {
logrus.Warnf("No CRC64 in header, skip crc64 integrity check.")
}

// With OSS backend, no blob has to be pushed to registry, but have to push to build cache.

end := time.Now()
elapsed := end.Sub(start)
logrus.Debugf("Uploading blob %s costs %s", blobObjectKey, elapsed)

return &desc, nil
return nil
}

func (b *OSSBackend) Check(blobID string) (bool, error) {
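With this change the OSS backend always uses multipart uploads and defers the final `CompleteMultipartUpload` to `Finalize`, so a failed conversion can abort every pending upload instead of leaving blobs on OSS that are hard to garbage-collect. The sketch below condenses that flow against the aliyun-oss-go-sdk, including the CRC64-ECMA check done at completion time; the helper names (`uploadDeferred`, `finalize`, `calcCrc64`), the sequential part upload, and the unconditional CRC check are simplifications of the merged code, which uploads parts concurrently, computes the CRC in a goroutine, and skips the check when OSS returns no `x-oss-hash-crc64ecma` header.

package ossexample

import (
	"hash/crc64"
	"io"
	"os"
	"strconv"

	"github.com/aliyun/aliyun-oss-go-sdk/oss"
	"github.com/pkg/errors"
)

// 200MB parts keep a blob well under the OSS limit of 10000 parts per upload.
const partSize = 200 * 1024 * 1024

// calcCrc64 computes the CRC64-ECMA checksum of a local file.
func calcCrc64(path string) (uint64, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	h := crc64.New(crc64.MakeTable(crc64.ECMA))
	if _, err := io.Copy(h, f); err != nil {
		return 0, err
	}
	return h.Sum64(), nil
}

// uploadDeferred uploads all parts of blobPath but does not complete the
// multipart upload; completion (or abort) is left to finalize below.
func uploadDeferred(bucket *oss.Bucket, objectKey, blobPath string) (oss.InitiateMultipartUploadResult, []oss.UploadPart, error) {
	var parts []oss.UploadPart
	chunks, err := oss.SplitFileByPartSize(blobPath, partSize)
	if err != nil {
		return oss.InitiateMultipartUploadResult{}, nil, errors.Wrap(err, "split file by part size")
	}
	imur, err := bucket.InitiateMultipartUpload(objectKey)
	if err != nil {
		return oss.InitiateMultipartUploadResult{}, nil, errors.Wrap(err, "initiate multipart upload")
	}
	for _, chunk := range chunks {
		p, err := bucket.UploadPartFromFile(imur, blobPath, chunk.Offset, chunk.Size, chunk.Number)
		if err != nil {
			// Abort eagerly so the failed upload leaves nothing behind on OSS.
			_ = bucket.AbortMultipartUpload(imur)
			return oss.InitiateMultipartUploadResult{}, nil, errors.Wrap(err, "upload part from file")
		}
		parts = append(parts, p)
	}
	return imur, parts, nil
}

// finalize aborts the pending upload when cancel is true, otherwise completes
// it and verifies the server-side CRC64 against a locally computed one.
func finalize(bucket *oss.Bucket, imur oss.InitiateMultipartUploadResult, parts []oss.UploadPart, objectKey, blobPath string, cancel bool) error {
	if cancel {
		return bucket.AbortMultipartUpload(imur)
	}
	if _, err := bucket.CompleteMultipartUpload(imur, parts); err != nil {
		return errors.Wrap(err, "complete multipart upload")
	}
	props, err := bucket.GetObjectDetailedMeta(objectKey)
	if err != nil {
		return errors.Wrap(err, "get object meta")
	}
	// The merged code skips this check when the header is absent.
	uploadedCrc, err := strconv.ParseUint(props.Get("X-Oss-Hash-Crc64ecma"), 10, 64)
	if err != nil {
		return errors.Wrap(err, "parse uploaded crc64")
	}
	localCrc, err := calcCrc64(blobPath)
	if err != nil {
		return errors.Wrap(err, "calculate crc64")
	}
	if uploadedCrc != localCrc {
		return errors.Errorf("crc64 mismatch, uploaded=%d, expected=%d", uploadedCrc, localCrc)
	}
	return nil
}

The merged implementation additionally tracks each pending upload in a `multipartStatus` slice guarded by `msMutex`, so a single `Finalize(true)` from the converter's error path can abort all of them.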
4 changes: 3 additions & 1 deletion contrib/nydusify/pkg/backend/registry.go
@@ -32,11 +32,13 @@ func (r *Registry) Upload(
}

return &desc, nil
}

func (r *Registry) Finalize(cancel bool) error {
return nil
}

func (r *Registry) Check(blobID string) (bool, error) {

return true, nil
}

7 changes: 7 additions & 0 deletions contrib/nydusify/pkg/converter/converter.go
@@ -181,6 +181,9 @@ func (cvt *Converter) convert(ctx context.Context) (retErr error) {

defer func() {
if retErr != nil {
if err := cvt.storageBackend.Finalize(true); err != nil {
logrus.WithError(err).Warnf("Cancel backend upload")
}
if repo != "" {
metrics.ConversionFailureCount(repo, "unknown")
}
@@ -347,6 +350,10 @@ func (cvt *Converter) convert(ctx context.Context) (retErr error) {
})
}

if err := cvt.storageBackend.Finalize(false); err != nil {
return errors.Wrap(err, "Finalize backend upload")
}

// Push OCI manifest, Nydus manifest and manifest index
mm := &manifestManager{
sourceProvider: sourceProvider,
4 changes: 4 additions & 0 deletions contrib/nydusify/pkg/packer/pusher_test.go
@@ -23,6 +23,10 @@ func (m *mockBackend) Upload(ctx context.Context, blobID, blobPath string, blobS
return nil, args.Error(0)
}

func (m *mockBackend) Finalize(cancel bool) error {
return nil
}

func (m *mockBackend) Check(_ string) (bool, error) {
return false, nil
}
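The mock's `Finalize` here is a plain no-op so the existing packer tests keep compiling. If a test ever needs to assert that a failed conversion aborts the upload, a testify-expectation variant could look like the sketch below; this is illustrative only and not part of the commit.

package packer

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// mockBackend stands in for the one defined in pusher_test.go (which
// already embeds mock.Mock for its Upload method).
type mockBackend struct {
	mock.Mock
}

// Finalize records the call so tests can assert how it was invoked.
func (m *mockBackend) Finalize(cancel bool) error {
	args := m.Called(cancel)
	return args.Error(0)
}

func TestFinalizeAbortsOnFailure(t *testing.T) {
	backend := &mockBackend{}
	backend.On("Finalize", true).Return(nil)

	// Placeholder for driving a conversion that fails and therefore
	// calls Finalize(true) on its error path.
	_ = backend.Finalize(true)

	backend.AssertExpectations(t)
}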
