diff --git a/app/src/main/java/com/infomaniak/swisstransfer/workers/UploadFileTask.kt b/app/src/main/java/com/infomaniak/swisstransfer/workers/UploadFileTask.kt index d501c24e1..5fd620925 100644 --- a/app/src/main/java/com/infomaniak/swisstransfer/workers/UploadFileTask.kt +++ b/app/src/main/java/com/infomaniak/swisstransfer/workers/UploadFileTask.kt @@ -23,16 +23,14 @@ import com.infomaniak.multiplatform_swisstransfer.common.interfaces.upload.Uploa import com.infomaniak.multiplatform_swisstransfer.common.interfaces.upload.UploadSession import com.infomaniak.multiplatform_swisstransfer.managers.UploadManager import com.infomaniak.sentry.SentryLog -import kotlinx.coroutines.Job -import kotlinx.coroutines.async -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withLock import java.io.BufferedInputStream import java.io.File import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.atomic.AtomicInteger class UploadFileTask( private val uploadManager: UploadManager, @@ -45,7 +43,7 @@ class UploadFileTask( uploadFileSession: UploadFileSession, uploadSession: UploadSession, onUploadBytes: suspend (Long) -> Unit, - ) = coroutineScope { + ) { SentryLog.i(TAG, "start upload file ${uploadFileSession.localPath}") val fileUUID: String = uploadFileSession.remoteUploadFile?.uuid ?: throwAndDestroyUpload(uploadSession.uuid, "Remote upload file not found") @@ -53,15 +51,41 @@ class UploadFileTask( val chunkSize = fileChunkSizeManager.computeChunkSize(fileSize = uploadFileSession.size) val totalChunks = fileChunkSizeManager.computeFileChunks(fileSize = uploadFileSession.size, fileChunkSize = chunkSize) val parallelChunks = fileChunkSizeManager.computeParallelChunks(fileChunkSize = chunkSize) - val lastChunkIndex = totalChunks - 1 - val requestSemaphore = Semaphore(parallelChunks) - val byteArrayPool = ArrayBlockingQueue(parallelChunks) - val chunkParentJob = Job() SentryLog.d(TAG, "chunkSize:$chunkSize | totalChunks:$totalChunks | parallelChunks:$parallelChunks") uploadFileSession.getLocalIoFile(uploadSession.uuid).inputStream().buffered().use { inputStream -> + uploadChunks( + chunkSize = chunkSize, + fileUUID = fileUUID, + inputStream = inputStream, + parallelChunks = parallelChunks, + totalChunks = totalChunks, + uploadSession = uploadSession, + onUploadBytes = onUploadBytes, + ) + } + } + + private suspend fun uploadChunks( + chunkSize: Long, + fileUUID: String, + inputStream: BufferedInputStream, + parallelChunks: Int, + totalChunks: Int, + uploadSession: UploadSession, + onUploadBytes: suspend (Long) -> Unit, + ) { + val requestSemaphore = Semaphore(parallelChunks) + val byteArrayPool = ArrayBlockingQueue(parallelChunks) + + val completedChunks = AtomicInteger(0) + val completableJob: CompletableJob = Job() + val lastChunkIndex = totalChunks - 1 + + coroutineScope { + for (chunkIndex in 0..lastChunkIndex) { requestSemaphore.acquire() SentryLog.i(TAG, "start for chunkIndex:$chunkIndex") @@ -76,19 +100,36 @@ class UploadFileTask( break } - async(chunkParentJob) { - startUploadChunk(uploadSession, fileUUID, chunkIndex, isLastChunk, dataByteArray, onUploadBytes) - byteArrayPool.offer(dataByteArray) - requestSemaphore.release() + launch { + try { + if (totalChunks > 1) { + waitForOthersIfLastChunk(isLastChunk, completableJob, completedChunks, lastChunkIndex) + } + startUploadChunk(uploadSession, fileUUID, chunkIndex, isLastChunk, dataByteArray, onUploadBytes) + } finally { + byteArrayPool.offer(dataByteArray) + requestSemaphore.release() + } } } } - chunkParentJob.complete() - chunkParentJob.join() byteArrayPool.clear() } + private suspend fun waitForOthersIfLastChunk( + isLastChunk: Boolean, + completableJob: CompletableJob, + completedChunks: AtomicInteger, + lastChunkIndex: Int, + ) { + if (isLastChunk) { + completableJob.join() // Wait for all the other jobs to complete + } else { + if (completedChunks.incrementAndGet() == lastChunkIndex) completableJob.complete() + } + } + private suspend inline fun UploadFileSession.getLocalIoFile(uploadUuid: String): File { return localPath.toUri().toFile().also { if (!it.exists()) {