- 参考链接:
Megatron 整体说明
- Megatron 是由 NVIDIA 开发的一个用于训练 LLM 的高性能框架
- Megatron 专为分布式训练优化,支持模型并行、数据并行和混合精度训练,能够在数千个 GPU 上高效运行
- Megatron 集成了多种优化技术,包括张量并行、管道并行、激活检查点等,显著提升了超大规模模型的训练效率
- Megatron 通常与 DeepSpeed 结合使用,形成 Megatron-DeepSpeed 框架,进一步增强训练能力
Megatron 安装
- Megatron 的安装需要结合 NVIDIA 的环境和依赖,以下是详细的安装步骤:
- 推荐在 Linux 系统上安装
安装依赖项 PyTorch
- 根据 CUDA 版本安装对应的 PyTorch:
1
2
3
4
5# 对于CUDA 11.8
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# 对于CUDA 12.1
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
安装 Megatron
- 下载源码并安装
1
2
3
4
5
6
7
8
9
10
11
12
13git clone https://github.com/NVIDIA/Megatron-LM.git
cd Megatron-LM
pip install -r requirements.txt
# 安装apex(用于混合精度训练)
git clone https://github.com/NVIDIA/apex
cd apex
pip install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" ./
cd ..
# 安装其他依赖
pip install sentencepiece regex datasets
验证安装是否成功
- 创建一个简单的Python脚本来验证 Megatron 是否正确安装:
1
2
3
4
5
6import torch
from megatron import get_args
from megatron.model import GPTModel
# 能导入相关包,说明 Megatron 已成功安装
print("Megatron安装成功!")
Megatron 数据处理
数据预处理负责将
.jsonl的文本数据 tokenize 并处理成 Megatron 可以直接读取的数据格式(.bin和.idx类型的文件),减少训练时的数据处理时间准备
.jsonl文件,文件格式如下:1
2{"text": "Your training text here..."}
{"text": "Another training sample..."}数据预处理:
1
2
3
4
5
6
7python tools/preprocess_data.py \
--input data.jsonl \
--output-prefix processed_data \
--tokenizer-type HuggingFaceTokenizer \
--tokenizer-model /path/to/tokenizer.model \
--workers 8 \
--append-eodoutput-prefix:输出文件的前缀append-eod:是否添加 EOD Token?- 注意:还可以根据需要设置
split_sentences参数,对文档进行拆分成 sentence 再做 tokenize
Megatron 训练开源标准大模型
- Megatron 支持对一些标准的开源大模型进行训练,比如 GPT2,此时不需要修改代码
- 这种标准流程包含以下两部分:
- 需要处理数据为 Megatron 支持的格式
- 使用命令行启动任务
- 本文暂不对这部分进行详细讲解
Megatron 训练自定义模型
核心思路
- 需要做到如下事情
- 1)先把 Megatron 自带的 GPT/BERT/T5 的「壳」理解透
- 2)再把自己的网络结构「套」进去
- 3)最后复用 Megatron 的并行、优化器、数据管道即可
- 基本思路:
- 使用 Megatron 训练自定义模型,不需要改 Megatron 核心 ,只实现 3-4 个钩子即可
- 一些底层的高阶功能,如并行、混合精度、检查点 全部复用官方实现
- 任何模型(CNN、RWKV、RetNet…)只要包装成
MegatronModule并返回 loss ,都能用 Megatron 训练
目录结构
- 目录结构如下,建议自建文件都放到统一的新文件夹
my_model下1
2
3
4
5
6
7
8Megatron-LM/
├─ megatron/ # 官方代码不动
├─ examples/ # 官方示例
├─ my_model/ # 我们自己的
│ ├─ __init__.py
│ ├─ model.py # 自定义网络
│ ├─ layer.py # 自定义层
│ └─ train.py # 入口脚本
自定义模型的 4 个关键钩子说明
- Megatron 的训练循环入口是
pretrain(),它通过 4 个可插拔函数 决定「数据长什么样、模型长什么样、前向怎么算、验证看啥指标」:钩子名称 作用 示例文件 model_provider返回 nn.Module my_model/model.pytrain_valid_test_dataset_provider返回三个 Dataset my_model/data.pyforward_step_func定义一次前向/损失 my_model/train.pyprocess_non_loss_data_funcTensorBoard 画额外指标(可选) my_model/train.py
模型钩子(model_provider)
- 新建
my_model/model.py:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27from megatron.model.module import MegatronModule
from megatron import get_args
class MyCustomModel(MegatronModule):
def __init__(self, num_tokentypes=0):
super().__init__()
args = get_args()
self.embed = nn.Embedding(args.padded_vocab_size, args.hidden_size)
# 这里换成自己的自定义层
self.backbone = ConvNextBackbone(args.hidden_size, args.num_layers)
self.lm_head = nn.Linear(args.hidden_size, args.padded_vocab_size)
def forward(self, input_ids, position_ids, attention_mask, labels=None):
x = self.embed(input_ids)
x = self.backbone(x, attention_mask) # [b, s, h]
logits = self.lm_head(x) # [b, s, V]
if labels is None:
return logits
loss = F.cross_entropy(logits.view(-1, logits.size(-1)), labels.view(-1))
# Megatron 的 GPTModel 实现也是在 Model.forward 直接返回 loss 的
return loss
def model_provider(pre_process=True, post_process=True):
"""给 Megatron 调用的工厂函数,负责返回 MegatronModule 类的模型对象"""
return MyCustomModel()
数据钩子(train_valid_test_dataset_provider)
- 把
.jsonl/txt转成 Megatron 的IndexedDataset:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15# my_model/data.py
from megatron.data.dataset_utils import build_train_valid_test_datasets
from megatron import get_args
def train_valid_test_dataset_provider(train_val_test_num_samples):
args = get_args()
return build_train_valid_test_datasets(
data_prefix=args.data_path,
splits_string=args.split,
train_valid_test_num_samples=train_val_test_num_samples,
seq_length=args.seq_length,
masked_lm_prob=0.15 if args.task == 'BERT' else 0.0,
seed=args.seed,
skip_warmup=True,
)
前向钩子(forward_step_func)
- 定义前向过程(包含
loss计算,需要返回loss)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20# my_model/train.py
from megatron.training import get_model
from megatron.utils import average_losses_across_data_parallel_group
from megatron import get_args
def forward_step(data_iterator, model):
"""一次 micro-batch 的前向"""
args = get_args()
tokens = next(data_iterator)['text'].long().cuda()
labels = tokens[:, 1:].contiguous()
tokens = tokens[:, :-1]
position_ids = torch.arange(tokens.size(1), device=tokens.device).unsqueeze(0)
attention_mask = (tokens != args.pad_token_id).unsqueeze(1).unsqueeze(2)
# 因为传入 labels 参数时,模型的 forward 已经计算出来 loss 了,这里可以不需要自己写参数
loss = model(tokens, position_ids, attention_mask, labels)
reduced = average_losses_across_data_parallel_group([loss])
# 第一个返回值必须是 loss,第二个返回值可以是任意想要记录的辅助信息
return loss, {'lm loss': reduced[0]}
将钩子传入 pretrain() 函数
- 调用
pretrain(),传入前面定义的钩子1
2
3
4
5
6
7
8
9
10
11# my_model/train.py
from megatron.training import pretrain
if __name__ == '__main__':
pretrain(
train_valid_test_dataset_provider, # 用于提供训练、验证和测试数据集的函数或模块
model_provider, # 用于构建模型的函数,调用它可返回模型实例
ModelType.encoder_or_decoder, # 或 encoder_decoder
forward_step, # 定义模型前向传播步骤的函数,包括输入处理、模型计算和损失计算等
process_non_loss_data_func=None # tensorboard 的 额外指标,process_non_loss_data_func 在这里暂未实现
)
启动训练
- 单节点 8 卡示例:
1
2
3
4
5
6
7
8
9
10
11
12torchrun --nproc_per_node=8 my_model/train.py \
--tensor-model-parallel-size 2 \
--pipeline-model-parallel-size 2 \
--num-layers 12 \
--hidden-size 768 \
--data-path data/my_corpus \
--seq-length 1024 \
--micro-batch-size 4 \
--global-batch-size 64 \
--train-iters 50000 \
--lr 1e-4 \
--save checkpoints/myconvnext
附录:使用中遇到的常见问题
- 显存 OOM 问题:
- 解决方案:降 micro-batch、开
--recompute-activations、或加大 TP/PP
- 解决方案:降 micro-batch、开
- loss 不收敛 问题:
- 检查学习率、warmup、初始化;确认
pad_token_id设置正确
- 检查学习率、warmup、初始化;确认
- 多机多卡时卡死不动:
- 确认
MASTER_ADDR/MASTER_PORT一致,NCCL 版本统一等
- 确认
附录:pretrain 函数的详细说明
- Megatron-LM 中的
pretrain函数是模型预训练的核心入口,定义在megatron/training.py文件中
pretrain() 函数参数
train_valid_test_dataset_provider:用于提供训练、验证和测试数据集的函数或模块model_provider:用于构建模型的函数,调用它可返回模型实例model_type:模型的类型,如ModelType.encoder_or_decoderforward_step_func:定义模型前向传播步骤的函数,包括输入处理、模型计算和损失计算等valid_forward_step_func:可选参数,用于验证阶段的前向传播函数args_defaults:可选参数,包含默认的参数设置
pretrain() 函数内部执行的主要流程
- 第一步,初始化Megatron :
- 调用
initialize_megatron函数,初始化 Megatron-LM 所需的分布式环境,包括设置分布式通信后端、初始化分布式进程组、配置日志记录等 - 还会调用
get_args()与get_timers()函数获取配置参数与计时器,并设置 PyTorch JIT 融合选项,同步启动时间
- 调用
- 第二步,设置模型、优化器和学习率计划 :
- 调用
setup_model_and_optimizer函数,传入model_provider和model_type,返回模型、优化器以及学习率调度器
- 调用
- 第三步,获取训练/验证/测试数据集 :
- 根据
args.virtual_pipeline_model_parallel_size是否为None来判断是否需要进行虚拟流水线模型并行处理 - 如果不进行虚拟流水线模型并行,则直接调用
build_train_valid_test_data_iterators函数,获取训练、验证和测试数据迭代器
- 根据
- 第四步,调用train函数训练模型 :
- 判断
args.do_train和args.train_iters是否满足条件,若满足则调用train函数执行训练过程 train函数接收多个参数,包括前向传播步骤函数、模型、优化器、学习率调度器、训练数据和验证数据迭代器等,并返回最后一次迭代的索引和到目前为止执行的浮点运算次数
- 判断
附录:如何修改优化器
- Megatron-LM 里“指定/切换优化器”有两种主流做法:
- 第一种:不动源码,靠命令行参数(最简单,官方已内置)
- 第二种:改源码,注册自定义优化器(想换 Lion、RAdam 等第三方优化器时用)
第一种:命令行直接切换(无需改代码)
- 注:Megatron 从 2024-10 之后的版本开始,把
optimizer也暴露成了 CLI 参数:主要参数 取值 说明 --optimizeradam,sgd默认 adam,会自动选用 Apex 的 FusedAdam--adam-beta10.9 --adam-beta20.95 --adam-eps1e-8 --sgd-momentum0.9 只在 --optimizer sgd时生效--weight-decay0.1 通用 --clip-grad1.0 梯度裁剪 - 示例:把优化器换成 SGD + momentum 的启动脚本如下:
1
2
3
4
5torchrun --nproc_per_node=8 pretrain_gpt.py \
... \
--optimizer sgd \
--sgd-momentum 0.9 \
--weight-decay 1e-4
第二种:源码级自定义优化器(以 Lion 为例)
当你想用官方未内置的优化器(Lion、AdaFactor、RAdam等)时,只要三步:
第一步:在
megatron/core/optimizer/里新建文件lion.py,并定义自己的优化器类1
2
3
4import torch
class Lion(torch.optim.Optimizer): # 继承 Optimizer 类
def __init__(self, params, lr=1e-4, betas=(0.9, 0.99), weight_decay=0.0):
...- 问题:需要特殊处理的优化器,比如可能涉及其他更多超参数的优化器,还需要考虑
第二步:在
megatron/core/optimizer/__init__.py的_get_megatron_optimizer()中注册新的优化器1
2elif opt == 'lion':
optimizer = Lion(param_groups, lr=args.lr, weight_decay=args.weight_decay)第三步:启动脚本里添加新的优化器选项
1
--optimizer lion
Megatron 会自动把上述自定义的 Lion 优化器包装到 DistributedOptimizer(或 DeepSpeed ZeRO,如果启用)里,梯度同步、fp16/bf16 主参数、checkpoint 保存/加载全部复用现有逻辑
附录:CPU Offload & 显存优化
- Megatron 支持把优化器状态卸载到 CPU 以减少显存:
1
2--optimizer-cpu-offload \
--optimizer-offload-fraction 0.8 # 80% 状态放 CPU
附录:Megatron 使用代码简单示例
下面是一个使用 Megatron 训练 GPT 模型的示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64import os
import torch
from megatron import get_args
from megatron import mpu
from megatron.initialize import initialize_megatron
from megatron.training import setup_model_and_optimizer
from megatron.model import GPTModel
from megatron.training import train_step
from megatron.data.gpt_dataset import build_train_valid_test_datasets
def main():
# 初始化Megatron
initialize_megatron(extra_args_provider=None, args_defaults={
'tokenizer_type': 'GPT2BPETokenizer',
'micro_batch_size': 4,
'global_batch_size': 32,
'lr': 0.00015,
'min_lr': 0.00001,
'lr_decay_style': 'cosine',
'weight_decay': 0.1,
'clip_grad': 1.0,
'lr_warmup_fraction': 0.01,
'num_layers': 24,
'hidden_size': 1024,
'num_attention_heads': 16,
'seq_length': 1024,
'max_position_embeddings': 1024,
'vocab_size': 50257, # GPT-2 vocab size
'tensor_model_parallel_size': 2,
'pipeline_model_parallel_size': 2,
'pipeline_model_parallel_split_rank': 0,
'fp16': True,
'bf16': False,
'seed': 1234,
})
args = get_args()
# 构建数据集
train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
data_prefix=args.data_path,
data_impl=args.data_impl,
splits_string=args.split,
train_valid_test_num_samples=[args.train_samples, args.valid_samples, args.test_samples],
seq_length=args.seq_length,
seed=args.seed,
skip_warmup=(not args.mmap_warmup)
)
# 设置模型和优化器
model, optimizer, lr_scheduler = setup_model_and_optimizer()
# 训练循环
iteration = 0
max_iterations = args.train_iters
while iteration < max_iterations:
loss = train_step(model, optimizer, lr_scheduler, train_ds)
if torch.distributed.get_rank() == 0 and iteration % args.log_interval == 0:
print(f"迭代: {iteration}/{max_iterations}, 损失: {loss.item()}")
iteration += 1
if __name__ == "__main__":
main()通常使用以下命令启动训练(假设每台机器使用 4 个 GPU):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# 使用torchrun启动分布式训练
torchrun --nproc_per_node=4 --master_port=12345 your_script.py \
--data-path /path/to/your/dataset \
--vocab-file /path/to/vocab.json \
--merge-file /path/to/merges.txt \
--save /path/to/save/checkpoints \
--load /path/to/load/checkpoints \
--num-layers 24 \
--hidden-size 1024 \
--num-attention-heads 16 \
--seq-length 1024 \
--max-position-embeddings 1024 \
--micro-batch-size 4 \
--global-batch-size 32 \
--lr 0.00015 \
--min-lr 0.00001 \
--lr-decay-style cosine \
--lr-warmup-fraction 0.01 \
--weight-decay 0.1 \
--clip-grad 1.0 \
--fp16 \
--seed 1234
附录:Megatron 中间数据 decode 查看
- 示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43import os
import torch
# 读取 tokens.bin(假设为 int32 类型,根据预处理配置调整 dtype)
tokens_path = "./processed_tokens_document.bin"
dtype = torch.int32 # 假设是int32,根据实际预处理配置调整(如int64)
bytes_per_token = dtype.itemsize # int32->4字节,int64->8字节
# 获取文件总大小(字节)
file_size = os.path.getsize(tokens_path)
# 计算总token数(总字节数 / 每个token的字节数)
total_tokens = file_size // bytes_per_token
print(f"文件总大小:{file_size} 字节,总token数:{total_tokens}")
from transformers import AutoModelForCausalLM, AutoTokenizer
model_name = "./path_to_model/"
# load the tokenizer and the model
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 抽查的 Token 数量
block_size = 10000
# 分块读取并解码
with open(tokens_path, "rb") as f:
for start in range(0, total_tokens, block_size):
# 计算当前块的结束位置(不超过总token数)
end = min(start + block_size, total_tokens)
current_block_size = end - start
# 读取当前块的二进制数据(字节数 = token数 × 每个token的字节数)
f.seek(start * bytes_per_token) # 移动到当前块的起始位置
block_bytes = f.read(current_block_size * bytes_per_token)
# 将二进制数据转为torch tensor(token索引)
block_tokens = torch.frombuffer(block_bytes, dtype=dtype)
# 解码当前块为文本
block_text = tokenizer.decode(block_tokens.tolist(), skip_special_tokens=False) # skip_special_tokens=True 会缺失 Special Token
print(f"处理块 {start//block_size + 1}/{(total_tokens + block_size -1)//block_size}")
print(block_text)
break # 打开 break 可不断循环读取
附录:ckpt 文件清理
- 使用 Megatron-LM 训练模型时,为了保证可恢复,常常会出现存储多个 ckpt 的情况,一般是一定的步骤就存储一个 ckpt
- 当实验完成后一般仅保留最后一个即可
- Meagtron-LM 的每个 ckpt 中,都完整存储着从这个 ckpt 启动继续训练所需的所有文件,包括模型权重文件等
1
2distrib_optim.pt:分布式优化器状态分片,训练恢复时用
model_optim_rng.pt:随机数生成器状态,训练恢复时用,可能包含模型权重等,根据具体场景可能部分
脚本编写
- 脚本编写的基本要求为:
- 删除某个目录(用参数传入)下所有满足条件的文件夹(包括子文件夹):
- 1)创建日期在 “2024-08-01” 到 “2024-08-10” 之间的
- 2)文件名以
iter_000x命名,且x是100 的整倍数,比如iter_0000600或iter_0001000等 - 3)当前同级目录下还存在以
iter_000x命名,且 x 比自己大的文件夹
- 删除前要求如下:
- 删除每个文件夹时,先询问是否删除,必须等待回应才继续(同时打印被删除的文件夹及其同级的其他文件夹和文件),Y表示删除,N表示不删除,直接Enter表示不删除;
- 注意:为了安全起见,一定要收到 “Y” 作为输入再删除,否则不删除,避免误删
- 删除某个目录(用参数传入)下所有满足条件的文件夹(包括子文件夹):
- 下面是一个脚本实现(大模型实现,经过本人部分修改),用于帮助清理 ckpt(已经测试过可以使用):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92#!/bin/bash
# 用法: ./clean_x00_steps_in_ckpt.sh /path/to/target_dir
TARGET_DIR="$1"
if [ -z "$TARGET_DIR" ]; then
echo "❌ 请传入目标目录路径"
exit 1
fi
if [ ! -d "$TARGET_DIR" ]; then
echo "❌ 目录不存在: $TARGET_DIR"
exit 1
fi
# step_interval
step_interval=100
# 日期范围
START_DATE="2025-08-01"
END_DATE="2025-11-10"
# 转换为时间戳方便比较
start_ts=$(date -d "$START_DATE" +%s)
end_ts=$(date -d "$END_DATE" +%s)
# 遍历所有匹配 iter_000x 的文件夹(递归)
find "$TARGET_DIR" -type d -regextype posix-egrep -regex '.*/iter_000[0-9]+' | while read -r dir; do
basename=$(basename "$dir")
# 提取数字部分
num=$(echo "$basename" | sed -E 's/iter_0+([0-9]+)/\1/')
# 判断是否是 step_interval 的倍数
if (( num % step_interval != 0 )); then
continue
fi
# 获取创建日期(Linux & macOS兼容)
if [[ "$OSTYPE" == "darwin"* ]]; then
create_date=$(stat -f "%SB" -t "%Y-%m-%d" "$dir")
else
create_date=$(stat -c %w "$dir")
if [ "$create_date" = "-" ]; then
create_date=$(stat -c %y "$dir" | cut -d' ' -f1)
fi
fi
# 转换为时间戳
create_ts=$(date -d "$create_date" +%s 2>/dev/null)
if [ -z "$create_ts" ]; then
continue
fi
# 日期范围判断
if (( create_ts < start_ts || create_ts > end_ts )); then
continue
fi
# 检查同级目录是否存在更大的 iter_000y
parent_dir=$(dirname "$dir")
bigger_exist=false
for sibling in "$parent_dir"/iter_000*; do
if [ -d "$sibling" ]; then
sib_num=$(echo "$(basename "$sibling")" | sed -E 's/iter_0+([0-9]+)/\1/')
if (( sib_num > num )); then
bigger_exist=true
break
fi
fi
done
if [ "$bigger_exist" = false ]; then
continue
fi
# 符合条件 -> 询问是否删除
# 打印当前目录及同级目录内容
echo "----------------------------------------"
echo "📂 待删除文件夹: $dir ✅"
echo "----------------------------------------"
echo "同级目录内容:"
ls -l "$parent_dir"
echo "----------------------------------------"
read -p "是否删除? (Y/N, 回车默认不删除): " choice < /dev/tty # 必须使用 < /dev/tty 以确保从交互界面接收到一个输入
if [[ "$choice" =~ ^[Yy]$ ]]; then
rm -rf "$dir" && echo "✅ 已删除 $dir"
else
echo "跳过 $dir"
fi
done