Jiahong 的个人博客

凡事预则立,不预则废


  • Home

  • Tags

  • Archives

  • Navigation

  • Search

Python——多线程可中断文件逐行处理示例


整体说明

  • 本文示例使用 AI 辅助生成,Prompt 为:
    1
    2
    3
    4
    5
    写一个多线程 python 代码,从一个文件读取数据,然后逐行进行处理,加载为 json 后从中读取 'input' 字段并在前后添加 '```',处理完成后写入另一个文件中,要求如下:
    1. 处理过程中实时打印处理进度
    2. 要求不使用 queue 等高级的包,用原生的 Python 和 threading 包实现即可
    3. 要求写入文件顺序和原始文件的顺序相同
    4. 由于文件很大,且执行过程中可能会随时中断,请用一个文件维护完成情况(完整写入文件才算完成),保证可以随时重启(指定参数 resume=True 时则从中断处启动,否则从头开始重新执行)

多线程可中断文件逐行处理示例

  • 代码示例,仅修改 process_line 即可使用:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    import threading
    import json
    import os
    import signal
    import sys
    from typing import Optional
    import traceback

    class MultiThreadFileProcessor:
    def __init__(self, input_file: str, output_file: str, progress_file: str = "progress.txt", num_threads: int = 4):
    """
    初始化文件处理器

    Args:
    input_file: 输入文件路径
    output_file: 输出文件路径
    progress_file: 进度记录文件路径
    num_threads: 线程数量
    """
    self.input_file = input_file
    self.output_file = output_file
    self.progress_file = progress_file
    self.num_threads = num_threads

    # 用于线程同步的锁
    self.task_lock = threading.Lock()
    self.result_lock = threading.Lock()
    self.write_lock = threading.Lock()

    # 所有待处理的行
    self.all_lines = []

    # 当前要分配的任务索引
    self.current_task_index = 0

    # 存储处理结果的字典
    self.results = {}

    # 已完成的行号集合
    self.completed_lines = set()

    # 下一个要写入的行号
    self.next_write_line = 0

    # 总行数
    self.total_lines = 0

    # 处理完成的行数
    self.processed_count = 0

    # 输出文件句柄
    self.output_handle = None

    # 停止标志(用于优雅退出)
    self.stop_flag = threading.Event()

    # 注册信号处理器
    signal.signal(signal.SIGINT, self.signal_handler)
    signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
    """处理 Ctrl+C 和终止信号"""
    print("\n\n收到中断信号,正在优雅退出...")
    print("已处理的数据会保存,可以使用 resume=True 继续")
    self.stop_flag.set() # 设置停止标志

    def load_progress(self) -> set:
    """加载进度文件,返回已完成的行号集合"""
    if os.path.exists(self.progress_file):
    with open(self.progress_file, 'r') as f:
    completed = set(int(line.strip()) for line in f if line.strip())
    return completed
    return set()

    def save_progress(self, line_num: int):
    """保存进度到文件"""
    with open(self.progress_file, 'a') as f:
    f.write(f"{line_num}\n")
    f.flush()

    def process_line(self, line: str) -> str:
    """
    处理单行数据

    Args:
    line: 原始行数据

    Returns:
    处理后的数据
    """
    try:
    data = json.loads(line.strip())
    if 'input' in data:
    data['input'] = f"```{data['input']}```"
    return json.dumps(data, ensure_ascii=False)
    except json.JSONDecodeError as e:
    print(f"\nJSON解析错误: {e}, 原始数据: {line[:100]}")
    return line.strip()

    def get_next_task(self) -> Optional[tuple]:
    """
    获取下一个待处理的任务(线程安全)

    Returns:
    (行号, 行内容) 或 None(无任务)
    """
    with self.task_lock:
    # 检查停止标志
    if self.stop_flag.is_set():
    return None

    # 跳过已完成的任务
    while self.current_task_index < len(self.all_lines):
    line_num, line = self.all_lines[self.current_task_index]
    self.current_task_index += 1

    if line_num not in self.completed_lines:
    return (line_num, line)
    else:
    # 已完成的任务也计入进度
    self.processed_count += 1

    return None

    def worker(self):
    """工作线程函数 - 动态获取任务"""
    try:
    while not self.stop_flag.is_set():
    # 获取下一个任务
    task = self.get_next_task()
    if task is None:
    break # 没有任务了或收到停止信号

    line_num, line = task

    # 处理数据
    processed = self.process_line(line)

    # 检查是否需要停止
    if self.stop_flag.is_set():
    # 将未写入的结果放回(不保存进度)
    break

    # 将结果存储到字典中
    with self.result_lock:
    self.results[line_num] = processed
    self.processed_count += 1

    # 实时打印进度
    progress = (self.processed_count / self.total_lines) * 100
    print(f"\r处理进度: {self.processed_count}/{self.total_lines} ({progress:.2f}%) | 待写入: {len(self.results)}", end='', flush=True)

    # 尝试写入文件(按顺序)
    self.try_write_results()

    except Exception as e:
    print(f"\n线程 {threading.current_thread().name} 发生错误: {e}")
    traceback.print_exc()
    self.stop_flag.set() # 发生错误时通知其他线程停止

    def try_write_results(self):
    """尝试按顺序写入结果到文件"""
    if self.stop_flag.is_set():
    return # 如果收到停止信号,不再写入

    with self.write_lock:
    # 按顺序写入所有可以写入的行
    while self.next_write_line in self.results:
    line_num = self.next_write_line

    with self.result_lock:
    content = self.results.pop(line_num)

    # 写入文件
    self.output_handle.write(content + '\n')
    self.output_handle.flush() # 确保写入磁盘

    # 保存进度
    self.save_progress(line_num)

    # 更新下一个要写入的行号
    self.next_write_line += 1

    def process(self, resume: bool = False):
    """
    主处理函数

    Args:
    resume: 是否从中断处继续
    """
    # 如果不是恢复模式,清空输出文件和进度文件
    if not resume:
    if os.path.exists(self.output_file):
    os.remove(self.output_file)
    if os.path.exists(self.progress_file):
    os.remove(self.progress_file)
    self.completed_lines = set()
    self.next_write_line = 0
    else:
    # 加载已完成的行
    self.completed_lines = self.load_progress()
    self.next_write_line = len(self.completed_lines)
    print(f"从第 {self.next_write_line} 行继续处理...")

    # 读取所有行
    print("正在读取文件...")
    with open(self.input_file, 'r', encoding='utf-8') as f:
    self.all_lines = [(i, line) for i, line in enumerate(f)]

    self.total_lines = len(self.all_lines)
    print(f"文件总行数: {self.total_lines}")
    print(f"已完成行数: {len(self.completed_lines)}")
    print(f"待处理行数: {self.total_lines - len(self.completed_lines)}")

    if len(self.completed_lines) >= self.total_lines:
    print("所有行已处理完成!")
    return

    # 打开输出文件(追加模式)
    self.output_handle = open(self.output_file, 'a', encoding='utf-8')

    try:
    # 创建并启动线程(设置为守护线程)
    threads = []
    for i in range(self.num_threads):
    thread = threading.Thread(target=self.worker, name=f"Worker-{i}")
    thread.daemon = False # 不设置为守护线程,以便优雅退出
    threads.append(thread)
    thread.start()

    # 等待所有线程完成
    for thread in threads:
    thread.join()

    # 检查是否是被中断的
    if self.stop_flag.is_set():
    print(f"\n程序被中断")
    print(f"已完成 {self.next_write_line} 行的处理和写入")
    print(f"使用 resume=True 可以继续处理")
    else:
    # 确保所有结果都已写入
    self.try_write_results()
    print(f"\n处理完成! 输出文件: {self.output_file}")

    except KeyboardInterrupt:
    print("\n\n检测到键盘中断...")
    self.stop_flag.set()

    # 等待线程退出(最多等待5秒)
    for thread in threads:
    thread.join(timeout=5)

    print(f"已完成 {self.next_write_line} 行的处理和写入")

    finally:
    # 关闭输出文件
    if self.output_handle:
    self.output_handle.close()
    print("输出文件已安全关闭")


    # 使用示例
    if __name__ == "__main__":
    # 创建处理器实例
    processor = MultiThreadFileProcessor(
    input_file="input.jsonl",
    output_file="output.jsonl",
    progress_file="progress.txt",
    num_threads=4
    )

    # 从头开始处理
    # processor.process(resume=False)

    # 从中断处继续
    processor.process(resume=True)

