Skip to content

Commit

Permalink
ray data
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng committed Feb 15, 2024
1 parent f17ee60 commit 2b45209
Show file tree
Hide file tree
Showing 22 changed files with 2,887 additions and 1,133 deletions.
3 changes: 3 additions & 0 deletions _static/custom.css
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
html[data-theme="light"] {
--sbt-color-announcement: rgb(125, 125, 125);
}
24 changes: 12 additions & 12 deletions _toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ subtrees:
- file: ch-dask-dataframe/index
entries:
- file: ch-dask-dataframe/read-write
# - file: ch-ray-core/index
# entries:
# - file: ch-ray-core/ray-intro
# - file: ch-ray-core/remote-function
# - file: ch-ray-core/remote-object
# - file: ch-ray-core/remote-class
- file: ch-ray-core/index
entries:
- file: ch-ray-core/ray-intro
- file: ch-ray-core/remote-function
- file: ch-ray-core/remote-object
- file: ch-ray-core/remote-class
# - file: ch-ray-core/ray-internal
# - file: ch-ray-data/index
# entries:
# - file: ch-ray-data/ray-data-intro
# - file: ch-ray-data/data-load-inspect-save
# - file: ch-ray-data/data-transform
# - file: ch-ray-data/preprocessor
- file: ch-ray-data/index
entries:
- file: ch-ray-data/ray-data-intro
- file: ch-ray-data/data-load-inspect-save
- file: ch-ray-data/data-transform
- file: ch-ray-data/preprocessor
- file: ch-mpi/index
entries:
- file: ch-mpi/mpi-intro
Expand Down
25 changes: 13 additions & 12 deletions ch-ray-core/ray-intro.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
# Ray 简介
# Introduction to Ray

Ray 是一个可扩展的计算框架。它最初为强化学习设计,之后逐渐演变成一个面向数据科学和人工智能的框架。
如 {numref}`ray-ecosystem` 所示,当前 Ray 主要由底层的 Ray Core 和上层的各类 Ray AIR (Artificial Intelligence Runtime) 生态组成:Ray Core 是一系列底层 API, 可以将 Python 函数或者 Python 类等计算任务横向扩展到多个计算节点上;在 Ray Core 之上,Ray 封装了一些面向数据科学和人工智能的库(Ray AIR),可以进行数据的处理(Ray Data)、模型训练(Ray Train)、模型的超参数调优(Ray Tune),模型推理服务(Ray Serve),强化学习(RLib)等。
Ray is a computing framework initially designed for reinforcement learning, gradually evolving into a framework catering to data science and artificial intelligence.

As depicted in {numref}`ray-ecosystem`, Ray consists of the foundational Ray Core and various Ray AIR (Artificial Intelligence Runtime) components at the higher levels. Ray Core comprises a set of low-level APIs, enabling the horizontal scaling of Python functions or classes across multiple computing nodes. On top of Ray Core, Ray encapsulates several libraries tailored for data science and artificial intelligence within the Ray AIR ecosystem. These include functionalities for data processing (Ray Data), model training (Ray Train), hyperparameter tuning (Ray Tune), model serving (Ray Serve), reinforcement learning (RLib), and more.

```{figure} ../img/ch-ray-core/ray.svg
---
width: 800px
name: ray-ecosystem
---
Ray 生态
Ray ecosystem
```

Ray Core 提供的 API 将 Python 任务横向扩展到集群上,最关键的 API 是两个计算接口和一个数据接口,如 {numref}`ray-core-apis` 所示。
Ray Core provides APIs that horizontally scale Python tasks across a cluster. The key APIs include two computation interfaces and one data interface, as illustrated in {numref}`ray-core-apis`.

* **Task**: Python functions that can be scaled across the cluster.
* **Actor**: Python classes that can be scaled across the cluster.
* **Object**: Immutable distributed objects used for transferring data between Tasks and Actors.

