Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REFACTOR-#7427: Require query compilers to expose engine and storage format. #7430

Merged
5 changes: 5 additions & 0 deletions modin/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ class TestQC(BaseQueryCompiler):
def __init__(self, modin_frame):
self._modin_frame = modin_frame

# The test query compiler reports fixed values: "Base" as its storage format
# and "Python" as its engine. The property docstrings are reused from the
# corresponding BaseQueryCompiler properties via the ``doc`` argument.
storage_format = property(
lambda self: "Base", doc=BaseQueryCompiler.storage_format.__doc__
)
engine = property(lambda self: "Python", doc=BaseQueryCompiler.engine.__doc__)

def finalize(self):
self._modin_frame.finalize()

Expand Down
33 changes: 29 additions & 4 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import datetime
import re
from abc import ABC, abstractmethod
from functools import cached_property
from typing import TYPE_CHECKING, Callable, Dict, Hashable, List, Optional, Union

Expand All @@ -33,7 +34,6 @@
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import (
Engine,
IsRayCluster,
MinColumnPartitionSize,
MinRowPartitionSize,
Expand Down Expand Up @@ -80,7 +80,7 @@


class PandasDataframe(
ClassLogger, modin_layer="CORE-DATAFRAME", log_level=LogLevel.DEBUG
ABC, ClassLogger, modin_layer="CORE-DATAFRAME", log_level=LogLevel.DEBUG
):
"""
An abstract class that represents the parent class for any pandas storage format dataframe class.
Expand Down Expand Up @@ -122,6 +122,31 @@ class PandasDataframe(
_dtypes: Optional[ModinDtypes] = None
_pandas_backend: Optional[str] = None

@property
def storage_format(self) -> str:
    """
    Get the name of the storage format used for this frame's data.

    Returns
    -------
    str
        The string ``"Pandas"``.
    """
    return "Pandas"

@property
@abstractmethod
def engine(self) -> str:
    """
    Get the name of the engine for this frame.

    This property is abstract, so concrete frame classes must override it.

    Returns
    -------
    str
        The engine.
    """

@cached_property
def __constructor__(self) -> type[PandasDataframe]:
"""
Expand Down Expand Up @@ -1707,7 +1732,7 @@ def astype(self, col_dtypes, errors: str = "raise"):
new_dtypes = self_dtypes.copy()
# Update the new dtype series to the proper pandas dtype
new_dtype = pandas.api.types.pandas_dtype(dtype)
if Engine.get() == "Dask" and hasattr(dtype, "_is_materialized"):
if self.engine == "Dask" and hasattr(dtype, "_is_materialized"):
# FIXME: https://github.com/dask/distributed/issues/8585
_ = dtype._materialize_categories()

Expand Down Expand Up @@ -1736,7 +1761,7 @@ def astype_builder(df):
if not (col_dtypes == self_dtypes).all():
new_dtypes = self_dtypes.copy()
new_dtype = pandas.api.types.pandas_dtype(col_dtypes)
if Engine.get() == "Dask" and hasattr(new_dtype, "_is_materialized"):
if self.engine == "Dask" and hasattr(new_dtype, "_is_materialized"):
# FIXME: https://github.com/dask/distributed/issues/8585
_ = new_dtype._materialize_categories()
if isinstance(new_dtype, pandas.CategoricalDtype):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Module houses class that implements ``PandasDataframe``."""

from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from modin.utils import _inherit_docstrings

from ..partitioning.partition_manager import PandasOnDaskDataframePartitionManager

Expand Down Expand Up @@ -66,3 +67,8 @@ def __reduce__(self): # noqa: GL08

address = default_client().scheduler_info()["address"]
return self.reconnect, (address, self.__dict__)

@property
@_inherit_docstrings(PandasDataframe.engine)
def engine(self) -> str:
    # No docstring here on purpose: it is taken from ``PandasDataframe.engine``
    # by the ``_inherit_docstrings`` decorator above.
    return "Dask"
2 changes: 2 additions & 0 deletions modin/core/execution/dispatching/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ def _to_pickle_glob(cls, *args, **kwargs):
**kwargs : kwargs
Arguments to the writer method.
"""
# TODO(https://github.com/modin-project/modin/issues/7429): Use
# frame-level execution instead of the global, default execution.
current_execution = get_current_execution()
if current_execution not in supported_executions:
raise NotImplementedError(
Expand Down
2 changes: 2 additions & 0 deletions modin/core/execution/modin_aqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def call_progress_bar(result_parts, line_no):

threading.Thread(target=_show_time_updates, args=(progress_bars[pbar_id],)).start()

# TODO(https://github.com/modin-project/modin/issues/7429): Use
# frame-level engine config.
modin_engine = Engine.get()
engine_wrapper = None
if modin_engine == "Ray":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""

from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from modin.utils import _inherit_docstrings

from ..partitioning.partition_manager import PandasOnPythonDataframePartitionManager

Expand Down Expand Up @@ -50,3 +51,8 @@ class PandasOnPythonDataframe(PandasDataframe):
"""

_partition_mgr_cls = PandasOnPythonDataframePartitionManager

@property
@_inherit_docstrings(PandasDataframe.engine)
def engine(self) -> str:
    # No docstring here on purpose: it is taken from ``PandasDataframe.engine``
    # by the ``_inherit_docstrings`` decorator above.
    return "Python"
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from modin.core.dataframe.base.dataframe.utils import Axis
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from modin.utils import _inherit_docstrings

from ..partitioning.partition_manager import PandasOnRayDataframePartitionManager

Expand Down Expand Up @@ -66,3 +67,8 @@ def _get_lengths(self, parts, axis):
dims = [part.width(False) for part in parts]

return self._partition_mgr_cls.materialize_futures(dims)

@property
@_inherit_docstrings(PandasDataframe.engine)
def engine(self) -> str:
    # No docstring here on purpose: it is taken from ``PandasDataframe.engine``
    # by the ``_inherit_docstrings`` decorator above.
    return "Ray"
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Module houses class that implements ``PandasDataframe`` using unidist."""

from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from modin.utils import _inherit_docstrings

from ..partitioning.partition_manager import PandasOnUnidistDataframePartitionManager

Expand Down Expand Up @@ -47,3 +48,8 @@ class PandasOnUnidistDataframe(PandasDataframe):
def support_materialization_in_worker_process(self) -> bool:
# more details why this is not `True` in https://github.com/modin-project/modin/pull/6673
return False

@property
@_inherit_docstrings(PandasDataframe.engine)
def engine(self) -> str:
    # No docstring here on purpose: it is taken from ``PandasDataframe.engine``
    # by the ``_inherit_docstrings`` decorator above.
    return "Unidist"
3 changes: 3 additions & 0 deletions modin/core/execution/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def remote_function(func, ignore_defaults=False):

# Check if the function already exists to avoid circular imports
elif "remote_function" not in dir():
# TODO(https://github.com/modin-project/modin/issues/7429): Use
# frame-level engine config.

from modin.config import Engine

if Engine.get() == "Ray":
Expand Down
26 changes: 26 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,32 @@ class BaseQueryCompiler(
_modin_frame: PandasDataframe
_shape_hint: Optional[str]

@property
@abc.abstractmethod
def storage_format(self) -> str:
    """
    Get the name of the storage format for this query compiler.

    This property is abstract, so concrete query compilers must override it.

    Returns
    -------
    str
        The storage format.
    """

@property
@abc.abstractmethod
def engine(self) -> str:
    """
    Get the name of the engine for this query compiler.

    This property is abstract, so concrete query compilers must override it.

    Returns
    -------
    str
        The engine.
    """

def __wrap_in_qc(self, obj):
"""
Wrap `obj` in query compiler.
Expand Down
5 changes: 5 additions & 0 deletions modin/core/storage_formats/pandas/native_query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,11 @@ def __init__(self, pandas_frame, shape_hint: Optional[str] = None):
self._modin_frame = pandas_frame
self._shape_hint = shape_hint

# This query compiler reports fixed values: "Pandas" as its storage format
# and "Native" as its engine. The property docstrings are reused from the
# corresponding BaseQueryCompiler properties via the ``doc`` argument.
storage_format = property(
lambda self: "Pandas", doc=BaseQueryCompiler.storage_format.__doc__
)
engine = property(lambda self: "Native", doc=BaseQueryCompiler.engine.__doc__)

def execute(self):
pass

Expand Down
3 changes: 3 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ def __init__(self, modin_frame: PandasDataframe, shape_hint: Optional[str] = Non
self._modin_frame = modin_frame
self._shape_hint = shape_hint

# Both properties delegate to the wrapped Modin frame (``self._modin_frame``),
# so the reported values come from whichever frame this query compiler holds.
storage_format = property(lambda self: self._modin_frame.storage_format)
engine = property(lambda self: self._modin_frame.engine)

@property
def lazy_row_labels(self):
"""
Expand Down
3 changes: 3 additions & 0 deletions modin/error_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ def single_warning(

@classmethod
def default_to_pandas(cls, message: str = "", reason: str = "") -> None:
# TODO(https://github.com/modin-project/modin/issues/7429): Use
# frame-level engine config.

if message != "":
execution_str = get_current_execution()
message = (
Expand Down
4 changes: 4 additions & 0 deletions modin/tests/pandas/native_df_mode/test_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,10 @@ def applyier(df):
"check_dtypes": not (len(pandas_df) == 0 and len(pandas_df.columns) != 0)
},
expected_exception=expected_exception,
check_for_execution_propagation=False,
no_check_for_execution_propagation_reason=(
"https://github.com/modin-project/modin/issues/7428"
),
)


Expand Down
46 changes: 45 additions & 1 deletion modin/tests/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
MinRowPartitionSize,
NPartitions,
RangePartitioning,
StorageFormat,
TestDatasetSize,
TrackFileLeaks,
)
Expand Down Expand Up @@ -924,10 +925,19 @@ def eval_general(
check_kwargs_callable=True,
md_extra_kwargs=None,
comparator_kwargs=None,
check_for_execution_propagation=True,
no_check_for_execution_propagation_reason=None,
**kwargs,
):
md_kwargs, pd_kwargs = {}, {}

if isinstance(modin_df, (pd.DataFrame, pd.Series)):
original_engine = modin_df._query_compiler.engine
original_storage_format = modin_df._query_compiler.storage_format
else:
original_engine = None
original_storage_format = None

def execute_callable(fn, inplace=False, md_kwargs={}, pd_kwargs={}):
try:
pd_result = fn(pandas_df, **pd_kwargs)
Expand Down Expand Up @@ -1000,7 +1010,35 @@ def execute_callable(fn, inplace=False, md_kwargs={}, pd_kwargs={}):
operation, md_kwargs=md_kwargs, pd_kwargs=pd_kwargs, inplace=__inplace__
)
if values is not None:
comparator(*values, **(comparator_kwargs or {}))
assert isinstance(values, tuple) and len(values) == 2
modin_result, pandas_result = values
if (
isinstance(modin_result, (pd.DataFrame, pd.Series))
and original_engine is not None
and original_storage_format is not None
):
if check_for_execution_propagation:
assert modin_result._query_compiler.engine == original_engine, (
f"Result engine {modin_result._query_compiler.engine} does "
+ f"not match expected engine {original_engine}"
)
assert (
modin_result._query_compiler.storage_format
== original_storage_format
), (
"Result storage format "
+ f"{modin_result._query_compiler.storage_format} does "
+ f"not match expected storage format {original_storage_format}"
)
else:
assert (
isinstance(no_check_for_execution_propagation_reason, str)
and len(no_check_for_execution_propagation_reason) > 0
), (
"Must provide a reason for not expecting the operation to "
+ "propagate dataframe/series engine."
)
comparator(modin_result, pandas_result, **(comparator_kwargs or {}))


def eval_io(
Expand Down Expand Up @@ -1042,6 +1080,12 @@ def applyier(module, *args, **kwargs):
result = getattr(module, fn_name)(*args, **kwargs)
if cast_to_str:
result = result.astype(str)
if isinstance(result, (pd.DataFrame, pd.Series)):
# Input methods that return a dataframe, e.g. read_csv, should
# return a dataframe with engine and storage_format that match
# the default Engine and StorageFormat, respectively.
assert result._query_compiler.engine == Engine.get()
assert result._query_compiler.storage_format == StorageFormat.get()
return result

def call_eval_general():
Expand Down
2 changes: 2 additions & 0 deletions modin/tests/test_executions_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def test_base_abstract_methods():
"default_to_pandas",
"from_interchange_dataframe",
"to_interchange_dataframe",
"engine",
"storage_format",
]

not_implemented_methods = BASE_EXECUTION.__abstractmethods__.difference(
Expand Down
Loading