Jiahong 的个人博客

凡事预则立,不预则废


  • Home

  • Tags

  • Archives

  • Navigation

  • Search

ML——样本不均衡问题处理

  • 参考链接:
    • 一文解决样本不均衡(全)

样本不均衡问题描述

  • 问题描述:机器学习中的样本不均衡问题,是指在分类任务中不同类别的训练样例数量存在显著差异的情况
  • 场景:包括金融欺诈检测、医疗诊断、网络入侵检测等领域

样本不均衡的影响

  • 模型偏向性 :模型倾向于预测label为多数类,因为这样做可以最大化准确率,即使对少数类的预测几乎总是错误的
  • 过拟合风险 :由于少数类样本数量少,模型容易过拟合这些样本,导致模型在新数据上泛化性差
  • 总结:样本不均衡带来的根本影响是模型会学习到训练集中样本比例的这种先验性信息,以致于实际预测时就会对多数类别有侧重(可能导致多数类精度更好,而少数类比较差)通过解决样本不均衡,可以减少模型学习样本比例的先验信息,以获得能学习到辨别好坏本质特征的模型

哪些情况下需要解决样本不均衡问题


评估指标

  • 不能使用:准确率(Accuracy)
  • 建议使用:F1分数、精确率(Precision)、召回率(Recall)和AUC等

解决样本不均衡的方法

数据层面的方法

  • 过采样(Over-sampling) :通过增加少数类样本的数量来缓解不均衡问题。常见的方法包括:
    • 随机过采样 :简单地复制少数类样本,容易导致过拟合
    • 数据增强(Data Augmentation) :通过变换现有样本生成新的样本,适用于图像(翻转,拉伸等)、文本等数据类型
    • SMOTE(Synthetic Minority Over-sampling Technique) :通过在少数类样本间插值生成新的合成样本,一定程度上可以避免过拟合,但生成的数据置信度也存在问题
    • ADASYN(Adaptive Synthetic Sampling) :通过生成少数类别样本改善不均衡问题,与传统的过采样技术(如SMOTE)相比,ADASYN更注重类间距离和局部密度,尤其是对于少数类中的困难样本或噪声敏感区域,它可以更加精细地调整采样策略,详情可参考:不平衡学习的自适应合成采样方法ADASYN(Matlab代码实现)
  • 欠采样(Under-sampling) :通过减少多数类样本的数量来达到平衡。常用的方法有:
    • 随机欠采样 :随机删除多数类样本,但可能丢失重要信息
    • Tomek Links :移除位于类别边界附近的样本对,有助于清理噪声
    • ENN(Edited Nearest Neighbors Rule) :移除与最近邻居类别不同的样本
    • NearMiss :选择与少数类样本最接近的多数类样本
  • 组合采样 :结合过采样和欠采样的方法

模型层面的方法

  • 成本敏感学习(Cost-sensitive Learning) :为不同类别的错误分配不同的固定权重,使模型更加关注少数类的预测
  • 集成学习(Ensemble Learning) :使用集成学习的方法降低过拟合问题(比如多个模型进行投票或加权平均)

损失函数层面

  • 损失函数优化 :比如使用Focal Loss等

异常检测

  • 异常检测(Anomaly Detection) :将少数类视为异常点(特别是少数类样本非常稀少的情况),使用异常检测算法进行识别,常见的异常检测算法通常包括统计方法、距离检测(K近邻方法)、聚类方法(K-Means)、分类模型(one-class SVM)、自编码器、孤立森林等
    • 其中K近邻方法可以用于回归或者分类场景,还可以直接用于异常检测(距离过大的点认为是异常)
    • K-Means方法作异常检测时,原理聚类中心的样本视为异常值
    • 自编码器方法:通过训练一个自编码器来重建数据,重建误差大的数据点被视为异常点
    • 孤立森林(Isolation Forest):通过随机挑选特征、随机挑选分割点构建多棵孤立树,在所有孤立树中的平均路径长度的数据点被视为异常点(孤立森林的核心思想是异常点通常更容易被孤立,即在随机分割数据空间时,异常点往往比正常点更快地被单独分隔出来)

实践建议

  • 评估指标的选择 :在不均衡数据集上,不能使用准确率(Accuracy),建议使用均衡指标(F1-Score,AUC等)
  • 模型选择 :选择对不均衡数据的适应性较好的模型,如决策树和随机森林等,可以多使用一些简单的模型,再进行Bagging来减少过拟合
  • 具体情况具体分析,很多时候不需要特别的进行处理,另外如果真的进行采样,会导致预估均值有偏差,在CTR等对预估绝对值有要求的场景中,还需要校准

RS——从CG到NDCG评估指标


整体说明

  • CG(Cumulative Gain,累计增益),DCG(Discounted Cumulative Gain,折损累计增益)和NDCG(Normalized Discounted Cumulative Gain,归一化折损累计增益)是信息检索和推荐系统中常用的评估指标,用于衡量排序结果的质量

CG(Cumulative Gain,累计增益)

  • 定义 :单纯对前\( k \)个结果的相关性分数求和,不考虑位置的影响
  • 公式 :
    $$
    CG@k = \sum_{i=1}^k rel_i
    $$
    • \( rel_i \):第\( i \)个结果的相关性分数(如:0不相关,1相关,2非常相关等)
    • \( rel_i \)的准确性尤为重要,通常使用:
      • 人工标注相关性
      • 线上用户真实行为数据(比如点击为1,长时间观看为2,购买/下单为3,跳过为0)
      • 注:特殊场景下也会使用模型预测值
  • 局限性 :未考虑排序顺序对用户体验的影响(例如,相关结果排在后面对用户价值更低)

DCG(Discounted Cumulative Gain,折损累计增益)

  • 定义 :在CG基础上引入位置折损 ,排名越靠后的结果对总增益的贡献越小
  • 公式(常用版本) :
    $$
    DCG@k = \sum_{i=1}^k \frac{rel_i}{\log_2(i+1)}
    $$
    • 折损因子 :\( \log_2(i+1) \) 会随着位置\( i \)增大而降低当前结果的贡献
  • 变体(更强调相关性) :
    $$
    DCG@k = \sum_{i=1}^k \frac{2^{rel_i} - 1}{\log_2(i+1)}
    $$
    • 适用于相关性分数差异较大的场景(如0/1/3/5分级)

NDCG(Normalized DCG,归一化折损累计增益)

  • 定义 :将DCG除以理想排序下的DCG(IDCG) ,得到归一化分数(0~1之间)
  • 公式 :
    $$
    NDCG@k = \frac{DCG@k}{IDCG@k}
    $$
    • \( IDCG@k \):将前\( k \)个结果按相关性从高到低排序后计算的DCG(即理论最大值)
  • 特点 :
    • 值越接近1,排序越接近理想状态
    • 解决了不同查询间DCG无法直接比较的问题(因为不同查询的IDCG可能不同)

使用场景与示例

  • 适用领域 :
    • 搜索引擎结果排序
    • 推荐系统(如电影、商品推荐)
    • 问答系统答案排序
  • 示例 :
    • 查询结果的相关性分数:[3, 2, 3, 0, 1](按当前排序)
    • 计算DCG@3:
      $$
      DCG@3 = \frac{3}{\log_2 2} + \frac{2}{\log_2 3} + \frac{3}{\log_2 4} \approx 3 + 1.26 + 1.5 = 5.76
      $$
    • 理想排序的相关性分数:[3, 3, 2] -> \( IDCG@3 \approx 6.43 \)
    • \( NDCG@3 = \frac{5.76}{6.43} \approx 0.90 \)

优缺点

  • 优点 :
    • 考虑相关性分级和位置因素,更贴近用户实际体验
    • NDCG提供标准化比较,适合不同查询间的评估
  • 缺点 :
    • 需要人工标注相关性分数(成本高)
    • 对相关性分数的定义敏感(如0/1/2还是0/1/3/5)

附录:推荐系统中的其他评估指标

HitRate@K

  • 通常包含 用户粒度HitRate@K 和 物品粒度HitRate@K 两种
    • 用户粒度HitRate@K,也称为二值命中率(Binary Hit Rate):
      • 对单个用户而言,只要推荐列表(TopK)中有至少一个相关物品则该样本(用户)算作为命中 (即Hit = 1),否则为不命中(即Hit=0)
      • 用户粒度HitRate@K是所有用户命中情况的平均值:
        $$ HitRate@K = \frac{兴趣出现在TopK物品的用户数量}{总用户数量} $$
    • 物品粒度HitRate@K,也称为命中次数占比(Hit Ratio)
      • 对被推荐的单个物品而言,如果是用户喜欢的则视为命中 (即Hit = 1),否则为不命中(即Hit=0)
      • 物品粒度HitRate@K是所有推荐物品命中情况的平均值
        $$ HitRate@K = \frac{总命中物品数量}{总推荐物品数量} = \frac{\sum_{i}命中用户i的物品数量}{\sum_{i}给用户i的推荐物品总数} $$

MRR

  • TLDR:MRR(Mean Reciprocal Rank)是对每个查询的相关文档在推荐列表中排名的倒数的平均值
  • 具体计算方法为:
    $$MRR=\frac{1}{|Q|}\sum_{i=1}^{|Q|}\frac{1}{rank_i}$$
    • \(|Q|\)是查询的总数
    • \(rank_i\)是第\(i\)个查询中第一个相关文档在推荐列表中的排名
    • 如果一个查询在推荐列表中没有相关文档,则该查询对MRR的贡献为\(0\)
  • MRR主要用于衡量推荐系统在返回相关结果时的排序能力,它特别关注第一个相关结果在推荐列表中的位置,能够反映出推荐系统将最相关的项目排在前面的能力
  • 举例:假设用户有3个查询,对应的推荐列表及相关文档的排名如下:
    • 查询1:推荐列表为$$D_3,D_1,D_2$$,其中\(D_1\)是相关文档,排名为\(2\),则该查询的\(\frac{1}{rank}=\frac{1}{2}\)
    • 查询2:推荐列表为$$D_2,D_4,D_1$$,相关文档\(D_2\)排名为\(1\),该查询的\(\frac{1}{rank}=1\)
    • 查询3:推荐列表为$$D_3,D_4,D_5$$,没有相关文档,该查询的\(\frac{1}{rank}=0\)
    • 那么\(MRR = \frac{(\frac{1}{2}+1+0)}{3}=\frac{1.5}{3}=0.5\)

AP & mAP

  • 参见:RS——推荐系统评估指标-mAP

RS——推荐系统综述

  • 参考博客:
    • http://xtf615.com/2018/05/03/recommender-system-survey/
    • https://zhuanlan.zhihu.com/p/27502172

推荐系统分类(基于推荐依据分类)

[待更新]

基于内容的推荐

Content-based recommenders

  • 推荐和用户曾经喜欢的商品相似的商品
  • 主要是基于商品属性信息和用户画像信息的对比
  • 核心问题是如何刻画商品属性和用户画像以及效用的度量
  • 使用一个效用函数(utility function)来评价特定用户 \(c \in C\) 对特定项目 \(s_{c}’ \in S\) 的评分, \(UserProfit(c), ItemProfit(s)\) 分别表示用户和商品的收益函数(Profit Function)[存疑: 待更新]
    $$u(c,s) = score(UserProfit(c), ItemProfit(s))$$
    • 基于启发式的方法(Heuristic-based method):
      • 特征构建: 基于关键字提取等方法,使用TF-IDF等指标提取关键字作为特征
      • 效用的度量: 使用启发式cosine等相似性指标, 衡量商品特征和用户画像的相似性,相似性越高,效用越大
    • 基于机器学习的方法(Machine learning-based mehod):
      • 特征构建: 使用机器学习算法来构建用户和商品的维度特征,例如建模商品属于某个类别,得到商品的刻画属性
      • 效用的度量: 直接使用机器学习算法拟合效用函数
  • 对于任意的用户 \(c \in C\),我们可以通过选择商品 \(s_{c}’ \in S\) 来得到最大化的效用函数
    $$\forall c \in C, s_{c}’ = \mathop{\arg\max}_{s \in S} u(c,s)$$

基于协同过滤的推荐

Collaborative filtering recommenders

