Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF-#4804: Preserve lengths/widths caches in broadcast_apply_full_axis #6760

Merged
merged 6 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3272,6 +3272,27 @@ def broadcast_apply_full_axis(
kw["column_widths"] = self._column_widths_cache
elif len(new_columns) == 1 and new_partitions.shape[1] == 1:
kw["column_widths"] = [1]
else:
if (
axis == 0
and kw["row_lengths"] is None
and self._row_lengths_cache is not None
and ModinIndex.is_materialized_index(new_index)
and len(new_index) == sum(self._row_lengths_cache)
# to avoid problems that may arise when filtering empty dataframes
and all(r != 0 for r in self._row_lengths_cache)
Comment on lines +3282 to +3283
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what kind of problems do you mean here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for filtering empty partitions is too unpredictable at first glance, so I don’t want to enable length calculations in this mode for now.

):
kw["row_lengths"] = self._row_lengths_cache
if (
axis == 1
and kw["column_widths"] is None
and self._column_widths_cache is not None
and ModinIndex.is_materialized_index(new_columns)
and len(new_columns) == sum(self._column_widths_cache)
# to avoid problems that may arise when filtering empty dataframes
and all(w != 0 for w in self._column_widths_cache)
):
kw["column_widths"] = self._column_widths_cache

result = self.__constructor__(
new_partitions, index=new_index, columns=new_columns, **kw
Expand Down
65 changes: 65 additions & 0 deletions modin/test/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,71 @@ def test_sort_values_cache():
validate_partitions_cache(mf_initial, axis=1)


def test_apply_full_axis_preserve_widths():
md_df = construct_modin_df_by_scheme(
pandas.DataFrame(
{"a": [1, 2, 3, 4], "b": [3, 4, 5, 6], "c": [6, 7, 8, 9], "d": [0, 1, 2, 3]}
),
{"row_lengths": [2, 2], "column_widths": [2, 2]},
)._query_compiler._modin_frame

assert md_df._row_lengths_cache == [2, 2]
assert md_df._column_widths_cache == [2, 2]

def func(df):
if df.iloc[0, 0] == 1:
return pandas.DataFrame(
{"a": [1, 2, 3], "b": [3, 4, 5], "c": [6, 7, 8], "d": [0, 1, 2]}
)
else:
return pandas.DataFrame({"a": [4], "b": [6], "c": [9], "d": [3]})

res = md_df.apply_full_axis(
func=func,
axis=1,
new_index=[0, 1, 2, 3],
new_columns=["a", "b", "c", "d"],
keep_partitioning=True,
)
col_widths_cache = res._column_widths_cache
actual_column_widths = [part.width() for part in res._partitions[0]]

assert col_widths_cache == actual_column_widths
assert res._row_lengths_cache is None


def test_apply_full_axis_preserve_lengths():
md_df = construct_modin_df_by_scheme(
pandas.DataFrame(
{"a": [1, 2, 3, 4], "b": [3, 4, 5, 6], "c": [6, 7, 8, 9], "d": [0, 1, 2, 3]}
),
{"row_lengths": [2, 2], "column_widths": [2, 2]},
)._query_compiler._modin_frame

assert md_df._row_lengths_cache == [2, 2]
assert md_df._column_widths_cache == [2, 2]

def func(df):
if df.iloc[0, 0] == 1:
return pandas.DataFrame({"a": [3, 2, 3, 4], "b": [3, 4, 5, 6]})
else:
return pandas.DataFrame({"c": [9, 5, 6, 7]})

res = md_df.apply_full_axis(
func=func,
axis=0,
new_index=[0, 1, 2, 3],
new_columns=["a", "b", "c"],
keep_partitioning=True,
)

row_lengths_cache = res._row_lengths_cache
actual_row_lengths = [part.length() for part in res._partitions[:, 0]]

assert row_lengths_cache == actual_row_lengths
assert res._column_widths_cache is None


class DummyFuture:
"""
A dummy object emulating future's behaviour, this class is used in ``test_call_queue_serialization``.
Expand Down
Loading