NLP——Megatron框架的使用


Megatron 整体说明

  • Megatron 是由 NVIDIA 开发的一个用于训练 LLM 的高性能框架
  • Megatron 专为分布式训练优化,支持模型并行、数据并行和混合精度训练,能够在数千个 GPU 上高效运行
  • Megatron 集成了多种优化技术,包括张量并行、管道并行、激活检查点等,显著提升了超大规模模型的训练效率
  • Megatron 通常与 DeepSpeed 结合使用,形成 Megatron-DeepSpeed 框架,进一步增强训练能力

Megatron 安装

  • Megatron 的安装需要结合 NVIDIA 的环境和依赖,以下是详细的安装步骤:
  • 推荐在 Linux 系统上安装

安装依赖项 PyTorch

  • 根据 CUDA 版本安装对应的 PyTorch:
    1
    2
    3
    4
    5
    # 对于CUDA 11.8
    pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

    # 对于CUDA 12.1
    pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

安装 Megatron

  • 下载源码并安装
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    git clone https://github.com/NVIDIA/Megatron-LM.git
    cd Megatron-LM

    pip install -r requirements.txt

    # 安装apex(用于混合精度训练)
    git clone https://github.com/NVIDIA/apex
    cd apex
    pip install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" ./
    cd ..

    # 安装其他依赖
    pip install sentencepiece regex datasets

验证安装是否成功

  • 创建一个简单的Python脚本来验证 Megatron 是否正确安装:
    1
    2
    3
    4
    5
    6
    import torch
    from megatron import get_args
    from megatron.model import GPTModel

    # 能导入相关包,说明 Megatron 已成功安装
    print("Megatron安装成功!")

Megatron 数据处理

  • 详情参考:github.com/NVIDIA/Megatron-LM

  • 数据预处理负责将 .jsonl 的文本数据 tokenize 并处理成 Megatron 可以直接读取的数据格式(.bin.idx 类型的文件),减少训练时的数据处理时间

  • 准备 .jsonl 文件,文件格式如下:

    1
    2
    {"text": "Your training text here..."}
    {"text": "Another training sample..."}
  • 数据预处理:

    1
    2
    3
    4
    5
    6
    7
    python tools/preprocess_data.py \
    --input data.jsonl \
    --output-prefix processed_data \
    --tokenizer-type HuggingFaceTokenizer \
    --tokenizer-model /path/to/tokenizer.model \
    --workers 8 \
    --append-eod
    • output-prefix:输出文件的前缀
    • append-eod:是否添加 EOD Token?
    • 注意:还可以根据需要设置 split_sentences 参数,对文档进行拆分成 sentence 再做 tokenize

Megatron 训练开源标准大模型

  • Megatron 支持对一些标准的开源大模型进行训练,比如 GPT2,此时不需要修改代码
  • 这种标准流程包含以下两部分:
    • 需要处理数据为 Megatron 支持的格式
    • 使用命令行启动任务
  • 本文暂不对这部分进行详细讲解

Megatron 训练自定义模型

核心思路

  • 需要做到如下事情
    • 1)先把 Megatron 自带的 GPT/BERT/T5 的「壳」理解透
    • 2)再把自己的网络结构「套」进去
    • 3)最后复用 Megatron 的并行、优化器、数据管道即可
  • 基本思路:
    • 使用 Megatron 训练自定义模型,不需要改 Megatron 核心 ,只实现 3-4 个钩子即可
    • 一些底层的高阶功能,如并行、混合精度、检查点 全部复用官方实现
    • 任何模型(CNN、RWKV、RetNet…)只要包装成 MegatronModule返回 loss ,都能用 Megatron 训练

目录结构

  • 目录结构如下,建议自建文件都放到统一的新文件夹 my_model
    1
    2
    3
    4
    5
    6
    7
    8
    Megatron-LM/
    ├─ megatron/ # 官方代码不动
    ├─ examples/ # 官方示例
    ├─ my_model/ # 我们自己的
    │ ├─ __init__.py
    │ ├─ model.py # 自定义网络
    │ ├─ layer.py # 自定义层
    │ └─ train.py # 入口脚本

