PyTorch——分布式编程框架总结


整体说明

  • PyTorch 原生支持了 DP(DataParallel)和DDP(DistributedDataParallel)是常用的数据并行分布式训练工具
  • PyTorch 原生还支持了 FSDP 作为模型并行的分布式训练工具,通过分片模型参数、梯度和优化器状态到多个 GPU,显著降低单卡内存占用
  • HuggingFace Accelerate 是一个轻量级库,专为简化 PyTorch 模型在各种硬件配置上的训练和推理而设计,支持选择 DeepSpeed 和 FSDP 等
  • HuggingFace 的 Trainer 是 transformers 库中一个核心且功能强大的类,它为 PyTorch 模型提供了完整的训练和评估循环,极大地简化了训练过程,让用户可以专注于模型、数据集和训练参数的配置,而无需手动编写复杂的训练代码
    • Trainer 比 Accelerate 更高一级,把循环等也封装了,进需要用户配置参数数据集等即可
  • 其他相关的分布式封装框架有 Horovod、Ray 等
    • Horovod 是 Uber 开源的跨平台的分布式训练工具,名字来自于俄国传统民间舞蹈,舞者手牵手围成一个圈跳舞,与 Horovod 设备之间的通信模式很像
    • Ray 是更高层级的分布式训练框架(利用其他框架),目标是融合数据处理、模型训练、超参数调优和模型服务等各个阶段
  • 注意:篇幅有限,本文主要是记录一些简单的使用示例和说明,更详尽的使用细节需要去官网查看
  • PyTorch 分布式系统的启动方式见:/Notes/PyTorch/PyTorch——分布式程序启动方式汇总

DataParallel(DP) 使用示例

  • DP 是单进程多线程模式,简单易用,适合单机多 GPU 场景
  • DP 的每次前向过程都会进行一次从 GPU 间的参数复制(效率较慢)
  • 参考博客(有比较清晰的图片):Training Neural Nets on Larger Batches: Practical Tips for 1-GPU, Multi-GPU & Distributed setups
  • DP 的使用示例如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, Dataset

    class DiyModel(nn.Module):
    def __init__(self):
    super().__init__()
    self.fc = nn.Linear(10, 2)
    def forward(self, x):
    return self.fc(x)

    class DiyDataset(Dataset):
    def __len__(self):
    return 1000
    def __getitem__(self, idx):
    return torch.randn(10), torch.randint(0, 2, (1,)).item()

    # 第一步:数据准备(与常规方式一致)
    dataset = DiyDataset()
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

    # 第二步:初始化模型、损失函数、优化器
    model = DiyModel()
    # 将模型放到DP中(自动分发到多个GPU),核心步骤,相对普通训练方式,仅需要修改这里
    model = nn.DataParallel(model) # 注意:DP 仅增加这一步即可
    model = model.cuda() # 或 .to('cuda')

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # 第三步:训练循环
    for epoch in range(3):
    for inputs, labels in dataloader:
    inputs = inputs.cuda()
    labels = labels.cuda()

    # 注:model 是 DP封装过的模型,所以能执行 DP 的个性化操作
    # 在这里 model(inputs) 会执行四个流程:
    # * 数据分发
    # * 模型复制(主线程GPU到其他线程)
    # * 并行前向推理
    # * 输出汇总四个流程
    outputs = model(inputs)
    loss = criterion(outputs, labels)

    optimizer.zero_grad()

    # 注:loss 本质是从 Model 出来的(loss 的梯度是传到 outputs 上再进一步计算的,而 outputs 是模型的输出,执行梯度计算时能考虑分布式),所以也能执行 DP 的个性化操作
    # 这里 loss.backward 执行四个流程:
    # * 各GPU损失梯度计算:根据各GPU的outputs与总output的关系来计算各自的损失梯度(不同GPU不一样)
    # * 损失梯度分发(将各自的梯度分发到各自 GPU 上
    # * 并行后向推理(计算 GPU 自身的局部梯度)
    # * 梯度汇总
    loss.backward()
    optimizer.step() # 仅更新主线程GPU上的模型
    print(f"Epoch {epoch}, Loss: {loss.item()}")

