Skip to content

Commit

Permalink
FEAT-#6831: Implement read_parquet_glob
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed Jan 11, 2024
1 parent 31f8bd0 commit 2757fb7
Show file tree
Hide file tree
Showing 15 changed files with 228 additions and 41 deletions.
17 changes: 14 additions & 3 deletions modin/core/execution/dask/implementations/pandas_on_dask/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@
from modin.experimental.core.io import (
ExperimentalCSVGlobDispatcher,
ExperimentalCustomTextDispatcher,
ExperimentalPickleDispatcher,
ExperimentalGlobDispatcher,
ExperimentalSQLDispatcher,
)
from modin.experimental.core.storage_formats.pandas.parsers import (
ExperimentalCustomTextParser,
ExperimentalPandasCSVGlobParser,
ExperimentalPandasParquetParser,
ExperimentalPandasPickleParser,
)

Expand Down Expand Up @@ -89,10 +90,20 @@ def __make_write(*classes, build_args=build_args):
read_csv_glob = __make_read(
ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher
)
read_parquet_glob = __make_read(
ExperimentalPandasParquetParser, ExperimentalGlobDispatcher
)
to_parquet_glob = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": BaseIO.to_parquet},
)
read_pickle_distributed = __make_read(
ExperimentalPandasPickleParser, ExperimentalPickleDispatcher
ExperimentalPandasPickleParser, ExperimentalGlobDispatcher
)
to_pickle_distributed = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": BaseIO.to_pickle},
)
to_pickle_distributed = __make_write(ExperimentalPickleDispatcher)
read_custom_text = __make_read(
ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher
)
Expand Down
10 changes: 10 additions & 0 deletions modin/core/execution/dispatching/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,16 @@ def to_pickle(cls, *args, **kwargs):
def to_pickle_distributed(cls, *args, **kwargs):
return cls.get_factory()._to_pickle_distributed(*args, **kwargs)

@classmethod
def read_parquet_glob(cls, *args, **kwargs):
    """
    Read parquet files via glob pattern using the current execution's factory.

    Parameters
    ----------
    *args : iterable
        Positional arguments passed through to the factory's ``_read_parquet_glob``.
    **kwargs : dict
        Keyword arguments passed through to the factory's ``_read_parquet_glob``.

    Returns
    -------
    BaseQueryCompiler
        Query compiler with the data read in.
    """
    # NOTE(review): `@_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob)`
    # cannot be applied until the factory method itself carries a docstring.
    return cls.get_factory()._read_parquet_glob(*args, **kwargs)

@classmethod
def to_parquet_glob(cls, *args, **kwargs):
    """
    Write query compiler content to several parquet files via the current execution's factory.

    Parameters
    ----------
    *args : iterable
        Positional arguments passed through to the factory's ``_to_parquet_glob``.
    **kwargs : dict
        Keyword arguments passed through to the factory's ``_to_parquet_glob``.
    """
    # NOTE(review): `@_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob)`
    # cannot be applied until the factory method itself carries a docstring.
    return cls.get_factory()._to_parquet_glob(*args, **kwargs)

@classmethod
@_inherit_docstrings(factories.PandasOnRayFactory._read_custom_text)
def read_custom_text(cls, **kwargs):
Expand Down
22 changes: 22 additions & 0 deletions modin/core/execution/dispatching/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,28 @@ def _to_pickle_distributed(cls, *args, **kwargs):
)
return cls.io_cls.to_pickle_distributed(*args, **kwargs)

@classmethod
def _read_parquet_glob(cls, *args, **kwargs):
    """
    Read parquet files via glob pattern, dispatching to the execution's IO class.

    Supported only for executions listed in ``supported_executions``; other
    executions raise ``NotImplementedError`` instead of silently misbehaving.

    Parameters
    ----------
    *args : iterable
        Positional arguments forwarded to ``io_cls.read_parquet_glob``.
    **kwargs : dict
        Keyword arguments forwarded to ``io_cls.read_parquet_glob``.

    Returns
    -------
    BaseQueryCompiler
        Query compiler with the data read in.

    Raises
    ------
    NotImplementedError
        If the current execution does not support this function.
    """
    current_execution = get_current_execution()
    if current_execution not in supported_executions:
        raise NotImplementedError(
            f"`_read_parquet_glob()` is not implemented for {current_execution} execution."
        )
    return cls.io_cls.read_parquet_glob(*args, **kwargs)

@classmethod
def _to_parquet_glob(cls, *args, **kwargs):
    """
    Write query compiler content to several parquet files, dispatching to the execution's IO class.

    Supported only for executions listed in ``supported_executions``; other
    executions raise ``NotImplementedError`` instead of silently misbehaving.

    Parameters
    ----------
    *args : iterable
        Positional arguments forwarded to ``io_cls.to_parquet_glob``.
    **kwargs : dict
        Keyword arguments forwarded to ``io_cls.to_parquet_glob``.

    Raises
    ------
    NotImplementedError
        If the current execution does not support this function.
    """
    current_execution = get_current_execution()
    if current_execution not in supported_executions:
        raise NotImplementedError(
            f"`_to_parquet_glob()` is not implemented for {current_execution} execution."
        )
    return cls.io_cls.to_parquet_glob(*args, **kwargs)