Python——队列和栈使用

本文从总结Python中栈和队列的基本使用
Python 中queue模块是线程安全的,为多线程任务设计的,没有peek()操作

  • 双端队列(deque)是一个具有栈和队列性质的数据结构,可以从两端弹出

普通的栈和队列

栈

list实现栈
1
2
3
4
5
6
7
8
9
10
11
# init
stack = list()
# push
stack.append(1)
# pop
stack.pop()
# peek
top = stack[-1]
# determine whether it is empty
if len(stack) == 0:
print("stack is empty")
deque实现栈
1
2
3
4
5
6
7
8
9
10
11
12
from collections import deque
# init
stack = deque([1, 2, 3])
# push
stack.append(4)
# pop
stack.pop()
# peek
top = stack[-1]
# determine whether it is empty
if len(stack) == 0:
print("stack is empty")

队列

list实现栈
1
2
3
4
5
6
7
8
9
10
11
12
# init
queue = [1, 2, 3]
# push
queue.append(4)
# pop
queue.pop(0)
# peek
first = queue[0]
last = queue[-1]
# determine whether it is empty
if len(queue) == 0:
print("queue is empty")
deque实现队列
1
2
3
4
5
6
7
8
9
10
11
12
13
from collections import deque
# init
queue = deque([1, 2, 3])
# push
queue.append(4)
# pop
queue.popleft()
# peek
first = queue[0]
last = queue[-1]
# determine whether it is empty
if len(queue) == 0:
print("queue is empty")

