From 125b582345c532a98f3c195e57fbe7c85ec38a5f Mon Sep 17 00:00:00 2001 From: Daniel Alejandro Coll Tejeda <62675074+macarronesc@users.noreply.github.com> Date: Fri, 28 Jun 2024 10:47:30 +0000 Subject: [PATCH 1/6] Added Lithops executor --- rechunker/api.py | 9 ++++++- rechunker/executors/__init__.py | 7 ++++++ rechunker/executors/lithops.py | 42 +++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 rechunker/executors/lithops.py diff --git a/rechunker/api.py b/rechunker/api.py index 0b3ced7..f059734 100644 --- a/rechunker/api.py +++ b/rechunker/api.py @@ -206,6 +206,13 @@ class PythonCopySpecExecutor(PythonPipelineExecutor, CopySpecToPipelinesMixin): pass return PythonCopySpecExecutor() + elif name.lower() == "lithops": + from rechunker.executors.lithops import LithopsPipelineExecutor + + class LithopsCopySpecExecutor(LithopsPipelineExecutor, CopySpecToPipelinesMixin): + pass + + return LithopsCopySpecExecutor() elif name.lower() == "pywren": from rechunker.executors.pywren import PywrenExecutor @@ -609,7 +616,7 @@ def _setup_array_rechunk( except AttributeError: pass - if read_chunks == write_chunks or read_chunks == int_chunks: + if read_chunks == write_chunks: int_array = None else: # do intermediate store diff --git a/rechunker/executors/__init__.py b/rechunker/executors/__init__.py index a4f5364..6b7268c 100644 --- a/rechunker/executors/__init__.py +++ b/rechunker/executors/__init__.py @@ -15,3 +15,10 @@ __all__.append("PrefectPipelineExecutor") except ImportError: pass + +try: + from .lithops import LithopsPipelineExecutor + + __all__.append("LithopsPipelineExecutor") +except ImportError: + pass diff --git a/rechunker/executors/lithops.py b/rechunker/executors/lithops.py new file mode 100644 index 0000000..09603d1 --- /dev/null +++ b/rechunker/executors/lithops.py @@ -0,0 +1,42 @@ +from typing import Callable + +import lithops + +from rechunker.types import ParallelPipelines, PipelineExecutor + +Task = Callable[[], None] + +class LithopsPipelineExecutor(PipelineExecutor[Task]): + """An execution engine based on Lithops framework.""" + + def pipelines_to_plan(self, pipelines: ParallelPipelines) -> Task: + def map_function(stage, function, config): + function(stage, config=config) if stage else function(config=config) + + def _prepare_input_data(): + iterdata = [] + for pipeline in pipelines: + for stage in pipeline.stages: + if stage.mappable is not None: + for m in stage.mappable: + iterdata.append((m, stage.function, pipeline.config)) + else: + iterdata.append((None, stage.function, pipeline.config)) + return iterdata + + def plan(config): + iterdata = _prepare_input_data() + + if config is None: + fexec = lithops.FunctionExecutor(config=config) + else: + fexec = lithops.FunctionExecutor() + + fexec.map(map_function, iterdata[0]) + fexec.get_result() + + return plan + + def execute_plan(self, plan: Task, config = None, **kwargs): + plan(config) + \ No newline at end of file From 4acdca170765a6bfe13e19bd311d15db42269eff Mon Sep 17 00:00:00 2001 From: Daniel Alejandro Coll Tejeda <62675074+macarronesc@users.noreply.github.com> Date: Fri, 28 Jun 2024 10:52:48 +0000 Subject: [PATCH 2/6] Added LithopsExecutor to Documentation --- docs/api.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/api.rst b/docs/api.rst index de42130..1643bab 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -40,9 +40,11 @@ Rechunking plans can be executed on a variety of backends. The following table l rechunker.executors.prefect.PrefectExecutor rechunker.executors.python.PythonExecutor rechunker.executors.pywren.PywrenExecutor + rechunker.executors.lithops.LithopsExecutor .. autoclass:: rechunker.executors.beam.BeamExecutor .. autoclass:: rechunker.executors.dask.DaskExecutor .. autoclass:: rechunker.executors.prefect.PrefectExecutor .. autoclass:: rechunker.executors.python.PythonExecutor .. autoclass:: rechunker.executors.pywren.PywrenExecutor +.. autoclass:: rechunker.executors.lithops.LithopsExecutor From a36c2967c372d6eef77d2c92ef834e7bfdfa9e9e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 28 Jun 2024 10:54:39 +0000 Subject: [PATCH 3/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- rechunker/api.py | 4 +++- rechunker/executors/lithops.py | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/rechunker/api.py b/rechunker/api.py index f059734..bdb4b30 100644 --- a/rechunker/api.py +++ b/rechunker/api.py @@ -209,7 +209,9 @@ class PythonCopySpecExecutor(PythonPipelineExecutor, CopySpecToPipelinesMixin): elif name.lower() == "lithops": from rechunker.executors.lithops import LithopsPipelineExecutor - class LithopsCopySpecExecutor(LithopsPipelineExecutor, CopySpecToPipelinesMixin): + class LithopsCopySpecExecutor( + LithopsPipelineExecutor, CopySpecToPipelinesMixin + ): pass return LithopsCopySpecExecutor() diff --git a/rechunker/executors/lithops.py b/rechunker/executors/lithops.py index 09603d1..f583615 100644 --- a/rechunker/executors/lithops.py +++ b/rechunker/executors/lithops.py @@ -6,6 +6,7 @@ Task = Callable[[], None] + class LithopsPipelineExecutor(PipelineExecutor[Task]): """An execution engine based on Lithops framework.""" @@ -24,19 +25,18 @@ def _prepare_input_data(): iterdata.append((None, stage.function, pipeline.config)) return iterdata - def plan(config): + def plan(config): iterdata = _prepare_input_data() if config is None: fexec = lithops.FunctionExecutor(config=config) else: fexec = lithops.FunctionExecutor() - + fexec.map(map_function, iterdata[0]) fexec.get_result() return plan - def execute_plan(self, plan: Task, config = None, **kwargs): + def execute_plan(self, plan: Task, config=None, **kwargs): plan(config) - \ No newline at end of file From f4d8569ad991d3a4c70c9c1bc136721d81df189e Mon Sep 17 00:00:00 2001 From: Daniel Alejandro Coll Tejeda <62675074+macarronesc@users.noreply.github.com> Date: Fri, 28 Jun 2024 11:33:20 +0000 Subject: [PATCH 4/6] Fixed some LithopsExecutor issues --- rechunker/executors/lithops.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rechunker/executors/lithops.py b/rechunker/executors/lithops.py index f583615..8c6bfd4 100644 --- a/rechunker/executors/lithops.py +++ b/rechunker/executors/lithops.py @@ -1,10 +1,10 @@ -from typing import Callable +from typing import Callable, Any import lithops from rechunker.types import ParallelPipelines, PipelineExecutor -Task = Callable[[], None] +Task = Callable[[Any], None] class LithopsPipelineExecutor(PipelineExecutor[Task]): From 827a3e21ec905c4a2e9eb12b7044e447e816d243 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 28 Jun 2024 11:35:40 +0000 Subject: [PATCH 5/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- rechunker/executors/lithops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rechunker/executors/lithops.py b/rechunker/executors/lithops.py index 8c6bfd4..f2afc30 100644 --- a/rechunker/executors/lithops.py +++ b/rechunker/executors/lithops.py @@ -1,4 +1,4 @@ -from typing import Callable, Any +from typing import Any, Callable import lithops From 0e507de4f95d1bcf379adef201dd79b29316c479 Mon Sep 17 00:00:00 2001 From: Daniel Alejandro Coll Tejeda <62675074+macarronesc@users.noreply.github.com> Date: Mon, 1 Jul 2024 07:32:22 +0000 Subject: [PATCH 6/6] Fixed LithopsExecutor mistake --- rechunker/executors/lithops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rechunker/executors/lithops.py b/rechunker/executors/lithops.py index f2afc30..b181734 100644 --- a/rechunker/executors/lithops.py +++ b/rechunker/executors/lithops.py @@ -28,7 +28,7 @@ def _prepare_input_data(): def plan(config): iterdata = _prepare_input_data() - if config is None: + if config is not None: fexec = lithops.FunctionExecutor(config=config) else: fexec = lithops.FunctionExecutor()