整体介绍
torch.distributed.dist.init_process_group是 PyTorch 分布式 包中用于初始化进程组的核心函数,它在分布式训练中负责协调多个进程之间的通信- 本文重点讲解
torch.distributed.dist.init_process_group函数的使用
dist.init_process_group 函数使用注意事项
- 必须在所有进程中调用该函数,且参数需保持一致(除
rank外) - 初始化后需调用
dist.destroy_process_group()进行清理,否则在复杂的程序中,容易出现资源泄露问题 - 实际使用中推荐通过
torch.distributed.launch或torchrun工具启动,他们会自动设置环境变量- 使用
torch.multiprocessing.spawn启动则需要自己管理参数或环境变量
- 使用
- 不同后端有不同的适用场景:GPU 集群优先用
nccl,CPU 集群用gloo(Mac 上实验使用gloo) - 确保所有进程能够访问到
init_method指定的地址或文件 - 初始化时的 IP 问题,有两个特点:
- MASTER_ADDR 必须是
rank=0的机器所在的 IP(torchrun --node_rank=0的机器所在的 IP),该进程负责作为 master 完成交流和初始化操作 - 在
dist.init_process_group执行后所有进程的地位是等价的,都可以作为 master (虚拟的含义:处理主要事务)- 比如,
dist.broadcast(tensor, src)中的src可以被指定为任意值 - 亲测,在单机启动后,想用哪个进程作为 master(处理主要事务)都可以
- 比如,
- 一般建议使用
rank=0的进程作为 master 处理主要事务
- MASTER_ADDR 必须是
函数原型及其参数讲解
dist.init_process_group函数原型1
2
3
4
5
6
7
8
9
10
11torch.distributed.init_process_group(
backend: Optional[str] = None,
init_method: Optional[str] = None,
timeout: Optional[timedelta] = None,
world_size: int = -1,
rank: int = -1,
store: Optional[Store] = None,
group_name: str = "",
pg_options: Optional[Any] = None,
device_id: Optional[torch.device] = None,
)
dist.init_process_group 主要参数说明
1)backend(必填)
- 指定通信后端,决定了进程间通信使用的底层协议
- 可选值:
'nccl'(推荐 GPU 通信)、'gloo'(CPU 和 GPU 均可)、'mpi'(需 MPI 库支持) - 注意:
'nccl'仅支持 GPU 且性能最优,'gloo'对 CPU 支持更好
2)init_method(可选)
- 指定进程组的初始化方式,常用方式如下面
'env://'(推荐):从环境变量读取配置(需设置MASTER_ADDR、MASTER_PORT等)'tcp://ip:port':指定主节点的 IP 和端口'file:///path':通过共享文件系统初始化(需所有进程可访问该路径)init_method参数的默认值为None,当使用默认值时,其行为取决于是否设置了环境变量TORCH_DISTRIBUTED_INIT_METHOD:- 如果设置了
TORCH_DISTRIBUTED_INIT_METHOD环境变量- 函数会自动使用该环境变量的值作为初始化方法(等价于显式传入
init_method=环境变量值) - 例如,若环境变量设置为
tcp://127.0.0.1:23456,则会以该 TCP 地址进行初始化
- 函数会自动使用该环境变量的值作为初始化方法(等价于显式传入
- 如果未设置
TORCH_DISTRIBUTED_INIT_METHOD环境变量- 此时会触发 默认初始化逻辑 ,函数会尝试从环境变量中读取分布式配置(等价于
init_method='env://') - 此时要求必须设置以下环境变量才能正常初始化:
MASTER_ADDR:主节点的 IP 地址MASTER_PORT:主节点的端口号(需所有进程可访问)WORLD_SIZE:总进程数(可选,部分启动工具会自动设置)RANK:当前进程的全局编号(0 为主进程,可选,部分启动工具会自动设置)
- 如果这些环境变量未正确设置,会抛出类似
RuntimeError: Expected env:// init method but no MASTER_ADDR or MASTER_PORT found的错误- 但无需担心:使用
torchrun(或python -m torch.distributed.launch)启动时,若不指定--master_addr和--master_port,则torchrun会默认 环境变量设置为MASTER_ADDR:MASTER_PORT=127.0.0.1:29500
- 但无需担心:使用
- 此时会触发 默认初始化逻辑 ,函数会尝试从环境变量中读取分布式配置(等价于
- 如果设置了
3)world_size(可选)
- 总进程数,即参与分布式训练的进程总数
- 使用
'env://'时可由环境变量WORLD_SIZE指定
4)rank(可选)
- 当前进程的编号(0 到 world_size-1),0 通常为主进程
- 使用
'env://'时可由环境变量RANK指定
5)timeout
- 通信超时时间,默认为 30 分钟(1800 秒)
- 对于耗时较长的操作可适当增大
6)store
- 与
init_method参数互斥,不指明store时,由init_method参数传入的方式决定初始化什么类型的 Store - init_method
- 详情见附录
- 与
7)pg_options
pg_options参数用于配置进程组的特定选项,它是一个可选参数,允许用户为不同的后端指定特定的配置选项- 该参数的类型通常是
Optional[Any],具体的选项内容取决于所使用的通信后端,主要用于:- 配置后端特定的优化参数
- 设置通信超时时间
- 调整内存使用策略
- 配置网络相关参数等
- 举例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14import torch.distributed as dist
# 示例:为NCCL后端配置特定选项
pg_options = {
'timeout': 1800, # 30分钟超时
'init_method_timeout': 300, # 初始化超时
}
dist.init_process_group(
backend="nccl",
world_size=4,
rank=0,
pg_options=pg_options
)
8)device_id
device_id参数用于将进程”绑定”到单个特定设备,从而实现后端特定优化,是一个torch.device类型的可选参数,主要用于 GPU 训练场景- NCCL 后端的特殊效果:在 NCCL 后端下,device_id 参数有两个重要影响
- 1)立即形成通信器 :通信器会立即形成(直接调用
ncclCommInit*而非延迟初始化) - 2)内存占用优化 :每个进程会在指定的GPU上占用显存,而不是在第一个可访问的GPU上[citation:7]
- 1)立即形成通信器 :通信器会立即形成(直接调用
- 示例:
1
2
3
4
5
6
7
8
9
10
11
12import torch
import torch.distributed as dist
# 指定当前进程使用的GPU设备
device_id = torch.device(f"cuda:{local_rank}")
dist.init_process_group(
backend="nccl",
world_size=world_size,
rank=rank,
device_id=device_id
)
示例:单节点多进程(使用 torch.multiprocessing)
- 下面的代码启动单节点时,无需配置环境变量,也不需要特殊的启动命令,使用
python命令即可1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def init_process(rank, world_size):
# 设置环境变量
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# 初始化进程组
dist.init_process_group(
backend='nccl', # 使用NCCL后端(GPU)
init_method='env://',
world_size=world_size,
rank=rank
)
# 后续分布式操作...
print(f"Process {rank} initialized")
# 销毁进程组
dist.destroy_process_group()
if __name__ == "__main__":
world_size = 4 # 4个进程
mp.spawn(init_process, args=(world_size,), nprocs=world_size, join=True)
示例:多节点分布式(通过环境变量配置)
在每个节点上运行的脚本中:
1
2
3
4
5
6
7
8
9
10
11
12
13import torch.distributed as dist
import os
# 环境变量通常由分布式启动工具设置
# 如 torch.distributed.launch 或 torchrun
dist.init_process_group(backend='nccl')
# 可通过以下方式获取当前进程信息
rank = dist.get_rank()
world_size = dist.get_world_size()
local_rank = int(os.environ.get('LOCAL_RANK', 0)) # 节点内的进程编号
print(f"Rank {rank}/{world_size}, Local rank {local_rank}")上面的代码运行多节点时需要提前指定环境变量(不同机器不同),或通过
torchrun等命令启动
使用时的一些常规规范写法
分布式启动后,常用到下面三个参数:
1
2
3rank: 当前进程在的全局进程编号
word_size: 分布式系统总进程数
local_rank: 当前进程在当前节点上的本地进程编号(标准用法)当使用
torchrun命令(或torch.distributed.launch命令时):可通过环境变量获取这三个参数(此时是默认设置的)
1
2
3rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])
local_rank = int(os.environ["LOCAL_RANK"])此时启动环境,可以仅指定后端即可
1
dist.init_process_group(backend='nccl')
启动命令需要在不同的机器上执行以下,且在命令中传入当前机器对应的参数(自动转化成环境变量),比如:
1
2
3
4
5# 机器1
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py
# 机器2
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py- 易错点,要非常小心:
python -m torch.distributed.launch(或torchrun) 的启动参数必须在脚本名DDP_demo.py之前DDP_demo.py之后的参数都是传递给DDP_demo.py的,不再是python -m torch.distributed.launch(或torchrun)的参数- 不建议使用环境变量的方式定义
--master_port等参数(包括MASTER_PORT=12368 torchrun xx.py等方式也不建议)- 因为
torchrun会自动覆盖环境变量MASTER_PORT(即使没有显示传入--master_port参数也会用默认值27500覆盖)
- 因为
- 易错点,要非常小心:
(使用较少)当使用
torch.multiprocessing.spawn启动多进程时,需要自己主动管理环境变量,或通过参数传入无论使用哪种方式使用,均可使用下面的代码获取参数
1
2
3rank = dist.get_rank()
world_size = dist.get_world_size()
local_rank = int(os.environ["LOCAL_RANK"]) # 若未配置则需要使用参数显示传入
通过 torch.multiprocessing.spawn 启动完整示例
在代码里面使用
torch.multiprocessing.spawn函数启动多进程(不使用任何环境变量,整体管理较为复杂)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34import torch.distributed as dist
import torch.multiprocessing as mp
def train_fn(local_rank, args, world_size):
rank = args.start_rank + local_rank # 全局进程编号 = 起始rank + 本地进程编号
print(rank) # 输出全局进程编号
# 初始化分布式进程组
dist.init_process_group(
backend="nccl",
init_method=f"tcp://{args.master_addr}:{args.master_port}", # 多机多卡需要指定服务器地址,不能写死成 local
world_size=world_size,
rank=rank, # 全局进程编号 = 起始rank + 本地进程编号
)
# 绑定本地GPU
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)
# ...
# 对于多机多卡,为了复用同一套代码(方便管理),使用传入参数的方式启动代码,方便传参实现不同 node 机器的启动
parser = argparse.ArgumentParser()
parser.add_argument("--master_addr", type=str, default="localhost", help="主节点IP")
parser.add_argument("--master_port", type=str, default="12355", help="主节点端口")
parser.add_argument("--world_size", type=int, required=True, help="总进程数")
parser.add_argument("--start_rank", type=int, required=True, help="当前机器进程的起始全局rank")
parser.add_argument("--num_gpus", type=int, default=torch.cuda.device_count(), help="当前机器的GPU数量")
args = parser.parse_args()
print(args)
# 启动当前机器的所有进程(num_gpus个),每台机器启动自己的进程数即可
mp.spawn(
train_fn,
args=(args, args.world_size), # 传递给train_fn的参数
nprocs=args.num_gpus, # 进程数=当前机器的GPU数
join=True # 等待所有子进程完成
)此时启动命令可以仅使用普通的
python命令(后面的参数可选)1
python demo.py --xxx xx
- 每个节点通过传入不同参数即可指定分布式进程数量,允许不同节点不同数量(
start_rank数和启动时的num_gpus参数控制)
- 每个节点通过传入不同参数即可指定分布式进程数量,允许不同节点不同数量(
通过 torch.distributed.launch 命令启动
假设脚本名为
ddp_demo.py单机4卡,使用4个GPU启动:
1
python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py
--nproc_per_node:指定单机的GPU数量
多机多卡,2个机器,各有4个GPU启动:
1
2
3
4
5# 机器1
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py
# 机器2
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py--nnodes:总节点数--node_rank:当前节点编号--master_addr:指定master节点的IP地址,默认值是”localhost”,但是显示给出来更明确--master_port:指定master节点的端口号,默认值是 29500,但是显示给出来更明确
通过 torchrun 命令启动
以下结论是在 Mac 系统下得到的,暂时没有在 Linux 上尝试
torchrun和torch.distributed.launch都是 PyTorch 中用于启动分布式训练的工具torchrun是 PyTorch 1.10 后推出的新一代工具,旨在替代torch.distributed.launch两者的核心区别:
特性 torch.distributed.launchtorchrun推出时间 较早版本(已逐步废弃) PyTorch 1.10+ 推出(推荐使用) 进程管理 依赖用户手动管理进程(如指定 --node_rank等)自动管理进程,支持弹性训练(节点故障后自动恢复) 配置方式 大部分参数需通过命令行传入 支持从环境变量、命令行、配置文件读取参数 容错能力 无弹性训练支持,进程崩溃后需手动重启 支持弹性训练( --max_restarts等参数)日志管理 日志输出较为基础 提供更规范的日志管理,区分不同进程的输出 torchrun启动单机多卡(与torch.distributed.launch相同):1
2
3
4
5回顾 `torch.distributed.launch` 的启动方式
python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py
torchrun 的启动方式:替换 `python -m torch.distributed.launch` 为 `torchrun` 即可
torchrun --nproc_per_node=4 ddp_demo.pytorchrun启动多机多卡(相对torch.distributed.launch而言,torchrun会自动推断部分参数(如节点数、进程数))回顾
torch.distributed.launch需要手动指定总节点数(--nnodes)和当前节点序号(--node_rank)1
2
3
4
5# 机器1
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py
# 机器2
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py(待确认:Mac 系统下失败)
torchrun无需手动指定node_rank,只需在所有节点上指定相同的--rdzv_id(任务ID)和--rdzv_endpoint(主节点地址):1
2# 所有节点统一执行相同命令(不需要区分不同节点使用不同命令,会自动分配node_rank)
torchrun --nproc_per_node=4 --nnodes=2 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint="localhost:29500" DDP_demo.py--rdzv_id:任务唯一ID(任意整数)rdzv_backend:后端(默认c10d)rdzv_endpoint:主节点IP:端口torchrun还可以用--max_restarts等参数指定最大重启次数
(Mac 系统下成功)在 Mac 系统下,上面的命令执行会出现问题(上面命令在两个窗口分别打开)
1
2torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_test.py
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_test.py- 亲测上面的代码可以成功
特别地,部分介绍文档会说
torch.distributed.launch和torchrun对代码的使用有一些不同,主要是初始化不同:初始化方式有下面两种:
1
2
3
4
5
6
7
8
9
10
11# 方式一:
rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])
local_rank = int(os.environ["LOCAL_RANK"])
dist.init_process_group(backend='gloo', world_size=world_size, rank=rank)
# 方式二:
dist.init_process_group(backend='gloo')
rank = dist.get_rank()
world_size = dist.get_world_size()
local_rank = int(os.environ["LOCAL_RANK"])特别说明:部分文档介绍说
torchrun只能用方式二,torch.distributed.launch只能用方式一亲测:
torch.distributed.launch可用方式一和方式二初始化两种方式均可用,具体使用命令为:1
2
3
4
5# 机器1
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py
# 机器2
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.pytorchrun可用方式二定义,在使用下面的命令时,方式一方式二均可:1
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_test.py
使用建议:
- 优先使用
torchrun:它是 PyTorch 官方推荐的新一代工具,简化了分布式配置,支持弹性训练,且兼容未来的功能更新 - 避免使用
torch.distributed.launch:该工具已逐步被废弃,不再添加新功能,仅为兼容性保留
- 优先使用
附录:python -m torch.distributed.launch 命令具体在做什么?
- 本节讲述
python -m torch.distributed.launch,实际上torchrun是python -m torch.distributed.launch的升级版本,做的事情差不多,但还多了些功能
启动多个进程
根据
--nproc_per_node参数指定的数量,为每个 GPU 启动一个独立的进程- 例如,如果
--nproc_per_node=4,则会在当前节点启动 4 个进程,每个进程绑定到一个 GPU 上
- 例如,如果
每个进程会独立执行训练脚本(如
train.py),并通过dist.init_process_group初始化分布式环境dist.init_process_group会让当前进程于其他进程简历通信
代码中,通过下面的命令执行即可使用进程自己的 GPU(实现 GPU 的分配):
1
2
3
4import os
import torch
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)注:这也是在使用普通的 python 命令启动时,需要在代码里自己手动启动多进程的原因
设置环境变量(分布式环境所需的)
python -m torch.distributed.launch命令会设置分布式训练所需的环境变量(如WORLD_SIZE、RANK、LOCAL_RANK、MASTER_ADDR、MASTER_PORT等)- 每个进程的环境变量不同,
RANK、LOCAL_RANK等环境变量均是有差异的
代码内部在做什么?(非启动命令的工作)
- 代码内部通过
dist.init_process_group初始化分布式通信后端(如nccl或gloo),确保所有进程能够协同工作backend:通信后端(如nccl用于 GPU,gloo用于 CPU)init_method:初始化方法(如tcp://指定 master 地址和端口)
- 代码示例 :
1
dist.init_process_group(backend='nccl', init_method='env://')
附录:init_process_group 的 store 参数详细用法说明
- 在
torch.distributed.init_process_group()函数中,store参数是一个可选参数,用于指定分布式进程间通信所使用的键值存储后端 - 该参数允许用户显式创建和配置存储实例,而不是依赖默认的自动创建机制
store参数的作用:进程间协调,store参数指定的存储系统用于:- 存储分布式训练过程中的元数据
- 协调各个进程的初始化过程
- 实现进程间的同步和通信
- 管理进程组的状态信息
支持的 Store 类型
- PyTorch 分布式包支持三种主要的键值存储类型,这些 Store 的核心功能一致(同步元数据),但实现方式不同,选择时需根据分布式环境的网络、存储和调度方式决定
- 下面是最常见的三种 Store 的使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25import torch.distributed as dist
# 第一类:TCPStore
# # server store 持有数据,client store 可以连接到 server store(by TCP)访问数据,
store = dist.TCPStore(
host_name="localhost", # 服务器(master)地址,所有参与分布式训练的进程必须能通过此地址访问到 master
port=12345, # 需确保该端口在主机上未被占用,且所有进程可访问此端口
world_size=4, # 指明 store users 数,默认为 None(表示没有固定的 store users 数量),注:一般与分布式环境的 world_size保持一致
is_master=True, # 是否为master节点,只有主进程会启动 TCP 服务器,其他进程(非主进程)会作为客户端连接到主进程
timeout=300, # 超时时间(秒)
wait_for_workers=True, # (bool, optional), 默认为 True,是否等待所有 workers 连接到 store 服务器,只有当 world_size 为固定值时生效
)
# # 第二类:FileStore
# store = dist.FileStore("/tmp/distributed_store", world_size=4)
# # 第三类:HashStore(通常用于单机多进程)
# store = dist.HashStore()
# 使用自定义store初始化进程组
dist.init_process_group(
backend="nccl",
world_size=4,
rank=0,
store=store
)
使用 init_method 指定初始化 Store 类型
- 在 PyTorch 的
dist.init_process_group中,不指明store参数时,init_method参数决定了分布式进程初始化时使用的 Store 类型 - 不同的
init_method对应不同的 Store 实现,用于在进程间同步元数据(如进程编号、通信地址等) - 不同初始化方法如下:
init_method='tcp://master_ip:port':(对应TCPStore)- 基于 TCP 协议的集中式 Store,需要指定一个主节点(master)的 IP 和端口
- 主进程会在指定地址创建 TCP 监听,其他进程通过该地址连接主进程,完成元数据交换
- 适用于大多数分布式场景(单机多卡、多机多卡),无需依赖外部服务
init_method='file:///path/to/shared_file'(对应FileStore)- 基于共享文件系统的 Store,所有进程通过读写同一个共享文件同步元数据
- 要求所有进程可访问同一个共享文件系统(如 NFS、本地文件系统,单机多卡场景常用)
- 无需网络通信,但依赖文件系统的可靠性和性能
init_method='env://'(由环境变量指定的 Store(通常是TCPStore))- 不直接指定 Store 类型,而是通过环境变量(如
MASTER_ADDR、MASTER_PORT、WORLD_SIZE、RANK等)配置初始化信息 - 本质上仍会创建
TCPStore,但参数由环境变量而非函数参数传入 - 常用于容器化环境(如 Kubernetes)或需要动态配置的场景
- 不直接指定 Store 类型,而是通过环境变量(如
- 第三方分布式框架集成(如 Slurm、MPI)(对应
SlurmStore、MPIStore等(取决于框架))- 当使用 Slurm 或 MPI 启动分布式任务时,PyTorch 可自动检测并使用对应框架的 Store
- 例如,
init_method='slurm://'会使用SlurmStore,通过 Slurm 的环境变量和接口同步元数据,无需手动指定主节点
- 总结核心差异总结
init_method类型Store 实现 依赖条件 适用场景 tcp://master_ip:portTCPStore网络连通性 通用分布式场景(单机/多机) file:///pathFileStore共享文件系统 单机多卡或共享存储的多机场景 env://通常为 TCPStore环境变量配置 容器化、动态配置场景 第三方框架(如 Slurm) 框架专属 Store 对应调度框架环境 集群管理系统(Slurm/MPI)
使用 PrefixStore 进行多进程组管理
- 使用 PrefixStore 可以为不同的进程组创建隔离的命名空间 :
1
2
3
4
5
6
7
8
9# 创建基础store
base_store = dist.TCPStore("localhost", 12345, world_size, is_master=True)
# 为不同进程组创建前缀store
pg1_store = dist.PrefixStore("group1_", base_store)
pg2_store = dist.PrefixStore("group2_", base_store)
# 使用不同的store初始化不同的进程组
dist.init_process_group(backend="nccl", store=pg1_store, ...)
注意事项
当同时指定了环境变量和
store参数时,store参数会优先使用:1
2
3
4
5
6# 即使设置了环境变量,也会使用显式指定的store
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
custom_store = dist.TCPStore("192.168.1.100", 29500, world_size, is_master)
dist.init_process_group(backend="nccl", store=custom_store) # 使用custom_store和
init_method参数是互斥的,只能指定其中一个:1
2
3
4
5
6
7
8
9
10
11
12# 方法1:使用init_method
dist.init_process_group(
backend="nccl",
init_method="tcp://192.168.1.100:29500"
)
# 方法2:使用store参数
store = dist.TCPStore("192.168.1.100", 29500, world_size, is_master)
dist.init_process_group(
backend="nccl",
store=store
)可根据环境动态配置不同
store1
2
3
4
5
6
7
8
9
10
11
12def create_store(backend, world_size, rank):
if backend == "nccl":
# GPU训练使用TCPStore
return dist.TCPStore(
host_name=os.environ.get('MASTER_ADDR', 'localhost'),
port=int(os.environ.get('MASTER_PORT', '29500')),
world_size=world_size,
is_master=(rank == 0)
)
else:
# CPU训练可以使用FileStore
return dist.FileStore("/tmp/dist_store", world_size)
附录:init_process_group 的 device_id 参数详细用法说明
device_id参数用于将进程”绑定”到单个特定设备,从而实现后端特定优化,是一个torch.device类型的可选参数,主要用于 GPU 训练场景device_id参数在 NCCL 后端下有特殊效果:在 NCCL 后端下,device_id 参数有两个重要影响- 1)立即形成通信器 :通信器会立即形成(直接调用
ncclCommInit*而非延迟初始化) - 2)内存占用优化 :每个进程会在指定的 GPU 上占用显存,而不是在第一个可访问的 GPU 上
- If you want to know NCCL initialization error early, you can also use this field
- 1)立即形成通信器 :通信器会立即形成(直接调用
使用
gloo后端时,不需要设置device_id基本用法
1
2
3
4
5
6
7
8
9
10
11
12import torch
import torch.distributed as dist
# 指定当前进程使用的GPU设备
device_id = torch.device(f"cuda:{local_rank}")
dist.init_process_group(
backend="nccl",
world_size=world_size,
rank=rank,
device_id=device_id # 不指定 device_id 默认会在可访问的第一个 GPU 上占用内存?
)多 GPU 训练场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import torch
import torch.distributed as dist
import os
def setup_distributed():
# 获取本地rank
local_rank = int(os.environ.get("LOCAL_RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))
rank = int(os.environ.get("RANK", 0))
# 设置当前进程的GPU设备
torch.cuda.set_device(local_rank) # 根据不同 rank 获取不同的设备
device_id = torch.device(f"cuda:{local_rank}")
# 初始化进程组,绑定到特定GPU
dist.init_process_group(
backend="nccl",
world_size=world_size,
rank=rank,
device_id=device_id # 绑定到特定设备
)
return device_id
附录:init_process_group 中 NCCL 的延迟初始化
NCCL 是 NVIDIA 集合通信库(NVIDIA Collective Communications Library)
- 提供多 GPU / 多节点通信原语(如 all-reduce、broadcast、all-gather、reduce-scatter 等)
- 针对 PCIe/NVLink 与 NVIDIA 网络优化,用于加速深度学习分布式训练
- 开源地址:github.com/NVIDIA/nccl
延迟初始化(Lazy Initialization)是指 NCCL 通信器不在
init_process_group()调用时立即创建,而是推迟到 第一次实际需要进行集合通信操作时才创建- 注:若
init_process_group()函数指定了device_id参数,则 NCCL 会立即初始化到当前设备上
- 注:若
立即初始化(指定
device_id时)1
2
3
4
5
6
7
8
9device_id = torch.device(f"cuda:{local_rank}")
dist.init_process_group(backend="nccl", device_id=device_id)
# 在这一行执行时,NCCL立即:
# 1. 调用 ncclCommInit* 系列函数
# 2. 创建通信器对象
# 3. 分配通信缓冲区
# 4. 建立进程间的通信通道
# 5. 进行通信拓扑优化延迟初始化(不指定
device_id时)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19dist.init_process_group(backend="nccl") # 没有device_id
# 在这一行执行时,NCCL只是:
# 1. 记录进程组的元信息
# 2. 设置必要的环境变量
# 3. 但不创建实际的通信器
# 真正的初始化发生在第一次集合通信时:
tensor = torch.randn(10).cuda()
dist.all_reduce(tensor) # 在这里才真正初始化 NCCL 通信器!自动识别需要绑定的 GPU 并初始化,这里会很慢,因为要初始化 NCCL
dist.all_reduce(tensor) # 这里就很快了,这次不需要初始化 NCCL
# # 注:以下操作都会触发NCCL初始化:
# dist.all_reduce(tensor) # 全规约
# dist.all_gather([tensor]) # 全收集
# dist.reduce(tensor, dst=0) # 规约
# dist.broadcast(tensor, src=0) # 广播
# dist.all_to_all([tensor], [tensor]) # 全到全- 延迟初始化可以允许灵活、动态、自动地选择需要的 GPU,但可能会造成一些误解
- 注意:一旦 NCCL 初始化以后,就绑定了 GPU 了,再切换 GPU 可能会出现错误
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15def multi_device_lazy_init_issues():
dist.init_process_group(backend="nccl")
# 问题场景:进程需要使用多个GPU
tensor_gpu0 = torch.randn(10).cuda(0)
tensor_gpu1 = torch.randn(10).cuda(1)
# 第一次通信在GPU 0上
dist.all_reduce(tensor_gpu0) # NCCL在GPU 0上初始化
# 尝试在GPU 1上通信
try:
dist.all_reduce(tensor_gpu1) # 可能失败或性能差
except Exception as e:
print(f"Multi-device issue: {e}")
目前尚无代码可直接检测 NCCL 通信器是否已创建,只能通过第一次通信时间来大致判断
生产环境推荐立即初始化,方便代码阅读;开发时可以使用延迟初始化以获得更快的启动