diff --git a/docs/flow/modin/experimental/pandas.rst b/docs/flow/modin/experimental/pandas.rst index 25d9d8f3bcc..d429003c735 100644 --- a/docs/flow/modin/experimental/pandas.rst +++ b/docs/flow/modin/experimental/pandas.rst @@ -13,4 +13,6 @@ Experimental API Reference .. autofunction:: read_csv_glob .. autofunction:: read_custom_text .. autofunction:: read_pickle_distributed +.. autofunction:: read_parquet_glob .. automethod:: modin.pandas.DataFrame.modin::to_pickle_distributed +.. automethod:: modin.pandas.DataFrame.modin::to_parquet_glob diff --git a/docs/supported_apis/dataframe_supported.rst b/docs/supported_apis/dataframe_supported.rst index bcd5a364221..7d29af21265 100644 --- a/docs/supported_apis/dataframe_supported.rst +++ b/docs/supported_apis/dataframe_supported.rst @@ -421,6 +421,8 @@ default to pandas. | | | | ``path`` parameter specifies a directory where one | | | | | file is written per row partition of the Modin | | | | | dataframe. | +| | | | Experimental implementation: | +| | | | DataFrame.modin.to_parquet_glob | +----------------------------+---------------------------+------------------------+----------------------------------------------------+ | ``to_period`` | `to_period`_ | D | | +----------------------------+---------------------------+------------------------+----------------------------------------------------+ diff --git a/docs/supported_apis/io_supported.rst b/docs/supported_apis/io_supported.rst index c29c0792ef6..11f2a99f5e7 100644 --- a/docs/supported_apis/io_supported.rst +++ b/docs/supported_apis/io_supported.rst @@ -46,6 +46,7 @@ default to pandas. | | | passed via ``**kwargs`` are not supported. | | | | ``use_nullable_dtypes`` == True is not supported. | | | | | +| | | Experimental implementation: read_parquet_glob | +-------------------+---------------------------------+--------------------------------------------------------+ | `read_json`_ | P | Implemented for ``lines=True`` | +-------------------+---------------------------------+--------------------------------------------------------+ diff --git a/docs/usage_guide/advanced_usage/index.rst b/docs/usage_guide/advanced_usage/index.rst index 66560bebbe8..3151c28cff7 100644 --- a/docs/usage_guide/advanced_usage/index.rst +++ b/docs/usage_guide/advanced_usage/index.rst @@ -30,8 +30,10 @@ Modin also supports these experimental APIs on top of pandas that are under acti - :py:func:`~modin.experimental.pandas.read_csv_glob` -- read multiple files in a directory - :py:func:`~modin.experimental.pandas.read_sql` -- add optional parameters for the database connection - :py:func:`~modin.experimental.pandas.read_custom_text` -- read custom text data from file -- :py:func:`~modin.experimental.pandas.read_pickle_distributed` -- read multiple files in a directory -- :py:meth:`~modin.pandas.DataFrame.modin.to_pickle_distributed` -- write to multiple files in a directory +- :py:func:`~modin.experimental.pandas.read_pickle_distributed` -- read multiple pickle files in a directory +- :py:func:`~modin.experimental.pandas.read_parquet_glob` -- read multiple parquet files in a directory +- :py:meth:`~modin.pandas.DataFrame.modin.to_pickle_distributed` -- write to multiple pickle files in a directory +- :py:meth:`~modin.pandas.DataFrame.modin.to_parquet_glob` -- write to multiple parquet files in a directory DataFrame partitioning API -------------------------- diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py index 
e5242458b65..4c51ebe395d 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py @@ -43,12 +43,13 @@ from modin.experimental.core.io import ( ExperimentalCSVGlobDispatcher, ExperimentalCustomTextDispatcher, - ExperimentalPickleDispatcher, + ExperimentalGlobDispatcher, ExperimentalSQLDispatcher, ) from modin.experimental.core.storage_formats.pandas.parsers import ( ExperimentalCustomTextParser, ExperimentalPandasCSVGlobParser, + ExperimentalPandasParquetParser, ExperimentalPandasPickleParser, ) @@ -89,10 +90,20 @@ def __make_write(*classes, build_args=build_args): read_csv_glob = __make_read( ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher ) + read_parquet_glob = __make_read( + ExperimentalPandasParquetParser, ExperimentalGlobDispatcher + ) + to_parquet_glob = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": BaseIO.to_parquet}, + ) read_pickle_distributed = __make_read( - ExperimentalPandasPickleParser, ExperimentalPickleDispatcher + ExperimentalPandasPickleParser, ExperimentalGlobDispatcher + ) + to_pickle_distributed = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": BaseIO.to_pickle}, ) - to_pickle_distributed = __make_write(ExperimentalPickleDispatcher) read_custom_text = __make_read( ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher ) diff --git a/modin/core/execution/dispatching/factories/dispatcher.py b/modin/core/execution/dispatching/factories/dispatcher.py index 5cdbf65f821..8c4ecfa7ace 100644 --- a/modin/core/execution/dispatching/factories/dispatcher.py +++ b/modin/core/execution/dispatching/factories/dispatcher.py @@ -296,6 +296,16 @@ def to_pickle(cls, *args, **kwargs): def to_pickle_distributed(cls, *args, **kwargs): return cls.get_factory()._to_pickle_distributed(*args, **kwargs) + @classmethod + @_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob) + def read_parquet_glob(cls, *args, **kwargs): + return cls.get_factory()._read_parquet_glob(*args, **kwargs) + + @classmethod + @_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob) + def to_parquet_glob(cls, *args, **kwargs): + return cls.get_factory()._to_parquet_glob(*args, **kwargs) + @classmethod @_inherit_docstrings(factories.PandasOnRayFactory._read_custom_text) def read_custom_text(cls, **kwargs): diff --git a/modin/core/execution/dispatching/factories/factories.py b/modin/core/execution/dispatching/factories/factories.py index f1bc05539c2..91d6273421a 100644 --- a/modin/core/execution/dispatching/factories/factories.py +++ b/modin/core/execution/dispatching/factories/factories.py @@ -516,6 +516,39 @@ def _to_pickle_distributed(cls, *args, **kwargs): ) return cls.io_cls.to_pickle_distributed(*args, **kwargs) + @classmethod + @doc( + _doc_io_method_raw_template, + source="Parquet files", + params=_doc_io_method_kwargs_params, + ) + def _read_parquet_glob(cls, **kwargs): + current_execution = get_current_execution() + if current_execution not in supported_executions: + raise NotImplementedError( + f"`_read_parquet_glob()` is not implemented for {current_execution} execution." + ) + return cls.io_cls.read_parquet_glob(**kwargs) + + @classmethod + def _to_parquet_glob(cls, *args, **kwargs): + """ + Write query compiler content to several parquet files. + + Parameters + ---------- + *args : args + Arguments to pass to the writer method. + **kwargs : kwargs + Arguments to pass to the writer method. 
+ """ + current_execution = get_current_execution() + if current_execution not in supported_executions: + raise NotImplementedError( + f"`_to_parquet_glob()` is not implemented for {current_execution} execution." + ) + return cls.io_cls.to_parquet_glob(*args, **kwargs) + @doc(_doc_factory_class, execution_name="PandasOnRay") class PandasOnRayFactory(BaseFactory): diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index cca429a526b..4a90cc54025 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -42,12 +42,13 @@ from modin.experimental.core.io import ( ExperimentalCSVGlobDispatcher, ExperimentalCustomTextDispatcher, - ExperimentalPickleDispatcher, + ExperimentalGlobDispatcher, ExperimentalSQLDispatcher, ) from modin.experimental.core.storage_formats.pandas.parsers import ( ExperimentalCustomTextParser, ExperimentalPandasCSVGlobParser, + ExperimentalPandasParquetParser, ExperimentalPandasPickleParser, ) @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args): read_csv_glob = __make_read( ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher ) + read_parquet_glob = __make_read( + ExperimentalPandasParquetParser, ExperimentalGlobDispatcher + ) + to_parquet_glob = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": RayIO.to_parquet}, + ) read_pickle_distributed = __make_read( - ExperimentalPandasPickleParser, ExperimentalPickleDispatcher + ExperimentalPandasPickleParser, ExperimentalGlobDispatcher + ) + to_pickle_distributed = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": RayIO.to_pickle}, ) - to_pickle_distributed = __make_write(ExperimentalPickleDispatcher) read_custom_text = __make_read( ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher ) diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py index 12e053252f5..4311acecbfa 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py @@ -42,12 +42,13 @@ from modin.experimental.core.io import ( ExperimentalCSVGlobDispatcher, ExperimentalCustomTextDispatcher, - ExperimentalPickleDispatcher, + ExperimentalGlobDispatcher, ExperimentalSQLDispatcher, ) from modin.experimental.core.storage_formats.pandas.parsers import ( ExperimentalCustomTextParser, ExperimentalPandasCSVGlobParser, + ExperimentalPandasParquetParser, ExperimentalPandasPickleParser, ) @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args): read_csv_glob = __make_read( ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher ) + read_parquet_glob = __make_read( + ExperimentalPandasParquetParser, ExperimentalGlobDispatcher + ) + to_parquet_glob = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": UnidistIO.to_parquet}, + ) read_pickle_distributed = __make_read( - ExperimentalPandasPickleParser, ExperimentalPickleDispatcher + ExperimentalPandasPickleParser, ExperimentalGlobDispatcher + ) + to_pickle_distributed = __make_write( + ExperimentalGlobDispatcher, + build_args={**build_args, "base_write": UnidistIO.to_pickle}, ) - to_pickle_distributed = __make_write(ExperimentalPickleDispatcher) read_custom_text = __make_read( 
ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher ) diff --git a/modin/core/io/io.py b/modin/core/io/io.py index a024d0c6c0e..4cc25f81e12 100644 --- a/modin/core/io/io.py +++ b/modin/core/io/io.py @@ -652,7 +652,7 @@ def to_csv(cls, obj, **kwargs): # noqa: PR01 @_inherit_docstrings( pandas.DataFrame.to_parquet, apilink="pandas.DataFrame.to_parquet" ) - def to_parquet(cls, obj, **kwargs): # noqa: PR01 + def to_parquet(cls, obj, path, **kwargs): # noqa: PR01 """ Write object to the binary parquet format using pandas. @@ -662,4 +662,4 @@ def to_parquet(cls, obj, **kwargs): # noqa: PR01 if isinstance(obj, BaseQueryCompiler): obj = obj.to_pandas() - return obj.to_parquet(**kwargs) + return obj.to_parquet(path, **kwargs) diff --git a/modin/experimental/core/io/__init__.py b/modin/experimental/core/io/__init__.py index 9f3c64b00a8..cdd44bb6213 100644 --- a/modin/experimental/core/io/__init__.py +++ b/modin/experimental/core/io/__init__.py @@ -13,7 +13,7 @@ """Experimental IO functions implementations.""" -from .pickle.pickle_dispatcher import ExperimentalPickleDispatcher +from .glob.glob_dispatcher import ExperimentalGlobDispatcher from .sql.sql_dispatcher import ExperimentalSQLDispatcher from .text.csv_glob_dispatcher import ExperimentalCSVGlobDispatcher from .text.custom_text_dispatcher import ExperimentalCustomTextDispatcher @@ -21,6 +21,6 @@ __all__ = [ "ExperimentalCSVGlobDispatcher", "ExperimentalSQLDispatcher", - "ExperimentalPickleDispatcher", + "ExperimentalGlobDispatcher", "ExperimentalCustomTextDispatcher", ] diff --git a/modin/experimental/core/io/pickle/__init__.py b/modin/experimental/core/io/glob/__init__.py similarity index 90% rename from modin/experimental/core/io/pickle/__init__.py rename to modin/experimental/core/io/glob/__init__.py index 39b28ecda49..3b8ba1b4fa7 100644 --- a/modin/experimental/core/io/pickle/__init__.py +++ b/modin/experimental/core/io/glob/__init__.py @@ -11,4 +11,4 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. -"""Experimental Pickle format type IO functions implementations.""" +"""Experimental module that allows working with various formats using glob syntax.""" diff --git a/modin/experimental/core/io/pickle/pickle_dispatcher.py b/modin/experimental/core/io/glob/glob_dispatcher.py similarity index 75% rename from modin/experimental/core/io/pickle/pickle_dispatcher.py rename to modin/experimental/core/io/glob/glob_dispatcher.py index 119171e061c..29cb4896290 100644 --- a/modin/experimental/core/io/pickle/pickle_dispatcher.py +++ b/modin/experimental/core/io/glob/glob_dispatcher.py @@ -11,7 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License.
-"""Module houses ``ExperimentalPickleDispatcher`` class that is used for reading `.pkl` files.""" +"""Module houses ``ExperimentalGlobDispatcher`` class that is used to read/write files of different formats in parallel.""" import glob import warnings @@ -24,20 +24,20 @@ from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler -class ExperimentalPickleDispatcher(FileDispatcher): - """Class handles utils for reading pickle files.""" +class ExperimentalGlobDispatcher(FileDispatcher): + """Class implements reading/writing different formats, parallelizing by the number of files.""" @classmethod - def _read(cls, filepath_or_buffer, **kwargs): + def _read(cls, **kwargs): """ Read data from `filepath_or_buffer` according to `kwargs` parameters. Parameters ---------- filepath_or_buffer : str, path object or file-like object - `filepath_or_buffer` parameter of `read_pickle` function. + `filepath_or_buffer` parameter of `read_*` function. **kwargs : dict - Parameters of `read_pickle` function. + Parameters of `read_*` function. Returns ------- @@ -46,10 +46,10 @@ def _read(cls, filepath_or_buffer, **kwargs): Notes ----- - In experimental mode, we can use `*` in the filename. - The number of partitions is equal to the number of input files. """ + path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path" + filepath_or_buffer = kwargs.pop(path_key) filepath_or_buffer = stringify_path(filepath_or_buffer) if not (isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer): return cls.single_worker_read( @@ -104,37 +104,33 @@ def write(cls, qc, **kwargs): - if `*` is in the filename, then it will be replaced by the ascending sequence 0, 1, 2, … - if `*` is not in the filename, then the default implementation will be used. - Example: 4 partitions and input filename="partition*.pkl.gz", then filenames will be: - `partition0.pkl.gz`, `partition1.pkl.gz`, `partition2.pkl.gz`, `partition3.pkl.gz`. - Parameters ---------- qc : BaseQueryCompiler The query compiler of the Modin dataframe that we want - to run ``to_pickle_distributed`` on. + to run ``to__glob`` on. **kwargs : dict - Parameters for ``pandas.to_pickle(**kwargs)``. + Parameters for ``pandas.to_(**kwargs)``. """ - kwargs["filepath_or_buffer"] = stringify_path(kwargs["filepath_or_buffer"]) + path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path" + filepath_or_buffer = kwargs.pop(path_key) + filepath_or_buffer = stringify_path(filepath_or_buffer) if not ( - isinstance(kwargs["filepath_or_buffer"], str) - and "*" in kwargs["filepath_or_buffer"] + isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer ) or not isinstance(qc, PandasQueryCompiler): warnings.warn("Defaulting to Modin core implementation") - cls.base_io.to_pickle(qc, **kwargs) + cls.base_write(qc, filepath_or_buffer, **kwargs) return + # Be careful, this is a kind of limitation, but at the time of the first implementation, + # getting a name in this way is quite convenient. + # We can use this attribute because the names of the BaseIO's methods match pandas API. 
+ write_func_name = cls.base_write.__name__ + def func(df, **kw): # pragma: no cover idx = str(kw["partition_idx"]) - # dask doesn't make a copy of kwargs on serialization; - # so take a copy ourselves, otherwise the error is: - # kwargs["path"] = kwargs.pop("filepath_or_buffer").replace("*", idx) - # KeyError: 'filepath_or_buffer' - dask_kwargs = dict(kwargs) - dask_kwargs["path"] = dask_kwargs.pop("filepath_or_buffer").replace( - "*", idx - ) - df.to_pickle(**dask_kwargs) + path = filepath_or_buffer.replace("*", idx) + getattr(df, write_func_name)(path, **kwargs) return pandas.DataFrame() result = qc._modin_frame.apply_full_axis( diff --git a/modin/experimental/core/storage_formats/pandas/parsers.py b/modin/experimental/core/storage_formats/pandas/parsers.py index bf09fc99ebc..be2ec01489a 100644 --- a/modin/experimental/core/storage_formats/pandas/parsers.py +++ b/modin/experimental/core/storage_formats/pandas/parsers.py @@ -114,6 +114,24 @@ def parse(fname, **kwargs): return _split_result_for_readers(1, num_splits, df) + [length, width] +@doc(_doc_pandas_parser_class, data_type="parquet files") +class ExperimentalPandasParquetParser(PandasParser): + @staticmethod + @doc(_doc_parse_func, parameters=_doc_parse_parameters_common) + def parse(fname, **kwargs): + warnings.filterwarnings("ignore") + num_splits = 1 + single_worker_read = kwargs.pop("single_worker_read", None) + df = pandas.read_parquet(fname, **kwargs) + if single_worker_read: + return df + + length = len(df) + width = len(df.columns) + + return _split_result_for_readers(1, num_splits, df) + [length, width] + + @doc(_doc_pandas_parser_class, data_type="custom text") class ExperimentalCustomTextParser(PandasParser): @staticmethod diff --git a/modin/experimental/pandas/__init__.py b/modin/experimental/pandas/__init__.py index 7516563d41c..4c484fddb89 100644 --- a/modin/experimental/pandas/__init__.py +++ b/modin/experimental/pandas/__init__.py @@ -40,6 +40,7 @@ from .io import ( # noqa F401 read_csv_glob, read_custom_text, + read_parquet_glob, read_pickle_distributed, read_sql, to_pickle_distributed, diff --git a/modin/experimental/pandas/io.py b/modin/experimental/pandas/io.py index 4d5959366f2..92349b5f7b1 100644 --- a/modin/experimental/pandas/io.py +++ b/modin/experimental/pandas/io.py @@ -13,6 +13,8 @@ """Implement experimental I/O public API.""" +from __future__ import annotations + import inspect import pathlib import pickle @@ -352,7 +354,7 @@ def to_pickle_distributed( compression: CompressionOptions = "infer", protocol: int = pickle.HIGHEST_PROTOCOL, storage_options: StorageOptions = None, -): +) -> None: """ Pickle (serialize) object to file. @@ -361,7 +363,7 @@ def to_pickle_distributed( Parameters ---------- - filepath_or_buffer : str, path object or file-like object + filepath_or_buffer : str File path where the pickled object will be stored. compression : {{'infer', 'gzip', 'bz2', 'zip', 'xz', None}}, default: 'infer' A string representing the compression to use in the output file. By @@ -397,3 +399,87 @@ def to_pickle_distributed( protocol=protocol, storage_options=storage_options, ) + + +@expanduser_path_arg("path") +def read_parquet_glob( + path, + engine: str = "auto", + columns: list[str] | None = None, + storage_options: StorageOptions = None, + use_nullable_dtypes: bool = lib.no_default, + dtype_backend=lib.no_default, + filesystem=None, + filters=None, + **kwargs, +) -> DataFrame: # noqa: PR01 + """ + Load a parquet object from the file path, returning a DataFrame. 
+ + This experimental feature provides parallel reading from multiple parquet files which are + defined by a glob pattern. The files must contain parts of one dataframe, which can be + obtained, for example, via the `DataFrame.modin.to_parquet_glob` function. + + Returns + ------- + DataFrame + + Notes + ----- + * Only string type is supported for the `path` argument. + * The rest of the arguments are the same as for `pandas.read_parquet`. + """ + from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher + + return DataFrame( + query_compiler=FactoryDispatcher.read_parquet_glob( + path=path, + engine=engine, + columns=columns, + storage_options=storage_options, + use_nullable_dtypes=use_nullable_dtypes, + dtype_backend=dtype_backend, + filesystem=filesystem, + filters=filters, + **kwargs, + ) + ) + + +@expanduser_path_arg("path") +def to_parquet_glob( + self, + path, + engine="auto", + compression="snappy", + index=None, + partition_cols=None, + storage_options: StorageOptions = None, + **kwargs, +) -> None: # noqa: PR01 + """ + Write a DataFrame to the binary parquet format. + + This experimental feature provides parallel writing into multiple parquet files which are + defined by a glob pattern; otherwise (without a glob pattern) the default pandas implementation is used. + + Notes + ----- + * Only string type is supported for the `path` argument. + * The rest of the arguments are the same as for `pandas.to_parquet`. + """ + obj = self + from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher + + if isinstance(self, DataFrame): + obj = self._query_compiler + FactoryDispatcher.to_parquet_glob( + obj, + path=path, + engine=engine, + compression=compression, + index=index, + partition_cols=partition_cols, + storage_options=storage_options, + **kwargs, + ) diff --git a/modin/experimental/pandas/test/test_io_exp.py b/modin/experimental/pandas/test/test_io_exp.py index 1b9bccd7efd..28bd18fc615 100644 --- a/modin/experimental/pandas/test/test_io_exp.py +++ b/modin/experimental/pandas/test/test_io_exp.py @@ -272,6 +272,33 @@ def test_distributed_pickling(filename, compression, pathlike): teardown_test_files(pickle_files) +@pytest.mark.skipif( + Engine.get() not in ("Ray", "Unidist", "Dask"), + reason=f"{Engine.get()} does not have experimental API", +) +@pytest.mark.parametrize( + "filename", + ["test_parquet_glob.parquet", "test_parquet_glob*.parquet"], +) +def test_parquet_glob(filename): + data = test_data["int_data"] + df = pd.DataFrame(data) + + filename_param = filename + + with ( + warns_that_defaulting_to_pandas() + if filename_param == "test_parquet_glob.parquet" + else contextlib.nullcontext() + ): + df.modin.to_parquet_glob(filename) + read_df = pd.read_parquet_glob(filename) + df_equals(read_df, df) + + parquet_files = glob.glob(str(filename)) + teardown_test_files(parquet_files) + + @pytest.mark.skipif( Engine.get() not in ("Ray", "Unidist", "Dask"), reason=f"{Engine.get()} does not have experimental read_custom_text API", diff --git a/modin/pandas/accessor.py b/modin/pandas/accessor.py index 3d09424f160..83e43cff8f7 100644 --- a/modin/pandas/accessor.py +++ b/modin/pandas/accessor.py @@ -215,7 +215,7 @@ def to_pickle_distributed( compression: CompressionOptions = "infer", protocol: int = pickle.HIGHEST_PROTOCOL, storage_options: StorageOptions = None, - ): + ) -> None: """ Pickle (serialize) object to file.
@@ -224,7 +224,7 @@ Parameters ---------- - filepath_or_buffer : str, path object or file-like object + filepath_or_buffer : str File path where the pickled object will be stored. compression : {{'infer', 'gzip', 'bz2', 'zip', 'xz', None}}, default: 'infer' A string representing the compression to use in the output file. By @@ -257,3 +257,42 @@ def to_pickle_distributed( protocol=protocol, storage_options=storage_options, ) + + def to_parquet_glob( + self, + path, + engine="auto", + compression="snappy", + index=None, + partition_cols=None, + storage_options: StorageOptions = None, + **kwargs, + ) -> None: # noqa: PR01 + """ + Write a DataFrame to the binary parquet format. + + This experimental feature provides parallel writing into multiple parquet files which are + defined by a glob pattern; otherwise (without a glob pattern) the default pandas implementation is used. + + Notes + ----- + * Only string type is supported for the `path` argument. + * The rest of the arguments are the same as for `pandas.to_parquet`. + """ + from modin.experimental.pandas.io import to_parquet_glob + + if path is None: + raise NotImplementedError( + "`to_parquet_glob` doesn't support path=None, use `to_parquet` in that case." + ) + + to_parquet_glob( + self._data, + path=path, + engine=engine, + compression=compression, + index=index, + partition_cols=partition_cols, + storage_options=storage_options, + **kwargs, + ) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 0ddc868305d..0c2a2e1539b 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -3187,4 +3187,4 @@ def __reduce__(self): # Persistance support methods - END # Namespace for experimental functions - modin = CachedAccessor("modin", ExperimentalFunctions) + modin: ExperimentalFunctions = CachedAccessor("modin", ExperimentalFunctions)
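For context, a minimal usage sketch of the experimental glob API this diff introduces. It is not part of the patch itself: the engine choice and the `df_part*.parquet` path pattern are illustrative assumptions, and the behavior follows the docstrings above — a `*` in the path fans the write out to one parquet file per row partition, while a path without `*` warns and falls back to the default pandas implementation.

```python
# Minimal usage sketch (illustrative, not part of the patch).
# Assumes a supported engine (Ray, Dask, or Unidist) is active and uses
# the hypothetical path pattern "df_part*.parquet".
import modin.experimental.pandas as pd

df = pd.DataFrame({"a": range(1000), "b": range(1000)})

# "*" is replaced with the ascending partition index (0, 1, 2, ...),
# so each row partition is written to its own parquet file in parallel.
df.modin.to_parquet_glob("df_part*.parquet")

# Reading matches the glob in parallel; each matched file becomes one partition.
restored = pd.read_parquet_glob("df_part*.parquet")

# Without "*" in the path, both calls warn ("Defaulting to Modin core
# implementation") and use the single-file pandas code path instead.
```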