自定义模型的 4 个关键钩子说明

  • Megatron 的训练循环入口是 pretrain(),它通过 4 个可插拔函数 决定「数据长什么样、模型长什么样、前向怎么算、验证看啥指标」:
    钩子名称 作用 示例文件
    model_provider 返回 nn.Module my_model/model.py
    train_valid_test_dataset_provider 返回三个 Dataset my_model/data.py
    forward_step_func 定义一次前向/损失 my_model/train.py
    process_non_loss_data_func TensorBoard 画额外指标(可选) my_model/train.py

模型钩子(model_provider)

  • 新建 my_model/model.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    from megatron.model.module import MegatronModule
    from megatron import get_args

    class MyCustomModel(MegatronModule):
    def __init__(self, num_tokentypes=0):
    super().__init__()
    args = get_args()
    self.embed = nn.Embedding(args.padded_vocab_size, args.hidden_size)
    # 这里换成自己的自定义层
    self.backbone = ConvNextBackbone(args.hidden_size, args.num_layers)
    self.lm_head = nn.Linear(args.hidden_size, args.padded_vocab_size)

    def forward(self, input_ids, position_ids, attention_mask, labels=None):
    x = self.embed(input_ids)
    x = self.backbone(x, attention_mask) # [b, s, h]
    logits = self.lm_head(x) # [b, s, V]

    if labels is None:
    return logits
    loss = F.cross_entropy(logits.view(-1, logits.size(-1)), labels.view(-1))

    # Megatron 的 GPTModel 实现也是在 Model.forward 直接返回 loss 的
    return loss

    def model_provider(pre_process=True, post_process=True):
    """给 Megatron 调用的工厂函数,负责返回 MegatronModule 类的模型对象"""
    return MyCustomModel()

数据钩子(train_valid_test_dataset_provider)

  • .jsonl/txt 转成 Megatron 的 IndexedDataset
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # my_model/data.py
    from megatron.data.dataset_utils import build_train_valid_test_datasets
    from megatron import get_args

    def train_valid_test_dataset_provider(train_val_test_num_samples):
    args = get_args()
    return build_train_valid_test_datasets(
    data_prefix=args.data_path,
    splits_string=args.split,
    train_valid_test_num_samples=train_val_test_num_samples,
    seq_length=args.seq_length,
    masked_lm_prob=0.15 if args.task == 'BERT' else 0.0,
    seed=args.seed,
    skip_warmup=True,
    )

