From 9ff13d7a1fdcc5f68c94d411f8e491e2d8a576a3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 22 Aug 2024 13:38:44 +0000 Subject: [PATCH] close method for api --- parsl/dataflow/dflow.py | 5 +++++ parsl/dataflow/memoization.py | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index bd62b1b9ea..c6bd7fdedf 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1260,6 +1260,7 @@ def cleanup(self) -> None: self.log_task_states() + # TODO: do this in the basic memoizer # Checkpointing takes priority over the rest of the tasks # checkpoint if any valid checkpoint method is specified if self.checkpoint_mode is not None: @@ -1272,6 +1273,10 @@ def cleanup(self) -> None: logger.info("Stopping checkpoint timer") self._checkpoint_timer.close() + logger.info("Closing memoizer") + self.memoizer.close() + logger.info("Closed memoizer") + # Send final stats logger.info("Sending end message for usage tracking") self.usage_tracker.send_end_message() diff --git a/parsl/dataflow/memoization.py b/parsl/dataflow/memoization.py index a50b411a9d..0a5f541b9c 100644 --- a/parsl/dataflow/memoization.py +++ b/parsl/dataflow/memoization.py @@ -161,6 +161,9 @@ class Memoizer: def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files: Sequence[str], run_dir: str) -> None: raise NotImplementedError + def close(self) -> None: + raise NotImplementedError + def update_memo(self, task: TaskRecord, r: Future[Any]) -> None: raise NotImplementedError @@ -236,6 +239,9 @@ def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files: logger.info("App caching disabled for all apps") self.memo_lookup_table = {} + def close(self) -> None: + pass # nothing to close but more should move here + def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]: """Create a hash of the task and its inputs and check the lookup table for this hash.