Skip to content

Commit

Permalink
fix resume download
Browse files Browse the repository at this point in the history
  • Loading branch information
Hitenjain14 committed Feb 3, 2024
1 parent 00fca4a commit 466cb96
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 23 deletions.
1 change: 1 addition & 0 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,7 @@ func (a *Allocation) addAndGenerateDownloadRequest(
opt(downloadReq)
}
downloadReq.workdir = filepath.Join(downloadReq.workdir, ".zcn")
fmt.Println("downloadReq: ", downloadReq.downloadStorer != nil, " workDir:", downloadReq.workdir, "opts: ", len(downloadReqOpts))
a.downloadProgressMap[remotePath] = downloadReq
a.downloadRequests = append(a.downloadRequests, downloadReq)
if isFinal {
Expand Down
45 changes: 23 additions & 22 deletions zboxcore/sdk/download_progress_storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/0chain/errors"
"github.com/0chain/gosdk/core/sys"
"github.com/hitenjain14/fasthttp"
)

type DownloadProgressStorer interface {
Expand Down Expand Up @@ -43,32 +42,34 @@ func CreateFsDownloadProgress() *FsDownloadProgressStorer {
}

func (ds *FsDownloadProgressStorer) Start(ctx context.Context) {
tc := time.NewTicker(2 * time.Second)
ds.next += ds.dp.numBlocks
tc := fasthttp.AcquireTimer(2 * time.Second)
defer fasthttp.ReleaseTimer(tc)
for {
select {
case <-ctx.Done():
return
case <-tc.C:
ds.Lock()
if ds.isRemoved {
ds.Unlock()
go func() {
defer tc.Stop()
for {
select {
case <-ctx.Done():
return
}
if len(ds.queue) > 0 && ds.queue[0] == ds.next {
for len(ds.queue) > 0 && ds.queue[0] == ds.next {
ds.dp.LastWrittenBlock = ds.next
heap.Pop(&ds.queue)
ds.next += ds.dp.numBlocks
case <-tc.C:
ds.Lock()
if ds.isRemoved {
ds.Unlock()
return
}
if len(ds.queue) > 0 && ds.queue[0] == ds.next {
for len(ds.queue) > 0 && ds.queue[0] == ds.next {
ds.dp.LastWrittenBlock = ds.next
heap.Pop(&ds.queue)
ds.next += ds.dp.numBlocks
}
ds.Unlock()
ds.saveToDisk()
} else {
ds.Unlock()
}
ds.Unlock()
ds.saveToDisk()
} else {
ds.Unlock()
}
}
}
}()
}

func (ds *FsDownloadProgressStorer) Load(progressID string) *DownloadProgress {
Expand Down
6 changes: 5 additions & 1 deletion zboxcore/sdk/downloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
if !writerAt {
blocks <- blockData{blockNum: j, data: data}
} else {
offset := int64(j) * numBlocks * int64(req.effectiveBlockSize) * int64(req.datashards)
offset := (startBlock + int64(j)*numBlocks) * int64(req.effectiveBlockSize) * int64(req.datashards)
var total int
if j == n-1 {
total, err = writeAtData(writeAtHandler, data, req.datashards, offset, int(lastBlockSize))
Expand Down Expand Up @@ -956,6 +956,10 @@ func (req *DownloadRequest) calculateShardsParams(
// Can be nil when using file writer in wasm
if info != nil {
if req.downloadStorer != nil {
err = sys.Files.MkdirAll(filepath.Join(req.workdir, "download"), 0766)
if err != nil {
return 0, err
}
progressID := req.progressID()
var dp *DownloadProgress
if info.Size() > 0 {
Expand Down

0 comments on commit 466cb96

Please sign in to comment.