Python——Ray-远程函数与本地函数的区别


整体说明

  • 远程函数与本地函数的区别主要在 序列化机制执行位置 两个维度
  • 序列化本质差异:
    • 本地函数是“传引用”,依赖执行环境已有定义;
    • Ray 远程函数是“传定义+环境”,集群自动同步,支持跨节点;
  • 执行位置差异:
    • 本地函数固定在调用方进程,无分布式能力;
    • Ray 远程函数由集群调度,可分布式并发执行;
  • 使用场景:
    • 本地函数:适用于单进程/单节点的简单逻辑,无需分布式;
    • Ray 远程函数:适用于分布式计算、并发任务、跨节点执行,是 Ray 分布式能力的核心
  • 核心差异总览
    对比维度 本地函数(未用 @ray.remote 装饰) Ray 远程函数(用 @ray.remote 装饰)
    序列化方式 依赖 Python 原生 pickle,仅序列化「函数引用」 Ray 自定义序列化(结合 pickle+集群元数据),序列化「函数元信息+代码定义」
    序列化限制 无法跨节点传递(远程节点无函数定义,引用失效) 可跨节点传递(集群自动同步函数定义到执行节点)
    执行位置 固定在「调用方所在的本地进程/线程」 分布式调度到「集群任意节点的 Worker 进程」(可指定资源)
    执行特性 同步执行,阻塞调用方;无并发调度能力 异步执行,返回 ObjectRef;支持集群级并发/分布式调度
    依赖传递 需手动确保执行环境有函数依赖(如导入、变量) Ray 自动打包函数依赖(如嵌套函数、闭包变量)并分发

序列化机制:“仅传引用” vs “传定义+元信息”

  • 序列化的核心目的是:让函数能在「非定义环境」中被正确执行
  • 两者的序列化逻辑完全不同:

本地函数:仅序列化“函数引用”,无实际代码

  • Python 原生 pickle 序列化本地函数时,不会打包函数的代码本身 ,只会记录函数的「模块路径+函数名」(比如 __main__.add
  • 这种“引用式序列化”仅在「同一进程/同一节点且函数已定义」的场景下有效,跨节点会直接失效
  • 错误示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import ray

    ray.init(ignore_reinit_error=True)

    # 本地函数
    def add_remote(a, b):
    return a + b

    # 直接传递远程函数的引用(Ray 自动处理序列化)
    @ray.remote
    def execute_remote_func(func, x, y):
    return func(x,y) # 远程工作进程无法识别调用方的 local func,错误

    # 跨节点调度执行(单节点可以成功,但集群有多个节点会失败)
    result_ref = execute_remote_func.remote(add_remote, 2, 3)
    print(ray.get(result_ref)) # 单节点输出:5(成功执行);多节点执行错误

    ray.shutdown()

Ray 远程函数:序列化“函数元信息+代码定义”

  • Ray 对远程函数的序列化做了增强
    • 1)序列化时,不仅记录函数引用,还会打包函数的代码定义依赖模块闭包变量(若有);
    • 2)远程节点接收后,会自动还原函数的执行环境(无需手动导入);
    • 3)底层用 Ray 自定义的序列化器(兼容 pickle,但更适合分布式场景)
  • 正确示例:远程函数跨节点调用成功
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import ray

    ray.init(ignore_reinit_error=True)

    # Ray 远程函数(已注册,自动序列化代码)
    @ray.remote
    def add_remote(a, b):
    return a + b

    # 直接传递远程函数的引用(Ray 自动处理序列化)
    @ray.remote
    def execute_remote_func(func, x, y):
    return ray.get(func.remote(x, y)) # 远程节点能识别并执行

    # 跨节点调度执行(即使集群有多个节点也能成功)
    result_ref = execute_remote_func.remote(add_remote, 2, 3)
    print(ray.get(result_ref)) # 输出:5(成功执行)

    ray.shutdown()

补充:Ray 还支持 嵌套远程函数 闭包变量传递

  • 比如在远程函数中引用本地变量,Ray 会自动序列化传递:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import ray

    ray.init(ignore_reinit_error=True)

    @ray.remote
    def outer_remote(x):
    # 闭包变量 x 会被 Ray 自动序列化到远程节点
    @ray.remote
    def inner_remote(y):
    return x + y
    return inner_remote.remote(10)

    print(ray.get(ray.get(outer_remote.remote(5)))) # 输出:15

    ray.shutdown()