前向钩子(forward_step_func)

  • 定义前向过程(包含 loss 计算,需要返回 loss
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    # my_model/train.py
    from megatron.training import get_model
    from megatron.utils import average_losses_across_data_parallel_group
    from megatron import get_args

    def forward_step(data_iterator, model):
    """一次 micro-batch 的前向"""
    args = get_args()
    tokens = next(data_iterator)['text'].long().cuda()
    labels = tokens[:, 1:].contiguous()
    tokens = tokens[:, :-1]
    position_ids = torch.arange(tokens.size(1), device=tokens.device).unsqueeze(0)
    attention_mask = (tokens != args.pad_token_id).unsqueeze(1).unsqueeze(2)

    # 因为传入 labels 参数时,模型的 forward 已经计算出来 loss 了,这里可以不需要自己写参数
    loss = model(tokens, position_ids, attention_mask, labels)
    reduced = average_losses_across_data_parallel_group([loss])

    # 第一个返回值必须是 loss,第二个返回值可以是任意想要记录的辅助信息
    return loss, {'lm loss': reduced[0]}

将钩子传入 pretrain() 函数

  • 调用 pretrain(),传入前面定义的钩子
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # my_model/train.py
    from megatron.training import pretrain

    if __name__ == '__main__':
    pretrain(
    train_valid_test_dataset_provider, # 用于提供训练、验证和测试数据集的函数或模块
    model_provider, # 用于构建模型的函数,调用它可返回模型实例
    ModelType.encoder_or_decoder, # 或 encoder_decoder
    forward_step, # 定义模型前向传播步骤的函数,包括输入处理、模型计算和损失计算等
    process_non_loss_data_func=None # tensorboard 的 额外指标,process_non_loss_data_func 在这里暂未实现
    )

启动训练

  • 单节点 8 卡示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    torchrun --nproc_per_node=8 my_model/train.py \
    --tensor-model-parallel-size 2 \
    --pipeline-model-parallel-size 2 \
    --num-layers 12 \
    --hidden-size 768 \
    --data-path data/my_corpus \
    --seq-length 1024 \
    --micro-batch-size 4 \
    --global-batch-size 64 \
    --train-iters 50000 \
    --lr 1e-4 \
    --save checkpoints/myconvnext

附录:使用中遇到的常见问题

  • 显存 OOM 问题:
    • 解决方案:降 micro-batch、开 --recompute-activations、或加大 TP/PP
  • loss 不收敛 问题:
    • 检查学习率、warmup、初始化;确认 pad_token_id 设置正确
  • 多机多卡时卡死不动:
    • 确认 MASTER_ADDR/MASTER_PORT 一致,NCCL 版本统一等

附录:pretrain 函数的详细说明

  • Megatron-LM 中的 pretrain 函数是模型预训练的核心入口,定义在 megatron/training.py 文件中

pretrain() 函数参数

  • train_valid_test_dataset_provider:用于提供训练、验证和测试数据集的函数或模块
  • model_provider:用于构建模型的函数,调用它可返回模型实例
  • model_type:模型的类型,如ModelType.encoder_or_decoder
  • forward_step_func:定义模型前向传播步骤的函数,包括输入处理、模型计算和损失计算等
  • valid_forward_step_func:可选参数,用于验证阶段的前向传播函数
  • args_defaults:可选参数,包含默认的参数设置

pretrain() 函数内部执行的主要流程

  • 第一步,初始化Megatron
    • 调用 initialize_megatron 函数,初始化 Megatron-LM 所需的分布式环境,包括设置分布式通信后端、初始化分布式进程组、配置日志记录等
    • 还会调用get_args()get_timers()函数获取配置参数与计时器,并设置 PyTorch JIT 融合选项,同步启动时间
  • 第二步,设置模型、优化器和学习率计划
    • 调用setup_model_and_optimizer函数,传入model_providermodel_type,返回模型、优化器以及学习率调度器
  • 第三步,获取训练/验证/测试数据集
    • 根据 args.virtual_pipeline_model_parallel_size 是否为 None 来判断是否需要进行虚拟流水线模型并行处理
    • 如果不进行虚拟流水线模型并行,则直接调用build_train_valid_test_data_iterators函数,获取训练、验证和测试数据迭代器
  • 第四步,调用train函数训练模型
    • 判断 args.do_trainargs.train_iters 是否满足条件,若满足则调用 train 函数执行训练过程
    • train 函数接收多个参数,包括前向传播步骤函数、模型、优化器、学习率调度器、训练数据和验证数据迭代器等,并返回最后一次迭代的索引和到目前为止执行的浮点运算次数

附录:如何修改优化器

  • Megatron-LM 里“指定/切换优化器”有两种主流做法:
    • 第一种:不动源码,靠命令行参数(最简单,官方已内置)
    • 第二种:改源码,注册自定义优化器(想换 Lion、RAdam 等第三方优化器时用)

第一种:命令行直接切换(无需改代码)

  • 注:Megatron 从 2024-10 之后的版本开始,把 optimizer 也暴露成了 CLI 参数:
    主要参数 取值 说明
    --optimizer adam, sgd 默认 adam,会自动选用 Apex 的 FusedAdam
    --adam-beta1 0.9
    --adam-beta2 0.95
    --adam-eps 1e-8
    --sgd-momentum 0.9 只在 --optimizer sgd 时生效
    --weight-decay 0.1 通用
    --clip-grad 1.0 梯度裁剪
  • 示例:把优化器换成 SGD + momentum 的启动脚本如下:
    1
    2
    3
    4
    5
    torchrun --nproc_per_node=8 pretrain_gpt.py \
    ... \
    --optimizer sgd \
    --sgd-momentum 0.9 \
    --weight-decay 1e-4

第二种:源码级自定义优化器(以 Lion 为例)

  • 当你想用官方未内置的优化器(Lion、AdaFactor、RAdam等)时,只要三步:

  • 第一步:在 megatron/core/optimizer/ 里新建文件 lion.py,并定义自己的优化器类

    1
    2
    3
    4
    import torch
    class Lion(torch.optim.Optimizer): # 继承 Optimizer 类
    def __init__(self, params, lr=1e-4, betas=(0.9, 0.99), weight_decay=0.0):
    ...
    • 问题:需要特殊处理的优化器,比如可能涉及其他更多超参数的优化器,还需要考虑
  • 第二步:在 megatron/core/optimizer/__init__.py_get_megatron_optimizer() 中注册新的优化器

    1
    2
    elif opt == 'lion':
    optimizer = Lion(param_groups, lr=args.lr, weight_decay=args.weight_decay)
  • 第三步:启动脚本里添加新的优化器选项

    1
    --optimizer lion
  • Megatron 会自动把上述自定义的 Lion 优化器包装到 DistributedOptimizer(或 DeepSpeed ZeRO,如果启用)里,梯度同步、fp16/bf16 主参数、checkpoint 保存/加载全部复用现有逻辑


附录:CPU Offload & 显存优化

  • Megatron 支持把优化器状态卸载到 CPU 以减少显存:
    1
    2
    --optimizer-cpu-offload \
    --optimizer-offload-fraction 0.8 # 80% 状态放 CPU

附录:Megatron 使用代码简单示例

  • 下面是一个使用 Megatron 训练 GPT 模型的示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    import os
    import torch
    from megatron import get_args
    from megatron import mpu
    from megatron.initialize import initialize_megatron
    from megatron.training import setup_model_and_optimizer
    from megatron.model import GPTModel
    from megatron.training import train_step
    from megatron.data.gpt_dataset import build_train_valid_test_datasets

    def main():
    # 初始化Megatron
    initialize_megatron(extra_args_provider=None, args_defaults={
    'tokenizer_type': 'GPT2BPETokenizer',
    'micro_batch_size': 4,
    'global_batch_size': 32,
    'lr': 0.00015,
    'min_lr': 0.00001,
    'lr_decay_style': 'cosine',
    'weight_decay': 0.1,
    'clip_grad': 1.0,
    'lr_warmup_fraction': 0.01,
    'num_layers': 24,
    'hidden_size': 1024,
    'num_attention_heads': 16,
    'seq_length': 1024,
    'max_position_embeddings': 1024,
    'vocab_size': 50257, # GPT-2 vocab size
    'tensor_model_parallel_size': 2,
    'pipeline_model_parallel_size': 2,
    'pipeline_model_parallel_split_rank': 0,
    'fp16': True,
    'bf16': False,
    'seed': 1234,
    })

    args = get_args()

    # 构建数据集
    train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
    data_prefix=args.data_path,
    data_impl=args.data_impl,
    splits_string=args.split,
    train_valid_test_num_samples=[args.train_samples, args.valid_samples, args.test_samples],
    seq_length=args.seq_length,
    seed=args.seed,
    skip_warmup=(not args.mmap_warmup)
    )

    # 设置模型和优化器
    model, optimizer, lr_scheduler = setup_model_and_optimizer()

    # 训练循环
    iteration = 0
    max_iterations = args.train_iters

    while iteration < max_iterations:
    loss = train_step(model, optimizer, lr_scheduler, train_ds)
    if torch.distributed.get_rank() == 0 and iteration % args.log_interval == 0:
    print(f"迭代: {iteration}/{max_iterations}, 损失: {loss.item()}")
    iteration += 1

    if __name__ == "__main__":
    main()
  • 通常使用以下命令启动训练(假设每台机器使用 4 个 GPU):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # 使用torchrun启动分布式训练
    torchrun --nproc_per_node=4 --master_port=12345 your_script.py \
    --data-path /path/to/your/dataset \
    --vocab-file /path/to/vocab.json \
    --merge-file /path/to/merges.txt \
    --save /path/to/save/checkpoints \
    --load /path/to/load/checkpoints \
    --num-layers 24 \
    --hidden-size 1024 \
    --num-attention-heads 16 \
    --seq-length 1024 \
    --max-position-embeddings 1024 \
    --micro-batch-size 4 \
    --global-batch-size 32 \
    --lr 0.00015 \
    --min-lr 0.00001 \
    --lr-decay-style cosine \
    --lr-warmup-fraction 0.01 \
    --weight-decay 0.1 \
    --clip-grad 1.0 \
    --fp16 \
    --seed 1234

