From ffee6e08ba9e2f5dc642c7a5483a39c8c914429e Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Thu, 11 Jan 2024 01:02:52 +0100 Subject: [PATCH 1/7] FEAT-#6831: Implement read_parquet_glob Signed-off-by: Anatoly Myachev --- .../implementations/pandas_on_dask/io/io.py | 17 +++++- .../dispatching/factories/dispatcher.py | 10 +++ .../dispatching/factories/factories.py | 22 +++++++ .../implementations/pandas_on_ray/io/io.py | 17 +++++- .../pandas_on_unidist/io/io.py | 17 +++++- modin/core/io/io.py | 4 +- modin/experimental/core/io/__init__.py | 5 +- .../core/io/{pickle => glob}/__init__.py | 2 +- .../glob_dispatcher.py} | 46 ++++++-------- .../core/storage_formats/pandas/parsers.py | 18 ++++++ modin/experimental/pandas/__init__.py | 1 + modin/experimental/pandas/io.py | 61 +++++++++++++++++++ modin/experimental/pandas/test/test_io_exp.py | 27 ++++++++ modin/pandas/accessor.py | 24 ++++++++ modin/pandas/dataframe.py | 2 +- 15 files changed, 232 insertions(+), 41 deletions(-) rename modin/experimental/core/io/{pickle => glob}/__init__.py (92%) rename modin/experimental/core/io/{pickle/pickle_dispatcher.py => glob/glob_dispatcher.py} (75%) diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py index e5242458b65..4c51ebe395d 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py @@ -43,12 +43,13 @@ from modin.experimental.core.io import ( ExperimentalCSVGlobDispatcher, ExperimentalCustomTextDispatcher, - ExperimentalPickleDispatcher, + ExperimentalGlobDispatcher, ExperimentalSQLDispatcher, ) from modin.experimental.core.storage_formats.pandas.parsers import ( ExperimentalCustomTextParser, ExperimentalPandasCSVGlobParser, + ExperimentalPandasParquetParser, ExperimentalPandasPickleParser, ) @@ -89,10 +90,20 @@ def __make_write(*classes, build_args=build_args): read_csv_glob = __make_read( ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher ) + read_parquet_glob = __make_read( + ExperimentalPandasParquetParser, ExperimentalGlobDispatcher + ) + to_parquet_glob = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": BaseIO.to_parquet}, + ) read_pickle_distributed = __make_read( - ExperimentalPandasPickleParser, ExperimentalPickleDispatcher + ExperimentalPandasPickleParser, ExperimentalGlobDispatcher + ) + to_pickle_distributed = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": BaseIO.to_pickle}, ) - to_pickle_distributed = __make_write(ExperimentalPickleDispatcher) read_custom_text = __make_read( ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher ) diff --git a/modin/core/execution/dispatching/factories/dispatcher.py b/modin/core/execution/dispatching/factories/dispatcher.py index 5cdbf65f821..c4d119921b5 100644 --- a/modin/core/execution/dispatching/factories/dispatcher.py +++ b/modin/core/execution/dispatching/factories/dispatcher.py @@ -296,6 +296,16 @@ def to_pickle(cls, *args, **kwargs): def to_pickle_distributed(cls, *args, **kwargs): return cls.get_factory()._to_pickle_distributed(*args, **kwargs) + @classmethod + # @_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob) + def read_parquet_glob(cls, *args, **kwargs): + return cls.get_factory()._read_parquet_glob(*args, **kwargs) + + @classmethod + # @_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob) + def to_parquet_glob(cls, *args, **kwargs): + return cls.get_factory()._to_parquet_glob(*args, **kwargs) + @classmethod @_inherit_docstrings(factories.PandasOnRayFactory._read_custom_text) def read_custom_text(cls, **kwargs): diff --git a/modin/core/execution/dispatching/factories/factories.py b/modin/core/execution/dispatching/factories/factories.py index f1bc05539c2..4d8eea02728 100644 --- a/modin/core/execution/dispatching/factories/factories.py +++ b/modin/core/execution/dispatching/factories/factories.py @@ -516,6 +516,28 @@ def _to_pickle_distributed(cls, *args, **kwargs): ) return cls.io_cls.to_pickle_distributed(*args, **kwargs) + @classmethod + # @_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob) + def _read_parquet_glob(cls, *args, **kwargs): + # TODO: add docstring + current_execution = get_current_execution() + if current_execution not in supported_executions: + raise NotImplementedError( + f"`_read_parquet_glob()` is not implemented for {current_execution} execution." + ) + return cls.io_cls.read_parquet_glob(*args, **kwargs) + + @classmethod + # @_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob) + def _to_parquet_glob(cls, *args, **kwargs): + # TODO: add docstring + current_execution = get_current_execution() + if current_execution not in supported_executions: + raise NotImplementedError( + f"`_to_parquet_glob()` is not implemented for {current_execution} execution." + ) + return cls.io_cls.to_parquet_glob(*args, **kwargs) + @doc(_doc_factory_class, execution_name="PandasOnRay") class PandasOnRayFactory(BaseFactory): diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index cca429a526b..4a90cc54025 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -42,12 +42,13 @@ from modin.experimental.core.io import ( ExperimentalCSVGlobDispatcher, ExperimentalCustomTextDispatcher, - ExperimentalPickleDispatcher, + ExperimentalGlobDispatcher, ExperimentalSQLDispatcher, ) from modin.experimental.core.storage_formats.pandas.parsers import ( ExperimentalCustomTextParser, ExperimentalPandasCSVGlobParser, + ExperimentalPandasParquetParser, ExperimentalPandasPickleParser, ) @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args): read_csv_glob = __make_read( ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher ) + read_parquet_glob = __make_read( + ExperimentalPandasParquetParser, ExperimentalGlobDispatcher + ) + to_parquet_glob = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": RayIO.to_parquet}, + ) read_pickle_distributed = __make_read( - ExperimentalPandasPickleParser, ExperimentalPickleDispatcher + ExperimentalPandasPickleParser, ExperimentalGlobDispatcher + ) + to_pickle_distributed = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": RayIO.to_pickle}, ) - to_pickle_distributed = __make_write(ExperimentalPickleDispatcher) read_custom_text = __make_read( ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher ) diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py index 12e053252f5..4311acecbfa 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py @@ -42,12 +42,13 @@ from modin.experimental.core.io import ( ExperimentalCSVGlobDispatcher, ExperimentalCustomTextDispatcher, - ExperimentalPickleDispatcher, + ExperimentalGlobDispatcher, ExperimentalSQLDispatcher, ) from modin.experimental.core.storage_formats.pandas.parsers import ( ExperimentalCustomTextParser, ExperimentalPandasCSVGlobParser, + ExperimentalPandasParquetParser, ExperimentalPandasPickleParser, ) @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args): read_csv_glob = __make_read( ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher ) + read_parquet_glob = __make_read( + ExperimentalPandasParquetParser, ExperimentalGlobDispatcher + ) + to_parquet_glob = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": UnidistIO.to_parquet}, + ) read_pickle_distributed = __make_read( - ExperimentalPandasPickleParser, ExperimentalPickleDispatcher + ExperimentalPandasPickleParser, ExperimentalGlobDispatcher + ) + to_pickle_distributed = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": UnidistIO.to_pickle}, ) - to_pickle_distributed = __make_write(ExperimentalPickleDispatcher) read_custom_text = __make_read( ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher ) diff --git a/modin/core/io/io.py b/modin/core/io/io.py index a024d0c6c0e..4cc25f81e12 100644 --- a/modin/core/io/io.py +++ b/modin/core/io/io.py @@ -652,7 +652,7 @@ def to_csv(cls, obj, **kwargs): # noqa: PR01 @_inherit_docstrings( pandas.DataFrame.to_parquet, apilink="pandas.DataFrame.to_parquet" ) - def to_parquet(cls, obj, **kwargs): # noqa: PR01 + def to_parquet(cls, obj, path, **kwargs): # noqa: PR01 """ Write object to the binary parquet format using pandas. @@ -662,4 +662,4 @@ def to_parquet(cls, obj, **kwargs): # noqa: PR01 if isinstance(obj, BaseQueryCompiler): obj = obj.to_pandas() - return obj.to_parquet(**kwargs) + return obj.to_parquet(path, **kwargs) diff --git a/modin/experimental/core/io/__init__.py b/modin/experimental/core/io/__init__.py index 9f3c64b00a8..e1a19b0d54b 100644 --- a/modin/experimental/core/io/__init__.py +++ b/modin/experimental/core/io/__init__.py @@ -13,7 +13,7 @@ """Experimental IO functions implementations.""" -from .pickle.pickle_dispatcher import ExperimentalPickleDispatcher +from .glob.glob_dispatcher import ExperimentalGlobDispatcher from .sql.sql_dispatcher import ExperimentalSQLDispatcher from .text.csv_glob_dispatcher import ExperimentalCSVGlobDispatcher from .text.custom_text_dispatcher import ExperimentalCustomTextDispatcher @@ -21,6 +21,7 @@ __all__ = [ "ExperimentalCSVGlobDispatcher", "ExperimentalSQLDispatcher", - "ExperimentalPickleDispatcher", + "ExperimentalGlobDispatcher", + "ExperimentalGlobDispatcher", "ExperimentalCustomTextDispatcher", ] diff --git a/modin/experimental/core/io/pickle/__init__.py b/modin/experimental/core/io/glob/__init__.py similarity index 92% rename from modin/experimental/core/io/pickle/__init__.py rename to modin/experimental/core/io/glob/__init__.py index 39b28ecda49..208c4ae4f71 100644 --- a/modin/experimental/core/io/pickle/__init__.py +++ b/modin/experimental/core/io/glob/__init__.py @@ -11,4 +11,4 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. -"""Experimental Pickle format type IO functions implementations.""" +"""Experimental Glob format type IO functions implementations.""" diff --git a/modin/experimental/core/io/pickle/pickle_dispatcher.py b/modin/experimental/core/io/glob/glob_dispatcher.py similarity index 75% rename from modin/experimental/core/io/pickle/pickle_dispatcher.py rename to modin/experimental/core/io/glob/glob_dispatcher.py index 119171e061c..7d003c939e4 100644 --- a/modin/experimental/core/io/pickle/pickle_dispatcher.py +++ b/modin/experimental/core/io/glob/glob_dispatcher.py @@ -11,7 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. -"""Module houses ``ExperimentalPickleDispatcher`` class that is used for reading `.pkl` files.""" +"""Module houses ``ExperimentalGlobDispatcher`` class that is used to read files of different formats in parallel.""" import glob import warnings @@ -24,20 +24,20 @@ from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler -class ExperimentalPickleDispatcher(FileDispatcher): - """Class handles utils for reading pickle files.""" +class ExperimentalGlobDispatcher(FileDispatcher): + """Class implements reading different formats, parallelizing by the number of files.""" @classmethod - def _read(cls, filepath_or_buffer, **kwargs): + def _read(cls, **kwargs): """ Read data from `filepath_or_buffer` according to `kwargs` parameters. Parameters ---------- filepath_or_buffer : str, path object or file-like object - `filepath_or_buffer` parameter of `read_pickle` function. + `filepath_or_buffer` parameter of `read_*` function. **kwargs : dict - Parameters of `read_pickle` function. + Parameters of `read_*` function. Returns ------- @@ -46,10 +46,10 @@ def _read(cls, filepath_or_buffer, **kwargs): Notes ----- - In experimental mode, we can use `*` in the filename. - The number of partitions is equal to the number of input files. """ + path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path" + filepath_or_buffer = kwargs.pop(path_key) filepath_or_buffer = stringify_path(filepath_or_buffer) if not (isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer): return cls.single_worker_read( @@ -104,37 +104,31 @@ def write(cls, qc, **kwargs): - if `*` is in the filename, then it will be replaced by the ascending sequence 0, 1, 2, … - if `*` is not in the filename, then the default implementation will be used. - Example: 4 partitions and input filename="partition*.pkl.gz", then filenames will be: - `partition0.pkl.gz`, `partition1.pkl.gz`, `partition2.pkl.gz`, `partition3.pkl.gz`. - Parameters ---------- qc : BaseQueryCompiler The query compiler of the Modin dataframe that we want - to run ``to_pickle_distributed`` on. + to run ``to_[format]_glob`` on. **kwargs : dict - Parameters for ``pandas.to_pickle(**kwargs)``. + Parameters for ``pandas.to_[format](**kwargs)``. """ - kwargs["filepath_or_buffer"] = stringify_path(kwargs["filepath_or_buffer"]) + path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path" + filepath_or_buffer = kwargs.pop(path_key) + filepath_or_buffer = stringify_path(filepath_or_buffer) if not ( - isinstance(kwargs["filepath_or_buffer"], str) - and "*" in kwargs["filepath_or_buffer"] + isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer ) or not isinstance(qc, PandasQueryCompiler): warnings.warn("Defaulting to Modin core implementation") - cls.base_io.to_pickle(qc, **kwargs) + cls.base_write(qc, filepath_or_buffer, **kwargs) return + # just to try + write_func_name = cls.base_write.__name__ + def func(df, **kw): # pragma: no cover idx = str(kw["partition_idx"]) - # dask doesn't make a copy of kwargs on serialization; - # so take a copy ourselves, otherwise the error is: - # kwargs["path"] = kwargs.pop("filepath_or_buffer").replace("*", idx) - # KeyError: 'filepath_or_buffer' - dask_kwargs = dict(kwargs) - dask_kwargs["path"] = dask_kwargs.pop("filepath_or_buffer").replace( - "*", idx - ) - df.to_pickle(**dask_kwargs) + path = filepath_or_buffer.replace("*", idx) + getattr(df, write_func_name)(path, **kwargs) return pandas.DataFrame() result = qc._modin_frame.apply_full_axis( diff --git a/modin/experimental/core/storage_formats/pandas/parsers.py b/modin/experimental/core/storage_formats/pandas/parsers.py index bf09fc99ebc..be2ec01489a 100644 --- a/modin/experimental/core/storage_formats/pandas/parsers.py +++ b/modin/experimental/core/storage_formats/pandas/parsers.py @@ -114,6 +114,24 @@ def parse(fname, **kwargs): return _split_result_for_readers(1, num_splits, df) + [length, width] +@doc(_doc_pandas_parser_class, data_type="parquet files") +class ExperimentalPandasParquetParser(PandasParser): + @staticmethod + @doc(_doc_parse_func, parameters=_doc_parse_parameters_common) + def parse(fname, **kwargs): + warnings.filterwarnings("ignore") + num_splits = 1 + single_worker_read = kwargs.pop("single_worker_read", None) + df = pandas.read_parquet(fname, **kwargs) + if single_worker_read: + return df + + length = len(df) + width = len(df.columns) + + return _split_result_for_readers(1, num_splits, df) + [length, width] + + @doc(_doc_pandas_parser_class, data_type="custom text") class ExperimentalCustomTextParser(PandasParser): @staticmethod diff --git a/modin/experimental/pandas/__init__.py b/modin/experimental/pandas/__init__.py index 7516563d41c..4c484fddb89 100644 --- a/modin/experimental/pandas/__init__.py +++ b/modin/experimental/pandas/__init__.py @@ -40,6 +40,7 @@ from .io import ( # noqa F401 read_csv_glob, read_custom_text, + read_parquet_glob, read_pickle_distributed, read_sql, to_pickle_distributed, diff --git a/modin/experimental/pandas/io.py b/modin/experimental/pandas/io.py index 4d5959366f2..3f84b908211 100644 --- a/modin/experimental/pandas/io.py +++ b/modin/experimental/pandas/io.py @@ -13,6 +13,8 @@ """Implement experimental I/O public API.""" +from __future__ import annotations + import inspect import pathlib import pickle @@ -397,3 +399,62 @@ def to_pickle_distributed( protocol=protocol, storage_options=storage_options, ) + + +@expanduser_path_arg("path") +def read_parquet_glob( + path, + engine: str = "auto", + columns: list[str] | None = None, + storage_options: StorageOptions = None, + use_nullable_dtypes: bool = lib.no_default, + dtype_backend=lib.no_default, + filesystem=None, + filters=None, + **kwargs, +): + # TODO: add docstring + from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher + + return DataFrame( + query_compiler=FactoryDispatcher.read_parquet_glob( + path=path, + engine=engine, + columns=columns, + storage_options=storage_options, + use_nullable_dtypes=use_nullable_dtypes, + dtype_backend=dtype_backend, + filesystem=filesystem, + filters=filters, + **kwargs, + ) + ) + + +@expanduser_path_arg("path") +def to_parquet_glob( + self, + path=None, + engine="auto", + compression="snappy", + index=None, + partition_cols=None, + storage_options: StorageOptions = None, + **kwargs, +): + # TODO: add docstring + obj = self + from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher + + if isinstance(self, DataFrame): + obj = self._query_compiler + FactoryDispatcher.to_parquet_glob( + obj, + path=path, + engine=engine, + compression=compression, + index=index, + partition_cols=partition_cols, + storage_options=storage_options, + **kwargs, + ) diff --git a/modin/experimental/pandas/test/test_io_exp.py b/modin/experimental/pandas/test/test_io_exp.py index 1b9bccd7efd..6acb5703a91 100644 --- a/modin/experimental/pandas/test/test_io_exp.py +++ b/modin/experimental/pandas/test/test_io_exp.py @@ -272,6 +272,33 @@ def test_distributed_pickling(filename, compression, pathlike): teardown_test_files(pickle_files) +@pytest.mark.skipif( + Engine.get() not in ("Ray", "Unidist", "Dask"), + reason=f"{Engine.get()} does not have experimental API", +) +@pytest.mark.parametrize( + "filename", + ["test_parquet_glob.parquet", "test_parquet_glob*.parquet"], +) +def test_parquet_glob(filename): + data = test_data["int_data"] + df = pd.DataFrame(data) + + filename_param = filename + + with ( + warns_that_defaulting_to_pandas() + if filename_param == "test_parquet_glob.parquet" + else contextlib.nullcontext() + ): + df.modin.to_parquet_glob(filename) + read_df = pd.read_parquet_glob(filename) + df_equals(read_df, df) + + pickle_files = glob.glob(str(filename)) + teardown_test_files(pickle_files) + + @pytest.mark.skipif( Engine.get() not in ("Ray", "Unidist", "Dask"), reason=f"{Engine.get()} does not have experimental read_custom_text API", diff --git a/modin/pandas/accessor.py b/modin/pandas/accessor.py index 3d09424f160..65dc54ec5b1 100644 --- a/modin/pandas/accessor.py +++ b/modin/pandas/accessor.py @@ -257,3 +257,27 @@ def to_pickle_distributed( protocol=protocol, storage_options=storage_options, ) + + def to_parquet_glob( + self, + path=None, + engine="auto", + compression="snappy", + index=None, + partition_cols=None, + storage_options: StorageOptions = None, + **kwargs, + ): + # TODO: add docstring + from modin.experimental.pandas.io import to_parquet_glob + + to_parquet_glob( + self._data, + path=path, + engine=engine, + compression=compression, + index=index, + partition_cols=partition_cols, + storage_options=storage_options, + **kwargs, + ) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 0ddc868305d..0c2a2e1539b 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -3187,4 +3187,4 @@ def __reduce__(self): # Persistance support methods - END # Namespace for experimental functions - modin = CachedAccessor("modin", ExperimentalFunctions) + modin: ExperimentalFunctions = CachedAccessor("modin", ExperimentalFunctions) From a93f1fb9abdb08a60715ba161c24d7de4eae03f9 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 12 Jan 2024 01:49:16 +0100 Subject: [PATCH 2/7] update docs Signed-off-by: Anatoly Myachev --- docs/flow/modin/experimental/pandas.rst | 2 + docs/supported_apis/dataframe_supported.rst | 2 + docs/supported_apis/io_supported.rst | 1 + docs/usage_guide/advanced_usage/index.rst | 6 ++- .../dispatching/factories/dispatcher.py | 4 +- .../dispatching/factories/factories.py | 23 ++++++++--- .../core/io/glob/glob_dispatcher.py | 3 +- modin/experimental/pandas/io.py | 39 +++++++++++++++---- modin/pandas/accessor.py | 30 +++++++++++--- 9 files changed, 87 insertions(+), 23 deletions(-) diff --git a/docs/flow/modin/experimental/pandas.rst b/docs/flow/modin/experimental/pandas.rst index 25d9d8f3bcc..d429003c735 100644 --- a/docs/flow/modin/experimental/pandas.rst +++ b/docs/flow/modin/experimental/pandas.rst @@ -13,4 +13,6 @@ Experimental API Reference .. autofunction:: read_csv_glob .. autofunction:: read_custom_text .. autofunction:: read_pickle_distributed +.. autofunction:: read_parquet_glob .. automethod:: modin.pandas.DataFrame.modin::to_pickle_distributed +.. automethod:: modin.pandas.DataFrame.modin::to_parquet_glob diff --git a/docs/supported_apis/dataframe_supported.rst b/docs/supported_apis/dataframe_supported.rst index bcd5a364221..7d29af21265 100644 --- a/docs/supported_apis/dataframe_supported.rst +++ b/docs/supported_apis/dataframe_supported.rst @@ -421,6 +421,8 @@ default to pandas. | | | | ``path`` parameter specifies a directory where one | | | | | file is written per row partition of the Modin | | | | | dataframe. | +| | | | Experimental implementation: | +| | | | DataFrame.modin.to_parquet_glob | +----------------------------+---------------------------+------------------------+----------------------------------------------------+ | ``to_period`` | `to_period`_ | D | | +----------------------------+---------------------------+------------------------+----------------------------------------------------+ diff --git a/docs/supported_apis/io_supported.rst b/docs/supported_apis/io_supported.rst index c29c0792ef6..11f2a99f5e7 100644 --- a/docs/supported_apis/io_supported.rst +++ b/docs/supported_apis/io_supported.rst @@ -46,6 +46,7 @@ default to pandas. | | | passed via ``**kwargs`` are not supported. | | | | ``use_nullable_dtypes`` == True is not supported. | | | | | +| | | Experimental implementation: read_parquet_glob | +-------------------+---------------------------------+--------------------------------------------------------+ | `read_json`_ | P | Implemented for ``lines=True`` | +-------------------+---------------------------------+--------------------------------------------------------+ diff --git a/docs/usage_guide/advanced_usage/index.rst b/docs/usage_guide/advanced_usage/index.rst index 66560bebbe8..3151c28cff7 100644 --- a/docs/usage_guide/advanced_usage/index.rst +++ b/docs/usage_guide/advanced_usage/index.rst @@ -30,8 +30,10 @@ Modin also supports these experimental APIs on top of pandas that are under acti - :py:func:`~modin.experimental.pandas.read_csv_glob` -- read multiple files in a directory - :py:func:`~modin.experimental.pandas.read_sql` -- add optional parameters for the database connection - :py:func:`~modin.experimental.pandas.read_custom_text` -- read custom text data from file -- :py:func:`~modin.experimental.pandas.read_pickle_distributed` -- read multiple files in a directory -- :py:meth:`~modin.pandas.DataFrame.modin.to_pickle_distributed` -- write to multiple files in a directory +- :py:func:`~modin.experimental.pandas.read_pickle_distributed` -- read multiple pickle files in a directory +- :py:func:`~modin.experimental.pandas.read_parquet_glob` -- read multiple parquet files in a directory +- :py:meth:`~modin.pandas.DataFrame.modin.to_pickle_distributed` -- write to multiple pickle files in a directory +- :py:meth:`~modin.pandas.DataFrame.modin.to_parquet_glob` -- write to multiple parquet files in a directory DataFrame partitioning API -------------------------- diff --git a/modin/core/execution/dispatching/factories/dispatcher.py b/modin/core/execution/dispatching/factories/dispatcher.py index c4d119921b5..8c4ecfa7ace 100644 --- a/modin/core/execution/dispatching/factories/dispatcher.py +++ b/modin/core/execution/dispatching/factories/dispatcher.py @@ -297,12 +297,12 @@ def to_pickle_distributed(cls, *args, **kwargs): return cls.get_factory()._to_pickle_distributed(*args, **kwargs) @classmethod - # @_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob) + @_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob) def read_parquet_glob(cls, *args, **kwargs): return cls.get_factory()._read_parquet_glob(*args, **kwargs) @classmethod - # @_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob) + @_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob) def to_parquet_glob(cls, *args, **kwargs): return cls.get_factory()._to_parquet_glob(*args, **kwargs) diff --git a/modin/core/execution/dispatching/factories/factories.py b/modin/core/execution/dispatching/factories/factories.py index 4d8eea02728..91d6273421a 100644 --- a/modin/core/execution/dispatching/factories/factories.py +++ b/modin/core/execution/dispatching/factories/factories.py @@ -517,20 +517,31 @@ def _to_pickle_distributed(cls, *args, **kwargs): return cls.io_cls.to_pickle_distributed(*args, **kwargs) @classmethod - # @_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob) - def _read_parquet_glob(cls, *args, **kwargs): - # TODO: add docstring + @doc( + _doc_io_method_raw_template, + source="Parquet files", + params=_doc_io_method_kwargs_params, + ) + def _read_parquet_glob(cls, **kwargs): current_execution = get_current_execution() if current_execution not in supported_executions: raise NotImplementedError( f"`_read_parquet_glob()` is not implemented for {current_execution} execution." ) - return cls.io_cls.read_parquet_glob(*args, **kwargs) + return cls.io_cls.read_parquet_glob(**kwargs) @classmethod - # @_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob) def _to_parquet_glob(cls, *args, **kwargs): - # TODO: add docstring + """ + Write query compiler content to several parquet files. + + Parameters + ---------- + *args : args + Arguments to pass to the writer method. + **kwargs : kwargs + Arguments to pass to the writer method. + """ current_execution = get_current_execution() if current_execution not in supported_executions: raise NotImplementedError( diff --git a/modin/experimental/core/io/glob/glob_dispatcher.py b/modin/experimental/core/io/glob/glob_dispatcher.py index 7d003c939e4..5210b6b84e5 100644 --- a/modin/experimental/core/io/glob/glob_dispatcher.py +++ b/modin/experimental/core/io/glob/glob_dispatcher.py @@ -122,7 +122,8 @@ def write(cls, qc, **kwargs): cls.base_write(qc, filepath_or_buffer, **kwargs) return - # just to try + # Be careful, this is a kind of limitation, but at the time of the first implementation, + # getting a name in this way is quite convenient. write_func_name = cls.base_write.__name__ def func(df, **kw): # pragma: no cover diff --git a/modin/experimental/pandas/io.py b/modin/experimental/pandas/io.py index 3f84b908211..3e6fd2de369 100644 --- a/modin/experimental/pandas/io.py +++ b/modin/experimental/pandas/io.py @@ -354,7 +354,7 @@ def to_pickle_distributed( compression: CompressionOptions = "infer", protocol: int = pickle.HIGHEST_PROTOCOL, storage_options: StorageOptions = None, -): +) -> None: """ Pickle (serialize) object to file. @@ -363,7 +363,7 @@ def to_pickle_distributed( Parameters ---------- - filepath_or_buffer : str, path object or file-like object + filepath_or_buffer : str File path where the pickled object will be stored. compression : {{'infer', 'gzip', 'bz2', 'zip', 'xz', None}}, default: 'infer' A string representing the compression to use in the output file. By @@ -412,8 +412,23 @@ def read_parquet_glob( filesystem=None, filters=None, **kwargs, -): - # TODO: add docstring +) -> DataFrame: # noqa: PR01 + """ + Load a parquet object from the file path, returning a DataFrame. + + This experimental feature provides parallel reading from multiple parquet files which are + defined by glob pattern. The files must contain parts of one dataframe, which can be + obtained, for example, by `DataFrame.modin.to_parquet_glob` function. + + Returns + ------- + DataFrame + + Notes + ----- + * Only string type supported for `path` argument. + * The rest of the arguments are the same as for `pandas.read_parquet`. + """ from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher return DataFrame( @@ -434,15 +449,25 @@ def read_parquet_glob( @expanduser_path_arg("path") def to_parquet_glob( self, - path=None, + path, engine="auto", compression="snappy", index=None, partition_cols=None, storage_options: StorageOptions = None, **kwargs, -): - # TODO: add docstring +) -> None: # noqa: PR01 + """ + Write a DataFrame to the binary parquet format. + + This experimental feature provides parallel writing into multiple pickle files which are + defined by glob pattern, otherwise (without glob pattern) default pandas implementation is used. + + Notes + ----- + * Only string type supported for `path` argument. + * The rest of the arguments are the same as for `pandas.to_parquet`. + """ obj = self from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher diff --git a/modin/pandas/accessor.py b/modin/pandas/accessor.py index 65dc54ec5b1..c208d576162 100644 --- a/modin/pandas/accessor.py +++ b/modin/pandas/accessor.py @@ -215,7 +215,7 @@ def to_pickle_distributed( compression: CompressionOptions = "infer", protocol: int = pickle.HIGHEST_PROTOCOL, storage_options: StorageOptions = None, - ): + ) -> None: """ Pickle (serialize) object to file. @@ -224,7 +224,7 @@ def to_pickle_distributed( Parameters ---------- - filepath_or_buffer : str, path object or file-like object + filepath_or_buffer : str File path where the pickled object will be stored. compression : {{'infer', 'gzip', 'bz2', 'zip', 'xz', None}}, default: 'infer' A string representing the compression to use in the output file. By @@ -260,17 +260,37 @@ def to_pickle_distributed( def to_parquet_glob( self, - path=None, + path, engine="auto", compression="snappy", index=None, partition_cols=None, storage_options: StorageOptions = None, **kwargs, - ): - # TODO: add docstring + ) -> None: # noqa: PR01 + """ + Load a parquet object from the file path, returning a DataFrame. + + This experimental feature provides parallel reading from multiple parquet files which are + defined by glob pattern. The files must contain parts of one dataframe, which can be + obtained, for example, by `DataFrame.modin.to_parquet_glob` function. + + Returns + ------- + DataFrame + + Notes + ----- + * Only string type supported for `path` argument. + * The rest of the arguments are the same as for `pandas.read_parquet`. + """ from modin.experimental.pandas.io import to_parquet_glob + if path is None: + raise NotImplementedError( + "`to_parquet_glob` doesn't support path=None, use `to_parquet` in that case." + ) + to_parquet_glob( self._data, path=path, From c3f6a89531402fd7f59918a3cd3c7df1d5dc1616 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 12 Jan 2024 18:04:20 +0100 Subject: [PATCH 3/7] Apply suggestions from code review Co-authored-by: Iaroslav Igoshev --- modin/experimental/core/io/glob/glob_dispatcher.py | 4 ++-- modin/experimental/pandas/test/test_io_exp.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modin/experimental/core/io/glob/glob_dispatcher.py b/modin/experimental/core/io/glob/glob_dispatcher.py index 5210b6b84e5..4ff375052f7 100644 --- a/modin/experimental/core/io/glob/glob_dispatcher.py +++ b/modin/experimental/core/io/glob/glob_dispatcher.py @@ -108,9 +108,9 @@ def write(cls, qc, **kwargs): ---------- qc : BaseQueryCompiler The query compiler of the Modin dataframe that we want - to run ``to_[format]_glob`` on. + to run ``to__glob`` on. **kwargs : dict - Parameters for ``pandas.to_[format](**kwargs)``. + Parameters for ``pandas.to_(**kwargs)``. """ path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path" filepath_or_buffer = kwargs.pop(path_key) diff --git a/modin/experimental/pandas/test/test_io_exp.py b/modin/experimental/pandas/test/test_io_exp.py index 6acb5703a91..28bd18fc615 100644 --- a/modin/experimental/pandas/test/test_io_exp.py +++ b/modin/experimental/pandas/test/test_io_exp.py @@ -295,8 +295,8 @@ def test_parquet_glob(filename): read_df = pd.read_parquet_glob(filename) df_equals(read_df, df) - pickle_files = glob.glob(str(filename)) - teardown_test_files(pickle_files) + parquet_files = glob.glob(str(filename)) + teardown_test_files(parquet_files) @pytest.mark.skipif( From dbe4cb2e6f410a0986e948a8013f104c94ac8882 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 12 Jan 2024 18:21:40 +0100 Subject: [PATCH 4/7] address review comments Signed-off-by: Anatoly Myachev --- modin/experimental/core/io/glob/glob_dispatcher.py | 1 + modin/pandas/accessor.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/modin/experimental/core/io/glob/glob_dispatcher.py b/modin/experimental/core/io/glob/glob_dispatcher.py index 4ff375052f7..859d455b1c6 100644 --- a/modin/experimental/core/io/glob/glob_dispatcher.py +++ b/modin/experimental/core/io/glob/glob_dispatcher.py @@ -124,6 +124,7 @@ def write(cls, qc, **kwargs): # Be careful, this is a kind of limitation, but at the time of the first implementation, # getting a name in this way is quite convenient. + # We can use this attribute because the names of the BaseIO's methods match pandas API. write_func_name = cls.base_write.__name__ def func(df, **kw): # pragma: no cover diff --git a/modin/pandas/accessor.py b/modin/pandas/accessor.py index c208d576162..48626332ce0 100644 --- a/modin/pandas/accessor.py +++ b/modin/pandas/accessor.py @@ -269,7 +269,7 @@ def to_parquet_glob( **kwargs, ) -> None: # noqa: PR01 """ - Load a parquet object from the file path, returning a DataFrame. + Write a DataFrame to the binary parquet format. This experimental feature provides parallel reading from multiple parquet files which are defined by glob pattern. The files must contain parts of one dataframe, which can be From cff35f188005fd6d133502f16d8af033b1394d95 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 12 Jan 2024 19:37:40 +0100 Subject: [PATCH 5/7] Apply suggestions from code review Co-authored-by: Iaroslav Igoshev --- modin/experimental/core/io/glob/glob_dispatcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modin/experimental/core/io/glob/glob_dispatcher.py b/modin/experimental/core/io/glob/glob_dispatcher.py index 859d455b1c6..29cb4896290 100644 --- a/modin/experimental/core/io/glob/glob_dispatcher.py +++ b/modin/experimental/core/io/glob/glob_dispatcher.py @@ -11,7 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. -"""Module houses ``ExperimentalGlobDispatcher`` class that is used to read files of different formats in parallel.""" +"""Module houses ``ExperimentalGlobDispatcher`` class that is used to read/write files of different formats in parallel.""" import glob import warnings @@ -25,7 +25,7 @@ class ExperimentalGlobDispatcher(FileDispatcher): - """Class implements reading different formats, parallelizing by the number of files.""" + """Class implements reading/writing different formats, parallelizing by the number of files.""" @classmethod def _read(cls, **kwargs): From 67beb1426f7823effd86910d25fa0929f32e701a Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 12 Jan 2024 19:42:19 +0100 Subject: [PATCH 6/7] fixes Signed-off-by: Anatoly Myachev --- modin/experimental/core/io/__init__.py | 1 - modin/experimental/pandas/io.py | 2 +- modin/pandas/accessor.py | 11 +++-------- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/modin/experimental/core/io/__init__.py b/modin/experimental/core/io/__init__.py index e1a19b0d54b..cdd44bb6213 100644 --- a/modin/experimental/core/io/__init__.py +++ b/modin/experimental/core/io/__init__.py @@ -22,6 +22,5 @@ "ExperimentalCSVGlobDispatcher", "ExperimentalSQLDispatcher", "ExperimentalGlobDispatcher", - "ExperimentalGlobDispatcher", "ExperimentalCustomTextDispatcher", ] diff --git a/modin/experimental/pandas/io.py b/modin/experimental/pandas/io.py index 3e6fd2de369..92349b5f7b1 100644 --- a/modin/experimental/pandas/io.py +++ b/modin/experimental/pandas/io.py @@ -460,7 +460,7 @@ def to_parquet_glob( """ Write a DataFrame to the binary parquet format. - This experimental feature provides parallel writing into multiple pickle files which are + This experimental feature provides parallel writing into multiple parquet files which are defined by glob pattern, otherwise (without glob pattern) default pandas implementation is used. Notes diff --git a/modin/pandas/accessor.py b/modin/pandas/accessor.py index 48626332ce0..83e43cff8f7 100644 --- a/modin/pandas/accessor.py +++ b/modin/pandas/accessor.py @@ -271,18 +271,13 @@ def to_parquet_glob( """ Write a DataFrame to the binary parquet format. - This experimental feature provides parallel reading from multiple parquet files which are - defined by glob pattern. The files must contain parts of one dataframe, which can be - obtained, for example, by `DataFrame.modin.to_parquet_glob` function. - - Returns - ------- - DataFrame + This experimental feature provides parallel writing into multiple parquet files which are + defined by glob pattern, otherwise (without glob pattern) default pandas implementation is used. Notes ----- * Only string type supported for `path` argument. - * The rest of the arguments are the same as for `pandas.read_parquet`. + * The rest of the arguments are the same as for `pandas.to_parquet`. """ from modin.experimental.pandas.io import to_parquet_glob From dc1106e01a3d20d1d2c73505bec15a085589a482 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 12 Jan 2024 22:48:02 +0100 Subject: [PATCH 7/7] fix Signed-off-by: Anatoly Myachev --- modin/experimental/core/io/glob/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/experimental/core/io/glob/__init__.py b/modin/experimental/core/io/glob/__init__.py index 208c4ae4f71..3b8ba1b4fa7 100644 --- a/modin/experimental/core/io/glob/__init__.py +++ b/modin/experimental/core/io/glob/__init__.py @@ -11,4 +11,4 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. -"""Experimental Glob format type IO functions implementations.""" +"""Experimental module that allows to work with various formats using glob syntax."""