基于内存的推荐
  • 直接对User-Item矩阵进行研究
    • User-based CF: 推荐给特定用户列表中还没有发生过行为、而在相似用户列表中产生过行为的高频商品
    • Item-based CF: 推荐给特定用户列表中还没有发生过行为、并且和当前用户已经发生过行为的商品相似的商品
基于模型的推荐
损失函数+正则项(Loss Function)
神经网络+层(Neural Network)
图模型+圈(Graph Model)
基于矩阵分解的推荐
  • Traditional SVD
    • 传统的SVD分解
      $$R_{m\times n} = U_{m \times k} \Sigma_{k \times k} V_{k \times n}^T$$
    • 缺点:
      • 普通SVD分解时矩阵必须是稠密的,即每个位置都必须有值
      • 如果矩阵是稀疏的,有空值,那么需要先用均值或者其他统计学方法来填充矩阵
      • 计算复杂度高,空间消耗大
  • FunkSVD
    • 分解为两个低秩矩阵而不是三个矩阵
      $$\hat{r}_{u,i} = q_i^T p_u$$
      • \(\hat{r}_{u,i}\) 指用户对商品的评分
      • \(q_{i}\) 是商品 \(i\) 在隐空间的隐向量表示
      • \(p_{u}\) 是用户 \(u\) 在隐空间的隐向量表示
    • 不使用传统的矩阵分解方式,定一个损失函数(针对 \(\hat{r}_{u,i}\) 未缺失的地方, \(\hat{r}_{u,i}\) 缺失的地方训练时不用管),然后用梯度下降法进行参数优化
      • 优化问题(最小化损失函数)定义如下
        $$q^{\star}, p^{\star} = \min_{q, p} \sum_{(u,i) \in R_{train}} (r_{u,i} - q_i^T p_u)^2 + \lambda (||q_i||^2 + ||p_u||^2 )$$
        • 正则项是两个参数的L2正则
        • \(R_{train}\) 是评分矩阵中可观测的数据对构成的集合, 缺失值不参与训练, 缺失值是需要我们最终预测的
  • BiasSVD
    • BiasSVD是FunkSVD诸多变形版本的一个相对成功的方法
    • 带有偏执项的SVD分解
    • 基于假设:
      • 关于用户的假设: 天生存在偏好,有的喜欢给商品好评,有的喜欢给商品差评
        • 这些用户的固有属性与商品无关
      • 关于商品的假设: 天生存在优劣,有的容易被人给好评,有的容易被人给差评
        • 这些商品的固有属性与用户无关
    • 优化问题(最小化损失函数)定义
      $$
      \begin{align}
      &\hat{r}_{ui} = \mu + b_u + b_i + q_i^Tp_u \\
      &min \sum_{r_{ui} \in R_{train}} \left(r_{ui} - \hat{r}_{ui} \right)^2 + \lambda\left(b_i^2 + b_u^2 + ||q_i||^2 + ||p_u||^2\right)
      \end{align}
      $$
    • 梯度下降更新公式
      $$
      \begin{align}
      \begin{split}b_u &\leftarrow b_u &+ \gamma (e_{ui} - \lambda b_u) \\
      b_i &\leftarrow b_i &+ \gamma (e_{ui} - \lambda b_i)\\
      p_u &\leftarrow p_u &+ \gamma (e_{ui} \cdot q_i - \lambda p_u)\\
      q_i &\leftarrow q_i &+ \gamma (e_{ui} \cdot p_u - \lambda q_i)\end{split}
      \end{align}
      $$
      • \(e_{ui} = r_{ui} - \hat{r}_{ui}\)
      • \(b_{u}\) [待更新]
      • \(b_{i}\) [待更新]
      • \(u\) [待更新]
  • SVD++
    • SVD++是对BiasSVD进行改进的
    • 基于假设:
      • 除了显示的评分行为以外,用户对于商品的浏览记录或购买记录(隐式反馈)也可以从侧面反映用户的偏好。相当于引入了额外的信息源,能够解决因显示评分行为较少导致的冷启动问题
    • 优化问题定义[待更新]
  • TimeSVD++
    • 之前的模型都是静态的,这里TimeSVD++是动态的,每个时间段学习一个参数,不同时间段使用该时间段的训练数据进行学习
    • 优化问题定义[待更新]
  • BiasSVDwithU
  • NMF
    • 非负矩阵分解,Nonnegative Matrix Factorization
  • PMF
    • 概率矩阵分解,Probabilistic Matrix Factorization
  • WRMF
    • weighted regularized matrix factorization

混合的推荐

  • 基于混合的推荐,顾名思义,是对以上算法的融合
  • 淘宝上就既有基于内容的推荐也有协同过滤的推荐
  • 对模型的融合可以参考集成学习的三种集成方式ML——Ensemble-集成学习

推荐系统分类(基于推荐的最终输出形式分类)

评分预测模型

Rating prediction

  • 核心目的: 填充User-Item矩阵中的缺失值
  • 模型衡量指标: 均方根误差(RMSE, Root Mean Squard Error), 平均绝对误差(MAE, Mean Absolute Error)

排序预测模型

Ranking prediction (top-n recommendation)

  • 核心目的: 给定一个用户,推荐一个有序的商品列表
  • 模型衡量指标: Precision@K, Recall@K等

分类模型

Classification

  • 核心目的: 将候选商品分类,然后用于推荐
  • 模型衡量指标: Accuracy

RS——推荐系统评估指标-mAP


整体说明

  • 在推荐系统中,mAP(Mean Average Precision)是衡量推荐结果准确性的重要指标 ,主要用于评估排序型推荐任务(如搜索、推荐列表排序)的性能
  • 综合考虑了推荐结果的相关性(Relevance)和排序质量(Ranking Quality) ,尤其适用于多类别推荐场景或需要衡量整体推荐精度的场景

AP(Average Precision,平均精度)

  • 定义 :对于单个查询(或用户),AP是其所有相关推荐结果的 Precision(精确率)的平均值 ,且仅对相关结果计算
  • 计算逻辑 :
    • 假设推荐列表中有 \( N \) 个结果,按排序从左到右依次判断每个结果是否为“相关项”(需预先定义相关性标准,如用户点击、购买等)
    • 对于每个相关项,计算其当前位置的精确率(即截至该位置,相关项在已推荐结果中的比例),并将所有相关项的精确率求平均,得到AP
  • 公式示例 :若推荐列表中有5个结果,相关性标记为 \( [1, 0, 1, 1, 0] \)(1表示相关,0表示不相关),则:
    • 第1个相关项位置:1,精确率 \( P_1 = 1/1 = 1 \)
    • 第3个相关项位置:3,精确率 \( P_3 = 2/3 \)
    • 第4个相关项位置:4,精确率 \( P_4 = 3/4 \)
    • AP = \( (1 + 2/3 + 3/4) / 3 ≈ 0.9167 \)

mAP(Mean Average Precision,平均AP)

  • 定义 :将AP指标扩展到所有查询(或用户) ,计算所有AP值的平均值,用于衡量推荐系统的整体性能
  • 公式 :
    $$
    \text{mAP} = \frac{1}{M} \sum_{i=1}^{M} \text{AP}_i
    $$
    • 其中,\( M \) 为查询(或用户)总数,\( \text{AP}_i \) 为第 \( i \) 个查询的AP值

附录:完整的mAP计算示例

  • 假设存在2个用户(查询),推荐列表及相关性如下:
  • 用户1 :推荐列表 = [A(1), B(0), C(1), D(0), E(1)]
    • 相关项位置:1、3、5
    • 各相关项精确率:
    • \( P_1 = 1/1 = 1 \)
    • \( P_3 = 2/3 ≈ 0.6667 \)
    • \( P_5 = 3/5 = 0.6 \)
    • \( \text{AP}_1 = (1 + 0.6667 + 0.6) / 3 ≈ 0.7556 \)
  • 用户2 :推荐列表 = [X(0), Y(1), Z(1)]
    • 相关项位置:2、3
    • 各相关项精确率:
      • \( P_2 = 1/2 = 0.5 \)
      • \( P_3 = 2/3 ≈ 0.6667 \)
      • \( \text{AP}_2 = (0.5 + 0.6667) / 2 ≈ 0.5833 \)
  • mAP计算 :
    $$
    \text{mAP} = \frac{0.7556 + 0.5833}{2} ≈ 0.6695
    $$

DL——UserID和ItemID特征的处理

本文分析UserID(User ID)和ItemID(Item ID)特征的及其常见处理方法

  • 参考链接:
    • 深度学习推荐算法中user-id和item-id是否需要放入模型中作为特征进行训练呢? - 杨木易的回答 - 知乎
    • Empowering Long-tail Item Recommendation through Cross Decoupling Network (CDN), Google, KDD 2023

UserID和ItemID类特征有什么特点?

  • UserID和ItemID可训练的交互数据一般服从长尾分布。从UserID视角看,头部UserID占据了样本的大多数;从ItemID视角看,头部ItemID占据了样本的大多数
  • 头部的ID对应的Embedding可以得到充分学习,但是尾部的ID由于数据不够,他们的Embedding往往得不到充足的学习(一轮数据训练完成,Embedding几乎没有被更新过,那就与随机初始化参数无异了)
  • 通常UserID和ItemID是高维稀疏特征,如果表示成One-Hot特征的话,会出现几千万甚至上亿维度的向量中,只有一个维度为1的情况
  • 这部分特征样本区分度很高,常常称为“记忆性”特征,他们没有什么泛化能力,但是可以记住某些交互行为(比如用户A购买过物品B)

是否应该加入模型?

  • 基本理念 :适合加入的情况应该是大部分ID训练样本充足的情况,且线上serving时遇到的新ID不能太多
  • 数据量要求:
    • 基本思路:每个ID取值下,平均样本量越少的场景越不建议加入ID特征,如果数据过于集中于极少数头部ID,还需要看看长尾情况,长尾商家(样本不足的商家)越多,越不建议加入ID特征
    • 理解:ID是否能用还是要看是否有充足的数据量训练ID对应的Embedding
  • 冷起要求:
    • 基本思路:ItemID更新越频繁(即物品池频繁汰换)的场景越不建议加入ItemID特征;新用户占比越高的场景越不建议加入UserID特征
    • 理解:新ID过多时,新的ID都是模型没有见过的,模型效果会衰减的非常快,现实生活中,可以通过使用 T+N 的数据集作为测试集验证是否有衰减问题

加入模型的哪个位置?

  • 数据充足时 :每个ID数据充足的场景,可以将这些ID特征当做普通特征对待,直接Embedding后加入模型
  • 数据不足时 :
    • 可以考虑使用特征粒度Dropout(直接按照Embedding维度Dropout)的方式来增强泛化性【TOOD:如何理解?】
    • 可以考虑将这部分特征放到一个独立的塔,然后其他特征在另一个塔,引入一个门控网络来控制两个塔对最终预估结果的影响。可参考谷歌的Empowering Long-tail Item Recommendation through Cross Decoupling Network (CDN),其网络结构图如下:

PyTorch——FSDP的使用

  • 参考链接:
    • 官方文档:docs.pytorch.org/docs/stable/fsdp.html

整体说明

  • FSDP(Fully Sharded Data Parallel)是 PyTorch 推出的分布式训练技术,专为大规模模型训练设计
  • FSDP 通过分片模型参数、梯度和优化器状态到多个 GPU,显著降低单卡内存占用,支持训练远超单卡内存容量的大型模型(如千亿参数级模型)
  • 与传统的 DDP(Distributed Data Parallel)相比
    • DDP 会在每个 GPU 上保存完整的模型参数、梯度和优化器状态(数据并行)
    • FSDP 仅在每个 GPU 上保存部分分片,大幅提升内存效率(模型并行)
  • FSDP 的核心特性包括
    • 全分片机制 :模型参数、梯度、优化器状态均被分片存储在不同 GPU,仅在计算时临时聚合所需分片
    • 自动通信优化 :通过重叠计算与通信、按需聚合分片,减少分布式训练的通信开销
    • 混合精度支持 :原生支持 FP16/BF16 混合精度训练,进一步节省内存
    • 灵活的包装策略 :可通过 wrap_policy 指定需要分片的子模块,支持部分模块分片、部分模块复制(如小模型组件)
    • 与模型并行结合 :可与 Tensor Parallel 等模型并行技术结合,支持超大规模模型训练
  • FSDP 的工作流程
    • 1)初始化 :将模型划分为多个子模块,每个子模块的参数被分片到不同 GPU
    • 2)前向传播 :当需要某子模块参数时,FSDP 自动聚合所需分片到当前 GPU,计算完成后释放临时聚合的参数
    • 3)反向传播 :梯度按相同策略分片存储,避免全量梯度占用内存
    • 4)参数更新 :优化器仅更新本地保存的分片参数,通过通信同步确保全局一致性
  • 注意事项:
    • 模型保存/加载时需使用 FSDP 提供的 state_dict() 方法,避免直接保存原始模型参数(分片后参数分布在不同进程)
    • 可结合 activation checkpointing 进一步节省内存

