diff --git a/ch-dask-dataframe/indexing.ipynb b/ch-dask-dataframe/indexing.ipynb index 8206544..42f8f48 100644 --- a/ch-dask-dataframe/indexing.ipynb +++ b/ch-dask-dataframe/indexing.ipynb @@ -49,7 +49,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "如 {numref}`fig-pandas-dataframe-model` 所示,pandas DataFrame 主要对二维的表进行处理,有列标签和行标签。行标签通常会被用户忽视,但实际上起着至关重要的作用,比如索引(Indexing)。大多数 pandas DataFrame 的行标签是排好序的索引,比如从 0 开始递增。 DataFrame 里面的数据也是有序的。\n", + "如 {numref}`fig-pandas-dataframe-model` 所示,pandas DataFrame 主要对二维的表进行处理,有列标签和行标签。行标签通常会被用户忽视,但实际上起着至关重要的作用:索引(Indexing)。大多数 pandas DataFrame 的行标签是排好序的索引,比如从 0 开始递增。 这种排好序的索引使得 pandas DataFrame 里面的数据是有序的。\n", "\n", "```{figure} ../img/ch-dask-dataframe/dataframe-model.svg\n", "---\n", @@ -59,7 +59,7 @@ "pandas DataFrame 数据模型\n", "```\n", "\n", - "创建 pandas DataFrame 时,会在最左侧自动生成了索引列,它不是 DataFrame 的“官方”字段,因为索引列并没有列名。" + "创建 pandas DataFrame 时,会在最左侧自动生成了索引列。从下面的例子可以看出来,索引列没有列名,称不上是一个“字段”,它是在传入数据的字段基础上新增的列。" ] }, { @@ -404,7 +404,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "但是可以支持列标签,或者 `:` 这样的通配符:" + "但是可以支持列标签来选择某些列;或者行标签上的 `:` 通配符选择所有的行:" ] }, { @@ -695,7 +695,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "这个例子设置 `col1` 列为索引列,2 个 Partition 中的数据被打乱重排。如果是在数据量很大的场景,全局数据排序和重分布的成本极高。因此应该尽量避免这个操作。`set_index()` 也有它的优势,它可以加速下游的计算。" + "这个例子设置 `col1` 列为索引列,2 个 Partition 中的数据被打乱重排。如果是在数据量很大的场景,全局数据排序和重分布的成本极高。因此应该尽量避免这个操作。`set_index()` 也有它的优势,它可以加速下游的计算。数据重分布又被称为 Shuffle,我们会在 {numref}`sec-dask-dataframe-shuffle` 中介绍 Shuffle 的计算过程和成本。" ] }, { @@ -827,7 +827,7 @@ "source": [ "### `reset_index()`\n", "\n", - "在 pandas 中,默认 `as_index=True` 时,分组字段经过 `groupby()` 之后成为索引列。索引列在 DataFrame 中并不是正式的数据列,如果分组聚合之后只有一个字段(不考虑分组字段),分组聚合的结果就成了一个 `Series`。比如下面 pandas 的例子,`Origin` 列就是分组字段,如果不设置 `as_index=False`,`groupby(\"Origin\", as_index=False)[\"DepDelay\"].mean()` 生成的是一个 `Series`。" + "在 pandas 中,`groupby` 默认 `as_index=True`,分组字段经过 `groupby()` 之后成为索引列。索引列在 DataFrame 中并不是“正式”的字段,如果分组聚合之后只有一个“正式”字段(不考虑索引列),分组聚合的结果就成了一个 `Series`。比如下面 pandas 的例子,`Origin` 列就是分组字段,如果不设置 `as_index=False`,`groupby(\"Origin\", as_index=False)[\"DepDelay\"].mean()` 生成的是一个 `Series`。" ] }, { diff --git a/ch-dask-dataframe/shuffle.ipynb b/ch-dask-dataframe/shuffle.ipynb index b7ef8fb..964a9ed 100644 --- a/ch-dask-dataframe/shuffle.ipynb +++ b/ch-dask-dataframe/shuffle.ipynb @@ -7,12 +7,126 @@ "(sec-dask-dataframe-shuffle)=\n", "# Shuffle\n", "\n", - "在分布式场景下,`sort`,`merge`,`groupby` 有可能会在不同 Worker 之间交换数据,即 Shuffle。这些 pandas 算子在单机上实现起来比较简单,但是在大数据分布式计算场景,实现起来并不简单。\n", - "Dask 在 `2023.1` 版本之后提供了一种新的 Shuffle 方法,可以加速大部分计算任务。\n", + "对于一个分布式系统,在不同 Worker 之间交换数据被称为 Shuffle,或者说 Shuffle 将数据从某个 Partition 移动到其他 Partition。有些 Shuffle 是显式的,比如 `repartition`,从 API 名称可以看出它会在不同的 Partition 之间交换数据;有些 Shuffle 是隐式的,比如 `sort`,`merge` 或 `groupby` 背后都有 Shuffle 过程。Shuffle 一直是分布式大数据计算领域的难题之一,像 `sort`、`merge` 或 `groupby` 这些 pandas 算子在单机上实现起来相对简单,但是在大数据分布式计算场景,实现起来并不那么容易。\n", "\n", - "## `groupby`\n", + "## Shuffle 实现机制\n", "\n", - "{numref}`fig-dataframe-groupby` 展示了 `groupby` 在单机上的操作流程,它主要有三个阶段:分组、聚合、输出。分布式场景下,不同的数据分布在不同的 Partition 下。\n", + "{numref}`sec-dask-task-graph` 介绍了 Dask 主要基于 Task Graph 构建, Dask 的 Task Graph 是一个有向无环图。有向边表示下游 Partition 的输入依赖上游 Partition 的输出,任何数据移动都会在 Task Graph 上生成一条有向边。很多计算任务的 Shuffle 有大量的数据移动,有的场景下,所有数据都会打散,也意味着上游会有多条指向下游的边。这种基于 Task Graph 的 Shuffle 会使得 Task Graph 非常庞大,Task Graph 过大会使得 Dask Scheduler 的负载过重,进一步导致计算极其缓慢。如 {numref}`fig-dask-shuffle` 左侧所示,`tasks` 是基于 Task Graph 的机制,上游和下游之间建立了有向边,如果有中间层(通常因为上游流入的数据太大,需要将数据进一步切分成多个 Partition),那么中间层还会进一步增加 Task Graph 的复杂程度。\n", + "\n", + "为解决 Task Graph 过大的问题,Dask 设计了一种点对点(Peer-to-peer)的 Shuffle 机制。如 {numref}`fig-dask-shuffle` 右侧所示,`p2p` 在 Task Graph 中引入了一个虚拟的障碍(Barrier)节点。Barrier 并不是一个真正的 Task,引入 Barrier 节点可以使 Task Graph 复杂度显著下降。\n", + "\n", + "```{figure} ../img/ch-dask-dataframe/shuffle-tasks-p2p.png\n", + "---\n", + "width: 800px\n", + "name: fig-shuffle-tasks-p2p\n", + "---\n", + "Dask 仪表盘\n", + "```\n", + "\n", + "目前,Dask 提供了两类 Shuffle 实现策略:单机和分布式。\n", + "\n", + "* 单机。如果数据大小超出了内存空间,可以将中间数据写到磁盘上。单机场景默认使用这种策略。\n", + "* 分布式。如 {numref}`fig-dask-shuffle` 所示,分布式场景提供了两种 Shuffle 策略,`tasks` 和 `p2p`。`tasks` 是基于 Task Graph 的 Shuffle 实现,很多场景效率比较低,会遇到刚提到的 Task Graph 过大的问题。`p2p` 基于点对点的 Shuffle 实现,Task Graph 的复杂性显著降低,性能也显著提升。Dask 会优先选择 `p2p`。\n", + "\n", + "`dask.config.set({\"dataframe.shuffle.method\": \"p2p\"})` 对当前 Python 脚本的所有计算都使用 `p2p` 方式进行 Shuffle。也可以针对某个算子设置 Shuffle 策略,比如 `ddf.merge(shuffle_method=\"p2p\")`。\n", + "\n", + "为对比两种分布式场景的 Shuffle 机制性能,这里搭建了一个两节点的 Dask 集群,并用 `shuffle` 进行了测试,读者也可以使用单机的 `LocalCluster`,调大数据量,观察不同的 Shuffle 机制的性能表现。" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import dask\n", + "from dask.distributed import Client, LocalCluster\n", + "\n", + "dask.config.set({'dataframe.query-planning': False})\n", + "\n", + "# 将 `10.0.0.3:8786` 更换为你的 Scheduler 地址\n", + "# 如果没有 Dask 集群,可以使用 LocalCluster\n", + "# client = Client(LocalCluster())\n", + "client = Client(\"10.0.0.3:8786\")\n", + "\n", + "ddf = dask.datasets.timeseries(\n", + " start=\"2024-01-01\",\n", + " end=\"2024-07-01\",\n", + " dtypes={\"x\": float, \"y\": float},\n", + " freq=\"1 h\",\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 138 ms, sys: 19 ms, total: 157 ms\n", + "Wall time: 5.58 s\n" + ] + } + ], + "source": [ + "%%time\n", + "with dask.config.set({\"dataframe.shuffle.method\": \"p2p\"}):\n", + " shuffled = ddf.shuffle(on=\"x\")\n", + " shuffled.compute()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 136 ms, sys: 8.48 ms, total: 144 ms\n", + "Wall time: 15.8 s\n" + ] + } + ], + "source": [ + "%%time\n", + "with dask.config.set({\"dataframe.shuffle.method\": \"tasks\"}):\n", + " shuffled = ddf.shuffle(on=\"x\")\n", + " shuffled.compute()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 数据重分布\n", + "\n", + "Dask 提供了三种数据重分布方法:`set_index`,`repartition` 和 `shuffle`,这三种都可能在全局层面对数据进行重分布。\n", + "\n", + "```{table} Dask 三种数据重分布方法\n", + ":name: tab-dask-repartition\n", + "\n", + "| 方法名 | 用途 | 是否修改索引 | 是否可以修改 Partition 数量 |\n", + "|---\t|---\t|---\t|---\t|\n", + "| [`DataFrame.set_index`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.set_index.html) | 修改索引列,加速后续基于索引列的计算\t| 是 | 是\t|\n", + "| [`DataFrame.repartition`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.repartition.html) | 修改 Partition 数量,多用于数据倾斜场景 | 否\t| 是 |\n", + "| [`DataFrame.shuffle`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.shuffle.html) | 将相同的值归结到同一个 Partition | 否 | 是 |\n", + "```\n", + "\n", + "在 {numref}`sec-dask-dataframe-indexing` 我们提过,`set_index` 将某字段设置为索引列,后续一系列计算非常依赖这个字段,`set_index` 能显著加速后续计算。`repartition` 主要解决数据倾斜的问题,即某些 Partiton 上的数据过大,过大的 Partition 有可能导致内存不足。" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 案例分析:`groupby`\n", + "\n", + "我们以 `groupby(by=key).sum()` 为例,了解其背后的 Shuffle 过程。{numref}`fig-dataframe-groupby` 展示了计算过程,它主要有三个阶段:分组、组内聚合(比如 `sum`)、组间整合。\n", "\n", "```{figure} ../img/ch-dask-dataframe/groupby.svg\n", "---\n", @@ -22,9 +136,17 @@ "DataFrame groupby 示意图\n", "```\n", "\n", - "* `groupby(indexed_columns).agg()` 和 `groupby(indexed_columns).apply(user_def_fn)` 性能最好。`indexed_columns` 指的是索引列 Key,`agg` 指的是 Dask DataFrame 提供的官方的 `sum`,`mean`,`nunique` 等聚合方法。因为 `indexed_columns` 是排过序的了,可以很快地对 `indexed_columns` 进行分组,Shuffle 数据量不大。\n", - "* `groupby(non_indexed_columns).agg()` 的数据交换量要更大一些,`agg` 是 Dask 官方提供的方法,做过一些优化。\n", - "* `groupby(non_indexed_columns).apply(user_def_fn)` 的成本最高。它既要对所有数据进行交换,又要执行用户自定义的函数,\n" + "分布式场景下,不同的数据分布在不同的 Partition 下,涉及到 Shuffle 的阶段有:\n", + "\n", + "* 分组:按照 `by` 指定的分组字段进行分组,相同的分组字段被分到一起,这里涉及到大量 Shuffle 操作。\n", + "* 组内聚合:组内聚合的 Shuffle 操作相对比较少。\n", + "* 组间聚合:组间聚合的 Shuffle 操作相对比较小。\n", + "\n", + "根据 Shuffle 操作的数量,不难得出结论:\n", + "\n", + "* `groupby(by=indexed_columns).agg()` 和 `groupby(by=indexed_columns).apply(user_def_fn)` 性能最好。`indexed_columns` 指的是分组字段 `by` 在索引列({numref}`sec-dask-dataframe-indexing` 中 `set_index` 的列);`agg` 指的是 Dask DataFrame 提供的官方的 `sum`,`mean` 等聚合方法。因为 `indexed_columns` 是排过序的了,可以很快地对 `indexed_columns` 进行分组和数据分发。\n", + "* `groupby(by=non_indexed_columns).agg()` 的数据交换量要更大一些,Dask 官方提供的 `agg` 方法做过一些优化。\n", + "* `groupby(by=non_indexed_columns).apply(user_def_fn)` 的成本最高。它既要对所有数据进行交换,又要执行用户自定义的函数,用户自定义函数的效率比 Dask 官方的低。" ] }, { @@ -51,7 +173,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.7" + "version": "3.11.8" } }, "nbformat": 4, diff --git a/ch-dask/task-graph-partitioning.ipynb b/ch-dask/task-graph-partitioning.ipynb index 637e3d3..e37b3bd 100644 --- a/ch-dask/task-graph-partitioning.ipynb +++ b/ch-dask/task-graph-partitioning.ipynb @@ -514,9 +514,13 @@ "\n", "数据块过大,则 Dask Worker 很容易内存耗尽(Out of Memory,OOM),因为所切分的数据块无法被单个 Dask Worker 所处理。Dask 遇到 OOM 时,会将部分数据卸载到(Spill)硬盘,如果 Spill 之后仍无法完成计算,Dask Worker 进程可能被重启,甚至反复重启。\n", "\n", + "### 迭代式算法\n", + "\n", + "迭代式算法通常会使用循环,循环的当前迭代依赖之前迭代的数据。Dask 的 Task Graph 对于这类迭代式算法处理得并不好,每个数据依赖都会在 Task Graph 中增加有向边,进而会使得 Task Graph 非常庞大,导致执行效率很低。比如,很多机器学习算法、SQL JOIN 都是基于循环的迭代式算法,用户需要对这些操作有心理准备。\n", + "\n", "## 设置正确的数据块大小\n", "\n", - "总之,在做数据块切分时,不应过大,也不应过小。Dask 没有一个简单通用的设置,需要开发者根据自身数据的情况和 Dask 的仪表盘或日志来不断调整。\n", + "总之,在做数据块切分时,不应过大,也不应过小。Dask 没有一个简单通用的设置原则,需要开发者根据自身数据的情况和 Dask 的仪表盘或日志来不断调整。Dask Array 中使用 `rechunk(chunks=...)` 设置数据块大小,`chunks` 参数可以是 `int` 表示一个切分成多少个数据块,也可以是 `(5, 10, 20)` 这样的 `tuple`,表示单个数据块的维度大小。Dask DataFrame 中使用 `repartition(npartitions=...)` 设置数据块大小。 \n", "\n", "### 仪表盘\n", "\n", diff --git a/ch-ray-train-tune/index.md b/ch-ray-train-tune/index.md index 29671a2..c266b11 100644 --- a/ch-ray-train-tune/index.md +++ b/ch-ray-train-tune/index.md @@ -1,4 +1,4 @@ -# Ray Train 和 Ray Tune +# Ray Train 和 Tune ```{tableofcontents} ``` \ No newline at end of file diff --git a/contribute/info.md b/contribute/info.md index 42c7c43..ee8a49a 100644 --- a/contribute/info.md +++ b/contribute/info.md @@ -11,7 +11,7 @@ 参考 [Github Desktop 教程](https://www.classicpress.net/github-desktop-a-really-really-simple-tutorial/) 或 [Git 教程](https://git-scm.com/book/zh/v2/GitHub-对项目做出贡献) 创建 Fork,并将代码仓库克隆到本地。 ```bash -git clone https://github.com/godaai/python-data-science.git +git clone https://github.com/<username>/scale-py-zh.git ``` ## 环境配置 diff --git a/img/ch-dask-dataframe/shuffle-tasks-p2p.png b/img/ch-dask-dataframe/shuffle-tasks-p2p.png new file mode 100644 index 0000000..e7554f4 Binary files /dev/null and b/img/ch-dask-dataframe/shuffle-tasks-p2p.png differ diff --git a/index.md b/index.md index 89ae596..554f61c 100644 --- a/index.md +++ b/index.md @@ -19,7 +19,7 @@ Dask、Ray、Xorbits、mpi4py % The SVG rendering breaks latex builds for the GitHub badge, so only include in HTML ```{only} html -[](https://github.com/godaai/distributed-python) +[](https://github.com/godaai/scale-py-zh) ``` :::