Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check consensus in get ref #1538

Merged
merged 3 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,6 @@ func (a *Allocation) getRefs(path, pathHash, authToken, offsetPath, updatedDate,
offsetDate: offsetDate,
fileType: fileType,
refType: refType,
wg: &sync.WaitGroup{},
ctx: a.ctx,
}
oTreeReq.fullconsensus = a.fullconsensus
Expand Down
85 changes: 45 additions & 40 deletions zboxcore/sdk/filerefsworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,69 +44,69 @@ type ObjectTreeRequest struct {
updatedDate string // must have "2006-01-02T15:04:05.99999Z07:00" format
offsetDate string // must have "2006-01-02T15:04:05.99999Z07:00" format
ctx context.Context
wg *sync.WaitGroup
Consensus
}

type oTreeResponse struct {
oTResult *ObjectTreeResult
err error
hash string
idx int
}

// Paginated tree should not be collected as this will stall the client
// It should rather be handled by application that uses gosdk
func (o *ObjectTreeRequest) GetRefs() (*ObjectTreeResult, error) {
totalBlobbersCount := len(o.blobbers)
oTreeResponses := make([]oTreeResponse, totalBlobbersCount)
o.wg.Add(totalBlobbersCount)
respChan := make(chan *oTreeResponse, totalBlobbersCount)
for i, blob := range o.blobbers {
l.Logger.Debug(fmt.Sprintf("Getting file refs for path %v from blobber %v", o.remotefilepath, blob.Baseurl))
go o.getFileRefs(&oTreeResponses[i], blob.Baseurl)
idx := i
baseURL := blob.Baseurl
go o.getFileRefs(baseURL, respChan, idx)
}
o.wg.Wait()
hashCount := make(map[string]int)
hashRefsMap := make(map[string]*ObjectTreeResult)
oTreeResponseErrors := make([]error, totalBlobbersCount)

for idx, oTreeResponse := range oTreeResponses {
oTreeResponseErrors[idx] = oTreeResponse.err
if oTreeResponse.err != nil {
if code, _ := zboxutil.GetErrorMessageCode(oTreeResponse.err.Error()); code != INVALID_PATH {
l.Logger.Error("Error while getting file refs from blobber:", oTreeResponse.err)
var successCount int
for i := 0; i < totalBlobbersCount; i++ {
select {
case <-o.ctx.Done():
return nil, o.ctx.Err()
case oTreeResponse := <-respChan:
oTreeResponseErrors[oTreeResponse.idx] = oTreeResponse.err
if oTreeResponse.err != nil {
if code, _ := zboxutil.GetErrorMessageCode(oTreeResponse.err.Error()); code != INVALID_PATH {
l.Logger.Error("Error while getting file refs from blobber:", oTreeResponse.err)
}
continue
}
successCount++
hash := oTreeResponse.hash
if _, ok := hashCount[hash]; ok {
hashCount[hash]++
} else {
hashCount[hash]++
hashRefsMap[hash] = oTreeResponse.oTResult
}
if hashCount[hash] == o.consensusThresh {
return oTreeResponse.oTResult, nil
}
continue
}
hash := oTreeResponse.hash

if _, ok := hashCount[hash]; ok {
hashCount[hash]++
} else {
hashCount[hash]++
hashRefsMap[hash] = oTreeResponse.oTResult
}
}
majorError := zboxutil.MajorError(oTreeResponseErrors)
majorErrorMsg := ""
if majorError != nil {
majorErrorMsg = majorError.Error()
}
if code, _ := zboxutil.GetErrorMessageCode(majorErrorMsg); code == INVALID_PATH {
return &ObjectTreeResult{}, nil
}
var selected *ObjectTreeResult
for k, v := range hashCount {
if v >= o.consensusThresh {
selected = hashRefsMap[k]
break
if successCount < o.consensusThresh {
majorError := zboxutil.MajorError(oTreeResponseErrors)
majorErrorMsg := ""
if majorError != nil {
majorErrorMsg = majorError.Error()
}
if code, _ := zboxutil.GetErrorMessageCode(majorErrorMsg); code == INVALID_PATH {
return &ObjectTreeResult{}, nil
} else {
return nil, majorError
}
}

if selected != nil {
return selected, nil
}
if majorError != nil {
l.Logger.Error("error while gettings refs: ", majorError)
}
// build the object tree result by using consensus on individual refs
refHash := make(map[string]int)
Expand Down Expand Up @@ -137,8 +137,13 @@ func (o *ObjectTreeRequest) GetRefs() (*ObjectTreeResult, error) {
return nil, errors.New("consensus_failed", "Refs consensus is less than consensus threshold")
}

func (o *ObjectTreeRequest) getFileRefs(oTR *oTreeResponse, bUrl string) {
defer o.wg.Done()
func (o *ObjectTreeRequest) getFileRefs(bUrl string, respChan chan *oTreeResponse, idx int) {
oTR := &oTreeResponse{
idx: idx,
}
defer func() {
respChan <- oTR
}()
oReq, err := zboxutil.NewRefsRequest(
bUrl,
o.allocationID,
Expand Down
Loading