FSDP 单机多卡使用示例

  • FSDP 单机多卡示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from torch.utils.data import Dataset, DataLoader
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
    from torch.distributed.fsdp.fully_sharded_data_parallel import (
    CPUOffload,
    BackwardPrefetch,
    ) # 用括号的导入方式跟普通方式效果完全相同,区别在于括号的方式看起来更容易管理和注释某一行
    from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy
    import torch.multiprocessing as mp
    from torch.utils.data.distributed import DistributedSampler

    # 1. 定义数据集
    class DiyDataset(Dataset):
    def __len__(self):
    return 1000

    def __getitem__(self, idx):
    x = torch.randn(10) # 输入特征
    y = torch.randint(0, 2, (1,)).item() # 分类标签(二分类)
    return x, y

    # 2. 定义模型(包含多个子模块,便于后续使用 FSDP 分片)
    class SubModule(nn.Module):
    def __init__(self, input_dim, output_dim):
    super().__init__()
    self.fc = nn.Linear(input_dim, output_dim)
    self.relu = nn.ReLU()

    def forward(self, x):
    return self.relu(self.fc(x))

    class MainModel(nn.Module):
    def __init__(self):
    super().__init__()
    self.layer1 = SubModule(10, 20)
    self.layer2 = SubModule(20, 30)
    self.layer3 = SubModule(30, 2) # 输出维度为2(二分类)

    def forward(self, x):
    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    return x

    # 3. 分布式训练主函数
    def train(rank, world_size):
    # 初始化分布式环境 通过`dist.init_process_group`配置 NCCL 后端(GPU 推荐),并绑定进程到对应 GPU
    dist.init_process_group(
    backend='nccl', # GPU 推荐使用 nccl 后端,与 DataParallel 类似
    init_method='tcp://localhost:12355', # init_method 是必填参数(或通过环境变量指定),用于初始化分布式进程组,确保单机内的多个进程(每个进程对应一个 GPU)能够相互发现并建立通信,localhost表示只有本地一台机器
    rank=rank,
    world_size=world_size
    )
    torch.cuda.set_device(rank) # 绑定当前进程到对应的GPU

    # 4. 配置 FSDP 参数
    # 定义自动包装策略:当子模块参数数量超过 1e6 时进行分片
    auto_wrap_policy = size_based_auto_wrap_policy(min_num_params=1e6)

    # 初始化模型并应用 FSDP
    model = MainModel().cuda(rank)
    model = FSDP(
    model,
    auto_wrap_policy=auto_wrap_policy, # 自动递归包装子模块,避免手动指定 `wrap_module`,简化大模型配置 `size_based_auto_wrap_policy` 函数生成的对象,表示按参数大小进行分片,前面定义的是 >1e6 的子模块被分片
    cpu_offload=CPUOffload(offload_params=False), # False(默认值)表示仅将梯度卸载到 CPU;True 表示将梯度和参数都卸载到 CPU;前向传播时加载,反向传播后卸载
    backward_prefetch=BackwardPrefetch.BACKWARD_PRE, # 反向传播前预取参数,控制 反向传播预取
    sharding_strategy=FSDP.ShardingStrategy.FULL_SHARD, # 全分片策略(`FULL_SHARD`策略),即参数、梯度、优化器状态均分片(内存使用最小的策略);还可以使用 `SHARD_GRAD_OP`(平衡内存和性能)
    device_id=rank, # 当前设备 ID
    )

    # 5. 数据加载(分布式采样)
    dataset = DiyDataset()
    sampler = DistributedSampler(dataset, shuffle=True) # 确保各卡数据不重复,类似 DDP 的做法
    dataloader = DataLoader(
    dataset,
    batch_size=32,
    sampler=sampler,
    num_workers=2
    )

    # 6. 定义损失函数和优化器
    criterion = nn.CrossEntropyLoss().cuda(rank)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    # 7. 训练循环
    model.train()
    for epoch in range(3):
    sampler.set_epoch(epoch) # 每个 epoch 打乱数据
    total_loss = 0.0
    for x, y in dataloader:
    x = x.cuda(rank)
    y = y.cuda(rank)

    optimizer.zero_grad() # 清零本地梯度分片
    outputs = model(x) # 前向传播(FSDP 自动聚合所需参数分片)
    loss = criterion(outputs, y) # 损失计算,loss 与 outputs 绑定,从而与模型绑定,故而可以使用 FSDP 的特性
    loss.backward() # 反向传播:触发梯度计算与分片梯度同步(核心行),类似操作可以参考 DataParallel 的实现方式
    optimizer.step() # 用同步后的梯度更新本地参数分片

    total_loss += loss.item()

    # 仅在主进程(rank=0)打印日志
    if rank == 0:
    avg_loss = total_loss / len(dataloader)
    print(f"Epoch {epoch+1}, Average Loss: {avg_loss:.4f}")

    # 清理分布式环境
    dist.destroy_process_group()

    # 8. 启动多进程训练
    def main():
    world_size = 2 # 使用 2 个 GPU,注意这里是单机模式,可以直接写死各种配置
    mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)

    if __name__ == "__main__":
    main()
    • 反向传播预取说明:
      • FSDP 中,模型参数被分片存储在不同 GPU 上
      • 在反向传播时,计算某层梯度可能需要其他 GPU 上的参数分片(如计算梯度需要完整的参数信息)
      • backward_prefetch 决定了何时提前获取这些所需的参数分片,从而让数据传输(通信)与计算重叠进行,减少空闲时间
  • 运行上述单机多卡代码,正常使用 python 命令即可,不需要特殊命令启动:

    1
    python fsdp_example.py

FSDP 多机多卡使用示例

  • FSDP 多机多卡使用示例(其实也可以用多机多卡直接改成单机单卡形式)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from torch.utils.data import Dataset, DataLoader
    import torch.distributed as dist
    from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
    from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy
    from torch.utils.data.distributed import DistributedSampler
    import torch.multiprocessing as mp
    import argparse

    # 数据集和模型定义(正常定义)
    class DiyDataset(Dataset):
    def __len__(self):
    return 10000
    def __getitem__(self, idx):
    return torch.randn(128), torch.randint(0, 10, (1,)).item()

    class SubModule(nn.Module):
    def __init__(self, in_dim, out_dim):
    super().__init__()
    self.fc = nn.Linear(in_dim, out_dim)
    self.bn = nn.BatchNorm1d(out_dim)
    self.relu = nn.ReLU()
    def forward(self, x):
    return self.relu(self.bn(self.fc(x)))

    class MainModel(nn.Module):
    def __init__(self):
    super().__init__()
    self.layer1 = SubModule(128, 256)
    self.layer2 = SubModule(256, 512)
    self.layer3 = SubModule(512, 1024)
    self.final = nn.Linear(1024, 10)
    def forward(self, x):
    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    return self.final(x)

    # 子进程函数(每个进程对应一个GPU)
    def train_fn(local_rank, args, world_size):
    rank = args.start_rank + local_rank # 全局进程编号 = 起始rank + 本地进程编号
    print(rank) # 输出全局进程编号
    # 初始化分布式进程组
    dist.init_process_group(
    backend="gloo", # GPU 时使用 `nccl`
    init_method=f"tcp://{args.master_addr}:{args.master_port}", # 多机多卡需要指定服务器地址,不能写死成 local
    world_size=world_size,
    rank=rank, # 全局进程编号 = 起始rank + 本地进程编号
    )

    # 绑定本地GPU
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)

    # 初始化 FSDP 模型
    auto_wrap_policy = size_based_auto_wrap_policy(min_num_params=2e6)
    model = MainModel().to(device)

    model = FSDP( # 整体配置同单进程模式,为了简化,这里不写 cpu_offload 和 backward_prefetch 的配置
    model,
    auto_wrap_policy=auto_wrap_policy, # 自动递归包装子模块,避免手动指定 `wrap_module`,简化大模型配置
    sharding_strategy=FSDP.ShardingStrategy.FULL_SHARD,
    device_id=local_rank,
    )

    # 数据加载和训练逻辑(与之前类似)
    dataset = DiyDataset()
    sampler = DistributedSampler(dataset, world_size=world_size, rank=args.start_rank + local_rank) # 为不同 GPU 分发不同机器,分成 world_size 个并取 第 args.start_rank + rank 份
    dataloader = DataLoader(dataset, batch_size=64, sampler=sampler, num_workers=4)
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    model.train()
    for epoch in range(5):
    sampler.set_epoch(epoch)
    total_loss = 0.0
    for x, y in dataloader:
    x, y = x.to(device), y.to(device)
    optimizer.zero_grad()
    outputs = model(x)
    loss = criterion(outputs, y)
    loss.backward()
    optimizer.step()
    total_loss += loss.item()

    if args.start_rank + local_rank == 0: # 主进程打印日志
    print(f"Epoch {epoch+1}, Avg Loss: {total_loss/len(dataloader):.4f}")

    dist.destroy_process_group()

    def main():
    # 对于多机多卡,为了复用同一套代码(方便管理),使用传入参数的方式启动代码,方便传参实现不同 node 机器的启动
    parser = argparse.ArgumentParser()
    parser.add_argument("--master_addr", type=str, default="localhost", help="主节点IP")
    parser.add_argument("--master_port", type=str, default="12355", help="主节点端口")
    parser.add_argument("--world_size", type=int, required=True, help="总进程数")
    parser.add_argument("--start_rank", type=int, required=True, help="当前机器进程的起始全局rank")
    parser.add_argument("--num_gpus", type=int, default=torch.cuda.device_count(), help="当前机器的GPU数量")
    args = parser.parse_args()
    print(args)
    mp.spawn(
    train_fn,
    args=(args, args.world_size), # 传递给train_fn的参数
    nprocs=args.num_gpus, # 进程数=当前机器的GPU数,启动当前机器的所有进程(num_gpus个),每台机器启动自己的进程数即可
    join=True # 等待所有子进程完成
    )

    if __name__ == "__main__":
    main()

关于启动命令项的说明

  • 也可以通过 torchrun 启动上述多进程(而不使用 mp.spawn),此时 torchrun 会自动生成环境变量(注意:无需手动定义环境变量即可启动)
  • 更多 PyTorch 分布式程序启动详情可参考:/Notes/PyTorch/PyTorch——分布式程序启动方式汇总

附录:集成了 FSDP 分布式框架 HuggingFace Accelerate

  • HuggingFace Accelerate 是集成 了 FSDP 技术的工具,属于 accelerate 库
  • 两者辨析:FSDP 是一种分布式训练技术,Accelerate 是集成该技术的工具
    • FSDP 是由 PyTorch 官方推出的分布式训练策略,核心是通过将模型参数、梯度和优化器状态进行分片存储,显著降低单卡内存占用,支持训练超大规模模型(如千亿参数模型)
      • 属于“分布式训练方法”的范畴
    • HuggingFace Accelerate 是一个简化分布式训练的工具库,其核心功能是屏蔽不同硬件环境(单卡、多卡、GPU/TPU)和分布式策略(如数据并行、模型并行、FSDP 等)的底层细节,让用户用少量代码即可实现分布式训练
      • Accelerate 内部集成了 FSDP ,将其作为一种可选的分布式策略供用户使用
  • Accelerate 对 FSDP 起到封装和简化作用 :使用原生 PyTorch 的 FSDP 时,需要手动编写较多分布式配置代码(如初始化进程组、包装模型、设置通信策略等)
    • Accelerate 通过抽象接口,将 FSDP 的使用简化:
    • 用户只需通过 Accelerator 类,并在配置中指定使用 FSDP,即可自动完成 FSDP 的初始化和模型包装
    • 例如,通过 accelerate launch 命令启动训练时,Accelerate 会根据配置自动选择包括 FSDP 在内的最佳分布式策略,无需用户手动调用 torch.distributed 或 torch.distributed.fsdp 的底层 API
  • Accelerate 则不局限于 FSDP,还支持数据并行、DeepSpeed 等其他策略,其价值在于统一接口 ,让用户在不同分布式策略之间无缝切换,无需大幅修改代码
  • HuggingFace Accelerate 的使用见:PyTorch——Accelerate使用总结

