From 0d08c275a644c4ae71f6b5198d7374ce5f08dc14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Genevi=C3=A8ve=20Bastien?= Date: Thu, 7 Nov 2024 13:49:43 -0500 Subject: [PATCH] batchRoute: Better handle job cancellation and deletion fixes #709 * Cancelled jobs sometimes were not really cancelled if a checkpoint was set after the job status was set to cancelled in the main thread, but before the automatic task refresh every 5 seconds, so we refresh the task before updating it in the checkpoint update. * Also, refreshing and saving the job would throw an error if the job is deleted from the DB and would kill the running thread, which would cause the trRouting process to be left defunct. So we add a catch when refreshing/updating the job and stopping the trRouting process is done in the finally block, to make sure it is always stopped, even in case of an exception. --- .../src/services/transitRouting/TrRoutingBatch.ts | 15 ++++++++++----- .../src/tasks/TransitionWorkerPool.ts | 12 +++++++++--- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/packages/transition-backend/src/services/transitRouting/TrRoutingBatch.ts b/packages/transition-backend/src/services/transitRouting/TrRoutingBatch.ts index a199ff671..773675270 100644 --- a/packages/transition-backend/src/services/transitRouting/TrRoutingBatch.ts +++ b/packages/transition-backend/src/services/transitRouting/TrRoutingBatch.ts @@ -195,12 +195,9 @@ class TrRoutingBatch { checkpointTracker.completed(); this.options.progressEmitter.emit('progress', { name: 'BatchRouting', progress: 1.0 }); - this.options.progressEmitter.emit('progress', { name: 'StoppingRoutingParallelServers', progress: 0.0 }); - - const stopStatus = await TrRoutingProcessManager.stopBatch(); - this.options.progressEmitter.emit('progress', { name: 'StoppingRoutingParallelServers', progress: 1.0 }); - console.log('trRouting multiple stopStatus', stopStatus); + // FIXME Should we return here if the job is cancelled? Or we still + // generate the results that have been calculated since now? // Generate the output files this.options.progressEmitter.emit('progress', { name: 'GeneratingBatchRoutingResults', progress: 0.0 }); @@ -253,6 +250,14 @@ class TrRoutingBatch { console.error(`Error in batch routing calculation job ${this.options.jobId}: ${error}`); throw error; } + } finally { + // Make sure to stop the trRouting processes, even if an error occurred + this.options.progressEmitter.emit('progress', { name: 'StoppingRoutingParallelServers', progress: 0.0 }); + + const stopStatus = await TrRoutingProcessManager.stopBatch(); + + this.options.progressEmitter.emit('progress', { name: 'StoppingRoutingParallelServers', progress: 1.0 }); + console.log('trRouting multiple stopStatus', stopStatus); } }; diff --git a/packages/transition-backend/src/tasks/TransitionWorkerPool.ts b/packages/transition-backend/src/tasks/TransitionWorkerPool.ts index 1e0fdf6fa..c4b87c204 100644 --- a/packages/transition-backend/src/tasks/TransitionWorkerPool.ts +++ b/packages/transition-backend/src/tasks/TransitionWorkerPool.ts @@ -30,8 +30,14 @@ function newProgressEmitter(task: ExecutableJob) { }); eventEmitter.on('checkpoint', (checkpoint: number) => { console.log('Task received checkpoint ', checkpoint); - task.attributes.internal_data.checkpoint = checkpoint; - task.save(); + // Refresh the task before saving the checkpoint + task.refresh() + .then(() => { + // Add checkpoint, then save the task + task.attributes.internal_data.checkpoint = checkpoint; + task.save().catch(() => console.error('Error saving task after checkpoint')); + }) + .catch(() => console.error('Error refreshing task before saving checkpoint')); // This will catch deleted jobs }); return eventEmitter; } @@ -65,7 +71,7 @@ const getTaskCancelledFct = (task: ExecutableJob) => { clearInterval(intervalObj); } }) - .catch(() => (refreshError = true)); + .catch(() => (refreshError = true)); // This will catch deleted jobs }, 5000); return () => refreshError || task.status === 'cancelled'; };