Skip to content

Commit

Permalink
multipart upload: write-through all parts
Browse files Browse the repository at this point in the history
* do _not_ compute md5 when the bucket's backend is s3
  (neither parts nor whole object)
* refactor; reduce copy/paste
* always validate full size
* on error always remove wfnq
* part two, prev. commit: 333609e

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 28, 2024
1 parent 333609e commit f889d45
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 137 deletions.
1 change: 0 additions & 1 deletion ais/s3/mpt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
type (
MptPart struct {
MD5 string // MD5 of the part (*)
Etag string // returned by s3
FQN string // FQN of the corresponding workfile
Size int64 // part size in bytes (*)
Num int64 // part number (*)
Expand Down
64 changes: 23 additions & 41 deletions ais/tgts3.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package ais

import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
Expand All @@ -24,6 +25,8 @@ import (
"github.com/NVIDIA/aistore/fs"
)

const fmtErrBckObj = "invalid %s request: expecting bucket and object (names) in the URL, have %v"

// [METHOD] /s3
func (t *target) s3Handler(w http.ResponseWriter, r *http.Request) {
if cmn.Rom.FastV(5, cos.SmoduleS3) {
Expand All @@ -33,6 +36,11 @@ func (t *target) s3Handler(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
if l := len(apiItems); (l == 0 && r.Method == http.MethodGet) || l < 2 {
err := fmt.Errorf(fmtErrBckObj, r.Method, apiItems)
s3.WriteErr(w, r, err, 0)
return
}

switch r.Method {
case http.MethodHead:
Expand Down Expand Up @@ -73,18 +81,20 @@ func (t *target) putCopyMpt(w http.ResponseWriter, r *http.Request, config *cmn.
switch {
case q.Has(s3.QparamMptPartNo) && q.Has(s3.QparamMptUploadID):
if r.Header.Get(cos.S3HdrObjSrc) != "" {
if cmn.Rom.FastV(5, cos.SmoduleS3) {
nlog.Infoln("putMptCopy", items)
}
t.putMptCopy(w, r, items)
} else {
if cmn.Rom.FastV(5, cos.SmoduleS3) {
nlog.Infoln("putMptPart", bck.String(), items, q)
}
t.putMptPart(w, r, items, q, bck)
// TODO: copy another object (or its range) => part of the specified multipart upload.
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
s3.WriteErr(w, r, errors.New("UploadPartCopy not implemented yet"), http.StatusNotImplemented)
return
}
if cmn.Rom.FastV(5, cos.SmoduleS3) {
nlog.Infoln("putMptPart", bck.String(), items, q)
}
t.putMptPart(w, r, items, q, bck)
case r.Header.Get(cos.S3HdrObjSrc) == "":
t.putObjS3(w, r, bck, config, items)
objName := s3.ObjName(items)
lom := core.AllocLOM(objName)
t.putObjS3(w, r, bck, config, lom)
core.FreeLOM(lom)
default:
t.copyObjS3(w, r, config, items)
}
Expand All @@ -93,10 +103,6 @@ func (t *target) putCopyMpt(w http.ResponseWriter, r *http.Request, config *cmn.
// Copy object (maybe from another bucket)
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
func (t *target) copyObjS3(w http.ResponseWriter, r *http.Request, config *cmn.Config, items []string) {
if len(items) < 2 {
s3.WriteErr(w, r, errS3Obj, 0)
return
}
src := r.Header.Get(cos.S3HdrObjSrc)
src = strings.Trim(src, "/") // in AWS examples the path starts with "/"
parts := strings.SplitN(src, "/", 2)
Expand Down Expand Up @@ -174,14 +180,7 @@ func (t *target) copyObjS3(w http.ResponseWriter, r *http.Request, config *cmn.C
sgl.Free()
}

func (t *target) putObjS3(w http.ResponseWriter, r *http.Request, bck *meta.Bck, config *cmn.Config, items []string) {
if len(items) < 2 {
s3.WriteErr(w, r, errS3Obj, 0)
return
}
objName := s3.ObjName(items)
lom := core.AllocLOM(objName)
defer core.FreeLOM(lom)
func (t *target) putObjS3(w http.ResponseWriter, r *http.Request, bck *meta.Bck, config *cmn.Config, lom *core.LOM) {
if err := lom.InitBck(bck.Bucket()); err != nil {
if cmn.IsErrRemoteBckNotFound(err) {
t.BMDVersionFixup(r)
Expand Down Expand Up @@ -239,7 +238,8 @@ func (t *target) getObjS3(w http.ResponseWriter, r *http.Request, items []string
return
}
if len(items) < 2 {
s3.WriteErr(w, r, errS3Obj, 0)
err := fmt.Errorf(fmtErrBckObj, r.Method, items)
s3.WriteErr(w, r, err, 0)
return
}
objName := s3.ObjName(items)
Expand Down Expand Up @@ -276,10 +276,6 @@ func (t *target) getObjS3(w http.ResponseWriter, r *http.Request, items []string
// HEAD /s3/<bucket-name>/<object-name> (TODO: s3.HdrMptCnt)
// See: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html
func (t *target) headObjS3(w http.ResponseWriter, r *http.Request, items []string) {
if len(items) < 2 {
s3.WriteErr(w, r, errS3Obj, 0)
return
}
bucket, objName := items[0], s3.ObjName(items)
bck, err, errCode := meta.InitByNameOnly(bucket, t.owner.bmd)
if err != nil {
Expand Down Expand Up @@ -348,10 +344,6 @@ func (t *target) delObjS3(w http.ResponseWriter, r *http.Request, items []string
s3.WriteErr(w, r, err, errCode)
return
}
if len(items) < 2 {
s3.WriteErr(w, r, errS3Obj, 0)
return
}
objName := s3.ObjName(items)
lom := core.AllocLOM(objName)
defer core.FreeLOM(lom)
Expand Down Expand Up @@ -382,23 +374,13 @@ func (t *target) postObjS3(w http.ResponseWriter, r *http.Request, items []strin
}
q := r.URL.Query()
if q.Has(s3.QparamMptUploads) {
if len(items) < 2 {
err := fmt.Errorf(fmtErrBO, items)
s3.WriteErr(w, r, err, 0)
return
}
if cmn.Rom.FastV(5, cos.SmoduleS3) {
nlog.Infoln("startMpt", bck.String(), items, q)
}
t.startMpt(w, r, items, bck)
return
}
if q.Has(s3.QparamMptUploadID) {
if len(items) < 2 {
err := fmt.Errorf(fmtErrBO, items)
s3.WriteErr(w, r, err, 0)
return
}
if cmn.Rom.FastV(5, cos.SmoduleS3) {
nlog.Infoln("completeMpt", bck.String(), items, q)
}
Expand Down
Loading

0 comments on commit f889d45

Please sign in to comment.