附录:FSDP 和 FSDP2 的区别

  • FSDP2 是 FSDP 的全新版本
  • 架构上:
    • FSDP1 采用 FlatParameter 设计,将一组张量展平、连接并分块,使得每个设备上的数据推理和重新分片变得复杂
    • FSDP2 使用 DTensor 基础架构,通过在 dim-0 上对每个参数进行分片,每个参数在数据并行工作器之间按 dim-0 进行分块,提供了更简单的分片表示,也使得对单个参数的操作更便捷,还实现了无需通信的分片状态字典
  • 内存管理上:
    • FSDP1 使用 recordStream 机制,导致 GPU 内存使用不够优化且非确定性,有时还需要 CPU 同步
      • recordStream 机制是一种用于通过 CUDA 流(Stream)记录和同步张量生命周期 的内存管理技术,主要用于优化 GPU 显存使用和计算效率,但也存在一定局限性
      • recordStream 跟踪张量在 CUDA 流中的使用时机,确保张量在计算完成后再被释放或重用于其他操作,避免因显存提前回收导致的计算错误
      • FSDP V1 在训练过程中需要频繁在不同设备(或分片)间迁移参数(如 Forward 时 AllGather 完整参数、Backward 后释放显存)
    • FSDP2 借助 DTensor,实现了更低且确定性的 GPU 内存使用,避免了 recordStream 机制,无需 CPU 同步,能更有效地管理内存
  • API设计:上有所增减
  • 在实际测试中,FSDP2 相比 FSDP1实现了更高的MFU(模型浮点利用率),峰值内存降低了 7%,且能保持相同的损失曲线(Llama-7B 模型在 8×H100)
  • FSDP2 不直接支持完整的状态字典,用户需要使用 DTensor 的 API 或更高层次的 API 将包含 DTensor 的分片状态字典重新分片为完整状态字典,而 FSDP1 支持完整的状态字典
    • FSDP1 使用 FlatParameter 将多个小参数张量展平并拼接成一个大张量,然后对这个大张量进行分片
      • 在这种情况下,FSDP1 可以通过 state_dict_config 配置来保存和加载完整的状态字典,其中完整的状态字典中的值是未分片的 PyTorch 张量
    • FSDP2 采用了基于 DTensor 的逐参数维度分片方式,每个参数张量直接按维度进行分片,其参数表示直接与分片状态字典匹配
      • 在 FSDP2 中,调用 model.state_dict() 返回的是一个分片状态字典,且不涉及任何计算或通信,FSDP2 也只支持加载分片状态字典
      • 如果需要完整的状态字典,用户需要使用 DTensor 的相关 API,如 dtensor.full_tensor(),或使用更高层次的 API,如 PyTorch 分布式检查点的分布式状态字典 API,在 FSDP2 之外进行转换

PyTorch——分布式程序启动方式汇总


整体介绍

  • torch.distributed.dist.init_process_group 是 PyTorch 分布式 包中用于初始化进程组的核心函数,它在分布式训练中负责协调多个进程之间的通信
  • 本文重点讲解 torch.distributed.dist.init_process_group 函数的使用

dist.init_process_group 函数使用注意事项

  • 必须在所有进程中调用该函数,且参数需保持一致(除 rank 外)
  • 初始化后需调用 dist.destroy_process_group() 进行清理,否则在复杂的程序中,容易出现资源泄露问题
  • 实际使用中推荐通过 torch.distributed.launch 或 torchrun 工具启动,他们会自动设置环境变量
    • 使用 torch.multiprocessing.spawn 启动则需要自己管理参数或环境变量
  • 不同后端有不同的适用场景:GPU 集群优先用 nccl,CPU 集群用 gloo(Mac 上实验使用 gloo)
  • 确保所有进程能够访问到 init_method 指定的地址或文件
  • 初始化时的 IP 问题,有两个特点:
    • MASTER_ADDR 必须是 rank=0 的机器所在的 IP(torchrun --node_rank=0的机器所在的 IP),该进程负责作为 master 完成交流和初始化操作
    • 在 dist.init_process_group 执行后所有进程的地位是等价的,都可以作为 master (虚拟的含义:处理主要事务)
      • 比如,dist.broadcast(tensor, src) 中的 src 可以被指定为任意值
      • 亲测,在单机启动后,想用哪个进程作为 master(处理主要事务)都可以
    • 一般建议使用 rank=0 的进程作为 master 处理主要事务

函数原型及其参数讲解

  • dist.init_process_group 函数原型
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    torch.distributed.init_process_group(
    backend: Optional[str] = None,
    init_method: Optional[str] = None,
    timeout: Optional[timedelta] = None,
    world_size: int = -1,
    rank: int = -1,
    store: Optional[Store] = None,
    group_name: str = "",
    pg_options: Optional[Any] = None,
    device_id: Optional[torch.device] = None,
    )

dist.init_process_group 主要参数说明

  • 1)backend(必填)

    • 指定通信后端,决定了进程间通信使用的底层协议
    • 可选值:'nccl'(推荐 GPU 通信)、'gloo'(CPU 和 GPU 均可)、'mpi'(需 MPI 库支持)
    • 注意:'nccl' 仅支持 GPU 且性能最优,'gloo' 对 CPU 支持更好
  • 2)init_method(可选)

    • 指定进程组的初始化方式,常用方式如下面
    • 'env://'(推荐):从环境变量读取配置(需设置 MASTER_ADDR、MASTER_PORT 等)
    • 'tcp://ip:port':指定主节点的 IP 和端口
    • 'file:///path':通过共享文件系统初始化(需所有进程可访问该路径)
    • init_method 参数的默认值为 None,当使用默认值时,其行为取决于是否设置了环境变量 TORCH_DISTRIBUTED_INIT_METHOD:
      • 如果设置了 TORCH_DISTRIBUTED_INIT_METHOD 环境变量
        • 函数会自动使用该环境变量的值作为初始化方法(等价于显式传入 init_method=环境变量值)
        • 例如,若环境变量设置为 tcp://127.0.0.1:23456,则会以该 TCP 地址进行初始化
      • 如果未设置 TORCH_DISTRIBUTED_INIT_METHOD 环境变量
        • 此时会触发 默认初始化逻辑 ,函数会尝试从环境变量中读取分布式配置(等价于 init_method='env://')
        • 此时要求必须设置以下环境变量才能正常初始化:
          • MASTER_ADDR:主节点的 IP 地址
          • MASTER_PORT:主节点的端口号(需所有进程可访问)
          • WORLD_SIZE:总进程数(可选,部分启动工具会自动设置)
          • RANK:当前进程的全局编号(0 为主进程,可选,部分启动工具会自动设置)
        • 如果这些环境变量未正确设置,会抛出类似 RuntimeError: Expected env:// init method but no MASTER_ADDR or MASTER_PORT found 的错误
          • 但无需担心:使用 torchrun(或 python -m torch.distributed.launch)启动时,若不指定 --master_addr 和 --master_port,则 torchrun 会默认 环境变量设置为 MASTER_ADDR:MASTER_PORT=127.0.0.1:29500
  • 3)world_size(可选)

    • 总进程数,即参与分布式训练的进程总数
    • 使用 'env://' 时可由环境变量 WORLD_SIZE 指定
  • 4)rank(可选)

    • 当前进程的编号(0 到 world_size-1),0 通常为主进程
    • 使用 'env://' 时可由环境变量 RANK 指定
  • 5)timeout

    • 通信超时时间,默认为 30 分钟(1800 秒)
    • 对于耗时较长的操作可适当增大
  • 6)store

    • 与 init_method 参数互斥,不指明 store 时,由 init_method 参数传入的方式决定初始化什么类型的 Store
    • init_method
    • 详情见附录
  • 7)pg_options

    • pg_options 参数用于配置进程组的特定选项,它是一个可选参数,允许用户为不同的后端指定特定的配置选项
    • 该参数的类型通常是Optional[Any],具体的选项内容取决于所使用的通信后端,主要用于:
      • 配置后端特定的优化参数
      • 设置通信超时时间
      • 调整内存使用策略
      • 配置网络相关参数等
    • 举例:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      import torch.distributed as dist

      # 示例:为NCCL后端配置特定选项
      pg_options = {
      'timeout': 1800, # 30分钟超时
      'init_method_timeout': 300, # 初始化超时
      }

      dist.init_process_group(
      backend="nccl",
      world_size=4,
      rank=0,
      pg_options=pg_options
      )
  • 8)device_id

    • device_id 参数用于将进程”绑定”到单个特定设备,从而实现后端特定优化,是一个 torch.device 类型的可选参数,主要用于 GPU 训练场景
    • NCCL 后端的特殊效果:在 NCCL 后端下,device_id 参数有两个重要影响
      • 1)立即形成通信器 :通信器会立即形成(直接调用ncclCommInit*而非延迟初始化)
      • 2)内存占用优化 :每个进程会在指定的GPU上占用显存,而不是在第一个可访问的GPU上[citation:7]
    • 示例:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      import torch
      import torch.distributed as dist

      # 指定当前进程使用的GPU设备
      device_id = torch.device(f"cuda:{local_rank}")

      dist.init_process_group(
      backend="nccl",
      world_size=world_size,
      rank=rank,
      device_id=device_id
      )

示例:单节点多进程(使用 torch.multiprocessing)

  • 下面的代码启动单节点时,无需配置环境变量,也不需要特殊的启动命令,使用 python 命令即可
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import torch
    import torch.distributed as dist
    import torch.multiprocessing as mp

    def init_process(rank, world_size):
    # 设置环境变量
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # 初始化进程组
    dist.init_process_group(
    backend='nccl', # 使用NCCL后端(GPU)
    init_method='env://',
    world_size=world_size,
    rank=rank
    )

    # 后续分布式操作...
    print(f"Process {rank} initialized")

    # 销毁进程组
    dist.destroy_process_group()

    if __name__ == "__main__":
    world_size = 4 # 4个进程
    mp.spawn(init_process, args=(world_size,), nprocs=world_size, join=True)

示例:多节点分布式(通过环境变量配置)

  • 在每个节点上运行的脚本中:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import torch.distributed as dist
    import os

    # 环境变量通常由分布式启动工具设置
    # 如 torch.distributed.launch 或 torchrun
    dist.init_process_group(backend='nccl')

    # 可通过以下方式获取当前进程信息
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    local_rank = int(os.environ.get('LOCAL_RANK', 0)) # 节点内的进程编号

    print(f"Rank {rank}/{world_size}, Local rank {local_rank}")
  • 上面的代码运行多节点时需要提前指定环境变量(不同机器不同),或通过 torchrun 等命令启动


使用时的一些常规规范写法

  • 分布式启动后,常用到下面三个参数:

    1
    2
    3
    rank: 当前进程在的全局进程编号
    word_size: 分布式系统总进程数
    local_rank: 当前进程在当前节点上的本地进程编号
  • (标准用法)当使用 torchrun 命令(或 torch.distributed.launch 命令时):

    • 可通过环境变量获取这三个参数(此时是默认设置的)

      1
      2
      3
      rank = int(os.environ["RANK"])
      world_size = int(os.environ["WORLD_SIZE"])
      local_rank = int(os.environ["LOCAL_RANK"])
    • 此时启动环境,可以仅指定后端即可

      1
      dist.init_process_group(backend='nccl')
    • 启动命令需要在不同的机器上执行以下,且在命令中传入当前机器对应的参数(自动转化成环境变量),比如:

      1
      2
      3
      4
      5
      # 机器1
      python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py

      # 机器2
      python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py
      • 易错点,要非常小心:
        • python -m torch.distributed.launch(或 torchrun) 的启动参数必须在脚本名 DDP_demo.py 之前
        • DDP_demo.py 之后的参数都是传递给 DDP_demo.py 的,不再是 python -m torch.distributed.launch(或 torchrun)的参数
        • 不建议使用环境变量的方式定义 --master_port 等参数(包括 MASTER_PORT=12368 torchrun xx.py 等方式也不建议)
          • 因为 torchrun 会自动覆盖环境变量 MASTER_PORT(即使没有显示传入 --master_port 参数也会用默认值 27500 覆盖)
  • (使用较少)当使用 torch.multiprocessing.spawn 启动多进程时,需要自己主动管理环境变量,或通过参数传入

  • 无论使用哪种方式使用,均可使用下面的代码获取参数

    1
    2
    3
    rank = dist.get_rank() 
    world_size = dist.get_world_size()
    local_rank = int(os.environ["LOCAL_RANK"]) # 若未配置则需要使用参数显示传入

