diff --git a/ais/s3/mpt.go b/ais/s3/mpt.go index 0d1316f7412..15179b30917 100644 --- a/ais/s3/mpt.go +++ b/ais/s3/mpt.go @@ -22,7 +22,6 @@ import ( type ( MptPart struct { MD5 string // MD5 of the part (*) - Etag string // returned by s3 FQN string // FQN of the corresponding workfile Size int64 // part size in bytes (*) Num int64 // part number (*) diff --git a/ais/tgts3.go b/ais/tgts3.go index 6917b03cf90..b4325d7a02d 100644 --- a/ais/tgts3.go +++ b/ais/tgts3.go @@ -6,6 +6,7 @@ package ais import ( "context" + "errors" "fmt" "net/http" "strconv" @@ -24,6 +25,8 @@ import ( "github.com/NVIDIA/aistore/fs" ) +const fmtErrBckObj = "invalid %s request: expecting bucket and object (names) in the URL, have %v" + // [METHOD] /s3 func (t *target) s3Handler(w http.ResponseWriter, r *http.Request) { if cmn.Rom.FastV(5, cos.SmoduleS3) { @@ -33,6 +36,11 @@ func (t *target) s3Handler(w http.ResponseWriter, r *http.Request) { if err != nil { return } + if l := len(apiItems); (l == 0 && r.Method == http.MethodGet) || l < 2 { + err := fmt.Errorf(fmtErrBckObj, r.Method, apiItems) + s3.WriteErr(w, r, err, 0) + return + } switch r.Method { case http.MethodHead: @@ -73,18 +81,20 @@ func (t *target) putCopyMpt(w http.ResponseWriter, r *http.Request, config *cmn. switch { case q.Has(s3.QparamMptPartNo) && q.Has(s3.QparamMptUploadID): if r.Header.Get(cos.S3HdrObjSrc) != "" { - if cmn.Rom.FastV(5, cos.SmoduleS3) { - nlog.Infoln("putMptCopy", items) - } - t.putMptCopy(w, r, items) - } else { - if cmn.Rom.FastV(5, cos.SmoduleS3) { - nlog.Infoln("putMptPart", bck.String(), items, q) - } - t.putMptPart(w, r, items, q, bck) + // TODO: copy another object (or its range) => part of the specified multipart upload. 
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html + s3.WriteErr(w, r, errors.New("UploadPartCopy not implemented yet"), http.StatusNotImplemented) + return + } + if cmn.Rom.FastV(5, cos.SmoduleS3) { + nlog.Infoln("putMptPart", bck.String(), items, q) } + t.putMptPart(w, r, items, q, bck) case r.Header.Get(cos.S3HdrObjSrc) == "": - t.putObjS3(w, r, bck, config, items) + objName := s3.ObjName(items) + lom := core.AllocLOM(objName) + t.putObjS3(w, r, bck, config, lom) + core.FreeLOM(lom) default: t.copyObjS3(w, r, config, items) } @@ -93,10 +103,6 @@ func (t *target) putCopyMpt(w http.ResponseWriter, r *http.Request, config *cmn. // Copy object (maybe from another bucket) // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html func (t *target) copyObjS3(w http.ResponseWriter, r *http.Request, config *cmn.Config, items []string) { - if len(items) < 2 { - s3.WriteErr(w, r, errS3Obj, 0) - return - } src := r.Header.Get(cos.S3HdrObjSrc) src = strings.Trim(src, "/") // in AWS examples the path starts with "/" parts := strings.SplitN(src, "/", 2) @@ -174,14 +180,7 @@ func (t *target) copyObjS3(w http.ResponseWriter, r *http.Request, config *cmn.C sgl.Free() } -func (t *target) putObjS3(w http.ResponseWriter, r *http.Request, bck *meta.Bck, config *cmn.Config, items []string) { - if len(items) < 2 { - s3.WriteErr(w, r, errS3Obj, 0) - return - } - objName := s3.ObjName(items) - lom := core.AllocLOM(objName) - defer core.FreeLOM(lom) +func (t *target) putObjS3(w http.ResponseWriter, r *http.Request, bck *meta.Bck, config *cmn.Config, lom *core.LOM) { if err := lom.InitBck(bck.Bucket()); err != nil { if cmn.IsErrRemoteBckNotFound(err) { t.BMDVersionFixup(r) @@ -239,7 +238,8 @@ func (t *target) getObjS3(w http.ResponseWriter, r *http.Request, items []string return } if len(items) < 2 { - s3.WriteErr(w, r, errS3Obj, 0) + err := fmt.Errorf(fmtErrBckObj, r.Method, items) + s3.WriteErr(w, r, err, 0) return } objName := s3.ObjName(items) @@ 
-276,10 +276,6 @@ func (t *target) getObjS3(w http.ResponseWriter, r *http.Request, items []string // HEAD /s3// (TODO: s3.HdrMptCnt) // See: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html func (t *target) headObjS3(w http.ResponseWriter, r *http.Request, items []string) { - if len(items) < 2 { - s3.WriteErr(w, r, errS3Obj, 0) - return - } bucket, objName := items[0], s3.ObjName(items) bck, err, errCode := meta.InitByNameOnly(bucket, t.owner.bmd) if err != nil { @@ -348,10 +344,6 @@ func (t *target) delObjS3(w http.ResponseWriter, r *http.Request, items []string s3.WriteErr(w, r, err, errCode) return } - if len(items) < 2 { - s3.WriteErr(w, r, errS3Obj, 0) - return - } objName := s3.ObjName(items) lom := core.AllocLOM(objName) defer core.FreeLOM(lom) @@ -382,11 +374,6 @@ func (t *target) postObjS3(w http.ResponseWriter, r *http.Request, items []strin } q := r.URL.Query() if q.Has(s3.QparamMptUploads) { - if len(items) < 2 { - err := fmt.Errorf(fmtErrBO, items) - s3.WriteErr(w, r, err, 0) - return - } if cmn.Rom.FastV(5, cos.SmoduleS3) { nlog.Infoln("startMpt", bck.String(), items, q) } @@ -394,11 +381,6 @@ func (t *target) postObjS3(w http.ResponseWriter, r *http.Request, items []strin return } if q.Has(s3.QparamMptUploadID) { - if len(items) < 2 { - err := fmt.Errorf(fmtErrBO, items) - s3.WriteErr(w, r, err, 0) - return - } if cmn.Rom.FastV(5, cos.SmoduleS3) { nlog.Infoln("completeMpt", bck.String(), items, q) } diff --git a/ais/tgts3mpt.go b/ais/tgts3mpt.go index 5e80e51cf14..f8107a84086 100644 --- a/ais/tgts3mpt.go +++ b/ais/tgts3mpt.go @@ -28,8 +28,6 @@ import ( "github.com/NVIDIA/aistore/fs" ) -const fmtErrBO = "to complete multipart upload both bucket and object names are required (have %v)" - func isRemoteS3(bck *meta.Bck) bool { if bck.Provider == apc.AWS { return true @@ -38,6 +36,16 @@ func isRemoteS3(bck *meta.Bck) bool { return b != nil && b.Provider == apc.AWS } +func multiWriter(writers ...io.Writer) io.Writer { + a := 
make([]io.Writer, 0, 3) + for _, w := range writers { + if w != nil { + a = append(a, w) + } + } + return io.MultiWriter(a...) +} + // Initialize multipart upload. // - Generate UUID for the upload // - Return the UUID to a caller @@ -74,22 +82,6 @@ func (t *target) startMpt(w http.ResponseWriter, r *http.Request, items []string sgl.Free() } -// Copy another object or its range as a part of the multipart upload. -// Body is empty, everything in the query params and the header. -// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html -// TODO: not implemented yet -func (*target) putMptCopy(w http.ResponseWriter, r *http.Request, items []string) { - if len(items) < 2 { - err := fmt.Errorf(fmtErrBO, items) - s3.WriteErr(w, r, err, 0) - return - } - - // TODO -- FIXME: add WTP - - s3.WriteErr(w, r, errors.New("not implemented yet"), http.StatusNotImplemented) -} - // PUT a part of the multipart upload. // Body is empty, everything in the query params and the header. // @@ -98,11 +90,6 @@ func (*target) putMptCopy(w http.ResponseWriter, r *http.Request, items []string // // https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []string, q url.Values, bck *meta.Bck) { - if len(items) < 2 { - err := fmt.Errorf(fmtErrBO, items) - s3.WriteErr(w, r, err, 0) - return - } // 1. parse/validate uploadID := q.Get(s3.QparamMptUploadID) if uploadID == "" { @@ -125,10 +112,6 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri s3.WriteErr(w, r, err, 0) return } - if r.Header.Get(cos.S3HdrObjSrc) != "" { - s3.WriteErr(w, r, errors.New("uploading a copy is not supported yet"), http.StatusNotImplemented) - return - } // 2. init lom, create part file objName := s3.ObjName(items) @@ -140,7 +123,7 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri // workfile name format: .. prefix := uploadID + "." 
+ strconv.FormatInt(partNum, 10) wfqn := fs.CSM.Gen(lom, fs.WorkfileType, prefix) - fh, errC := lom.CreateFileRW(wfqn) + partFh, errC := lom.CreateFileRW(wfqn) if errC != nil { s3.WriteErr(w, r, errC, 0) return @@ -148,31 +131,32 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri // 3. write var ( - mwriter io.Writer - partSHA, etag string - cksumSHA *cos.CksumHash - // TODO -- FIXME: md5 if !isRemoteS3(bck) || (configured); otherwise, use the part's ETag (`etag`) - cksumMD5 = cos.NewCksumHash(cos.ChecksumMD5) - buf, slab = t.gmm.Alloc() + etag string + partSHA string errCode int + buf, slab = t.gmm.Alloc() + cksumSHA = &cos.CksumHash{} + cksumMD5 = &cos.CksumHash{} + remote = isRemoteS3(bck) ) if partSHA = r.Header.Get(cos.S3HdrContentSHA256); partSHA != "" { cksumSHA = cos.NewCksumHash(cos.ChecksumSHA256) - mwriter = io.MultiWriter(cksumMD5.H, cksumSHA.H, fh) - } else { - mwriter = io.MultiWriter(cksumMD5.H, fh) } - size, err := io.CopyBuffer(mwriter, r.Body, buf) + if !remote { + cksumMD5 = cos.NewCksumHash(cos.ChecksumMD5) + } + mw := multiWriter(cksumMD5.H, cksumSHA.H, partFh) + size, err := io.CopyBuffer(mw, r.Body, buf) slab.Free(buf) // 4. rewind and call s3 API - if err == nil && isRemoteS3(bck) { - if _, err = fh.Seek(0, io.SeekStart); err == nil { - etag, errCode, err = backend.PutMptPart(lom, fh, uploadID, partNum, size) // TODO: include md5? + if err == nil && remote { + if _, err = partFh.Seek(0, io.SeekStart); err == nil { + etag, errCode, err = backend.PutMptPart(lom, partFh, uploadID, partNum, size) } } - cos.Close(fh) + cos.Close(partFh) if err != nil { if nerr := cos.RemoveFile(wfqn); nerr != nil { nlog.Errorf(fmtNested, t, err, "remove", wfqn, nerr) @@ -181,18 +165,14 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri return } - // 5. 
validate md5 and finalize the part - cksumMD5.Finalize() - md5 := cksumMD5.Value() - if etag != "" { - if etag != md5 { - err := fmt.Errorf("upload %q: bad MD5/ETag: %s, part number %d, MD5 %s, ETag %s", - uploadID, lom.Cname(), partNum, md5, etag) - s3.WriteErr(w, r, err, 0) - return - } + // 5. finalize part + // expecting the part's remote etag to be md5 checksum, not computing otherwise + md5 := etag + if cksumMD5.H != nil { + debug.Assert(etag == "") + cksumMD5.Finalize() + md5 = cksumMD5.Value() } - if partSHA != "" { cksumSHA.Finalize() recvSHA := cos.NewCksum(cos.ChecksumSHA256, partSHA) @@ -203,10 +183,8 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri return } } - npart := &s3.MptPart{ MD5: md5, - Etag: etag, FQN: wfqn, Size: size, Num: partNum, @@ -215,7 +193,7 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri s3.WriteErr(w, r, err, 0) return } - w.Header().Set(cos.S3CksumHeader, cksumMD5.Value()) // s3cmd checks this one + w.Header().Set(cos.S3CksumHeader, md5) // s3cmd checks this one } // Complete multipart upload. 
@@ -238,7 +216,7 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str return } if len(partList.Parts) == 0 { - s3.WriteErr(w, r, errors.New("empty list of upload parts"), 0) + s3.WriteErr(w, r, fmt.Errorf("upload %q: empty list of upload parts", uploadID), 0) return } objName := s3.ObjName(items) @@ -247,13 +225,19 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str s3.WriteErr(w, r, err, 0) return } + size, errN := s3.ObjSize(uploadID) + if errN != nil { + s3.WriteErr(w, r, errN, 0) + return + } // call s3 var ( etag string started = time.Now() + remote = isRemoteS3(bck) ) - if isRemoteS3(bck) { + if remote { v, errCode, err := backend.CompleteMpt(lom, uploadID, partList) if err != nil { s3.WriteErr(w, r, err, errCode) @@ -264,9 +248,9 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str // append parts and finalize locally var ( - mwriter io.Writer - concatMD5 string // => ETag - actualMD5 = cos.NewCksumHash(cos.ChecksumMD5) + mw io.Writer + concatMD5 string // => ETag + actualCksum = &cos.CksumHash{} ) // .1 sort and check parts sort.Slice(partList.Parts, func(i, j int) bool { @@ -277,11 +261,7 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str s3.WriteErr(w, r, err, 0) return } - // .2 append all parts and, separately, their respective MD5s - buf, slab := t.gmm.Alloc() - defer slab.Free(buf) - - // .complete. + // 2. .complete. 
prefix := uploadID + ".complete" wfqn := fs.CSM.Gen(lom, fs.WorkfileType, prefix) wfh, errC := lom.CreateFile(wfqn) @@ -289,31 +269,37 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str s3.WriteErr(w, r, errC, 0) return } - mwriter = io.MultiWriter(actualMD5.H, wfh) + if remote && lom.CksumConf().Type != cos.ChecksumNone { + actualCksum = cos.NewCksumHash(lom.CksumConf().Type) + } else { + actualCksum = cos.NewCksumHash(cos.ChecksumMD5) + } + mw = multiWriter(actualCksum.H, wfh) // .3 write - for _, partInfo := range nparts { - concatMD5 += partInfo.MD5 - partFh, err := os.Open(partInfo.FQN) - if err != nil { - cos.Close(wfh) - s3.WriteErr(w, r, err, 0) - return - } - if _, err := io.CopyBuffer(mwriter, partFh, buf); err != nil { - cos.Close(wfh) - cos.Close(partFh) - s3.WriteErr(w, r, err, 0) - return + buf, slab := t.gmm.Alloc() + concatMD5, written, errA := _appendMpt(nparts, buf, mw) + slab.Free(buf) + cos.Close(wfh) + if errA == nil && written != size { + errA = fmt.Errorf("upload %q %q: expected full size=%d, got %d", uploadID, lom.Cname(), size, written) + } + if errA != nil { + if nerr := cos.RemoveFile(wfqn); nerr != nil { + nlog.Errorf(fmtNested, t, errA, "remove", wfqn, nerr) } - cos.Close(partFh) + s3.WriteErr(w, r, errA, 0) + return } - cos.Close(wfh) // .4 (s3 client => ais://) compute resulting MD5 and, optionally, ETag - actualMD5.Finalize() - + if actualCksum.H != nil { + actualCksum.Finalize() + lom.SetCksum(actualCksum.Cksum.Clone()) + } if etag == "" { + debug.Assert(!remote) + debug.Assert(concatMD5 != "") resMD5 := cos.NewCksumHash(cos.ChecksumMD5) _, err = resMD5.H.Write([]byte(concatMD5)) debug.AssertNoErr(err) @@ -322,14 +308,8 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str } // .5 finalize - size, errN := s3.ObjSize(uploadID) - if errN != nil { - s3.WriteErr(w, r, errN, 0) - return - } lom.SetSize(size) lom.SetCustomKey(cmn.ETag, etag) - 
lom.SetCksum(actualMD5.Cksum.Clone()) poi := allocPOI() { @@ -365,6 +345,26 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str sgl.Free() } +func _appendMpt(nparts []*s3.MptPart, buf []byte, mw io.Writer) (concatMD5 string, written int64, err error) { + for _, partInfo := range nparts { + var ( + partFh *os.File + partSize int64 + ) + concatMD5 += partInfo.MD5 + if partFh, err = os.Open(partInfo.FQN); err != nil { + return "", 0, err + } + partSize, err = io.CopyBuffer(mw, partFh, buf) + cos.Close(partFh) + if err != nil { + return "", 0, err + } + written += partSize + } + return concatMD5, written, nil +} + // Abort an active multipart upload. // Body is empty, only URL query contains uploadID // 1. uploadID must exists @@ -372,11 +372,6 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str // 3. Remove all info from in-memory structs // https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html func (t *target) abortMpt(w http.ResponseWriter, r *http.Request, items []string, q url.Values) { - if len(items) < 2 { - err := fmt.Errorf(fmtErrBO, items) - s3.WriteErr(w, r, err, 0) - return - } bck, err, errCode := meta.InitByNameOnly(items[0], t.owner.bmd) if err != nil { s3.WriteErr(w, r, err, errCode)