Skip to content

Commit

Permalink
Merge pull request #1506 from 0chain/feat/ref-error
Browse files Browse the repository at this point in the history
fix consensus thresh and log error
  • Loading branch information
dabasov authored Jun 11, 2024
2 parents e2f3152 + c936c09 commit d6cc848
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 45 deletions.
2 changes: 1 addition & 1 deletion core/sys/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (f *MemFile) Write(p []byte) (n int, err error) {
}

func (f *MemFile) WriteAt(p []byte, offset int64) (n int, err error) {
if offset < 0 || offset > int64(len(f.Buffer)) {
if offset < 0 || offset > int64(len(f.Buffer)) || len(p) > len(f.Buffer)-int(offset) {
return 0, io.ErrShortWrite
}

Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,6 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/herumi/bls-go-binary v1.31.0 h1:L1goQ2tMtGgpXCg5AwHAdJQpLs/pfnWWEc3Wog6OhmI=
github.com/herumi/bls-go-binary v1.31.0/go.mod h1:O4Vp1AfR4raRGwFeQpr9X/PQtncEicMoOe6BQt1oX0Y=
github.com/hitenjain14/fasthttp v0.0.0-20240229173600-722723e15e17 h1:FbyIK0BfvXVZTOxKOe2dlxJqSPSF2ZXOv2Mc7dvS7sc=
github.com/hitenjain14/fasthttp v0.0.0-20240229173600-722723e15e17/go.mod h1:RZMcXy7u4S+E97IXYTe7WHZ3+mCYOh4vys8PkIGZeXk=
github.com/hitenjain14/fasthttp v0.0.0-20240527123209-06019e79bff9 h1:Z6Mu2JCsW2hbqx91L0HNPRPQ10RyAFvPocQHlrRo1Jk=
github.com/hitenjain14/fasthttp v0.0.0-20240527123209-06019e79bff9/go.mod h1:RZMcXy7u4S+E97IXYTe7WHZ3+mCYOh4vys8PkIGZeXk=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
Expand Down
23 changes: 14 additions & 9 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,14 @@ type OperationRequest struct {
IsWebstreaming bool

// Required for uploads
Workdir string
FileMeta FileMeta
FileReader io.Reader
Mask *zboxutil.Uint128 // Required for delete repair operation
DownloadFile bool // Required for upload repair operation
StreamUpload bool // Required for streaming file when actualSize is not available
Opts []ChunkedUploadOption
Workdir string
FileMeta FileMeta
FileReader io.Reader
Mask *zboxutil.Uint128 // Required for delete repair operation
DownloadFile bool // Required for upload repair operation
StreamUpload bool // Required for streaming file when actualSize is not available
CancelCauseFunc context.CancelCauseFunc
Opts []ChunkedUploadOption
}

func GetReadPriceRange() (PriceRange, error) {
Expand Down Expand Up @@ -307,7 +308,11 @@ func (a *Allocation) GetBlobberStats() map[string]*BlobberAllocationStats {
return result
}

const downloadWorkerCount = 6
var downloadWorkerCount = 6

func SetDownloadWorkerCount(count int) {
downloadWorkerCount = count
}

func (a *Allocation) InitAllocation() {
a.downloadChan = make(chan *DownloadRequest, 100)
Expand Down Expand Up @@ -1415,7 +1420,7 @@ func (a *Allocation) getRefs(path, pathHash, authToken, offsetPath, updatedDate,
ctx: a.ctx,
}
oTreeReq.fullconsensus = a.fullconsensus
oTreeReq.consensusThresh = a.consensusThreshold
oTreeReq.consensusThresh = a.DataShards
return oTreeReq.GetRefs()
}

Expand Down
4 changes: 4 additions & 0 deletions zboxcore/sdk/allocation_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hitenjain14/fasthttp"

"github.com/0chain/gosdk/zboxcore/blockchain"
"github.com/0chain/gosdk/zboxcore/client"
zclient "github.com/0chain/gosdk/zboxcore/client"
"github.com/0chain/gosdk/zboxcore/fileref"
"github.com/0chain/gosdk/zboxcore/mocks"
Expand All @@ -35,6 +36,9 @@ func setupHttpResponses(
refsInput, fileMetaInput []byte, hashes []string,
numBlobbers, numCorrect int, isUpdate bool) {

walletJSON := `{"client_id":"00d2d56d0d573329fe61b8252a4b1715f93fac15176e5d90c413bc92a42e498b","client_key":"000b47144eb0366c3039bca10bc6df3ac289d8823de14ffc08cfdfe83f03e4079ab94bdc3932e7e9bc053f38834c7da63ce6f9c6e540d93cf0c52ba4149f2280","keys":[{"public_key":"000b47144eb0366c3039bca10bc6df3ac289d8823de14ffc08cfdfe83f03e4079ab94bdc3932e7e9bc053f38834c7da63ce6f9c6e540d93cf0c52ba4149f2280","private_key":"77a7faf0dcc1865a475963fee7ce71ca6dc6a20198209eb75d9fc1dc9df41f0f"}],"mnemonics":"mistake alone lumber swamp tape device flight oppose room combine useful typical deal lion device hope glad once million pudding artist brush sing vicious","version":"1.0","date_created":"2024-03-11T20:06:33+05:30","nonce":0}`
client.PopulateClient(walletJSON, "bls0chain") //nolint:errcheck

for i := 0; i < numBlobbers; i++ {
metaBlobberBase := t.Name() + "/" + mockBlobberUrl + strconv.Itoa(i) + zboxutil.FILE_META_ENDPOINT
refsBlobberBase := t.Name() + "/" + mockBlobberUrl + strconv.Itoa(i) + zboxutil.REFS_ENDPOINT
Expand Down
21 changes: 15 additions & 6 deletions zboxcore/sdk/chunked_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
cancelLock sync.Mutex
CurrentMode = UploadModeMedium
shouldSaveProgress = true
HighModeWorkers = 4
)

// DefaultChunkSize default chunk size for file and thumbnail
Expand All @@ -75,6 +76,10 @@ func SetUploadMode(mode UploadMode) {
CurrentMode = mode
}

func SetHighModeWorkers(workers int) {
HighModeWorkers = workers
}

/*
CreateChunkedUpload create a ChunkedUpload instance
Expand Down Expand Up @@ -325,7 +330,7 @@ func calculateWorkersAndRequests(dataShards, totalShards, chunknumber int) (uplo
case UploadModeMedium:
uploadWorkers = 2
case UploadModeHigh:
uploadWorkers = 4
uploadWorkers = HighModeWorkers
}
}

Expand Down Expand Up @@ -420,7 +425,11 @@ func (su *ChunkedUpload) createEncscheme() encryption.EncryptionScheme {
return nil
}
} else {
privateKey, err := encscheme.Initialize(client.GetClient().Mnemonic)
mnemonic := client.GetClient().Mnemonic
if mnemonic == "" {
return nil
}
privateKey, err := encscheme.Initialize(mnemonic)
if err != nil {
return nil
}
Expand Down Expand Up @@ -705,7 +714,7 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int,
su.removeProgress()
return thrown.New("upload_failed", fmt.Sprintf("Upload failed. %s", err))
}
logger.Logger.Info("uploadingData: ", blobberUpload.chunkStartIndex, " - ", blobberUpload.chunkEndIndex, " ", isFinal, " ", su.fileMeta.RemotePath)
logger.Logger.Debug("uploadingData: ", blobberUpload.chunkStartIndex, " - ", blobberUpload.chunkEndIndex, " ", isFinal, " ", su.fileMeta.RemotePath)
if !lastBufferOnly {
su.uploadWG.Add(1)
select {
Expand All @@ -717,9 +726,9 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int,

if isFinal {
close(su.uploadChan)
logger.Logger.Info("Waiting for upload to complete")
logger.Logger.Debug("Waiting for upload to complete")
su.uploadWG.Wait()
logger.Logger.Info("Upload completed")
logger.Logger.Debug("Upload completed")
select {
case <-su.ctx.Done():
return context.Cause(su.ctx)
Expand Down Expand Up @@ -860,7 +869,7 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error {
}
return
}
logger.Logger.Error("error during sendUploadRequest", err)
logger.Logger.Error("error during sendUploadRequest", err, " connectionID: ", su.progress.ConnectionID)
errC := atomic.AddInt32(&errCount, 1)
if errC > int32(su.allocationObj.ParityShards-1) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later
wgErrors <- err
Expand Down
23 changes: 13 additions & 10 deletions zboxcore/sdk/downloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type DownloadRequest struct {
downloadStorer DownloadProgressStorer
workdir string
downloadQueue downloadQueue // Always initialize this queue with max time taken
isResume bool
}

type downloadPriority struct {
Expand Down Expand Up @@ -480,7 +481,9 @@ func (req *DownloadRequest) processDownload() {
remainingSize := size - startBlock*int64(req.effectiveBlockSize)*int64(req.datashards)

if endBlock*int64(req.effectiveBlockSize)*int64(req.datashards) < req.size {
remainingSize = (endBlock - startBlock - 1) * int64(req.effectiveBlockSize) * int64(req.datashards)
remainingSize = blocksPerShard * int64(req.effectiveBlockSize) * int64(req.datashards)
} else if req.isResume {
remainingSize = size
}

if memFile, ok := req.fileHandler.(*sys.MemFile); ok {
Expand Down Expand Up @@ -726,11 +729,12 @@ func (req *DownloadRequest) processDownload() {
}
var total int
if j == n-1 {
total, err = writeAtData(writeAtHandler, data, req.datashards, offset, int(size-offset))
total, err = writeAtData(writeAtHandler, data, req.datashards, offset, int(remainingSize-offset))
} else {
total, err = writeAtData(writeAtHandler, data, req.datashards, offset, -1)
}
if err != nil {
logger.Logger.Error("downloadFailed: ", startBlock+int64(j)*numBlocks, " remainingSize: ", remainingSize, " offset: ", offset)
return errors.Wrap(err, fmt.Sprintf("WriteAt failed for block %d. ", startBlock+int64(j)*numBlocks))
}
for _, rb := range req.bufferMap {
Expand Down Expand Up @@ -959,14 +963,7 @@ func (req *DownloadRequest) initEncryption() (err error) {
return err
}
} else {
key, err := hex.DecodeString(client.GetClientPrivateKey())
if err != nil {
return err
}
err = req.encScheme.InitializeWithPrivateKey(key)
if err != nil {
return err
}
return errors.New("invalid_mnemonic", "Invalid mnemonic")
}

err = req.encScheme.InitForDecryption("filetype:audio", req.encryptedKey)
Expand Down Expand Up @@ -1044,6 +1041,9 @@ func (req *DownloadRequest) calculateShardsParams(
}
if dp != nil {
req.startBlock = int64(dp.LastWrittenBlock)
if req.startBlock > 0 {
req.isResume = true
}
} else {
dp = &DownloadProgress{
ID: progressID,
Expand Down Expand Up @@ -1351,12 +1351,14 @@ func writeAtData(dest io.WriterAt, data [][][]byte, dataShards int, offset int64
n, err := dest.WriteAt(data[i][j], offset+int64(total))
total += n
if err != nil {
logger.Logger.Error("writeAt failed: ", err, " offset: ", offset, " total: ", total, "toWriteData: ", len(data[i][j]), " lastBlock: ", lastBlock)
return total, err
}
} else {
n, err := dest.WriteAt(data[i][j][:lastBlock], offset+int64(total))
total += n
if err != nil {
logger.Logger.Error("writeAt failed: ", err, " offset: ", offset, " total: ", total, "toWriteData: ", len(data[i][j]), " lastBlock: ", lastBlock)
return total, err
}
}
Expand All @@ -1368,6 +1370,7 @@ func writeAtData(dest io.WriterAt, data [][][]byte, dataShards int, offset int64
n, err := dest.WriteAt(data[i][j], offset+int64(total))
total += n
if err != nil {
logger.Logger.Error("writeAt failed: ", err, " offset: ", offset, " total: ", total)
return total, err
}
}
Expand Down
46 changes: 38 additions & 8 deletions zboxcore/sdk/filerefsworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package sdk

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -69,18 +71,17 @@ func (o *ObjectTreeRequest) GetRefs() (*ObjectTreeResult, error) {
for idx, oTreeResponse := range oTreeResponses {
oTreeResponseErrors[idx] = oTreeResponse.err
if oTreeResponse.err != nil {
l.Logger.Error("Error while getting file refs from blobber:", oTreeResponse.err)
if code, _ := zboxutil.GetErrorMessageCode(oTreeResponse.err.Error()); code != INVALID_PATH {
l.Logger.Error("Error while getting file refs from blobber:", oTreeResponse.err)
}
continue
}
var similarFieldRefs []SimilarField
var similarFieldRefs []byte
for _, ref := range oTreeResponse.oTResult.Refs {
similarFieldRefs = append(similarFieldRefs, ref.SimilarField)
decodeBytes, _ := hex.DecodeString(ref.SimilarField.FileMetaHash)
similarFieldRefs = append(similarFieldRefs, decodeBytes...)
}
refsMarshall, err := json.Marshal(similarFieldRefs)
if err != nil {
continue
}
hash := zboxutil.GetRefsHash(refsMarshall)
hash := zboxutil.GetRefsHash(similarFieldRefs)

if _, ok := hashCount[hash]; ok {
hashCount[hash]++
Expand Down Expand Up @@ -108,6 +109,35 @@ func (o *ObjectTreeRequest) GetRefs() (*ObjectTreeResult, error) {
if selected != nil {
return selected, nil
}
if majorError != nil {
l.Logger.Error("error while gettings refs: ", majorError)
}
// build the object tree result by using consensus on individual refs
refHash := make(map[string]int)
selected = &ObjectTreeResult{}
minPage := int64(math.MaxInt64)
for _, oTreeResponse := range oTreeResponses {
if oTreeResponse.err != nil {
continue
}
if oTreeResponse.oTResult.TotalPages < minPage {
minPage = oTreeResponse.oTResult.TotalPages
selected.TotalPages = minPage
}
for _, ref := range oTreeResponse.oTResult.Refs {
if refHash[ref.FileMetaHash] == o.consensusThresh {
continue
}
refHash[ref.FileMetaHash] += 1
if refHash[ref.FileMetaHash] == o.consensusThresh {
selected.Refs = append(selected.Refs, ref)
}
}
}
if len(selected.Refs) > 0 {
selected.OffsetPath = selected.Refs[len(selected.Refs)-1].Path
return selected, nil
}
return nil, errors.New("consensus_failed", "Refs consensus is less than consensus threshold")
}

Expand Down
3 changes: 1 addition & 2 deletions zboxcore/sdk/multi_operation_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

const (
DefaultCreateConnectionTimeOut = 10 * time.Second
DefaultCreateConnectionTimeOut = 45 * time.Second
)

var BatchSize = 6
Expand Down Expand Up @@ -323,7 +323,6 @@ func (mo *MultiOperation) Process() error {
}

if !mo.isConsensusOk() {
mo.allocationObj.checkStatus = false
err = zboxutil.MajorError(errSlice)
if mo.getConsensus() != 0 {
l.Logger.Info("Rolling back changes on minority blobbers")
Expand Down
4 changes: 2 additions & 2 deletions zboxcore/zboxutil/download_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (r *DownloadBufferWithChan) RequestChunk(ctx context.Context, num int) []by
r.mu.Lock()
r.mp[num] = ind
r.mu.Unlock()
return r.buf[ind*r.reqSize : (ind+1)*r.reqSize]
return r.buf[ind*r.reqSize : (ind+1)*r.reqSize : (ind+1)*r.reqSize]
}
}

Expand Down Expand Up @@ -112,7 +112,7 @@ func (r *DownloadBufferWithMask) RequestChunk(ctx context.Context, num int) []by
// assign the chunk by clearing the bit
r.mask &= ^(1 << num)
r.mu.Unlock()
return r.buf[num*r.reqSize : (num+1)*r.reqSize]
return r.buf[num*r.reqSize : (num+1)*r.reqSize : (num+1)*r.reqSize]
}
}

Expand Down
6 changes: 4 additions & 2 deletions zboxcore/zboxutil/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func SetHostClient(id, baseURL string) {
HostClientMap[id] = &fasthttp.HostClient{
NoDefaultUserAgentHeader: true,
Addr: host,
MaxIdleConnDuration: 60 * time.Second,
MaxConns: 1024,
MaxIdleConnDuration: 45 * time.Second,
DisableHeaderNamesNormalizing: true,
DisablePathNormalizing: true,
Dial: (&fasthttp.TCPDialer{
Expand Down Expand Up @@ -189,7 +190,7 @@ func init() {
}

FastHttpClient = &fasthttp.Client{
MaxIdleConnDuration: 60 * time.Second,
MaxIdleConnDuration: 45 * time.Second,
NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp
DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this
DisablePathNormalizing: true,
Expand All @@ -202,6 +203,7 @@ func init() {
WriteTimeout: 120 * time.Second,
MaxConnDuration: 45 * time.Second,
MaxResponseBodySize: 1024 * 1024 * 64, //64MB
MaxConnsPerHost: 1024,
}
envProxy.initialize()
log.Init(logger.DEBUG, "0box-sdk")
Expand Down
5 changes: 2 additions & 3 deletions zboxcore/zboxutil/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ import (
var DefaultTransport = &http.Transport{
Proxy: envProxy.Proxy,
DialContext: (&net.Dialer{
Timeout: 45 * time.Second,
Timeout: 3 * time.Minute,
KeepAlive: 45 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
TLSHandshakeTimeout: 45 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
MaxIdleConnsPerHost: 25,
WriteBufferSize: 16 * 1024,
}

0 comments on commit d6cc848

Please sign in to comment.