整体说明
- 分布式系统 是由一组通过网络相互连接的独立计算节点(或计算机)组成的集合,这些节点协同工作以实现一个共同的目标,实现高效的计算和处理
- MPI(Message Passing Interface,消息传递接口) 是一个跨语言的并行计算通信协议,也是一个应用程序接口(API)标准,允许程序在分布式内存系统中高效地交换数据
- MPI 定义了一套标准的库函数和语义规则,允许在非共享内存环境下的多个进程(通常运行在不同的处理器或计算节点上)通过发送和接收消息进行通信和协作
- mpi4py 库是一个构建在 MPI 之上的 Python 库 ,主要使用 Cython 编写
- Cython 的目标是让 Python 代码具备 C 语言的高性能,它是 Python 的一个超集,既支持 Python 语法,又能调用 C 函数、定义 C 类型变量,从而优化 Python 代码的性能
- mpi4py 库以面向对象的方式提供了在 Python 环境下调用 MPI 标准的编程接口,这些接口构建在 MPI-2 C++ 编程接口基础之上,与 C++ 的 MPI 编程接口类似
- mpi4py 库实现了很多 MPI 标准中的接口,包括点对点通信、集合通信、阻塞/非阻塞通信、组间通信等
- 可以在不同进程间传递任何可被 pickle 序列化的内置和用户自定义 Python 对象
安装 mpi4py
Mac 环境安装
安装 Homebrew(若已安装可跳过):
1
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
安装 Open MPI(建议使用 Open MPI,因为它在 Mac 上通常更容易配置):
1
brew install open-mpi
- 这一步安装依赖比较多,需要一些时间
- 安装完成后,你可以通过运行
mpiexec --version来验证 Open MPI 是否正确安装
安装 mpi4py:
1
pip install mpi4py
(常见报错解法)如果你的系统中有多个 MPI 版本,或者
pip无法找到正确的 MPI 库,你可能需要设置MPICC环境变量来指定 MPI 编译器。例如:1
2export MPICC=/opt/homebrew/bin/mpicc # 根据你的 Open MPI 安装路径调整
pip install mpi4py在安装 mpi4py 后运行以下命令来验证安装:
1
python -c "import mpi4py; print(mpi4py.__version__)"
Ubuntu 环境安装(待补充)
mpi4py 的使用
mpi4py 的操作总结(通信原语)
- mpi4py提供了并行计算所需的各种通信操作,主要分为两类:
- 点对点通信(Point-to-Point Communication) :
- 在两个特定进程间进行数据交换
- 主要操作:Send/send, Recv/recv, Isend/isend, Irecv/irecv, Sendrecv/sendrecv等
- 集体通信(Collective Communication) :
- 涉及通信组(communicator)中的所有进程
- 提供更高效的全局数据操作
- 不同集体通信操作对比
操作类型 通信模式 数据流向 结果分布 典型应用场景 Barrier/barrier 同步 无数据传输 无 进程同步 Bcast/bcast 一对多 根 -> 所有 所有进程相同 分发配置参数 Scatter/scatter 一对多 根 -> 各部分 每个进程不同 数据并行分解 Gather/gather 多对一 所有 -> 根 仅根进程有结果 结果收集 Allgather/allgather 多对多 所有 -> 所有 所有进程相同 全局信息共享 Alltoall/alltoall 多对多 所有↔所有 每个进程不同 矩阵转置 Reduce/reduce 多对一 所有 -> 规约 -> 根 仅根进程有结果 全局统计计算 Allreduce/allreduce 多对多 所有 -> 规约 -> 所有 所有进程相同 需要全局结果的并行计算 Scan/scan 多对多 前缀累积 每个进程不同 累积计算 Reduce_scatter 多对多 先reduce再scatter 每个进程不同 - 在很多地方中,表述也使用类似 All-Gather, All-to-All, All-Reduce 等来表达
- All-Gather 和 All-to-All 的区别:
- All-Gather 和 All-to-All 都是多对多发送数据
- 发送数据上来看:
- All-Gather 中,从任意进程的视角看,向不同进程发送的数据是相同的;
- All-to-All 中则向不同进程发送不同数据,默认数据的维度和
world_size相同,每个receive_rank进程会得到其他进程send_data[receive_rank]的数据
- 结果上来看:All-Gather 操作后,各个进程最终的数据是相同的;All-to-All 操作后,不同进程最终的数据不同
mpi4py 使用演示
以下 mpi4py 示例均以小 Pythonic 风格(小写) 为例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247from mpi4py import MPI
import numpy as np
import time
def main():
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print(f"进程 {rank} 启动,共有 {size} 个进程")
# 1. barrier - 进程同步
comm.Barrier() # 与 comm.barrier() 等价
# 2. bcast - 广播数据
if rank == 0:
broadcast_data = {"message": "这是来自 rank=0 的 broadcast 消息", "value": 123}
else:
broadcast_data = None
broadcast_data = comm.bcast(broadcast_data, root=0)
print(f"进程 {rank} 收到 bcast 数据: {broadcast_data}")
# 3. scatter - 分发数据
if rank == 0:
scatter_data = [f"from-0-index-{i}" for i in range(size)]
else:
scatter_data = None
data = comm.scatter(scatter_data, root=0)
print(f"进程 {rank} 收到 scatter 数据: {data}")
comm.Barrier()
# 4. gather - 收集数据
if rank == 0:
print(f"==开始测试更复杂的原语:==")
comm.Barrier()
local_data = np.arange(size * rank, size * (rank + 1), dtype=np.int32)
print(f"进程 {rank} 原始数据: {local_data}")
gathered_data = comm.gather(local_data, root=0)
print(f"进程 {rank} 收到 gather 数据: {gathered_data}")
# 5. allgather - 全收集数据
all_gathered_data = comm.allgather(local_data)
print(f"进程 {rank} 收到 allgather 数据: {all_gathered_data}")
# 6. alltoall - 全对全数据交换
recv_data = comm.alltoall(local_data)
print(f"进程 {rank} 收到 alltoall 数据: {recv_data}")
# 7. reduce - 规约操作
recv_data = comm.reduce(local_data, op=MPI.SUM, root=0)
print(f"进程 {rank} 收到 reduce 数据: {recv_data}")
# 8. allreduce - 全局规约操作
recv_data = comm.allreduce(local_data, op=MPI.SUM)
print(f"进程 {rank} 收到 allreduce 数据: {recv_data}")
time.sleep(0.1)
# 高阶,任意维度的 allreduce 操作
comm.Barrier()
if rank == 0:
print(f"==测试任意维度的 allreduce 操作:==")
comm.Barrier()
all_reduce_local_data = np.array([rank*i for i in range(size+3)], dtype=np.int32)
print(f"进程 {rank} 原始 all_reduce_local_data 数据: {all_reduce_local_data}")
recv_data = comm.allreduce(all_reduce_local_data, op=MPI.SUM) # 输入可以是任意维度
print(f"进程 {rank} 收到 allreduce 数据: {recv_data}")
comm.Barrier()
time.sleep(0.1)
# 9. scan - 前缀积累,按照 rank 累积
recv_data = comm.scan(local_data, op=MPI.SUM)
print(f"进程 {rank} 收到 scan 数据: {recv_data}")
comm.Barrier()
time.sleep(0.1)
# 增加:reduce_scatter - reduce_scatter规约操作
if rank == 0:
print(f"==开始 reduce_scatter 演示==")
local_data = np.array([rank*i for i in range(size)])
reduce_scatter_recv_data = np.array(0, local_data.dtype) # 必须将发送数据和接受数据的类型对齐,否则会出现类型不匹配的错误
print(f"进程 {rank} local_data 原始数据: {local_data}")
comm.Barrier()
print(f"进程 {rank} reduce_scatter_recv_data 原始数据: {reduce_scatter_recv_data}")
comm.Reduce_scatter(local_data, reduce_scatter_recv_data, op=MPI.SUM)
print(f"进程 {rank} 收到 Reduce_scatter 数据: {reduce_scatter_recv_data}")
comm.Barrier()
time.sleep(0.1)
# 点对点通信函数演示
if rank == 0:
print(f"==开始点对点通信演示==")
# 10. send 和 recv - 阻塞式发送接收
if rank == 0:
dest = 1
message = f"from {rank}, 你好,进程 1!"
comm.send(message, dest=dest)
print(f"进程 {rank} 发送消息到进程 {dest}")
source = 1
recv_msg = comm.recv(source=source)
print(f"进程 {rank} 从进程 {source} 收到消息: {recv_msg}")
elif rank == 1:
source = 0
recv_msg = comm.recv(source=source)
print(f"进程 {rank} 从进程 {source} 收到消息: {recv_msg}")
dest = 0
message = f"from {rank}, 你好,进程 0!"
comm.send(message, dest=dest)
print(f"进程 {rank} 发送消息到进程 {dest}")
comm.Barrier()
# 11. isend 和 irecv - 非阻塞式发送接收
if rank == 2:
dest = 3
message = "非阻塞消息"
req = comm.isend(message, dest=dest)
source = 3
req_recv = comm.irecv(source=source)
# 可以在这里执行其他计算
req.wait() # 等待发送完成
print(f"进程 {rank} 非阻塞发送完成")
recv_msg = req_recv.wait() # 等待接收完成
print(f"进程 {rank} 收到非阻塞消息: {recv_msg}")
elif rank == 3:
source = 2
req_recv = comm.irecv(source=source)
dest = 2
message = "非阻塞回复"
req = comm.isend(message, dest=dest)
# 可以在这里执行其他计算
recv_msg = req_recv.wait() # 等待接收完成
print(f"进程 {rank} 收到非阻塞消息: {recv_msg}")
req.wait() # 等待发送完成
print(f"进程 {rank} 非阻塞发送完成")
comm.Barrier()
# 12. sendrecv - 同时发送和接收
send_val = -rank * 100
dest = (rank + 1) % size # 发送给下一个
source = (rank - 1) if rank - 1 >= 0 else size - 1 # 接收自来自上一个的
# 上面的实现是一个圆环的通信方式,节点之间消息是依次流转的
print(f"rank={rank}, dest={dest}, source={source}")
# 注意:一定要对齐,A.dest = B,则必须有 B.source = A,否则下面的语句会卡死(一直等待)
recv_val = comm.sendrecv(send_val, dest=dest, source=source)
print(f"进程 {rank} 发送 {send_val} 到进程 {dest},从进程 {source} 接收 {recv_val}")
# 所有演示操作做完,最终同步
comm.Barrier()
if rank == 0:
print("\n所有进程完成演示")
if __name__ == "__main__":
main()
# 进程 0 启动,共有 4 个进程
# 进程 2 启动,共有 4 个进程
# 进程 3 启动,共有 4 个进程
# 进程 1 启动,共有 4 个进程
# 进程 0 收到 bcast 数据: {'message': '这是来自 rank=0 的 broadcast 消息', 'value': 123}
# 进程 1 收到 bcast 数据: {'message': '这是来自 rank=0 的 broadcast 消息', 'value': 123}
# 进程 2 收到 bcast 数据: {'message': '这是来自 rank=0 的 broadcast 消息', 'value': 123}
# 进程 3 收到 bcast 数据: {'message': '这是来自 rank=0 的 broadcast 消息', 'value': 123}
# 进程 1 收到 scatter 数据: from-0-index-1
# 进程 3 收到 scatter 数据: from-0-index-3
# 进程 0 收到 scatter 数据: from-0-index-0
# ==开始测试更复杂的原语:==
# 进程 2 收到 scatter 数据: from-0-index-2
# 进程 0 原始数据: [0 1 2 3]
# 进程 1 原始数据: [4 5 6 7]
# 进程 3 原始数据: [12 13 14 15]
# 进程 1 收到 gather 数据: None
# 进程 3 收到 gather 数据: None
# 进程 2 原始数据: [ 8 9 10 11]
# 进程 2 收到 gather 数据: None
# 进程 0 收到 gather 数据: [array([0, 1, 2, 3], dtype=int32), array([4, 5, 6, 7], dtype=int32), array([ 8, 9, 10, 11], dtype=int32), array([12, 13, 14, 15], dtype=int32)]
# 进程 0 收到 allgather 数据: [array([0, 1, 2, 3], dtype=int32), array([4, 5, 6, 7], dtype=int32), array([ 8, 9, 10, 11], dtype=int32), array([12, 13, 14, 15], dtype=int32)]
# 进程 1 收到 allgather 数据: [array([0, 1, 2, 3], dtype=int32), array([4, 5, 6, 7], dtype=int32), array([ 8, 9, 10, 11], dtype=int32), array([12, 13, 14, 15], dtype=int32)]
# 进程 3 收到 allgather 数据: [array([0, 1, 2, 3], dtype=int32), array([4, 5, 6, 7], dtype=int32), array([ 8, 9, 10, 11], dtype=int32), array([12, 13, 14, 15], dtype=int32)]
# 进程 2 收到 allgather 数据: [array([0, 1, 2, 3], dtype=int32), array([4, 5, 6, 7], dtype=int32), array([ 8, 9, 10, 11], dtype=int32), array([12, 13, 14, 15], dtype=int32)]
# 进程 1 收到 alltoall 数据: [1, 5, 9, 13]
# 进程 0 收到 alltoall 数据: [0, 4, 8, 12]
# 进程 2 收到 alltoall 数据: [2, 6, 10, 14]
# 进程 3 收到 alltoall 数据: [3, 7, 11, 15]
# 进程 3 收到 reduce 数据: None
# 进程 1 收到 reduce 数据: None
# 进程 2 收到 reduce 数据: None
# 进程 0 收到 reduce 数据: [24 28 32 36]
# 进程 1 收到 allreduce 数据: [24 28 32 36]
# 进程 0 收到 allreduce 数据: [24 28 32 36]
# 进程 3 收到 allreduce 数据: [24 28 32 36]
# 进程 2 收到 allreduce 数据: [24 28 32 36]
# ==测试任意维度的 allreduce 操作:==
# 进程 0 原始 all_reduce_local_data 数据: [0 0 0 0 0 0 0]
# 进程 2 原始 all_reduce_local_data 数据: [ 0 2 4 6 8 10 12]
# 进程 3 原始 all_reduce_local_data 数据: [ 0 3 6 9 12 15 18]
# 进程 1 原始 all_reduce_local_data 数据: [0 1 2 3 4 5 6]
# 进程 0 收到 allreduce 数据: [ 0 6 12 18 24 30 36]
# 进程 2 收到 allreduce 数据: [ 0 6 12 18 24 30 36]
# 进程 1 收到 allreduce 数据: [ 0 6 12 18 24 30 36]
# 进程 3 收到 allreduce 数据: [ 0 6 12 18 24 30 36]
# 进程 3 收到 scan 数据: [24 28 32 36]
# 进程 0 收到 scan 数据: [0 1 2 3]
# 进程 1 收到 scan 数据: [ 4 6 8 10]
# 进程 2 收到 scan 数据: [12 15 18 21]
# 进程 3 local_data 原始数据: [0 3 6 9]
# 进程 2 local_data 原始数据: [0 2 4 6]
# 进程 1 local_data 原始数据: [0 1 2 3]
# ==开始 reduce_scatter 演示==
# 进程 0 local_data 原始数据: [0 0 0 0]
# 进程 0 reduce_scatter_recv_data 原始数据: 0
# 进程 1 reduce_scatter_recv_data 原始数据: 0
# 进程 2 reduce_scatter_recv_data 原始数据: 0
# 进程 3 reduce_scatter_recv_data 原始数据: 0
# 进程 0 收到 Reduce_scatter 数据: 0
# 进程 3 收到 Reduce_scatter 数据: 18
# 进程 2 收到 Reduce_scatter 数据: 12
# 进程 1 收到 Reduce_scatter 数据: 6
# ==开始点对点通信演示==
# 进程 0 发送消息到进程 1
# 进程 1 从进程 0 收到消息: from 0, 你好,进程 1!
# 进程 1 发送消息到进程 0
# 进程 0 从进程 1 收到消息: from 1, 你好,进程 0!
# 进程 2 非阻塞发送完成
# 进程 3 收到非阻塞消息: 非阻塞消息
# 进程 3 非阻塞发送完成
# 进程 2 收到非阻塞消息: 非阻塞回复
# rank=2, dest=3, source=1
# rank=3, dest=0, source=2
# rank=0, dest=1, source=3
# rank=1, dest=2, source=0
# 进程 1 发送 -100 到进程 2,从进程 0 接收 0
# 进程 0 发送 0 到进程 1,从进程 3 接收 -300
# 进程 2 发送 -200 到进程 3,从进程 1 接收 -100
# 进程 3 发送 -300 到进程 0,从进程 2 接收 -200
#
# 所有进程完成演示启动上述代码的命令为:
1
mpiexec -n 4 python example.py
附录:传输不同维度的数据
- 在上述
allgather, alltoall, allreduce等操作中,同一个进程传输给其他进程的数据不一定要维度完全相等,甚至类型也不一定要相同,只需要能够做对应的MPIop 就可以 - 示例(某个元素改成列表):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77from mpi4py import MPI
import numpy as np
import time
from numba.cuda import local
def main():
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print(f"进程 {rank} 启动,共有 {size} 个进程")
comm.Barrier()
local_data = [
[0, 1, 2, 3],
[4, 5, 6, 7],
[ 8, 9, [10,18], 11],
[12, 13, 14, 15],
]
local_data = local_data[rank]
print(f"进程 {rank} 原始数据: {local_data}")
comm.Barrier()
time.sleep(0.1)
# allgather
all_gathered_data = comm.allgather(local_data)
print(f"进程 {rank} 收到 allgather 数据: {all_gathered_data}")
comm.Barrier()
time.sleep(0.1)
# alltoall
recv_data = comm.alltoall(local_data)
print(f"进程 {rank} 收到 alltoall 数据: {recv_data}")
comm.Barrier()
time.sleep(0.1)
# areduce
recv_data = comm.reduce(local_data, op=MPI.SUM, root=0)
print(f"进程 {rank} 收到 reduce 数据: {recv_data}")
comm.Barrier()
time.sleep(0.1)
# allreduce - 全局规约操作
recv_data = comm.allreduce(local_data, op=MPI.SUM)
print(f"进程 {rank} 收到 allreduce 数据: {recv_data}")
comm.Barrier()
time.sleep(0.1)
if __name__ == "__main__":
main()
# 进程 2 启动,共有 4 个进程
# 进程 3 启动,共有 4 个进程
# 进程 1 启动,共有 4 个进程
# 进程 0 启动,共有 4 个进程
# 进程 0 原始数据: [0, 1, 2, 3]
# 进程 1 原始数据: [4, 5, 6, 7]
# 进程 3 原始数据: [12, 13, 14, 15]
# 进程 2 原始数据: [8, 9, [10, 18], 11]
# 进程 3 收到 allgather 数据: [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, [10, 18], 11], [12, 13, 14, 15]]
# 进程 1 收到 allgather 数据: [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, [10, 18], 11], [12, 13, 14, 15]]
# 进程 0 收到 allgather 数据: [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, [10, 18], 11], [12, 13, 14, 15]]
# 进程 2 收到 allgather 数据: [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, [10, 18], 11], [12, 13, 14, 15]]
# 进程 1 收到 alltoall 数据: [1, 5, 9, 13]
# 进程 0 收到 alltoall 数据: [0, 4, 8, 12]
# 进程 3 收到 alltoall 数据: [3, 7, 11, 15]
# 进程 2 收到 alltoall 数据: [2, 6, [10, 18], 14]
# 进程 3 收到 reduce 数据: None
# 进程 2 收到 reduce 数据: None
# 进程 0 收到 reduce 数据: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, [10, 18], 11, 12, 13, 14, 15]
# 进程 1 收到 reduce 数据: None
# 进程 2 收到 allreduce 数据: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, [10, 18], 11, 12, 13, 14, 15]
# 进程 0 收到 allreduce 数据: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, [10, 18], 11, 12, 13, 14, 15]
# 进程 1 收到 allreduce 数据: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, [10, 18], 11, 12, 13, 14, 15]
# 进程 3 收到 allreduce 数据: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, [10, 18], 11, 12, 13, 14, 15]
附录:导入 mpi4py 包的方式
必须使用:
1
from mpi4py import MPI
不能使用:
1
2import mpi4py
MPI = mpi4py.MPI两种写法看似相同, 实际上不一样,第二种方法报错为
AttributeError: module 'mpi4py' has no attribute 'MPI'这是因为 第二种方法 MPI 环境未初始化 ,mpi4py 要求在使用 MPI 功能前必须先初始化,而
import mpi4py不会自动完成这一步
不同导入方式的差别
假设有两种导入方式:
1
2
3
4
5
6# 方式一
from packagea import A
# 方式二
import packagea
A = packagea.A如果
packagea是一个包(包含__init__.py),但__init__.py中未导入或暴露A,方式二将无法通过包名直接访问A当
packagea的__init__.py未显式导入或暴露A时,方式一(from packagea import A)仍能成功的原因与 Python 的导入机制和包结构有关Python 在执行
from packagea import A时,会按以下步骤查找A:- 1) 检查
packagea.__init__.py:若__init__.py中定义或导入了A,直接使用 - 2) 搜索子模块 :若
__init__.py未包含A,Python 会在packagea目录下查找是否存在A.py或A/__init__.py - 3) 递归子模块 :若仍未找到,Python 会尝试递归导入子模块中的
A(例如packagea.module_a.A),但需显式指定路径(如from packagea.module_a import A)
- 1) 检查
mpi4py 两种导入结果不同的原因
- 方式一成功的原因 :当执行
from mpi4py import MPI时:- 1) Python 加载
mpi4py包,执行__init__.py - 2)
__init__.py注册元路径导入器(_mpiabi._install_finder()) - 3) Python 发现
__all__中包含MPI,但__init__.py中未显式定义 - 4) 触发元路径导入器,根据系统 MPI 环境选择并加载对应的
MPI.so文件(注意名称不一定是这个,可能是根据不同系统命名的文件)- 注:已确认在
~/anaconda3/envs/xxx/lib/python3.10/site-packages/mpi4py/路径下不存在MPI.so文件 - 虽然 mpi4py 目录下没有直接名为
MPI.so的文件,但实际存在MPI.mpich.cpython-310-darwin.so(针对 MPICH 实现的扩展模块) 和MPI.openmpi.cpython-310-darwin.so(针对 OpenMPI 实现的扩展模块) _mpiabi._install_finder()的作用之一是根据系统中实际安装的 MPI 库(通过环境变量或系统命令探测),选择对应的.so文件
- 注:已确认在
- 5)
MPI扩展模块被加载,并绑定到mpi4py命名空间,导入成功
- 1) Python 加载
- 方式二失败的原因 :方式一(
import mpi4py后mpi4py.MPI)失败是因为:import mpi4py仅执行__init__.py,不会触发元路径导入器对MPI的查找__init__.py中没有显式导入或定义MPI(如from .MPI import MPI),因此mpi4py.MPI不存在于命名空间中
附录:传统 MPI 风格函数的使用
- 在
mpi4py中,函数名的大小写是有区别的,主要涉及两种不同的编程接口风格:Pythonic 风格(小写) 和 传统 MPI C/Fortran 风格(大写) - 总结:
- 小写函数 :更 Pythonic,返回数据,适合动态数据,代码简洁,不需要管理缓冲区
- 大写函数 :类似 C/Fortran MPI,需要缓冲区,适合高性能计算(避免数据拷贝)
小写函数(Pythonic 风格)
返回数据 ,而不是修改传入的缓冲区(更符合 Python 习惯)
通常更简洁,适合 Python 风格的编程
适用于 NumPy 数组和 Python 对象(如列表、字典等)
以
gather为例:1
2
3
4
5
6
7
8
9
10
11import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# 使用小写 gather(返回结果,不修改传入的 recvbuf)
gathered_data = comm.gather(data, root=0)
if rank == 0:
print("Gathered data:", gathered_data) # 输出:[0, 10, 20, ...]特点:
gather返回一个列表 ,包含所有进程发送的数据(仅在root进程有效)- 不需要预先分配
recvbuf,适合动态数据
大写函数(传统 MPI 风格)
需要预先分配接收缓冲区(
recvbuf) ,类似 C/Fortran 的 MPI 接口直接修改传入的缓冲区 ,而不是返回数据
适用于高性能计算(特别是 NumPy 数组 ,避免数据拷贝)
以
gather为例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# 每个进程准备自己的数据(NumPy 数组)
sendbuf = np.array([rank * 10], dtype=int)
if rank == 0:
# 预先分配 recvbuf(大小必须匹配)
recvbuf = np.empty(size, dtype=int)
else:
recvbuf = None # 非 root 进程不需要 recvbuf
# 使用大写 Gather(修改 recvbuf)
comm.Gather(sendbuf, recvbuf, root=0)
if rank == 0:
print("Gathered data:", recvbuf) # 输出:[0, 10, 20, ...]Gather需要预先分配recvbuf(在root进程)适用于高性能计算(避免 Python 对象的额外开销)
类似函数总结
| 小写(Pythonic) | 大写(传统 MPI) | 用途 |
|---|---|---|
bcast |
Bcast |
广播数据 |
scatter |
Scatter |
分散数据 |
gather |
Gather |
收集数据 |
allgather |
Allgather |
全收集 |
reduce |
Reduce |
规约计算 |
allreduce |
Allreduce |
全规约 |
scan |
Scan |
前缀计算 |
附录:分布式中 GPU 主要集体通信操作介绍
- 分布式计算中常用的集体通信(Collective Communication)操作有All-Gather、All-Reduce 和 Reduce-Scatter,主要用于多进程或多设备(如GPU)之间的数据交互
All-Gather(全收集)
- All-Gather:每个进程提供一块数据,最终所有进程收集到所有其他进程的数据,结果是一个包含所有数据的聚合
- 输入:每个进程有一块独立数据(如
data_i) - 输出:所有进程得到相同的全量数据(
[data_0, data_1, ..., data_n]) - 举例:进程0有
A,进程1有B,进程2有C-> 最终所有进程得到[A, B, C] - 功能:参数广播、分布式训练中同步模型参数
All-Reduce(全规约)
- All-Reduce:先对所有进程的数据进行规约操作(如求和、最大值等),然后将结果分发给所有进程
- 输入:每个进程有一块数据(如
data_i) - 计算:对所有
data_i执行规约(如sum(data_0, data_1, ..., data_n)) - 输出:所有进程得到相同的规约结果(如
sum) - 举例:进程0有
1,进程1有2,进程2有3-> 求和后所有进程得到6 - 功能:梯度聚合(如分布式训练中多卡梯度的全局求和)
Reduce-Scatter(规约散播)
- Reduce-Scatter:先对所有进程的数据进行规约操作,然后将结果按块分散到不同进程中,每个进程只获得结果的一部分
- 输入:每个进程有一块数据(如
data_i) - 计算:规约所有数据(如
sum),然后将结果按进程数切分 - 输出:进程
i获得结果的第i块 - 举例:进程0有
[1, 2],进程1有[3, 4],进程2有[5, 6]-> 全局求和为[9, 12],然后进程0得到9,进程1得到12 - 功能:分布式矩阵计算中分块结果的聚合
整体总结
| 操作 | 输入 | 计算步骤 | 输出 | 是否全量同步 |
|---|---|---|---|---|
| All-Gather | 每个进程一块数据 | 收集所有数据并广播 | 所有进程获得全量数据 | 是 |
| All-Reduce | 每个进程一块数据 | 规约所有数据并广播结果 | 所有进程获得相同的规约结果 | 是 |
| Reduce-Scatter | 每个进程一块数据 | 规约所有数据并按块分发 | 每个进程只获得结果的一部分 | 否 |
特别说明
- All-Reduce 可以拆分为 Reduce-Scatter + All-Gather(先局部规约后全局同步)
- 性能差异 :All-Reduce 通常比分开的两步操作更高效(优化后的算法如 Ring-AllReduce)
附录:Ring-AllReduce
- Ring-AllReduce 是 All-Reduce 的一种高效实现算法,也写为 Ring AllReduce、Ring All-Reduce 等
以数据并行(DP)场景为例
- 假设有 N 个 GPU,每个 GPU 上都有全部参数和一部分数据
- 目标是保证所有 GPU 都能完成一次完整的参数更新
- 特别注意:
- 每个 GPU 都只有一部分数据,所以需要拿到其他 GPU 的数据才能计算梯度(仅依赖自身甚至算不出任何一个参数的梯度)
- 有很多参数,每个 GPU 都要完成所有参数的更新
Ring AllReduce 的核心思想
- Ring AllReduce 的核心思想是:
- 先将所有 GPU 上的局部梯度数据按照参数分成不同的块(每个块包含一部分参数的局部梯度)
- 把所有 GPU 组成一个逻辑上的环形结构 ,大家只和自己的“邻居”交流 ,然后通过多次传递 ,最终让所有人都得到完整的梯度结果
Ring AllReduce 工作流程(分两步走)
- 为了方便理解,本文假设有 4 个 GPU(A, B, C, D)
分块并传递(Reduce-Scatter 阶段)
- 这一阶段的目标是每个 GPU 负责一部分参数的梯度聚合(编码为 \(S_0, S_1, S_2, S_3\))
- 首先,每个 GPU 把自己手里的局部梯度按照参数平均分成 N 份(每份叫一个“块”)
- 然后,大家开始轮流传递:每个 GPU 把自己一部分的“块”传给右边的邻居,同时从左边的邻居那里接收一个“块”
- 当收到邻居的“块”后,就把这个“块”和自己对应的“块”进行合并(比如求和)
- 这个过程重复进行 N-1 次,直到每个人的手里都拿到了自己负责的参数对应那部分的梯度并完成聚合
- 举个例子:
- A 有局部梯度 \(A_0, A_1, A_2, A_3\),下标表示参数块的索引
- B 有局部梯度 \(B_0, B_1, B_2, B_3\),下标表示参数块的索引
- C 有局部梯度 \(C_0, C_1, C_2, C_3\),下标表示参数块的索引
- D 有局部梯度 \(D_0, D_1, D_2, D_3\),下标表示参数块的索引
- 假设某个 GPU 的目标是最终实现参数块 0 的梯度累加计算,即 \(S_0 = A_0+B_0+C_0+D_0\)
- 其他 GPU 的最终目标分别是实现 1,2,3 块参数的梯度累加
- 第一轮:
- A 把 \(A_1\) 给 B,从 D 收到 \(D_0\)
- B 把 \(B_2\) 给 C,从 A 收到 \(A_1\)
- C 把 \(C_3\) 给 D,从 B 收到 \(B_2\)
- D 把 \(D_0\) 给 A,从 C 收到 \(C_3\)
- 然后大家各自合并收到的数据:
- A 现在有 \(A_1, A_2, A_3\), 还有 \((A_0+D_0)\)(计算后的结果)
- B 现在有 \(B_0, B_2, B_3\), 还有 \((B_1+A_1)\)(计算后的结果)
- C 现在有 \(C_0, C_1, C_3\), 还有 \((C_2+B_2)\)(计算后的结果)
- D 现在有 \(D_0, D_1, D_2\), 还有 \((D_3+C_3)\)(计算后的结果)
- 第二轮:
- A 把 \((A_0+D_0)\) 给 B,从 D 收到 \((D_3+C_3)\)
- B 把 \((B_1+A_1)\) 给 C,从 A 收到 \((A_0+D_0)\)
- C 把 \((C_2+B_2)\) 给 D,从 B 收到 \((B_1+A_1)\)
- D 把 \((D_3+C_3)\) 给 A,从 C 收到 \((C_2+B_2)\)
- 然后大家合并各自收到的数据:
- A 现在有 \((A_0+D_0), A_1, A_2\), 还有 \((A_3+C_3+D_3)\)(计算后的结果)
- …
- D 现在有 \(D_0, D_1, (D_3+C_3)\), 还有 \((B_2+C_2+D_2)\)(计算后的结果)
- 这个过程会进行 \(N-1\) 次(N 是参与者的数量),最终每个人手里都会有一部分“最终总和”的数据
- 最终得到,A 手里有 \(S_2\) ,B 手里有 \(S_3\) ,C 手里有 \(S_0\),D 手里有 \(S_1\)
收集并广播(All-Gather 阶段)
- 所有 GPU 再次开始传递,这次传递不再进行计算,而是把自己手里已经计算好的梯度,传给右边的邻居
- 当收到邻居的数据后,就把它保存下来
- 这个过程也重复进行,直到每个人都收到了所有梯度
- 不是一般性,假定(注意:与上面不同,但是不影响推导和理解):
- 现在 A 手里有 \(S_0\) ,B 有 \(S_1\) ,C 有 \(S_2\) ,D 有 \(S_3\)
- 第一轮:
- A 把 \(S_0\) 给 B
- B 把 \(S_1\) 给 C
- C 把 \(S_2\) 给 D
- D 把 \(S_3\) 给 A
- 然后大家各自保存收到的数据:
- A 现在有了 \(S_0\) (自己的) 和 \(S_3\) (从 D 收到)
- B 现在有了 \(S_1\) (自己的) 和 \(S_0\) (从 A 收到)
- 第二轮:
- A 把 \(S_3\) 传递给 B
- …
- D 把 \(S_2\) 传递给 A
- 第三轮:
- A 把 \(S_2\) 传递给 B,至此,B 从 A 处收到了 \(S_0, S_2, S_3\),加上自己的 \(S_1\),也就得到了完整的 \(S_0, S_1, S_2, S_3\)
- 其他节点也一次类推
- 这个过程同样进行 \(N-1\) 次,最终每个人都会收集到所有的 \(S_0, S_1, S_2, S_3\),从而得到了完整的总和
Ring AllReduce 的优点
- 高效利用带宽 :去中心化设计是的它不会让某个节点成为瓶颈
- 比如像传统的“参数服务器”模式,所有数据都传给一个中心服务器,而 Ring AllReduce 是让所有节点都参与数据传输和计算,充分利用了网络的带宽
- 良好的可扩展性 :即使参与的设备数量很多,它的通信效率也能保持相对稳定
- 非常适合大规模的分布式训练(比如训练大型深度学习模型)
- 降低延迟 :通过分块和流水线式的传输,它能有效地减少数据同步的等待时间
- 特别说明:每个 GPU 只能和自己邻居交流这个约束,看似是限制,实际是非常巧妙地设计,是的数据通道建立一次即可,且传输是连续的
通信量对比
- Ring All-Reduce 是 All-Reduce 的一种高效实现方式,在通信量方面相比传统的All-Reduce有显著优势。
- 假设存在 \(N\) 个设备(如GPU),每个设备的数据大小为 \(\Phi\)
- 传统 All-Reduce 的原始版本中,每个GPU需要发送 \((N - 1)\Phi\) 个数据,\(N\) 个 GPU 的总通信量为
$$N(N - 1)\Phi$$- 其通信量与GPU数量呈(N^2) 复杂度
- Ring All-Reduce 将每个 GPU 存储的数据顺序切分为 \(N\) 块,每块的数据量是 \(\frac{\Phi}{N}\)
- Ring All-Reduce 包含 Reduce-Scatter 和 All-Gather 两个步骤,每个步骤都需要 \(N - 1\) 次通信,每次通信的数据量为 \(\frac{\Phi}{N}\)
- 所以每个 GPU 的通信数据量为
$$\frac{2(N - 1)\Phi}{N} \approx 2\Phi$$
- 与传统 All-Reduce 相比,Ring All-Reduce 的每个 GPU 通信量显著减少,且通信量与设备数量 \(N\) 无关,只受限于逻辑环中最慢的两个 GPU 的连接