执行位置:“本地固定” vs “集群分布式调度”

  • 执行位置的差异是两者最直观的区别,直接决定了是否能利用集群资源:

本地函数:执行在「调用方所在进程」

  • 本地函数的执行位置完全固定:
    • 无论在哪里调用(即使在远程函数内部调用本地函数),函数都会在「发起调用的进程」中执行【存疑】
      • 问题:会出错吧,理论上远程函数内部无法调用本地函数?
    • 若在远程函数中调用本地函数,本质是在「远程节点的 Worker 进程」中执行,但该进程没有本地函数的定义(除非手动同步代码),所以必然失败;
    • 无并发能力:多个调用会串行执行在同一个进程/线程(或 Python 多进程的子进程,但需手动管理)

远程函数:执行在「集群 Worker 进程」

  • Ray 远程函数的执行位置由 Ray 集群的调度器统一管理:

    • 1)调用 func.remote() 时,会向 Ray 调度器提交一个任务
    • 2)调度器根据集群节点的资源(CPU、GPU、内存)情况,将任务分配到任意可用节点的 Worker 进程
    • 3)执行完成后,结果会存储在 Ray 的对象存储中,通过 ray.get() 可获取
    • 4)支持并发:多个 remote() 调用会被调度到不同 Worker 进程/节点,并行执行
  • 示例:远程函数分布式执行(多节点/多进程并发)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import ray
    import os
    import time

    # os.environ["RAY_DEDUP_LOGS"] = "0" # 本意是让每个进程结果都完整输出,但这行代码仅当前进程生效,需要启动前环境变量才可以
    ray.init(ignore_reinit_error=True)

    # Ray 远程函数:打印执行节点的进程 ID 和节点名
    @ray.remote
    def add_remote(a, b):
    node_name = ray.util.get_node_ip_address() # 获取执行节点 IP
    pid = os.getpid() # 获取执行进程 ID
    print(f"在节点 {node_name} 的进程 {pid} 执行 add({a}, {b})")
    time.sleep(1) # 模拟耗时操作
    return a + b

    # 提交 5 个并发任务(会被调度到不同 Worker 进程)
    start = time.time()
    result_refs = [add_remote.remote(i, i*2) for i in range(5)]
    results = ray.get(result_refs) # 等待所有任务完成
    end = time.time()

    print("结果:", results) # 输出:[0, 3, 6, 9, 12]
    print(f"总耗时: {end - start:.2f}s") # 约 1s(并发执行,而非 5s 串行)

    ray.shutdown()
  • 执行上述脚本:

    1
    2
    export RAY_DEDUP_LOGS=0
    python demo.py
    • 注意:仅在代码里面添加 os.environ["RAY_DEDUP_LOGS"] = "0" 是不够的,因为:
      • Ray 的日志去重功能是在 Worker 进程启动时就决定的,而 Worker 是由 Ray 的主进程(Driver)启动的
      • 上面的代码在 ray.init() 之后才启动 Worker,那么环境变量必须在 Driver 启动 Worker 之前就传递过去,否则 Worker 进程会继承默认的去重配置
      • 所以最安全的打印所有日志的方式就是再启动脚本前配置环境变量
    • 另一种实现方式是在远程函数中返回 PID,然后由 Driver 打印
  • 输出示例:

    1
    2
    3
    4
    5
    6
    7
    8
    2025-11-04 11:42:43,175 INFO worker.py:1918 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
    (add_remote pid=14393) 在节点 127.0.0.1 的进程 14393 执行 add(2, 4)
    (add_remote pid=14399) 在节点 127.0.0.1 的进程 14399 执行 add(4, 8)
    (add_remote pid=14398) 在节点 127.0.0.1 的进程 14398 执行 add(1, 2)
    (add_remote pid=14400) 在节点 127.0.0.1 的进程 14400 执行 add(3, 6)
    (add_remote pid=14396) 在节点 127.0.0.1 的进程 14396 执行 add(0, 0)
    结果: [0, 3, 6, 9, 12]
    总耗时: 1.62s