Jiahong 的个人博客

凡事预则立,不预则废


  • Home

  • Tags

  • Archives

  • Navigation

  • Search

ML——XGBoost-推导过程

XGBoost,全称: Extreme Gradient Boosting


概述

  • CART回归树是XGBoost的基分类器

XGBoost模型推导(假设回归树的结构确定)

模型 \(F\) 的加法定义:

$$
\begin{align}
F(x;w) = \sum_{k=0}^{K} f_{k}(x;w_{k})
\end{align}
$$

  • 其中, \(x\) 为输入样本, \(f_{k}\) 为分类回归树,可以是分类回归树空间中的任意一棵树, \(w_{k}\) 是树 \(f_{k}\) 的参数

定义损失函数

$$
\begin{align}
L^{t} &= \sum_{i=1}^{n}l(y_{i},\hat{y}_{i}^{t}) + \Omega(f_{t}) \\
&= \sum_{i=1}^{n}l(y_{i},\hat{y}_{i}^{t-1} + f_{t}(x_{i})) + \gamma T + \frac{1}{2}\lambda \sum_{j=1}^{T}w_{j}^2
\end{align}
$$

  • \(L^{t}\): 第 \(t\) 轮迭代的损失函数
  • \(l(y_{i},\hat{y}_{i}^{t})\) 表示单个样本的损失函数定义
  • \(\hat{y}_{i}^{t} = \hat{y}_{i}^{t-1} + f_{t}(x_{i})\) 表示第 \(t\) 轮 \(x_{i}\) 样本的预测值
    • 加法模型: 第 \(t\) 轮预测值 = 第 \(t-1\) 轮预测值 + 第 \(t\) 棵(轮)决策树的预测值
  • \(\Omega(f_{t})\) 表示正则项
    • \(n\) 表示样本的个数
    • \(T\) 表示叶子结点的个数
    • \(w_{j}\) 表示叶节点分数 , 即任意样本落到第 \(t\) 棵(轮)决策树的第 \(j\) 叶子结点时的预测值(可称为第 \(t\) 棵(轮)决策树的第 \(j\) 叶子结点的预测值)
    • \(\gamma\) 和 \(\lambda\) 都是正则项参数

第 \(t\) 轮训练的目标

  • 找到一个最优的分类器 \(f_t^{\star}(x)\),满足
    $$
    \begin{align}
    f_t^{\star}(x) &= \mathop{\arg\max}_{f_t(x)}L^{t} \\
    &= \mathop{\arg\max}_{f_t(x)}\left(\sum_{i=1}^{n}l(y_{i},\hat{y}_{i}^{t}) + \Omega(f_{t})\right) \\
    &= \mathop{\arg\max}_{f_t(x)}\left(\sum_{i=1}^{n}l(y_{i},\hat{y}_{i}^{t-1} + f_{t}(x_{i})) + \gamma T + \frac{1}{2}\lambda \sum_{j=1}^{T}w_{j}^2 \right)
    \end{align}
    $$

最小化损失函数推导

损失函数二阶泰勒展开

  • 回忆传统泰勒展开:
    $$
    \begin{aligned}
    f(x) &= f(x_0) + f’(x_0)(x-x_0) + \frac{f’’(x_0)}{2!}(x-x_0)^2 + \cdots + \frac{f^{(n)}(x_0)}{n!}(x-x_0)^n \\
    & = \sum\limits_{n=0}^{\infty}\frac{f^{(n)}x_0}{n!}(x - x_0)^n
    \end{aligned}
    $$

  • 二阶泰勒展开公式
    $$
    \begin{align}
    f(x+\Delta x) \approx f(x) + f’(x)\Delta x + \frac{1}{2}f’’(x)\Delta x^2
    \end{align}
    $$

  • 在我们的场景中,令

    • \(x = \hat{y}_{i}^{t-1}\)
    • \(\Delta x = f_{t}(x_{i})\)
  • 则有对单个样本能得到
    $$
    \begin{align}
    l(y_{i},\hat{y}_{i}^{t-1} + f_{t}(x_{i})) \approx l(y_i,\hat{y}_i^{t-1}) + l’(y_i,\hat{y}_i^{t-1})f_t(x_i) + \frac{1}{2}l’’(y_i,\hat{y}_i^{t-1})f_t^2(x_i)
    \end{align}
    $$

    • \(l’(y_i,\hat{y}_i^{t-1})\) 是 \(l(y_i,\hat{y}_i)\) 对 \(\hat{y}_i\) 的一阶导数在 \(\hat{y}_i = \hat{y}_i^{t-1}\) 处的值
    • \(l’’(y_i,\hat{y}_i^{t-1})\) 是 \(l(y_i,\hat{y}_i)\) 对 \(\hat{y}_i\) 的二阶导数在 \(\hat{y}_i = \hat{y}_i^{t-1}\) 处的值
  • 我们第 \(t\) 轮的目标是找到一个最优的分类器 \(f_t^{\star}(x)\) 最小化损失函数 \(L^{t}\)
    $$
    \begin{align}
    L^{t} \approx \sum_{i=1}^{n}\left(l(y_i,\hat{y}_i^{t-1}) + l’(y_i,\hat{y}_i^{t-1})f_t(x_i) + \frac{1}{2}l’’(y_i,\hat{y}_i^{t-1})f_t^2(x_i)\right) + \gamma T + \frac{1}{2}\lambda \sum_{j=1}^{T}w_{j}^2
    \end{align}
    $$

  • 显然上面式子中的 \(l(y_i,\hat{y}_i^{t-1})\) 与 \(f_t(x_i)\) 无关,可以移除,于是有
    $$
    \begin{align}
    L^{t} \approx \sum_{i=1}^{n}\left(l’(y_i,\hat{y}_i^{t-1})f_t(x_i) + \frac{1}{2}l’’(y_i,\hat{y}_i^{t-1})f_t^2(x_i)\right) + \gamma T + \frac{1}{2}\lambda \sum_{j=1}^{T}w_{j}^2
    \end{align}
    $$

  • 令:

    • \(g_i = l’(y_i,\hat{y}_i^{t-1})\) 为 \(l(y_i,\hat{y}_i)\) 对 \(\hat{y}_i\) 的一阶导数在 \(\hat{y}_i = \hat{y}_i^{t-1}\) 处的值
    • \(h_i = l’’(y_i,\hat{y}_i^{t-1})\) 是 \(l(y_i,\hat{y}_i)\) 对 \(\hat{y}_i\) 的二阶导数在 \(\hat{y}_i = \hat{y}_i^{t-1}\) 处的值
  • 则有:
    $$
    \begin{align}
    L^{t} \approx \sum_{i=1}^{n}\left(g_if_t(x_i) + \frac{1}{2}h_if_t^2(x_i)\right) + \gamma T + \frac{1}{2}\lambda \sum_{j=1}^{T}w_{j}^2
    \end{align}
    $$

  • 做一个重要的转换 :
    $$f_t(x_i) = w_j, \quad s.t. \ x_i \in I_j$$

    • 上面的式子前半部分成立的条件是 \(x_i\) 落在叶子结点 \(j\) 上
    • 我们将条件 \(x_i\) 落在叶子结点 \(j\) 上表示为 \(\ x_i \in I_j\)
  • 于是: 我们可以将前面对样本的累加变成对叶子结点的累加
    $$
    \begin{align}
    L^{t} &\approx \sum_{j=1}^{T}\left((\sum_{i \in I_j}g_i)w_j + \frac{1}{2}(\sum_{i \in I_j}h_i)w_j^2\right) + \gamma T + \frac{1}{2}\lambda \sum_{j=1}^{T}w_{j}^2 \\
    &\approx \sum_{j=1}^{T}\left((\sum_{i \in I_j}g_i)w_j + \frac{1}{2}(\sum_{i \in I_j}h_i + \lambda)w_j^2\right) + \gamma T
    \end{align}
    $$

  • 令:

    • \(G_j = \sum_{i \in I_j}g_i\)
    • \(H_j = \sum_{i \in I_j}h_i\)
  • 则有
    $$
    \begin{align}
    L^{t} \approx \sum_{j=1}^{T}\left(G_jw_j + \frac{1}{2}(H_j + \lambda)w_j^2\right) + \gamma T
    \end{align}
    $$

  • \(L^t\) 对 \(w_j\) 求偏导有
    $$
    \begin{align}
    \frac{\partial L^{t}}{\partial w_j} = G_j + (H_j + \lambda)w_j
    \end{align}
    $$

  • 令导数 \(\frac{\partial L^{t}}{\partial w_j} = 0\) 可得
    $$
    \begin{align}
    w_j^{\star} = -\frac{G_j}{H_j+\lambda}
    \end{align}
    $$

  • 同时
    $$
    \begin{align}
    L^{\star^t} &\approx min(L^t) \\
    &\approx \sum_{j=1}^{T}\left(G_j\cdot (-\frac{G_j}{H_j+\lambda}) + \frac{1}{2}(H_j + \lambda)\cdot(-\frac{G_j}{H_j+\lambda})^2\right) + \gamma T \\
    &\approx -\frac{1}{2}\sum_{j=1}^{T}\left(\frac{G_j^2}{H_j+\lambda}\right) + \gamma T
    \end{align}
    $$

  • 目标函数的计算示例: 目标分数越小越好

结论

  • 在第 \(t\) 轮回归树的结构确定后 ,我们得到的最优叶节点分数与结构无关 \(w_j^{\star} = -\frac{G_j}{H_j+\lambda}\)
  • 最小损失与第 \(t\) 轮回归树的结构的复杂度(叶节点数量)相关
  • 我们还需要确定第 \(t\) 轮回归树的结构

回归树的结构确定

普通决策树树结构的确定

  • 详细情况可参考: ML——DT-决策树
ID3
  • 最大化信息增益来选择分裂特征
C4.5
  • 最大化信息增益比来选择分裂特征
CART
分类树
  • 最小化基尼指数来选择分裂特征
回归树
  • 最小化平方误差来选择分裂特征