通过 torch.multiprocessing.spawn 启动完整示例

  • 在代码里面使用 torch.multiprocessing.spawn 函数启动多进程(不使用任何环境变量,整体管理较为复杂)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import torch.distributed as dist
    import torch.multiprocessing as mp

    def train_fn(local_rank, args, world_size):
    rank = args.start_rank + local_rank # 全局进程编号 = 起始rank + 本地进程编号
    print(rank) # 输出全局进程编号
    # 初始化分布式进程组
    dist.init_process_group(
    backend="nccl",
    init_method=f"tcp://{args.master_addr}:{args.master_port}", # 多机多卡需要指定服务器地址,不能写死成 local
    world_size=world_size,
    rank=rank, # 全局进程编号 = 起始rank + 本地进程编号
    )
    # 绑定本地GPU
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)
    # ...

    # 对于多机多卡,为了复用同一套代码(方便管理),使用传入参数的方式启动代码,方便传参实现不同 node 机器的启动
    parser = argparse.ArgumentParser()
    parser.add_argument("--master_addr", type=str, default="localhost", help="主节点IP")
    parser.add_argument("--master_port", type=str, default="12355", help="主节点端口")
    parser.add_argument("--world_size", type=int, required=True, help="总进程数")
    parser.add_argument("--start_rank", type=int, required=True, help="当前机器进程的起始全局rank")
    parser.add_argument("--num_gpus", type=int, default=torch.cuda.device_count(), help="当前机器的GPU数量")
    args = parser.parse_args()
    print(args)
    # 启动当前机器的所有进程(num_gpus个),每台机器启动自己的进程数即可
    mp.spawn(
    train_fn,
    args=(args, args.world_size), # 传递给train_fn的参数
    nprocs=args.num_gpus, # 进程数=当前机器的GPU数
    join=True # 等待所有子进程完成
    )
  • 此时启动命令可以仅使用普通的 python 命令(后面的参数可选)

    1
    python demo.py --xxx xx
    • 每个节点通过传入不同参数即可指定分布式进程数量,允许不同节点不同数量(start_rank 数和启动时的 num_gpus 参数控制)

通过 torch.distributed.launch 命令启动

  • 假设脚本名为ddp_demo.py

  • 单机4卡,使用4个GPU启动:

    1
    python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py
    • --nproc_per_node:指定单机的GPU数量
  • 多机多卡,2个机器,各有4个GPU启动:

    1
    2
    3
    4
    5
    # 机器1
    python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py

    # 机器2
    python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py
    • --nnodes:总节点数
    • --node_rank:当前节点编号
    • --master_addr:指定master节点的IP地址,默认值是”localhost”,但是显示给出来更明确
    • --master_port:指定master节点的端口号,默认值是 29500,但是显示给出来更明确

通过 torchrun 命令启动

  • 以下结论是在 Mac 系统下得到的,暂时没有在 Linux 上尝试

  • torchrun 和 torch.distributed.launch 都是 PyTorch 中用于启动分布式训练的工具

  • torchrun 是 PyTorch 1.10 后推出的新一代工具,旨在替代 torch.distributed.launch

  • 两者的核心区别:

    特性 torch.distributed.launch torchrun
    推出时间 较早版本(已逐步废弃) PyTorch 1.10+ 推出(推荐使用)
    进程管理 依赖用户手动管理进程(如指定 --node_rank 等) 自动管理进程,支持弹性训练(节点故障后自动恢复)
    配置方式 大部分参数需通过命令行传入 支持从环境变量、命令行、配置文件读取参数
    容错能力 无弹性训练支持,进程崩溃后需手动重启 支持弹性训练(--max_restarts 等参数)
    日志管理 日志输出较为基础 提供更规范的日志管理,区分不同进程的输出
  • torchrun 启动单机多卡(与 torch.distributed.launch 相同):

    1
    2
    3
    4
    5
    # 回顾 `torch.distributed.launch` 的启动方式
    python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py

    # torchrun 的启动方式:替换 `python -m torch.distributed.launch` 为 `torchrun` 即可
    torchrun --nproc_per_node=4 ddp_demo.py
  • torchrun 启动多机多卡(相对 torch.distributed.launch而言,torchrun 会自动推断部分参数(如节点数、进程数))

    • 回顾 torch.distributed.launch 需要手动指定总节点数(--nnodes)和当前节点序号(--node_rank)

      1
      2
      3
      4
      5
      # 机器1
      python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py

      # 机器2
      python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py
    • (待确认:Mac 系统下失败)torchrun 无需手动指定 node_rank,只需在所有节点上指定相同的 --rdzv_id(任务ID)和 --rdzv_endpoint(主节点地址):

      1
      2
      # 所有节点统一执行相同命令(不需要区分不同节点使用不同命令,会自动分配node_rank)
      torchrun --nproc_per_node=4 --nnodes=2 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint="localhost:29500" DDP_demo.py
      • --rdzv_id:任务唯一ID(任意整数)
      • rdzv_backend:后端(默认c10d)
      • rdzv_endpoint:主节点IP:端口
      • torchrun还可以用 --max_restarts 等参数指定最大重启次数
    • (Mac 系统下成功)在 Mac 系统下,上面的命令执行会出现问题(上面命令在两个窗口分别打开)

      1
      2
      torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_test.py
      torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_test.py
      • 亲测上面的代码可以成功
  • 特别地,部分介绍文档会说 torch.distributed.launch 和 torchrun 对代码的使用有一些不同,主要是初始化不同:

    • 初始化方式有下面两种:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      # 方式一:
      rank = int(os.environ["RANK"])
      world_size = int(os.environ["WORLD_SIZE"])
      local_rank = int(os.environ["LOCAL_RANK"])
      dist.init_process_group(backend='gloo', world_size=world_size, rank=rank)

      # 方式二:
      dist.init_process_group(backend='gloo')
      rank = dist.get_rank()
      world_size = dist.get_world_size()
      local_rank = int(os.environ["LOCAL_RANK"])
    • 特别说明:部分文档介绍说 torchrun 只能用方式二, torch.distributed.launch 只能用方式一

    • 亲测:

      • torch.distributed.launch 可用方式一和方式二初始化两种方式均可用,具体使用命令为:

        1
        2
        3
        4
        5
        # 机器1
        python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py

        # 机器2
        python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py
      • torchrun 可用方式二定义,在使用下面的命令时,方式一方式二均可:

        1
        torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_test.py
  • 使用建议:

    • 优先使用 torchrun :它是 PyTorch 官方推荐的新一代工具,简化了分布式配置,支持弹性训练,且兼容未来的功能更新
    • 避免使用 torch.distributed.launch :该工具已逐步被废弃,不再添加新功能,仅为兼容性保留

附录:python -m torch.distributed.launch 命令具体在做什么?

  • 本节讲述 python -m torch.distributed.launch,实际上 torchrun 是 python -m torch.distributed.launch 的升级版本,做的事情差不多,但还多了些功能

启动多个进程

  • 根据 --nproc_per_node 参数指定的数量,为每个 GPU 启动一个独立的进程

    • 例如,如果 --nproc_per_node=4,则会在当前节点启动 4 个进程,每个进程绑定到一个 GPU 上
  • 每个进程会独立执行训练脚本(如 train.py),并通过 dist.init_process_group 初始化分布式环境

    • dist.init_process_group 会让当前进程于其他进程简历通信
  • 代码中,通过下面的命令执行即可使用进程自己的 GPU(实现 GPU 的分配):

    1
    2
    3
    4
    import os
    import torch
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
  • 注:这也是在使用普通的 python 命令启动时,需要在代码里自己手动启动多进程的原因

设置环境变量(分布式环境所需的)

  • python -m torch.distributed.launch 命令会设置分布式训练所需的环境变量(如 WORLD_SIZE、RANK、LOCAL_RANK、MASTER_ADDR、MASTER_PORT 等)
  • 每个进程的环境变量不同,RANK、LOCAL_RANK 等环境变量均是有差异的

