Skip to content

Commit

Permalink
feat: Ensure to start the last chunk at the end (#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
tevincent authored Dec 12, 2024
2 parents ee66065 + b275adb commit 5bd6929
Showing 1 changed file with 56 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@ import com.infomaniak.multiplatform_swisstransfer.common.interfaces.upload.Uploa
import com.infomaniak.multiplatform_swisstransfer.common.interfaces.upload.UploadSession
import com.infomaniak.multiplatform_swisstransfer.managers.UploadManager
import com.infomaniak.sentry.SentryLog
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import java.io.BufferedInputStream
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

class UploadFileTask(
private val uploadManager: UploadManager,
Expand All @@ -45,23 +43,49 @@ class UploadFileTask(
uploadFileSession: UploadFileSession,
uploadSession: UploadSession,
onUploadBytes: suspend (Long) -> Unit,
) = coroutineScope {
) {
SentryLog.i(TAG, "start upload file ${uploadFileSession.localPath}")
val fileUUID: String = uploadFileSession.remoteUploadFile?.uuid
?: throwAndDestroyUpload(uploadSession.uuid, "Remote upload file not found")

val chunkSize = fileChunkSizeManager.computeChunkSize(fileSize = uploadFileSession.size)
val totalChunks = fileChunkSizeManager.computeFileChunks(fileSize = uploadFileSession.size, fileChunkSize = chunkSize)
val parallelChunks = fileChunkSizeManager.computeParallelChunks(fileChunkSize = chunkSize)
val lastChunkIndex = totalChunks - 1

val requestSemaphore = Semaphore(parallelChunks)
val byteArrayPool = ArrayBlockingQueue<ByteArray>(parallelChunks)
val chunkParentJob = Job()

SentryLog.d(TAG, "chunkSize:$chunkSize | totalChunks:$totalChunks | parallelChunks:$parallelChunks")

uploadFileSession.getLocalIoFile(uploadSession.uuid).inputStream().buffered().use { inputStream ->
uploadChunks(
chunkSize = chunkSize,
fileUUID = fileUUID,
inputStream = inputStream,
parallelChunks = parallelChunks,
totalChunks = totalChunks,
uploadSession = uploadSession,
onUploadBytes = onUploadBytes,
)
}
}

private suspend fun uploadChunks(
chunkSize: Long,
fileUUID: String,
inputStream: BufferedInputStream,
parallelChunks: Int,
totalChunks: Int,
uploadSession: UploadSession,
onUploadBytes: suspend (Long) -> Unit,
) {
val requestSemaphore = Semaphore(parallelChunks)
val byteArrayPool = ArrayBlockingQueue<ByteArray>(parallelChunks)

val completedChunks = AtomicInteger(0)
val completableJob: CompletableJob = Job()
val lastChunkIndex = totalChunks - 1

coroutineScope {

for (chunkIndex in 0..lastChunkIndex) {
requestSemaphore.acquire()
SentryLog.i(TAG, "start for chunkIndex:$chunkIndex")
Expand All @@ -76,19 +100,36 @@ class UploadFileTask(
break
}

async(chunkParentJob) {
startUploadChunk(uploadSession, fileUUID, chunkIndex, isLastChunk, dataByteArray, onUploadBytes)
byteArrayPool.offer(dataByteArray)
requestSemaphore.release()
launch {
try {
if (totalChunks > 1) {
waitForOthersIfLastChunk(isLastChunk, completableJob, completedChunks, lastChunkIndex)
}
startUploadChunk(uploadSession, fileUUID, chunkIndex, isLastChunk, dataByteArray, onUploadBytes)
} finally {
byteArrayPool.offer(dataByteArray)
requestSemaphore.release()
}
}
}
}

chunkParentJob.complete()
chunkParentJob.join()
byteArrayPool.clear()
}

private suspend fun waitForOthersIfLastChunk(
isLastChunk: Boolean,
completableJob: CompletableJob,
completedChunks: AtomicInteger,
lastChunkIndex: Int,
) {
if (isLastChunk) {
completableJob.join() // Wait for all the other jobs to complete
} else {
if (completedChunks.incrementAndGet() == lastChunkIndex) completableJob.complete()
}
}

private suspend inline fun UploadFileSession.getLocalIoFile(uploadUuid: String): File {
return localPath.toUri().toFile().also {
if (!it.exists()) {
Expand Down

0 comments on commit 5bd6929

Please sign in to comment.