XGBoost结点分裂前后的信息增益

  • 信息增益应该与前面的损失函数相关, 损失越小越好
  • 原始损失函数为
    $$
    \begin{align}
    L^{\star^t} \approx -\frac{1}{2}\sum_{j=1}^{T}\left(\frac{G_j^2}{H_j+\lambda}\right) + \gamma T
    \end{align}
    $$
  • 显然,要使得损失函数最小,我们需要使得下面的式子最大:
    $$
    \begin{align}
    \frac{G_j^2}{H_j+\lambda}
    \end{align}
    $$
  • 一个结点分裂后的信息为
    $$
    \begin{align}
    \frac{G_L^2}{H_L+\lambda} + \frac{G_R^2}{H_R+\lambda}
    \end{align}
    $$
    • 左子树的信息 + 右子树的信息
  • 一个结点分裂前信息为
    $$
    \begin{align}
    \frac{(G_L + G_R)^2}{H_L+ H_R + \lambda}
    \end{align}
    $$
  • 从而我们定义一个结点分列前后的信息增益为:
    $$
    \begin{align}
    Gain = \frac{G_L^2}{H_L+ \lambda} + \frac{G_R^2}{H_R+ \lambda} - \frac{(G_L + G_R)^2}{H_L+ H_R + \lambda} - \gamma
    \end{align}
    $$
    • Gain 越大说明当前结点的当前分裂方式越好
    • 其中 \(\gamma\) 是一个超参数用于防止过拟合 ,可以理解为有两层含义:
      • 用于对叶节点数目进行控制
      • 用于增大分裂前后Gain的阈值
      • 事实上 \(\gamma\) 不是凭空来的,开始的定义中 \(\gamma\) 就是L1正则化(叶节点数量)的系数, 这里分裂后叶节点数量多了1个, 我们希望树的叶节点越少越好(模型越简单越好),而这个的信息增益越大越好, 所以减去 \(\gamma\) 值,防止结点分裂得太多
      • 对于每个特征的每个分裂点, \(\gamma\) 值都相同,所以 \(\gamma\) 值对特征和分裂点的选择没有影响,只是影响了某个结点是否能够被分裂(信息增益太小或者为负时我们可以选择不分裂)

XGBoost树节点分裂

精确分裂算法
  • 单个叶节点精确计算信息增益分裂流程(特征和特征分裂取值的选择, 假设一共m个特征)
    • 注意: 每次只对叶节点分裂,已经分裂了的就不是叶节点了,不能二次分裂
    • 原图来自: https://www.cnblogs.com/massquantity/p/9794480.html
近似分裂算法
  • 将每个特征的取值划分为多个分位点
  • 每次考察特征时值考察分位点,减少计算复杂度
  • 其他的步骤与前面的精确分裂算法相同,包括分数计算和选择最大分数等
分桶策略与GBDT的不同
  • 传统的GBDT分桶时每个样本的权重都是相同的
  • XGBoost中每个样本的权重为损失函数在该样本点的二阶导数(对不同的样本,计算得到的损失函数的二阶导数是不同的), 这里优点AdaBoost的思想,重点关注某些样本的感觉
  • 这里影响的是划分点的位置(我们划分划分点[桶]时都是均匀划分样本到桶里面,当不同样本的权重不同时,每个桶里面的样本数量可能会不同)
  • 下图是一个示例
  • 详细推导解释,为什么选择损失函数在当前样本的二阶导数作为权重?
    $$
    \begin{align}
    L^{t} &\approx \sum_{i=1}^{n}\left(g_if_t(x_i) + \frac{1}{2}h_if_t^2(x_i)\right) + \gamma T + \frac{1}{2}\lambda \sum_{j=1}^{T}w_{j}^2 \\
    &\approx \sum_{i=1}^{n}\frac{1}{2}h_i\left(2\frac{g_i}{h_i}f_t(x_i) + f_t^2(x_i)\right) + \gamma T + \frac{1}{2}\lambda \sum_{j=1}^{T}w_{j}^2 \\
    &\approx \sum_{i=1}^{n}\frac{1}{2}h_i\left(f_t(x_i) - \frac{g_i}{h_i}\right)^2 + \sum_{i=1}^{n}\left ( \frac{g_i^2}{2h_i} - 2g_if_t(x_i) \right) + \gamma T + \frac{1}{2}\lambda \sum_{j=1}^{T}w_{j}^2 \\
    \end{align}
    $$
    • 上面的式子中第二项在原文中用constant来表示,可以被看成某种正则项
    • 第一项是一个平方误差表达式,样本 \(x_i\) 对应的输出值为 \(\frac{g_i}{h_i}\),而样本权重就是 \(h_i\)
    • 权重代表概率,概率越大说明该点出现的次数或者该点附近的值出现的次数就越多.
  • XGBoost分桶流程
    • [待更新]

XGBoost整棵树分裂流程

  • 初始化 \(f^0(x)\)
  • for t=1 to M:
    • 计算损失函数对每个训练样本点的一阶导数 \(g_i\),二阶导数 \(h_i\)
    • 递归对叶子节点运用树分裂算法生成一颗决策树 \(f^t(x)\),
      • (这一步包括叶子节点的分数 \(w_j\) 也都确定下来)
      • (这一步可以使用近似分裂算法加快训练速度)
    • 把新生成的决策树加入到模型中, \(\hat{y}^t = \hat{y}^{t-1} + f_m(x)\)

传统GDBT与XGBoost的比较

  • 参考博客: ML——XGBoost-vs-传统GBDT

DL——Attention

本文主要介绍Attention的原理和变种

  • 参考博客(其中有些错误,本文已经修正): https://zhuanlan.zhihu.com/p/47063917
  • 参考论文: An Introductory Survey on Attention Mechanisms in NLP Problems
  • 强烈推荐一篇写得非常好的动画讲解: 基于Attention的Seq2Seq可视化神经机器翻译机
  • 另一篇不错的Attention和Transformer讲解自然语言处理中的自注意力机制(Self-Attention Mechanism)
  • 这个博客中有李宏毅老师的讲解:Self Attention详解——知乎

RNN的局限: Encoder-Decoder模型

  • RNN 结构
  • Encoder-Decoder结构

Attention机制的引入

  • Attention机制的根本优势在于对不同的
  • 引入Attention前后的Encoder和Decoder对比图
    • 使用 Attention 前: \(\vec{h_{t}^{out}} = f(\vec{h_{t-1}^{out}},\vec{y_{t-1}})\)
    • 使用 Attention 后: \(\vec{h_{t}^{out}} = f(\vec{h_{t-1}^{out}},\vec{y_{t-1}}, \vec{c_{t}})\)
      • \(\vec{c_{t}} = q(\vec{h_{1}^{in}}, \dots, \vec{h_{T}^{in}})\)
      • \(q\) 是个多层的运算,有多重不同实现,详情参考后面的讲解
  • 动态图理解 Attention 机制
    • 图中线条越清晰说明对当前结点的影响越大,不清晰说明影响较小
  • 进一步看结构图
  • 上图中Encoder使用的是双层双向的RNN
    • 第一层倒序从后 \(X_T\) 到前 \(X_1\) 生成, 反方向编码器
    • 第二层正序从前 \(X_1\) 到后 \(X_T\) 生成, 正方向编码器
    • 二者combine为一个更高维度的向量, 这个更高维度的向量整个作为Encoder的隐藏层
  • 流程说明:
    • 利用 RNN 结构得到 Encoder中的 Hidden State (\(\vec{h_1}, \vec{h_2},\dots, \vec{h_T}\))
    • 假设当前 Decoder 的Hidden State 是 \(\vec{s_{t-1}}\),计算每一个 \(\vec{h_j}\) 与当前输入位置的关联性 \(e_{ij} = a(\vec{s_{t-1}}, \vec{h_j})\),得到向量 \(\vec{e_t} = (a(\vec{s_{t-1}}, \vec{h_1}), \dots, a(\vec{s_{t-1}}, \vec{h_T})) \)
      • 这里的 \(a\) 是相关性的(函数)运算符, 常用的可以用向量内积(点成),加权点乘等
        • 内积点乘: \(e_{tj} = \vec{s_{t-1}}^T\cdot\vec{h_j}\)
        • 加权点乘: \(e_{tj} = \vec{s_{t-1}}^TW\vec{h_j}\) (一般使用这个)
        • 更复杂的: \(e_{tj} = \vec{v}^Ttanh(W_1\vec{s_{t-1}}^T + W_2\vec{h_j})\)
    • 对 \(\vec{e_t}\) 进行 softmax 操作,将其归一化得到 Attention 的分布, \(\vec{\alpha_t} = softmax(\vec{e_t})\)
    • 利用 \(\vec{\alpha_t}\),我们可以进行加权求和得到相应的上下文向量(context verctor) \(\vec{c_t} = \sum_{j=1}^T\alpha_{tj}\vec{h_j}\)
    • 计算 Decoder 的下一个 Hidden State \(\vec{s_t} = f_h(\vec{s_{t-1}}, \vec{y_{j-1}}, \vec{c_t})\)

Attention的变种

这里的总结参考博客Attention

  • 基于强化学习的注意力机制:选择性的Attend输入的某个部分
  • 全局&局部注意力机制:其中,局部注意力机制可以选择性的Attend输入的某些部分
  • 多维度注意力机制:捕获不同特征空间中的Attention特征
  • 多源注意力机制:Attend到多种源语言语句
  • 层次化注意力机制:word->sentence->document
  • 注意力之上嵌一个注意力:和层次化Attention有点像
  • 多跳注意力机制:和前面两种有点像,但是做法不太一样。且借助残差连接等机制,可以使用更深的网络构造多跳Attention。使得模型在得到下一个注意力时,能够考虑到之前的已经注意过的词
  • 使用拷贝机制的注意力机制:在生成式Attention基础上,添加具备拷贝输入源语句某部分子序列的能力
  • 基于记忆的注意力机制:把Attention抽象成Query,Key,Value三者之间的交互;引入先验构造记忆库
  • 自注意力机制:自己和自己做attention(这里的自己只每个文档自身),使得每个位置的词都有全局的语义信息,有利于建立长依赖关系

广义的Attention机制

