Commit: mpi
luweizheng committed Feb 12, 2024
1 parent 59a0ec5 commit 8bb9cc8
Showing 26 changed files with 630 additions and 390 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -7,7 +7,6 @@ data/
*.DS_Store
*.csv
*egg-info*
-dist*
_build/*
test*.md
.idea
15 changes: 7 additions & 8 deletions _toc.yml
@@ -22,7 +22,6 @@ subtrees:
- file: ch-dask/task-graph-partitioning
- file: ch-dask-dataframe/index
entries:
-- file: ch-dask-dataframe/dask-pandas
- file: ch-dask-dataframe/read-write
# - file: ch-ray-core/index
# entries:
@@ -37,13 +36,13 @@
# - file: ch-ray-data/data-load-inspect-save
# - file: ch-ray-data/data-transform
# - file: ch-ray-data/preprocessor
-# - file: ch-mpi/index
-#   entries:
-#   - file: ch-mpi/mpi-intro
-#   - file: ch-mpi/mpi-hello-world
-#   - file: ch-mpi/point-to-point
-#   - file: ch-mpi/collective
-#   - file: ch-mpi/remote-memory-access
+- file: ch-mpi/index
+  entries:
+  - file: ch-mpi/mpi-intro
+  - file: ch-mpi/mpi-hello-world
+  - file: ch-mpi/point-to-point
+  - file: ch-mpi/collective
+  - file: ch-mpi/remote-memory-access
# - file: ch-mpi-large-model/index
# entries:
# - file: ch-mpi-large-model/nccl
21 changes: 11 additions & 10 deletions ch-dask-dataframe/read-write.ipynb
@@ -1257,7 +1257,16 @@
"* Embedded schema\n",
"* Data compression\n",
"\n",
"Columnar storage organizes data by columns instead of rows. Specifically, unlike CSV, which stores data by rows, Parquet stores data via columns. Data analysts are usually interested in specific columns rather than all of them. Parquet allows convenient filtering of unnecessary columns during data reading, resulting in improved performance by reducing the amount of data to be read. Parquet is extensively used in the Apache Spark, Apache Hive, and Apache Flink ecosystems. Parquet inherently includes schema information, embedding metadata such as column names and data types for each column within each Parquet file. This feature eliminates inaccuracies in data types inferring of Dask DataFrame. Data in Parquet is compressed, making it more space-efficient compared to CSV.\n",
"As shown in {numref}`parquet-with-row-group, columnar storage organizes data by columns instead of rows. Data analysts are usually interested in specific columns rather than all of them. Parquet allows filtering of unnecessary columns during data reading, resulting in enhanced performance by reducing the amount of data to be read. Parquet is extensively used in the Apache Spark, Apache Hive, and Apache Flink ecosystems. Parquet inherently includes schema information, embedding metadata such as column names and data types for each column within each Parquet file. This feature eliminates inaccuracies in data types inferring of Dask DataFrame. Data in Parquet is compressed, making it more space-efficient compared to CSV.\n",
"\n",
"\n",
"```{figure} ../img/ch-dask-dataframe/parquet.svg\n",
"---\n",
"width: 600px\n",
"name: parquet-with-row-group\n",
"---\n",
"A Parquet file consists of at least one Row Group. The data within a Row Group is further divided into Column Chunks, each dedicated to storing a specific column.\n",
"```\n",
"\n",
"For instance, it is advisable to read only the necessary columns rather than all of them:\n",
"\n",
@@ -1268,17 +1277,9 @@
")\n",
"```\n",
"\n",
"Additionally, Parquet introduces the concept of Row Groups, as illustrated in {numref}`parquet-row-group`. Data in a Parquet file is grouped into Row Groups, defining the number of rows within each group. In the example, there are three Row Groups, each containing two rows of data. Each Row Group stores metadata such as maximum and minimum values of their columns. When querying certain columns, metadata helps determine whether to read a specific Row Group, minimizing unnecessary data retrieval.\n",
"Additionally, Parquet introduces the concept of Row Groups, as illustrated in {numref}`parquet-with-row-group`. Data in a Parquet file is grouped into Row Groups, defining the number of rows within each group. In the example, there are three Row Groups, each containing two rows of data. Each Row Group stores metadata such as maximum and minimum values of their columns. When querying certain columns, metadata helps determine whether to read a specific Row Group, minimizing unnecessary data retrieval.\n",
"For example, if a column represents a time series, and a query involves \"sales from 9:00 to 12:00 every day,\" the metadata within the Row Group records the maximum and minimum values of the time column. By utilizing this metadata, it becomes possible to determine whether it is necessary to read the specific Row Group.\n",
"\n",
"```{figure} ../img/ch-dask-dataframe/parquet-row-group.svg\n",
"---\n",
"width: 600px\n",
"name: parquet-row-group\n",
"---\n",
"Parquet Columnar Storage with Row Groups\n",
"```\n",
"\n",
"With Row Groups, a Parquet file may contain multiple groups. However, many Parquet files have only one Row Group.\n",
"\n",
"Typically, enterprise data files are split into multiple Parquet files and organized in a specific manner, such as by time:\n",
10 changes: 5 additions & 5 deletions ch-mpi/broadcast.py
@@ -7,12 +7,12 @@

N = 5
if comm.rank == 0:
-    A = np.arange(N, dtype=np.float64) # rank 0 初始化数据到变量 A
+    A = np.arange(N, dtype=np.float64) # rank 0 initializes data into variable A
else:
-    A = np.empty(N, dtype=np.float64) # 其他节点的变量 A 为空
+    A = np.empty(N, dtype=np.float64) # A on the other processes is an empty buffer

-# 广播
+# Broadcast
comm.Bcast([A, MPI.DOUBLE])

-# 验证所有节点上的 A
-print("Rank: %d, data: %s" % (comm.rank, A))
+# Print to verify
+print("Rank:%2d, data:%s" % (comm.rank, A))
69 changes: 36 additions & 33 deletions ch-mpi/collective.ipynb
@@ -5,27 +5,27 @@
"metadata": {},
"source": [
"(mpi-collective)=\n",
"# 集合通信\n",
"# Collective Communication\n",
"\n",
"{ref}`mpi-point2point` 介绍了点对点通信,即发送方和接收方之间相互传输数据。本节主要介绍一种全局的通信方式:集合通信,即在一个组里的多个进程间同时传输数据。集合通信目前只有阻塞的方式。\n",
"In the {ref}`mpi-point2point` section, we discussed point-to-point communication, which involves the mutual transfer of data between sender and receiver. This section focuses on a different type of communication – collective communication, where data is simultaneously transmitted among multiple processes within a group. Collective communication only supports blocking modes.\n",
"\n",
"常用的集合通信主要有以下几类:\n",
"The commonly used collective communications include:\n",
"\n",
"* 同步,比如 [`Comm.Barrier`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Barrier)\n",
"* 数据移动,比如 [`Comm.Bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Bcast) [`Comm.Scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Scatter)[`Comm.Gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Gather)等。\n",
"* 集合计算,比如 [`Comm.Reduce`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Reduce) [`Intracomm.Scan`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Intracomm.html#mpi4py.MPI.Intracomm.Scan) 等。\n",
"* Synchronization: For example, [`Comm.Barrier`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Barrier).\n",
"* Data Movement: Such as [`Comm.Bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Bcast), [`Comm.Scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Scatter), [`Comm.Gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Gather), etc.\n",
"* Collective Computation: Including [`Comm.Reduce`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Reduce), [`Intracomm.Scan`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Intracomm.html#mpi4py.MPI.Intracomm.Scan), etc.\n",
"\n",
"首字母大写的函数是基于缓存的,比如 `Comm.Bcast` `Comm.Scatter``Comm.Gather`[`Comm.Allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Allgather), [`Comm.Alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Alltoall)。首字母小写的函数可以收发 Python 对象,比如 [`Comm.bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.bcast)[`Comm.scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.scatter)[`Comm.gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.gather)[`Comm.allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.allgather), [`Comm.alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.alltoall)\n",
"Functions with uppercase initial letters are based on buffers, such as `Comm.Bcast`, `Comm.Scatter`, `Comm.Gather`, [`Comm.Allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Allgather), [`Comm.Alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Alltoall). Functions with lowercase initial letters can send and receive Python objects, such as [`Comm.bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.bcast), [`Comm.scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.scatter), [`Comm.gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.gather), [`Comm.allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.allgather), [`Comm.alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.alltoall).\n",
"\n",
"## 同步\n",
"## Synchronization\n",
"\n",
"MPI 的计算分布在多个进程,每个进程的计算速度有快有慢。`Comm.Barrier` 对 Communicator 里所有进程都执行同步等待,就像 Barrier 的英文名一样,设置一个屏障,等待所有进程都执行完。计算比较快的进程们达到 `Comm.Barrier`,无法执行 `Comm.Barrier` 之后的计算逻辑,必须等待其他所有进程都到达这里才行。\n",
"MPI computations are distributed across multiple processes, each with varying computational speeds. `Comm.Barrier` forces synchronization, as the name suggests, by setting up a barrier for all processes within the communicator. Faster processes reaching `Comm.Barrier` cannot proceed with the subsequent code logic until all other processes have also reached this point. It acts as a synchronization point, ensuring that all processes complete their computations before moving forward.\n",
"\n",
"## 数据移动\n",
"## Data Movement\n",
"\n",
"### 广播\n",
"### Broadcast\n",
"\n",
"`Comm.Bcast` 将数据从一个发送者全局广播给组里所有进程,广播操作适用于需要将同一份数据发送给所有进程的场景,例如将一个全局变量的值发送给所有进程,如 {numref}`mpi-broadcast` 所示。\n",
"`Comm.Bcast` globally broadcasts data from one sender to all processes within the group. Broadcast operations are useful in scenarios where the same data needs to be sent to all processes, such as broadcasting the value of a global variable to all processes, as illustrated in {numref}`mpi-broadcast`.\n",
"\n",
"```{figure} ../img/ch-mpi/broadcast.svg\n",
"---\n",
@@ -34,9 +34,10 @@
"---\n",
"Broadcast\n",
"```\n",
"### 案例1:广播\n",
"\n",
"{numref}`mpi-broadcast-py` 对如何将一个 NumPy 数组广播到所有的进程中进行了演示\n",
"### Example 1: Broadcast\n",
"\n",
"The example in {numref}`mpi-broadcast-py` demonstrates how to broadcast a NumPy array to all processes.\n",
"\n",
"```{code-block} python\n",
":caption: broadcast.py\n",
@@ -51,14 +52,14 @@
"\n",
"N = 5\n",
"if comm.rank == 0:\n",
" A = np.arange(N, dtype=np.float64) # rank 0 初始化数据到变量 A\n",
" A = np.arange(N, dtype=np.float64) # rank 0 initializes data into variable A\n",
"else:\n",
" A = np.empty(N, dtype=np.float64) # 其他节点的变量 A 为空\n",
" A = np.empty(N, dtype=np.float64) # As on other processes are empty\n",
"\n",
"# 广播\n",
"# Broadcast\n",
"comm.Bcast([A, MPI.DOUBLE])\n",
"\n",
"# 验证所有节点上的 A\n",
"# Print to verify\n",
"print(\"Rank:%2d, data:%s\" % (comm.rank, A))\n",
"```"
]
@@ -87,24 +88,25 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Scatter 和 Gather\n",
"### Scatter and Gather\n",
"\n",
"`Comm.Scatter` and `Comm.Gather` are a pair of corresponding operations:\n",
"\n",
"`Comm.Scatter` 和 `Comm.Gather` 是一组相对应的操作,如\n",
"* `Comm.Scatter` scatters data from one process to all processes within the group. A process divides the data into multiple chunks, with each chunk sent to the corresponding process. Other processes receive and store their respective chunks. Scatter operations are suitable for partitioning a larger dataset into multiple smaller chunks.\n",
"\n",
"`Comm.Scatter` 将数据从一个进程分散到组中的所有进程,一个进程将数据分散成多个块,每个块发送给对应的进程。其他进程接收并存储各自的块。Scatter 操作适用于将一个较大的数据集分割成多个小块。\n",
"`Comm.Gather` 与 `Comm.Scatter` 相反,将组里所有进程的小数据块归集到一个进程上。\n",
"* `Comm.Gather`, on the contrary, gathers small data chunks from all processes in the group to one process.\n",
"\n",
"```{figure} ../img/ch-mpi/scatter-gather.svg\n",
"---\n",
"width: 600px\n",
"name: mpi-scatter-gather\n",
"---\n",
"Scatter Gather\n",
"Scatter and Gather\n",
"```\n",
"\n",
"### 案例2:Scatter\n",
"### Example 2: Scatter\n",
"\n",
"{numref}`mpi-scatter` 演示了如何使用 Scatter 将数据分散到所有进程。\n",
"The example in {numref}`mpi-scatter` demonstrates how to use Scatter to distribute data to all processes.\n",
"\n",
"```{code-block} python\n",
":caption: scatter.py\n",
@@ -158,11 +160,11 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Allgather Alltoall\n",
"### Allgather and Alltoall\n",
"\n",
"另外两个比较复杂的操作是 `Comm.Allgather` `Comm.Alltoall`\n",
"Two more complex operations are `Comm.Allgather` and `Comm.Alltoall`.\n",
"\n",
"`Comm.Allgather` `Comm.Gather` 的进阶版,如 {numref}`mpi-allgather` 所示,它把散落在多个进程的多个小数据块发送给每个进程,每个进程都包含了一份相同的数据。\n",
"`Comm.Allgather` is an advanced version of `Comm.Gather`, as shown in {numref}`mpi-allgather`. It sends multiple small data chunks scattered across various processes to every process, ensuring that each process contains an identical set of data.\n",
"\n",
"```{figure} ../img/ch-mpi/allgather.svg\n",
"---\n",
@@ -172,7 +174,8 @@
"Allgather\n",
"```\n",
"\n",
"`Comm.Alltoall` 是 `Comm.Scatter` 的 `Comm.Gather` 组合,如 {numref}`mpi-alltoall` 所示,先进行 `Comm.Scatter`,再进行 `Comm.Gather`。如果把数据看成一个矩阵,`Comm.Alltoall` 又可以被看做是一种全局的转置(Transpose)操作。\n",
"\n",
"`Comm.Alltoall` is a combination of `Comm.Scatter` and `Comm.Gather`, as illustrated in {numref}`mpi-alltoall`. It first performs `Comm.Scatter` and then follows with `Comm.Gather`. If the data is viewed as a matrix, `Comm.Alltoall` can be considered a global transpose operation.\n",
"\n",
"```{figure} ../img/ch-mpi/alltoall.svg\n",
"---\n",
@@ -182,21 +185,21 @@
"Alltoall\n",
"```\n",
"\n",
"## 集合计算\n",
"## Collective Computation\n",
"\n",
"集合计算指的是在将散落在不同进程的数据聚合在一起的时候,对数据进行计算,比如 `Comm.Reduce` [`Intracomm`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Intracomm.html) 等。如 {numref}`mpi-reduce` {numref}`mpi-scan` 所示,数据归集到某个进程时,还执行了聚合函数 `f`,常用的聚合函数有求和 [`MPI.SUM`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.SUM.html) 等。\n",
"Collective computation refers to performing computations on data when aggregating scattered data from different processes, such as `Comm.Reduce` and [`Intracomm`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Intracomm.html). As shown in {numref}`mpi-reduce` and {numref}`mpi-scan`, when data is gathered to a specific process, an aggregation function `f` is applied. Common aggregation functions include summation [`MPI.SUM`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.SUM.html) and others.\n",
"\n",
"```{figure} ../img/ch-mpi/reduce.svg\n",
"---\n",
"width: 600px\n",
"width: 500px\n",
"name: mpi-reduce\n",
"---\n",
"Reduce\n",
"```\n",
"\n",
"```{figure} ../img/ch-mpi/scan.svg\n",
"---\n",
"width: 600px\n",
"width: 500px\n",
"name: mpi-scan\n",
"---\n",
"Scan\n",
2 changes: 1 addition & 1 deletion ch-mpi/index.md
@@ -1,4 +1,4 @@
-# MPI
+# MPI for Python

```{tableofcontents}
```
6 changes: 3 additions & 3 deletions ch-mpi/master-worker.py
@@ -6,15 +6,15 @@
size = comm.Get_size()

if rank < size - 1:
-    # Worker 进程
+    # Worker process
np.random.seed(rank)
-    # 随机生成
+    # Generate random data
data_count = np.random.randint(100)
data = np.random.randint(100, size=data_count)
comm.send(data, dest=size - 1)
print(f"Worker: worker ID: {rank}; count: {len(data)}")
else:
-    # Master 进程
+    # Master process
for i in range(size - 1):
status = MPI.Status()
data = comm.recv(source=MPI.ANY_SOURCE, status=status)
Expand Down
