diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py index e5242458b65..4c51ebe395d 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py @@ -43,12 +43,13 @@ from modin.experimental.core.io import ( ExperimentalCSVGlobDispatcher, ExperimentalCustomTextDispatcher, - ExperimentalPickleDispatcher, + ExperimentalGlobDispatcher, ExperimentalSQLDispatcher, ) from modin.experimental.core.storage_formats.pandas.parsers import ( ExperimentalCustomTextParser, ExperimentalPandasCSVGlobParser, + ExperimentalPandasParquetParser, ExperimentalPandasPickleParser, ) @@ -89,10 +90,20 @@ def __make_write(*classes, build_args=build_args): read_csv_glob = __make_read( ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher ) + read_parquet_glob = __make_read( + ExperimentalPandasParquetParser, ExperimentalGlobDispatcher + ) + to_parquet_glob = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": BaseIO.to_parquet}, + ) read_pickle_distributed = __make_read( - ExperimentalPandasPickleParser, ExperimentalPickleDispatcher + ExperimentalPandasPickleParser, ExperimentalGlobDispatcher + ) + to_pickle_distributed = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": BaseIO.to_pickle}, ) - to_pickle_distributed = __make_write(ExperimentalPickleDispatcher) read_custom_text = __make_read( ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher ) diff --git a/modin/core/execution/dispatching/factories/dispatcher.py b/modin/core/execution/dispatching/factories/dispatcher.py index 5cdbf65f821..c4d119921b5 100644 --- a/modin/core/execution/dispatching/factories/dispatcher.py +++ b/modin/core/execution/dispatching/factories/dispatcher.py @@ -296,6 +296,16 @@ def to_pickle(cls, *args, **kwargs): def to_pickle_distributed(cls, *args, **kwargs): return cls.get_factory()._to_pickle_distributed(*args, **kwargs) + @classmethod + # @_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob) + def read_parquet_glob(cls, *args, **kwargs): + return cls.get_factory()._read_parquet_glob(*args, **kwargs) + + @classmethod + # @_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob) + def to_parquet_glob(cls, *args, **kwargs): + return cls.get_factory()._to_parquet_glob(*args, **kwargs) + @classmethod @_inherit_docstrings(factories.PandasOnRayFactory._read_custom_text) def read_custom_text(cls, **kwargs): diff --git a/modin/core/execution/dispatching/factories/factories.py b/modin/core/execution/dispatching/factories/factories.py index f1bc05539c2..4d8eea02728 100644 --- a/modin/core/execution/dispatching/factories/factories.py +++ b/modin/core/execution/dispatching/factories/factories.py @@ -516,6 +516,28 @@ def _to_pickle_distributed(cls, *args, **kwargs): ) return cls.io_cls.to_pickle_distributed(*args, **kwargs) + @classmethod + # @_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob) + def _read_parquet_glob(cls, *args, **kwargs): + # TODO: add docstring + current_execution = get_current_execution() + if current_execution not in supported_executions: + raise NotImplementedError( + f"`_read_parquet_glob()` is not implemented for {current_execution} execution." + ) + return cls.io_cls.read_parquet_glob(*args, **kwargs) + + @classmethod + # @_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob) + def _to_parquet_glob(cls, *args, **kwargs): + # TODO: add docstring + current_execution = get_current_execution() + if current_execution not in supported_executions: + raise NotImplementedError( + f"`_to_parquet_glob()` is not implemented for {current_execution} execution." + ) + return cls.io_cls.to_parquet_glob(*args, **kwargs) + @doc(_doc_factory_class, execution_name="PandasOnRay") class PandasOnRayFactory(BaseFactory): diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index cca429a526b..4a90cc54025 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -42,12 +42,13 @@ from modin.experimental.core.io import ( ExperimentalCSVGlobDispatcher, ExperimentalCustomTextDispatcher, - ExperimentalPickleDispatcher, + ExperimentalGlobDispatcher, ExperimentalSQLDispatcher, ) from modin.experimental.core.storage_formats.pandas.parsers import ( ExperimentalCustomTextParser, ExperimentalPandasCSVGlobParser, + ExperimentalPandasParquetParser, ExperimentalPandasPickleParser, ) @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args): read_csv_glob = __make_read( ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher ) + read_parquet_glob = __make_read( + ExperimentalPandasParquetParser, ExperimentalGlobDispatcher + ) + to_parquet_glob = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": RayIO.to_parquet}, + ) read_pickle_distributed = __make_read( - ExperimentalPandasPickleParser, ExperimentalPickleDispatcher + ExperimentalPandasPickleParser, ExperimentalGlobDispatcher + ) + to_pickle_distributed = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": RayIO.to_pickle}, ) - to_pickle_distributed = __make_write(ExperimentalPickleDispatcher) read_custom_text = __make_read( ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher ) diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py index 12e053252f5..4311acecbfa 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py @@ -42,12 +42,13 @@ from modin.experimental.core.io import ( ExperimentalCSVGlobDispatcher, ExperimentalCustomTextDispatcher, - ExperimentalPickleDispatcher, + ExperimentalGlobDispatcher, ExperimentalSQLDispatcher, ) from modin.experimental.core.storage_formats.pandas.parsers import ( ExperimentalCustomTextParser, ExperimentalPandasCSVGlobParser, + ExperimentalPandasParquetParser, ExperimentalPandasPickleParser, ) @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args): read_csv_glob = __make_read( ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher ) + read_parquet_glob = __make_read( + ExperimentalPandasParquetParser, ExperimentalGlobDispatcher + ) + to_parquet_glob = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": UnidistIO.to_parquet}, + ) read_pickle_distributed = __make_read( - ExperimentalPandasPickleParser, ExperimentalPickleDispatcher + ExperimentalPandasPickleParser, ExperimentalGlobDispatcher + ) + to_pickle_distributed = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": UnidistIO.to_pickle}, ) - to_pickle_distributed = __make_write(ExperimentalPickleDispatcher) read_custom_text = __make_read( ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher ) diff --git a/modin/core/io/io.py b/modin/core/io/io.py index a024d0c6c0e..4cc25f81e12 100644 --- a/modin/core/io/io.py +++ b/modin/core/io/io.py @@ -652,7 +652,7 @@ def to_csv(cls, obj, **kwargs): # noqa: PR01 @_inherit_docstrings( pandas.DataFrame.to_parquet, apilink="pandas.DataFrame.to_parquet" ) - def to_parquet(cls, obj, **kwargs): # noqa: PR01 + def to_parquet(cls, obj, path, **kwargs): # noqa: PR01 """ Write object to the binary parquet format using pandas. @@ -662,4 +662,4 @@ def to_parquet(cls, obj, **kwargs): # noqa: PR01 if isinstance(obj, BaseQueryCompiler): obj = obj.to_pandas() - return obj.to_parquet(**kwargs) + return obj.to_parquet(path, **kwargs) diff --git a/modin/experimental/core/io/__init__.py b/modin/experimental/core/io/__init__.py index 9f3c64b00a8..e1a19b0d54b 100644 --- a/modin/experimental/core/io/__init__.py +++ b/modin/experimental/core/io/__init__.py @@ -13,7 +13,7 @@ """Experimental IO functions implementations.""" -from .pickle.pickle_dispatcher import ExperimentalPickleDispatcher +from .glob.glob_dispatcher import ExperimentalGlobDispatcher from .sql.sql_dispatcher import ExperimentalSQLDispatcher from .text.csv_glob_dispatcher import ExperimentalCSVGlobDispatcher from .text.custom_text_dispatcher import ExperimentalCustomTextDispatcher @@ -21,6 +21,7 @@ __all__ = [ "ExperimentalCSVGlobDispatcher", "ExperimentalSQLDispatcher", - "ExperimentalPickleDispatcher", + "ExperimentalGlobDispatcher", + "ExperimentalGlobDispatcher", "ExperimentalCustomTextDispatcher", ] diff --git a/modin/experimental/core/io/pickle/__init__.py b/modin/experimental/core/io/glob/__init__.py similarity index 92% rename from modin/experimental/core/io/pickle/__init__.py rename to modin/experimental/core/io/glob/__init__.py index 39b28ecda49..208c4ae4f71 100644 --- a/modin/experimental/core/io/pickle/__init__.py +++ b/modin/experimental/core/io/glob/__init__.py @@ -11,4 +11,4 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. -"""Experimental Pickle format type IO functions implementations.""" +"""Experimental Glob format type IO functions implementations.""" diff --git a/modin/experimental/core/io/pickle/pickle_dispatcher.py b/modin/experimental/core/io/glob/glob_dispatcher.py similarity index 75% rename from modin/experimental/core/io/pickle/pickle_dispatcher.py rename to modin/experimental/core/io/glob/glob_dispatcher.py index 119171e061c..7d003c939e4 100644 --- a/modin/experimental/core/io/pickle/pickle_dispatcher.py +++ b/modin/experimental/core/io/glob/glob_dispatcher.py @@ -11,7 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. -"""Module houses ``ExperimentalPickleDispatcher`` class that is used for reading `.pkl` files.""" +"""Module houses ``ExperimentalGlobDispatcher`` class that is used to read files of different formats in parallel.""" import glob import warnings @@ -24,20 +24,20 @@ from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler -class ExperimentalPickleDispatcher(FileDispatcher): - """Class handles utils for reading pickle files.""" +class ExperimentalGlobDispatcher(FileDispatcher): + """Class implements reading different formats, parallelizing by the number of files.""" @classmethod - def _read(cls, filepath_or_buffer, **kwargs): + def _read(cls, **kwargs): """ Read data from `filepath_or_buffer` according to `kwargs` parameters. Parameters ---------- filepath_or_buffer : str, path object or file-like object - `filepath_or_buffer` parameter of `read_pickle` function. + `filepath_or_buffer` parameter of `read_*` function. **kwargs : dict - Parameters of `read_pickle` function. + Parameters of `read_*` function. Returns ------- @@ -46,10 +46,10 @@ def _read(cls, filepath_or_buffer, **kwargs): Notes ----- - In experimental mode, we can use `*` in the filename. - The number of partitions is equal to the number of input files. """ + path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path" + filepath_or_buffer = kwargs.pop(path_key) filepath_or_buffer = stringify_path(filepath_or_buffer) if not (isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer): return cls.single_worker_read( @@ -104,37 +104,31 @@ def write(cls, qc, **kwargs): - if `*` is in the filename, then it will be replaced by the ascending sequence 0, 1, 2, … - if `*` is not in the filename, then the default implementation will be used. - Example: 4 partitions and input filename="partition*.pkl.gz", then filenames will be: - `partition0.pkl.gz`, `partition1.pkl.gz`, `partition2.pkl.gz`, `partition3.pkl.gz`. - Parameters ---------- qc : BaseQueryCompiler The query compiler of the Modin dataframe that we want - to run ``to_pickle_distributed`` on. + to run ``to_[format]_glob`` on. **kwargs : dict - Parameters for ``pandas.to_pickle(**kwargs)``. + Parameters for ``pandas.to_[format](**kwargs)``. """ - kwargs["filepath_or_buffer"] = stringify_path(kwargs["filepath_or_buffer"]) + path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path" + filepath_or_buffer = kwargs.pop(path_key) + filepath_or_buffer = stringify_path(filepath_or_buffer) if not ( - isinstance(kwargs["filepath_or_buffer"], str) - and "*" in kwargs["filepath_or_buffer"] + isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer ) or not isinstance(qc, PandasQueryCompiler): warnings.warn("Defaulting to Modin core implementation") - cls.base_io.to_pickle(qc, **kwargs) + cls.base_write(qc, filepath_or_buffer, **kwargs) return + # just to try + write_func_name = cls.base_write.__name__ + def func(df, **kw): # pragma: no cover idx = str(kw["partition_idx"]) - # dask doesn't make a copy of kwargs on serialization; - # so take a copy ourselves, otherwise the error is: - # kwargs["path"] = kwargs.pop("filepath_or_buffer").replace("*", idx) - # KeyError: 'filepath_or_buffer' - dask_kwargs = dict(kwargs) - dask_kwargs["path"] = dask_kwargs.pop("filepath_or_buffer").replace( - "*", idx - ) - df.to_pickle(**dask_kwargs) + path = filepath_or_buffer.replace("*", idx) + getattr(df, write_func_name)(path, **kwargs) return pandas.DataFrame() result = qc._modin_frame.apply_full_axis( diff --git a/modin/experimental/core/storage_formats/pandas/parsers.py b/modin/experimental/core/storage_formats/pandas/parsers.py index bf09fc99ebc..be2ec01489a 100644 --- a/modin/experimental/core/storage_formats/pandas/parsers.py +++ b/modin/experimental/core/storage_formats/pandas/parsers.py @@ -114,6 +114,24 @@ def parse(fname, **kwargs): return _split_result_for_readers(1, num_splits, df) + [length, width] +@doc(_doc_pandas_parser_class, data_type="parquet files") +class ExperimentalPandasParquetParser(PandasParser): + @staticmethod + @doc(_doc_parse_func, parameters=_doc_parse_parameters_common) + def parse(fname, **kwargs): + warnings.filterwarnings("ignore") + num_splits = 1 + single_worker_read = kwargs.pop("single_worker_read", None) + df = pandas.read_parquet(fname, **kwargs) + if single_worker_read: + return df + + length = len(df) + width = len(df.columns) + + return _split_result_for_readers(1, num_splits, df) + [length, width] + + @doc(_doc_pandas_parser_class, data_type="custom text") class ExperimentalCustomTextParser(PandasParser): @staticmethod diff --git a/modin/experimental/pandas/__init__.py b/modin/experimental/pandas/__init__.py index 7516563d41c..4c484fddb89 100644 --- a/modin/experimental/pandas/__init__.py +++ b/modin/experimental/pandas/__init__.py @@ -40,6 +40,7 @@ from .io import ( # noqa F401 read_csv_glob, read_custom_text, + read_parquet_glob, read_pickle_distributed, read_sql, to_pickle_distributed, diff --git a/modin/experimental/pandas/io.py b/modin/experimental/pandas/io.py index 4d5959366f2..3f84b908211 100644 --- a/modin/experimental/pandas/io.py +++ b/modin/experimental/pandas/io.py @@ -13,6 +13,8 @@ """Implement experimental I/O public API.""" +from __future__ import annotations + import inspect import pathlib import pickle @@ -397,3 +399,62 @@ def to_pickle_distributed( protocol=protocol, storage_options=storage_options, ) + + +@expanduser_path_arg("path") +def read_parquet_glob( + path, + engine: str = "auto", + columns: list[str] | None = None, + storage_options: StorageOptions = None, + use_nullable_dtypes: bool = lib.no_default, + dtype_backend=lib.no_default, + filesystem=None, + filters=None, + **kwargs, +): + # TODO: add docstring + from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher + + return DataFrame( + query_compiler=FactoryDispatcher.read_parquet_glob( + path=path, + engine=engine, + columns=columns, + storage_options=storage_options, + use_nullable_dtypes=use_nullable_dtypes, + dtype_backend=dtype_backend, + filesystem=filesystem, + filters=filters, + **kwargs, + ) + ) + + +@expanduser_path_arg("path") +def to_parquet_glob( + self, + path=None, + engine="auto", + compression="snappy", + index=None, + partition_cols=None, + storage_options: StorageOptions = None, + **kwargs, +): + # TODO: add docstring + obj = self + from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher + + if isinstance(self, DataFrame): + obj = self._query_compiler + FactoryDispatcher.to_parquet_glob( + obj, + path=path, + engine=engine, + compression=compression, + index=index, + partition_cols=partition_cols, + storage_options=storage_options, + **kwargs, + ) diff --git a/modin/experimental/pandas/test/test_io_exp.py b/modin/experimental/pandas/test/test_io_exp.py index 1b9bccd7efd..1fce000bc20 100644 --- a/modin/experimental/pandas/test/test_io_exp.py +++ b/modin/experimental/pandas/test/test_io_exp.py @@ -272,6 +272,29 @@ def test_distributed_pickling(filename, compression, pathlike): teardown_test_files(pickle_files) +@pytest.mark.parametrize( + "filename", + ["test_parquet_glob.parquet", "test_parquet_glob*.parquet"], +) +def test_parquet_glob(filename): + data = test_data["int_data"] + df = pd.DataFrame(data) + + filename_param = filename + + with ( + warns_that_defaulting_to_pandas() + if filename_param == "test_parquet_glob.parquet" + else contextlib.nullcontext() + ): + df.modin.to_parquet_glob(filename) + read_df = pd.read_parquet_glob(filename) + df_equals(read_df, df) + + pickle_files = glob.glob(str(filename)) + teardown_test_files(pickle_files) + + @pytest.mark.skipif( Engine.get() not in ("Ray", "Unidist", "Dask"), reason=f"{Engine.get()} does not have experimental read_custom_text API", diff --git a/modin/pandas/accessor.py b/modin/pandas/accessor.py index 3d09424f160..65dc54ec5b1 100644 --- a/modin/pandas/accessor.py +++ b/modin/pandas/accessor.py @@ -257,3 +257,27 @@ def to_pickle_distributed( protocol=protocol, storage_options=storage_options, ) + + def to_parquet_glob( + self, + path=None, + engine="auto", + compression="snappy", + index=None, + partition_cols=None, + storage_options: StorageOptions = None, + **kwargs, + ): + # TODO: add docstring + from modin.experimental.pandas.io import to_parquet_glob + + to_parquet_glob( + self._data, + path=path, + engine=engine, + compression=compression, + index=index, + partition_cols=partition_cols, + storage_options=storage_options, + **kwargs, + ) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 0ddc868305d..0c2a2e1539b 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -3187,4 +3187,4 @@ def __reduce__(self): # Persistance support methods - END # Namespace for experimental functions - modin = CachedAccessor("modin", ExperimentalFunctions) + modin: ExperimentalFunctions = CachedAccessor("modin", ExperimentalFunctions)