Skip to content

Commit

Permalink
Merge pull request #1424 from 0chain/hotfix/time-queue
Browse files Browse the repository at this point in the history
Fix download queue
  • Loading branch information
dabasov authored Mar 13, 2024
2 parents 193e6f9 + 385236b commit 83f0cc9
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 6 deletions.
4 changes: 2 additions & 2 deletions core/sys/fs_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (f *MemChanFile) Write(p []byte) (n int, err error) {
f.Buffer <- data
} else {
if cap(f.data) == 0 {
f.data = make([]byte, 0, f.ChunkWriteSize)
f.data = make([]byte, 0, len(p))
}
f.data = append(f.data, p...)
}
Expand All @@ -326,7 +326,7 @@ func (f *MemChanFile) Sync() error {
}
f.Buffer <- f.data[current:end]
}
f.data = make([]byte, 0, f.ChunkWriteSize)
f.data = nil
return nil
}
func (f *MemChanFile) Seek(offset int64, whence int) (ret int64, err error) {
Expand Down
8 changes: 8 additions & 0 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,13 @@ func (a *Allocation) RepairFile(file sys.File, remotepath string, statusCallback
WithEncrypt(true),
WithStatusCallback(statusCallback),
WithEncryptedPoint(ref.EncryptedKeyPoint),
WithChunkNumber(100),
}
} else {
opts = []ChunkedUploadOption{
WithMask(mask),
WithStatusCallback(statusCallback),
WithChunkNumber(100),
}
}
op := &OperationRequest{
Expand Down Expand Up @@ -1106,6 +1108,9 @@ func (a *Allocation) generateDownloadRequest(
downloadReq.contentMode = contentMode
downloadReq.connectionID = connectionID
downloadReq.downloadQueue = make(downloadQueue, len(a.Blobbers))
for i := 0; i < len(a.Blobbers); i++ {
downloadReq.downloadQueue[i].timeTaken = 1000000
}

return downloadReq, nil
}
Expand Down Expand Up @@ -2219,6 +2224,9 @@ func (a *Allocation) downloadFromAuthTicket(fileHandler sys.File, authTicket str
downloadReq.fullconsensus = a.fullconsensus
downloadReq.consensusThresh = a.consensusThreshold
downloadReq.downloadQueue = make(downloadQueue, len(a.Blobbers))
for i := 0; i < len(a.Blobbers); i++ {
downloadReq.downloadQueue[i].timeTaken = 1000000
}
downloadReq.connectionID = zboxutil.NewConnectionId()
downloadReq.completedCallback = func(remotepath string, remotepathHash string) {
a.mutex.Lock()
Expand Down
10 changes: 8 additions & 2 deletions zboxcore/sdk/downloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type DownloadRequest struct {
bufferMap map[int]zboxutil.DownloadBuffer
downloadStorer DownloadProgressStorer
workdir string
downloadQueue downloadQueue
downloadQueue downloadQueue // Always initialize this queue with max time taken
}

type downloadPriority struct {
Expand Down Expand Up @@ -681,14 +681,19 @@ func (req *DownloadRequest) processDownload() {
var progressLock sync.Mutex
firstReqWG := sync.WaitGroup{}
firstReqWG.Add(1)
eg, _ := errgroup.WithContext(ctx)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(downloadWorkerCount + EXTRA_COUNT)
for i := 0; i < n; i++ {
j := i
if i == 1 {
firstReqWG.Wait()
heap.Init(&req.downloadQueue)
}
select {
case <-egCtx.Done():
goto breakDownloadLoop
default:
}
eg.Go(func() error {

if j == 0 {
Expand Down Expand Up @@ -738,6 +743,7 @@ func (req *DownloadRequest) processDownload() {
}
return nil
})
breakDownloadLoop:
}
if err := eg.Wait(); err != nil {
writeCancel()
Expand Down
4 changes: 2 additions & 2 deletions zboxcore/sdk/repairworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (r *RepairRequest) processRepair(ctx context.Context, a *Allocation) {
if r.checkForCancel(a) {
return
}

SetNumBlockDownloads(100)
r.iterateDir(a, r.listDir)

if r.statusCB != nil {
Expand Down Expand Up @@ -177,7 +177,7 @@ func (r *RepairRequest) repairFile(a *Allocation, file *ListResult) []OperationR
return nil
}
memFile := &sys.MemChanFile{
Buffer: make(chan []byte, 10),
Buffer: make(chan []byte, 100),
ChunkWriteSize: int(a.GetChunkReadSize(ref.EncryptedKey != "")),
}
op = a.RepairFile(memFile, file.Path, statusCB, found, ref)
Expand Down

0 comments on commit 83f0cc9

Please sign in to comment.