Skip to content

Commit

Permalink
modin xorbits (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng authored May 9, 2024
1 parent 9236a0a commit c445c1f
Show file tree
Hide file tree
Showing 10 changed files with 668 additions and 7 deletions.
16 changes: 10 additions & 6 deletions _toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ root: index
subtrees:
- numbered: 2
entries:
- file: ch-intro/index
- file: ch-parallel-computing/index
entries:
- file: ch-intro/computer-architecture
- file: ch-intro/serial-parallel
- file: ch-intro/thread-process
- file: ch-intro/parallel-program-design
- file: ch-intro/performance-metrics
- file: ch-parallel-computing/computer-architecture
- file: ch-parallel-computing/serial-parallel
- file: ch-parallel-computing/thread-process
- file: ch-parallel-computing/parallel-program-design
- file: ch-parallel-computing/performance-metrics
- file: ch-data-science/index
entries:
- file: ch-data-science/data-science-lifecycle
Expand Down Expand Up @@ -57,6 +57,10 @@ subtrees:
- file: ch-ray-ml/ray-train
- file: ch-ray-ml/ray-tune
- file: ch-ray-ml/ray-serve
- file: ch-modin-xorbits/index
entries:
- file: ch-modin-xorbits/modin
- file: ch-modin-xorbits/xorbits
- file: ch-mpi/index
entries:
- file: ch-mpi/mpi-intro
Expand Down
332 changes: 332 additions & 0 deletions ch-modin-xorbits/modin.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"(sec-modin)=\n",
"# Modin\n",
"\n",
"Modin 是一款专门加速 pandas DataFrame 的框架。它对大数据进行了切分,使 DataFrame 运算分布到多核和集群上。它底层使用了 Ray 或 Dask 作为分布式执行引擎。因此,在安装 Modin 时,还要安装对应的执行引擎(Ray、Dask 或 [unidist](https://github.com/modin-project/unidist/)),比如 `pip install \"modin[ray]\"` 或 `pip install \"modin[dask]\"`。Modin 默认使用 Ray 作为其执行引擎。\n",
"\n",
"## API 兼容性\n",
"\n",
"Dask DataFrame 与 pandas DataFrame 其实有不少差异,很多 pandas 工作流并不能快速迁移到 Dask DataFrame 上。Modin 更看重与 pandas 的兼容性,用户只需要 `import modin.pandas as pd`,绝大多数 pandas 工作流可以快速迁移到 Modin 上。\n",
"\n",
"Dask DataFrame 只按列对大数据进行切分,且没有记录每个 Partition 有多少数据,Modin 在多维度对数据进行切分,保留行标签和列标签。Modin 支持行索引 `iloc()`;记录了每个数据块的数据量,可以支持`median()`、`quantile()`;支持行和列的转换(比如,`pivot()`、`transpose()`)等操作。"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"import sys\n",
"sys.path.append(\"..\")\n",
"from utils import nyc_flights\n",
"\n",
"folder_path = nyc_flights()\n",
"file_path = os.path.join(folder_path, \"*.csv\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
":::{note}\n",
"Modin 的 API 尽量与 pandas 一致,比如,pandas 的 `read_csv()` 只能读一个文件,不能读 `*.csv` 这样的通配符。Modin 额外增加了一些 API,比如,Modin 拓展了 `read_csv()`,提出了 `read_csv_glob()` 方法 可以读取 `*.csv` 这样的通配符,适合读大数据。这些额外增加的 API 在 `modin.experimental.pandas` 中。\n",
":::"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Date 1991-01-11 00:00:00\n",
"DayOfWeek 5\n",
"DepTime 1303.0\n",
"CRSDepTime 1215\n",
"ArrTime 1439.0\n",
"CRSArrTime 1336\n",
"UniqueCarrier US\n",
"FlightNum 121\n",
"TailNum NaN\n",
"ActualElapsedTime 96.0\n",
"CRSElapsedTime 81\n",
"AirTime NaN\n",
"ArrDelay 63.0\n",
"DepDelay 48.0\n",
"Origin EWR\n",
"Dest PIT\n",
"Distance 319.0\n",
"TaxiIn NaN\n",
"TaxiOut NaN\n",
"Cancelled 0\n",
"Diverted 0\n",
"Name: 3, dtype: object"
]
},
"execution_count": 34,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import modin.experimental.pandas as pd\n",
"df = pd.read_csv_glob(file_path, parse_dates={'Date': [0, 1, 2]})\n",
"df.iloc[3]"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0.0"
]
},
"execution_count": 35,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df['ArrDelay'].median()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"如果某些 API 在 Modin 中还没实现,Modin 会退回(Fallback)到 pandas,这也保证了兼容性。当然,缺点也很明显:将 Modin 的 DataFrame 转换为 pandas DataFrame 时,会有额外的开销;如果这个 DataFrame 分布在多个节点上,转化回 pandas 时会把数据集中到单机内存,有可能把单机内存挤爆。\n",
"\n",
"## 立即执行\n",
"\n",
"Modin 是立即执行,这一点与 pandas 一致。用户不需要像 Dask 那样调用 `.compute()` 来触发计算。\n",
"\n",
"Modin 也没有 Dask DataFrame 的数据类型推断。{numref}`sec-dask-dataframe-read-write`中的飞机起降数据上,Dask DataFrame `tail()` 会抛出异常,但 Modin 能够得到 pandas 一样的语义。"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Date</th>\n",
" <th>DayOfWeek</th>\n",
" <th>DepTime</th>\n",
" <th>CRSDepTime</th>\n",
" <th>ArrTime</th>\n",
" <th>CRSArrTime</th>\n",
" <th>UniqueCarrier</th>\n",
" <th>FlightNum</th>\n",
" <th>TailNum</th>\n",
" <th>ActualElapsedTime</th>\n",
" <th>...</th>\n",
" <th>AirTime</th>\n",
" <th>ArrDelay</th>\n",
" <th>DepDelay</th>\n",
" <th>Origin</th>\n",
" <th>Dest</th>\n",
" <th>Distance</th>\n",
" <th>TaxiIn</th>\n",
" <th>TaxiOut</th>\n",
" <th>Cancelled</th>\n",
" <th>Diverted</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>1555982</th>\n",
" <td>1994-12-27</td>\n",
" <td>2</td>\n",
" <td>1721.0</td>\n",
" <td>1715</td>\n",
" <td>1930.0</td>\n",
" <td>1945</td>\n",
" <td>DL</td>\n",
" <td>149</td>\n",
" <td>NaN</td>\n",
" <td>129.0</td>\n",
" <td>...</td>\n",
" <td>NaN</td>\n",
" <td>-15.0</td>\n",
" <td>6.0</td>\n",
" <td>JFK</td>\n",
" <td>ATL</td>\n",
" <td>760.0</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1555983</th>\n",
" <td>1994-12-28</td>\n",
" <td>3</td>\n",
" <td>1715.0</td>\n",
" <td>1715</td>\n",
" <td>1934.0</td>\n",
" <td>1945</td>\n",
" <td>DL</td>\n",
" <td>149</td>\n",
" <td>NaN</td>\n",
" <td>139.0</td>\n",
" <td>...</td>\n",
" <td>NaN</td>\n",
" <td>-11.0</td>\n",
" <td>0.0</td>\n",
" <td>JFK</td>\n",
" <td>ATL</td>\n",
" <td>760.0</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1555984</th>\n",
" <td>1994-12-29</td>\n",
" <td>4</td>\n",
" <td>1715.0</td>\n",
" <td>1715</td>\n",
" <td>1941.0</td>\n",
" <td>1945</td>\n",
" <td>DL</td>\n",
" <td>149</td>\n",
" <td>NaN</td>\n",
" <td>146.0</td>\n",
" <td>...</td>\n",
" <td>NaN</td>\n",
" <td>-4.0</td>\n",
" <td>0.0</td>\n",
" <td>JFK</td>\n",
" <td>ATL</td>\n",
" <td>760.0</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>3 rows x 21 columns</p>\n",
"</div>"
],
"text/plain": [
" Date DayOfWeek DepTime CRSDepTime ArrTime CRSArrTime \\\n",
"1555982 1994-12-27 2 1721.0 1715 1930.0 1945 \n",
"1555983 1994-12-28 3 1715.0 1715 1934.0 1945 \n",
"1555984 1994-12-29 4 1715.0 1715 1941.0 1945 \n",
"\n",
" UniqueCarrier FlightNum TailNum ActualElapsedTime ... AirTime \\\n",
"1555982 DL 149 NaN 129.0 ... NaN \n",
"1555983 DL 149 NaN 139.0 ... NaN \n",
"1555984 DL 149 NaN 146.0 ... NaN \n",
"\n",
" ArrDelay DepDelay Origin Dest Distance TaxiIn TaxiOut Cancelled \\\n",
"1555982 -15.0 6.0 JFK ATL 760.0 NaN NaN 0 \n",
"1555983 -11.0 0.0 JFK ATL 760.0 NaN NaN 0 \n",
"1555984 -4.0 0.0 JFK ATL 760.0 NaN NaN 0 \n",
"\n",
" Diverted \n",
"1555982 0 \n",
"1555983 0 \n",
"1555984 0 \n",
"\n",
"[3 rows x 21 columns]"
]
},
"execution_count": 36,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.tail(3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 执行引擎\n",
"\n",
"Modin 支持 Ray、Dask 和 unidist 分布式执行引擎:可以利用单机多核,也可以运行在集群上。以 Ray 为例,用户可以向 Ray 集群上提交作业,在代码中初始 Ray 运行时 `ray.init(address=\"auto\")` 后,会将作业运行 Ray 集群。\n",
"\n",
"Modin 默认使用 Ray 作为执行后端,也可以通过环境变量 `MODIN_ENGINE` 来设置执行后端,在命令行里:`export MODIN_ENGINE=dask`;或在 Jupyter Notebook 中:\n",
"\n",
"```python\n",
"import modin.config as modin_cfg\n",
"modin_cfg.Engine.put(\"ray\")\n",
"```\n",
"\n",
"undist 是 Modin 自己实现的一个执行后端,它支持 MPI,如果想用 undist MPI,除了设置 `MODIN_ENGINE` 还要设置 `UNIDIST_BACKEND`:\n",
"\n",
"```shell\n",
"export MODIN_ENGINE=unidist\n",
"export UNIDIST_BACKEND=mpi \n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "dispy",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Loading

0 comments on commit c445c1f

Please sign in to comment.