Skip to content

Commit

Permalink
dask shuffle (#41)
Browse files Browse the repository at this point in the history
* shuffle
  • Loading branch information
luweizheng authored Apr 25, 2024
1 parent 7abb591 commit 6652b8a
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 17 deletions.
10 changes: 5 additions & 5 deletions ch-dask-dataframe/indexing.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"如 {numref}`fig-pandas-dataframe-model` 所示,pandas DataFrame 主要对二维的表进行处理,有列标签和行标签。行标签通常会被用户忽视,但实际上起着至关重要的作用,比如索引(Indexing)。大多数 pandas DataFrame 的行标签是排好序的索引,比如从 0 开始递增。 DataFrame 里面的数据也是有序的\n",
"如 {numref}`fig-pandas-dataframe-model` 所示,pandas DataFrame 主要对二维的表进行处理,有列标签和行标签。行标签通常会被用户忽视,但实际上起着至关重要的作用:索引(Indexing)。大多数 pandas DataFrame 的行标签是排好序的索引,比如从 0 开始递增。 这种排好序的索引使得 pandas DataFrame 里面的数据是有序的\n",
"\n",
"```{figure} ../img/ch-dask-dataframe/dataframe-model.svg\n",
"---\n",
Expand All @@ -59,7 +59,7 @@
"pandas DataFrame 数据模型\n",
"```\n",
"\n",
"创建 pandas DataFrame 时,会在最左侧自动生成了索引列,它不是 DataFrame 的“官方”字段,因为索引列并没有列名"
"创建 pandas DataFrame 时,会在最左侧自动生成了索引列。从下面的例子可以看出来,索引列没有列名,称不上是一个“字段”,它是在传入数据的字段基础上新增的列"
]
},
{
Expand Down Expand Up @@ -404,7 +404,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"但是可以支持列标签,或者 `:` 这样的通配符"
"但是可以支持列标签来选择某些列;或者行标签上的 `:` 通配符选择所有的行"
]
},
{
Expand Down Expand Up @@ -695,7 +695,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"这个例子设置 `col1` 列为索引列,2 个 Partition 中的数据被打乱重排。如果是在数据量很大的场景,全局数据排序和重分布的成本极高。因此应该尽量避免这个操作。`set_index()` 也有它的优势,它可以加速下游的计算。"
"这个例子设置 `col1` 列为索引列,2 个 Partition 中的数据被打乱重排。如果是在数据量很大的场景,全局数据排序和重分布的成本极高。因此应该尽量避免这个操作。`set_index()` 也有它的优势,它可以加速下游的计算。数据重分布又被称为 Shuffle,我们会在 {numref}`sec-dask-dataframe-shuffle` 中介绍 Shuffle 的计算过程和成本。"
]
},
{
Expand Down Expand Up @@ -827,7 +827,7 @@
"source": [
"### `reset_index()`\n",
"\n",
"在 pandas 中,默认 `as_index=True`,分组字段经过 `groupby()` 之后成为索引列。索引列在 DataFrame 中并不是正式的数据列,如果分组聚合之后只有一个字段(不考虑分组字段),分组聚合的结果就成了一个 `Series`。比如下面 pandas 的例子,`Origin` 列就是分组字段,如果不设置 `as_index=False`,`groupby(\"Origin\", as_index=False)[\"DepDelay\"].mean()` 生成的是一个 `Series`。"
"在 pandas 中,`groupby` 默认 `as_index=True`,分组字段经过 `groupby()` 之后成为索引列。索引列在 DataFrame 中并不是“正式”的字段,如果分组聚合之后只有一个“正式”字段(不考虑索引列),分组聚合的结果就成了一个 `Series`。比如下面 pandas 的例子,`Origin` 列就是分组字段,如果不设置 `as_index=False`,`groupby(\"Origin\", as_index=False)[\"DepDelay\"].mean()` 生成的是一个 `Series`。"
]
},
{
Expand Down
138 changes: 130 additions & 8 deletions ch-dask-dataframe/shuffle.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,126 @@
"(sec-dask-dataframe-shuffle)=\n",
"# Shuffle\n",
"\n",
"在分布式场景下,`sort`,`merge`,`groupby` 有可能会在不同 Worker 之间交换数据,即 Shuffle。这些 pandas 算子在单机上实现起来比较简单,但是在大数据分布式计算场景,实现起来并不简单。\n",
"Dask 在 `2023.1` 版本之后提供了一种新的 Shuffle 方法,可以加速大部分计算任务。\n",
"对于一个分布式系统,在不同 Worker 之间交换数据被称为 Shuffle,或者说 Shuffle 将数据从某个 Partition 移动到其他 Partition。有些 Shuffle 是显式的,比如 `repartition`,从 API 名称可以看出它会在不同的 Partition 之间交换数据;有些 Shuffle 是隐式的,比如 `sort`,`merge` 或 `groupby` 背后都有 Shuffle 过程。Shuffle 一直是分布式大数据计算领域的难题之一,像 `sort`、`merge` 或 `groupby` 这些 pandas 算子在单机上实现起来相对简单,但是在大数据分布式计算场景,实现起来并不那么容易。\n",
"\n",
"## `groupby`\n",
"## Shuffle 实现机制\n",
"\n",
"{numref}`fig-dataframe-groupby` 展示了 `groupby` 在单机上的操作流程,它主要有三个阶段:分组、聚合、输出。分布式场景下,不同的数据分布在不同的 Partition 下。\n",
"{numref}`sec-dask-task-graph` 介绍了 Dask 主要基于 Task Graph 构建, Dask 的 Task Graph 是一个有向无环图。有向边表示下游 Partition 的输入依赖上游 Partition 的输出,任何数据移动都会在 Task Graph 上生成一条有向边。很多计算任务的 Shuffle 有大量的数据移动,有的场景下,所有数据都会打散,也意味着上游会有多条指向下游的边。这种基于 Task Graph 的 Shuffle 会使得 Task Graph 非常庞大,Task Graph 过大会使得 Dask Scheduler 的负载过重,进一步导致计算极其缓慢。如 {numref}`fig-dask-shuffle` 左侧所示,`tasks` 是基于 Task Graph 的机制,上游和下游之间建立了有向边,如果有中间层(通常因为上游流入的数据太大,需要将数据进一步切分成多个 Partition),那么中间层还会进一步增加 Task Graph 的复杂程度。\n",
"\n",
"为解决 Task Graph 过大的问题,Dask 设计了一种点对点(Peer-to-peer)的 Shuffle 机制。如 {numref}`fig-dask-shuffle` 右侧所示,`p2p` 在 Task Graph 中引入了一个虚拟的障碍(Barrier)节点。Barrier 并不是一个真正的 Task,引入 Barrier 节点可以使 Task Graph 复杂度显著下降。\n",
"\n",
"```{figure} ../img/ch-dask-dataframe/shuffle-tasks-p2p.png\n",
"---\n",
"width: 800px\n",
"name: fig-shuffle-tasks-p2p\n",
"---\n",
"Dask 仪表盘\n",
"```\n",
"\n",
"目前,Dask 提供了两类 Shuffle 实现策略:单机和分布式。\n",
"\n",
"* 单机。如果数据大小超出了内存空间,可以将中间数据写到磁盘上。单机场景默认使用这种策略。\n",
"* 分布式。如 {numref}`fig-dask-shuffle` 所示,分布式场景提供了两种 Shuffle 策略,`tasks` 和 `p2p`。`tasks` 是基于 Task Graph 的 Shuffle 实现,很多场景效率比较低,会遇到刚提到的 Task Graph 过大的问题。`p2p` 基于点对点的 Shuffle 实现,Task Graph 的复杂性显著降低,性能也显著提升。Dask 会优先选择 `p2p`。\n",
"\n",
"`dask.config.set({\"dataframe.shuffle.method\": \"p2p\"})` 对当前 Python 脚本的所有计算都使用 `p2p` 方式进行 Shuffle。也可以针对某个算子设置 Shuffle 策略,比如 `ddf.merge(shuffle_method=\"p2p\")`。\n",
"\n",
"为对比两种分布式场景的 Shuffle 机制性能,这里搭建了一个两节点的 Dask 集群,并用 `shuffle` 进行了测试,读者也可以使用单机的 `LocalCluster`,调大数据量,观察不同的 Shuffle 机制的性能表现。"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"from dask.distributed import Client, LocalCluster\n",
"\n",
"dask.config.set({'dataframe.query-planning': False})\n",
"\n",
"# 将 `10.0.0.3:8786` 更换为你的 Scheduler 地址\n",
"# 如果没有 Dask 集群,可以使用 LocalCluster\n",
"# client = Client(LocalCluster())\n",
"client = Client(\"10.0.0.3:8786\")\n",
"\n",
"ddf = dask.datasets.timeseries(\n",
" start=\"2024-01-01\",\n",
" end=\"2024-07-01\",\n",
" dtypes={\"x\": float, \"y\": float},\n",
" freq=\"1 h\",\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 138 ms, sys: 19 ms, total: 157 ms\n",
"Wall time: 5.58 s\n"
]
}
],
"source": [
"%%time\n",
"with dask.config.set({\"dataframe.shuffle.method\": \"p2p\"}):\n",
" shuffled = ddf.shuffle(on=\"x\")\n",
" shuffled.compute()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 136 ms, sys: 8.48 ms, total: 144 ms\n",
"Wall time: 15.8 s\n"
]
}
],
"source": [
"%%time\n",
"with dask.config.set({\"dataframe.shuffle.method\": \"tasks\"}):\n",
" shuffled = ddf.shuffle(on=\"x\")\n",
" shuffled.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 数据重分布\n",
"\n",
"Dask 提供了三种数据重分布方法:`set_index`,`repartition` 和 `shuffle`,这三种都可能在全局层面对数据进行重分布。\n",
"\n",
"```{table} Dask 三种数据重分布方法\n",
":name: tab-dask-repartition\n",
"\n",
"| 方法名 | 用途 | 是否修改索引 | 是否可以修改 Partition 数量 |\n",
"|---\t|---\t|---\t|---\t|\n",
"| [`DataFrame.set_index`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.set_index.html) | 修改索引列,加速后续基于索引列的计算\t| 是 | 是\t|\n",
"| [`DataFrame.repartition`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.repartition.html) | 修改 Partition 数量,多用于数据倾斜场景 | 否\t| 是 |\n",
"| [`DataFrame.shuffle`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.shuffle.html) | 将相同的值归结到同一个 Partition | 否 | 是 |\n",
"```\n",
"\n",
"在 {numref}`sec-dask-dataframe-indexing` 我们提过,`set_index` 将某字段设置为索引列,后续一系列计算非常依赖这个字段,`set_index` 能显著加速后续计算。`repartition` 主要解决数据倾斜的问题,即某些 Partiton 上的数据过大,过大的 Partition 有可能导致内存不足。"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 案例分析:`groupby`\n",
"\n",
"我们以 `groupby(by=key).sum()` 为例,了解其背后的 Shuffle 过程。{numref}`fig-dataframe-groupby` 展示了计算过程,它主要有三个阶段:分组、组内聚合(比如 `sum`)、组间整合。\n",
"\n",
"```{figure} ../img/ch-dask-dataframe/groupby.svg\n",
"---\n",
Expand All @@ -22,9 +136,17 @@
"DataFrame groupby 示意图\n",
"```\n",
"\n",
"* `groupby(indexed_columns).agg()` 和 `groupby(indexed_columns).apply(user_def_fn)` 性能最好。`indexed_columns` 指的是索引列 Key,`agg` 指的是 Dask DataFrame 提供的官方的 `sum`,`mean`,`nunique` 等聚合方法。因为 `indexed_columns` 是排过序的了,可以很快地对 `indexed_columns` 进行分组,Shuffle 数据量不大。\n",
"* `groupby(non_indexed_columns).agg()` 的数据交换量要更大一些,`agg` 是 Dask 官方提供的方法,做过一些优化。\n",
"* `groupby(non_indexed_columns).apply(user_def_fn)` 的成本最高。它既要对所有数据进行交换,又要执行用户自定义的函数,\n"
"分布式场景下,不同的数据分布在不同的 Partition 下,涉及到 Shuffle 的阶段有:\n",
"\n",
"* 分组:按照 `by` 指定的分组字段进行分组,相同的分组字段被分到一起,这里涉及到大量 Shuffle 操作。\n",
"* 组内聚合:组内聚合的 Shuffle 操作相对比较少。\n",
"* 组间聚合:组间聚合的 Shuffle 操作相对比较小。\n",
"\n",
"根据 Shuffle 操作的数量,不难得出结论:\n",
"\n",
"* `groupby(by=indexed_columns).agg()` 和 `groupby(by=indexed_columns).apply(user_def_fn)` 性能最好。`indexed_columns` 指的是分组字段 `by` 在索引列({numref}`sec-dask-dataframe-indexing` 中 `set_index` 的列);`agg` 指的是 Dask DataFrame 提供的官方的 `sum`,`mean` 等聚合方法。因为 `indexed_columns` 是排过序的了,可以很快地对 `indexed_columns` 进行分组和数据分发。\n",
"* `groupby(by=non_indexed_columns).agg()` 的数据交换量要更大一些,Dask 官方提供的 `agg` 方法做过一些优化。\n",
"* `groupby(by=non_indexed_columns).apply(user_def_fn)` 的成本最高。它既要对所有数据进行交换,又要执行用户自定义的函数,用户自定义函数的效率比 Dask 官方的低。"
]
},
{
Expand All @@ -51,7 +173,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.7"
"version": "3.11.8"
}
},
"nbformat": 4,
Expand Down
6 changes: 5 additions & 1 deletion ch-dask/task-graph-partitioning.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,13 @@
"\n",
"数据块过大,则 Dask Worker 很容易内存耗尽(Out of Memory,OOM),因为所切分的数据块无法被单个 Dask Worker 所处理。Dask 遇到 OOM 时,会将部分数据卸载到(Spill)硬盘,如果 Spill 之后仍无法完成计算,Dask Worker 进程可能被重启,甚至反复重启。\n",
"\n",
"### 迭代式算法\n",
"\n",
"迭代式算法通常会使用循环,循环的当前迭代依赖之前迭代的数据。Dask 的 Task Graph 对于这类迭代式算法处理得并不好,每个数据依赖都会在 Task Graph 中增加有向边,进而会使得 Task Graph 非常庞大,导致执行效率很低。比如,很多机器学习算法、SQL JOIN 都是基于循环的迭代式算法,用户需要对这些操作有心理准备。\n",
"\n",
"## 设置正确的数据块大小\n",
"\n",
"总之,在做数据块切分时,不应过大,也不应过小。Dask 没有一个简单通用的设置,需要开发者根据自身数据的情况和 Dask 的仪表盘或日志来不断调整。\n",
"总之,在做数据块切分时,不应过大,也不应过小。Dask 没有一个简单通用的设置原则,需要开发者根据自身数据的情况和 Dask 的仪表盘或日志来不断调整。Dask Array 中使用 `rechunk(chunks=...)` 设置数据块大小,`chunks` 参数可以是 `int` 表示一个切分成多少个数据块,也可以是 `(5, 10, 20)` 这样的 `tuple`,表示单个数据块的维度大小。Dask DataFrame 中使用 `repartition(npartitions=...)` 设置数据块大小。 \n",
"\n",
"### 仪表盘\n",
"\n",
Expand Down
2 changes: 1 addition & 1 deletion ch-ray-train-tune/index.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Ray Train 和 Ray Tune
# Ray Train 和 Tune

```{tableofcontents}
```
2 changes: 1 addition & 1 deletion contribute/info.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
参考 [Github Desktop 教程](https://www.classicpress.net/github-desktop-a-really-really-simple-tutorial/)[Git 教程](https://git-scm.com/book/zh/v2/GitHub-对项目做出贡献) 创建 Fork,并将代码仓库克隆到本地。

```bash
git clone https://github.com/godaai/python-data-science.git
git clone https://github.com/<username>/scale-py-zh.git
```

## 环境配置
Expand Down
Binary file added img/ch-dask-dataframe/shuffle-tasks-p2p.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion index.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Dask、Ray、Xorbits、mpi4py

% The SVG rendering breaks latex builds for the GitHub badge, so only include in HTML
```{only} html
[![](https://img.shields.io/github/stars/godaai/scale-py-zh?style=for-the-badge)](https://github.com/godaai/distributed-python)
[![](https://img.shields.io/github/stars/godaai/scale-py-zh?style=for-the-badge)](https://github.com/godaai/scale-py-zh)
```

:::
Expand Down

0 comments on commit 6652b8a

Please sign in to comment.