Skip to content

Commit

Permalink
params.saving.how from_path_indices for multi_exec_subproc (only PIV)
Browse files Browse the repository at this point in the history
  • Loading branch information
paugier committed Mar 12, 2024
1 parent 1adc3f8 commit 7634f88
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 36 deletions.
8 changes: 8 additions & 0 deletions src/fluidimage/executors/multi_exec_subproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ def _start_processes(self):
self.path_dir_result / f"params_files_{self._unique_postfix}"
)
path_dir_params.mkdir(exist_ok=True)

if (
hasattr(self.topology, "how_saving")
and self.topology.how_saving == "complete"
and hasattr(splitter, "save_indices_files")
):
splitter.save_indices_files(path_dir_params)

for index_process, params_split in enumerate(
splitter.iter_over_new_params()
):
Expand Down
2 changes: 1 addition & 1 deletion src/fluidimage/topologies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

__all__ = ["LogTopology", "TopologyBase"]

how_values = ("ask", "new_dir", "complete", "recompute")
how_values = ("ask", "new_dir", "complete", "recompute", "from_path_indices")


def prepare_path_dir_result(
Expand Down
35 changes: 22 additions & 13 deletions src/fluidimage/topologies/piv.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ def save_piv_object(self, obj):
ret = obj.save(self.path_dir_result)
self.results.append(ret)

def compute_indices_to_be_computed(self):
    """Return the indices of the series whose result file is missing.

    For every ``(index, serie)`` item of ``self.series``, the result file
    name is obtained with ``get_name_piv(serie, prefix="piv")``; the index
    is kept when no such path exists in ``self.path_dir_result``.
    """
    return [
        index
        for index, serie in self.series.items()
        if not (
            self.path_dir_result / get_name_piv(serie, prefix="piv")
        ).exists()
    ]

def fill_couples_of_names_and_paths(self, input_queue, output_queues):
"""Fill the two first queues"""
assert input_queue is None
Expand All @@ -175,21 +183,22 @@ def fill_couples_of_names_and_paths(self, input_queue, output_queues):
if not series:
logger.warning("add 0 couple. No PIV to compute.")
return
if self.how_saving == "complete":
index_series = []
for ind_serie, serie in self.series.items():
name_piv = get_name_piv(serie, prefix="piv")
if not (self.path_dir_result / name_piv).exists():
index_series.append(ind_serie)

if not index_series:
logger.warning(
'topology in mode "complete" and work already done.'
)
return

series.set_index_series(index_series)
if self.how_saving in ("complete", "from_path_indices"):
if self.how_saving == "complete":
index_series = self.compute_indices_to_be_computed()
if not index_series:
logger.warning(
'topology in mode "complete" and work already done.'
)
return
elif self.how_saving == "from_path_indices":
path_indices = self.params.series.path_indices_file
index_series = [
int(line) for line in open(path_indices, encoding="utf-8")
]

series.set_index_series(index_series)
if logger.isEnabledFor(DEBUG):
logger.debug(repr([serie.get_name_arrays() for serie in series]))

Expand Down
83 changes: 70 additions & 13 deletions src/fluidimage/topologies/splitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ def split_range(start0, stop0, step0, num_parts):
return ranges


def split_list(sequence, num_parts):
    """Split a sequence into at most ``num_parts`` contiguous chunks.

    The chunks keep the original order and their lengths differ by at most
    one (the first chunks receive the extra elements). No empty chunk is
    produced: when ``num_parts`` exceeds ``len(sequence)``, only
    ``len(sequence)`` chunks are returned.

    Parameters
    ----------
    sequence : sequence
        Sliceable sequence supporting ``len`` (e.g. a list of indices).
    num_parts : int
        Maximum number of chunks.

    Returns
    -------
    list
        The chunks (slices of ``sequence``). Empty when ``sequence`` is
        empty or ``num_parts`` is not positive.
    """
    num_parts = min(num_parts, len(sequence))
    if num_parts <= 0:
        # Nothing to distribute. Returning early also avoids the
        # ZeroDivisionError that divmod would raise for an empty sequence.
        return []
    size, num_longer = divmod(len(sequence), num_parts)
    return [
        sequence[
            i * size + min(i, num_longer) : (i + 1) * size
            + min(i + 1, num_longer)
        ]
        for i in range(num_parts)
    ]


class Splitter(ABC):

def __init__(self, params, num_processes, topology=None):
Expand Down Expand Up @@ -68,24 +77,72 @@ def __init__(self, params, num_processes, topology=None):
)
)

self.ranges = split_range(
self.series.ind_start,
self.series.ind_stop,
self.series.ind_step,
self.num_processes,
)
self.indices_lists = None
self.ranges = None
self._indices_files_saved = False
self._path_dir_indices = None

if (
topology is not None
and topology.how_saving == "complete"
and hasattr(topology, "compute_indices_to_be_computed")
):
indices = topology.compute_indices_to_be_computed()
self.indices_lists = split_list(indices, self.num_processes)
else:
self.ranges = split_range(
self.series.ind_start,
self.series.ind_stop,
self.series.ind_step,
self.num_processes,
)

