Python——Ray-远程函数与本地函数的区别


整体说明

  • 远程函数与本地函数的区别主要在 序列化机制执行位置 两个维度
  • 序列化本质差异:
    • 本地函数可以理解为“传引用”,依赖执行环境已有定义
      • 注:本地函数也不仅仅是 “传引用”
        • Python 的 pickle 序列化函数时,实际上是序列化函数的名称和所在模块的路径
        • 反序列化时,需要在目标环境中导入同名模块、找到同名函数
        • 因此 Python 本地函数调用依赖目标环境与源环境“同构”
        • Ray 跨节点时,Worker 进程的 __main__ 模块通常与 Driver 不同,所以会失败
    • Ray 远程函数是 “传定义+环境” ,集群自动同步,支持跨节点;
  • 执行位置差异:
    • 本地函数固定在调用方进程,无分布式能力;
    • Ray 远程函数由集群调度,可分布式并发执行;
  • 使用场景:
    • 本地函数:适用于单进程/单节点的简单逻辑,无需分布式;
    • Ray 远程函数:适用于分布式计算、并发任务、跨节点执行,是 Ray 分布式能力的核心
  • 核心差异总览
    对比维度 本地函数(未用 @ray.remote 装饰) Ray 远程函数(用 @ray.remote 装饰)
    序列化方式 依赖 Python 原生 pickle,仅序列化「函数引用」 Ray 自定义序列化(结合 pickle+集群元数据),序列化「函数元信息+代码定义」
    序列化限制 无法跨节点传递(远程节点无函数定义,引用失效) 可跨节点传递(集群自动同步函数定义到执行节点)
    执行位置 固定在「调用方所在的本地进程/线程」 分布式调度到「集群任意节点的 Worker 进程」(可指定资源)
    执行特性 同步执行,阻塞调用方;无并发调度能力 异步执行,返回 ObjectRef;支持集群级并发/分布式调度
    依赖传递 需手动确保执行环境有函数依赖(如导入、变量) Ray 自动打包函数依赖(如嵌套函数、闭包变量)并分发

序列化机制:“仅传引用” vs “传定义+元信息”

  • 序列化的核心目的是:让函数能在「非定义环境」中被正确执行
  • 两者的序列化逻辑完全不同:

本地函数:仅序列化“函数引用”,无实际代码

  • Python 原生 pickle 序列化本地函数时,不会打包函数的代码本身 ,只会记录函数的「模块路径+函数名」(比如 __main__.add
  • 这种“引用式序列化”仅在「同一进程/同一节点且函数已定义」的场景下有效,跨节点会直接失效
  • 错误示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import ray

    ray.init(ignore_reinit_error=True)

    # 本地函数
    def add_remote(a, b):
    return a + b

    # 直接传递远程函数的引用(Ray 自动处理序列化)
    @ray.remote
    def execute_remote_func(func, x, y):
    return func(x,y) # 远程工作进程无法识别调用方的 local func,错误

    # 跨节点调度执行(单节点可以成功,但集群有多个节点会失败)
    result_ref = execute_remote_func.remote(add_remote, 2, 3)
    print(ray.get(result_ref)) # 单节点输出:5(成功执行);多节点执行错误

    ray.shutdown()

Ray 远程函数:序列化“函数元信息+代码定义”

  • Ray 对远程函数的序列化做了增强
    • 1)序列化时,不仅记录函数引用,还会打包函数的代码定义依赖模块闭包变量(若有);
    • 2)远程节点接收后,会自动还原函数的执行环境(无需手动导入);
    • 3)底层用 Ray 自定义的序列化器(兼容 pickle,但更适合分布式场景)
  • 正确示例:远程函数跨节点调用成功
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import ray

    ray.init(ignore_reinit_error=True)

    # Ray 远程函数(已注册,自动序列化代码)
    @ray.remote
    def add_remote(a, b):
    return a + b

    # 直接传递远程函数的引用(Ray 自动处理序列化)
    @ray.remote
    def execute_remote_func(func, x, y):
    return ray.get(func.remote(x, y)) # 远程节点能识别并执行

    # 跨节点调度执行(即使集群有多个节点也能成功)
    result_ref = execute_remote_func.remote(add_remote, 2, 3) # 注意:传入的参数 add_remote 本身也需要是 @ray.remote 封装过的 Ray 远程函数
    print(ray.get(result_ref)) # 输出:5(成功执行)

    ray.shutdown()