线程安全的栈和队列

queue模块实现队列和栈

1
2
3
4
5
6
7
8
9
10
11
import queue
# init, stack and queue
sstack = queue.LifoQueue()
squeue = queue.Queue()
# push
sstack.put(item)
# pop
sstack.get()
# determine whether it is empty
if sstack.empty():
print("sstack is empty")

Centos——硬盘操作-分区和挂载

  • 参考博客:https://www.cnblogs.com/lizhangshu/p/9719018.html

磁盘分区类型

  • 三种分区
    • 主分区
    • 扩展分区
    • 逻辑分区
  • 分区规则
    • 主分区 + 扩展分区的数量不能超过4个
    • 扩展分区只能有1个
    • 逻辑分区要在扩展分区之上进行划分,逻辑分区没有数量限制,可以任意个
    • 扩展分区是不能直接用的,他是以逻辑分区的方式来使用的,所以说扩展分区可分成若干逻辑分区。他们的关系是包含的关系,所有的逻辑分区都是扩展分区的一部分
  • 硬盘的容量
    • 硬盘容量 = 主分区的容量 + 扩展分区的容量
    • 扩展分区的容量 = 各个逻辑分区的容量之和
  • 为什么 主分区 + 扩展分区数量不能超过4个
    • 主分区就是普通磁盘分盘,但是由于磁盘设备由大量的扇区组成,一个扇区的容量为512字节。磁盘的第一个扇区最为重要,记录了主引导记录与分区表信息。就第一个扇区而言,主引导信息记录需要占用466个字节,分区表64个字节,结束符占用2个字节;其中分区表中每记录一个分区信息就需要16个字节,所以最多只有4个分区信息可以记录在第一个扇区中,所以主分区+扩展分区的数量不能超过4个。但是为了创建更多的分区,就使用扩展分区做份下若干个分区的指针,划分若干个逻辑分区,来满足分区数大于4个的需求。扩展分区不需要挂载,但是可以格式化

分区操作

  • 查看当前设备上的磁盘信息及分区信息

    1
    fdisk -l
  • 选取需要分区的磁盘

    1
    fdisk /dev/[disk_name]
    • 注意这里disk_name是硬盘名不是分区名称
    • 一般来说硬盘名都是sda, sdb, vda, vdb等
    • 一般来说硬盘sda上的分区名成为sda1,sda2等
  • 在进入分区磁盘后打印操作帮助

    1
    m
  • 新建分区

    1
    n
  • 选择是主分区还是扩展分区

    • 主分区

      1
      p
    • 拓展分区

      1
      e
  • 选择分区号[1-4]

    1
    1
  • 起始扇区

    1
    2048
    • 一般默认值即可
  • 结束扇区

    1
    +200G
    • 结束扇区在起始扇区基础上+200G, 表示该分区大小为200G
    • 默认是全部分配给当前分区

格式化分区

  • 先输入

    1
    mkfs.
  • Tab键,查看所有可能的命令

mkfs.bfs mkfs.exfat mkfs.ext3 mkfs.fat mkfs.msdos mkfs.vfat
mkfs.cramfs mkfs.ext2 mkfs.ext4 mkfs.minix mkfs.ntfs

  • 用法

    1
    mkfs.ntfs
  • 说明:

    • mkfs.ntfs命令可以将文件分区为

挂载分区

  • 新建挂载文件夹

    1
    mkdir /mnt/data
    • 建议新建在/mnt/文件夹下
    • 比如我本地的机器为/mnt/SSD和/mnt/HDD

临时挂载

  • 分区临时挂载

    1
    mount /dev/[part_name] /mnt/[dir_name]
    • 没有文件系统的分区不能挂载
  • 分区临时卸载

    1
    umount /dev/[part_name]

开机重启自动挂载

修改文件
  • 查看想要挂载分区的UUID

    • 查看所有UUID

      1
      blkid
    • 查看某个分区的UUID

      1
      blkid /dev/[part_name]
  • 编辑文件

    1
    vim /etc/fstab
  • 添加

    1
    2
    UUID=[UUID_NUMBER]   /data/[dir_name]  ext4  defaults   0 0
    分区临时卸 挂载路径 分区格式 参数 是否备份 引导分区相关(引导分区为1,其他分区为0或者2)