nn.DataParallel(model) 发生了什么?

  • TLDR:model = nn.DataParallel(model) 的核心作用是通过包装模型实现多 GPU 数据并行
    • 执行这一行后,模型对象已经变了,需要通过 model.module.fc 才能访问原模型属性(访问 model.fc 会出错)
    • DP 中,仅需要这一步即可实现数据并行 ,后续的代码都无需修改,自动适配了;但代码后面做了很多数据分发和梯度合并的工作
    • 这一行后,程序会自动管理设备分配、模型复制、数据拆分与合并、梯度汇总与参数同步
  • 1)初始化:确定并行设备与包装模型,nn.DataParallel 实例化时会完成以下核心操作:
    • 检测可用GPU :默认情况下,DataParallel 会自动检测当前可见的GPU(通过 CUDA_VISIBLE_DEVICES 环境变量控制),并将其ID列表存储在 device_ids 属性中(默认值为 range(torch.cuda.device_count())
    • 指定主GPU :主GPU(device[0])是默认的“主导设备”,负责汇总计算结果、更新参数,并作为数据/模型的初始落脚点。若未指定 device_ids,则第一个可见GPU(通常是 cuda:0)会被设为主GPU
    • 包装原始模型 :原始模型会被存入 DataParallelmodule 属性中(这也是后续访问原模型属性需通过 model.module 的原因),同时 DataParallel 会接管模型的 forwardbackward 逻辑
  • 2)模型的移动与复制:DataParallel 会自动处理模型在设备间的分布:
    • 主GPU加载模型 :若原始模型在CPU上,DataParallel 会先将其移动到主GPU(device_ids[0]);若模型已在某个GPU上,会检查是否与主GPU一致,不一致则移动
    • 副本同步到其他GPU :在首次执行前向传播时,DataParallel 会将主GPU上的模型参数复制到 device_ids 中的其他GPU,确保所有GPU上的模型初始参数完全一致
  • 3)前向传播:数据拆分与并行计算,当调用 output = model(input) 时,DataParallel 会按以下流程处理:
    • 数据校验与准备 :检查输入数据是否在主GPU上(若不在,会自动移到主GPU)。输入可以是Tensor、列表、字典等结构,只要包含需要拆分的批量数据(如 batch_size 维度的Tensor)
    • 数据拆分(Split) :沿批量维度(默认是第0维,即 batch_size 所在维度)将输入数据均匀拆分到 device_ids 中的所有GPU。例如,若 batch_size=8 且使用2个GPU,则每个GPU会收到 batch_size=4 的子数据
    • 并行计算 :每个GPU上的模型副本会独立处理分配到的子数据,执行各自的 forward 计算,得到子输出
    • 结果收集与合并(Gather) :所有GPU的子输出会被发送回主GPU,然后按拆分的逆过程合并(如拼接),最终形成与单GPU计算格式一致的输出(例如,将2个 batch_size=4 的子输出拼接为 batch_size=8 的完整输出)
  • 4)反向传播:梯度汇总与参数更新
    当调用 loss.backward() 时,梯度计算与参数更新流程如下:
    • 各GPU独立计算梯度 :每个GPU会基于自己处理的子数据和子输出,独立计算模型参数的梯度(存储在各自GPU的模型副本中)
    • 梯度汇总到主GPUDataParallel 会自动将所有GPU上的梯度求和(sum)并汇总到主GPU的模型中(即 model.module 的参数梯度)
    • 主GPU更新参数 :优化器(如 optimizer.step())仅作用于主GPU上的模型参数,完成参数更新
    • 参数同步到其他GPU :主GPU更新后的参数会被自动广播(broadcast)到其他GPU的模型副本中,确保所有GPU的模型参数保持一致,为下一轮计算做准备
  • 5)特殊细节与注意事项
    • 模型属性访问 :由于原始模型被包装在 DataParallelmodule 属性中,访问原模型的层或参数时需通过 model.module(例如,model.module.fc 而非 model.fc)。若仅使用单GPU,DataParallel 仍会包装模型,因此建议始终通过 model.module 访问原模型
    • 单GPU场景 :若只有1个可见GPU,DataParallel 不会进行拆分计算(本质上是单GPU运行),但仍会包装模型,此时 model.module 与原始模型等价
    • 数据类型与设备兼容 :输入数据必须与主GPU设备兼容(例如,若主GPU是 cuda:0,输入数据需为 cuda:0 上的Tensor),否则会触发设备不匹配错误
    • 局限性DataParallel 是单进程多线程模式,受Python GIL限制,多GPU效率可能不如多进程的 DistributedDataParallel(DDP),且不支持跨节点并行

