PyTorch——分布式程序启动方式汇总


整体介绍

  • torch.distributed.dist.init_process_group 是 PyTorch 分布式 包中用于初始化进程组的核心函数,它在分布式训练中负责协调多个进程之间的通信
  • 本文重点讲解 torch.distributed.dist.init_process_group 函数的使用

dist.init_process_group 函数使用注意事项

  • 必须在所有进程中调用该函数,且参数需保持一致(除 rank 外)
  • 初始化后需调用 dist.destroy_process_group() 进行清理,否则在复杂的程序中,容易出现资源泄露问题
  • 实际使用中推荐通过 torch.distributed.launchtorchrun 工具启动,他们会自动设置环境变量
    • 使用 torch.multiprocessing.spawn 启动则需要自己管理参数或环境变量
  • 不同后端有不同的适用场景:GPU 集群优先用 nccl,CPU 集群用 gloo(Mac 上实验使用 gloo
  • 确保所有进程能够访问到 init_method 指定的地址或文件
  • 初始化时的 IP 问题,有两个特点:
    • MASTER_ADDR 必须是 rank=0 的机器所在的 IP(torchrun --node_rank=0的机器所在的 IP),该进程负责作为 master 完成交流和初始化操作
    • dist.init_process_group 执行后所有进程的地位是等价的,都可以作为 master (虚拟的含义:处理主要事务)
      • 比如,dist.broadcast(tensor, src) 中的 src 可以被指定为任意值
      • 亲测,在单机启动后,想用哪个进程作为 master(处理主要事务)都可以
    • 一般建议使用 rank=0 的进程作为 master 处理主要事务

函数原型及其参数讲解

  • dist.init_process_group 函数原型
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    torch.distributed.init_process_group(
    backend: Optional[str] = None,
    init_method: Optional[str] = None,
    timeout: Optional[timedelta] = None,
    world_size: int = -1,
    rank: int = -1,
    store: Optional[Store] = None,
    group_name: str = "",
    pg_options: Optional[Any] = None,
    device_id: Optional[torch.device] = None,
    )

dist.init_process_group 主要参数说明

  • 1)backend(必填)

    • 指定通信后端,决定了进程间通信使用的底层协议
    • 可选值:'nccl'(推荐 GPU 通信)、'gloo'(CPU 和 GPU 均可)、'mpi'(需 MPI 库支持)
    • 注意:'nccl' 仅支持 GPU 且性能最优,'gloo' 对 CPU 支持更好
  • 2)init_method(可选)

    • 指定进程组的初始化方式,常用方式如下面
    • 'env://'(推荐):从环境变量读取配置(需设置 MASTER_ADDRMASTER_PORT 等)
    • 'tcp://ip:port':指定主节点的 IP 和端口
    • 'file:///path':通过共享文件系统初始化(需所有进程可访问该路径)
    • init_method 参数的默认值为 None,当使用默认值时,其行为取决于是否设置了环境变量 TORCH_DISTRIBUTED_INIT_METHOD
      • 如果设置了 TORCH_DISTRIBUTED_INIT_METHOD 环境变量
        • 函数会自动使用该环境变量的值作为初始化方法(等价于显式传入 init_method=环境变量值
        • 例如,若环境变量设置为 tcp://127.0.0.1:23456,则会以该 TCP 地址进行初始化
      • 如果未设置 TORCH_DISTRIBUTED_INIT_METHOD 环境变量
        • 此时会触发 默认初始化逻辑 ,函数会尝试从环境变量中读取分布式配置(等价于 init_method='env://'
        • 此时要求必须设置以下环境变量才能正常初始化:
          • MASTER_ADDR:主节点的 IP 地址
          • MASTER_PORT:主节点的端口号(需所有进程可访问)
          • WORLD_SIZE:总进程数(可选,部分启动工具会自动设置)
          • RANK:当前进程的全局编号(0 为主进程,可选,部分启动工具会自动设置)
        • 如果这些环境变量未正确设置,会抛出类似 RuntimeError: Expected env:// init method but no MASTER_ADDR or MASTER_PORT found 的错误
          • 但无需担心:使用 torchrun(或 python -m torch.distributed.launch)启动时,若不指定 --master_addr--master_port,则 torchrun 会默认 环境变量设置为 MASTER_ADDR:MASTER_PORT=127.0.0.1:29500
  • 3)world_size(可选)

    • 总进程数,即参与分布式训练的进程总数
    • 使用 'env://' 时可由环境变量 WORLD_SIZE 指定
  • 4)rank(可选)

    • 当前进程的编号(0 到 world_size-1),0 通常为主进程
    • 使用 'env://' 时可由环境变量 RANK 指定
  • 5)timeout

    • 通信超时时间,默认为 30 分钟(1800 秒)
    • 对于耗时较长的操作可适当增大
  • 6)store

    • init_method 参数互斥,不指明 store 时,由 init_method 参数传入的方式决定初始化什么类型的 Store
    • init_method
    • 详情见附录
  • 7)pg_options

    • pg_options 参数用于配置进程组的特定选项,它是一个可选参数,允许用户为不同的后端指定特定的配置选项
    • 该参数的类型通常是Optional[Any],具体的选项内容取决于所使用的通信后端,主要用于:
      • 配置后端特定的优化参数
      • 设置通信超时时间
      • 调整内存使用策略
      • 配置网络相关参数等
    • 举例:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      import torch.distributed as dist

      # 示例:为NCCL后端配置特定选项
      pg_options = {
      'timeout': 1800, # 30分钟超时
      'init_method_timeout': 300, # 初始化超时
      }

      dist.init_process_group(
      backend="nccl",
      world_size=4,
      rank=0,
      pg_options=pg_options
      )
  • 8)device_id

    • device_id 参数用于将进程”绑定”到单个特定设备,从而实现后端特定优化,是一个 torch.device 类型的可选参数,主要用于 GPU 训练场景
    • NCCL 后端的特殊效果:在 NCCL 后端下,device_id 参数有两个重要影响
      • 1)立即形成通信器 :通信器会立即形成(直接调用ncclCommInit*而非延迟初始化)
      • 2)内存占用优化 :每个进程会在指定的GPU上占用显存,而不是在第一个可访问的GPU上[citation:7]
    • 示例:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      import torch
      import torch.distributed as dist

      # 指定当前进程使用的GPU设备
      device_id = torch.device(f"cuda:{local_rank}")

      dist.init_process_group(
      backend="nccl",
      world_size=world_size,
      rank=rank,
      device_id=device_id
      )

