diff --git a/core/sys/fs_mem.go b/core/sys/fs_mem.go index 369504a45..3d9f01823 100644 --- a/core/sys/fs_mem.go +++ b/core/sys/fs_mem.go @@ -310,7 +310,7 @@ func (f *MemChanFile) Write(p []byte) (n int, err error) { f.Buffer <- data } else { if cap(f.data) == 0 { - f.data = make([]byte, 0, f.ChunkWriteSize) + f.data = make([]byte, 0, len(p)) } f.data = append(f.data, p...) } @@ -326,7 +326,7 @@ func (f *MemChanFile) Sync() error { } f.Buffer <- f.data[current:end] } - f.data = make([]byte, 0, f.ChunkWriteSize) + f.data = nil return nil } func (f *MemChanFile) Seek(offset int64, whence int) (ret int64, err error) { diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 24a2458b1..cd0f01f6e 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -402,11 +402,13 @@ func (a *Allocation) RepairFile(file sys.File, remotepath string, statusCallback WithEncrypt(true), WithStatusCallback(statusCallback), WithEncryptedPoint(ref.EncryptedKeyPoint), + WithChunkNumber(100), } } else { opts = []ChunkedUploadOption{ WithMask(mask), WithStatusCallback(statusCallback), + WithChunkNumber(100), } } op := &OperationRequest{ @@ -1106,6 +1108,9 @@ func (a *Allocation) generateDownloadRequest( downloadReq.contentMode = contentMode downloadReq.connectionID = connectionID downloadReq.downloadQueue = make(downloadQueue, len(a.Blobbers)) + for i := 0; i < len(a.Blobbers); i++ { + downloadReq.downloadQueue[i].timeTaken = 1000000 + } return downloadReq, nil } @@ -2219,6 +2224,9 @@ func (a *Allocation) downloadFromAuthTicket(fileHandler sys.File, authTicket str downloadReq.fullconsensus = a.fullconsensus downloadReq.consensusThresh = a.consensusThreshold downloadReq.downloadQueue = make(downloadQueue, len(a.Blobbers)) + for i := 0; i < len(a.Blobbers); i++ { + downloadReq.downloadQueue[i].timeTaken = 1000000 + } downloadReq.connectionID = zboxutil.NewConnectionId() downloadReq.completedCallback = func(remotepath string, remotepathHash string) { a.mutex.Lock() diff --git a/zboxcore/sdk/downloadworker.go b/zboxcore/sdk/downloadworker.go index a2c65f3dd..3fbbafd24 100644 --- a/zboxcore/sdk/downloadworker.go +++ b/zboxcore/sdk/downloadworker.go @@ -105,7 +105,7 @@ type DownloadRequest struct { bufferMap map[int]zboxutil.DownloadBuffer downloadStorer DownloadProgressStorer workdir string - downloadQueue downloadQueue + downloadQueue downloadQueue // Always initialize this queue with max time taken } type downloadPriority struct { @@ -681,7 +681,7 @@ func (req *DownloadRequest) processDownload() { var progressLock sync.Mutex firstReqWG := sync.WaitGroup{} firstReqWG.Add(1) - eg, _ := errgroup.WithContext(ctx) + eg, egCtx := errgroup.WithContext(ctx) eg.SetLimit(downloadWorkerCount + EXTRA_COUNT) for i := 0; i < n; i++ { j := i @@ -689,6 +689,11 @@ func (req *DownloadRequest) processDownload() { firstReqWG.Wait() heap.Init(&req.downloadQueue) } + select { + case <-egCtx.Done(): + goto breakDownloadLoop + default: + } eg.Go(func() error { if j == 0 { @@ -738,6 +743,7 @@ func (req *DownloadRequest) processDownload() { } return nil }) + breakDownloadLoop: } if err := eg.Wait(); err != nil { writeCancel() diff --git a/zboxcore/sdk/repairworker.go b/zboxcore/sdk/repairworker.go index bc8e0d90e..f271f3939 100644 --- a/zboxcore/sdk/repairworker.go +++ b/zboxcore/sdk/repairworker.go @@ -63,7 +63,7 @@ func (r *RepairRequest) processRepair(ctx context.Context, a *Allocation) { if r.checkForCancel(a) { return } - + SetNumBlockDownloads(100) r.iterateDir(a, r.listDir) if r.statusCB != nil { @@ -177,7 +177,7 @@ func (r *RepairRequest) repairFile(a *Allocation, file *ListResult) []OperationR return nil } memFile := &sys.MemChanFile{ - Buffer: make(chan []byte, 10), + Buffer: make(chan []byte, 100), ChunkWriteSize: int(a.GetChunkReadSize(ref.EncryptedKey != "")), } op = a.RepairFile(memFile, file.Path, statusCB, found, ref)