代码内部在做什么?(非启动命令的工作)

  • 代码内部通过 dist.init_process_group 初始化分布式通信后端(如 nccl 或 gloo),确保所有进程能够协同工作
    • backend:通信后端(如 nccl 用于 GPU,gloo 用于 CPU)
    • init_method:初始化方法(如 tcp:// 指定 master 地址和端口)
  • 代码示例 :
    1
    dist.init_process_group(backend='nccl', init_method='env://')

附录:init_process_group 的 store 参数详细用法说明

  • 在 torch.distributed.init_process_group() 函数中,store 参数是一个可选参数,用于指定分布式进程间通信所使用的键值存储后端
  • 该参数允许用户显式创建和配置存储实例,而不是依赖默认的自动创建机制
  • store 参数的作用:进程间协调,store 参数指定的存储系统用于:
    • 存储分布式训练过程中的元数据
    • 协调各个进程的初始化过程
    • 实现进程间的同步和通信
    • 管理进程组的状态信息

支持的 Store 类型

  • PyTorch 分布式包支持三种主要的键值存储类型,这些 Store 的核心功能一致(同步元数据),但实现方式不同,选择时需根据分布式环境的网络、存储和调度方式决定
  • 下面是最常见的三种 Store 的使用示例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    import torch.distributed as dist

    # 第一类:TCPStore
    # # server store 持有数据,client store 可以连接到 server store(by TCP)访问数据,
    store = dist.TCPStore(
    host_name="localhost", # 服务器(master)地址,所有参与分布式训练的进程必须能通过此地址访问到 master
    port=12345, # 需确保该端口在主机上未被占用,且所有进程可访问此端口
    world_size=4, # 指明 store users 数,默认为 None(表示没有固定的 store users 数量),注:一般与分布式环境的 world_size保持一致
    is_master=True, # 是否为master节点,只有主进程会启动 TCP 服务器,其他进程(非主进程)会作为客户端连接到主进程
    timeout=300, # 超时时间(秒)
    wait_for_workers=True, # (bool, optional), 默认为 True,是否等待所有 workers 连接到 store 服务器,只有当 world_size 为固定值时生效
    )
    # # 第二类:FileStore
    # store = dist.FileStore("/tmp/distributed_store", world_size=4)

    # # 第三类:HashStore(通常用于单机多进程)
    # store = dist.HashStore()

    # 使用自定义store初始化进程组
    dist.init_process_group(
    backend="nccl",
    world_size=4,
    rank=0,
    store=store
    )

使用 init_method 指定初始化 Store 类型

  • 在 PyTorch 的 dist.init_process_group 中,不指明 store 参数时,init_method 参数决定了分布式进程初始化时使用的 Store 类型
  • 不同的 init_method 对应不同的 Store 实现,用于在进程间同步元数据(如进程编号、通信地址等)
  • 不同初始化方法如下:
    • init_method='tcp://master_ip:port':(对应 TCPStore)
      • 基于 TCP 协议的集中式 Store,需要指定一个主节点(master)的 IP 和端口
      • 主进程会在指定地址创建 TCP 监听,其他进程通过该地址连接主进程,完成元数据交换
      • 适用于大多数分布式场景(单机多卡、多机多卡),无需依赖外部服务
    • init_method='file:///path/to/shared_file'(对应 FileStore)
      • 基于共享文件系统的 Store,所有进程通过读写同一个共享文件同步元数据
      • 要求所有进程可访问同一个共享文件系统(如 NFS、本地文件系统,单机多卡场景常用)
      • 无需网络通信,但依赖文件系统的可靠性和性能
    • init_method='env://'(由环境变量指定的 Store(通常是 TCPStore))
      • 不直接指定 Store 类型,而是通过环境变量(如 MASTER_ADDR、MASTER_PORT、WORLD_SIZE、RANK 等)配置初始化信息
      • 本质上仍会创建 TCPStore,但参数由环境变量而非函数参数传入
      • 常用于容器化环境(如 Kubernetes)或需要动态配置的场景
    • 第三方分布式框架集成(如 Slurm、MPI)(对应 SlurmStore、MPIStore 等(取决于框架))
      • 当使用 Slurm 或 MPI 启动分布式任务时,PyTorch 可自动检测并使用对应框架的 Store
      • 例如,init_method='slurm://' 会使用 SlurmStore,通过 Slurm 的环境变量和接口同步元数据,无需手动指定主节点
  • 总结核心差异总结
    init_method 类型 Store 实现 依赖条件 适用场景
    tcp://master_ip:port TCPStore 网络连通性 通用分布式场景(单机/多机)
    file:///path FileStore 共享文件系统 单机多卡或共享存储的多机场景
    env:// 通常为 TCPStore 环境变量配置 容器化、动态配置场景
    第三方框架(如 Slurm) 框架专属 Store 对应调度框架环境 集群管理系统(Slurm/MPI)

使用 PrefixStore 进行多进程组管理

  • 使用 PrefixStore 可以为不同的进程组创建隔离的命名空间 :
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # 创建基础store
    base_store = dist.TCPStore("localhost", 12345, world_size, is_master=True)

    # 为不同进程组创建前缀store
    pg1_store = dist.PrefixStore("group1_", base_store)
    pg2_store = dist.PrefixStore("group2_", base_store)

    # 使用不同的store初始化不同的进程组
    dist.init_process_group(backend="nccl", store=pg1_store, ...)

注意事项

  • 当同时指定了环境变量和 store 参数时,store 参数会优先使用:

    1
    2
    3
    4
    5
    6
    # 即使设置了环境变量,也会使用显式指定的store
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    custom_store = dist.TCPStore("192.168.1.100", 29500, world_size, is_master)
    dist.init_process_group(backend="nccl", store=custom_store) # 使用custom_store
  • 和 init_method 参数是互斥的,只能指定其中一个:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 方法1:使用init_method
    dist.init_process_group(
    backend="nccl",
    init_method="tcp://192.168.1.100:29500"
    )

    # 方法2:使用store参数
    store = dist.TCPStore("192.168.1.100", 29500, world_size, is_master)
    dist.init_process_group(
    backend="nccl",
    store=store
    )
  • 可根据环境动态配置不同 store

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def create_store(backend, world_size, rank):
    if backend == "nccl":
    # GPU训练使用TCPStore
    return dist.TCPStore(
    host_name=os.environ.get('MASTER_ADDR', 'localhost'),
    port=int(os.environ.get('MASTER_PORT', '29500')),
    world_size=world_size,
    is_master=(rank == 0)
    )
    else:
    # CPU训练可以使用FileStore
    return dist.FileStore("/tmp/dist_store", world_size)

附录:init_process_group 的 device_id 参数详细用法说明

  • device_id 参数用于将进程”绑定”到单个特定设备,从而实现后端特定优化,是一个 torch.device 类型的可选参数,主要用于 GPU 训练场景

  • device_id 参数在 NCCL 后端下有特殊效果:在 NCCL 后端下,device_id 参数有两个重要影响

    • 1)立即形成通信器 :通信器会立即形成(直接调用ncclCommInit*而非延迟初始化)
    • 2)内存占用优化 :每个进程会在指定的 GPU 上占用显存,而不是在第一个可访问的 GPU 上
    • If you want to know NCCL initialization error early, you can also use this field
  • 使用 gloo 后端时,不需要设置 device_id

  • 基本用法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import torch
    import torch.distributed as dist

    # 指定当前进程使用的GPU设备
    device_id = torch.device(f"cuda:{local_rank}")

    dist.init_process_group(
    backend="nccl",
    world_size=world_size,
    rank=rank,
    device_id=device_id # 不指定 device_id 默认会在可访问的第一个 GPU 上占用内存?
    )
  • 多 GPU 训练场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import torch
    import torch.distributed as dist
    import os

    def setup_distributed():
    # 获取本地rank
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    rank = int(os.environ.get("RANK", 0))

    # 设置当前进程的GPU设备
    torch.cuda.set_device(local_rank) # 根据不同 rank 获取不同的设备
    device_id = torch.device(f"cuda:{local_rank}")

    # 初始化进程组,绑定到特定GPU
    dist.init_process_group(
    backend="nccl",
    world_size=world_size,
    rank=rank,
    device_id=device_id # 绑定到特定设备
    )

    return device_id

附录:init_process_group 中 NCCL 的延迟初始化

  • NCCL 是 NVIDIA 集合通信库(NVIDIA Collective Communications Library)

    • 提供多 GPU / 多节点通信原语(如 all-reduce、broadcast、all-gather、reduce-scatter 等)
    • 针对 PCIe/NVLink 与 NVIDIA 网络优化,用于加速深度学习分布式训练
    • 开源地址:github.com/NVIDIA/nccl
  • 延迟初始化(Lazy Initialization)是指 NCCL 通信器不在 init_process_group() 调用时立即创建,而是推迟到 第一次实际需要进行集合通信操作时才创建

    • 注:若 init_process_group() 函数指定了 device_id 参数,则 NCCL 会立即初始化到当前设备上
  • 立即初始化(指定 device_id 时)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    device_id = torch.device(f"cuda:{local_rank}")
    dist.init_process_group(backend="nccl", device_id=device_id)

    # 在这一行执行时,NCCL立即:
    # 1. 调用 ncclCommInit* 系列函数
    # 2. 创建通信器对象
    # 3. 分配通信缓冲区
    # 4. 建立进程间的通信通道
    # 5. 进行通信拓扑优化
  • 延迟初始化(不指定 device_id 时)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    dist.init_process_group(backend="nccl")  # 没有device_id

    # 在这一行执行时,NCCL只是:
    # 1. 记录进程组的元信息
    # 2. 设置必要的环境变量
    # 3. 但不创建实际的通信器

    # 真正的初始化发生在第一次集合通信时:
    tensor = torch.randn(10).cuda()
    dist.all_reduce(tensor) # 在这里才真正初始化 NCCL 通信器!自动识别需要绑定的 GPU 并初始化,这里会很慢,因为要初始化 NCCL

    dist.all_reduce(tensor) # 这里就很快了,这次不需要初始化 NCCL

    # # 注:以下操作都会触发NCCL初始化:
    # dist.all_reduce(tensor) # 全规约
    # dist.all_gather([tensor]) # 全收集
    # dist.reduce(tensor, dst=0) # 规约
    # dist.broadcast(tensor, src=0) # 广播
    # dist.all_to_all([tensor], [tensor]) # 全到全
    • 延迟初始化可以允许灵活、动态、自动地选择需要的 GPU,但可能会造成一些误解
    • 注意:一旦 NCCL 初始化以后,就绑定了 GPU 了,再切换 GPU 可能会出现错误
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      def multi_device_lazy_init_issues():
      dist.init_process_group(backend="nccl")

      # 问题场景:进程需要使用多个GPU
      tensor_gpu0 = torch.randn(10).cuda(0)
      tensor_gpu1 = torch.randn(10).cuda(1)

      # 第一次通信在GPU 0上
      dist.all_reduce(tensor_gpu0) # NCCL在GPU 0上初始化

      # 尝试在GPU 1上通信
      try:
      dist.all_reduce(tensor_gpu1) # 可能失败或性能差
      except Exception as e:
      print(f"Multi-device issue: {e}")
  • 目前尚无代码可直接检测 NCCL 通信器是否已创建,只能通过第一次通信时间来大致判断

  • 生产环境推荐立即初始化,方便代码阅读;开发时可以使用延迟初始化以获得更快的启动

PyTorch——分布式编程之子通信组


整体介绍

  • 子通信组允许在分布式环境中灵活地划分进程,实现更精细的通信控制
  • torch.distributed.new_group 是 PyTorch 分布式训练中用于创建子通信组的函数
  • 使用场景包括:
    • 部分进程通信:当需要让部分进程单独通信(如模型并行中不同层的参数同步)
    • 灵活分组:动态划分进程组,适应复杂的分布式策略(如混合数据并行+模型并行)
  • 使用步骤包括:
    • 1)初始化全局进程组:先通过 init_process_group 初始化全局通信环境
    • 2)创建子组:调用 new_group 划分进程
    • 3)子组内通信:使用返回的 ProcessGroup 对象进行通信操作
  • 子通信组使用的核心注意事项
    • 进程一致性:所有进程必须调用 new_group ,即使不加入子组(此时可传入 ranks 不包含自身,或后续不使用返回的组对象)
    • 后端兼容性:子组的 backend 需与全局后端兼容(如 GPU 通信推荐 nccl)

new_group 函数定义

  • new_group 函数形式说明:

    1
    2
    3
    4
    5
    torch.distributed.new_group(
    ranks=None, # 参与新组的进程编号列表
    timeout=datetime.timedelta(seconds=1800), # 超时时间
    backend=None # 通信后端,默认为全局后端
    )
  • new_group 核心参数说明如下文

    • ranks(可选,列表/元组):
      • 指定加入新组的进程编号(全局进程编号,非局部编号)
      • 若为 None,则默认包含所有进程(等价于全局组)
      • 例如:ranks=[0,1,2] 表示仅 0、1、2 号进程加入新组
    • timeout**(可选,datetime.timedelta):
      • 组内通信的超时时间,超时未完成会抛出异常
      • 默认为 30 分钟(1800 秒)
    • backend**(可选,字符串):
      • 指定该组使用的通信后端(如 nccl、gloo 等)
      • 若为 None,则继承全局初始化的后端(init_process_group 中指定的 backend)
  • new_group 返回值

    • 返回一个 ProcessGroup 对象 ,用于后续子组内的通信操作(如 allreduce、broadcast 等)

附录:为什么所有进程都要调用子通信组初始化函数

  • 在 PyTorch 分布式的最佳实践中,即使不加入子进程组的 rank(如例子中的 rank=3),也必须调用 dist.new_group(ranks=[0, 1], ...)
    • 这是由分布式通信的一致性要求决定的
  • 必须调用的核心原因是:避免死锁
    • PyTorch 分布式通信的底层实现要求所有进程必须参与子组的创建过程 ,无论是否加入该子组
    • 若部分进程调用 new_group 而其他进程不调用,会导致进程间同步失衡,触发分布式死锁(所有进程会阻塞等待未调用的进程)
    • 即使某进程明确不加入子组(不在 ranks 列表中),也需要通过调用 new_group 完成“知晓该子组存在”的协议同步
  • 不加入子组的进程如何处理返回的 ProcessGroup 对象?
    • 对于不加入子组的进程(如 rank=3),调用 new_group 后会返回一个有效的 ProcessGroup 对象,但该进程不属于该组
    • 此时的最佳实践是:保留该对象但不使用它进行通信(或在通信前先判断是否属于子组)
    • 可通过 dist.get_rank(group=subgroup) 检查:若返回 -1,说明当前进程不属于该子组,应跳过子组内的通信操作

子通信组示例代码

  • 代码示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import torch
    import torch.distributed as dist
    from datetime import timedelta

    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()

    # 所有进程(无论是否加入子组)必须调用 `dist.new_group`,否则会导致分布式死锁
    subgroup = dist.new_group(ranks=[0, 1], timeout=timedelta(seconds=30))

    # 检查当前进程是否属于子组,非子组成员进程可通过 `dist.get_rank(group=subgroup) != -1` 判断身份,避免无效通信
    is_in_subgroup = dist.get_rank(group=subgroup) != -1

    if is_in_subgroup:
    # 子组成员执行通信操作
    tensor = torch.tensor([rank], device="cuda")
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=subgroup)
    print(f"Rank {rank}(子组成员):通信结果 = {tensor.item()}")
    else:
    # 非子组成员跳过通信,或执行其他逻辑
    print(f"Rank {rank}(非子组成员):不参与子组通信")

    dist.destroy_process_group()

PyTorch——分布式编程框架总结

  • 参考链接:
    • 简单入门可参考:「指北」PyTorch分布式训练 - Will Lee的文章 - 知乎

