Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Lithops executor #152

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ Rechunking plans can be executed on a variety of backends. The following table l
rechunker.executors.prefect.PrefectExecutor
rechunker.executors.python.PythonExecutor
rechunker.executors.pywren.PywrenExecutor
rechunker.executors.lithops.LithopsExecutor

.. autoclass:: rechunker.executors.beam.BeamExecutor
.. autoclass:: rechunker.executors.dask.DaskExecutor
.. autoclass:: rechunker.executors.prefect.PrefectExecutor
.. autoclass:: rechunker.executors.python.PythonExecutor
.. autoclass:: rechunker.executors.pywren.PywrenExecutor
.. autoclass:: rechunker.executors.lithops.LithopsExecutor
11 changes: 10 additions & 1 deletion rechunker/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,15 @@ class PythonCopySpecExecutor(PythonPipelineExecutor, CopySpecToPipelinesMixin):
pass

return PythonCopySpecExecutor()
elif name.lower() == "lithops":
from rechunker.executors.lithops import LithopsPipelineExecutor

class LithopsCopySpecExecutor(
LithopsPipelineExecutor, CopySpecToPipelinesMixin
):
pass

return LithopsCopySpecExecutor()
elif name.lower() == "pywren":
from rechunker.executors.pywren import PywrenExecutor

Expand Down Expand Up @@ -609,7 +618,7 @@ def _setup_array_rechunk(
except AttributeError:
pass

if read_chunks == write_chunks or read_chunks == int_chunks:
if read_chunks == write_chunks:
int_array = None
else:
# do intermediate store
Expand Down
7 changes: 7 additions & 0 deletions rechunker/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@
__all__.append("PrefectPipelineExecutor")
except ImportError:
pass

try:
from .lithops import LithopsPipelineExecutor

__all__.append("LithopsPipelineExecutor")
except ImportError:
pass
42 changes: 42 additions & 0 deletions rechunker/executors/lithops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Any, Callable

import lithops

from rechunker.types import ParallelPipelines, PipelineExecutor

Task = Callable[[Any], None]


class LithopsPipelineExecutor(PipelineExecutor[Task]):
"""An execution engine based on Lithops framework."""

def pipelines_to_plan(self, pipelines: ParallelPipelines) -> Task:
def map_function(stage, function, config):
function(stage, config=config) if stage else function(config=config)

def _prepare_input_data():
iterdata = []
for pipeline in pipelines:
for stage in pipeline.stages:
if stage.mappable is not None:
for m in stage.mappable:
iterdata.append((m, stage.function, pipeline.config))
else:
iterdata.append((None, stage.function, pipeline.config))
return iterdata

def plan(config):
iterdata = _prepare_input_data()

if config is not None:
fexec = lithops.FunctionExecutor(config=config)
else:
fexec = lithops.FunctionExecutor()

fexec.map(map_function, iterdata[0])
fexec.get_result()

return plan

def execute_plan(self, plan: Task, config=None, **kwargs):
plan(config)