修改生效的两种方式
  • 使用命令重新加载/etc/fstab的内容

    1
    mount -a
  • 重新启动机器

    • 如果配置有错机器可能无法正常进入系统,但是会进入Emergency模式,我们可以在Emergency模式下修改/etc/fstab然后重新启动来修复问题

Linux——NFS服务器和客户端的配置

以Centos7为例, Ubuntu相似


NFS Server

  • 安装nfs所需的所有组件

    1
    sudo yum -y install nfs*
  • 设置开机启动nfs和rpcbind服务

    1
    2
    systemctl enable rpcbind.service
    systemctl enable nfs-server.service
  • 启动nfs和rpcbind服务

    1
    2
    systemctl start rpcbind.service
    systemctl start nfs-server.service
  • 配置exports文件

    1
    2
    3
    4
    5
    vim /etc/exports
    >>> input
    /home/jiahong/SharedTest *(rw,no_root_squash,no_all_squash,sync)
    /home/jiahong/SharedTest 123.45.6.7(rw,no_root_squash,no_all_squash,sync)
    >>> input done
    • 这里的*号可以使用ip,表示只有这个ip可以访问共享文件
    • 使用*则表示所有ip均可访问,设置多个ip可以访问则可使用多行
  • 使exports的配置生效

    1
    exportfs -rv
    • -r生效
    • -v显示结果
  • 查看是否生效

    1
    exportfs

NFS Client

  • 安装nfs所需的所有组件

    1
    sudo yum -y install nfs*
  • 设置开机启动rpcbind服务

    1
    systemctl enable rpcbind.service
  • 启动rpcbind服务

    1
    systemctl start rpcbind.service
  • 查看服务器哪些目录可以共享

    1
    showmount -e serverip
  • 新建文件夹以作为mount目标

    1
    mkdir -p /mnt/nfs/shared_dir
  • 挂载操作

    1
    mount -t nfs serverip:server_dir client_dir
  • 查看挂载情况

    1
    df -h
  • 解除挂载

    1
    umount client_dir

相关问题

  • 客户端出现以下情况时, 一般是服务器防火墙有问题, 解决方案是下面的解开防火墙的命令

    • mount.nfs: No route to host
    • mount.nfs: Connection timed out
      1
      2
      3
      4
      5
      firewall-cmd --permanent --add-service=rpc-bind
      firewall-cmd --permanent --add-service=mountd
      firewall-cmd --permanent --add-port=2049/tcp
      firewall-cmd --permanent --add-port=2049/udp
      firewall-cmd --reload
  • 客户端出现以下情况时,说明客户端未umount但服务器解除文件夹了

    • mount.nfs: Stale file handle
    • umount: xx/xx: Stale file handle
      1
      2
      3
      umount -lf /xx/xx

      # then mount /xx/xx again

Linux——文件操作

本文记录一些Linux相关文件操作的常见问题


mv操作中断

  • 由于mv操作等价于先执行cp然后执行rm操作
    • 还在cp阶段,原始数据是完整的,删除目标文档就行
    • 如果已经进入rm阶段,那么说明目标文件时完整的,删除原始文件就行

硬链接与软链接

Linux中链接分为两类:硬链接(hard link)和软链接(soft link),软链接又称为符号链接(symbolic link)

  • 如果一个文件有多个硬链接,那么需要所有硬连接都被删除,当前文件才会被删除
    • 原始文件与硬链接是同一个物理地址的两个不同名字
    • 硬链接是相互的(个人理解: 一个普通的文件就可以理解为一个硬链接)
  • 如果一个文件有一个硬链接和多个软链接(符号链接),那么删除符号链接不影响原始文件
    • 只有文件的所有硬链接都没删除后文件才会被删除
    • 文件被删除后软链接也会自动失效,链接路径链接不上

Linux——服务器防火墙配置

本文介绍Linux下服务器防火墙的设置
更详细的描述可以参考博客:https://blog.51cto.com/andyxu/2137046


防火墙总结

  • iptables:内核层面的netfilter网络过滤器来处理
  • firewalld: 交由内核层面的nftables包过滤框架处理

Centos6

Centos6默认使用iptable作为防火墙

查看防火墙状态

1
service iptable status

关闭防火墙

  • 临时

    1
    service iptables stop
  • 永久

    1
    chkconfig iptables off

打开防火墙

  • 临时

    1
    service iptables start
  • 永久

    1
    chkconfig iptables on

Centos7

