Skip to content

Commit

Permalink
set blocks to 60 and add fasthttp
Browse files Browse the repository at this point in the history
  • Loading branch information
Hitenjain14 committed Jan 29, 2024
1 parent d79ae98 commit 312f03f
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 48 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (

require (
github.com/VictoriaMetrics/fastcache v1.6.0 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
Expand Down Expand Up @@ -112,6 +113,7 @@ require (
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.dedis.ch/fixbuf v1.0.3 // indirect
go.opencensus.io v0.24.0 // indirect
Expand All @@ -128,9 +130,10 @@ require (

require (
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/klauspost/compress v1.16.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/remeh/sizedwaitgroup v1.0.0
github.com/valyala/fasthttp v1.51.0
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
Expand Down Expand Up @@ -338,6 +340,8 @@ github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0
github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
Expand Down Expand Up @@ -527,6 +531,10 @@ github.com/uptrace/bunrouter v1.0.20 h1:jNvYNcJxF+lSYBQAaQjnE6I11Zs0m+3M5Ek7fq/T
github.com/uptrace/bunrouter v1.0.20/go.mod h1:TwT7Bc0ztF2Z2q/ZzMuSVkcb/Ig/d3MQeP2cxn3e1hI=
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa h1:5SqCsI/2Qya2bCzK15ozrqo2sZxkh0FHynJZOTVoV6Q=
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA=
github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
Expand Down
13 changes: 3 additions & 10 deletions zboxcore/sdk/chunked_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/0chain/gosdk/zboxcore/zboxutil"
"github.com/google/uuid"
"github.com/klauspost/reedsolomon"
"github.com/remeh/sizedwaitgroup"
)

const (
Expand All @@ -48,8 +47,8 @@ var (
ErrNoEnoughSpaceLeftInAllocation = errors.New("alloc: no enough space left in allocation")
CancelOpCtx = make(map[string]context.CancelCauseFunc)
cancelLock sync.Mutex
UploadWorkers = 2
UploadRequests = 10
// UploadWorkers = 2
UploadRequests = 5
)

// DefaultChunkSize default chunk size for file and thumbnail
Expand Down Expand Up @@ -761,21 +760,15 @@ func getShardSize(dataSize int64, dataShards int, isEncrypted bool) int64 {

func (su *ChunkedUpload) uploadProcessor() {
defer su.uploadWG.Done()
swg := sizedwaitgroup.New(UploadWorkers)
for {
select {
case <-su.ctx.Done():
return
case uploadData, ok := <-su.uploadChan:
if !ok {
swg.Wait()
return
}
swg.Add()
go func() {
su.uploadToBlobbers(uploadData) //nolint:errcheck
swg.Done()
}()
su.uploadToBlobbers(uploadData) //nolint:errcheck
}
}
}
Expand Down
51 changes: 16 additions & 35 deletions zboxcore/sdk/chunked_upload_blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/0chain/gosdk/zboxcore/logger"
"github.com/0chain/gosdk/zboxcore/marker"
"github.com/0chain/gosdk/zboxcore/zboxutil"
"github.com/valyala/fasthttp"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -76,48 +77,37 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest(
)

for i := 0; i < 3; i++ {
dataBufferSize := dataBuffers[ind].Len()
req, err := zboxutil.NewUploadRequestWithMethod(
sb.blobber.Baseurl, su.allocationObj.ID, su.allocationObj.Tx, dataBuffers[ind], su.httpMethod)
req, err := zboxutil.NewFastUploadRequest(
sb.blobber.Baseurl, su.allocationObj.ID, su.allocationObj.Tx, dataBuffers[ind].Bytes(), su.httpMethod)
if err != nil {
return err
}

req.Header.Add("Content-Type", contentSlice[ind])
err, shouldContinue = func() (err error, shouldContinue bool) {
reqCtx, ctxCncl := context.WithTimeout(ctx, su.uploadTimeOut)
var resp *http.Response
err = zboxutil.HttpDo(reqCtx, ctxCncl, req, func(r *http.Response, err error) error {
resp = r
return err
})
defer ctxCncl()

resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
err = zboxutil.FastHttpClient.DoTimeout(req, resp, su.uploadTimeOut)
fasthttp.ReleaseRequest(req)
if err != nil {
logger.Logger.Error("Upload : ", err)
return fmt.Errorf("Error while doing reqeust. Error %s", err), false
}

if resp.Body != nil {
defer resp.Body.Close()
}
if resp.StatusCode == http.StatusOK {
io.Copy(io.Discard, resp.Body) //nolint:errcheck
if resp.StatusCode() == http.StatusOK {
return
}
var r UploadResult
var respbody []byte

respbody, err = io.ReadAll(resp.Body)
respbody := resp.Body()
if err != nil {
logger.Logger.Error("Error: Resp ", err)
return fmt.Errorf("Error while reading body. Error %s", err), false
}

if resp.StatusCode == http.StatusTooManyRequests {
if resp.StatusCode() == http.StatusTooManyRequests {
logger.Logger.Error("Got too many request error")
var r int
r, err = zboxutil.GetRateLimitValue(resp)
r, err = zboxutil.GetFastRateLimitValue(resp)
if err != nil {
logger.Logger.Error(err)
return
Expand All @@ -127,20 +117,11 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest(
return
}

if resp.StatusCode != http.StatusOK {
msg := string(respbody)
logger.Logger.Error(sb.blobber.Baseurl,
" Upload error response: ", resp.StatusCode,
"err message: ", msg, "ind: ", ind, "retries: ", i, "chunkIndex: ", chunkIndex, " dataBufferSize:", dataBufferSize, " currentDataBufferLen: ", dataBuffers[ind].Len(), " contentType: ", formData.ContentType)
err = errors.Throw(constants.ErrBadRequest, msg)
return
}

err = json.Unmarshal(respbody, &r)
if err != nil {
logger.Logger.Error(sb.blobber.Baseurl, "Upload response parse error: ", err)
return
}
msg := string(respbody)
logger.Logger.Error(sb.blobber.Baseurl,
" Upload error response: ", resp.StatusCode,
"err message: ", msg)
err = errors.Throw(constants.ErrBadRequest, msg)
return
}()

Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/chunked_upload_form_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func CreateChunkedUploadFormBuilder() ChunkedUploadFormBuilder {
type chunkedUploadFormBuilder struct {
}

const MAX_BLOCKS = 80 // 5MB(CHUNK_SIZE*80)
const MAX_BLOCKS = 60 // 4MB(CHUNK_SIZE*60)

func (b *chunkedUploadFormBuilder) Build(
fileMeta *FileMeta, hasher Hasher, connectionID string,
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/chunked_upload_progress_storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (fs *fsChunkedUploadProgressStorer) Save(up UploadProgress) {
defer fs.Unlock()
fs.up = up
now := time.Now()
if now.Sub(fs.since).Seconds() > 1 {
if now.Sub(fs.since).Seconds() > 2 {
if fs.isRemoved {
return
}
Expand Down
49 changes: 49 additions & 0 deletions zboxcore/zboxutil/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/0chain/gosdk/core/logger"
"github.com/0chain/gosdk/zboxcore/blockchain"
"github.com/0chain/gosdk/zboxcore/client"
"github.com/valyala/fasthttp"
)

const SC_REST_API_URL = "v1/screst/"
Expand All @@ -40,6 +41,8 @@ type HttpClient interface {

var Client HttpClient

var FastHttpClient *fasthttp.Client

var log logger.Logger

func GetLogger() *logger.Logger {
Expand Down Expand Up @@ -141,6 +144,18 @@ func init() {
Client = &http.Client{
Transport: DefaultTransport,
}
maxIdleConnDuration, _ := time.ParseDuration("1h")
FastHttpClient = &fasthttp.Client{
MaxIdleConnDuration: maxIdleConnDuration,
NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp
DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this
DisablePathNormalizing: true,
// increase DNS cache time to an hour instead of default minute
Dial: (&fasthttp.TCPDialer{
Concurrency: 4096,
DNSCacheDuration: time.Hour,
}).Dial,
}
envProxy.initialize()
log.Init(logger.DEBUG, "0box-sdk")
}
Expand Down Expand Up @@ -179,6 +194,19 @@ func setClientInfoWithSign(req *http.Request, allocation string) error {
return nil
}

func setFastClientInfoWithSign(req *fasthttp.Request, allocation string) error {
req.Header.Set("X-App-Client-ID", client.GetClientID())
req.Header.Set("X-App-Client-Key", client.GetClientPublicKey())

sign, err := client.Sign(encryption.Hash(allocation))
if err != nil {
return err
}
req.Header.Set(CLIENT_SIGNATURE_HEADER, sign)

return nil
}

func NewCommitRequest(baseUrl, allocationID string, allocationTx string, body io.Reader) (*http.Request, error) {
u, err := joinUrl(baseUrl, COMMIT_ENDPOINT, allocationTx)
if err != nil {
Expand Down Expand Up @@ -546,6 +574,27 @@ func NewWriteMarkerUnLockRequest(
return req, nil
}

func NewFastUploadRequest(baseURL, allocationID string, allocationTx string, body []byte, method string) (*fasthttp.Request, error) {
u, err := joinUrl(baseURL, UPLOAD_ENDPOINT, allocationTx)
if err != nil {
return nil, err
}

req := fasthttp.AcquireRequest()

req.Header.SetMethod(method)
req.SetRequestURI(u.String())
req.SetBodyRaw(body)

// set header: X-App-Client-Signature
if err := setFastClientInfoWithSign(req, allocationTx); err != nil {
return nil, err
}

req.Header.Set(ALLOCATION_ID_HEADER, allocationID)
return req, nil
}

func NewUploadRequest(baseUrl, allocationID string, allocationTx string, body io.Reader, update bool) (*http.Request, error) {
u, err := joinUrl(baseUrl, UPLOAD_ENDPOINT, allocationTx)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions zboxcore/zboxutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/h2non/filetype"
"github.com/lithammer/shortuuid/v3"
"github.com/minio/sha256-simd"
"github.com/valyala/fasthttp"
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/crypto/scrypt"
)
Expand Down Expand Up @@ -241,6 +242,23 @@ func GetRateLimitValue(r *http.Response) (int, error) {
return int(math.Ceil(rl / dur)), nil
}

func GetFastRateLimitValue(r *fasthttp.Response) (int, error) {
rlStr := r.Header.Peek("X-Rate-Limit-Limit")
durStr := r.Header.Peek("X-Rate-Limit-Duration")

rl, err := strconv.ParseFloat(string(rlStr), 64)
if err != nil {
return 0, err
}

dur, err := strconv.ParseFloat(string(durStr), 64)
if err != nil {
return 0, err
}

return int(math.Ceil(rl / dur)), nil
}

func MajorError(errors []error) error {
countError := make(map[error]int)
for _, value := range errors {
Expand Down

0 comments on commit 312f03f

Please sign in to comment.