DistributedDataParallel(DDP)使用示例

  • 更详细的使用说明见:官网说明文档
  • DDP是多进程模式,支持单机/多机多GPU,效率更高,是PyTorch推荐的分布式训练方式

第一步:编写训练脚本

  • 注:这一步只是定义脚本,这个脚本不能通过简单的 python 命令启动(需通过torch.distributed.launch启动)
  • DDP 的示例脚本如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    import os

    import torch
    import torch.nn as nn
    import torch.distributed as dist
    from torch.utils.data import DataLoader, Dataset
    from torch.utils.data.distributed import DistributedSampler # 用于DDP的数据采样

    class DiyModel(nn.Module):
    def __init__(self):
    super().__init__()
    self.fc = nn.Linear(10, 2)

    def forward(self, x):
    return self.fc(x)

    class DiyDataset(Dataset):
    def __len__(self):
    return 1000

    def __getitem__(self, idx):
    return torch.randn(10), torch.randint(0, 2, (1,)).item()

    # 初始化分布式环境
    dist.init_process_group(backend='gloo') # 多GPU推荐用'nccl'后端(NVIDIA专门优化过),CPU使用'gloo'或'mpi',亲测Mac上'gloo'可直接使用
    rank = dist.get_rank() # 全局进程编号(0,1,2...),也可以用 env_rank = int(os.environ["RANK"])
    world_size = dist.get_world_size() # 也可以使用 world_size = int(os.environ["WORLD_SIZE"])
    local_rank = int(os.environ["LOCAL_RANK"]) # 当前进程的local_rank

    # 定义设备并设置
    device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
    torch.cuda.set_device(device) # 为当前进程分配 GPU

    # 数据准备
    dataset = DiyDataset()
    sampler = DistributedSampler(dataset) # 重点:sampler确保各进程数据不重叠
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler) # sampler传入时,shuffle参数不生效

    # 模型初始化(每个进程单独初始化,再用DDP包装)
    model = DiyModel().to(device)
    model = torch.nn.parallel.DistributedDataParallel( # 核心步骤
    model,
    device_ids=[local_rank],
    output_device=local_rank
    )

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # 训练循环
    for epoch in range(3):
    sampler.set_epoch(epoch) # 每个epoch打乱数据,避免每个epoch数据顺序一致(epoch内部顺序不随机时容易导致模型学到错误的样本顺序规律)
    for inputs, labels in dataloader:
    # 使用to(device)将数据移动到指定设备
    inputs = inputs.to(device)
    labels = labels.to(device)

    # 前向过程,会同时追踪需要的同步的数据
    outputs = model(inputs)
    loss = criterion(outputs, labels)

    optimizer.zero_grad()

    # 后向过程
    # 注:DDP 在初始化时会为模型参数注册特殊的钩子(hook),这些钩子会在 backward() 过程中自动触发
    # 当本地梯度计算完成后,钩子会启动 All-Reduce 操作,将所有进程的同一份参数的梯度进行汇总(默认求平均)
    # 完成上述流程后,每个进程上的同一份参数会拥有相同的梯度值,存储在 `grad` 属性中
    loss.backward()
    optimizer.step() # 普通的 optimizer 更新每个进程自己的参数

    # 只在主进程(rank=0)打印信息
    if rank == 0:
    print(f"Epoch {epoch}, Loss: {loss.item()}")

    dist.destroy_process_group() # 销毁进程组