参考博客: https://www.sohu.com/a/226596189_500659

  • Attention的本质:
    • 一个Attention函数可以被描述为一个把查询(Query)和键-值(Key-Value)对集合变换成输出(Attention Value)的映射
    • 简单的讲就是一个把 (Query,[Key-Value]s) 映射成一个 Attention Value (输出)
    • An attention function can be described as Mapping aquery and a set of key-value pairs to an output
  • 表示成数学公式如下
  • 如上图所示,在计算 Attention 时主要分为三步
    • 第一步是将 Query 和每个 Key 进行相似度计算得到权重,常用的相似度函数有点积,拼接,感知机等
    • 第二步一般是使用一个 Softmax 函数对这些权重进行归一化
    • 第三步将权重和相应的键值 Value 进行加权求和得到最后的 Attention
  • Attention过程还可以大致分为两步理解:
      1. 将Query和Key经过相似度计算(某种数学运算)的结果通过 Softmax 激活函数激活得到上文所说的权重得分布 \(\vec{\alpha} = (\alpha_1\dots \alpha_n)\)
        • 变换一般包括
          • 点乘(Dot): \(f(Q,K_i) = Q^TK_i\)
          • 加权点乘(General): \(f(Q,K_i) = Q^TW_{\alpha}K_i\), \(W_{\alpha}\) 对不同的 \(\alpha_i\)
          • 拼接(Concat): \(f(Q,K_i) = W[Q^T;K_i]\)
          • 感知机(Perceptorn): \(f(Q,K) = \boldsymbol{v}^T tanh(W_Q, UK_i)\)
        • Query和Key在不同任务中是不同的东西
          • 在阅读理解中: Query指的是问题,Key指的是文档
          • 在简单的文本分类中: Query和Key可以是同一个句子(这也就是Self Attention), 也就是句子自己和自己做两个词之间的相似度计算的到权重分布
      1. 将权重分布 \(\vec{\alpha} = (\alpha_1\dots \alpha_n)\) 对Value做加权求和得到最终的特征表示
        • 在当前NLP任务中, 基本上 Key == Value
        • 阅读理解任务中, Value指的就是前面的Key, 是文档
        • 简单文本分类中, Value指句子
        • 在 Self Attention 机制中, 由于之前提到过, Query == Key , 所以有Key == Value == Query
          • 输入一个句子,那么里面的每个词都要和该句子中的所有词进行 Attention 计算, 然后Softmax得到当前句子中每个词的权重,进而对句子中的词求和, 输出当前句子在当前模型中的Attention表示(Attention Value), 即$$\boldsymbol{Y_{AttentionOutput}} = Self Attention(\boldsymbol{Q},\boldsymbol{K},\boldsymbol{V}) = Attention(\boldsymbol{X},\boldsymbol{X},\boldsymbol{X})$$

对Attention的直观解释是

请求为向量时

  • 现有查询向量 q
  • 想从 Value 矩阵(每列对应一个样本) 中按照比例选择样本进行加权求和得到与 q 相关的查询结果
    • 要求是样本与 q 越相关,权重越大
  • Value 中的每个样本都有 Key 矩阵 中的一个样本与之对应(NLP中 Key 往往是 Value 自己)
  • 将 q 与 Key 的每个样本做相关性计算,得到其与 Key 中每个样本的相关性
  • 对 q 与 Key 的所有相关性做归一化,得到权重比例
  • 按照这个比例将 Value 中的样本加权输出结果
  • 该结果就是 Value 经过 \(F(q, Key)\) 加权求和后的结果
  • 也就是 q 对应的结果

请求为矩阵时

  • 现有查询矩阵 Query, 包含 m 个查询向量
  • 相当于重复 m 次做单个请求为向量的运算
  • 每个 q 都能得到一个结果
  • 在实际计算时,可以将整个矩阵一起计算,主要注意归一化是对单个 q 向量与 Key 矩阵生成的结果即可

更多分析

  • 当 Key 与 Value 相同时
    • 其实是说计算 Value 每个样本的权重就用自己去与 q 计算即可
    • NLP中一般都是这样的
  • 当 Key 与 Query 相同时,
    • 其实是找自身不同样本间的相关性
    • 然后根据不同样本的对应其他样本的相关性对其他样本进行加权求和得到自己对应的结果
    • NLP中Self-Attention是这样的
  • Self-Attention是 Key == Value == Query 的情况

Attention研究发展趋势

Shell——进程查找


ps

  • 应用场景:当使用命令sh run.sh启动一个进程后,想要删除,却不知道进程号
  • 查找步骤:
    • 首先使用ps aux | grep run.sh列出进程
    • 杀死进程kill -9 [PID]

TensorFlow——variable_scope和name_scope

  • 在 TensorFlow 1.x 中,variable_scope 和 name_scope 都是用于管理命名空间的工具,但它们的用途和行为有所不同,本文将详细介绍二者的区别

主要用途不同

  • variable_scope :主要用于管理变量的命名和共享,特别是在构建复杂的神经网络模型时,确保不同层或不同部分的变量可以正确命名和复用
  • name_scope :主要用于组织图中的操作,使图的结构更加清晰,便于在 TensorBoard 中查看和分析

对 tf.Variable 的影响相同

  • variable_scope :variable_scope 会为 tf.Variable 创建的变量添加前缀:

    1
    2
    3
    4
    5
    import tensorflow as tf

    with tf.variable_scope('var_scope'):
    var3 = tf.Variable(3.0, name='var3')
    print(var3.name) # 输出: var_scope/var3:0
  • name_scope :name_scope 同样会为 tf.Variable 创建的变量添加前缀:

    1
    2
    3
    4
    5
    import tensorflow as tf

    with tf.name_scope('name_scope'):
    var4 = tf.Variable(4.0, name='var4')
    print(var4.name) # 输出: name_scope/var4:0

对 tf.get_variable 的影响不同

  • variable_scope :variable_scope 会影响 tf.get_variable 创建的变量的命名,并且支持变量共享。tf.get_variable 创建的变量名称会带上 variable_scope 的前缀:

    1
    2
    3
    4
    5
    import tensorflow as tf

    with tf.variable_scope('var_scope'):
    var1 = tf.get_variable('var1', shape=[1], initializer=tf.constant_initializer(1.0))
    print(var1.name) # 输出: var_scope/var1:0
  • name_scope :name_scope 不会影响 tf.get_variable 创建的变量的命名。tf.get_variable 创建的变量会忽略 name_scope,直接使用 variable_scope 或默认的命名空间:

    1
    2
    3
    4
    5
    import tensorflow as tf

    with tf.name_scope('name_scope'):
    var2 = tf.get_variable('var2', shape=[1], initializer=tf.constant_initializer(2.0))
    print(var2.name) # 输出: var2:0

variable_scope变量共享功能

  • variable_scope :支持变量共享,通过设置 reuse 参数(如 reuse=True 或 reuse=tf.AUTO_REUSE),可以在不同的作用域中复用相同名称的变量

  • 共享变量功能在构建具有共享参数的神经网络时非常有用,下面是构建神经网络的最佳实践:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import tensorflow as tf

    def my_network(inputs):
    with tf.variable_scope('my_network', reuse=tf.AUTO_REUSE):
    w = tf.get_variable('weights', shape=[1], initializer=tf.constant_initializer(3.0))
    output = inputs * w
    return output

    input1 = tf.constant(1.0)
    input2 = tf.constant(2.0)

    output1 = my_network(input1)
    output2 = my_network(input2)
    # 这里 w 在两个调用中是共享的
  • 注:name_scope :不支持变量共享 ,主要用于组织操作(如 tf.add、tf.matmul 等)的命名,方便在 TensorBoard 中可视化


附录:variable_scope vs name_scope更多代码示例

  • 测试代码1:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    def test():
    data = tf.ones(shape=[3,5], dtype=tf.float32)
    with tf.variable_scope("vs_test"):
    x = tf.get_variable("x", initializer=[10])
    y = tf.constant(20)
    z = tf.layers.dense(inputs=data, units=1, name="output")
    a = tf.Variable("a")
    with tf.name_scope("ns_test"):
    x1 = tf.get_variable("x", initializer=[10])
    y1 = tf.constant(20)
    z1 = tf.layers.dense(inputs=data, units=1, name="output")
    a1 = tf.Variable("a")
    test()
    # <tf.Variable 'vs_test/x:0' shape=(1,) dtype=int32_ref>
    # Tensor("vs_test/Const:0", shape=(), dtype=int32)
    # Tensor("vs_test/output/BiasAdd:0", shape=(3, 1), dtype=float32)
    # <tf.Variable 'vs_test/Variable:0' shape=() dtype=string_ref>
    # ==========
    # <tf.Variable 'x:0' shape=(1,) dtype=int32_ref>
    # Tensor("ns_test/Const:0", shape=(), dtype=int32)
    # Tensor("ns_test/output/BiasAdd:0", shape=(3, 1), dtype=float32)
    # <tf.Variable 'ns_test/Variable:0' shape=() dtype=string_ref>
  • 结论1:

    • 对于variable_scope()来说,所有方式获取的变量或layer等调用都会被加上前缀
    • variable_scope()包含reuse参数,对这个scope下的所有变量生效(包括通过layer调用或get_variable获取的变量)
      • reuse = True: 复用之前的同名变量,没有同名变量则抛出异常
      • reuse = False: 创建新变量,有同名变量则抛出异常
      • reuse = tf.AUTO_REUSE: 如果有同名变量,则复用之前的同名变量,否则创建新变量
    • 对于name_scope()来说,通过tf.get_variable和layer获取到的变量不会被加上前缀,上面示例中打印出来的不是变量,而是网络输出值,可以被name_scope来管理
    • name_scope()没有reuse参数
  • 参考链接:https://blog.csdn.net/shenxiaoming77/article/details/79141078

    name_scope: 为了更好地管理变量的命名空间而提出的。比如在 tensorboard 中,因为引入了 name_scope, 我们的 Graph 看起来才井然有序
    variable_scope: 大部分情况下,跟 tf.get_variable() 配合使用,实现变量共享的功能

  • 测试代码2:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    def test():
    data = tf.ones(shape=[3,5], dtype=tf.float32)
    with tf.variable_scope("vs_test"):
    x = tf.get_variable("x", initializer=[10])
    y = tf.constant(20)
    z = tf.layers.dense(inputs=data, units=1, name="output")
    a = tf.Variable(1, name="a")
    with tf.variable_scope("vs_test"):
    x = tf.get_variable("y", initializer=[10])
    # 下面这行会报错ValueError: Variable vs_test/output/kernel already exists
    # z = tf.layers.dense(inputs=data, units=1, name="output")
    a = tf.Variable(1, name="b")
    with tf.name_scope("ns_test"):
    x1 = tf.get_variable("x", initializer=[10])
    y1 = tf.constant(20)
    z1 = tf.layers.dense(inputs=data, units=1, name="output")
    a1 = tf.Variable(1, name="a")
    test()

    print "=====trainable===="
    trainable_var = tf.trainable_variables()
    for v in trainable_var: print v

    print "=====vs_test===="
    main_qnet_var = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='vs_test')
    for v in main_qnet_var: print v

    print "=====ns_test===="
    main_qnet_var = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='ns_test')
    for v in main_qnet_var: print v

    # =====trainable====
    # <tf.Variable 'vs_test/x:0' shape=(1,) dtype=int32_ref>
    # <tf.Variable 'vs_test/output/kernel:0' shape=(5, 1) dtype=float32_ref>
    # <tf.Variable 'vs_test/output/bias:0' shape=(1,) dtype=float32_ref>
    # <tf.Variable 'vs_test/a:0' shape=() dtype=int32_ref>
    # <tf.Variable 'vs_test/y:0' shape=(1,) dtype=int32_ref>
    # <tf.Variable 'vs_test_1/b:0' shape=() dtype=int32_ref>
    # <tf.Variable 'x:0' shape=(1,) dtype=int32_ref>
    # <tf.Variable 'output/kernel:0' shape=(5, 1) dtype=float32_ref>
    # <tf.Variable 'output/bias:0' shape=(1,) dtype=float32_ref>
    # <tf.Variable 'ns_test/a:0' shape=() dtype=int32_ref>
    # =====vs_test====
    # <tf.Variable 'vs_test/x:0' shape=(1,) dtype=int32_ref>
    # <tf.Variable 'vs_test/output/kernel:0' shape=(5, 1) dtype=float32_ref>
    # <tf.Variable 'vs_test/output/bias:0' shape=(1,) dtype=float32_ref>
    # <tf.Variable 'vs_test/a:0' shape=() dtype=int32_ref>
    # <tf.Variable 'vs_test/y:0' shape=(1,) dtype=int32_ref>
    # <tf.Variable 'vs_test_1/b:0' shape=() dtype=int32_ref>
    # =====ns_test====
    # <tf.Variable 'ns_test/a:0' shape=() dtype=int32_ref>
  • 结论2:

    • 在重复定义vs_test后,
      • tf.get_variable获得的变量命名是vs_test开头的
      • tf.Variable获得的变量命名是vs_test_1开头的(变量名自增)