示例:单节点多进程(使用 torch.multiprocessing

  • 下面的代码启动单节点时,无需配置环境变量,也不需要特殊的启动命令,使用 python 命令即可
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import torch
    import torch.distributed as dist
    import torch.multiprocessing as mp

    def init_process(rank, world_size):
    # 设置环境变量
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # 初始化进程组
    dist.init_process_group(
    backend='nccl', # 使用NCCL后端(GPU)
    init_method='env://',
    world_size=world_size,
    rank=rank
    )

    # 后续分布式操作...
    print(f"Process {rank} initialized")

    # 销毁进程组
    dist.destroy_process_group()

    if __name__ == "__main__":
    world_size = 4 # 4个进程
    mp.spawn(init_process, args=(world_size,), nprocs=world_size, join=True)

示例:多节点分布式(通过环境变量配置)

  • 在每个节点上运行的脚本中:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import torch.distributed as dist
    import os

    # 环境变量通常由分布式启动工具设置
    # 如 torch.distributed.launch 或 torchrun
    dist.init_process_group(backend='nccl')

    # 可通过以下方式获取当前进程信息
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    local_rank = int(os.environ.get('LOCAL_RANK', 0)) # 节点内的进程编号

    print(f"Rank {rank}/{world_size}, Local rank {local_rank}")
  • 上面的代码运行多节点时需要提前指定环境变量(不同机器不同),或通过 torchrun 等命令启动


使用时的一些常规规范写法

  • 分布式启动后,常用到下面三个参数:

    1
    2
    3
    rank: 当前进程在的全局进程编号
    word_size: 分布式系统总进程数
    local_rank: 当前进程在当前节点上的本地进程编号
  • (标准用法)当使用 torchrun 命令(或 torch.distributed.launch 命令时):

    • 可通过环境变量获取这三个参数(此时是默认设置的)

      1
      2
      3
      rank = int(os.environ["RANK"])
      world_size = int(os.environ["WORLD_SIZE"])
      local_rank = int(os.environ["LOCAL_RANK"])
    • 此时启动环境,可以仅指定后端即可

      1
      dist.init_process_group(backend='nccl')
    • 启动命令需要在不同的机器上执行以下,且在命令中传入当前机器对应的参数(自动转化成环境变量),比如:

      1
      2
      3
      4
      5
      # 机器1
      python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py

      # 机器2
      python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py
      • 易错点,要非常小心:
        • python -m torch.distributed.launch(或 torchrun) 的启动参数必须在脚本名 DDP_demo.py 之前
        • DDP_demo.py 之后的参数都是传递给 DDP_demo.py 的,不再是 python -m torch.distributed.launch(或 torchrun)的参数
        • 不建议使用环境变量的方式定义 --master_port 等参数(包括 MASTER_PORT=12368 torchrun xx.py 等方式也不建议)
          • 因为 torchrun 会自动覆盖环境变量 MASTER_PORT(即使没有显示传入 --master_port 参数也会用默认值 27500 覆盖)
  • (使用较少)当使用 torch.multiprocessing.spawn 启动多进程时,需要自己主动管理环境变量,或通过参数传入

  • 无论使用哪种方式使用,均可使用下面的代码获取参数

    1
    2
    3
    rank = dist.get_rank() 
    world_size = dist.get_world_size()
    local_rank = int(os.environ["LOCAL_RANK"]) # 若未配置则需要使用参数显示传入

通过 torch.multiprocessing.spawn 启动完整示例

  • 在代码里面使用 torch.multiprocessing.spawn 函数启动多进程(不使用任何环境变量,整体管理较为复杂)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import torch.distributed as dist
    import torch.multiprocessing as mp

    def train_fn(local_rank, args, world_size):
    rank = args.start_rank + local_rank # 全局进程编号 = 起始rank + 本地进程编号
    print(rank) # 输出全局进程编号
    # 初始化分布式进程组
    dist.init_process_group(
    backend="nccl",
    init_method=f"tcp://{args.master_addr}:{args.master_port}", # 多机多卡需要指定服务器地址,不能写死成 local
    world_size=world_size,
    rank=rank, # 全局进程编号 = 起始rank + 本地进程编号
    )
    # 绑定本地GPU
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)
    # ...

    # 对于多机多卡,为了复用同一套代码(方便管理),使用传入参数的方式启动代码,方便传参实现不同 node 机器的启动
    parser = argparse.ArgumentParser()
    parser.add_argument("--master_addr", type=str, default="localhost", help="主节点IP")
    parser.add_argument("--master_port", type=str, default="12355", help="主节点端口")
    parser.add_argument("--world_size", type=int, required=True, help="总进程数")
    parser.add_argument("--start_rank", type=int, required=True, help="当前机器进程的起始全局rank")
    parser.add_argument("--num_gpus", type=int, default=torch.cuda.device_count(), help="当前机器的GPU数量")
    args = parser.parse_args()
    print(args)
    # 启动当前机器的所有进程(num_gpus个),每台机器启动自己的进程数即可
    mp.spawn(
    train_fn,
    args=(args, args.world_size), # 传递给train_fn的参数
    nprocs=args.num_gpus, # 进程数=当前机器的GPU数
    join=True # 等待所有子进程完成
    )
  • 此时启动命令可以仅使用普通的 python 命令(后面的参数可选)

    1
    python demo.py --xxx xx
    • 每个节点通过传入不同参数即可指定分布式进程数量,允许不同节点不同数量(start_rank 数和启动时的 num_gpus 参数控制)