补充:Ray 还支持 嵌套远程函数 闭包变量传递

  • 比如在远程函数中引用本地变量,Ray 会自动序列化传递:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import ray

    ray.init(ignore_reinit_error=True)

    @ray.remote
    def outer_remote(x):
    # 闭包变量 x 会被 Ray 自动序列化到远程节点
    @ray.remote
    def inner_remote(y):
    return x + y
    return inner_remote.remote(10)

    print(ray.get(ray.get(outer_remote.remote(5)))) # 输出:15

    ray.shutdown()

执行位置:“本地固定” vs “集群分布式调度”

  • 执行位置的差异是两者最直观的区别,直接决定了是否能利用集群资源:

本地函数:执行在 调用方所在进程

  • 本地函数的执行位置完全固定:
    • 无论在哪里调用(即使在远程函数内部调用本地函数),函数都会在 发起调用的进程 中执行【存疑】
      • 问题:这里描述有错(部分书籍会这样写),理论上远程函数内部无法调用本地函数,所以应该加上一句,在可以调用成功的前提下
    • 若在远程函数中调用本地函数,本质是在「远程节点的 Worker 进程」中执行,但该进程没有本地函数的定义(除非手动同步代码),所以必然失败;
    • 无并发能力:多个调用会串行执行在同一个进程/线程(或 Python 多进程的子进程,但需手动管理)

远程函数:执行在「集群 Worker 进程」

  • Ray 远程函数的执行位置由 Ray 集群的调度器统一管理:

    • 1)调用 func.remote() 时,会向 Ray 调度器提交一个任务
    • 2)调度器根据集群节点的资源(CPU、GPU、内存)情况,将任务分配到任意可用节点的 Worker 进程
    • 3)执行完成后,结果会存储在 Ray 的对象存储中,通过 ray.get() 可获取
    • 4)支持并发:多个 remote() 调用会被调度到不同 Worker 进程/节点,并行执行
  • 示例:远程函数分布式执行(多节点/多进程并发)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
       import ray
    import os
    import time

    # os.environ["RAY_DEDUP_LOGS"] = "0" # 本意是让每个进程结果都完整输出,但这行代码仅当前进程生效,需要启动前配置环境变量才可以
    # # 如果是通过代码定义,则在 os.environ 设置在 ray.init() 之前进行才能生效,因为 Worker 进程在初始化时读取环境变量
    ray.init(ignore_reinit_error=True)

    # Ray 远程函数:打印执行节点的进程 ID 和节点名
    @ray.remote
    def add_remote(a, b):
    node_name = ray.util.get_node_ip_address() # 获取执行节点 IP
    pid = os.getpid() # 获取执行进程 ID
    print(f"在节点 {node_name} 的进程 {pid} 执行 add({a}, {b})")
    time.sleep(1) # 模拟耗时操作
    return a + b

    # 提交 5 个并发任务(会被调度到不同 Worker 进程)
    start = time.time()
    result_refs = [add_remote.remote(i, i*2) for i in range(5)]
    results = ray.get(result_refs) # 等待所有任务完成
    end = time.time()

    print("结果:", results) # 输出:[0, 3, 6, 9, 12]
    print(f"总耗时: {end - start:.2f}s") # 约 1s(并发执行,而非 5s 串行)

    ray.shutdown()
  • 执行上述脚本:

    1
    2
    export RAY_DEDUP_LOGS=0
    python demo.py
    • 注意:仅在代码里面添加 os.environ["RAY_DEDUP_LOGS"] = "0" 是不够的,因为:
      • Ray 的日志去重功能是在 Worker 进程启动时就决定的,而 Worker 是由 Ray 的主进程(Driver)启动的
      • 上面的代码在 ray.init() 之后才启动 Worker,那么环境变量必须在 Driver 启动 Worker 之前就传递过去,否则 Worker 进程会继承默认的去重配置
      • 所以最安全的打印所有日志的方式就是再启动脚本前配置环境变量
    • 另一种实现方式是在远程函数中返回 PID,然后由 Driver 打印
  • 输出示例:

    1
    2
    3
    4
    5
    6
    7
    8
    2025-11-04 11:42:43,175 INFO worker.py:1918 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
    (add_remote pid=14393) 在节点 127.0.0.1 的进程 14393 执行 add(2, 4)
    (add_remote pid=14399) 在节点 127.0.0.1 的进程 14399 执行 add(4, 8)
    (add_remote pid=14398) 在节点 127.0.0.1 的进程 14398 执行 add(1, 2)
    (add_remote pid=14400) 在节点 127.0.0.1 的进程 14400 执行 add(3, 6)
    (add_remote pid=14396) 在节点 127.0.0.1 的进程 14396 执行 add(0, 0)
    结果: [0, 3, 6, 9, 12]
    总耗时: 1.62s

