整体介绍
- Ray 是一个用于分布式计算的开源框架,专为构建和运行分布式应用程序而设计
- Ray 提供了简洁的 API,让开发者能够轻松地将单机程序扩展到分布式集群上,同时保持代码的可读性和可维护性
- Ray 最初由 UC Berkeley 的 RISELab 开发,现在由 Anyscale 公司维护,广泛应用于机器学习、强化学习、并行计算等领域
- Ray 既可以在本地实现并行计算,又可以非常容易的扩展到集群模式,实现分布式计算
- Ray 与深度学习框架如 TensorFlow、PyTorch 和 MXNet 等互相兼容
Ray 的核心架构
- Ray的系统架构采用了混合任务调度的思路,遵循典型的 Master-Slave 设计,但与传统分布式系统有所不同
Ray 中的关键组件总结
- Ray在集群部署模式下启动了以下关键组件:
- GlobalScheduler(全局调度器) :运行在Master节点上,负责接收本地调度器提交的任务,并将任务分发给合适的本地任务调度器执行
- RedisServer :Master节点上启动的Redis服务器,用于保存分布式任务的状态信息(ControlState),包括对象机器的映射、任务描述、任务 debug 信息等
- LocalScheduler(本地调度器) :每个 Slave 节点上启动的本地调度器,用于提交任务到全局调度器,以及分配任务给当前机器的 Worker 进程
- Worker进程 :每个 Slave 节点上可以启动多个 Worker 进程执行分布式任务,并将计算结果存储到 ObjectStore
- ObjectStore(对象存储) :每个 Slave 节点上的存储系统,用于存储只读数据对象,Worker 可以通过共享内存的方式访问这些对象数据,有效减少内存拷贝和对象序列化成本。ObjectStore 底层由 Apache Arrow 实现
- Plasma :每个 Slave 节点上的ObjectStore管理器,当 Worker 访问本地 ObjectStore 上不存在的远程数据对象时,Plasma 会主动拉取其它 Slave 上的对象数据到当前机器
执行模型
- Ray的执行模型基于动态任务图 ,这与 TensorFlow 中的静态计算图有本质区别:
- TensorFlow的计算图用于表征神经网络,在单个应用中执行很多次
- Ray的任务图用于表征整个应用,并仅执行一次
- 任务图对于前台是未知的,随着应用的运行而动态地构建
- 一个任务的执行可能创建更多的任务,形成动态依赖关系
代码示例
并行计算示例(无状态)
- 基于 Ray 的并行计算代码 Demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35import ray
import time
import numpy as np
# 初始化 Ray,默认在本地启动
ray.init()
# 使用 @ray.remote 装饰器将函数转换为分布式任务
def compute_square(x):
# 模拟耗时计算
time.sleep(1)
return x * x
# 生成一些数据
data = np.arange(10)
# 并行执行任务
start_time = time.time()
# 创建任务对象引用
square_refs = [compute_square.remote(i) for i in data]
# 等待所有任务完成并获取结果
results = ray.get(square_refs)
end_time = time.time()
print(f"串行计算结果: {[i*i for i in data]}")
print(f"Ray 并行计算结果: {results}")
print(f"Ray 并行计算耗时: {end_time - start_time:.4f} 秒")
# 关闭 Ray
ray.shutdown()
# 串行计算结果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Ray 并行计算结果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Ray 并行计算耗时: 1.4069 秒
串行计算示例(有状态)
- 基于 Ray 的串行计算代码 Demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43import ray
import time
# 初始化 Ray
ray.init()
# 使用 @ray.remote 装饰器定义 Actor 类
class Counter:
def __init__(self):
self.count = 0
def increment(self):
time.sleep(1) # 模拟耗时操作
self.count += 1
return self.count
def get_count(self):
return self.count
# 创建 Actor 实例
counter = Counter.remote()
# 并行调用 Actor 方法
start_time = time.time()
# 提交多个增量任务
increment_refs = [counter.increment.remote() for _ in range(10)]
# 获取所有增量任务的结果
results = ray.get(increment_refs)
# 获取最终计数
final_count = ray.get(counter.get_count.remote())
end_time = time.time()
print(f"每次增量结果: {results}")
print(f"最终计数: {final_count}")
print(f"执行耗时: {end_time - start_time:.4f} 秒")
# 关闭 Ray
ray.shutdown()
# 每次增量结果: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 最终计数: 10
# 执行耗时: 10.1293 秒
分布式调度示例(集群模式)
以上代码经过非常简单的修改即可进入集群模式
Ray 集群部署包括三个步骤(下面以 6379 端口为例展示流程)
第一步:启动主节点 ,运行
ray start --head从主节点启动集群1
ray start --head --port=6379 --redis-password='your_secure_password_123'
- 注:可通过
--redis-password设置密码(可选),防止未授权节点加入,也可以不使用该参数
- 注:可通过
第二步:启动工作节点 ,运行
ray start --address=<主节点IP>加入集群1
ray start --address='<head-node-ip>:6379' --redis-password='your_secure_password_123'
- 执行上述命令后工作节点就会:
- 自动连接到主节点
- 等待接收任务
- 执行主节点分配的计算任务
- 将结果返回给主节点
- 执行上述命令后工作节点就会:
第三步:在主节点上运行的代码中连接集群
1
ray.init(address='auto', _redis_password='your_secure_password_123')
- 注:以上代码仅在主节点上运行,工作节点不需要显示运行任何代码,仅需要启动并加入集群即可
关闭 Ray 服务:
1
ray stop
特别说明:集群模式与普通单机并行模式的区别很小,仅需要增加修改以上代码即可(其他代码都不需要修改)
Ray 在分布式下默认有许多默认功能:
- 自动负载均衡:Ray 会自动将任务分配到空闲节点
- 容错能力:如果某个工作节点失败,Ray 会重新调度任务
集群模式工作流程总结:
- 主节点通过 Redis 将任务(remote 函数或者类对象)放入队列
- 空闲的工作节点从队列中获取任务
- 工作节点执行任务
- 将运算结果通过 共享内存/Object Store 返回给主节点
附录:工作节点启动高级配置
- 可以通过参数调整工作节点行为:
1
2
3
4
5ray start --address='<head-node-ip>:6379' \
--redis-password='your_secure_password_123' \
--num-cpus=8 \ # 限制使用8个CPU核心
--num-gpus=1 \ # 声明有1个GPU可用
--object-store-memory=100000000 \ # 设置对象存储大小
附录:Ray 集群状态监控
- Ray 提供了 Web UI 用于监控集群状态
- 在主节点启动时已经启用了 Dashboard(默认端口8265)
- 在浏览器访问:http://<主节点IP>:8265
- 在 Dashboard 中可以看到:
- 集群节点列表和资源使用情况
- 当前运行的任务
- 历史任务统计
- 每个节点的CPU/内存使用情况