通过 torch.distributed.launch 命令启动

  • 假设脚本名为ddp_demo.py

  • 单机4卡,使用4个GPU启动:

    1
    python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py
    • --nproc_per_node:指定单机的GPU数量
  • 多机多卡,2个机器,各有4个GPU启动:

    1
    2
    3
    4
    5
    # 机器1
    python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py

    # 机器2
    python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py
    • --nnodes:总节点数
    • --node_rank:当前节点编号
    • --master_addr:指定master节点的IP地址,默认值是”localhost”,但是显示给出来更明确
    • --master_port:指定master节点的端口号,默认值是 29500,但是显示给出来更明确

通过 torchrun 命令启动

  • 以下结论是在 Mac 系统下得到的,暂时没有在 Linux 上尝试

  • torchruntorch.distributed.launch 都是 PyTorch 中用于启动分布式训练的工具

  • torchrun 是 PyTorch 1.10 后推出的新一代工具,旨在替代 torch.distributed.launch

  • 两者的核心区别:

    特性 torch.distributed.launch torchrun
    推出时间 较早版本(已逐步废弃) PyTorch 1.10+ 推出(推荐使用)
    进程管理 依赖用户手动管理进程(如指定 --node_rank 等) 自动管理进程,支持弹性训练(节点故障后自动恢复)
    配置方式 大部分参数需通过命令行传入 支持从环境变量、命令行、配置文件读取参数
    容错能力 无弹性训练支持,进程崩溃后需手动重启 支持弹性训练(--max_restarts 等参数)
    日志管理 日志输出较为基础 提供更规范的日志管理,区分不同进程的输出
  • torchrun 启动单机多卡(与 torch.distributed.launch 相同):

    1
    2
    3
    4
    5
    # 回顾 `torch.distributed.launch` 的启动方式
    python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py

    # torchrun 的启动方式:替换 `python -m torch.distributed.launch` 为 `torchrun` 即可
    torchrun --nproc_per_node=4 ddp_demo.py
  • torchrun 启动多机多卡(相对 torch.distributed.launch而言,torchrun 会自动推断部分参数(如节点数、进程数))

    • 回顾 torch.distributed.launch 需要手动指定总节点数(--nnodes)和当前节点序号(--node_rank

      1
      2
      3
      4
      5
      # 机器1
      python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py

      # 机器2
      python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py
    • (待确认:Mac 系统下失败)torchrun 无需手动指定 node_rank,只需在所有节点上指定相同的 --rdzv_id(任务ID)和 --rdzv_endpoint(主节点地址):

      1
      2
      # 所有节点统一执行相同命令(不需要区分不同节点使用不同命令,会自动分配node_rank)
      torchrun --nproc_per_node=4 --nnodes=2 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint="localhost:29500" DDP_demo.py
      • --rdzv_id:任务唯一ID(任意整数)
      • rdzv_backend:后端(默认c10d)
      • rdzv_endpoint:主节点IP:端口
      • torchrun还可以用 --max_restarts 等参数指定最大重启次数
    • (Mac 系统下成功)在 Mac 系统下,上面的命令执行会出现问题(上面命令在两个窗口分别打开)

      1
      2
      torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_test.py
      torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_test.py
      • 亲测上面的代码可以成功
  • 特别地,部分介绍文档会说 torch.distributed.launchtorchrun 对代码的使用有一些不同,主要是初始化不同:

    • 初始化方式有下面两种:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      # 方式一:
      rank = int(os.environ["RANK"])
      world_size = int(os.environ["WORLD_SIZE"])
      local_rank = int(os.environ["LOCAL_RANK"])
      dist.init_process_group(backend='gloo', world_size=world_size, rank=rank)

      # 方式二:
      dist.init_process_group(backend='gloo')
      rank = dist.get_rank()
      world_size = dist.get_world_size()
      local_rank = int(os.environ["LOCAL_RANK"])
    • 特别说明:部分文档介绍说 torchrun 只能用方式二, torch.distributed.launch 只能用方式一

    • 亲测:

      • torch.distributed.launch 可用方式一和方式二初始化两种方式均可用,具体使用命令为:

        1
        2
        3
        4
        5
        # 机器1
        python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py

        # 机器2
        python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py
      • torchrun 可用方式二定义,在使用下面的命令时,方式一方式二均可:

        1
        torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_test.py
  • 使用建议:

    • 优先使用 torchrun :它是 PyTorch 官方推荐的新一代工具,简化了分布式配置,支持弹性训练,且兼容未来的功能更新
    • 避免使用 torch.distributed.launch :该工具已逐步被废弃,不再添加新功能,仅为兼容性保留

附录:python -m torch.distributed.launch 命令具体在做什么?

  • 本节讲述 python -m torch.distributed.launch,实际上 torchrunpython -m torch.distributed.launch 的升级版本,做的事情差不多,但还多了些功能

启动多个进程

  • 根据 --nproc_per_node 参数指定的数量,为每个 GPU 启动一个独立的进程

    • 例如,如果 --nproc_per_node=4,则会在当前节点启动 4 个进程,每个进程绑定到一个 GPU 上
  • 每个进程会独立执行训练脚本(如 train.py),并通过 dist.init_process_group 初始化分布式环境

    • dist.init_process_group 会让当前进程于其他进程简历通信
  • 代码中,通过下面的命令执行即可使用进程自己的 GPU(实现 GPU 的分配):

    1
    2
    3
    4
    import os
    import torch
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
  • 注:这也是在使用普通的 python 命令启动时,需要在代码里自己手动启动多进程的原因

设置环境变量(分布式环境所需的)

  • python -m torch.distributed.launch 命令会设置分布式训练所需的环境变量(如 WORLD_SIZERANKLOCAL_RANKMASTER_ADDRMASTER_PORT 等)
  • 每个进程的环境变量不同,RANKLOCAL_RANK 等环境变量均是有差异的

代码内部在做什么?(非启动命令的工作)

  • 代码内部通过 dist.init_process_group 初始化分布式通信后端(如 ncclgloo),确保所有进程能够协同工作
    • backend:通信后端(如 nccl 用于 GPU,gloo 用于 CPU)
    • init_method:初始化方法(如 tcp:// 指定 master 地址和端口)
  • 代码示例
    1
    dist.init_process_group(backend='nccl', init_method='env://')

附录:init_process_group 的 store 参数详细用法说明

  • torch.distributed.init_process_group() 函数中,store 参数是一个可选参数,用于指定分布式进程间通信所使用的键值存储后端
  • 该参数允许用户显式创建和配置存储实例,而不是依赖默认的自动创建机制
  • store 参数的作用:进程间协调,store 参数指定的存储系统用于:
    • 存储分布式训练过程中的元数据
    • 协调各个进程的初始化过程
    • 实现进程间的同步和通信
    • 管理进程组的状态信息

支持的 Store 类型

  • PyTorch 分布式包支持三种主要的键值存储类型,这些 Store 的核心功能一致(同步元数据),但实现方式不同,选择时需根据分布式环境的网络、存储和调度方式决定
  • 下面是最常见的三种 Store 的使用示例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    import torch.distributed as dist

    # 第一类:TCPStore
    # # server store 持有数据,client store 可以连接到 server store(by TCP)访问数据,
    store = dist.TCPStore(
    host_name="localhost", # 服务器(master)地址,所有参与分布式训练的进程必须能通过此地址访问到 master
    port=12345, # 需确保该端口在主机上未被占用,且所有进程可访问此端口
    world_size=4, # 指明 store users 数,默认为 None(表示没有固定的 store users 数量),注:一般与分布式环境的 world_size保持一致
    is_master=True, # 是否为master节点,只有主进程会启动 TCP 服务器,其他进程(非主进程)会作为客户端连接到主进程
    timeout=300, # 超时时间(秒)
    wait_for_workers=True, # (bool, optional), 默认为 True,是否等待所有 workers 连接到 store 服务器,只有当 world_size 为固定值时生效
    )
    # # 第二类:FileStore
    # store = dist.FileStore("/tmp/distributed_store", world_size=4)

    # # 第三类:HashStore(通常用于单机多进程)
    # store = dist.HashStore()

    # 使用自定义store初始化进程组
    dist.init_process_group(
    backend="nccl",
    world_size=4,
    rank=0,
    store=store
    )

使用 init_method 指定初始化 Store 类型

  • 在 PyTorch 的 dist.init_process_group 中,不指明 store 参数时,init_method 参数决定了分布式进程初始化时使用的 Store 类型
  • 不同的 init_method 对应不同的 Store 实现,用于在进程间同步元数据(如进程编号、通信地址等)
  • 不同初始化方法如下:
    • init_method='tcp://master_ip:port':(对应 TCPStore
      • 基于 TCP 协议的集中式 Store,需要指定一个主节点(master)的 IP 和端口
      • 主进程会在指定地址创建 TCP 监听,其他进程通过该地址连接主进程,完成元数据交换
      • 适用于大多数分布式场景(单机多卡、多机多卡),无需依赖外部服务
    • init_method='file:///path/to/shared_file'(对应 FileStore
      • 基于共享文件系统的 Store,所有进程通过读写同一个共享文件同步元数据
      • 要求所有进程可访问同一个共享文件系统(如 NFS、本地文件系统,单机多卡场景常用)
      • 无需网络通信,但依赖文件系统的可靠性和性能
    • init_method='env://'(由环境变量指定的 Store(通常是 TCPStore))
      • 不直接指定 Store 类型,而是通过环境变量(如 MASTER_ADDRMASTER_PORTWORLD_SIZERANK 等)配置初始化信息
      • 本质上仍会创建 TCPStore,但参数由环境变量而非函数参数传入
      • 常用于容器化环境(如 Kubernetes)或需要动态配置的场景
    • 第三方分布式框架集成(如 Slurm、MPI)(对应 SlurmStoreMPIStore 等(取决于框架))
      • 当使用 Slurm 或 MPI 启动分布式任务时,PyTorch 可自动检测并使用对应框架的 Store
      • 例如,init_method='slurm://' 会使用 SlurmStore,通过 Slurm 的环境变量和接口同步元数据,无需手动指定主节点
  • 总结核心差异总结
    init_method 类型 Store 实现 依赖条件 适用场景
    tcp://master_ip:port TCPStore 网络连通性 通用分布式场景(单机/多机)
    file:///path FileStore 共享文件系统 单机多卡或共享存储的多机场景
    env:// 通常为 TCPStore 环境变量配置 容器化、动态配置场景
    第三方框架(如 Slurm) 框架专属 Store 对应调度框架环境 集群管理系统(Slurm/MPI)

使用 PrefixStore 进行多进程组管理

  • 使用 PrefixStore 可以为不同的进程组创建隔离的命名空间
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # 创建基础store
    base_store = dist.TCPStore("localhost", 12345, world_size, is_master=True)

    # 为不同进程组创建前缀store
    pg1_store = dist.PrefixStore("group1_", base_store)
    pg2_store = dist.PrefixStore("group2_", base_store)

    # 使用不同的store初始化不同的进程组
    dist.init_process_group(backend="nccl", store=pg1_store, ...)

注意事项

  • 当同时指定了环境变量和 store 参数时,store 参数会优先使用:

    1
    2
    3
    4
    5
    6
    # 即使设置了环境变量,也会使用显式指定的store
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    custom_store = dist.TCPStore("192.168.1.100", 29500, world_size, is_master)
    dist.init_process_group(backend="nccl", store=custom_store) # 使用custom_store
  • init_method 参数是互斥的,只能指定其中一个:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 方法1:使用init_method
    dist.init_process_group(
    backend="nccl",
    init_method="tcp://192.168.1.100:29500"
    )

    # 方法2:使用store参数
    store = dist.TCPStore("192.168.1.100", 29500, world_size, is_master)
    dist.init_process_group(
    backend="nccl",
    store=store
    )
  • 可根据环境动态配置不同 store

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def create_store(backend, world_size, rank):
    if backend == "nccl":
    # GPU训练使用TCPStore
    return dist.TCPStore(
    host_name=os.environ.get('MASTER_ADDR', 'localhost'),
    port=int(os.environ.get('MASTER_PORT', '29500')),
    world_size=world_size,
    is_master=(rank == 0)
    )
    else:
    # CPU训练可以使用FileStore
    return dist.FileStore("/tmp/dist_store", world_size)

附录:init_process_group 的 device_id 参数详细用法说明

  • device_id 参数用于将进程”绑定”到单个特定设备,从而实现后端特定优化,是一个 torch.device 类型的可选参数,主要用于 GPU 训练场景

  • device_id 参数在 NCCL 后端下有特殊效果:在 NCCL 后端下,device_id 参数有两个重要影响

    • 1)立即形成通信器 :通信器会立即形成(直接调用ncclCommInit*而非延迟初始化)
    • 2)内存占用优化 :每个进程会在指定的 GPU 上占用显存,而不是在第一个可访问的 GPU 上
    • If you want to know NCCL initialization error early, you can also use this field
  • 使用 gloo 后端时,不需要设置 device_id

  • 基本用法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import torch
    import torch.distributed as dist

    # 指定当前进程使用的GPU设备
    device_id = torch.device(f"cuda:{local_rank}")

    dist.init_process_group(
    backend="nccl",
    world_size=world_size,
    rank=rank,
    device_id=device_id # 不指定 device_id 默认会在可访问的第一个 GPU 上占用内存?
    )
  • 多 GPU 训练场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import torch
    import torch.distributed as dist
    import os

    def setup_distributed():
    # 获取本地rank
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    rank = int(os.environ.get("RANK", 0))

    # 设置当前进程的GPU设备
    torch.cuda.set_device(local_rank) # 根据不同 rank 获取不同的设备
    device_id = torch.device(f"cuda:{local_rank}")

    # 初始化进程组,绑定到特定GPU
    dist.init_process_group(
    backend="nccl",
    world_size=world_size,
    rank=rank,
    device_id=device_id # 绑定到特定设备
    )

    return device_id

附录:init_process_group 中 NCCL 的延迟初始化

  • NCCL 是 NVIDIA 集合通信库(NVIDIA Collective Communications Library)

    • 提供多 GPU / 多节点通信原语(如 all-reduce、broadcast、all-gather、reduce-scatter 等)
    • 针对 PCIe/NVLink 与 NVIDIA 网络优化,用于加速深度学习分布式训练
    • 开源地址:github.com/NVIDIA/nccl
  • 延迟初始化(Lazy Initialization)是指 NCCL 通信器不在 init_process_group() 调用时立即创建,而是推迟到 第一次实际需要进行集合通信操作时才创建

    • 注:若 init_process_group() 函数指定了 device_id 参数,则 NCCL 会立即初始化到当前设备上
  • 立即初始化(指定 device_id 时)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    device_id = torch.device(f"cuda:{local_rank}")
    dist.init_process_group(backend="nccl", device_id=device_id)

    # 在这一行执行时,NCCL立即:
    # 1. 调用 ncclCommInit* 系列函数
    # 2. 创建通信器对象
    # 3. 分配通信缓冲区
    # 4. 建立进程间的通信通道
    # 5. 进行通信拓扑优化
  • 延迟初始化(不指定 device_id 时)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    dist.init_process_group(backend="nccl")  # 没有device_id

    # 在这一行执行时,NCCL只是:
    # 1. 记录进程组的元信息
    # 2. 设置必要的环境变量
    # 3. 但不创建实际的通信器

    # 真正的初始化发生在第一次集合通信时:
    tensor = torch.randn(10).cuda()
    dist.all_reduce(tensor) # 在这里才真正初始化 NCCL 通信器!自动识别需要绑定的 GPU 并初始化,这里会很慢,因为要初始化 NCCL

    dist.all_reduce(tensor) # 这里就很快了,这次不需要初始化 NCCL

    # # 注:以下操作都会触发NCCL初始化:
    # dist.all_reduce(tensor) # 全规约
    # dist.all_gather([tensor]) # 全收集
    # dist.reduce(tensor, dst=0) # 规约
    # dist.broadcast(tensor, src=0) # 广播
    # dist.all_to_all([tensor], [tensor]) # 全到全
    • 延迟初始化可以允许灵活、动态、自动地选择需要的 GPU,但可能会造成一些误解
    • 注意:一旦 NCCL 初始化以后,就绑定了 GPU 了,再切换 GPU 可能会出现错误
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      def multi_device_lazy_init_issues():
      dist.init_process_group(backend="nccl")

      # 问题场景:进程需要使用多个GPU
      tensor_gpu0 = torch.randn(10).cuda(0)
      tensor_gpu1 = torch.randn(10).cuda(1)

      # 第一次通信在GPU 0上
      dist.all_reduce(tensor_gpu0) # NCCL在GPU 0上初始化

      # 尝试在GPU 1上通信
      try:
      dist.all_reduce(tensor_gpu1) # 可能失败或性能差
      except Exception as e:
      print(f"Multi-device issue: {e}")
  • 目前尚无代码可直接检测 NCCL 通信器是否已创建,只能通过第一次通信时间来大致判断

  • 生产环境推荐立即初始化,方便代码阅读;开发时可以使用延迟初始化以获得更快的启动