Centos7默认使用的时firewall作为防火墙, 默认使用systemctl管理服务,接下来介绍systemctl管理firewall服务的操作,其他服务也可用systemctl以类似方法管理,只需将filewall名称换成其他服务名称即可

  • 关于服务名称的命名
    • 一般来说都是正常名称后加上一位’d.service’,比如’firewalld.service’,’mysqld.service’等

安装

  • Centos7自带firewalld
  • Ubuntu:
    1
    sudo apt-get install firewalld

配置文件

  • /usr/lib/firewalld/
    • 系统配置,尽量不修改
  • /etc/firewalld/
    • 用户配置地址

关于systemctl的使用

  • 列出所有服务

    1
    systemctl list-unit-files
  • 列出所有打开的服务

    1
    systemctl list-unit-files|grep enabled
  • 列出某个服务

    1
    systemctl list-unit-files|grep [service name]

查看防火墙状态

1
firewall-cmd --state
  • 输出not running或者running

查看防火墙服务

  • 方法一

    1
    systemctl list-unit-files|grep firewalld.service
    • 输出firewalld.service disabled或者firewalld.service enabled
  • 方法二

    1
    systemctl status firewalld.service
    • 输出更详细的信息

开机启动

  • 禁止

    1
    systemctl disable firewalld.service
  • 允许

    1
    systemctl enable firewalld.service
  • 查看

    1
    systemctl is-enabled firewalld.service;echo $?

启动防火墙服务

  • 启动防火墙后默认只开放22端口,其他端口都关闭

    1
    systemctl start firewalld.service
  • 不能启动的解决方案

    1
    2
    systemctl unmask firewalld.service 
    systemctl start firewalld.service

关闭防火墙服务

  • 1
    systemctl stop firewalld.service

端口相关操作命令

  • 查看所有以开放端口

    1
    firewall-cmd --list-ports
  • 查看某个端口是否开启

    1
    firewall-cmd --query-port=8080/tcp
  • 开放端口

    1
    firewall-cmd --zone=public --add-port=80/tcp --permanent
    • 命令含义:
      • –zone #作用域
      • –add-port=80/tcp #添加端口,格式为:端口/通讯协议
      • –permanent #永久生效,没有此参数重启后失效
  • 重启服务

  • 开放端口后需要重启服务才能生效*

    1
    firewall-cmd --reload
  • 移除指定端口

    1
    firewall-cmd --permanent --remove-port=8080/tcp

防火墙的域

域的作用
  • 一共9种,常用的就一种public,开放时把所有访问该端口的用户当做公共人员,不完全信任,trusted为完全信任:
    • block dmz drop external home internal public trusted work
    • 下面图片来自博客:https://blog.51cto.com/13503302/2095633
  • 在开放端口时,可以为其添加域(默认为public),不同的域代表不同的信任
域的操作
  • 查看默认zone

    1
    firewall-cmd --get-default-zone
  • 修改默认zone

    1
    firewall-cmd --set-default-zone=public

防火墙与SSH登录

默认启动firewall后,防火墙不会打开22端口[已测试],但为何还能正常ssh登录呢?

1
firewall-cmd --list-all

