From 2b124b97394c2f6d907de9ca0bb84c0b3f11600e Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Mon, 20 Nov 2023 18:58:27 +0100 Subject: [PATCH 1/6] PERF-#4804: Preserve lengths/widths caches in 'broadcast_apply_full_axis' Signed-off-by: Anatoly Myachev --- .../dataframe/pandas/dataframe/dataframe.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index bafde25e949..cf856da4ff9 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3272,6 +3272,25 @@ def broadcast_apply_full_axis( kw["column_widths"] = self._column_widths_cache elif len(new_columns) == 1 and new_partitions.shape[1] == 1: kw["column_widths"] = [1] + else: + if ( + kw["row_lengths"] is None + and new_index is not None + and self._row_lengths_cache is not None + and len(new_index) == sum(self._row_lengths_cache) + # to avoid problems that may arise when filtering empty dataframes + and all(r != 0 for r in self._row_lengths_cache) + ): + kw["row_lengths"] = self._row_lengths_cache + if ( + kw["column_widths"] is None + and new_columns is not None + and self._column_widths_cache is not None + and len(new_columns) == sum(self._column_widths_cache) + # to avoid problems that may arise when filtering empty dataframes + and all(w != 0 for w in self._column_widths_cache) + ): + kw["column_widths"] = self._column_widths_cache result = self.__constructor__( new_partitions, index=new_index, columns=new_columns, **kw From d3724a1785fafe5ffea2483702a04ff0e2caf061 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Wed, 22 Nov 2023 18:21:09 +0100 Subject: [PATCH 2/6] draft changes Signed-off-by: Anatoly Myachev --- .../dataframe/pandas/dataframe/dataframe.py | 6 +- .../storage_formats/pandas/test_internals.py | 66 +++++++++++++++++++ 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index cf856da4ff9..7a01f4bc2de 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3274,7 +3274,8 @@ def broadcast_apply_full_axis( kw["column_widths"] = [1] else: if ( - kw["row_lengths"] is None + axis == 0 + and kw["row_lengths"] is None and new_index is not None and self._row_lengths_cache is not None and len(new_index) == sum(self._row_lengths_cache) @@ -3283,7 +3284,8 @@ def broadcast_apply_full_axis( ): kw["row_lengths"] = self._row_lengths_cache if ( - kw["column_widths"] is None + axis == 1 + and kw["column_widths"] is None and new_columns is not None and self._column_widths_cache is not None and len(new_columns) == sum(self._column_widths_cache) diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index c638457dfa1..f6e78a02023 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1398,6 +1398,72 @@ def test_sort_values_cache(): validate_partitions_cache(mf_initial, axis=1) +def test_apply_full_axis_preserve_widths(): + md_df = construct_modin_df_by_scheme( + pandas.DataFrame( + {"a": [1, 2, 3, 4], "b": [3, 4, 5, 6], "c": [6, 7, 8, 9], "d": [0, 1, 2, 3]} + ), + {"row_lengths": [2, 2], "column_widths": [2, 2]}, + )._query_compiler._modin_frame + + assert md_df._row_lengths_cache == [2, 2] + assert md_df._column_widths_cache == [2, 2] + + def func(df): + if df.iloc[0, 0] == 1: + return pandas.DataFrame( + {"a": [1, 2, 3], "b": [3, 4, 5], "c": [6, 7, 8], "d": [0, 1, 2]} + ) + else: + return pandas.DataFrame({"a": [4], "b": [6], "c": [9], "d": [3]}) + + res = md_df.apply_full_axis( + func=func, + axis=1, + new_index=[0, 1, 2, 3], + new_columns=["a", "b", "c", "d"], + keep_partitioning=True, + ) + + actual_column_widths = [part.width() for part in res._partitions[0]] + + assert res._column_widths_cache == actual_column_widths + assert res._row_lengths_cache is None + + +def test_apply_full_axis_preserve_lengths(): + md_df = construct_modin_df_by_scheme( + pandas.DataFrame( + {"a": [1, 2, 3, 4], "b": [3, 4, 5, 6], "c": [6, 7, 8, 9], "d": [0, 1, 2, 3]} + ), + {"row_lengths": [2, 2], "column_widths": [2, 2]}, + )._query_compiler._modin_frame + + assert md_df._row_lengths_cache == [2, 2] + assert md_df._column_widths_cache == [2, 2] + + def func(df): + if df.iloc[0, 0] == 1: + return pandas.DataFrame( + {"a": [1, 2, 3], "b": [3, 4, 5], "c": [6, 7, 8], "d": [0, 1, 2]} + ) + else: + return pandas.DataFrame({"a": [4], "b": [6], "c": [9], "d": [3]}) + + res = md_df.apply_full_axis( + func=func, + axis=0, + new_index=[0, 1, 2, 3], + new_columns=["a", "b", "c", "d"], + keep_partitioning=True, + ) + + actual_row_lengths = [part.length() for part in res._partitions[:, 0]] + + assert res._row_lengths_cache == actual_row_lengths + assert res._column_widths_cache is None + + class DummyFuture: """ A dummy object emulating future's behaviour, this class is used in ``test_call_queue_serialization``. From 687808a9c05b95d4ff4833f3eacef7ab52f759d4 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Mon, 4 Dec 2023 02:14:42 +0100 Subject: [PATCH 3/6] fixes Signed-off-by: Anatoly Myachev --- modin/core/storage_formats/pandas/query_compiler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 91ea6c08c0a..b68a4f61a12 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -61,7 +61,6 @@ from modin.core.dataframe.pandas.metadata import ( DtypesDescriptor, ModinDtypes, - ModinIndex, extract_dtype, ) from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler From d98a34dfced60d0237bb79374617387939cf8be8 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Mon, 4 Dec 2023 17:10:12 +0100 Subject: [PATCH 4/6] Revert "fixes" This reverts commit 5777085ecfaafe0ac560b3d3781115444ca7dddb. --- modin/core/storage_formats/pandas/query_compiler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index b68a4f61a12..91ea6c08c0a 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -61,6 +61,7 @@ from modin.core.dataframe.pandas.metadata import ( DtypesDescriptor, ModinDtypes, + ModinIndex, extract_dtype, ) from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler From 1f014450c4e3f78af8121b57077c417da0d0942d Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Mon, 4 Dec 2023 20:13:08 +0100 Subject: [PATCH 5/6] fixes Signed-off-by: Anatoly Myachev --- modin/core/dataframe/pandas/dataframe/dataframe.py | 4 ++-- modin/test/storage_formats/pandas/test_internals.py | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 7a01f4bc2de..e9ff924f883 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3276,8 +3276,8 @@ def broadcast_apply_full_axis( if ( axis == 0 and kw["row_lengths"] is None - and new_index is not None and self._row_lengths_cache is not None + and ModinIndex.is_materialized_index(new_index) and len(new_index) == sum(self._row_lengths_cache) # to avoid problems that may arise when filtering empty dataframes and all(r != 0 for r in self._row_lengths_cache) @@ -3286,8 +3286,8 @@ def broadcast_apply_full_axis( if ( axis == 1 and kw["column_widths"] is None - and new_columns is not None and self._column_widths_cache is not None + and ModinIndex.is_materialized_index(new_columns) and len(new_columns) == sum(self._column_widths_cache) # to avoid problems that may arise when filtering empty dataframes and all(w != 0 for w in self._column_widths_cache) diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index f6e78a02023..33729489e92 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1444,17 +1444,15 @@ def test_apply_full_axis_preserve_lengths(): def func(df): if df.iloc[0, 0] == 1: - return pandas.DataFrame( - {"a": [1, 2, 3], "b": [3, 4, 5], "c": [6, 7, 8], "d": [0, 1, 2]} - ) + return pandas.DataFrame({"a": [3, 2, 3, 4], "b": [3, 4, 5, 6]}) else: - return pandas.DataFrame({"a": [4], "b": [6], "c": [9], "d": [3]}) + return pandas.DataFrame({"c": [9, 5, 6, 7]}) res = md_df.apply_full_axis( func=func, axis=0, new_index=[0, 1, 2, 3], - new_columns=["a", "b", "c", "d"], + new_columns=["a", "b", "c"], keep_partitioning=True, ) From c0ca634235f3e0c63164e6a30c28fa18c8a1a756 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Tue, 5 Dec 2023 12:58:17 +0100 Subject: [PATCH 6/6] Apply suggestions from code review Co-authored-by: Dmitry Chigarev --- modin/test/storage_formats/pandas/test_internals.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index 33729489e92..c0eb02ca971 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1424,10 +1424,10 @@ def func(df): new_columns=["a", "b", "c", "d"], keep_partitioning=True, ) - + col_widths_cache = res._column_widths_cache actual_column_widths = [part.width() for part in res._partitions[0]] - assert res._column_widths_cache == actual_column_widths + assert col_widths_cache == actual_column_widths assert res._row_lengths_cache is None @@ -1456,9 +1456,10 @@ def func(df): keep_partitioning=True, ) + row_lengths_cache = res._row_lengths_cache actual_row_lengths = [part.length() for part in res._partitions[:, 0]] - assert res._row_lengths_cache == actual_row_lengths + assert row_lengths_cache == actual_row_lengths assert res._column_widths_cache is None