嵌套作用域的reuse继承和覆盖

  • 在多层级 tf.variable_scope 中使用 reuse 参数时,reuse 参数的状态在嵌套的 variable_scope 中会进行继承和覆盖
    • 继承 :子作用域会继承父作用域的 reuse 状态,
    • 覆盖 :子作用域可以通过显式设置 reuse 参数来覆盖继承的状态

子作用域未显式设置 reuse 参数(继承)

  • 当子作用域没有显式设置 reuse 参数时,它会继承父作用域的 reuse 状态

    1
    2
    3
    4
    5
    import tensorflow as tf

    with tf.variable_scope('outer_scope', reuse=True) as outer:
    with tf.variable_scope('inner_scope') as inner:
    print(inner.reuse) # 输出: True
  • 在上述代码中,outer_scope 的 reuse 设置为 True,inner_scope 未显式设置 reuse 参数,所以 inner_scope 继承了 outer_scope 的 reuse 状态,即 True

子作用域显式设置 reuse 参数(覆盖)

  • 若子作用域显式设置了 reuse 参数,那么它会覆盖从父作用域继承的状态

    1
    2
    3
    4
    5
    import tensorflow as tf

    with tf.variable_scope('outer_scope', reuse=True) as outer:
    with tf.variable_scope('inner_scope', reuse=False) as inner:
    print(inner.reuse) # 输出: False
    • 这里,outer_scope 的 reuse 为 True,但 inner_scope 显式将 reuse 设置为 False,所以 inner_scope 的 reuse 状态为 False

reuse=tf.AUTO_REUSE(与True和False一致)

  • reuse=tf.AUTO_REUSE 允许在变量存在时复用,不存在时创建。在多层级作用域中,它同样遵循继承和覆盖规则

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import tensorflow as tf

    def create_or_reuse_variable():
    with tf.variable_scope('outer', reuse=tf.False):
    with tf.variable_scope('inner', reuse=tf.AUTO_REUSE):
    var = tf.get_variable('my_var', shape=[1], initializer=tf.constant_initializer(1.0))
    return var

    # 两次调用
    var1 = create_or_reuse_variable()
    var2 = create_or_reuse_variable()

    print(var1.name) # 输出: outer/inner/my_var:0
    print(var2.name) # 输出: outer/inner/my_var:0
    • 在这个例子中,两次调用 create_or_reuse_variable 函数时,由于使用了 reuse=tf.AUTO_REUSE,第二次调用会复用第一次创建的变量

TensorFlow——使用笔记

TensorFlow封装了很多有用的函数,本文主要介绍介绍其中常用的函数

  • 优秀参考链接:
    • https://www.cnblogs.com/ying-chease/p/9723309.html
    • https://www.cnblogs.com/wuzhitj/p/6298004.html

tf.identify()

  • 参考链接: https://blog.csdn.net/qq_23981335/article/details/81361748
  • y=x 图上是看不到y的具体节点的,因为这仅仅是个普通的Python变量引用复制,不是一个TF操作
    • 由于不是一个TF操作,所以执行y与对x取值操作一样,没有多余的操作
  • tf.identify()会复制一个变量,此时图中会增加一个操作

string与数字类型的转换

  • tf.as_string()
  • tf.string_to_number()

tf.pad()

  • 用于填充tensor
  • 函数定义如下:
    1
    2
    3
    4
    5
    6
    7
    tf.pad(
    tensor,
    paddings,
    mode='CONSTANT',
    constant_values=0,
    name=None
    )

tf.slice()

  • tf.slice(x, [0, 2], [-1, 1])等价于x[:, 2:3]
    • 两者都会在graph中增加一个节点
    • 需要注意,在graph里面两者的节点不一样,但是获得的结果是相同的
      • 图中x[:, 2:3]会多几个输出属性,所以猜测tf.slice更好些【不精确】

tf.map_fn()

  • 主行执行某个函数

tf.add_n()

  • 函数说明(将多个tf.Tensor对象相加并返回一个tf.Tensor对象):

    1
    tf.add_n(inputs, name=None)
    • inputs:一个包含多个 tf.Tensor 的列表(或可迭代对象)
    • name(可选):操作的名称
    • 返回:返回值是一个tf.Tensor对象
  • 示例代码执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    import tensorflow as tf

    # 创建多个张量
    tensor1 = tf.constant([1, 2, 3])
    tensor2 = tf.constant([4, 5, 6])
    tensor3 = tf.constant([7, 8, 9])

    # 使用 tf.add_n 对多个张量进行相加
    result = tf.add_n([tensor1, tensor2, tensor3])

    # 创建会话并运行计算
    with tf.Session() as sess:
    result_value = sess.run(result)
    print("相加结果:", result_value)

    # 相加结果: [12 15 18]
    • 注:在tf=1.x的版本中,必须使用with tf.Session() as sess:加sess.run来执行图并抽取tensor的值,在tf=2.x的版本中则可以直接使用print("相加结果:", result.numpy())

tf.add_n和tf.reduce_sum的区别

  • tf.reduce_sum的输入是一个张量,输出会降维(调用时若输入一个tensor的列表,也会默认地被TensorFlow隐式转换为一个tensor然后再输入,相当于隐式调用了tf.stack)
  • tf.add_n输入是一个列表,输出和列表中的元素shape相同
  • 等价实现示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    import tensorflow as tf

    # 创建多个张量
    tensor1 = tf.constant([1, 2, 3])
    tensor2 = tf.constant([4, 5, 6])
    tensor3 = tf.constant([7, 8, 9])

    # 使用 tf.add_n 对多个张量进行相加
    result_add_n = tf.add_n([tensor1, tensor2, tensor3])

    # 将多个张量堆叠成一个新的张量
    stacked_tensors = tf.stack([tensor1, tensor2, tensor3])

    # 使用 tf.reduce_sum 对堆叠后的张量在第 0 维上求和,达到与 tf.add_n 相同的效果
    result_reduce_sum = tf.reduce_sum(stacked_tensors, axis=0)

    ## 不考虑可读性时,也可以使用下面的方法调用,tensorflow将隐式地将`[tensor1, tensor2, tensor3]`合并成一个tensor对象
    # result_reduce_sum = tf.reduce_sum([tensor1, tensor2, tensor3], axis=0)

    # 创建会话并运行计算
    with tf.Session() as sess:
    result_add_n_value = sess.run(result_add_n)
    result_reduce_sum_value = sess.run(result_reduce_sum)

    print("tf.add_n 的结果:", result_add_n_value)
    print("tf.reduce_sum 的结果:", result_reduce_sum_value)

    # tf.add_n 的结果: [12 15 18]
    # tf.reduce_sum 的结果: [12 15 18]

字符串转固定二维数组

  • 场景:每行包含一个字符串,每个字符串中包含多个值,按照’;’分隔开

    转换方式1

  • 方案:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    nums = tf.string_split(tf.reshape(nums, [-1]), delimiter=";")

    # SparseTensor不能使用string_to_number,下面的句子不能用
    # nums = tf.string_to_number(nums, out_type=tf.float32)

    # 如果知道batch_size的话可以用下面的句子来减少内存使用,但不是必要的
    # nums = tf.sparse_slice(nums, [0, 0], [batch_size, target_len])

    nums = tf.sparse_tensor_to_dense(nums, default_value='0.0')
    nums = tf.string_to_number(nums, out_type=tf.float32)
    # 目前无法保障行的长度,通过pad 0实现最大长度,然后再截断到目标长度(有点浪费内存了)
    nums = tf.pad(nums, paddings=[[0, 0], [0, target_len]], mode="CONSTANT")
    # nums = nums[:, :target_len] ==
    nums = tf.slice(nums, [0, 0], [-1, target_len])
  • 缺点

    • 浪费内存和时间