整体说明

  • PyTorch 原生支持了 DP(DataParallel)和DDP(DistributedDataParallel)是常用的数据并行分布式训练工具
  • PyTorch 原生还支持了 FSDP 作为模型并行的分布式训练工具,通过分片模型参数、梯度和优化器状态到多个 GPU,显著降低单卡内存占用
  • HuggingFace Accelerate 是一个轻量级库,专为简化 PyTorch 模型在各种硬件配置上的训练和推理而设计,支持选择 DeepSpeed 和 FSDP 等
  • HuggingFace 的 Trainer 是 transformers 库中一个核心且功能强大的类,它为 PyTorch 模型提供了完整的训练和评估循环,极大地简化了训练过程,让用户可以专注于模型、数据集和训练参数的配置,而无需手动编写复杂的训练代码
    • Trainer 比 Accelerate 更高一级,把循环等也封装了,进需要用户配置参数数据集等即可
  • 其他相关的分布式封装框架有 Horovod、Ray 等
    • Horovod 是 Uber 开源的跨平台的分布式训练工具,名字来自于俄国传统民间舞蹈,舞者手牵手围成一个圈跳舞,与 Horovod 设备之间的通信模式很像
    • Ray 是更高层级的分布式训练框架(利用其他框架),目标是融合数据处理、模型训练、超参数调优和模型服务等各个阶段
  • 注意:篇幅有限,本文主要是记录一些简单的使用示例和说明,更详尽的使用细节需要去官网查看
  • PyTorch 分布式系统的启动方式见:/Notes/PyTorch/PyTorch——分布式程序启动方式汇总

DataParallel(DP) 使用示例

  • DP 是单进程多线程模式,简单易用,适合单机多 GPU 场景
  • DP 的每次前向过程都会进行一次从 GPU 间的参数复制(效率较慢)
  • 参考博客(有比较清晰的图片):Training Neural Nets on Larger Batches: Practical Tips for 1-GPU, Multi-GPU & Distributed setups
  • DP 的使用示例如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, Dataset

    class DiyModel(nn.Module):
    def __init__(self):
    super().__init__()
    self.fc = nn.Linear(10, 2)
    def forward(self, x):
    return self.fc(x)

    class DiyDataset(Dataset):
    def __len__(self):
    return 1000
    def __getitem__(self, idx):
    return torch.randn(10), torch.randint(0, 2, (1,)).item()

    # 第一步:数据准备(与常规方式一致)
    dataset = DiyDataset()
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

    # 第二步:初始化模型、损失函数、优化器
    model = DiyModel()
    # 将模型放到DP中(自动分发到多个GPU),核心步骤,相对普通训练方式,仅需要修改这里
    model = nn.DataParallel(model) # 注意:DP 仅增加这一步即可
    model = model.cuda() # 或 .to('cuda')

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # 第三步:训练循环
    for epoch in range(3):
    for inputs, labels in dataloader:
    inputs = inputs.cuda()
    labels = labels.cuda()

    # 注:model 是 DP封装过的模型,所以能执行 DP 的个性化操作
    # 在这里 model(inputs) 会执行四个流程:
    # * 数据分发
    # * 模型复制(主线程GPU到其他线程)
    # * 并行前向推理
    # * 输出汇总四个流程
    outputs = model(inputs)
    loss = criterion(outputs, labels)

    optimizer.zero_grad()

    # 注:loss 本质是从 Model 出来的(loss 的梯度是传到 outputs 上再进一步计算的,而 outputs 是模型的输出,执行梯度计算时能考虑分布式),所以也能执行 DP 的个性化操作
    # 这里 loss.backward 执行四个流程:
    # * 各GPU损失梯度计算:根据各GPU的outputs与总output的关系来计算各自的损失梯度(不同GPU不一样)
    # * 损失梯度分发(将各自的梯度分发到各自 GPU 上
    # * 并行后向推理(计算 GPU 自身的局部梯度)
    # * 梯度汇总
    loss.backward()
    optimizer.step() # 仅更新主线程GPU上的模型
    print(f"Epoch {epoch}, Loss: {loss.item()}")

nn.DataParallel(model) 发生了什么?

  • TLDR:model = nn.DataParallel(model) 的核心作用是通过包装模型实现多 GPU 数据并行 :
    • 执行这一行后,模型对象已经变了,需要通过 model.module.fc 才能访问原模型属性(访问 model.fc 会出错)
    • DP 中,仅需要这一步即可实现数据并行 ,后续的代码都无需修改,自动适配了;但代码后面做了很多数据分发和梯度合并的工作
    • 这一行后,程序会自动管理设备分配、模型复制、数据拆分与合并、梯度汇总与参数同步
  • 1)初始化:确定并行设备与包装模型,nn.DataParallel 实例化时会完成以下核心操作:
    • 检测可用GPU :默认情况下,DataParallel 会自动检测当前可见的GPU(通过 CUDA_VISIBLE_DEVICES 环境变量控制),并将其ID列表存储在 device_ids 属性中(默认值为 range(torch.cuda.device_count()))
    • 指定主GPU :主GPU(device[0])是默认的“主导设备”,负责汇总计算结果、更新参数,并作为数据/模型的初始落脚点。若未指定 device_ids,则第一个可见GPU(通常是 cuda:0)会被设为主GPU
    • 包装原始模型 :原始模型会被存入 DataParallel 的 module 属性中(这也是后续访问原模型属性需通过 model.module 的原因),同时 DataParallel 会接管模型的 forward 和 backward 逻辑
  • 2)模型的移动与复制:DataParallel 会自动处理模型在设备间的分布:
    • 主GPU加载模型 :若原始模型在CPU上,DataParallel 会先将其移动到主GPU(device_ids[0]);若模型已在某个GPU上,会检查是否与主GPU一致,不一致则移动
    • 副本同步到其他GPU :在首次执行前向传播时,DataParallel 会将主GPU上的模型参数复制到 device_ids 中的其他GPU,确保所有GPU上的模型初始参数完全一致
  • 3)前向传播:数据拆分与并行计算,当调用 output = model(input) 时,DataParallel 会按以下流程处理:
    • 数据校验与准备 :检查输入数据是否在主GPU上(若不在,会自动移到主GPU)。输入可以是Tensor、列表、字典等结构,只要包含需要拆分的批量数据(如 batch_size 维度的Tensor)
    • 数据拆分(Split) :沿批量维度(默认是第0维,即 batch_size 所在维度)将输入数据均匀拆分到 device_ids 中的所有GPU。例如,若 batch_size=8 且使用2个GPU,则每个GPU会收到 batch_size=4 的子数据
    • 并行计算 :每个GPU上的模型副本会独立处理分配到的子数据,执行各自的 forward 计算,得到子输出
    • 结果收集与合并(Gather) :所有GPU的子输出会被发送回主GPU,然后按拆分的逆过程合并(如拼接),最终形成与单GPU计算格式一致的输出(例如,将2个 batch_size=4 的子输出拼接为 batch_size=8 的完整输出)
  • 4)反向传播:梯度汇总与参数更新
    当调用 loss.backward() 时,梯度计算与参数更新流程如下:
    • 各GPU独立计算梯度 :每个GPU会基于自己处理的子数据和子输出,独立计算模型参数的梯度(存储在各自GPU的模型副本中)
    • 梯度汇总到主GPU :DataParallel 会自动将所有GPU上的梯度求和(sum)并汇总到主GPU的模型中(即 model.module 的参数梯度)
    • 主GPU更新参数 :优化器(如 optimizer.step())仅作用于主GPU上的模型参数,完成参数更新
    • 参数同步到其他GPU :主GPU更新后的参数会被自动广播(broadcast)到其他GPU的模型副本中,确保所有GPU的模型参数保持一致,为下一轮计算做准备
  • 5)特殊细节与注意事项
    • 模型属性访问 :由于原始模型被包装在 DataParallel 的 module 属性中,访问原模型的层或参数时需通过 model.module(例如,model.module.fc 而非 model.fc)。若仅使用单GPU,DataParallel 仍会包装模型,因此建议始终通过 model.module 访问原模型
    • 单GPU场景 :若只有1个可见GPU,DataParallel 不会进行拆分计算(本质上是单GPU运行),但仍会包装模型,此时 model.module 与原始模型等价
    • 数据类型与设备兼容 :输入数据必须与主GPU设备兼容(例如,若主GPU是 cuda:0,输入数据需为 cuda:0 上的Tensor),否则会触发设备不匹配错误
    • 局限性 :DataParallel 是单进程多线程模式,受Python GIL限制,多GPU效率可能不如多进程的 DistributedDataParallel(DDP),且不支持跨节点并行

DistributedDataParallel(DDP)使用示例

  • 更详细的使用说明见:官网说明文档
  • DDP是多进程模式,支持单机/多机多GPU,效率更高,是PyTorch推荐的分布式训练方式

第一步:编写训练脚本

  • 注:这一步只是定义脚本,这个脚本不能通过简单的 python 命令启动(需通过torch.distributed.launch启动)
  • DDP 的示例脚本如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    import os

    import torch
    import torch.nn as nn
    import torch.distributed as dist
    from torch.utils.data import DataLoader, Dataset
    from torch.utils.data.distributed import DistributedSampler # 用于DDP的数据采样

    class DiyModel(nn.Module):
    def __init__(self):
    super().__init__()
    self.fc = nn.Linear(10, 2)

    def forward(self, x):
    return self.fc(x)

    class DiyDataset(Dataset):
    def __len__(self):
    return 1000

    def __getitem__(self, idx):
    return torch.randn(10), torch.randint(0, 2, (1,)).item()

    # 初始化分布式环境
    dist.init_process_group(backend='gloo') # 多GPU推荐用'nccl'后端(NVIDIA专门优化过),CPU使用'gloo'或'mpi',亲测Mac上'gloo'可直接使用
    rank = dist.get_rank() # 全局进程编号(0,1,2...),也可以用 env_rank = int(os.environ["RANK"])
    world_size = dist.get_world_size() # 也可以使用 world_size = int(os.environ["WORLD_SIZE"])
    local_rank = int(os.environ["LOCAL_RANK"]) # 当前进程的local_rank

    # 定义设备并设置
    device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
    torch.cuda.set_device(device) # 为当前进程分配 GPU

    # 数据准备
    dataset = DiyDataset()
    sampler = DistributedSampler(dataset) # 重点:sampler确保各进程数据不重叠
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler) # sampler传入时,shuffle参数不生效

    # 模型初始化(每个进程单独初始化,再用DDP包装)
    model = DiyModel().to(device)
    model = torch.nn.parallel.DistributedDataParallel( # 核心步骤
    model,
    device_ids=[local_rank],
    output_device=local_rank
    )

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # 训练循环
    for epoch in range(3):
    sampler.set_epoch(epoch) # 每个epoch打乱数据,避免每个epoch数据顺序一致(epoch内部顺序不随机时容易导致模型学到错误的样本顺序规律)
    for inputs, labels in dataloader:
    # 使用to(device)将数据移动到指定设备
    inputs = inputs.to(device)
    labels = labels.to(device)

    # 前向过程,会同时追踪需要的同步的数据
    outputs = model(inputs)
    loss = criterion(outputs, labels)

    optimizer.zero_grad()

    # 后向过程
    # 注:DDP 在初始化时会为模型参数注册特殊的钩子(hook),这些钩子会在 backward() 过程中自动触发
    # 当本地梯度计算完成后,钩子会启动 All-Reduce 操作,将所有进程的同一份参数的梯度进行汇总(默认求平均)
    # 完成上述流程后,每个进程上的同一份参数会拥有相同的梯度值,存储在 `grad` 属性中
    loss.backward()
    optimizer.step() # 普通的 optimizer 更新每个进程自己的参数

    # 只在主进程(rank=0)打印信息
    if rank == 0:
    print(f"Epoch {epoch}, Loss: {loss.item()}")

    dist.destroy_process_group() # 销毁进程组
