diff --git a/src/fluidimage/executors/multi_exec_subproc.py b/src/fluidimage/executors/multi_exec_subproc.py index f5247fa5..66d6adb6 100644 --- a/src/fluidimage/executors/multi_exec_subproc.py +++ b/src/fluidimage/executors/multi_exec_subproc.py @@ -72,6 +72,14 @@ def _start_processes(self): self.path_dir_result / f"params_files_{self._unique_postfix}" ) path_dir_params.mkdir(exist_ok=True) + + if ( + hasattr(self.topology, "how_saving") + and self.topology.how_saving == "complete" + and hasattr(splitter, "save_indices_files") + ): + splitter.save_indices_files(path_dir_params) + for index_process, params_split in enumerate( splitter.iter_over_new_params() ): diff --git a/src/fluidimage/topologies/__init__.py b/src/fluidimage/topologies/__init__.py index bfb529ac..07ec2642 100644 --- a/src/fluidimage/topologies/__init__.py +++ b/src/fluidimage/topologies/__init__.py @@ -44,7 +44,7 @@ __all__ = ["LogTopology", "TopologyBase"] -how_values = ("ask", "new_dir", "complete", "recompute") +how_values = ("ask", "new_dir", "complete", "recompute", "from_path_indices") def prepare_path_dir_result( diff --git a/src/fluidimage/topologies/piv.py b/src/fluidimage/topologies/piv.py index e1a447ea..b31bf9ba 100644 --- a/src/fluidimage/topologies/piv.py +++ b/src/fluidimage/topologies/piv.py @@ -165,6 +165,14 @@ def save_piv_object(self, obj): ret = obj.save(self.path_dir_result) self.results.append(ret) + def compute_indices_to_be_computed(self): + index_series = [] + for ind_serie, serie in self.series.items(): + name_piv = get_name_piv(serie, prefix="piv") + if not (self.path_dir_result / name_piv).exists(): + index_series.append(ind_serie) + return index_series + def fill_couples_of_names_and_paths(self, input_queue, output_queues): """Fill the two first queues""" assert input_queue is None @@ -175,21 +183,22 @@ def fill_couples_of_names_and_paths(self, input_queue, output_queues): if not series: logger.warning("add 0 couple. No PIV to compute.") return - if self.how_saving == "complete": - index_series = [] - for ind_serie, serie in self.series.items(): - name_piv = get_name_piv(serie, prefix="piv") - if not (self.path_dir_result / name_piv).exists(): - index_series.append(ind_serie) - - if not index_series: - logger.warning( - 'topology in mode "complete" and work already done.' - ) - return - series.set_index_series(index_series) + if self.how_saving in ("complete", "from_path_indices"): + if self.how_saving == "complete": + index_series = self.compute_indices_to_be_computed() + if not index_series: + logger.warning( + 'topology in mode "complete" and work already done.' + ) + return + elif self.how_saving == "from_path_indices": + path_indices = self.params.series.path_indices_file + index_series = [ + int(line) for line in open(path_indices, encoding="utf-8") + ] + series.set_index_series(index_series) if logger.isEnabledFor(DEBUG): logger.debug(repr([serie.get_name_arrays() for serie in series])) diff --git a/src/fluidimage/topologies/splitters.py b/src/fluidimage/topologies/splitters.py index 38e2755b..8ebb0a96 100644 --- a/src/fluidimage/topologies/splitters.py +++ b/src/fluidimage/topologies/splitters.py @@ -33,6 +33,15 @@ def split_range(start0, stop0, step0, num_parts): return ranges +def split_list(sequence, num_parts): + num_parts = min(num_parts, len(sequence)) + k, m = divmod(len(sequence), num_parts) + return [ + sequence[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] + for i in range(num_parts) + ] + + class Splitter(ABC): def __init__(self, params, num_processes, topology=None): @@ -68,24 +77,72 @@ def __init__(self, params, num_processes, topology=None): ) ) - self.ranges = split_range( - self.series.ind_start, - self.series.ind_stop, - self.series.ind_step, - self.num_processes, - ) + self.indices_lists = None + self.ranges = None + self._indices_files_saved = False + self._path_dir_indices = None + + if ( + topology is not None + and topology.how_saving == "complete" + and hasattr(topology, "compute_indices_to_be_computed") + ): + indices = topology.compute_indices_to_be_computed() + self.indices_lists = split_list(indices, self.num_processes) + else: + self.ranges = split_range( + self.series.ind_start, + self.series.ind_stop, + self.series.ind_step, + self.num_processes, + ) def get_params_series(self, params): return params.series - def iter_over_new_params(self): - for sss in self.ranges: - if len(range(*sss)) == 0: + def save_indices_files(self, path_dir): + self._path_dir_indices = path_dir + + for idx_process, indices in enumerate(self.indices_lists): + if not indices: continue - params = deepcopy(self.params) - p_series = self.get_params_series(params) - p_series.ind_start, p_series.ind_stop, p_series.ind_step = sss - yield params + path = path_dir / f"indices{idx_process:03}.txt" + path.write_text("\n".join(str(index) for index in indices) + "\n") + + self._indices_files_saved = True + + def iter_over_new_params(self): + + if self.ranges is not None: + for sss in self.ranges: + if len(range(*sss)) == 0: + continue + params = deepcopy(self.params) + p_series = self.get_params_series(params) + p_series.ind_start, p_series.ind_stop, p_series.ind_step = sss + yield params + + elif self.indices_lists is not None: + + if not self._indices_files_saved: + raise RuntimeError("First call save_indices_files.") + path_dir = self._path_dir_indices + + params0 = deepcopy(self.params) + params0.saving.how = "from_path_indices" + + p_series = self.get_params_series(params0) + p_series._set_attrib("path_indices_file", None) + + for idx_process, indices in enumerate(self.indices_lists): + if not indices: + continue + params = deepcopy(params0) + p_series = self.get_params_series(params) + p_series.path_indices_file = ( + path_dir / f"indices{idx_process:03}.txt" + ) + yield params class SplitterFromImages(Splitter): diff --git a/src/fluidimage/topologies/test_piv.py b/src/fluidimage/topologies/test_piv.py index a69263e9..606ce2a4 100644 --- a/src/fluidimage/topologies/test_piv.py +++ b/src/fluidimage/topologies/test_piv.py @@ -26,6 +26,13 @@ def test_piv_oseen(tmp_path_oseen, executor): topology = TopologyPIV(params, logging_level="info") topology.compute(executor, nb_max_workers=2) + + path_files = list(Path(topology.path_dir_result).glob("piv*")) + path_files[0].unlink() + path_files[1].unlink() + + params.saving.how = "complete" + topology = TopologyPIV(params, logging_level="info") topology.compute(executor, nb_max_workers=2) topology.make_code_graphviz(topology.path_dir_result / "topo.dot") @@ -53,11 +60,6 @@ def test_piv_jet(tmp_path_jet): # remove one file to test params.saving.how = "complete" path_files = list(Path(topology.path_dir_result).glob("piv*")) - - if not path_files: - sleep(0.2) - path_files = list(Path(topology.path_dir_result).glob("piv*")) - path_files[0].unlink() params.saving.how = "complete" diff --git a/src/fluidimage/topologies/test_splitters.py b/src/fluidimage/topologies/test_splitters.py index 4b25a4cc..735a9189 100644 --- a/src/fluidimage/topologies/test_splitters.py +++ b/src/fluidimage/topologies/test_splitters.py @@ -1,25 +1,37 @@ from fluiddyn.util.serieofarrays import SeriesOfArrays from fluidimage import get_path_image_samples from fluidimage.topologies.piv import TopologyPIV -from fluidimage.topologies.splitters import SplitterFromSeries, split_range +from fluidimage.topologies.splitters import ( + SplitterFromSeries, + split_list, + split_range, +) path_image_samples = get_path_image_samples() def test_split_range(): - for start, stop, step, num_parts in ((1, 55, 2, 9), (0, 10, 1, 3)): num_elems = len(range(start, stop, step)) ranges = split_range(start, stop, step, num_parts) - indices = [] for _r in ranges: indices.extend(range(*_r)) assert len(set(indices)) == num_elems - assert sum(len(range(*sss)) for sss in ranges) == num_elems +def test_split_list(): + num_processes = 4 + for seq in ([5, 1, 3], list(range(10))): + lists = split_list(seq, num_processes) + assert len(lists) == min([num_processes, len(seq)]) + indices = [] + for _l in lists: + indices.extend(_l) + assert indices == seq + + def test_splitter_from_serie(): params = TopologyPIV.create_default_params()