FIX-#5723: Attempt to read list of parquet files in one go. #5724

Draft: wants to merge 1 commit into master
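For context, the user-facing behavior this draft targets (and that the new test below exercises) is passing a list of parquet files to a single read_parquet call. A minimal sketch, with hypothetical file names:

    import modin.pandas as pd

    # Read several parquet files as one DataFrame in a single call,
    # instead of one read_parquet() per file followed by a concat.
    df = pd.read_parquet(["part-0.parquet", "part-1.parquet"], engine="pyarrow")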
133 changes: 78 additions & 55 deletions modin/core/io/column_stores/parquet_dispatcher.py
@@ -22,6 +22,8 @@
from fsspec.spec import AbstractBufferedFile
import numpy as np
from packaging import version
from pandas.api.types import is_list_like
import s3fs

from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.config import NPartitions
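The two new imports matter further down: pandas' is_list_like() decides whether the caller passed one path or several, and s3fs is imported because an open s3fs.core.S3File handle is iterable, so is_list_like() would misclassify it as a collection of paths (see the normalization in _read() below). A quick illustration of the check itself:

    from pandas.api.types import is_list_like

    is_list_like(["a.parquet", "b.parquet"])  # True: a real list of paths
    is_list_like("a.parquet")                 # False: strings are excluded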
@@ -40,7 +42,7 @@ class ColumnStoreDataset:

Attributes
----------
path : str, path object or file-like object
paths : list-like of str, path object or file-like object
The filepaths of the parquet files in local filesystem or hdfs.
storage_options : dict
Parameters for specific storage engine.
@@ -60,10 +62,13 @@ class ColumnStoreDataset:
List that contains the full paths of the parquet files in the dataset.
"""

def __init__(self, path, storage_options): # noqa : PR01
self.path = path.__fspath__() if isinstance(path, os.PathLike) else path
def __init__(self, paths, storage_options): # noqa : PR01
self.paths = [
path.__fspath__() if isinstance(path, os.PathLike) else path
for path in paths
]
self.storage_options = storage_options
self._fs_path = None
self._fs_paths = None
self._fs = None
self.dataset = self._init_dataset()
self._row_groups_per_file = None
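The constructor normalizes every os.PathLike entry through __fspath__. The standard-library helper os.fspath() performs the same dispatch, so an equivalent sketch of this normalization is:

    import os
    import pathlib

    paths = [pathlib.Path("data/part-0.parquet"), "data/part-1.parquet"]
    # os.fspath() calls __fspath__ on PathLike objects and passes str through.
    normalized = [os.fspath(p) if isinstance(p, os.PathLike) else p for p in paths]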
@@ -107,14 +112,18 @@ def fs(self):
Filesystem object.
"""
if self._fs is None:
if isinstance(self.path, AbstractBufferedFile):
self._fs = self.path.fs
if isinstance(self.paths[0], AbstractBufferedFile):
self._fs = self.paths[0].fs
else:
self._fs, self._fs_path = url_to_fs(self.path, **self.storage_options)
self._fs_paths = [
url_to_fs(path, **self.storage_options)[1] for path in self.paths
]
# If given a list of paths, we can assume they all live on the same filesystem
self._fs = url_to_fs(self.paths[0], **self.storage_options)[0]
return self._fs

@property
def fs_path(self):
def fs_paths(self):
"""
Return the filesystem-specific path or file handle.

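In both properties, fsspec's url_to_fs() does the heavy lifting: it resolves a URL into a (filesystem, path) pair. A small sketch with an illustrative local URL:

    from fsspec.core import url_to_fs

    # Each property keeps one half of the pair; the filesystem is derived from
    # the first path only, since all paths in the list are assumed to live on
    # the same filesystem.
    fs, fs_path = url_to_fs("file:///tmp/data/part-0.parquet")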
@@ -123,12 +132,18 @@ def fs_path(self):
fs_paths : list of str, path objects or file-like objects
List of string paths specific to the filesystem, or file handles.
"""
if self._fs_path is None:
if isinstance(self.path, AbstractBufferedFile):
self._fs_path = self.path
if self._fs_paths is None:
if len(self.paths) > 1:
self._fs_paths = self.paths
elif isinstance(self.paths[0], AbstractBufferedFile):
self._fs_paths = [self.paths[0]]
else:
self._fs, self._fs_path = url_to_fs(self.path, **self.storage_options)
return self._fs_path
self._fs_paths = [
url_to_fs(path, **self.storage_options)[1] for path in self.paths
]
# If given a list of paths, we can assume they all live on the same filesystem
self._fs = url_to_fs(self.paths[0], **self.storage_options)[0]
return self._fs_paths

def to_pandas_dataframe(self, columns):
"""
@@ -166,8 +181,8 @@ def _unstrip_protocol(protocol, path):
return path
return f"{protos[0]}://{path}"

if isinstance(self.path, AbstractBufferedFile):
return [self.path]
if len(self.paths) == 1 and isinstance(self.paths[0], AbstractBufferedFile):
return [self.paths[0]]
# version.parse() is expensive, so we can split this into two separate loops
if version.parse(fsspec.__version__) < version.parse("2022.5.0"):
fs_files = [_unstrip_protocol(self.fs.protocol, fpath) for fpath in files]
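On fsspec older than 2022.5.0 the protocol is re-attached by the local _unstrip_protocol() helper above; newer fsspec exposes the same operation as a filesystem method, which the other branch of this version check relies on. A sketch of the newer API:

    from fsspec.core import url_to_fs

    fs, root = url_to_fs("file:///tmp/data")
    # fsspec >= 2022.5.0: AbstractFileSystem.unstrip_protocol() prepends the
    # filesystem's protocol to a bare path.
    full_path = fs.unstrip_protocol("/tmp/data/part-0.parquet")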
@@ -183,7 +198,7 @@ def _init_dataset(self): # noqa: GL08
from pyarrow.parquet import ParquetDataset

return ParquetDataset(
self.fs_path, filesystem=self.fs, use_legacy_dataset=False
self.fs_paths, filesystem=self.fs, use_legacy_dataset=False
)

@property
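The only change in _init_dataset() is passing the whole list through, which works because pyarrow's ParquetDataset accepts a list of file paths as well as a single directory path. A standalone sketch with hypothetical local files, using the same use_legacy_dataset=False flag as the code above:

    from pyarrow.parquet import ParquetDataset

    dataset = ParquetDataset(
        ["/tmp/part-0.parquet", "/tmp/part-1.parquet"],
        use_legacy_dataset=False,
    )
    print(dataset.schema)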
@@ -231,7 +246,7 @@ def to_pandas_dataframe(
from pyarrow.parquet import read_table

return read_table(
self._fs_path, columns=columns, filesystem=self.fs
self._fs_paths, columns=columns, filesystem=self.fs
).to_pandas()


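read_table() is likewise handed the list directly. The draft relies on read_table() delegating to the dataset layer, which accepts a list of file paths; a hedged sketch under that assumption:

    from pyarrow.parquet import read_table

    # Assumes a pyarrow version whose read_table() accepts a list of paths.
    df = read_table(["/tmp/part-0.parquet", "/tmp/part-1.parquet"]).to_pandas()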
@@ -240,7 +255,7 @@ class FastParquetDataset(ColumnStoreDataset):
def _init_dataset(self): # noqa: GL08
from fastparquet import ParquetFile

return ParquetFile(self.fs_path, fs=self.fs)
return ParquetFile(self.fs_paths, fs=self.fs)

@property
def pandas_metadata(self):
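fastparquet's ParquetFile also accepts a list of filenames and merges them into one logical dataset, so the same list flows through unchanged. A sketch with hypothetical files:

    from fastparquet import ParquetFile

    pf = ParquetFile(["/tmp/part-0.parquet", "/tmp/part-1.parquet"])
    df = pf.to_pandas()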
@@ -285,12 +300,13 @@ def _get_fastparquet_files(self): # noqa: GL08
# have to copy some of their logic here while we work on getting
# an easier method to get a list of valid files.
# See: https://github.com/dask/fastparquet/issues/795
if "*" in self.path:
if "*" in self.paths:
files = self.fs.glob(self.path)
else:
files = [
f
for f in self.fs.find(self.path)
for path in self.paths
for f in self.fs.find(path)
if f.endswith(".parquet") or f.endswith(".parq")
]
return files
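For reference, fs.find() lists files recursively while fs.glob() expands wildcard patterns, which is why the two branches above exist. A sketch of the non-glob branch against a hypothetical local directory:

    from fsspec.core import url_to_fs

    fs, root = url_to_fs("/tmp/data")
    files = [f for f in fs.find(root) if f.endswith((".parquet", ".parq"))]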
@@ -612,41 +628,49 @@ def _read(cls, path, engine, columns, **kwargs):
ParquetFile API is used. Please refer to the documentation here
https://arrow.apache.org/docs/python/parquet.html
"""
if isinstance(path, str):
if os.path.isdir(path):
path_generator = os.walk(path)
else:
storage_options = kwargs.get("storage_options")
if storage_options is not None:
fs, fs_path = url_to_fs(path, **storage_options)
# s3fs.core.S3File is considered list-like
path_list = (
path
if is_list_like(path) and not isinstance(path, s3fs.core.S3File)
else [path]
)
for path in path_list:
if isinstance(path, str):
if os.path.isdir(path):
path_generator = os.walk(path)
else:
fs, fs_path = url_to_fs(path)
path_generator = fs.walk(fs_path)
partitioned_columns = set()
# We do a tree walk of the path directory because partitioned
# parquet directories have a unique column at each directory level.
# Thus, we can use os.walk(), which does a dfs search, to walk
# through the different columns that the data is partitioned on
for _, dir_names, files in path_generator:
if dir_names:
partitioned_columns.add(dir_names[0].split("=")[0])
if files:
# Metadata files, git files, .DSStore
# TODO: fix conditional for column partitioning, see issue #4637
if len(files[0]) > 0 and files[0][0] == ".":
continue
break
partitioned_columns = list(partitioned_columns)
if len(partitioned_columns):
return cls.single_worker_read(
path,
engine=engine,
columns=columns,
reason="Mixed partitioning columns in Parquet",
**kwargs,
)

dataset = cls.get_dataset(path, engine, kwargs.get("storage_options") or {})
storage_options = kwargs.get("storage_options")
if storage_options is not None:
fs, fs_path = url_to_fs(path, **storage_options)
else:
fs, fs_path = url_to_fs(path)
path_generator = fs.walk(fs_path)
partitioned_columns = set()
# We do a tree walk of the path directory because partitioned
# parquet directories have a unique column at each directory level.
# Thus, we can use os.walk(), which does a dfs search, to walk
# through the different columns that the data is partitioned on
for _, dir_names, files in path_generator:
if dir_names:
partitioned_columns.add(dir_names[0].split("=")[0])
if files:
# Metadata files, git files, .DS_Store
# TODO: fix conditional for column partitioning, see issue #4637
if len(files[0]) > 0 and files[0][0] == ".":
continue
break
partitioned_columns = list(partitioned_columns)
if len(partitioned_columns):
return cls.single_worker_read(
path,
engine=engine,
columns=columns,
reason="Mixed partitioning columns in Parquet",
**kwargs,
)
dataset = cls.get_dataset(
path_list, engine, kwargs.get("storage_options") or {}
)
index_columns = (
dataset.pandas_metadata.get("index_columns", [])
if dataset.pandas_metadata
Expand All @@ -659,5 +683,4 @@ def _read(cls, path, engine, columns, **kwargs):
for c in column_names
if c not in index_columns and not cls.index_regex.match(c)
]

return cls.build_query_compiler(dataset, columns, index_columns, **kwargs)
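The core of the fix is the normalization at the top of _read(): anything list-like except an open S3 file handle is treated as a collection of paths, and a single path is wrapped in a one-element list so the rest of the reader sees a uniform shape. A self-contained sketch of that rule (normalize_paths is an illustrative name, not part of the PR):

    from pandas.api.types import is_list_like
    import s3fs

    def normalize_paths(path):
        # An open S3 file handle is iterable, so is_list_like() would
        # misclassify it; treat it as a single path instead.
        if is_list_like(path) and not isinstance(path, s3fs.core.S3File):
            return list(path)
        return [path]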
14 changes: 14 additions & 0 deletions modin/pandas/test/test_io.py
@@ -1356,6 +1356,20 @@ def test_read_parquet(
columns=columns,
)

def test_read_parquet_list_of_files_5698(self, engine, make_parquet_file):
with ensure_clean(".parquet") as f1, ensure_clean(
".parquet"
) as f2, ensure_clean(".parquet") as f3:
for f in [f1, f2, f3]:
make_parquet_file(filename=f)
eval_io(fn_name="read_parquet", path=[f1, f2, f3], engine=engine)

def test_empty_list(self, engine):
eval_io(fn_name="read_parquet", path=[], engine=engine)

def test_list_of_s3_file(self, engine):
raise NotImplementedError

@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",