附录:Megatron 中间数据 decode 查看

  • 示例代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    import os
    import torch

    # 读取 tokens.bin(假设为 int32 类型,根据预处理配置调整 dtype)
    tokens_path = "./processed_tokens_document.bin"

    dtype = torch.int32 # 假设是int32,根据实际预处理配置调整(如int64)
    bytes_per_token = dtype.itemsize # int32->4字节,int64->8字节

    # 获取文件总大小(字节)
    file_size = os.path.getsize(tokens_path)
    # 计算总token数(总字节数 / 每个token的字节数)
    total_tokens = file_size // bytes_per_token
    print(f"文件总大小:{file_size} 字节,总token数:{total_tokens}")

    from transformers import AutoModelForCausalLM, AutoTokenizer
    model_name = "./path_to_model/"
    # load the tokenizer and the model
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # 抽查的 Token 数量
    block_size = 10000

    # 分块读取并解码
    with open(tokens_path, "rb") as f:
    for start in range(0, total_tokens, block_size):
    # 计算当前块的结束位置(不超过总token数)
    end = min(start + block_size, total_tokens)
    current_block_size = end - start

    # 读取当前块的二进制数据(字节数 = token数 × 每个token的字节数)
    f.seek(start * bytes_per_token) # 移动到当前块的起始位置
    block_bytes = f.read(current_block_size * bytes_per_token)

    # 将二进制数据转为torch tensor(token索引)
    block_tokens = torch.frombuffer(block_bytes, dtype=dtype)

    # 解码当前块为文本
    block_text = tokenizer.decode(block_tokens.tolist(), skip_special_tokens=False) # skip_special_tokens=True 会缺失 Special Token

    print(f"处理块 {start//block_size + 1}/{(total_tokens + block_size -1)//block_size}")
    print(block_text)
    break # 打开 break 可不断循环读取

