整体说明
- 远程函数与本地函数的区别主要在 序列化机制 和 执行位置 两个维度
- 序列化本质差异:
- 本地函数是“传引用”,依赖执行环境已有定义;
- Ray 远程函数是“传定义+环境”,集群自动同步,支持跨节点;
- 执行位置差异:
- 本地函数固定在调用方进程,无分布式能力;
- Ray 远程函数由集群调度,可分布式并发执行;
- 使用场景:
- 本地函数:适用于单进程/单节点的简单逻辑,无需分布式;
- Ray 远程函数:适用于分布式计算、并发任务、跨节点执行,是 Ray 分布式能力的核心
- 核心差异总览
对比维度 本地函数(未用 @ray.remote装饰)Ray 远程函数(用 @ray.remote装饰)序列化方式 依赖 Python 原生 pickle,仅序列化「函数引用」Ray 自定义序列化(结合 pickle+集群元数据),序列化「函数元信息+代码定义」序列化限制 无法跨节点传递(远程节点无函数定义,引用失效) 可跨节点传递(集群自动同步函数定义到执行节点) 执行位置 固定在「调用方所在的本地进程/线程」 分布式调度到「集群任意节点的 Worker 进程」(可指定资源) 执行特性 同步执行,阻塞调用方;无并发调度能力 异步执行,返回 ObjectRef;支持集群级并发/分布式调度依赖传递 需手动确保执行环境有函数依赖(如导入、变量) Ray 自动打包函数依赖(如嵌套函数、闭包变量)并分发
序列化机制:“仅传引用” vs “传定义+元信息”
- 序列化的核心目的是:让函数能在「非定义环境」中被正确执行
- 两者的序列化逻辑完全不同:
本地函数:仅序列化“函数引用”,无实际代码
- Python 原生
pickle序列化本地函数时,不会打包函数的代码本身 ,只会记录函数的「模块路径+函数名」(比如__main__.add) - 这种“引用式序列化”仅在「同一进程/同一节点且函数已定义」的场景下有效,跨节点会直接失效
- 错误示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import ray
ray.init(ignore_reinit_error=True)
# 本地函数
def add_remote(a, b):
return a + b
# 直接传递远程函数的引用(Ray 自动处理序列化)
def execute_remote_func(func, x, y):
return func(x,y) # 远程工作进程无法识别调用方的 local func,错误
# 跨节点调度执行(单节点可以成功,但集群有多个节点会失败)
result_ref = execute_remote_func.remote(add_remote, 2, 3)
print(ray.get(result_ref)) # 单节点输出:5(成功执行);多节点执行错误
ray.shutdown()
Ray 远程函数:序列化“函数元信息+代码定义”
- Ray 对远程函数的序列化做了增强 :
- 1)序列化时,不仅记录函数引用,还会打包函数的代码定义、依赖模块、闭包变量(若有);
- 2)远程节点接收后,会自动还原函数的执行环境(无需手动导入);
- 3)底层用 Ray 自定义的序列化器(兼容
pickle,但更适合分布式场景)
- 正确示例:远程函数跨节点调用成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import ray
ray.init(ignore_reinit_error=True)
# Ray 远程函数(已注册,自动序列化代码)
def add_remote(a, b):
return a + b
# 直接传递远程函数的引用(Ray 自动处理序列化)
def execute_remote_func(func, x, y):
return ray.get(func.remote(x, y)) # 远程节点能识别并执行
# 跨节点调度执行(即使集群有多个节点也能成功)
result_ref = execute_remote_func.remote(add_remote, 2, 3)
print(ray.get(result_ref)) # 输出:5(成功执行)
ray.shutdown()
补充:Ray 还支持 嵌套远程函数 闭包变量传递
- 比如在远程函数中引用本地变量,Ray 会自动序列化传递:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import ray
ray.init(ignore_reinit_error=True)
def outer_remote(x):
# 闭包变量 x 会被 Ray 自动序列化到远程节点
def inner_remote(y):
return x + y
return inner_remote.remote(10)
print(ray.get(ray.get(outer_remote.remote(5)))) # 输出:15
ray.shutdown()
执行位置:“本地固定” vs “集群分布式调度”
- 执行位置的差异是两者最直观的区别,直接决定了是否能利用集群资源:
本地函数:执行在「调用方所在进程」
- 本地函数的执行位置完全固定:
- 无论在哪里调用(即使在远程函数内部调用本地函数),函数都会在「发起调用的进程」中执行【存疑】
- 问题:会出错吧,理论上远程函数内部无法调用本地函数?
- 若在远程函数中调用本地函数,本质是在「远程节点的 Worker 进程」中执行,但该进程没有本地函数的定义(除非手动同步代码),所以必然失败;
- 无并发能力:多个调用会串行执行在同一个进程/线程(或 Python 多进程的子进程,但需手动管理)
- 无论在哪里调用(即使在远程函数内部调用本地函数),函数都会在「发起调用的进程」中执行【存疑】
远程函数:执行在「集群 Worker 进程」
Ray 远程函数的执行位置由 Ray 集群的调度器统一管理:
- 1)调用
func.remote()时,会向 Ray 调度器提交一个任务 - 2)调度器根据集群节点的资源(CPU、GPU、内存)情况,将任务分配到任意可用节点的 Worker 进程
- 3)执行完成后,结果会存储在 Ray 的对象存储中,通过
ray.get()可获取 - 4)支持并发:多个
remote()调用会被调度到不同 Worker 进程/节点,并行执行
- 1)调用
示例:远程函数分布式执行(多节点/多进程并发)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import ray
import os
import time
# os.environ["RAY_DEDUP_LOGS"] = "0" # 本意是让每个进程结果都完整输出,但这行代码仅当前进程生效,需要启动前环境变量才可以
ray.init(ignore_reinit_error=True)
# Ray 远程函数:打印执行节点的进程 ID 和节点名
def add_remote(a, b):
node_name = ray.util.get_node_ip_address() # 获取执行节点 IP
pid = os.getpid() # 获取执行进程 ID
print(f"在节点 {node_name} 的进程 {pid} 执行 add({a}, {b})")
time.sleep(1) # 模拟耗时操作
return a + b
# 提交 5 个并发任务(会被调度到不同 Worker 进程)
start = time.time()
result_refs = [add_remote.remote(i, i*2) for i in range(5)]
results = ray.get(result_refs) # 等待所有任务完成
end = time.time()
print("结果:", results) # 输出:[0, 3, 6, 9, 12]
print(f"总耗时: {end - start:.2f}s") # 约 1s(并发执行,而非 5s 串行)
ray.shutdown()执行上述脚本:
1
2export RAY_DEDUP_LOGS=0
python demo.py- 注意:仅在代码里面添加
os.environ["RAY_DEDUP_LOGS"] = "0"是不够的,因为:- Ray 的日志去重功能是在 Worker 进程启动时就决定的,而 Worker 是由 Ray 的主进程(Driver)启动的
- 上面的代码在
ray.init()之后才启动 Worker,那么环境变量必须在 Driver 启动 Worker 之前就传递过去,否则 Worker 进程会继承默认的去重配置 - 所以最安全的打印所有日志的方式就是再启动脚本前配置环境变量
- 另一种实现方式是在远程函数中返回 PID,然后由 Driver 打印
- 注意:仅在代码里面添加
输出示例:
1
2
3
4
5
6
7
82025-11-04 11:42:43,175 INFO worker.py:1918 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
(add_remote pid=14393) 在节点 127.0.0.1 的进程 14393 执行 add(2, 4)
(add_remote pid=14399) 在节点 127.0.0.1 的进程 14399 执行 add(4, 8)
(add_remote pid=14398) 在节点 127.0.0.1 的进程 14398 执行 add(1, 2)
(add_remote pid=14400) 在节点 127.0.0.1 的进程 14400 执行 add(3, 6)
(add_remote pid=14396) 在节点 127.0.0.1 的进程 14396 执行 add(0, 0)
结果: [0, 3, 6, 9, 12]
总耗时: 1.62s