- 参考链接:
整体说明
- PyTorch 原生支持了
DP(DataParallel)和DDP(DistributedDataParallel)是常用的数据并行分布式训练工具 - PyTorch 原生还支持了 FSDP 作为模型并行的分布式训练工具,通过分片模型参数、梯度和优化器状态到多个 GPU,显著降低单卡内存占用
- HuggingFace Accelerate 是一个轻量级库,专为简化 PyTorch 模型在各种硬件配置上的训练和推理而设计,支持选择 DeepSpeed 和 FSDP 等
- HuggingFace 的 Trainer 是 transformers 库中一个核心且功能强大的类,它为 PyTorch 模型提供了完整的训练和评估循环,极大地简化了训练过程,让用户可以专注于模型、数据集和训练参数的配置,而无需手动编写复杂的训练代码
- Trainer 比 Accelerate 更高一级,把循环等也封装了,进需要用户配置参数数据集等即可
- 其他相关的分布式封装框架有 Horovod、Ray 等
- Horovod 是 Uber 开源的跨平台的分布式训练工具,名字来自于俄国传统民间舞蹈,舞者手牵手围成一个圈跳舞,与 Horovod 设备之间的通信模式很像
- Ray 是更高层级的分布式训练框架(利用其他框架),目标是融合数据处理、模型训练、超参数调优和模型服务等各个阶段
- 注意:篇幅有限,本文主要是记录一些简单的使用示例和说明,更详尽的使用细节需要去官网查看
- PyTorch 分布式系统的启动方式见:/Notes/PyTorch/PyTorch——分布式程序启动方式汇总
DataParallel(DP) 使用示例
- DP 是单进程多线程模式,简单易用,适合单机多 GPU 场景
- DP 的每次前向过程都会进行一次从 GPU 间的参数复制(效率较慢)
- 参考博客(有比较清晰的图片):Training Neural Nets on Larger Batches: Practical Tips for 1-GPU, Multi-GPU & Distributed setups
- DP 的使用示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
class DiyModel(nn.Module):
def __init__(self):
super().__init__()
self.fc = nn.Linear(10, 2)
def forward(self, x):
return self.fc(x)
class DiyDataset(Dataset):
def __len__(self):
return 1000
def __getitem__(self, idx):
return torch.randn(10), torch.randint(0, 2, (1,)).item()
# 第一步:数据准备(与常规方式一致)
dataset = DiyDataset()
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
# 第二步:初始化模型、损失函数、优化器
model = DiyModel()
# 将模型放到DP中(自动分发到多个GPU),核心步骤,相对普通训练方式,仅需要修改这里
model = nn.DataParallel(model) # 注意:DP 仅增加这一步即可
model = model.cuda() # 或 .to('cuda')
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
# 第三步:训练循环
for epoch in range(3):
for inputs, labels in dataloader:
inputs = inputs.cuda()
labels = labels.cuda()
# 注:model 是 DP封装过的模型,所以能执行 DP 的个性化操作
# 在这里 model(inputs) 会执行四个流程:
# * 数据分发
# * 模型复制(主线程GPU到其他线程)
# * 并行前向推理
# * 输出汇总四个流程
outputs = model(inputs)
loss = criterion(outputs, labels)
optimizer.zero_grad()
# 注:loss 本质是从 Model 出来的(loss 的梯度是传到 outputs 上再进一步计算的,而 outputs 是模型的输出,执行梯度计算时能考虑分布式),所以也能执行 DP 的个性化操作
# 这里 loss.backward 执行四个流程:
# * 各GPU损失梯度计算:根据各GPU的outputs与总output的关系来计算各自的损失梯度(不同GPU不一样)
# * 损失梯度分发(将各自的梯度分发到各自 GPU 上
# * 并行后向推理(计算 GPU 自身的局部梯度)
# * 梯度汇总
loss.backward()
optimizer.step() # 仅更新主线程GPU上的模型
print(f"Epoch {epoch}, Loss: {loss.item()}")
nn.DataParallel(model) 发生了什么?
- TLDR:
model = nn.DataParallel(model)的核心作用是通过包装模型实现多 GPU 数据并行 :- 执行这一行后,模型对象已经变了,需要通过
model.module.fc才能访问原模型属性(访问model.fc会出错) - DP 中,仅需要这一步即可实现数据并行 ,后续的代码都无需修改,自动适配了;但代码后面做了很多数据分发和梯度合并的工作
- 这一行后,程序会自动管理设备分配、模型复制、数据拆分与合并、梯度汇总与参数同步
- 执行这一行后,模型对象已经变了,需要通过
- 1)初始化:确定并行设备与包装模型,
nn.DataParallel实例化时会完成以下核心操作:- 检测可用GPU :默认情况下,
DataParallel会自动检测当前可见的GPU(通过CUDA_VISIBLE_DEVICES环境变量控制),并将其ID列表存储在device_ids属性中(默认值为range(torch.cuda.device_count())) - 指定主GPU :主GPU(
device[0])是默认的“主导设备”,负责汇总计算结果、更新参数,并作为数据/模型的初始落脚点。若未指定device_ids,则第一个可见GPU(通常是cuda:0)会被设为主GPU - 包装原始模型 :原始模型会被存入
DataParallel的module属性中(这也是后续访问原模型属性需通过model.module的原因),同时DataParallel会接管模型的forward和backward逻辑
- 检测可用GPU :默认情况下,
- 2)模型的移动与复制:
DataParallel会自动处理模型在设备间的分布:- 主GPU加载模型 :若原始模型在CPU上,
DataParallel会先将其移动到主GPU(device_ids[0]);若模型已在某个GPU上,会检查是否与主GPU一致,不一致则移动 - 副本同步到其他GPU :在首次执行前向传播时,
DataParallel会将主GPU上的模型参数复制到device_ids中的其他GPU,确保所有GPU上的模型初始参数完全一致
- 主GPU加载模型 :若原始模型在CPU上,
- 3)前向传播:数据拆分与并行计算,当调用
output = model(input)时,DataParallel会按以下流程处理:- 数据校验与准备 :检查输入数据是否在主GPU上(若不在,会自动移到主GPU)。输入可以是Tensor、列表、字典等结构,只要包含需要拆分的批量数据(如
batch_size维度的Tensor) - 数据拆分(Split) :沿批量维度(默认是第0维,即
batch_size所在维度)将输入数据均匀拆分到device_ids中的所有GPU。例如,若batch_size=8且使用2个GPU,则每个GPU会收到batch_size=4的子数据 - 并行计算 :每个GPU上的模型副本会独立处理分配到的子数据,执行各自的
forward计算,得到子输出 - 结果收集与合并(Gather) :所有GPU的子输出会被发送回主GPU,然后按拆分的逆过程合并(如拼接),最终形成与单GPU计算格式一致的输出(例如,将2个
batch_size=4的子输出拼接为batch_size=8的完整输出)
- 数据校验与准备 :检查输入数据是否在主GPU上(若不在,会自动移到主GPU)。输入可以是Tensor、列表、字典等结构,只要包含需要拆分的批量数据(如
- 4)反向传播:梯度汇总与参数更新
当调用loss.backward()时,梯度计算与参数更新流程如下:- 各GPU独立计算梯度 :每个GPU会基于自己处理的子数据和子输出,独立计算模型参数的梯度(存储在各自GPU的模型副本中)
- 梯度汇总到主GPU :
DataParallel会自动将所有GPU上的梯度求和(sum)并汇总到主GPU的模型中(即model.module的参数梯度) - 主GPU更新参数 :优化器(如
optimizer.step())仅作用于主GPU上的模型参数,完成参数更新 - 参数同步到其他GPU :主GPU更新后的参数会被自动广播(broadcast)到其他GPU的模型副本中,确保所有GPU的模型参数保持一致,为下一轮计算做准备
- 5)特殊细节与注意事项
- 模型属性访问 :由于原始模型被包装在
DataParallel的module属性中,访问原模型的层或参数时需通过model.module(例如,model.module.fc而非model.fc)。若仅使用单GPU,DataParallel仍会包装模型,因此建议始终通过model.module访问原模型 - 单GPU场景 :若只有1个可见GPU,
DataParallel不会进行拆分计算(本质上是单GPU运行),但仍会包装模型,此时model.module与原始模型等价 - 数据类型与设备兼容 :输入数据必须与主GPU设备兼容(例如,若主GPU是
cuda:0,输入数据需为cuda:0上的Tensor),否则会触发设备不匹配错误 - 局限性 :
DataParallel是单进程多线程模式,受Python GIL限制,多GPU效率可能不如多进程的DistributedDataParallel(DDP),且不支持跨节点并行
- 模型属性访问 :由于原始模型被包装在
DistributedDataParallel(DDP)使用示例
- 更详细的使用说明见:官网说明文档
DDP是多进程模式,支持单机/多机多GPU,效率更高,是PyTorch推荐的分布式训练方式
第一步:编写训练脚本
- 注:这一步只是定义脚本,这个脚本不能通过简单的 python 命令启动(需通过
torch.distributed.launch启动) - DDP 的示例脚本如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler # 用于DDP的数据采样
class DiyModel(nn.Module):
def __init__(self):
super().__init__()
self.fc = nn.Linear(10, 2)
def forward(self, x):
return self.fc(x)
class DiyDataset(Dataset):
def __len__(self):
return 1000
def __getitem__(self, idx):
return torch.randn(10), torch.randint(0, 2, (1,)).item()
# 初始化分布式环境
dist.init_process_group(backend='gloo') # 多GPU推荐用'nccl'后端(NVIDIA专门优化过),CPU使用'gloo'或'mpi',亲测Mac上'gloo'可直接使用
rank = dist.get_rank() # 全局进程编号(0,1,2...),也可以用 env_rank = int(os.environ["RANK"])
world_size = dist.get_world_size() # 也可以使用 world_size = int(os.environ["WORLD_SIZE"])
local_rank = int(os.environ["LOCAL_RANK"]) # 当前进程的local_rank
# 定义设备并设置
device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
torch.cuda.set_device(device) # 为当前进程分配 GPU
# 数据准备
dataset = DiyDataset()
sampler = DistributedSampler(dataset) # 重点:sampler确保各进程数据不重叠
dataloader = DataLoader(dataset, batch_size=32, sampler=sampler) # sampler传入时,shuffle参数不生效
# 模型初始化(每个进程单独初始化,再用DDP包装)
model = DiyModel().to(device)
model = torch.nn.parallel.DistributedDataParallel( # 核心步骤
model,
device_ids=[local_rank],
output_device=local_rank
)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
# 训练循环
for epoch in range(3):
sampler.set_epoch(epoch) # 每个epoch打乱数据,避免每个epoch数据顺序一致(epoch内部顺序不随机时容易导致模型学到错误的样本顺序规律)
for inputs, labels in dataloader:
# 使用to(device)将数据移动到指定设备
inputs = inputs.to(device)
labels = labels.to(device)
# 前向过程,会同时追踪需要的同步的数据
outputs = model(inputs)
loss = criterion(outputs, labels)
optimizer.zero_grad()
# 后向过程
# 注:DDP 在初始化时会为模型参数注册特殊的钩子(hook),这些钩子会在 backward() 过程中自动触发
# 当本地梯度计算完成后,钩子会启动 All-Reduce 操作,将所有进程的同一份参数的梯度进行汇总(默认求平均)
# 完成上述流程后,每个进程上的同一份参数会拥有相同的梯度值,存储在 `grad` 属性中
loss.backward()
optimizer.step() # 普通的 optimizer 更新每个进程自己的参数
# 只在主进程(rank=0)打印信息
if rank == 0:
print(f"Epoch {epoch}, Loss: {loss.item()}")
dist.destroy_process_group() # 销毁进程组
附录:DDP 的 DP 间求平均的细节
DDP 中在 DP 间累积梯度后,做了平均,具体实现参见 github.com/pytorch/pytorch/blob/main/torch/csrc/distributed/c10d/reducer.cpp
1
2
3
4
5// 取值与 DP_size 有关(注意: 这里的 size 就是 DDP 中的 world_size,也就是 DP_size)
div_factor_ = process_group_->getSize();
...
// 做除法
bucket_view.div_(div_factor_);问题:为什么 DDP 中,loss.backward() 要对梯度求平均而不是求和?
- 理解:本质应该是一样的,求和求平均都可以,目前实现本质也是先求和再做除法
特别说明:如果是想做 Token 粒度的平均(每个样本的可学习 Token 数不一致),需要多维护一个 Token 数量的变量并执行一次
all_reduce通信- 当然,为了实现与不做 DP 完全一致的效果,这里其实是应该对 Token 也做聚合,再做除法才行的
第二步:启动脚本(通过 torch.distributed.launch 命令)
假设脚本名为
ddp_demo.py单机4卡,使用4个GPU启动:
1
python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py
--nproc_per_node:指定单机的GPU数量
多机多卡,2个机器,各有4个GPU启动:
1
2
3
4
5# 机器1
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py
# 机器2
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py--nnodes:总节点数--node_rank:当前节点编号--master_addr:指定master节点的IP地址,默认值是”localhost”,但是显示给出来更明确--master_port:指定master节点的端口号,默认值是 29500,但是显示给出来更明确
补充:torchrun 命令启动脚本(结论是在 Mac 系统下得到的,暂时没有在 Linux 上尝试)
torchrun启动单机多卡(与torch.distributed.launch相同):1
2
3
4
5回顾 `torch.distributed.launch` 的启动方式
python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py
torchrun 的启动方式:替换 `python -m torch.distributed.launch` 为 `torchrun` 即可
torchrun --nproc_per_node=4 ddp_demo.pytorchrun启动多机多卡(相对torch.distributed.launch而言,torchrun会自动推断部分参数(如节点数、进程数))(待确认:Mac 系统下失败)
torchrun无需手动指定node_rank,只需在所有节点上指定相同的--rdzv_id(任务ID)和--rdzv_endpoint(主节点地址):1
2# 所有节点统一执行相同命令(不需要区分不同节点使用不同命令,会自动分配node_rank)
torchrun --nproc_per_node=4 --nnodes=2 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint="localhost:29500" DDP_demo.py--rdzv_id:任务唯一ID(任意整数)rdzv_backend:后端(默认c10d)rdzv_endpoint:主节点IP:端口torchrun还可以用--max_restarts等参数指定最大重启次数
(Mac 系统下成功)在 Mac 系统下,上面的命令执行会出现问题(上面命令在两个窗口分别打开)
1
2torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_test.py
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_test.py- 亲测上面的代码可以成功