转换方式2

  • 方案:

    1
    2
    3
    4
    5
    6
    7
    8
    nums = tf.string_split(tf.reshape(nums, [-1]), delimiter=";")

    # 确保每行大小不超过target_len,否则sparse_to_dense在稀疏长度大于目标长度会报错
    # target_len可以大于稀疏长度的,第一个维度不能为-1,可用nums.dense_shape[0]
    nums = tf.sparse_slice(nums, [0, 0], [nums.dense_shape[0], target_len])
    # sparse_to_dense的batch_size是必要的,不能为-1,处理到最后一个batch时,`nums = tf.sparse_to_dense(nums.indices, [batch_size, target_len], nums.values, default_value='0.0')`会出现问题,因为最后一层真实的数据量不是batch_size
    nums = tf.sparse_to_dense(nums.indices, [nums.dense_shape[0], target_len], nums.values, default_value='0.0')
    nums = tf.string_to_number(nums, out_type=tf.float32)
  • 优点

    • 简单快捷,节省内存

数据mask处理

  • 应用场景:屏蔽部分数据,比如包含所有粗排队列及输入ctr长度信息,需要屏蔽掉没有进入CTR的数据
  • 屏蔽方案:
    1
    2
    3
    4
    5
    6
    # 屏蔽被移除的广告对应的数据,max_len为最大长度,fix_len为截断长度,默认值为0
    def mask_fix_len_nums(nums, max_len, real_len, name):
    with tf.name_scope("mask_%s" % name):
    mask = tf.sequence_mask(real_len, max_len)
    mask_nums = tf.where(mask, nums, tf.zeros_like(nums))
    return mask_nums

tf.layers.max_pooling1d

  • 需要先将数据处理为rank=3才可以,如果原始数据为rank=1或rank=2,可通过reshape函数修改shape,然后pooling处理完后再恢复到原来的shape

  • 举例

    1
    2
    3
    4
    5
    pool_raw_bids = tf.reshape(raw_bids, [-1, 1, 100])
    print pool_raw_bids.shape
    bid_pool_result = tf.layers.max_pooling1d(pool_raw_bids, 10, strides=10, padding="valid", data_format="channels_first")
    # 10个一组,pooling后长度为10
    final_result = tf.reshape(bid_pool_result, [-1, 10])
  • 关于参数valid:https://vimsky.com/article/3881.html

  • 参数data_format:

    • “channels_first”: 指明channel维度在前数据维度前
    • “channels_last”: 指明channel维度在前数据维度后
    • 详情看函数源码文档即可

人工转换数值型特征为类别型

  • 代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    queue_len = tf.convert_to_tensor([81, 72, 63, 144, 33, 10, 209])
    ori_len = tf.convert_to_tensor([84, 109, 80, 203, 68, 300, 245])
    queue_len = tf.reshape(queue_len, [-1])
    ori_len = tf.reshape(ori_len, [-1])
    b = [4,10,19,33,52,78,114,164,240]
    def bucket(x):
    # 下面个的代码无法保证顺序,会出现编码混乱的局面,返回值全是[8,8,...]
    # bucket_dict = {
    # tf.less(x, b[0]): lambda: 0,
    # tf.less(x, b[1]): lambda: 1,
    # tf.less(x, b[2]): lambda: 2,
    # tf.less(x, b[3]): lambda: 3,
    # tf.less(x, b[4]): lambda: 4,
    # tf.less(x, b[5]): lambda: 5,
    # tf.less(x, b[6]): lambda: 6,
    # tf.less(x, b[7]): lambda: 7,
    # tf.less(x, b[8]): lambda: 8
    # }
    # 下面的代码也无法保证顺序
    # bucket_dict = list()
    # for i in range(len(b)):
    # bucket_dict.append((tf.less(x, b[i]), lambda: b[i]))
    bucket_dict = [
    (tf.less(x, b[0]), lambda: 0),
    (tf.less(x, b[1]), lambda: 1),
    (tf.less(x, b[2]), lambda: 2),
    (tf.less(x, b[3]), lambda: 3),
    (tf.less(x, b[4]), lambda: 4),
    (tf.less(x, b[5]), lambda: 5),
    (tf.less(x, b[6]), lambda: 6),
    (tf.less(x, b[7]), lambda: 7),
    (tf.less(x, b[8]), lambda: 8)
    ]
    return tf.case(bucket_dict, default=lambda:len(b), exclusive=False)
    ori_len_bucket = tf.map_fn(bucket, elems=ori_len)
    queue_len_bucket = tf.map_fn(lambda x: x/10, elems=queue_len)

慎用随机算子

  • 使用随机算子后,每次调用都会重新初始化随机算子,获取到的值可能不同

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    data = tf.random_uniform([1], 0.0, 1.0)
    # data = tf.constant([2], tf.float32)
    data = tf.Print(data, [data])
    pow_data_2 = tf.pow(data, [2])
    data_3 = tf.multiply(pow_data_2, 3)
    # print data
    with tf.Session() as s:
    s.run(tf.global_variables_initializer())
    # print s.run([data])
    print s.run([data])
    print s.run([pow_data_2])
    print s.run([data_3])
    # 以上代码会答应3次tf.Print,3次print的结果不同,因为调用了两次初始化操作
    # TensorFlow是惰性的,不执行s.run操作就不会走一遍算子
  • 进一步的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    data = tf.random_uniform([1], 0.0, 1.0)
    data_list = [data, data, data]
    # data = tf.constant([2], tf.float32)
    data = tf.Print(data, [data])
    pow_data_2 = tf.pow(data, 2)
    data_3 = tf.multiply(pow_data_2, 3)
    data_list = tf.Print(data_list, [data_list])
    # print data
    with tf.Session() as s:
    s.run(tf.global_variables_initializer())
    # print s.run([data])
    print s.run([data])
    print s.run([pow_data_2])
    print s.run([data_3])
    print s.run([data_list])
    """ 输出如下, 上面四行为Print结果,后面的是run的结果
    [0.974354625]
    [0.14039886]
    [0.219203591]
    [[0.507081509][0.507081509][0.507081509]]
    [array([0.9743546], dtype=float32)]
    [array([0.01971184], dtype=float32)]
    [array([0.14415064], dtype=float32)]
    [array([[0.5070815],
    [0.5070815],
    [0.5070815]], dtype=float32)]
    """
    # 由于data_list是一个对象,只会进行一次打印,同理,使用`s.run([data, pow_data_2])`时将打印出相同data对应的data和data^2

tf.stop_gradient

  • 参考链接:关于tf.stop_gradient的使用及理解

tf.AUTO_REUSE

  • tf.layers.dense, 通过name复用
    • 被复用时,以dense对应的整个层为单位,这个层的参数数量应该相同
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      # encoding=utf-8
      import tensorflow as tf

      input_layer = tf.Variable([[1,2,3,4]], trainable=False, dtype=tf.float32)
      input_layer10 = tf.Variable([[10,20,30,40]], trainable=False, dtype=tf.float32)

      def get_net1():
      kernel_initializer = tf.random_normal_initializer(mean=0.0, stddev=0.01)

      net = tf.layers.dense(
      inputs=input_layer[:, :1],
      units=1,
      kernel_initializer=kernel_initializer,
      trainable=True,
      # reuse=False, # 这个值可以随便设置,似乎不影响程序正确性?
      name='m')
      return net


      def get_net2():
      kernel_initializer = tf.random_normal_initializer(mean=0.0, stddev=0.01)

      net = tf.layers.dense(
      inputs=input_layer10[:, :1],
      units=1,
      kernel_initializer=kernel_initializer,
      trainable=True,
      # # 当外层有reuse=tf.AUTO_REUSE的variable_scope时,这个reuse为True或False,只要name相同,都会复用;
      # # 否则,当没有外层没有reuse=tf.AUTO_REUSE的variable_scope时,则True代表复用,False代表不复用(不复用时name不能相同)
      # reuse=True,
      name='m')
      return net
      with tf.variable_scope('main_qnet', reuse=tf.AUTO_REUSE):
      net1 = get_net1()
      net2 = get_net2()
      net_list = [net1, net2]
      with tf.Session() as s:
      s.run(tf.global_variables_initializer())
      print s.run([net_list])

minimize分解为compute_gradients和apply_gradients两步骤

  • 参考链接:以终为始:compute_gradients 和 apply_gradients

tf.GraphKeys.UPDATE_OPS

  • 参考链接:tensorflow中的batch_norm以及tf.control_dependencies和tf.GraphKeys.UPDATE_OPS的探究

tf.train.get_or_create_global_step()

  • 创建或获取当前

Estimator.export_savedmodel()

  • 重点参数:serving_input_receiver_fn
    • 返回tf.estimator.export.ServingInputReceiver或tf.estimator.export.TensorServingInputReceiver对象的函数
    • 对象的构造包含了输入参数的信息,相当于在定义model_fn的features字段
    • 该对象中的输入字段均是占位符的形式
  • 函数动作:
    • 通过调用serving_input_receiver_fn生成相关输入tensors,并按照字典结构存储到一个对象,暂命名为features中
    • 将该features作为参数传入model_fn并构造网络
    • 从cpkt中恢复相关参数并将模型必要信息存储到指定目录下
    • 说明:线上使用文件时,可直接加载网络进行推断,其中占位符可以被线上实时指定的向量替代
  • 需要关注的点:
    • 如果使用estimator框架训练模型,则该函数存储时默认仅存储model_fn部分,在input_fn部分进行的一些无参数的特征工程等操作是不会被继承下来的,也就是说,线上线下一致性保障仅仅从model_fn处开始
      • 一种良好的编程习惯是:将线上特征处理操作在model_fn前完成,及tfrecord生成或input_fn中完成,后续的操作都放到model_fn中,从而保证线上线下的一致性

一般TensorFlow模型存储为PB文件

  • 参考链接:TensorFlow 保存模型为 PB 文件

变量的reuse和trainable

  • reuse和trainable互不影响
    • 两次resue变量时使用不同的trainable是不会影响变量复用的,只要name相同,符合reuse条件即可
    • 变量的trainable属性(是否被加入tf.GraphKeys.TRAINABLE_VARIABLES)由其第一次被定义时决定,后续对该变量的复用(reuse=true)将不会影响变量的trainable属性(不管后续变量申明时使用trainable=True or trainable=False)