附录:DDP 的 DP 间求平均的细节
  • DDP 中在 DP 间累积梯度后,做了平均,具体实现参见 github.com/pytorch/pytorch/blob/main/torch/csrc/distributed/c10d/reducer.cpp

    1
    2
    3
    4
    5
    // 取值与 DP_size 有关(注意: 这里的 size 就是 DDP 中的 world_size,也就是 DP_size)
    div_factor_ = process_group_->getSize();
    ...
    // 做除法
    bucket_view.div_(div_factor_);
  • 问题:为什么 DDP 中,loss.backward() 要对梯度求平均而不是求和?

    • 理解:本质应该是一样的,求和求平均都可以,目前实现本质也是先求和再做除法
  • 特别说明:如果是想做 Token 粒度的平均(每个样本的可学习 Token 数不一致),需要多维护一个 Token 数量的变量并执行一次 all_reduce 通信

    • 当然,为了实现与不做 DP 完全一致的效果,这里其实是应该对 Token 也做聚合,再做除法才行的

第二步:启动脚本(通过 torch.distributed.launch 命令)

  • 假设脚本名为ddp_demo.py

  • 单机4卡,使用4个GPU启动:

    1
    python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py
    • --nproc_per_node:指定单机的GPU数量
  • 多机多卡,2个机器,各有4个GPU启动:

    1
    2
    3
    4
    5
    # 机器1
    python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_demo.py

    # 机器2
    python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_demo.py
    • --nnodes:总节点数
    • --node_rank:当前节点编号
    • --master_addr:指定master节点的IP地址,默认值是”localhost”,但是显示给出来更明确
    • --master_port:指定master节点的端口号,默认值是 29500,但是显示给出来更明确

补充:torchrun 命令启动脚本(结论是在 Mac 系统下得到的,暂时没有在 Linux 上尝试)

  • torchrun 启动单机多卡(与 torch.distributed.launch 相同):

    1
    2
    3
    4
    5
    # 回顾 `torch.distributed.launch` 的启动方式
    python -m torch.distributed.launch --nproc_per_node=4 ddp_demo.py

    # torchrun 的启动方式:替换 `python -m torch.distributed.launch` 为 `torchrun` 即可
    torchrun --nproc_per_node=4 ddp_demo.py
  • torchrun 启动多机多卡(相对 torch.distributed.launch而言,torchrun 会自动推断部分参数(如节点数、进程数))

    • (待确认:Mac 系统下失败)torchrun 无需手动指定 node_rank,只需在所有节点上指定相同的 --rdzv_id(任务ID)和 --rdzv_endpoint(主节点地址):

      1
      2
      # 所有节点统一执行相同命令(不需要区分不同节点使用不同命令,会自动分配node_rank)
      torchrun --nproc_per_node=4 --nnodes=2 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint="localhost:29500" DDP_demo.py
      • --rdzv_id:任务唯一ID(任意整数)
      • rdzv_backend:后端(默认c10d)
      • rdzv_endpoint:主节点IP:端口
      • torchrun还可以用 --max_restarts 等参数指定最大重启次数
    • (Mac 系统下成功)在 Mac 系统下,上面的命令执行会出现问题(上面命令在两个窗口分别打开)

      1
      2
      torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="localhost" --master_port=29500 DDP_test.py
      torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="localhost" --master_port=29500 DDP_test.py
      • 亲测上面的代码可以成功
  • 更多启动详情见:/Notes/PyTorch/PyTorch——分布式程序启动方式汇总


HuggingFace Accelerate 介绍及使用示例

  • 详情见:/Notes/PyTorch/PyTorch——HF-Accelerate使用总结

HuggingFace Trainer 介绍及使用示例

  • 详情见:/Notes/NLP/NLP——HF-Trainer使用总结

PyTorch——分布式训练Debug笔记


整体说明

  • 为了方便在分布式训练中查看代码信息,需要一些日志打印

优雅打印对象的函数

  • 示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    import torch
    from typing import Any
    import os
    import inspect

    def get_current_location() -> str:
    """
    获取当前执行位置的「文件绝对路径」和「所在函数名称」
    :return: (file_path, func_name)
    - file_path: 当前文件的绝对路径
    - func_name: 所在函数名称(模块级代码返回 "<module>",匿名函数返回 "<lambda>")
    """
    # 获取调用栈:index=1 对应「调用当前函数的位置」(即目标位置)
    try:
    # 栈帧结构:inspect.stack()[index] -> FrameInfo 对象
    frame_info = inspect.stack()[1]
    frame = frame_info.frame # 提取栈帧

    # 1. 获取文件绝对路径
    file_path = os.path.abspath(frame.f_code.co_filename)

    # 2. 获取函数名称
    func_name = frame.f_code.co_name

    # 特殊处理:模块级代码(无函数包裹)的函数名显示为 "<module>"
    # (inspect 默认返回 "<module>",无需额外处理)

    return f"python file path:{file_path} #function_name:{func_name}"

    finally:
    # 手动清理栈帧引用,避免内存泄漏(关键!)
    del frame_info
    del frame

    def print_obj_info(obj: Any, indent: int = 0) -> None:
    """
    打印对象的详细信息,包括类型、大小/长度、关键属性及嵌套对象信息
    :param obj: 待打印的对象
    :param indent: 缩进级别(用于嵌套结构格式化)
    """
    # 缩进格式化
    prefix = " " * indent
    type_name = type(obj).__name__

    # 基础信息:类型 + 核心属性
    base_info = f"{prefix}[{type_name}] "

    # 1. 列表类型(含嵌套列表)
    if isinstance(obj, list):
    base_info += f"len={len(obj)}"
    print(base_info)
    # 递归打印前3个元素(避免超长输出),超过则提示
    for i, item in enumerate(obj[:2]):
    print(f"{prefix} - 索引{i}:", end=" ")
    print_obj_info(item, indent + 2)
    if len(obj) > 2:
    print(f"{prefix} - ... 还有{len(obj)-2}个元素")

    # 2. 字典类型
    elif isinstance(obj, dict):
    base_info += f"len={len(obj)}, keys={list(obj.keys())}"
    print(base_info)
    # 递归打印每个value
    for k, v in obj.items():
    print(f"{prefix} - key='{k}':", end=" ")
    print_obj_info(v, indent + 2)

    # 3. PyTorch Tensor类型
    elif isinstance(obj, torch.Tensor):
    base_info += f"shape={tuple(obj.shape)}, dtype={obj.dtype}, device={obj.device}"
    print(base_info)

    # 4. 其他普通类型(数字、字符串、布尔等)
    else:
    # 补充长度信息(字符串)和值信息
    if hasattr(obj, "__len__") and not isinstance(obj, (int, float, bool)):
    obj_str = f"{obj}"
    log_obj_str = obj_str[:100]
    base_info += f"len={len(obj_str)}, value={log_obj_str}" + (f", 还有{len(obj_str)-100} 个字符" if len(obj_str) > 100 else "")
    else:
    base_info += f"value={obj}"
    print(base_info)


    def test_function():
    # 测试数据
    test_obj = {
    "int_val": 42,
    "str_val": "hello" * 30,
    "tensor1": torch.randn(3, 4),
    "nested_list": [1, torch.tensor([2,3]), [4.5, "6"]],
    "bool_val": True
    }

    # 调用函数

    print("="*30)
    print(f"call_location={get_current_location()}")
    print_obj_info(test_obj)
    print("="*60)

    if __name__ == "__main__":
    test_function()

    # ==============================
    # call_location=python file path:/path_to_log_helper.py #function_name:test_function
    # [dict] len=5, keys=['int_val', 'str_val', 'tensor1', 'nested_list', 'bool_val']
    # - key='int_val': [int] value=42
    # - key='str_val': [str] len=150, value=hellohellohellohellohellohellohellohellohellohellohellohellohellohellohellohellohellohellohellohello, 还有50 个字符
    # - key='tensor1': [Tensor] shape=(3, 4), dtype=torch.float32, device=cpu
    # - key='nested_list': [list] len=3
    # - 索引0: [int] value=1
    # - 索引1: [Tensor] shape=(2,), dtype=torch.int64, device=cpu
    # - ... 还有1个元素
    # - key='bool_val': [bool] value=True
    # ============================================================

反向解析并打印某未知函数

  • 有时候调用的函数是经过多次封装得到的,比如 Megatron-LM 项目中存在大量的封装代码
  • 打印函数信息示例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import functools
    import inspect
    def print_function_info(function):
    print("=" * 30 + "Inspecting function")

    # 如果是 partial 封装过的对象,需要特殊逻辑 function.func 取出真实的函数
    if isinstance(function, functools.partial):
    print("[Type]: functools.partial")
    print(f"[Original Function]: {function.func.__name__}")
    print(f"[Preset Args]: {function.args}")
    print(f"[Preset Keywords]: {function.keywords}")
    real_func = function.func
    else:
    print(f"[Type]: {type(function)}")
    if hasattr(function, '__name__'):
    print(f"[Name]: {function.__name__}")
    real_func = function

    print("-" * 20)
    try:
    source = inspect.getsource(real_func) # 根据真实函数取出其源代码
    print("[Source Code]:")
    print(source)
    except Exception as e:
    print(f"[Source Code]: Unable to retrieve source. ({e})")
    print("=" * 30)
    print_function_info(my_diy_function)

参数值检查

  • 参数值输出示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # 关闭梯度确保不影响梯度值
    def print_tensor(model_head)
    with torch.no_grad():
    weights = model_head.weight.detach().cpu()
    weights_flat = weights.view(-1)
    num_params = min(1000, weights_flat.numel())
    first_1000_params = weights_flat[:num_params].tolist()
    print("="*30 + "print init_param")
    print(f"Shape of weights: {weights.shape}")
    print(f"First {num_params} parameters of weights:\n{first_1000_params}")

随机种子查看和设置

  • 随机种子涉及到 shuffle,模型参数初始化等操作,如果要对齐两个配置相同的模型,种子也需要对齐

torch Seed 打印

  • torch Seed 打印代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # 打印 torch 的随机种子情况
    def print_torch_seeds():
    print("=" * 30 + "PyTorch Random Seeds Status")
    print("=" * 30)
    cpu_seed = torch.initial_seed()
    print(f"[CPU] Seed: {cpu_seed}")

    if torch.cuda.is_available():
    try:
    gpu_seed = torch.cuda.initial_seed()
    current_device = torch.cuda.current_device()
    device_name = torch.cuda.get_device_name(current_device)

    print(f"[GPU] Seed: {gpu_seed}")
    print(f" Device: {current_device} ({device_name})")
    except Exception as e:
    print(f"[GPU] Error getting seed: {e}")
    else:
    print("[GPU] CUDA is not available.")

    print("=" * 30)
    print_torch_seeds()

torch Seed 设置

  • 全局 torch Seed 设置代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import torch

    # 固定CPU种子
    torch.manual_seed(42)

    # 固定所有GPU的种子(单GPU/多GPU通用)
    if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42) # 替代 torch.cuda.manual_seed(42)(单GPU)

    # GPU上生成随机排列
    perm = torch.randperm(10, device="cuda") # 注意:需要指定 "cuda" 才会在 GPU 上执行
    print("GPU随机排列:", perm) # 每次运行结果一致
    print("draw a random number:", torch.rand()) # 每次运行结果一致
  • 使用独立的 torch 生成器(独立管理自己的随机生成器):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import torch

    # 创建独立的生成器并设置种子
    generator = torch.Generator()
    generator.manual_seed(42)

    # 生成随机排列时指定生成器
    perm1 = torch.randperm(10, generator=generator)
    perm2 = torch.randperm(10, generator=generator)

    print("独立生成器-第一次:", perm1) # tensor([2, 7, 3, 1, 0, 9, 4, 5, 8, 6])
    print("独立生成器-第二次:", perm2) # tensor([2, 0, 7, 9, 8, 4, 3, 6, 1, 5])

    # 重置生成器种子,结果重复
    generator.manual_seed(42)
    perm3 = torch.randperm(10, generator=generator)
    print("重置生成器后:", perm3) # tensor([2, 7, 3, 1, 0, 9, 4, 5, 8, 6])(和perm1一致)
    • 说明:torch.Generator 是 PyTorch 中统一的随机数生成器(RNG)核心对象,几乎所有 PyTorch 内置的随机操作都支持通过 generator 参数指定该生成器
1…282930…63
Joe Zhou

Joe Zhou

Stay Hungry. Stay Foolish.

630 posts
53 tags
GitHub E-Mail
© 2026 Joe Zhou
Powered by Hexo
|
Theme — NexT.Gemini v5.1.4