* 任务(Task):面向函数(Function)的接口,用于定义一个函数,该函数可以在集群中分布式地执行。
* 行动者(Actor):面向类(Class)的接口,用于定义一个类,该类可以在集群中分布式地执行。
* 对象(Object):分布式的对象,对象不可变(Immutable),用于在 Task 和 Actor 之间传递数据。
Ray AIR ecosystem is built upon the Ray Core APIs.

```{figure} ../img/ch-ray-core/ray-apis.svg
---
width: 800px
name: ray-core-apis
---
Ray Core 核心 API
```

上层的各类生态均基于 Ray Core 的这些底层 API,结合各类人工智能应用编写而成。
Ray Core APIs
```
166 changes: 79 additions & 87 deletions ch-ray-core/remote-class.ipynb

Large diffs are not rendered by default.

655 changes: 237 additions & 418 deletions ch-ray-core/remote-function.ipynb

Large diffs are not rendered by default.

106 changes: 33 additions & 73 deletions ch-ray-core/remote-object.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
},
"source": [
"(remote-object)=\n",
"# 分布式对象存储\n",
"# Distributed Object Storage\n",
"\n",
"Ray 分布式计算中涉及共享数据可被放在分布式对象存储(Distributed Ojbect Store)中,被放置在分布式对象存储中的数据被称为远程对象(Remote Object)中。我们可以使用 [`ray.get()`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.get.html) [`ray.put()`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.put.html) 读写这些 Remote Object。与内存中的 Python 对象实例不同,Remote Object 是不可原地直接更改的(Immutable)。\n",
"In Ray, shared data can be stored in the distributed object store, and data placed in the distributed object store is referred to as a remote object. We can use [`ray.get()`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.get.html) and [`ray.put()`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.put.html) to read and write these remote objects. Unlike in-memory Python object instances that are mutable, remote objects are immutable, i.e., they cannot be changed in place.\n",
"\n",
"## ray.put()ray.get()\n"
"## `ray.put()` and `ray.get()`\n"
]
},
{
Expand All @@ -38,7 +38,7 @@
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "13de9da91de34038b536ee817f7ce2fc",
"model_id": "603a9d57ce204b46983b8af68a3d42e0",
"version_major": 2,
"version_minor": 0
},
Expand All @@ -62,24 +62,20 @@
" <table class=\"jp-RenderedHTMLCommon\" style=\"border-collapse: collapse;color: var(--jp-ui-font-color1);font-size: var(--jp-ui-font-size1);\">\n",
" <tr>\n",
" <td style=\"text-align: left\"><b>Python version:</b></td>\n",
" <td style=\"text-align: left\"><b>3.10.9</b></td>\n",
" <td style=\"text-align: left\"><b>3.11.7</b></td>\n",
" </tr>\n",
" <tr>\n",
" <td style=\"text-align: left\"><b>Ray version:</b></td>\n",
" <td style=\"text-align: left\"><b>2.7.0</b></td>\n",
" <td style=\"text-align: left\"><b>2.9.0</b></td>\n",
" </tr>\n",
" <tr>\n",
" <td style=\"text-align: left\"><b>Dashboard:</b></td>\n",
" <td style=\"text-align: left\"><b><a href=\"http://127.0.0.1:8265\" target=\"_blank\">http://127.0.0.1:8265</a></b></td>\n",
"</tr>\n",
"\n",
" \n",
"</table>\n",
"\n",
" </div>\n",
"</div>\n"
],
"text/plain": [
"RayContext(dashboard_url='127.0.0.1:8265', python_version='3.10.9', ray_version='2.7.0', ray_commit='b4bba4717f5ba04ee25580fe8f88eed63ef0c5dc', protocol_version=None)"
"RayContext(dashboard_url='', python_version='3.11.7', ray_version='2.9.0', ray_commit='9be5a16e3ccad0710bba08d0f75e9ff774ae6880', protocol_version=None)"
]
},
"execution_count": 1,
Expand Down Expand Up @@ -108,19 +104,20 @@
"origin_pos": 2
},
"source": [
"如 {numref}`put-get-object-store` 所示,操作 Remote Object 主要有 `ray.put()` 和 `ray.get()` 两个 API:`ray.put()` 与 `ray.get()` 。\n",
"As shown in {numref}`put-get-object-store`, working with remote objects involves two APIs: `ray.put()` and `ray.get()`.\n",
"\n",
"* `ray.put()` serializes the object data and writes it into the distributed object store. It returns a `RefObjectID`, which is a pointer to this remote object. By referencing this `RefObjectID`, we can use this data object in a distributed manner in remote functions or remote classes.\n",
"\n",
"* `ray.put()` 把某个计算节点中的对象数据进行序列化,并将其写入到 Ray 集群的分布式对象存储中,返回一个 `RefObjectID`,`RefObjectID` 是指向这个 Remote Object 的指针。我们可以通过引用这个 `RefObjectID`,在 Remote Function 或 Remote Class 中分布式地使用这个数据对象。\n",
"* `ray.get()` retrieves the data from the distributed object store via the `RefObjectID` and performs deserialization.\n",
"\n",
"* `ray.get()` 使用 `RefObjectID` 从把数据从分布式对象存储中拉取回来,并进行反序列化。\n",
"\n",
"```{figure} ../img/ch-ray-core/put-get-object-store.svg\n",
"---\n",
"width: 800px\n",
"name: put-get-object-store\n",
"---\n",
"RAY 分布式对象存储示意图\n",
"```\n"
"Ray distributed object store\n",
"```"
]
},
{
Expand Down Expand Up @@ -156,7 +153,7 @@
" return torch.randn(size=(size), dtype=torch.float)\n",
"\n",
"torch.manual_seed(42)\n",
"# 创建 16个 个机张量,每个张量大小为 (X, 8, 8)\n",
"# create 16 tensors, each is (X, 8, 8)\n",
"tensor_obj_ref_list = [ray.put(create_rand_tensor((i, 8, 8))) for i in range(1, 16)]\n",
"tensor_obj_ref_list[0], len(tensor_obj_ref_list)"
]
Expand All @@ -168,7 +165,7 @@
"origin_pos": 4
},
"source": [
"使用 `ray.get()` 从分布式对象存储中拉取数据。\n"
"Get the data from the distributed object store with `ray.get()`:"
]
},
{
Expand Down Expand Up @@ -227,7 +224,7 @@
"origin_pos": 6
},
"source": [
"或者把存放 `ObjectRefIDs` 列表的所有对象都拉取过来:\n"
"Or you can fetch the list of `ObjectRefID`s:"
]
},
{
Expand Down Expand Up @@ -288,7 +285,7 @@
"source": [
"## 案例1:对数据进行转换\n",
"\n",
"Remote Object 的数据是不可原地更改的,比如下面的操作在单机的内存上可以,但是在 Remote Object 上,不可以直接在原地对 Remote Object 做更改。\n"
"Remote Object 的数据是不可原地更改的,比如下面的操作在单机的内存上可以,但是在 Remote Object 上,不可以直接在原地对 Remote Object 做更改。"
]
},
{
Expand Down Expand Up @@ -339,7 +336,7 @@
"origin_pos": 10
},
"source": [
"如果我们想使用新数据,应该使用 Remote Function 或者 Remote Class 对 Remote Object 进行转换操作,生成新的 Remote Object。\n"
"If you want to use new data, you should use a remote function or remote class to perform the transformation operation on the remote object, generating a new remote object."
]
},
{
Expand Down Expand Up @@ -386,13 +383,13 @@
"origin_pos": 12
},
"source": [
"## 传递参数\n",
"## Passing Parameters\n",
"\n",
"Remote Object 可以通过 `RefObjectID` 在 Task、Actor 之间传递。\n",
"Remote Objects can be passed between tasks and actors via `RefObjectID`.\n",
"\n",
"### 直接传递\n",
"### Automatic De-referencing\n",
"\n",
"直接在 Task 或者 Actor 的函数调用时将 `RefObjectID` 作为参数传递进去。在下面这个例子中,`x_obj_ref` 是一个 `RefObjectID``echo()` 这个 Remote Function 将自动从 `x_obj_ref` 获取 `x` 的值。这个自动获取值的过程被称为自动反引用(De-referenced)。\n"
"Directly pass the `RefObjectID` as a parameter when calling a task or actor. In the example below, `x_obj_ref` is a `RefObjectID`, and the `echo()` remote function will automatically get the value of `x` from `x_obj_ref`, which is called de-referencing of `RefObjectID`."
]
},
{
Expand Down Expand Up @@ -483,13 +480,6 @@
]
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(echo pid=22623)\u001b[0m current value of argument x: [0, 1, 2, 3, 4]\n"
]
},
{
"data": {
"text/plain": [
Expand All @@ -512,12 +502,12 @@
"origin_pos": 16
},
"source": [
"### 复杂数据结构\n",
"### Complex Data Structures\n",
"\n",
"如果 `RefObjectID` 被包裹在一个复杂的数据结构中,Ray 并不会自动获取 `RefObjectID` 对应的值,即 De-referenced 并不是自动的。复杂数据结构包括:\n",
"If a `RefObjectID` is in a complex data structure, Ray does not automatically get the value of the `RefObjectID`. In other words, de-referencing is not automatic for complex data structures, including:\n",
"\n",
"* `RefObjectID` 被包裹在一个 `dict` 中,比如:`.remote({\"obj\": x_obj_ref})`\n",
"* `RefObjectID` 被包裹在一个 `list` 中,比如:`.remote([x_obj_ref])`\n"
"* When a `RefObjectID` is in a `dict`, for example: `.remote({\"obj\": x_obj_ref})`\n",
"* When a `RefObjectID` is in a `list`, for example: `.remote([x_obj_ref])`\n"
]
},
{
Expand All @@ -537,21 +527,11 @@
]
},
"outputs": [
{
"data": {
"text/plain": [
"{'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)}"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(echo pid=22630)\u001b[0m current value of argument x: [0, 1, 2, 3, 4]\n"
"\u001b[36m(echo pid=95333)\u001b[0m current value of argument x: [0, 1, 2, 3, 4]\n"
]
},
{
Expand Down Expand Up @@ -590,27 +570,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(echo pid=70968)\u001b[0m current value of argument x: [0, 1, 2, 3, 4]\n",
"\u001b[2m\u001b[36m(echo pid=70962)\u001b[0m current value of argument x: [0, 1, 2, 3, 4]\n",
"\u001b[2m\u001b[36m(echo pid=70963)\u001b[0m current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)}\n",
"\u001b[2m\u001b[36m(echo pid=70963)\u001b[0m current value of argument x: [ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)]\n"
]
},
{
"data": {
"text/plain": [
"[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)]"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(echo pid=22630)\u001b[0m current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}\n"
"\u001b[36m(echo pid=95325)\u001b[0m current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}\n"
]
},
{
Expand All @@ -635,11 +595,11 @@
"origin_pos": 19
},
"source": [
"## 底层实现\n",
"## Implementation\n",
"\n",
"Ray 集群的每个计算节点都有一个基于共享内存的对象存储, Remote Object 的数据会存储在集群某个或者某些计算节点的对象存储中,所有计算节点的共享内存共同组成了分布式对象存储。\n",
"Each computing node in the Ray cluster has a shared memory object store. The data of a remote object is stored in the object store of one or more computing nodes in the cluster. The shared memory of all computing nodes collectively forms the distributed object store.\n",
"\n",
"当某个 Remote Object 的数据量较小时(<= 100 KB),它会被存储在计算节点进程内存中;当数据量较大时,它会被存储在分布式的共享内存中;当集群的共享内存的空间不够时,数据会被外溢(Spill)到持久化的存储上,比如硬盘或者S3。\n"
"When the data volume of a remote object is small (<= 100 KB), it is stored in the memory of the computing node process. When the data volume is large, it is stored in the distributed shared memory. When the shared memory space of the cluster is insufficient, data is spilled to persistent storage, such as a hard disk or S3."
]
},
{
Expand Down Expand Up @@ -680,7 +640,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.9"
"version": "3.11.7"
},
"required_libs": []
},
Expand Down
Loading

0 comments on commit 2b45209

Please sign in to comment.