附录:DDP 的 DP 间求平均的细节
  • DDP 中在 DP 间累积梯度后,做了平均,具体实现参见 github.com/pytorch/pytorch/blob/main/torch/csrc/distributed/c10d/reducer.cpp

    1
    2
    3
    4
    5
    // 取值与 DP_size 有关(注意: 这里的 size 就是 DDP 中的 world_size,也就是 DP_size)
    div_factor_ = process_group_->getSize();
    ...
    // 做除法
    bucket_view.div_(div_factor_);
  • 问题:为什么 DDP 中,loss.backward() 要对梯度求平均而不是求和?

    • 理解:本质应该是一样的,求和求平均都可以,目前实现本质也是先求和再做除法
  • 特别说明:如果是想做 Token 粒度的平均(每个样本的可学习 Token 数不一致),需要多维护一个 Token 数量的变量并执行一次 all_reduce 通信

    • 当然,为了实现与不做 DP 完全一致的效果,这里其实是应该对 Token 也做聚合,再做除法才行的

第二步:启动脚本(通过 torch.distributed.launch 命令)

  • 假设脚本名为ddp_demo.py

  • 单机4卡,使用4个GPU启动:

    1
    python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py
    • --nproc_per_node:指定单机的GPU数量
  • 多机多卡,2个机器,各有4个GPU启动:

    1
    2
    3
    4
    5
    # 机器1
    python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py

    # 机器2
    python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py
    • --nnodes:总节点数
    • --node_rank:当前节点编号
    • --master_addr:指定master节点的IP地址,默认值是”localhost”,但是显示给出来更明确
    • --master_port:指定master节点的端口号,默认值是 29500,但是显示给出来更明确

补充:torchrun 命令启动脚本(结论是在 Mac 系统下得到的,暂时没有在 Linux 上尝试)

  • torchrun 启动单机多卡(与 torch.distributed.launch 相同):

    1
    2
    3
    4
    5
    # 回顾 `torch.distributed.launch` 的启动方式
    python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py

    # torchrun 的启动方式:替换 `python -m torch.distributed.launch` 为 `torchrun` 即可
    torchrun --nproc_per_node=4 ddp_demo.py
  • torchrun 启动多机多卡(相对 torch.distributed.launch而言,torchrun 会自动推断部分参数(如节点数、进程数))

    • (待确认:Mac 系统下失败)torchrun 无需手动指定 node_rank,只需在所有节点上指定相同的 --rdzv_id(任务ID)和 --rdzv_endpoint(主节点地址):

      1
      2
      # 所有节点统一执行相同命令(不需要区分不同节点使用不同命令,会自动分配node_rank)
      torchrun --nproc_per_node=4 --nnodes=2 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint="localhost:29500" DDP_demo.py
      • --rdzv_id:任务唯一ID(任意整数)
      • rdzv_backend:后端(默认c10d)
      • rdzv_endpoint:主节点IP:端口
      • torchrun还可以用 --max_restarts 等参数指定最大重启次数
    • (Mac 系统下成功)在 Mac 系统下,上面的命令执行会出现问题(上面命令在两个窗口分别打开)

      1
      2
      torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_test.py
      torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_test.py
      • 亲测上面的代码可以成功
  • 更多启动详情见:/Notes/PyTorch/PyTorch——分布式程序启动方式汇总


HuggingFace Accelerate 介绍及使用示例


HuggingFace Trainer 介绍及使用示例