@doc(_doc_factory_class, execution_name="PandasOnRay")
class PandasOnRayFactory(BaseFactory):
Expand Down
17 changes: 14 additions & 3 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@
from modin.experimental.core.io import (
ExperimentalCSVGlobDispatcher,
ExperimentalCustomTextDispatcher,
ExperimentalPickleDispatcher,
ExperimentalGlobDispatcher,
ExperimentalSQLDispatcher,
)
from modin.experimental.core.storage_formats.pandas.parsers import (
ExperimentalCustomTextParser,
ExperimentalPandasCSVGlobParser,
ExperimentalPandasParquetParser,
ExperimentalPandasPickleParser,
)

Expand Down Expand Up @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args):
read_csv_glob = __make_read(
ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher
)
read_parquet_glob = __make_read(
ExperimentalPandasParquetParser, ExperimentalGlobDispatcher
)
to_parquet_glob = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": RayIO.to_parquet},
)
read_pickle_distributed = __make_read(
ExperimentalPandasPickleParser, ExperimentalPickleDispatcher
ExperimentalPandasPickleParser, ExperimentalGlobDispatcher
)
to_pickle_distributed = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": RayIO.to_pickle},
)
to_pickle_distributed = __make_write(ExperimentalPickleDispatcher)
read_custom_text = __make_read(
ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@
from modin.experimental.core.io import (
ExperimentalCSVGlobDispatcher,
ExperimentalCustomTextDispatcher,
ExperimentalPickleDispatcher,
ExperimentalGlobDispatcher,
ExperimentalSQLDispatcher,
)
from modin.experimental.core.storage_formats.pandas.parsers import (
ExperimentalCustomTextParser,
ExperimentalPandasCSVGlobParser,
ExperimentalPandasParquetParser,
ExperimentalPandasPickleParser,
)

Expand Down Expand Up @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args):
read_csv_glob = __make_read(
ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher
)
read_parquet_glob = __make_read(
ExperimentalPandasParquetParser, ExperimentalGlobDispatcher
)
to_parquet_glob = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": UnidistIO.to_parquet},
)
read_pickle_distributed = __make_read(
ExperimentalPandasPickleParser, ExperimentalPickleDispatcher
ExperimentalPandasPickleParser, ExperimentalGlobDispatcher
)
to_pickle_distributed = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": UnidistIO.to_pickle},
)
to_pickle_distributed = __make_write(ExperimentalPickleDispatcher)
read_custom_text = __make_read(
ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher
)
Expand Down
4 changes: 2 additions & 2 deletions modin/core/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ def to_csv(cls, obj, **kwargs): # noqa: PR01
@_inherit_docstrings(
pandas.DataFrame.to_parquet, apilink="pandas.DataFrame.to_parquet"
)
def to_parquet(cls, obj, **kwargs): # noqa: PR01
def to_parquet(cls, obj, path, **kwargs): # noqa: PR01
"""
Write object to the binary parquet format using pandas.
Expand All @@ -662,4 +662,4 @@ def to_parquet(cls, obj, **kwargs): # noqa: PR01
if isinstance(obj, BaseQueryCompiler):
obj = obj.to_pandas()

return obj.to_parquet(**kwargs)
return obj.to_parquet(path, **kwargs)
5 changes: 3 additions & 2 deletions modin/experimental/core/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@

"""Experimental IO functions implementations."""

from .pickle.pickle_dispatcher import ExperimentalPickleDispatcher
from .glob.glob_dispatcher import ExperimentalGlobDispatcher
from .sql.sql_dispatcher import ExperimentalSQLDispatcher
from .text.csv_glob_dispatcher import ExperimentalCSVGlobDispatcher
from .text.custom_text_dispatcher import ExperimentalCustomTextDispatcher

# Public API of the experimental IO dispatchers.
# Note: "ExperimentalGlobDispatcher" was accidentally listed twice (the commit
# shows 3 additions / 2 deletions for this hunk) — keep a single entry so
# `from ... import *` and linters don't trip over the duplicate.
__all__ = [
    "ExperimentalCSVGlobDispatcher",
    "ExperimentalSQLDispatcher",
    "ExperimentalGlobDispatcher",
    "ExperimentalCustomTextDispatcher",
]
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Experimental Pickle format type IO functions implementations."""
"""Experimental Glob format type IO functions implementations."""
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Module houses ``ExperimentalPickleDispatcher`` class that is used for reading `.pkl` files."""
"""Module houses ``ExperimentalGlobDispatcher`` class that is used to read files of different formats in parallel."""

import glob
import warnings
Expand All @@ -24,20 +24,20 @@
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler


class ExperimentalPickleDispatcher(FileDispatcher):
"""Class handles utils for reading pickle files."""
class ExperimentalGlobDispatcher(FileDispatcher):
"""Class implements reading different formats, parallelizing by the number of files."""

@classmethod
def _read(cls, filepath_or_buffer, **kwargs):
def _read(cls, **kwargs):
"""
Read data from `filepath_or_buffer` according to `kwargs` parameters.
Parameters
----------
filepath_or_buffer : str, path object or file-like object
`filepath_or_buffer` parameter of `read_pickle` function.
`filepath_or_buffer` parameter of `read_*` function.
**kwargs : dict
Parameters of `read_pickle` function.
Parameters of `read_*` function.
Returns
-------
Expand All @@ -46,10 +46,10 @@ def _read(cls, filepath_or_buffer, **kwargs):
Notes
-----
In experimental mode, we can use `*` in the filename.
The number of partitions is equal to the number of input files.
"""
path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path"
filepath_or_buffer = kwargs.pop(path_key)
filepath_or_buffer = stringify_path(filepath_or_buffer)
if not (isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer):
return cls.single_worker_read(
Expand Down Expand Up @@ -104,37 +104,31 @@ def write(cls, qc, **kwargs):
- if `*` is in the filename, then it will be replaced by the ascending sequence 0, 1, 2, …
- if `*` is not in the filename, then the default implementation will be used.
Example: 4 partitions and input filename="partition*.pkl.gz", then filenames will be:
`partition0.pkl.gz`, `partition1.pkl.gz`, `partition2.pkl.gz`, `partition3.pkl.gz`.
Parameters
----------
qc : BaseQueryCompiler
The query compiler of the Modin dataframe that we want
to run ``to_pickle_distributed`` on.
to run ``to_[format]_glob`` on.
**kwargs : dict
Parameters for ``pandas.to_pickle(**kwargs)``.
Parameters for ``pandas.to_[format](**kwargs)``.
"""
kwargs["filepath_or_buffer"] = stringify_path(kwargs["filepath_or_buffer"])
path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path"
filepath_or_buffer = kwargs.pop(path_key)
filepath_or_buffer = stringify_path(filepath_or_buffer)
if not (
isinstance(kwargs["filepath_or_buffer"], str)
and "*" in kwargs["filepath_or_buffer"]
isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer
) or not isinstance(qc, PandasQueryCompiler):
warnings.warn("Defaulting to Modin core implementation")
cls.base_io.to_pickle(qc, **kwargs)
cls.base_write(qc, filepath_or_buffer, **kwargs)
return

