diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index ee4ea5fc3..9fef41ee1 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -1413,7 +1413,6 @@ func (a *Allocation) getRefs(path, pathHash, authToken, offsetPath, updatedDate, offsetDate: offsetDate, fileType: fileType, refType: refType, - wg: &sync.WaitGroup{}, ctx: a.ctx, } oTreeReq.fullconsensus = a.fullconsensus diff --git a/zboxcore/sdk/filerefsworker.go b/zboxcore/sdk/filerefsworker.go index ab8d54b8b..1b003ccaf 100644 --- a/zboxcore/sdk/filerefsworker.go +++ b/zboxcore/sdk/filerefsworker.go @@ -44,7 +44,6 @@ type ObjectTreeRequest struct { updatedDate string // must have "2006-01-02T15:04:05.99999Z07:00" format offsetDate string // must have "2006-01-02T15:04:05.99999Z07:00" format ctx context.Context - wg *sync.WaitGroup Consensus } @@ -52,6 +51,7 @@ type oTreeResponse struct { oTResult *ObjectTreeResult err error hash string + idx int } // Paginated tree should not be collected as this will stall the client @@ -59,54 +59,54 @@ type oTreeResponse struct { func (o *ObjectTreeRequest) GetRefs() (*ObjectTreeResult, error) { totalBlobbersCount := len(o.blobbers) oTreeResponses := make([]oTreeResponse, totalBlobbersCount) - o.wg.Add(totalBlobbersCount) + respChan := make(chan *oTreeResponse, totalBlobbersCount) for i, blob := range o.blobbers { l.Logger.Debug(fmt.Sprintf("Getting file refs for path %v from blobber %v", o.remotefilepath, blob.Baseurl)) - go o.getFileRefs(&oTreeResponses[i], blob.Baseurl) + idx := i + baseURL := blob.Baseurl + go o.getFileRefs(baseURL, respChan, idx) } - o.wg.Wait() hashCount := make(map[string]int) hashRefsMap := make(map[string]*ObjectTreeResult) oTreeResponseErrors := make([]error, totalBlobbersCount) - - for idx, oTreeResponse := range oTreeResponses { - oTreeResponseErrors[idx] = oTreeResponse.err - if oTreeResponse.err != nil { - if code, _ := zboxutil.GetErrorMessageCode(oTreeResponse.err.Error()); code != INVALID_PATH { - l.Logger.Error("Error while getting file refs from blobber:", oTreeResponse.err) + var successCount int + for i := 0; i < totalBlobbersCount; i++ { + select { + case <-o.ctx.Done(): + return nil, o.ctx.Err() + case oTreeResponse := <-respChan: + oTreeResponseErrors[oTreeResponse.idx] = oTreeResponse.err + if oTreeResponse.err != nil { + if code, _ := zboxutil.GetErrorMessageCode(oTreeResponse.err.Error()); code != INVALID_PATH { + l.Logger.Error("Error while getting file refs from blobber:", oTreeResponse.err) + } + continue + } + successCount++ + hash := oTreeResponse.hash + if _, ok := hashCount[hash]; ok { + hashCount[hash]++ + } else { + hashCount[hash]++ + hashRefsMap[hash] = oTreeResponse.oTResult + } + if hashCount[hash] == o.consensusThresh { + return oTreeResponse.oTResult, nil } - continue - } - hash := oTreeResponse.hash - - if _, ok := hashCount[hash]; ok { - hashCount[hash]++ - } else { - hashCount[hash]++ - hashRefsMap[hash] = oTreeResponse.oTResult } } - majorError := zboxutil.MajorError(oTreeResponseErrors) - majorErrorMsg := "" - if majorError != nil { - majorErrorMsg = majorError.Error() - } - if code, _ := zboxutil.GetErrorMessageCode(majorErrorMsg); code == INVALID_PATH { - return &ObjectTreeResult{}, nil - } var selected *ObjectTreeResult - for k, v := range hashCount { - if v >= o.consensusThresh { - selected = hashRefsMap[k] - break + if successCount < o.consensusThresh { + majorError := zboxutil.MajorError(oTreeResponseErrors) + majorErrorMsg := "" + if majorError != nil { + majorErrorMsg = majorError.Error() + } + if code, _ := zboxutil.GetErrorMessageCode(majorErrorMsg); code == INVALID_PATH { + return &ObjectTreeResult{}, nil + } else { + return nil, majorError } - } - - if selected != nil { - return selected, nil - } - if majorError != nil { - l.Logger.Error("error while gettings refs: ", majorError) } // build the object tree result by using consensus on individual refs refHash := make(map[string]int) @@ -137,8 +137,13 @@ func (o *ObjectTreeRequest) GetRefs() (*ObjectTreeResult, error) { return nil, errors.New("consensus_failed", "Refs consensus is less than consensus threshold") } -func (o *ObjectTreeRequest) getFileRefs(oTR *oTreeResponse, bUrl string) { - defer o.wg.Done() +func (o *ObjectTreeRequest) getFileRefs(bUrl string, respChan chan *oTreeResponse, idx int) { + oTR := &oTreeResponse{ + idx: idx, + } + defer func() { + respChan <- oTR + }() oReq, err := zboxutil.NewRefsRequest( bUrl, o.allocationID,