public
target: default
icmp-block-inversion: no
interfaces:
sources:
services: ssh dhcpv6-client
ports:
protocols:
masquerade: no
forward-ports:
source-ports:
icmp-blocks:
rich rules:

  • 从上面的输出中可以看出来ssh服务默认被开启防火墙了,所以无需开启22端口即可使用ssh指令登录到服务器
  • 同时开启的还有dhcpv6-client服务,这个服务用于ipv6的DHCP服务(为什么ipv4不需要这个服务呢?详情参考问答:https://unix.stackexchange.com/questions/176717/what-is-dhcpv6-client-service-in-firewalld-and-can-i-safely-remove-it )

Linux——查看服务器内核和系统版本

本文主要介绍Linux系统的内核版本和系统版本等信息用命令行如何查看


系统类型和版本

Centos

  • 文件存在表示为Centos

    1
    cat /etc/redhat-release
  • 上述指令同时会输出Centos版本

Ubuntu

  • 命令可以执行表示为Ubuntu

    1
    lsb_release -a
  • 执行下面命令可以看出Ubuntu的版本

    1
    cat /etc/issue

更进一步的内核信息

  • 内核名称

    1
    uname -s
  • 结点名称

    1
    uname -n
  • 内核发行号

    1
    uname -r
  • 处理器类型

    1
    uname -p
  • 操作系统

    1
    uname -o

Ubuntu——自动加载bashrc

本文描述了如何为Ubuntu用户创建默认的/.bashrc并设置自动加载/.bashrc


创建.bashrc

  • 如果.bashrc存在,则无需创建,很多程序安装时可能会自动创建,否则,需要我们复制一个

  • 复制命令

    1
    cp /etc/skel/.bashrc ~/
  • 亲测,无需复制,创建一个新的即可


使.bashrc生效

暂时生效

用户重新登录不会生效

1
source ~/.bashrc

永久生效

用户登录后默认生效

  • 新建或打开文件

    1
    vim ~/.profile
  • 添加下面的语句

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # ~/.profile: executed by Bourne-compatible login shells.

    if [ "$BASH" ]; then
    if [ -f ~/.bashrc ]; then
    . ~/.bashrc
    fi
    fi

    mesg n
  • 保存后退出,以后默认的,我们登录后~/.bashrc即可生效

  • 马上生效~/.profile

    1
    source ~/.profile

Linux——rpm和deb包的区别

Linux系统管理之rpm命令的使用


帮助信息

  • 执行man rpm可获取rpm命令的详细帮助信息

Centos

  • yum是用于安装和管理RPM包的
  • RPM包是一种预先在linux机器上被打包好的文件,文件后缀为.rpm,类似于Ubuntu上的deb

yum和rpm的区别

  • yum和rpm都是管理RPM包的
  • yum可以联网下载需要的RPM包
  • yum自己可以处理依赖

Centos安装deb包

  • 安装alien

    1
    2
    3
    4
    5
    6
    7
    # download alien source code
    # uncompress source code
    tar -zxvf alien_x.xx.tar.gz
    cd alien
    # compile
    make
    make install
  • 转换deb包为rpm包

    1
    2
    # generate a rpm package with name xxx.rpm
    alien -r xxx.deb
  • 安装rpm包

    1
    rpm -ivh xxx.rpm

Ubuntu

  • apt-get是用于管理deb包的

Ubuntu上安装rpm包

  • 安装alien

    1
    sudo apt-get install alien
  • 转换

    1
    2
    # generate a deb package with name xxx.deb
    sudo alien xxx.rpm
  • 安装包

    1
    sudo dpkg -i xxx.deb
  • 说明

    • 不是所有的RPM包都能通过alien成功转换成deb包并成功安装的,能找到deb包的最好使用deb包安装

总结

  • Ubuntu使用deb包(apt-get, dpkg),Centos使用RPM包(yum, rpm)
  • deb包和RPM包可互相转换(使用alien包转换即可)

DL——BERT

  • 参考博客:
    • 从Word Embedding到Bert模型—自然语言处理中的预训练技术发展史
    • BERT详解
  • BERT论文: BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding

BERT之前

Word2Vec的缺点

  • 多义词问题 : 传统的Word Embedding无法识别多义词
    • 确切的说是所有词的固定表征的方式(静态方式)的缺点
    • 所谓静态指的是训练好之后每个单词的表达就固定住了

从Word Embedding到ELMo

ELMo, Embedding from Language Models
根据当前上下文对Word Embedding动态调整的思路

  • ELMo 论文原文: NAACL 2018 Best Paper, Deep contextualized word representations
  • ELMo的本质思想:
    • 事先用语言模型学好一个单词的Word Embedding,此时多义词无法区分
    • 实际使用Word Embedding的时候,单词已经具备了特定的上下文了,这个时候我可以根据上下文单词的语义去调整单词的Word Embedding表示
    • 这样经过调整后的Word Embedding更能表达在这个上下文中的具体含义,自然也就解决了多义词的问题了
  • ELMo是典型的两阶段训练过程: 预训练 + 特征融合?
    • 第一个阶段是利用语言模型进行预训练
    • 第二阶段通过基于特征融合的方式训练
  • ELMo预训练过程示意图
  • ELMo预训练后如何处理下游任务?
    • 预训练训练完成后, 模型训练时使用在线特征抽取,和特征集成的方式对词向量在不同的上下文中进行不同的修正,从而区分多义词
补充知识: 下游任务

下游任务包括很多, 整体上可以分为四大类

序列标注
  • 分词
  • POS Tag
  • NER
  • 语义标注
  • …
分类任务
  • 文本分类
  • 情感计算
  • …
句子关系判断
  • Entailment
  • QA
  • 自然语言推理
  • …
机器翻译
  • 机器翻译
  • 文本摘要
  • …
预训练模型
  • 预训练模型是什么?
    • 预训练模型是指在训练结束是结果比较好的一组权重值,研究人员分享出来供他人使用,基于这些预训练好的权重可以提升我们的模型训练速度和精度
    • 预训练模型能够成功的本质是我们假设预训练模型足够好, 能学到句子的所有信息(包括序列信息等)
  • 两阶段预训练模型如何处理下游任务?
    • 预训练与下游任务无关
      • 预训练阶段是预训练模型自己选择相应的NLP任务,然后让模型在学习处理这些任务的途中实现参数的训练
      • 比如BERT选择的就是MLM(屏蔽语言模型)和NSP(Next Sentence Predition, 下一个句子预测)两个任务来做预训练
    • 不同的下游任务往往需要修改第二阶段中的模型结构等
    • 为适应不同的下游任务, 第二阶段可能使用不同结构, 比如添加Softmax层等方式
ELMo的优缺点
  • 优点:
    • 很好的解决了多义词问题,而且效果非常好
    • 采用上下文来训练词(从上下文预测单词, 上文称为Context-before, 下文称为Context-after)
  • 缺点:
    • 特征提取器没有使用新贵Transformer, 而是传统的LSTM, 特征抽取能力不足

从Word Embedding到GPT

GPT, Generative Pre-Training

  • ELMo的训练方法和图像领域的预训练方法对比,模式不同, ELMo使用的是基于特征融合的预训练方法
  • GPT使用的预训练方法则是在NLP领域开创了和图像领域一致的预训练方法基于Fine-tuning的模式
  • GPT也采用两阶段过程: 预训练 + Fine-tuning
    • 第一个阶段是利用语言模型进行预训练
    • 第二阶段通过Fine-tuning的模式训练
  • GPT预训练后如何处理下游任务?
  • 一些下游任务的Fine-tuning结构

GPT的优缺点

  • 优点:
    • 特征提取器是Transformer,不是RNN, 所以特征提取效果好
  • 缺点
    • GPT使用的是单向语言模型 : 也就是说只用到了上文来预测词
    • 词嵌入时没有单词的下文, 失去了很多信息

BERT结构和原理

下面的讲解都将按照原论文的思路讲解

  • BERT(Bidirectional Encoder Representations from Transformers), 原文 BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding
  • 下图是BERT与GPT和ELMo对比的结构图
    • 图中的每个 Trm 组件就是一个 Transformer 的Encoder部分,也就是下图中的左半部分

BERT的特点

  • 一体化特征融合的双向(Bidirectional)语言模型
    • 利用语言的双向信息
    • GPT是单向语言模型, 只能利用一个方向的信息
    • ELMo也是双向语言模型,但是 ELMo实际上是两个方向相反的单向语言模型的拼接, 融合特征的能力比BERT那种一体化的融合特征方式弱
  • 特征提取器:
    • 使用Transformer(实际上使用的是Transformer的Encoder部分, 图中每个)
  • 预训练任务:
    • 屏蔽语言模型(MLM, Masked Language Model) + 相邻句子判断(NSP, Next Sentence Prediction)两个任务的多任务训练目标
  • 训练数据量:
    • 超大规模的数据训练使得BERT结果达到了全新的高度
    • 可以使用BERT作为词嵌入(Word2Vec)的转换矩阵, 实现比其他词嵌入模型更好的结果

输入表示

  • 输入结构图
  • BERT输入是一个512维的编码向量, 是三个嵌入特征的单位和
WordPiece嵌入
  • 对应图中的Token Embedding
  • WordPiece是指将单词划分成一组有限的公共子词单元,能在单词的有效性和字符的灵活性之间取得一个折中的平衡
  • 举例: 原文中 “playing” 被拆分成了 “play” 和 “##ing” 两部分
Segment Embedding

分割嵌入

  • 对应图中的Segment Embedding
  • 用于区分两个句子,例如B是否是A的下文(对话场景,问答场景等)
  • 对于句子对,第一个句子的特征值是0,第二个句子的特征值是1, 从而模型可以表达出词出现在前一个句子还是后一个句子
Position Embedding

位置嵌入

  • 对应图中的 Position Embedding
  • 位置嵌入是指将单词的位置信息编码成特征向量
  • 这是继承自论文Google Brain, NIPS 2017: Attention Is All You Need中, 文章中的Transformer架构没有使用RNN,不能编码位置信息,就是在进入Attention前使用了 词嵌入 + 位置嵌入 的方式让模型能够表达位置信息的

预训练

  • 通常预训练是指在训练阶段让模型去解决自然语言任务, 从而训练完成后得到可移植到其他模型(或者当前模型)使用的参数(包括词向量等)
  • BERT的预训练使用了两个NLP任务: MLM + NSP
  • BERT预训练和使用概览:
    • 从上图可以看出, BERT的预训练包含了两方面的任务, NSP和 MLM
    • 实验证明, MLM 优于标准的 LTR(left-to-right)语言模型(OpenAI GPT 使用的就是这个)
屏蔽语言模型(MLM)

Masked Language Model

  • Masked Language Model(MLM)核心思想取自Wilson Taylor在1953年发表的一篇论文“Cloze Procedure”: A New Tool for Measuring Readability
  • 在训练的时候随机从输入预料上屏蔽(Mask)掉一些单词,然后通过的上下文预测该单词(“完形填空”)
  • 传统的语言模型是Left-to-Right(LTR)或者是Right-to-Left(RTL)的, 和 RNN 结构匹配
  • MLM 的性质 和 Transformer 的结构匹配
  • BERT实验中, 有15%的WordPiece Token句子会被屏蔽掉, 在屏蔽的时候,又有不同的概率
  • 如果已经选中(15%概率)要屏蔽 my dog is hairy 中的 hairy, 那么我们的处理方式是:
    • 80%: my dog is hairy -> my dog is [MASK]
    • 10%: my dog is hairy -> my dog is apple
    • 10%: my dog is hairy -> my dog is hairy
    • 防止句子中的某个Token 100%都会被mask掉,那么在Fine-tuning的时候模型就会有一些没有见过的单词
    • 加入随机Token的原因是因为Transformer要保持对每个输入Token的分布式表征,否则模型就会记住这个[MASK]是 Token “hairy”
    • 错误单词带来的负面影响: 一个单词被随机替换掉的概率只有 \(15% \times 10% = 1.5%\) 这个负面影响其实是可以忽略不计的
  • 另外: 文章指出每次只预测15%的单词,因此模型收敛的比较慢
为什么使用MLM
  • 因为效果好,解释就是MLM更符合Transformer的结构
  • 论文中的实验结果:
    • MNLI(Multi-Genre Natural Language Inference)是多类型自然语言推理任务, 是一个大规模的众包蕴含分类任务, 给定一个句子,目标是预测第二句相对与第一句是一个蕴含语句, 矛盾语句, 还是中性语句
    • 从图中可以看出,在MNLI任务中, MLM预训练 + MNLI Fine-tuning 的效果明显优于 LTR预训练 + MNLI Fine-tuning 的效果
相邻句子预测(NSP)

Next Sentence Prediction

  • NSP 的任务是判断句子B是否是句子A的下文
  • 图中的[CLS]符号就是用于分类的, 如果是的话输出’IsNext‘,否则输出’NotNext‘
  • 训练数据的生成方式是从平行语料中随机抽取的连续两句话,其中50%保留抽取的两句话,它们符合IsNext关系,另外50%的第二句话是随机从预料中提取的,它们的关系是NotNext的
  • 举例来说:
    • Input = [CLS] the man went to [MASK] store [SEP] he bought a gallon [MASK] milk [SEP]
    • Label = IsNext
    • Input = [CLS] the man [MASK] to the store [SEP] penguin [MASK] are flight ##less birds [SEP]
    • Label = NotNext

Fine-tuning 处理下游任务

Fine-tining, 中文也称为微调

  • 下图是BERT在不同任务的的微调方法

  • 第二阶段,Fine-Tuning阶段,这个阶段的做法和GPT是一样的。当然,它也面临着下游任务网络结构改造的问题,在改造任务方面Bert和GPT有些不同

    • 句子类关系任务: 和GPT一样,增加起始和终结符号,输出部分Transformer最后一层每个单词对应部分都进行分类即可
    • 除了生成任务外, 其他任务Bert都涉及到了

BERT的使用

  • Google公开了两个不同规模的 BERT模型:
    • ** \(BERT_{BASE}\) ** : 110M模型参数
    • ** \(BERT_{LARGE}\) ** : 340M模型参数
  • 同时公开了两个模型在大规模数据预训练后的参数集合, 供开发者下载和使用

基于BERT的新秀

  • Token仍然使用词, 但是MLM屏蔽时选择屏蔽短语或者实体

ERNIE from Baidu

  • 参考文章: [ERNIE: Enhanced Representation through Knowledge Integration]
  • 核心思想:
    • 用先验知识来加强预训练模型(考虑实体,短语等级别的屏蔽)
    • 在BERT的预训练阶段, MLM模型中屏蔽一个实体(Entity)或者短语(Phrase)而不是屏蔽一个字(Word)
  • 文中提出三种级别的屏蔽方式
    • 基本级别(Basic-level)
    • 实体级别(Entity-level)
    • 短语级别(Phrase-level)
实验对比

ERINE from THU

  • 参考文章: ERNIE: Enhanced Language Representation with Informative Entities
  • 核心思想:
    • 利用先验知识来加强预训练模型(引入知识图谱)
    • 提出了将知识纳入语言表达模型的方法
    • 使用知识聚合器(Knowledgeable aggregator)和预训练任务 dEA, 更好的融合来自文本和知识图谱的异构信息
  • 知识信息
  • 模型架构
1…434445…67
Joe Zhou

Joe Zhou

Stay Hungry. Stay Foolish.

662 posts
53 tags
GitHub E-Mail
© 2026 Joe Zhou
Powered by Hexo
|
Theme — NexT.Gemini v5.1.4