# Capture the base write method's name (e.g. "to_pickle" or "to_parquet")
# so each worker can call the matching ``DataFrame`` method by name.
write_func_name = cls.base_write.__name__

def func(df, **kw): # pragma: no cover
idx = str(kw["partition_idx"])
# dask doesn't make a copy of kwargs on serialization;
# so take a copy ourselves, otherwise the error is:
# kwargs["path"] = kwargs.pop("filepath_or_buffer").replace("*", idx)
# KeyError: 'filepath_or_buffer'
dask_kwargs = dict(kwargs)
dask_kwargs["path"] = dask_kwargs.pop("filepath_or_buffer").replace(
"*", idx
)
df.to_pickle(**dask_kwargs)
path = filepath_or_buffer.replace("*", idx)
getattr(df, write_func_name)(path, **kwargs)
return pandas.DataFrame()

result = qc._modin_frame.apply_full_axis(
Expand Down
18 changes: 18 additions & 0 deletions modin/experimental/core/storage_formats/pandas/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,24 @@ def parse(fname, **kwargs):
return _split_result_for_readers(1, num_splits, df) + [length, width]


@doc(_doc_pandas_parser_class, data_type="parquet files")
class ExperimentalPandasParquetParser(PandasParser):
    @staticmethod
    @doc(_doc_parse_func, parameters=_doc_parse_parameters_common)
    def parse(fname, **kwargs):
        # Parser warnings are noise on workers; silence them wholesale.
        warnings.filterwarnings("ignore")
        # Pop the flag before forwarding kwargs — pandas.read_parquet
        # would reject an unknown keyword.
        single_worker_read = kwargs.pop("single_worker_read", None)
        pandas_df = pandas.read_parquet(fname, **kwargs)
        if single_worker_read:
            # Non-distributed path: hand the whole frame back as-is.
            return pandas_df

        # One file per partition, so the result is never split further.
        num_splits = 1
        return _split_result_for_readers(1, num_splits, pandas_df) + [
            len(pandas_df),
            len(pandas_df.columns),
        ]


@doc(_doc_pandas_parser_class, data_type="custom text")
class ExperimentalCustomTextParser(PandasParser):
@staticmethod
Expand Down
1 change: 1 addition & 0 deletions modin/experimental/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from .io import ( # noqa F401
read_csv_glob,
read_custom_text,
read_parquet_glob,
read_pickle_distributed,
read_sql,
to_pickle_distributed,
Expand Down
Loading

0 comments on commit 2757fb7

Please sign in to comment.