附录:远程调用时传入的函数指针必须是远程函数

  • 在 Ray 中不支持直接传入 local 函数指针作为远程函数的执行对象,需通过 Ray 装饰器(@ray.remote)将函数注册为远程可执行,再通过 函数名.remote() 调用(本质是基于函数标识而非指针传递)
  • 总结:
    • 不推荐将普通函数作为参数传递给 Ray 远程函数
    • 推荐使用 @ray.remote 装饰器或在远程函数内部定义逻辑
    • 注意:一些代码在单机环境下可能碰巧能运行,但不具有可移植性和可靠性(这一点需要注意 Ray 本地调试通过可能也无法分布式运行)

错误示例(未注册本地函数)

  • add 未被 @ray.remote 注册,它只是一个本地函数 ,无法在 Ray 分布式环境中执行,直接传递给远程函数(如 execute_func)会报错

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import ray

    ray.init(ignore_reinit_error=True)

    # 未注册的本地函数
    def add(a, b):
    return a + b

    # 已注册的远程函数
    @ray.remote
    def execute_func(func, x, y):
    # 此处调用本地函数会失败,因为 func 在远程节点无定义
    # # 远程节点的工作进程无法导入本地主模块的 add_local 函数,也无法序列化传递普通函数,可能会直接抛出 SerializationError
    # # 单进程/单节点下调用指针函数可以执行,但是分布式情况下,local_func 无法被序列化,会出错
    return func(x, y) # 报错:NameError,PicklingError 或 SerializationError

    # 调用会抛出异常
    try:
    result = ray.get(execute_func.remote(add, 4, 6))
    except Exception as e:
    print("错误:", e) # 提示无法序列化或找不到函数

    ray.shutdown()
  • 核心原因:Ray 远程函数执行依赖序列化传输集群节点间代码同步

    • 未注册的本地函数无法被序列化为集群可识别的任务,且远程节点没有该函数的定义,会导致执行失败

正确示例(远程函数调用)

  • Ray 的远程函数依赖集群调度,通过 @ray.remote 显式注册后使用远程调用函数调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    import ray

    ray.init(ignore_reinit_error=True)

    # 定义远程函数(会注册到 Ray 集群)
    @ray.remote
    def add(a, b):
    return a + b

    # 远程函数,可接收其他远程函数的调用结果
    @ray.remote
    def execute_func(func, x, y):
    # 这里 func 是远程函数标识,通过 .remote() 触发执行
    result = ray.get(func.remote(x, y)) # 使用远程调用的方式调用函数指针,实现调用远程函数,正确!
    # result = func(x, y) # remote 函数无法被直接调用,错误!
    # result = add(x,y) # remote 函数无法被直接调用,错误!
    # result = add_local(x, y) # add_local 当做 local 函数调用(注意:不再是指针传入),正确!
    return result

    # # 不使用 remote 直接调用 远程函数,错误
    # result1 = add(2, 3)

    # 使用remote直接调用远程函数,正确
    result1 = ray.get(add.remote(2, 3))
    print("直接调用结果:", result1) # 输出:5

    # 间接通过另一个远程函数调用(模拟"传递函数逻辑")
    result2 = ray.get(execute_func.remote(add, 2, 3))
    print("间接调用结果:", result2) # 输出:10

    ray.shutdown()
  • Ray 的远程函数依赖集群调度,需通过 @ray.remote 显式注册,无法像本地代码那样传递函数指针(内存地址在分布式环境中无效)

  • 若需在远程函数中复用其他函数逻辑,直接传递已注册的远程函数名(如示例中的 add),再通过 func.remote() 调用即可