tf.layers.batch_normalization()方法

  • 需要注意两个参数
    • training: 是否处于train模式?默认为False
      • True表示会根据当前的batch滑动平均更新均值和方差参数
      • False则表示不会更新该值,如果训练阶段设置为False,则滑动平均的均值和方差不会被更新
    • trainable: 是否将参数加入GraphKeys.TRAINABLE_VARIABLES中
      • 这里的参数不包括均值和方差(均值和方差由滑动平均根据Batch数据更新,不是训练参数)
      • 参数指beta和gamma
      • 只有参数被加入GraphKeys.TRAINABLE_VARIABLES中时才会被更新
  • 总结,参数training和trainable参数在训练阶段一般设置为True,其他阶段设置为False
    • 其中training负责滑动平均的均值和方差更新,设置为False,相当于均值和方差都不会自动更新

tf.identity、tf.Print与tf.gradients

  • tf.Print与tf.identity类似,可以认为是增加打印其他变量的tf.identity
  • tf.Print与tf.identity操作可以认为是复制操作,梯度为1
  • 注意:tf.Print与tf.identity都是图里面的一个操作,会分支出来一个节点,在求梯度时,必须保证目标自变量和因变量在同一个路径上(注意判断identity节点是否在梯度路径上)

梯度裁剪相关函数

  • 参考链接:https://www.cnblogs.com/marsggbo/p/10055760.html

不可导梯度处理

  • 在神经网络中,有许多包含不可导点的函数,常用的比如
    • MAE损失函数在x=0(或pred和label相等)时不可导
    • ReLU在x=0处不可导
  • 在TensorFlow中,定义这些函数时都可以直接正常定义,在计算梯度tf.gradients时,会自动处理不可导点,处理逻辑如下:
    • MAE中,x<0梯度为-1,x>0梯度为1,则x=0取两者之间的值均可,比如TensorFlow中默认取0
    • ReLU类似的,在x<0时梯度为0,x>0时梯度为1,则x=0时取两者之间的值均可,比如TensorFlow中默认取0
  • 注意,整数可以做MAE运算,但是不能求梯度,会报错TypeError: Fetch argument None has invalid type <type 'NoneType'>
    • 这一点上,所有运算都一样,整数不能求梯度

tf.squeeze慎用

  • 在预估服务中,往往是单个请求输入的,输入会导致tf.squeeze将该维度丢弃,从而产生一些意想不到的问题

tf.shape和a.get_shape

  • tf.shape: 返回一个Tensor
  • a.get_shape: 返回一个元组
    • 对于batch等未知值返回?, 例如(?, 1)
  • 在某些特殊情况下,使用tf.shape作为reshape的shape参数,会导致输出shape为None,但是使用a.get_shape则不会,建议任何地方均优先使用a.get_shape

TensorFlow——变量初始化

  • 注:本文的设置仅针对 TensorFlow 1.x

变量的内存分配与初始值设定

  • 变量创建 :在 TensorFlow 1.x 中,当你定义一个变量(Variable)时,实际上是在图中创建了一个占位符(创建了一个表示该变量的节点),它仅仅定义了变量的形状和数据类型 ,并未为其分配实际的内存空间和设定初始值

    1
    2
    3
    import tensorflow as tf

    my_variable = tf.Variable(initial_value=3.0, dtype=tf.float32)
    • 这里的 my_variable 只是一个图中的节点,它描述了变量的基本信息,但在内存中还没有真正存储值
  • 变量初始化 :在会话中执行初始化操作时(注意,初始化操作需要和变量在同一个图中),TensorFlow 会为这个变量分配内存空间,用于存储其值(注意:如果有多个会话绑定同一个图,变量需要重新初始化)

    • 在 TensorFlow 1.x 里,变量的状态是与会话(Session)相关联的。每个会话都有自己独立的变量副本和状态
    • 多个 Session 绑定同一个 Graph 的特殊场景 :当在一个会话中初始化变量时,只会影响该会话中的变量状态,其他会话中的同一变量仍然处于未初始化状态,如果有多个会话绑定同一个图,为了能在每个会话中正常使用变量,需要分别对每个会话中的变量进行初始化操作
  • 防止未定义行为 :如果不进行初始化就尝试使用变量,会导致未定义行为,因为变量在内存中的值是不确定的。这可能会引发错误或得到意外的计算结果。通过强制要求初始化变量,可以确保在使用变量之前,它们已经被正确地赋予了初始值,从而保证计算的正确性和稳定性


全局变量初始化

  • 在大多数情形下,你需要对所有的变量进行初始化。可以借助tf.global_variables_initializer()来实现这一目的。示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import tensorflow as tf

    a = tf.Variable(3, name='a')
    b = tf.Variable(2, name='b')
    c = tf.add(a, b)

    with tf.Session() as sess:
    # 初始化所有全局变量
    init = tf.global_variables_initializer()
    sess.run(init)
    result = sess.run(c)
    print("结果: ", result)
    • 在这个示例中,tf.global_variables_initializer()会初始化所有定义的全局变量。在运行计算图之前,必须先运行这个初始化操作

初始化部分变量

  • 如果你只想初始化部分变量 ,可以使用tf.variables_initializer()。示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import tensorflow as tf

    a = tf.Variable(3, name='a')
    b = tf.Variable(2, name='b')
    c = tf.add(a, b)

    with tf.Session() as sess:
    # 初始化部分变量
    init_ab = tf.variables_initializer([a, b])
    sess.run(init_ab)
    result = sess.run(c)
    print("结果: ", result)
    • 在这个示例中,tf.variables_initializer([a, b])仅初始化了变量a和b

初始化单个变量

  • 你还可以使用variable.initializer来初始化单个变量。示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import tensorflow as tf

    a = tf.Variable(3, name='a')
    b = tf.Variable(2, name='b')
    c = tf.add(a, b)

    with tf.Session() as sess:
    # 初始化单个变量
    sess.run(a.initializer)
    sess.run(b.initializer)
    result = sess.run(c)
    print("结果: ", result)
    • 在这个示例中,分别对变量a和b进行了初始化

新加入的变量都要初始化

  • 在已经创建session后,依然是可以加入新的变量的,但是需要进行初始化才可以使用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    import tensorflow as tf

    var1 = tf.Variable(3, dtype=tf.float32)
    var2 = tf.Variable(5, dtype=tf.float32)
    add_op1 = tf.add(var1, var2)

    sess = tf.Session()

    init_op = tf.global_variables_initializer() # 初始化所有变量
    sess.run(init_op)

    result1 = sess.run(add_op1)
    print("两个变量的和:", result1)

    # 在Session后继续创建变量
    var3 = tf.Variable(7, dtype=tf.float32)
    add_op2 = tf.add(add_op1, var3)

    init_new_var = tf.variables_initializer([var3]) # 初始化新创建的变量
    sess.run(init_new_var)

    result2 = sess.run(add_op2)
    print("三个变量的和:", result2)

    sess.close() # 显式关闭会话

    # 两个变量的和: 8.0
    # 三个变量的和: 15.0

附录:多次初始化变量会发生什么?

  • 在 TensorFlow 1.x 中,对同一个变量进行两次初始化后,从内存占用的角度来说,变量所占用的内存位置通常是一致的,但变量存储的值会被重置为初始值

  • 当你在 TensorFlow 1.x 里:

    • 定义一个变量时,实际上是在图中创建了一个表示该变量的节点
    • 在会话中执行初始化操作时,TensorFlow 会为这个变量分配内存空间,用于存储其值
    • 同一个Session中,再次对该变量执行初始化操作,并不会重新分配内存空间,而是直接在已分配的内存位置上修改存储的值
      • 同一个图中,内存分配操作只会在第一次初始化时发生
  • 示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    import tensorflow as tf

    var = tf.Variable(10, dtype=tf.float32)
    init = tf.global_variables_initializer()

    with tf.Session() as sess:
    # 第一次初始化变量
    sess.run(init)
    print("第一次初始化后变量的值:", sess.run(var))

    # 修改变量的值
    assign_op = var.assign(20)
    sess.run(assign_op)
    print("修改变量后的值:", sess.run(var))

    # 再次初始化变量
    sess.run(init)
    print("第二次初始化后变量的值:", sess.run(var))

    # 第一次初始化后变量的值: 10.0
    # 修改变量后的值: 20.0
    # 第二次初始化后变量的值: 10.0
    • 定义变量和初始化操作 :定义了变量 var,并创建了初始化所有全局变量的操作 init
    • 第一次初始化 :运行 init 操作,此时 TensorFlow 为变量 var 分配内存空间,并将初始值 10 存储在该内存位置
    • 修改变量的值 :使用 assign 操作将变量 var 的值修改为 20,这会直接在已分配的内存位置上更新存储的值
    • 第二次初始化 :再次运行 init 操作,TensorFlow 不会重新分配内存空间,而是直接将内存中存储的值重置为初始值 10

附录:多个Session执行同一个图

  • 核心注意点 :在 TensorFlow 1.x 中,当一个图被多个 Session 执行时,需要分别对每个 Session 中的变量进行初始化

  • 在 TensorFlow 1.x 里,变量的状态是与会话(Session)相关联的。每个会话都有自己独立的变量副本和状态。当你在一个会话中初始化变量时,只会影响该会话中的变量状态,其他会话中的同一变量仍然处于未初始化状态。因此,为了能在每个会话中正常使用变量,需要分别对每个会话中的变量进行初始化操作

  • 多个Session执行同一个图的示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import tensorflow as tf

    # 创建一个图
    graph = tf.Graph()
    with graph.as_default():
    var = tf.Variable(10, dtype=tf.float32)
    add_op = tf.add(var, 5)
    init_op = tf.global_variables_initializer() # 注意:这个操作必须添加到图中,后续所有Session的初始化都可以调用这个操作

    # 创建第一个会话
    sess1 = tf.Session(graph=graph)
    # 初始化第一个会话中的变量
    sess1.run(init_op)
    result1 = sess1.run(add_op)
    print("第一个会话执行结果:", result1)

    # 创建第二个会话
    sess2 = tf.Session(graph=graph)
    try:
    result2 = sess2.run(add_op) # 尝试在未初始化变量的情况下执行操作
    except tf.errors.FailedPreconditionError:
    print("第二个会话未初始化变量,操作失败")
    # 初始化第二个会话中的变量
    sess2.run(init_op)
    result2 = sess2.run(add_op) # 在第二个会话中执行操作
    print("第二个会话初始化变量后执行结果:", result2)

    # 关闭会话以释放资源
    sess1.close()
    sess2.close()

    # 第一个会话执行结果: 15.0
    # 第二个会话未初始化变量,操作失败
    # 第二个会话初始化变量后执行结果: 15.0
    • 图的定义 :创建了一个图,并在图中定义了一个变量 var 和一个操作 add_op,同时定义初始化操作 init_op
    • 第一个会话 :创建第一个会话 sess1,对其中的变量进行初始化(执行图中的操作 init_op),然后执行操作并打印结果
    • 第二个会话 :创建第二个会话 sess2,尝试在未初始化变量的情况下执行操作,会抛出 FailedPreconditionError 异常;捕获异常后,对第二个会话中的变量进行初始化(执行图中的操作 init_op),再次执行操作并打印结果

