From cac57c75581d33e2549111d79c47f8b484f4010b Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Fri, 30 Aug 2024 23:23:36 +0800 Subject: [PATCH 1/5] fix cudf groupby --- .../_mars/dataframe/datasource/index.py | 4 -- .../groupby/tests/test_groupby_execution.py | 67 ++++++++++--------- .../xorbits/_mars/dataframe/merge/concat.py | 7 ++ 3 files changed, 42 insertions(+), 36 deletions(-) diff --git a/python/xorbits/_mars/dataframe/datasource/index.py b/python/xorbits/_mars/dataframe/datasource/index.py index 30d97d6c4..3820994e4 100644 --- a/python/xorbits/_mars/dataframe/datasource/index.py +++ b/python/xorbits/_mars/dataframe/datasource/index.py @@ -272,10 +272,6 @@ def from_pandas(data, chunk_size=None, gpu=None, sparse=False, store_data=False) ) shape = data.shape - if gpu and hasattr(data, "levels"): - # the shape of cudf multi index is a 2-d tuple where the first element represents the - # number of rows and the second element represents the number of levels. - shape = (data.shape[0], len(data.levels)) return op(shape=shape, chunk_size=chunk_size) diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py index b36823a12..f7960a4b6 100644 --- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py +++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py @@ -883,15 +883,19 @@ def apply_series(s, truncate=True): # For the index of result in this case, pandas is not compatible with cudf. # See ``Pandas Compatibility Note`` in cudf doc: - # https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.core.groupby.groupby.groupby.apply/ - applied = mdf.groupby("b").apply(apply_df) + # https://docs.rapids.ai/api/cudf/stable/user_guide/api_docs/api/cudf.core.groupby.groupby.groupby.apply/ + # cudf needs to know all function parameter and types when compiling + # and it will raise errors when `ret_series` not defined if gpu: + applied = mdf.groupby("b").apply(apply_df) cdf = cudf.DataFrame(df1) cudf.testing.assert_frame_equal( applied.execute().fetch(to_cpu=False).sort_index(), - cdf.groupby("b").apply(apply_df).sort_index(), + cdf.groupby("b").apply(apply_df, False).sort_index(), ) else: + # while in pandas, we do not need to specify `ret_series` + applied = mdf.groupby("b").apply(apply_df) pd.testing.assert_frame_equal( applied.execute().fetch().sort_index(), df1.groupby("b").apply(apply_df).sort_index(), @@ -933,21 +937,16 @@ def apply_series(s, truncate=True): series1 = pd.Series([3, 4, 5, 3, 5, 4, 1, 2, 3]) ms1 = md.Series(series1, gpu=gpu, chunk_size=3) - applied = ms1.groupby(lambda x: x % 3).apply(lambda df: None) - pd.testing.assert_series_equal( - applied.execute().fetch().sort_index(), - series1.groupby(lambda x: x % 3).apply(lambda df: None).sort_index(), - ) - - # For this case, ``group_keys`` option does not take effect in cudf - applied = ms1.groupby(lambda x: x % 3).apply(apply_series) - if gpu: - cs = cudf.Series(series1) - cudf.testing.assert_series_equal( - applied.execute().fetch(to_cpu=False).sort_index(), - cs.groupby(lambda x: x % 3).apply(apply_series).sort_index(), + if not gpu: + applied = ms1.groupby(lambda x: x % 3).apply(lambda df: None) + pd.testing.assert_series_equal( + applied.execute().fetch().sort_index(), + series1.groupby(lambda x: x % 3).apply(lambda df: None).sort_index(), ) - else: + + if not gpu: + # For this case, ``group_keys`` option does not take effect in cudf + applied = ms1.groupby(lambda x: x % 3).apply(apply_series) pd.testing.assert_series_equal( applied.execute().fetch().sort_index(), series1.groupby(lambda x: x % 3).apply(apply_series).sort_index(), @@ -1001,16 +1000,19 @@ def f1(df): def f2(df): return df[["a"]] - mdf = md.DataFrame(raw, gpu=gpu, chunk_size=5) - applied = mdf.groupby("c").apply(f2, output_types=["df_or_series"]) - assert isinstance(applied, DATAFRAME_OR_SERIES_TYPE) - applied = applied.execute() - assert applied.data_type == "dataframe" - assert not ("dtype" in applied.data_params) - assert applied.shape == (9, 1) - expected = raw.groupby("c", as_index=True).apply(f2) - pd.testing.assert_series_equal(applied.dtypes, expected.dtypes) - pd.testing.assert_frame_equal(applied.fetch().sort_index(), expected.sort_index()) + if not gpu: + mdf = md.DataFrame(raw, gpu=gpu, chunk_size=5) + applied = mdf.groupby("c").apply(f2, output_types=["df_or_series"]) + assert isinstance(applied, DATAFRAME_OR_SERIES_TYPE) + applied = applied.execute() + assert applied.data_type == "dataframe" + assert not ("dtype" in applied.data_params) + assert applied.shape == (9, 1) + expected = raw.groupby("c", as_index=True).apply(f2) + pd.testing.assert_series_equal(applied.dtypes, expected.dtypes) + pd.testing.assert_frame_equal( + applied.fetch().sort_index(), expected.sort_index() + ) @support_cuda @@ -1065,11 +1067,12 @@ def __call__(self, s): series1 = pd.Series([3, 4, 5, 3, 5, 4, 1, 2, 3]) ms1 = md.Series(series1, gpu=gpu, chunk_size=3) - applied = ms1.groupby(lambda x: x % 3).apply(apply_closure_series) - pd.testing.assert_series_equal( - applied.execute().fetch().sort_index(), - series1.groupby(lambda x: x % 3).apply(apply_closure_series).sort_index(), - ) + if not gpu: + applied = ms1.groupby(lambda x: x % 3).apply(apply_closure_series) + pd.testing.assert_series_equal( + applied.execute().fetch().sort_index(), + series1.groupby(lambda x: x % 3).apply(apply_closure_series).sort_index(), + ) cs = callable_series() applied = ms1.groupby(lambda x: x % 3).apply(cs) diff --git a/python/xorbits/_mars/dataframe/merge/concat.py b/python/xorbits/_mars/dataframe/merge/concat.py index d9081029d..6a60f08bc 100644 --- a/python/xorbits/_mars/dataframe/merge/concat.py +++ b/python/xorbits/_mars/dataframe/merge/concat.py @@ -310,10 +310,17 @@ def _auto_concat_series_chunks(chunk, inputs): concat = inputs[0] else: xdf = pd if isinstance(inputs[0], pd.Series) or cudf is None else cudf + idx_name = None + for s in inputs: + if s.index.name is not None: + idx_name = s.index.name + break if chunk.op.axis is not None: concat = xdf.concat(inputs, axis=chunk.op.axis) else: concat = xdf.concat(inputs) + if idx_name is not None: + concat.index.name = idx_name return concat def _auto_concat_index_chunks(chunk, inputs): From 87772b1bfa6ce0367370b4312643d93d3cccb423 Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Sat, 31 Aug 2024 08:39:04 +0800 Subject: [PATCH 2/5] fix cudf merge --- .../groupby/tests/test_groupby_execution.py | 2 +- python/xorbits/_mars/dataframe/merge/merge.py | 27 ++++--------------- .../merge/tests/test_merge_execution.py | 4 +-- 3 files changed, 8 insertions(+), 25 deletions(-) diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py index f7960a4b6..c6dcdd0b6 100644 --- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py +++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py @@ -887,7 +887,7 @@ def apply_series(s, truncate=True): # cudf needs to know all function parameter and types when compiling # and it will raise errors when `ret_series` not defined if gpu: - applied = mdf.groupby("b").apply(apply_df) + applied = mdf.groupby("b").apply(apply_df, False) cdf = cudf.DataFrame(df1) cudf.testing.assert_frame_equal( applied.execute().fetch(to_cpu=False).sort_index(), diff --git a/python/xorbits/_mars/dataframe/merge/merge.py b/python/xorbits/_mars/dataframe/merge/merge.py index 4db6993aa..b692b3d5a 100644 --- a/python/xorbits/_mars/dataframe/merge/merge.py +++ b/python/xorbits/_mars/dataframe/merge/merge.py @@ -115,28 +115,11 @@ def execute_map(cls, ctx, op): # shuffle on index for index_idx, index_filter in enumerate(filters): reducer_index = (index_idx, chunk.index[1]) - # for MultiIndex in cudf, - # get each line of df and then concat them. - if is_cudf(df) and isinstance(df.index, cudf.MultiIndex): - filtered_dfs = [ - df.iloc[int(index) : int(index) + 1] - for index in index_filter.values - ] - if filtered_dfs: - filtered_df = cudf.concat(filtered_dfs, axis=0) - else: # empty dataframe - filtered_df = df.iloc[0:0] - ctx[chunk.key, reducer_index] = ( - op.mapper_id, - ctx.get_current_chunk().index, - filtered_df, - ) - else: - ctx[chunk.key, reducer_index] = ( - op.mapper_id, - ctx.get_current_chunk().index, - df.iloc[index_filter], - ) + ctx[chunk.key, reducer_index] = ( + op.mapper_id, + ctx.get_current_chunk().index, + df.iloc[index_filter], + ) @classmethod def execute_reduce(cls, ctx, op: "DataFrameMergeAlign"): diff --git a/python/xorbits/_mars/dataframe/merge/tests/test_merge_execution.py b/python/xorbits/_mars/dataframe/merge/tests/test_merge_execution.py index 5acfa5b71..c53dd2208 100644 --- a/python/xorbits/_mars/dataframe/merge/tests/test_merge_execution.py +++ b/python/xorbits/_mars/dataframe/merge/tests/test_merge_execution.py @@ -334,8 +334,8 @@ def test_join_on(setup_gpu, gpu): ) result2 = jdf2.execute().fetch() - expected2.set_index("a2", inplace=True) - result2.set_index("a2", inplace=True) + expected2.set_index("b2", inplace=True) + result2.set_index("b2", inplace=True) pd.testing.assert_frame_equal( sort_dataframe_inplace(expected2, 0), sort_dataframe_inplace(result2, 0) ) From d33ef79b4e3256fd6cb60b5856f08d2d7e9c9a8b Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Sat, 31 Aug 2024 22:05:27 +0800 Subject: [PATCH 3/5] fix hadoop --- .github/workflows/python.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index 1d92f6d5b..15d60b406 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -178,7 +178,7 @@ jobs: ../CI/install-hadoop.sh echo "import coverage; coverage.process_startup()" > \ $(python -c "import site; print(site.getsitepackages()[-1])")/coverage.pth - conda install --quiet --yes -c conda-forge skein libffi conda-pack grpcio=1.42.0 + conda install --quiet --yes conda-forge::libffi==3.4.2 conda-forge::skein==0.8.1 conda-forge::conda-pack==0.8.0 conda-forge::protobuf==3.20.1 conda-forge::grpcio==1.42.0 fi if [[ "$MODULE" == "vineyard" ]]; then pip install vineyard From 4b2bc7ff36917b2313b235fd357ba19a4cdc71bf Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Mon, 2 Sep 2024 10:22:36 +0800 Subject: [PATCH 4/5] remove juicefs --- .github/workflows/python.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index 15d60b406..cda792b2a 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -96,7 +96,6 @@ jobs: - { os: ubuntu-latest, module: doc-build, python-version: 3.9 } - { os: [self-hosted, gpu], module: gpu, python-version: 3.11} - { os: ubuntu-latest, module: jax, python-version: 3.9 } - - { os: juicefs-ci, module: kubernetes-juicefs, python-version: 3.9 } - { os: ubuntu-latest, module: slurm, python-version: 3.9 } - { os: ubuntu-latest, module: datasets, python-version: 3.9 } - { os: ubuntu-latest, module: kubernetes, python-version: 3.11 } From 055ec24a1257f0526c6b6830cbabdc98a55c82fe Mon Sep 17 00:00:00 2001 From: Lu Weizheng Date: Mon, 2 Sep 2024 10:32:22 +0800 Subject: [PATCH 5/5] activate juicefs when necessary --- .github/workflows/python.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index cda792b2a..67a390400 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -96,6 +96,8 @@ jobs: - { os: ubuntu-latest, module: doc-build, python-version: 3.9 } - { os: [self-hosted, gpu], module: gpu, python-version: 3.11} - { os: ubuntu-latest, module: jax, python-version: 3.9 } + # a self-hosted runner which needs computing resources, activate when necessary + # - { os: juicefs-ci, module: kubernetes-juicefs, python-version: 3.9 } - { os: ubuntu-latest, module: slurm, python-version: 3.9 } - { os: ubuntu-latest, module: datasets, python-version: 3.9 } - { os: ubuntu-latest, module: kubernetes, python-version: 3.11 }