Python——Ray-分布式架构简单了解


整体介绍

  • Ray 是一个用于分布式计算的开源框架,专为构建和运行分布式应用程序而设计
  • Ray 提供了简洁的 API,让开发者能够轻松地将单机程序扩展到分布式集群上,同时保持代码的可读性和可维护性
  • Ray 最初由 UC Berkeley 的 RISELab 开发,现在由 Anyscale 公司维护,广泛应用于机器学习、强化学习、并行计算等领域
  • Ray 既可以在本地实现并行计算,又可以非常容易的扩展到集群模式,实现分布式计算
  • Ray 与深度学习框架如 TensorFlow、PyTorch 和 MXNet 等互相兼容

Ray 的核心架构

  • Ray的系统架构采用了混合任务调度的思路,遵循典型的 Master-Slave 设计,但与传统分布式系统有所不同

Ray 中的关键组件总结

  • Ray在集群部署模式下启动了以下关键组件:
    • GlobalScheduler(全局调度器) :运行在Master节点上,负责接收本地调度器提交的任务,并将任务分发给合适的本地任务调度器执行
    • RedisServer :Master节点上启动的Redis服务器,用于保存分布式任务的状态信息(ControlState),包括对象机器的映射、任务描述、任务 debug 信息等
    • LocalScheduler(本地调度器) :每个 Slave 节点上启动的本地调度器,用于提交任务到全局调度器,以及分配任务给当前机器的 Worker 进程
    • Worker进程 :每个 Slave 节点上可以启动多个 Worker 进程执行分布式任务,并将计算结果存储到 ObjectStore
    • ObjectStore(对象存储) :每个 Slave 节点上的存储系统,用于存储只读数据对象,Worker 可以通过共享内存的方式访问这些对象数据,有效减少内存拷贝和对象序列化成本。ObjectStore 底层由 Apache Arrow 实现
    • Plasma :每个 Slave 节点上的ObjectStore管理器,当 Worker 访问本地 ObjectStore 上不存在的远程数据对象时,Plasma 会主动拉取其它 Slave 上的对象数据到当前机器

执行模型

  • Ray的执行模型基于动态任务图 ,这与 TensorFlow 中的静态计算图有本质区别:
    • TensorFlow的计算图用于表征神经网络,在单个应用中执行很多次
    • Ray的任务图用于表征整个应用,并仅执行一次
    • 任务图对于前台是未知的,随着应用的运行而动态地构建
    • 一个任务的执行可能创建更多的任务,形成动态依赖关系

代码示例

并行计算示例(无状态)

  • 基于 Ray 的并行计算代码 Demo:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    import ray
    import time
    import numpy as np

    # 初始化 Ray,默认在本地启动
    ray.init()

    # 使用 @ray.remote 装饰器将函数转换为分布式任务
    @ray.remote
    def compute_square(x):
    # 模拟耗时计算
    time.sleep(1)
    return x * x

    # 生成一些数据
    data = np.arange(10)

    # 并行执行任务
    start_time = time.time()
    # 创建任务对象引用
    square_refs = [compute_square.remote(i) for i in data]
    # 等待所有任务完成并获取结果
    results = ray.get(square_refs)
    end_time = time.time()

    print(f"串行计算结果: {[i*i for i in data]}")
    print(f"Ray 并行计算结果: {results}")
    print(f"Ray 并行计算耗时: {end_time - start_time:.4f} 秒")

    # 关闭 Ray
    ray.shutdown()

    # 串行计算结果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    # Ray 并行计算结果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    # Ray 并行计算耗时: 1.4069 秒

串行计算示例(有状态)

  • 基于 Ray 的串行计算代码 Demo:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    import ray
    import time

    # 初始化 Ray
    ray.init()

    # 使用 @ray.remote 装饰器定义 Actor 类
    @ray.remote
    class Counter:
    def __init__(self):
    self.count = 0

    def increment(self):
    time.sleep(1) # 模拟耗时操作
    self.count += 1
    return self.count

    def get_count(self):
    return self.count

    # 创建 Actor 实例
    counter = Counter.remote()

    # 并行调用 Actor 方法
    start_time = time.time()
    # 提交多个增量任务
    increment_refs = [counter.increment.remote() for _ in range(10)]
    # 获取所有增量任务的结果
    results = ray.get(increment_refs)
    # 获取最终计数
    final_count = ray.get(counter.get_count.remote())
    end_time = time.time()

    print(f"每次增量结果: {results}")
    print(f"最终计数: {final_count}")
    print(f"执行耗时: {end_time - start_time:.4f} 秒")

    # 关闭 Ray
    ray.shutdown()

    # 每次增量结果: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    # 最终计数: 10
    # 执行耗时: 10.1293 秒

分布式调度示例(集群模式)

  • 以上代码经过非常简单的修改即可进入集群模式

  • Ray 集群部署包括三个步骤(下面以 6379 端口为例展示流程)

  • 第一步:启动主节点 ,运行 ray start --head 从主节点启动集群

    1
    ray start --head --port=6379 --redis-password='your_secure_password_123'
    • 注:可通过 --redis-password 设置密码(可选),防止未授权节点加入,也可以不使用该参数
  • 第二步:启动工作节点 ,运行 ray start --address=<主节点IP> 加入集群

    1
    ray start --address='<head-node-ip>:6379' --redis-password='your_secure_password_123'
    • 执行上述命令后工作节点就会:
      • 自动连接到主节点
      • 等待接收任务
      • 执行主节点分配的计算任务
      • 将结果返回给主节点
  • 第三步:在主节点上运行的代码中连接集群

    1
    ray.init(address='auto', _redis_password='your_secure_password_123')
    • 注:以上代码仅在主节点上运行,工作节点不需要显示运行任何代码,仅需要启动并加入集群即可
  • 关闭 Ray 服务:

    1
    ray stop
  • 特别说明:集群模式与普通单机并行模式的区别很小,仅需要增加修改以上代码即可(其他代码都不需要修改)

  • Ray 在分布式下默认有许多默认功能:

    • 自动负载均衡:Ray 会自动将任务分配到空闲节点
    • 容错能力:如果某个工作节点失败,Ray 会重新调度任务
  • 集群模式工作流程总结:

    • 主节点通过 Redis 将任务(remote 函数或者类对象)放入队列
    • 空闲的工作节点从队列中获取任务
    • 工作节点执行任务
    • 将运算结果通过 共享内存/Object Store 返回给主节点

附录:工作节点启动高级配置

  • 可以通过参数调整工作节点行为:
    1
    2
    3
    4
    5
    ray start --address='<head-node-ip>:6379' \
    --redis-password='your_secure_password_123' \
    --num-cpus=8 \ # 限制使用8个CPU核心
    --num-gpus=1 \ # 声明有1个GPU可用
    --object-store-memory=100000000 \ # 设置对象存储大小

附录:Ray 集群状态监控

  • Ray 提供了 Web UI 用于监控集群状态
  • 在主节点启动时已经启用了 Dashboard(默认端口8265)
  • 在浏览器访问:http://<主节点IP>:8265
  • 在 Dashboard 中可以看到:
    • 集群节点列表和资源使用情况
    • 当前运行的任务
    • 历史任务统计
    • 每个节点的CPU/内存使用情况