def get_params_series(self, params):
    """Return the ``series`` attribute of ``params``."""
    params_series = params.series
    return params_series

def iter_over_new_params(self):
for sss in self.ranges:
if len(range(*sss)) == 0:
def save_indices_files(self, path_dir):
self._path_dir_indices = path_dir

for idx_process, indices in enumerate(self.indices_lists):
if not indices:
continue
params = deepcopy(self.params)
p_series = self.get_params_series(params)
p_series.ind_start, p_series.ind_stop, p_series.ind_step = sss
yield params
path = path_dir / f"indices{idx_process:03}.txt"
path.write_text("\n".join(str(index) for index in indices) + "\n")

self._indices_files_saved = True

def iter_over_new_params(self):
    """Yield one deep copy of ``self.params`` per non-empty chunk of work.

    Two modes, depending on which attribute is set:

    - ``self.ranges`` is not None: each yielded copy gets its series
      ``ind_start``, ``ind_stop`` and ``ind_step`` set from one range;
      empty ranges are skipped.
    - otherwise, ``self.indices_lists`` is not None: each yielded copy has
      ``saving.how`` set to ``"from_path_indices"`` and its series
      ``path_indices_file`` pointing to ``indices{idx:03}.txt`` in
      ``self._path_dir_indices``; empty lists are skipped.

    Raises
    ------
    RuntimeError
        In the second mode, if ``self._indices_files_saved`` is false.
    """
    if self.ranges is not None:
        for bounds in self.ranges:
            if len(range(*bounds)) == 0:
                # nothing to compute for this chunk
                continue
            new_params = deepcopy(self.params)
            series_params = self.get_params_series(new_params)
            (
                series_params.ind_start,
                series_params.ind_stop,
                series_params.ind_step,
            ) = bounds
            yield new_params
        return

    if self.indices_lists is None:
        return

    if not self._indices_files_saved:
        raise RuntimeError("First call save_indices_files.")
    dir_indices = self._path_dir_indices

    # Template shared by all chunks: only the path of the indices file
    # differs between the yielded copies.
    template = deepcopy(self.params)
    template.saving.how = "from_path_indices"
    series_params = self.get_params_series(template)
    series_params._set_attrib("path_indices_file", None)

    for idx_process, indices in enumerate(self.indices_lists):
        if indices:
            new_params = deepcopy(template)
            series_params = self.get_params_series(new_params)
            series_params.path_indices_file = (
                dir_indices / f"indices{idx_process:03}.txt"
            )
            yield new_params


class SplitterFromImages(Splitter):
Expand Down
12 changes: 7 additions & 5 deletions src/fluidimage/topologies/test_piv.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ def test_piv_oseen(tmp_path_oseen, executor):

topology = TopologyPIV(params, logging_level="info")
topology.compute(executor, nb_max_workers=2)

path_files = list(Path(topology.path_dir_result).glob("piv*"))
path_files[0].unlink()
path_files[1].unlink()

params.saving.how = "complete"
topology = TopologyPIV(params, logging_level="info")
topology.compute(executor, nb_max_workers=2)

topology.make_code_graphviz(topology.path_dir_result / "topo.dot")
Expand Down Expand Up @@ -53,11 +60,6 @@ def test_piv_jet(tmp_path_jet):

# remove one file to test params.saving.how = "complete"
path_files = list(Path(topology.path_dir_result).glob("piv*"))

if not path_files:
sleep(0.2)
path_files = list(Path(topology.path_dir_result).glob("piv*"))

path_files[0].unlink()

params.saving.how = "complete"
Expand Down
20 changes: 16 additions & 4 deletions src/fluidimage/topologies/test_splitters.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
from fluiddyn.util.serieofarrays import SeriesOfArrays
from fluidimage import get_path_image_samples
from fluidimage.topologies.piv import TopologyPIV
from fluidimage.topologies.splitters import SplitterFromSeries, split_range
from fluidimage.topologies.splitters import (
SplitterFromSeries,
split_list,
split_range,
)

path_image_samples = get_path_image_samples()


def test_split_range():
    """Check that ``split_range`` covers every element of the input range."""
    cases = ((1, 55, 2, 9), (0, 10, 1, 3))
    for start, stop, step, num_parts in cases:
        num_elems = len(range(start, stop, step))
        ranges = split_range(start, stop, step, num_parts)

        # all indices produced by the sub-ranges, in order
        indices = [index for _r in ranges for index in range(*_r)]
        assert len(set(indices)) == num_elems

        total = sum(len(range(*sss)) for sss in ranges)
        assert total == num_elems


def test_split_list():
    """Check the number of chunks and that their concatenation is the input."""
    num_processes = 4
    sequences = ([5, 1, 3], list(range(10)))
    for seq in sequences:
        lists = split_list(seq, num_processes)
        assert len(lists) == min(num_processes, len(seq))
        # flattening the chunks must give back the original sequence
        flattened = [index for _l in lists for index in _l]
        assert flattened == seq


def test_splitter_from_serie():

params = TopologyPIV.create_default_params()
Expand Down

0 comments on commit 7634f88

Please sign in to comment.