附录:ckpt 文件清理

  • 使用 Megatron-LM 训练模型时,为了保证可恢复,常常会出现存储多个 ckpt 的情况,一般是一定的步骤就存储一个 ckpt
  • 当实验完成后一般仅保留最后一个即可
  • Meagtron-LM 的每个 ckpt 中,都完整存储着从这个 ckpt 启动继续训练所需的所有文件,包括模型权重文件等
    1
    2
    distrib_optim.pt:分布式优化器状态分片,训练恢复时用
    model_optim_rng.pt:随机数生成器状态,训练恢复时用,可能包含模型权重等,根据具体场景可能部分

脚本编写

  • 脚本编写的基本要求为:
    • 删除某个目录(用参数传入)下所有满足条件的文件夹(包括子文件夹):
      • 1)创建日期在 “2024-08-01” 到 “2024-08-10” 之间的
      • 2)文件名以 iter_000x 命名,且x是100 的整倍数,比如 iter_0000600iter_0001000
      • 3)当前同级目录下还存在以 iter_000x 命名,且 x 比自己大的文件夹
    • 删除前要求如下:
      • 删除每个文件夹时,先询问是否删除,必须等待回应才继续(同时打印被删除的文件夹及其同级的其他文件夹和文件),Y表示删除,N表示不删除,直接Enter表示不删除;
      • 注意:为了安全起见,一定要收到 “Y” 作为输入再删除,否则不删除,避免误删
  • 下面是一个脚本实现(大模型实现,经过本人部分修改),用于帮助清理 ckpt(已经测试过可以使用):
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    #!/bin/bash

    # 用法: ./clean_x00_steps_in_ckpt.sh /path/to/target_dir
    TARGET_DIR="$1"

    if [ -z "$TARGET_DIR" ]; then
    echo "❌ 请传入目标目录路径"
    exit 1
    fi

    if [ ! -d "$TARGET_DIR" ]; then
    echo "❌ 目录不存在: $TARGET_DIR"
    exit 1
    fi

    # step_interval
    step_interval=100

    # 日期范围
    START_DATE="2025-08-01"
    END_DATE="2025-11-10"

    # 转换为时间戳方便比较
    start_ts=$(date -d "$START_DATE" +%s)
    end_ts=$(date -d "$END_DATE" +%s)

    # 遍历所有匹配 iter_000x 的文件夹(递归)
    find "$TARGET_DIR" -type d -regextype posix-egrep -regex '.*/iter_000[0-9]+' | while read -r dir; do
    basename=$(basename "$dir")

    # 提取数字部分
    num=$(echo "$basename" | sed -E 's/iter_0+([0-9]+)/\1/')

    # 判断是否是 step_interval 的倍数
    if (( num % step_interval != 0 )); then
    continue
    fi

    # 获取创建日期(Linux & macOS兼容)
    if [[ "$OSTYPE" == "darwin"* ]]; then
    create_date=$(stat -f "%SB" -t "%Y-%m-%d" "$dir")
    else
    create_date=$(stat -c %w "$dir")
    if [ "$create_date" = "-" ]; then
    create_date=$(stat -c %y "$dir" | cut -d' ' -f1)
    fi
    fi

    # 转换为时间戳
    create_ts=$(date -d "$create_date" +%s 2>/dev/null)
    if [ -z "$create_ts" ]; then
    continue
    fi

    # 日期范围判断
    if (( create_ts < start_ts || create_ts > end_ts )); then
    continue
    fi

    # 检查同级目录是否存在更大的 iter_000y
    parent_dir=$(dirname "$dir")
    bigger_exist=false
    for sibling in "$parent_dir"/iter_000*; do
    if [ -d "$sibling" ]; then
    sib_num=$(echo "$(basename "$sibling")" | sed -E 's/iter_0+([0-9]+)/\1/')
    if (( sib_num > num )); then
    bigger_exist=true
    break
    fi
    fi
    done

    if [ "$bigger_exist" = false ]; then
    continue
    fi

    # 符合条件 -> 询问是否删除
    # 打印当前目录及同级目录内容
    echo "----------------------------------------"
    echo "📂 待删除文件夹: $dir ✅"
    echo "----------------------------------------"
    echo "同级目录内容:"
    ls -l "$parent_dir"
    echo "----------------------------------------"
    read -p "是否删除? (Y/N, 回车默认不删除): " choice < /dev/tty # 必须使用 < /dev/tty 以确保从交互界面接收到一个输入

    if [[ "$choice" =~ ^[Yy]$ ]]; then
    rm -rf "$dir" && echo "✅ 已删除 $dir"
    else
    echo "跳过 $dir"
    fi
    done