Scala——Scala对象转Java对象

Background

  • 使用 Scala 语言调用 Java 的接口时,经常出现参数传递需要将 Scala 对象转换成 Java 对象的情况

转换方式

  • 增加一行接口引用

    1
    import scala.collection.JavaConverters._
  • 将对象转成Java对象

    • 原生的一些对象转换

      1
      2
      val a: Double = 1.0
      calDouble(java.lang.Double.valueOf(a)) // calDouble的参数要求是Java的对象
    • List 对象转换

      1
      2
      val a: List[Double] = List(1.0D,2.0D,3.0D,4.0D)
      calList(list.map(java.lang.Double.valueOf).asJava) // calDouble的参数要求是Java的对象

TensorFlow——LossNan问题


问题描述

  • 训练时梯度出现 nan 或 -nan
  • 问题详细描述:在训练时,出现异常:NanLossDuringTrainingError: NaN loss during training.

问题分析

  • 进一步分析,batch_size=1 且 worker=1 时,发现首先是线上梯度出现 nan 或 -nan,然后下一轮跌待中所有数据都是 nan,进一步导致 loss 为 nan
  • 在问题确认过程中,需要一步步排除:
    • 排除分母除 0 的情况,可以简单的用tf.div_no_nan代替\
    • 排除 log 参数为负数或 0 的情况
    • 排除抽取 batch 中选择符合条件的行后出现行数为 [] 的情况,使用 tf.shape(x)[0] 是否为 0 来判断是否为空
    • 重点:排除整数除法等导致的值为 0 的情况,这种情况比较隐晦,难以感知
    • 有时候会是 TensorFlow 版本导致,需要注意

TensorFlow——计算图管理


整体说明

  • 在 TensorFlow 1.x 里,placeholder函数用于创建占位符张量
  • 定义计算图时 :定义计算图时指定输入数据的形状和类型
  • 在会话中执行图时 :在运行计算图时需要为这些占位符张量提供具体数值
    • 注意:这个数值不能是TensorFlow的Tensor类型,必须是numpy或者Python原生类型
    • 注意:当实际运行会话(Session)并传入数据时,如果 placeholder 的类型与实际输入数据的类型不一致,TensorFlow 会抛出错误

placeholder函数的定义

  • 函数定义:

    1
    tf.placeholder(dtype, shape=None, name=None)
    • dtype:必须指定,代表占位符张量的数据类型,像tf.float32、tf.int32等
    • shape:可选参数,代表占位符张量的形状
      • 常见的设置为shape=(None, 2),表示第一维为batch_size,维度可变
      • 注:若设为shape=None,就表示可以接受任意形状的输入
    • name:可选参数,为占位符张量赋予名称,方便调试和可视化

placeholder函数的示例

  • 示例代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    import tensorflow as tf

    # 创建占位符
    x = tf.placeholder(tf.float32, shape=[None, 2], name='x')
    y = tf.placeholder(tf.float32, shape=[None, 1], name='y')

    # 定义计算图
    w = tf.Variable(tf.random_normal([2, 1]), name='w')
    b = tf.Variable(tf.zeros([1]), name='b')
    pred = tf.matmul(x, w) + b

    loss = tf.reduce_mean(tf.square(pred - y))
    optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01)
    train_op = optimizer.minimize(loss)

    # 初始化变量
    init = tf.global_variables_initializer()

    # 模拟数据
    import numpy as np
    x_data = np.array([[1, 2], [3, 4], [5, 6]])
    y_data = np.array([[3], [7], [11]])

    # 运行计算图
    with tf.Session() as sess:
    sess.run(init)
    for i in range(100):
    _, loss_val = sess.run([train_op, loss], feed_dict={x: x_data, y: y_data})
    if (i + 1) % 10 == 0:
    print(f'Step {i + 1}, Loss: {loss_val}')

    # Step 10, Loss: 0.23915673792362213
    # Step 20, Loss: 0.2156977653503418
    # Step 30, Loss: 0.19454030692577362
    # Step 40, Loss: 0.17545770108699799
    # Step 50, Loss: 0.15824727714061737
    # Step 60, Loss: 0.1427248865365982
    # Step 70, Loss: 0.1287250965833664
    # Step 80, Loss: 0.1160985603928566
    # Step 90, Loss: 0.10471046715974808
    # Step 100, Loss: 0.09443938732147217

placeholder使用过程中需要注意的问题

  • 数据类型匹配 :在使用feed_dict为占位符提供数据时,要保证提供的数据类型和占位符定义的数据类型一致,不然会引发类型错误
  • 形状匹配 :提供的数据形状必须和占位符定义的形状相匹配。要是占位符形状中有None,那就表示该维度可以接受任意长度的输入
  • 必须提供值 :在运行计算图时,所有依赖占位符的操作都得在feed_dict里为占位符提供具体的值,否则会抛出异常
    • 计算目标不依赖的的不输入
  • 性能问题 :频繁使用feed_dict会带来一定的性能开销,尤其是在处理大规模数据时。可以考虑使用tf.data.Dataset来提升性能

附录:使用tf.data.Dataset提升性能

  • tf.data.Dataset提升性能的示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    import tensorflow as tf
    import numpy as np

    x_data = np.array([[1, 2], [3, 4], [5, 6]], dtype=np.float32)
    y_data = np.array([[3], [7], [11]], dtype=np.float32)

    dataset = tf.data.Dataset.from_tensor_slices((x_data, y_data)) # 创建 tf.data.Dataset
    dataset = dataset.repeat().batch(3) # 对数据集进行重复和批量处理
    iterator = dataset.make_one_shot_iterator()# 创建迭代器
    next_x, next_y = iterator.get_next() # 获取下一个批次的数据

    # 定义计算图
    w = tf.Variable(tf.random_normal([2, 1]), name='w')
    b = tf.Variable(tf.zeros([1]), name='b')
    pred = tf.matmul(next_x, w) + b

    # 定义损失函数
    loss = tf.reduce_mean(tf.square(pred - next_y))

    # 定义优化器
    optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01)
    train_op = optimizer.minimize(loss)

    # 初始化变量
    init = tf.global_variables_initializer()

    # 运行计算图
    with tf.Session() as sess:
    sess.run(init)
    for i in range(100):
    _, loss_val = sess.run([train_op, loss]) # 由于不使用placeholder,故而无需使用feed_back
    if (i + 1) % 10 == 0:
    print(f'Step {i + 1}, Loss: {loss_val}')
  • 同时使用Dataset和placeholder的示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    import tensorflow as tf
    import numpy as np

    train_x = np.array([[1, 2], [3, 4], [5, 6]], dtype=np.float32)
    train_y = np.array([[3], [7], [11]], dtype=np.float32)

    # 创建训练数据集
    train_dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y))
    train_dataset = train_dataset.shuffle(buffer_size=len(train_x)).batch(3).repeat()

    # 创建迭代器
    train_iterator = train_dataset.make_one_shot_iterator()
    next_train_x, next_train_y = train_iterator.get_next()

    # 通过placeholder定义模型
    x = tf.placeholder(tf.float32, shape=[None, 2])
    w = tf.Variable(tf.random_normal([2, 1]), name='w')
    b = tf.Variable(tf.zeros([1]), name='b')
    pred = tf.matmul(x, w) + b

    # 通过Dataset输出,重新定义训练损失函数(包含模型结构,即如何训练模型变量 w)
    train_pred = tf.matmul(next_train_x, w) + b
    train_loss = tf.reduce_mean(tf.square(train_pred - next_train_y))

    # 定义优化器
    optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01)
    train_op = optimizer.minimize(train_loss)

    # 初始化变量
    init = tf.global_variables_initializer()

    # 训练模型
    with tf.Session() as sess:
    sess.run(init)
    for i in range(100):
    sess.run(train_op) # 由于train_op不依赖placeholder,无需使用feed_back

    # 使用 feed_dict 进行推理
    test_x = np.array([[7, 8]], dtype=np.float32)
    prediction = sess.run(pred, feed_dict={x: test_x}) # pred依赖placeholder,故而需要使用feed_back
    print("推理结果:", prediction)

TensorFlow——分布式训练之PS架构


整体说明

  • 深度学习的PS(Parameter Server,参数服务器)架构是一种分布式训练框架,用于高效管理大规模模型参数的更新与同步
  • 其核心思想是将参数存储和计算分离,通过一个或多个中心服务器(Parameter Server)集中维护模型参数,而工作节点(Worker)负责本地计算和梯度更新,并通过网络与服务器通信
  • PS 架构支持异步或同步训练,适合处理海量数据和超大规模模型,显著提升了分布式深度学习的扩展性和效率

角色介绍

Parameter Server(参数服务器,PS)

  • 角色 :
    • 负责存储和更新模型的全局参数(如神经网络的权重、梯度等)
    • 聚合来自不同Worker的梯度,应用优化算法(如SGD)更新参数
    • 提供参数的集中式管理,确保一致性
  • 机器类型 :
    • 通常是高性能的CPU服务器(因为参数更新是计算密集型操作,但不需要GPU的并行计算能力)
    • 可以是单台机器或多台机器组成的集群(根据模型规模横向扩展)

Worker(工作节点)

  • 角色 :
    • 负责计算 :读取训练数据、执行前向传播和反向传播,计算本地梯度
    • 将梯度发送给PS,或从PS拉取最新的参数(同步或异步更新)
    • 如果Worker包含Chief(主Worker),它还负责模型初始化、检查点保存、日志汇总等额外任务
  • 机器类型 :
    • 通常是配备GPU的机器(适合大规模矩阵运算)
    • 每个Worker可以独立处理一个数据分片(数据并行)

其他辅助角色:

  • Chief Worker(可选):
    • 一个特殊的Worker(通常编号为worker0),负责全局协调(如初始化参数、恢复训练、保存模型等)
    • 在TF 2.x中,这部分功能逐渐被整合到更高级的API(如tf.distribute.Strategy)中
  • Evaluator(评估器,可选):
    • 独立于训练过程,定期加载模型快照进行验证/测试

一次完整训练过程

  • 以下是包含 4个PS、8个Worker(含1个Chief Worker)和 1个Evaluator 的分布式TensorFlow训练过程的详细步骤

初始化阶段

  • Chief Worker(worker0) :
    • 构建计算图,定义模型结构(如神经网络层、损失函数、优化器等)
    • 生成参数的初始值(如随机初始化),并将这些参数分片推送到4个PS(每个PS存储部分参数)
    • 通知其他Worker和Evaluator初始化完成
  • PS :
    • 存储Chief Worker推送的初始参数(每个PS负责存储分配给自己的参数分片)
    • 等待Worker的梯度更新请求
  • 其他Worker(worker1~worker7) :
    • 等待Chief Worker完成参数初始化
    • 从PS拉取各自所需的参数分片
  • Evaluator :
    • 从PS拉取初始参数,准备后续的验证任务

训练阶段(同步更新示例)

步骤① Worker计算梯度
  • 每个Worker(包括Chief):
    • 从本地数据分片中读取一个batch的数据
    • 从PS拉取最新的参数(全量或分片)
    • 执行前向传播和反向传播,计算本地梯度
步骤② 梯度聚合与参数更新
  • 等待所有8个 Worker 完成梯度计算并上传梯度到 PS
  • PS 聚合所有 Worker 的梯度(求平均)
  • PS 应用优化器(如SGD)更新参数
  • 等待 PS 参数更新完成后,Worker 拉取新参数进入下一轮训练
步骤③ 循环迭代
  • 重复步骤①~②,直到达到最大训练步数或收敛

验证(由 Evaluator 并行执行)

  • 每隔 N 个训练步:
    • Evaluator 从 PS 拉取最新参数
    • 在独立的验证数据集上计算指标(如准确率、损失)
    • 将结果反馈给 Chief Worker(可选)

检查点保存(由 Chief Worker 负责)

  • 每隔 M 个训练步:
    • Chief Worker 将模型参数和训练状态保存到磁盘(Chief 可部分读取PS参数,增量写入 Checkpoint)
    • 如果训练中断,可从检查点恢复

关键交互流程说明

1
2
3
4
5
Chief -> PS : 初始化(推送)/保存模型(拉取)
Worker -> PS : 拉取参数
Worker -> PS : 推送梯度
PS -> PS : 内部同步参数分片(如需,一般怒需要这一步)
Evaluator -> PS : 拉取参数验证

角色分工总结

角色 数量 职责
PS 4 存储参数分片,接收梯度并更新参数。
Worker 8 计算梯度(worker0是Chief,负责初始化/保存模型)。
Evaluator 1 定期验证模型性能,不影响训练流程。

其他注意事项

  • PS 不需要 GPU,使用大内存 + CPU 即可
  • PS 的数量一般为1个即可,除非参数量很大(一个存不下),此外,如果 worker 数量太多时,也可以适当增加 PS 数量,防止网络带宽成为瓶颈
  • 可选择异步训练,此时Worker无需等待其他节点,直接推送梯度到PS并更新参数(但可能梯度冲突)
  • Worker不一定需要存储全部参数,每次可以仅拉去一个层或者某个特定参数进行计算
  • 在 TensorFlow 的 PS 架构中,训练过程默认是异步的 ,但也可以通过配置实现同步训练
  • PS 架构可以支持Torch,但需要结合特定的第三方工具或框架来实现分布式训练

附录:训练代码示例

  • 一个简单的 PS 架构分布式训练代码Demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    import tensorflow as tf
    import os
    import argparse

    def build_model():
    """简单模型构建"""
    model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10, activation='softmax')
    ])
    model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
    )
    return model

    def main(args):
    # 设置TF_CONFIG环境变量
    tf_config = {
    'cluster': {
    'chief': [args.chief],
    'worker': args.workers,
    'ps': args.ps_servers, # 参数服务器角色
    'evaluator': [args.evaluator]
    },
    'task': {
    'type': args.task_type,
    'index': args.task_index
    }
    }
    os.environ['TF_CONFIG'] = tf.constant(tf_config) # 所有任务的这个配置都是一样的

    # 根据任务类型选择不同的分布式策略
    if args.task_type in ['chief', 'worker', 'ps']:
    # 使用ParameterServerStrategy
    strategy = tf.distribute.experimental.ParameterServerStrategy()
    else:
    # Evaluator不需要分布式策略
    strategy = tf.distribute.get_strategy()

    # 数据加载和预处理
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 784).astype('float32') / 255.0

    # 创建数据集
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(10000).batch(64)
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)

    # 根据任务类型执行不同操作
    if args.task_type == 'chief':
    print("Running chief task...")
    with strategy.scope():
    # chief下,下面的语句会进行参数初始化
    model = build_model() # 上面指定的strategy策略会依据不同角色做不同的事情,这里 ParameterServerStrategy 策略会在首席节点(chief)对模型参数进行初始化,之后再把这些参数分发给各个工作节点(worker)和参数服务器(PS)

    callbacks = [
    tf.keras.callbacks.ModelCheckpoint(filepath='./checkpoints/model.ckpt'),
    tf.keras.callbacks.TensorBoard(log_dir='./logs')
    ]

    model.fit(train_dataset, epochs=10, callbacks=callbacks)
    print("Chief training completed.")

    elif args.task_type == 'worker':
    print(f"Running worker task {args.task_index}...")
    with strategy.scope():
    model = build_model() # strategy策略会依据不同角色做不同的事情

    model.fit(train_dataset, epochs=10)
    print(f"Worker {args.task_index} training completed.")

    elif args.task_type == 'ps':
    print(f"Running parameter server task {args.task_index}...")
    # 参数服务器不需要显式代码,策略会自动管理
    server = tf.distribute.Server(
    tf_config['cluster']['ps'][args.task_index],
    job_name="ps",
    task_index=args.task_index
    )
    server.join() # 参数服务器会一直运行直到训练结束

    elif args.task_type == 'evaluator':
    print("Running evaluator task...")
    model = build_model()
    model.load_weights('./checkpoints/model.ckpt') # 加载最新的模型,进行评估工作
    eval_results = model.evaluate(test_dataset)
    print(f"Evaluation results: {eval_results}")

    if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--chief', type=str, required=True, help='Chief address')
    parser.add_argument('--workers', type=str, nargs='+', required=True, help='Worker addresses')
    parser.add_argument('--ps_servers', type=str, nargs='+', required=True, help='Parameter server addresses')
    parser.add_argument('--evaluator', type=str, required=True, help='Evaluator address')
    parser.add_argument('--task_type', type=str, required=True, choices=['chief', 'worker', 'ps', 'evaluator'], help='Task type')
    parser.add_argument('--task_index', type=int, required=True, help='Task index')

    args = parser.parse_args()
    main(args)
  • 注:以上是TensorFlow 2.x版本(tf.distribute.experimental.ParameterServerStrategy()就是TensorFlow 2.x才有的)

    • 实际上,TensorFlow 1.x也可以类似实现(使用tf.train.Server)
    • 同时TensorFlow 1.x的 Estimator API提供了自己的一些自己的训练形式(tf.estimator)

启动脚本

  • 注:以下脚本均以localhost为例,实际使用时需要替换为对应不同服务器的IP

  • 启动chief节点:

    1
    2
    3
    4
    5
    6
    7
    python distributed_training.py \
    --chief="localhost:2222" \
    --workers="localhost:2223" "localhost:2224" \
    --ps_servers="localhost:2225" "localhost:2226" \
    --evaluator="localhost:2225" \
    --task_type="chief" \
    --task_index=0
  • 启动worker节点:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    # Worker 0
    python distributed_training.py \
    --chief="localhost:2222" \
    --workers="localhost:2223" "localhost:2224" \
    --ps_servers="localhost:2225" "localhost:2226" \
    --evaluator="localhost:2225" \
    --task_type="worker" \
    --task_index=0

    # Worker 1
    python distributed_training.py \
    --chief="localhost:2222" \
    --workers="localhost:2223" "localhost:2224" \
    --ps_servers="localhost:2225" "localhost:2226" \
    --evaluator="localhost:2225" \
    --task_type="worker" \
    --task_index=1
  • 启动evaluator节点:

    1
    2
    3
    4
    5
    6
    7
    python distributed_training.py \
    --chief="localhost:2222" \
    --workers="localhost:2223" "localhost:2224" \
    --ps_servers="localhost:2225" "localhost:2226" \
    --evaluator="localhost:2225" \
    --task_type="evaluator" \
    --task_index=0
  • 启动 PS 节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    # 启动参数服务器0
    python distributed_training.py \
    --chief="localhost:2222" \
    --workers="localhost:2223" "localhost:2224" \
    --ps_servers="localhost:2225" "localhost:2226" \
    --evaluator="localhost:2227" \
    --task_type="ps" \
    --task_index=0

    # 启动参数服务器1
    python distributed_training.py \
    --chief="localhost:2222" \
    --workers="localhost:2223" "localhost:2224" \
    --ps_servers="localhost:2225" "localhost:2226" \
    --evaluator="localhost:2227" \
    --task_type="ps" \
    --task_index=1
1…555657…64
Joe Zhou

Joe Zhou

Stay Hungry. Stay Foolish.

631 posts
53 tags
GitHub E-Mail
© 2026 Joe Zhou
Powered by Hexo
|
Theme — NexT.Gemini v5.1.4