Jiahong 的个人博客

凡事预则立,不预则废


  • Home

  • Tags

  • Archives

  • Navigation

  • Search

Python——协程asyncio库的使用


整体说明

  • asyncio 是 Python 内置的 异步 I/O 框架 ,核心用于编写高效的并发代码,尤其适合处理 I/O 密集型任务(如网络请求、文件读写、数据库操作等)
  • asyncio 基于 协程(coroutine) 实现,通过事件循环(Event Loop)调度任务,避免了多线程的上下文切换开销,效率更高
  • asyncio 在较高的 Python 版本中才可以使用,建议在 Python 3.7 以上使用
  • 常见应用场景包括
    • 网络请求:并发调用 API(如结合 aiohttp 库)
    • 文件操作:异步读写文件(如结合 aiofiles 库)
    • 数据库操作:异步操作数据库(如 asyncpg 用于 PostgreSQL,motor 用于 MongoDB)
    • WebSocket 服务:实现高并发的实时通信(如 websockets 库)
    • 定时任务:通过 asyncio.create_task() + 循环实现简单定时任务
  • 仅适用于 I/O 密集型任务:
    • asyncio 是单线程的,CPU 密集型任务会阻塞事件循环,需结合 loop.run_in_executor() 提交到线程池/进程池
    • await 只能在协程中使用:await 关键字不能在普通函数中使用,必须在 async def 定义的协程中
    • 事件循环是单线程的:协程的并发是“协作式”的,需通过 await 主动交出执行权,否则会独占事件循环

asyncio 相关核心概念

协程(Coroutine)

  • 协程是可暂停、可恢复的函数,用 async def 定义(语法),是 asyncio 的核心执行单元
  • 协程函数调用后不会立即执行 ,而是返回一个协程对象(coroutine object),需通过事件循环调度才能运行

事件循环(Event Loop)

  • asyncio 的“大脑”,负责调度所有协程任务:
    • 管理任务的暂停/恢复、监听 I/O 事件、分发任务执行权
  • 通常通过 asyncio.run() 自动创建和管理事件循环(推荐用法)

等待对象(Awaitable)

  • 可被 await 关键字修饰的对象
    • 包括:协程对象、Task、Future
  • await 会暂停当前协程,等待目标对象完成后再恢复,期间事件循环可调度其他协程执行(实现并发)

Task(任务)

  • 对协程的封装,将协程注册到事件循环中,使其可被调度执行
  • 通过 asyncio.create_task() 创建,会自动加入事件循环并运行

Future

  • 表示异步操作的“未来结果”,是低层级的对象(通常无需手动创建,Task 继承自 Future)

asyncio 基础用法

  • 定义和运行协程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import asyncio

    # 定义协程函数(async def 关键字)
    async def hello(name):
    print(f"Hello, {name}! (开始)")
    # 模拟 I/O 等待(必须用 await 修饰可等待对象)
    await asyncio.sleep(1) # 暂停 1 秒,期间事件循环可执行其他任务
    print(f"Hello, {name}! (结束)")

    # 运行协程(Python 3.7+ 推荐用 asyncio.run())
    asyncio.run(hello("asyncio"))
    # Hello, asyncio! (开始)
    ## 【等待】... 1s
    # Hello, asyncio! (结束)

并发执行多个协程

  • 通过 asyncio.gather() 或 asyncio.create_task() 实现并发(多个任务同时执行,总耗时接近最长任务的耗时)

  • 方式 1:asyncio.gather()(批量等待多个协程)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import asyncio

    async def task1():
    await asyncio.sleep(2)
    return "Task 1 完成"

    async def task2():
    await asyncio.sleep(1)
    return "Task 2 完成"

    async def main():
    # 并发执行 task1 和 task2,等待所有完成后返回结果(按传入顺序)
    result1, result2 = await asyncio.gather(task1(), task2())
    print(result1)
    print(result2)

    # 总耗时大约 2 秒(而非 2+1=3 秒)
    asyncio.run(main())

    # Task 1 完成
    # Task 2 完成
  • 方式 2:asyncio.create_task()(手动创建任务)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import asyncio

    async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"Task {name} 完成(延迟 {delay} 秒)")

    async def main():
    # 创建任务并自动加入事件循环
    task1 = asyncio.create_task(task("A", 2))
    task2 = asyncio.create_task(task("B", 1))

    # 等待任务完成(可单独等待,也可一起等待)
    await task1
    await task2

    asyncio.run(main())

    # Task B 完成(延迟 1 秒)
    # Task A 完成(延迟 2 秒)

附录:处理异常

  • 协程中的异常需通过 try/except 捕获,或在 gather() 中通过 return_exceptions=True 收集异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import asyncio

    async def task1():
    await asyncio.sleep(2)
    return "Task 1 完成"

    async def faulty_task():
    await asyncio.sleep(1)
    raise ValueError("任务执行失败!") # 模拟抛出异常

    async def main():
    # 方式1:捕获单个协程的异常
    try:
    await faulty_task()
    except ValueError as e:
    print(f"捕获异常:{e}") # 捕获异常:任务执行失败!

    # 方式2:批量捕获多个协程的异常(return_exceptions=True)
    results = await asyncio.gather(
    task1(), # 正常任务,输出 "Task 1 完成"
    faulty_task(), # 异常任务,抛出异常 ValueError("任务执行失败!")
    return_exceptions=True # 不终止,返回异常对象
    )
    print(results) # 输出:["Task 1 完成", ValueError("任务执行失败!")]

    asyncio.run(main())

协程的进阶特性

超时控制(asyncio.wait_for())

  • 限制协程的执行时间,超时则抛出 TimeoutError
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import asyncio

    async def long_task():
    await asyncio.sleep(3) # 模拟耗时 3 秒的任务

    async def main():
    try:
    # 限制任务 2 秒内完成,超时则取消任务并抛出异常
    result = await asyncio.wait_for(long_task(), timeout=2)
    except asyncio.TimeoutError:
    print("任务超时被取消!")

    asyncio.run(main())

任务取消(Task.cancel())

  • 手动取消正在执行的任务,被取消的任务会抛出 CancelledError
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import asyncio

    async def endless_task():
    try:
    while True:
    print("任务运行中...")
    await asyncio.sleep(1)
    except asyncio.CancelledError:
    print("任务被取消!")
    raise # 可选:重新抛出,让调用方知道任务被取消

    async def main():
    task = asyncio.create_task(endless_task())
    await asyncio.sleep(2) # 运行 2 秒后取消
    task.cancel()
    await task # 必须等待任务处理取消逻辑

    asyncio.run(main())

异步上下文管理器(async with)

  • 用于异步资源的获取和释放(如异步数据库连接、异步文件),需实现 __aenter__ 和 __aexit__ 方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import asyncio

    class AsyncResource:
    async def __aenter__(self):
    print("获取异步资源")
    await asyncio.sleep(0.5)
    return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
    print("释放异步资源")
    await asyncio.sleep(0.5)

    async def main():
    async with AsyncResource() as res:
    print("使用异步资源")

    asyncio.run(main())

    # 获取异步资源
    # 使用异步资源
    # 释放异步资源

异步迭代器(async for)

  • 用于迭代异步生成的数据(如异步流、分页接口),需实现 __aiter__ 和 __anext__ 方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import asyncio

    class AsyncIterator:
    def __init__(self, limit):
    self.limit = limit
    self.count = 0

    def __aiter__(self):
    return self

    async def __anext__(self):
    if self.count >= self.limit:
    raise StopAsyncIteration
    self.count += 1
    await asyncio.sleep(0.5) # 模拟异步获取数据
    return self.count

    async def main():
    async for num in AsyncIterator(3):
    print(f"迭代得到:{num}")

    asyncio.run(main())

    # 迭代得到:1
    # 迭代得到:2
    # 迭代得到:3

Python——多重继承


Python多重继承简单示例

  • Python多重继承简单示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    class Parent1:
    def method1(self):
    print("This is method 1 from Parent1")

    class Parent2:
    def method2(self):
    print("This is method 2 from Parent2")

    class Child(Parent1, Parent2):
    def child_method(self):
    print("This is a method from Child")

    # 创建Child类的实例
    child = Child()
    # 调用从Parent1继承的方法
    child.method1()
    # 调用从Parent2继承的方法
    child.method2()
    # 调用Child类自身的方法
    child.child_method()

super().__init__()的调用规则:

  • super().__init__()前,需要先构造整个MRO,super().__init__()本质实在调用MRO中该类的下一个

  • 所以:在单继承时会调用当前类的父类,但在调用MRO中的第一个方法父类的初始化,且仅调用第一个

  • 多重继承时,父类的MRO相对顺序会得到保障,但是可能会插入其他类(注:此时父类调用super().__init__()时可能不再直接调用其真实父类的初始化函数,而是MRO中的下一个)

    • 比如没有D时,B的直接父类是A(B的MRO是B-next->A),但是因为在D的MRO中,B的后一个是C(D的MRO是B-next->C),此时如果B没有初始化super().__init__(),则# Initializing C不会打印
  • 代码示例1:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    class Base1():
    def __init__(self):
    self.name = "Base1"
    print("Initializing Base1")

    def show(self):
    print(f"Name from Base1: {self.name}")

    class Base2():
    def __init__(self):
    self.name = "Base2"
    print("Initializing Base2")

    def show(self):
    print(f"Name from Base2: {self.name}")

    class DerivedUsingInit(Base1, Base2):
    def __init__(self):
    Base1.__init__(self)
    Base2.__init__(self)
    print("Initializing DerivedUsingInit")


    print("\nUsing init for initialization:")
    derived1 = DerivedUsingInit()
    derived1.show()
    print("DerivedUsingInit.__mro__:", DerivedUsingInit.__mro__)

    print("------------------")
    class DerivedUsingSuper(Base1, Base2):
    def __init__(self):
    super().__init__() # 仅调用Base1的__init__函数,不会调用Base2的
    print("Initializing DerivedUsingSuper")

    print("\nUsing super() for initialization:")
    derived2 = DerivedUsingSuper()
    derived2.show()
    print("DerivedUsingSuper.__mro__:", DerivedUsingSuper.__mro__)

    # Initializing Base1
    # Initializing Base2
    # Initializing DerivedUsingInit
    # Name from Base1: Base2
    # DerivedUsingInit.__mro__: (<class '__main__.DerivedUsingInit'>, <class '__main__.Base1'>, <class '__main__.Base2'>, <class 'object'>)
    # ------------------
    #
    # Using super() for initialization:
    # Initializing Base1
    # Initializing DerivedUsingSuper
    # Name from Base1: Base1
    # DerivedUsingSuper.__mro__: (<class '__main__.DerivedUsingSuper'>, <class '__main__.Base1'>, <class '__main__.Base2'>, <class 'object'>)
  • 代码示例2:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    class A:
    def __init__(self):
    print("Initializing A")

    class B(A):
    def __init__(self):
    super().__init__() # 单继承时调用A的init,多继承时可能调用其他的类的init
    print("Initializing B")

    class C(A):
    def __init__(self):
    super().__init__()
    print("Initializing C")

    class D(B, C):
    def __init__(self):
    super().__init__()
    print("Initializing D")

    d = D()
    print("D.__mro__: ", D.__mro__)
    print("B.__mro__: ", B.__mro__)

    # Initializing A
    # Initializing C
    # Initializing B
    # Initializing D
    # D.__mro__: (<class '__main__.D'>, <class '__main__.B'>, <class '__main__.C'>, <class '__main__.A'>, <class 'object'>)
    # B.__mro__: (<class '__main__.B'>, <class '__main__.A'>, <class 'object'>)
  • 代码示例3:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    class A:
    def __init__(self):
    print("Initializing A")

    class B(A):
    def __init__(self):
    # super().__init__()
    print("Initializing B")

    class C(A):
    def __init__(self):
    super().__init__()
    print("Initializing C")

    class D(B, C):
    def __init__(self):
    super().__init__()
    print("Initializing D")

    d = D()
    print("D.__mro__: ", D.__mro__)
    print("B.__mro__: ", B.__mro__)

    # Initializing B
    # Initializing D
    # D.__mro__: (<class '__main__.D'>, <class '__main__.B'>, <class '__main__.C'>, <class '__main__.A'>, <class 'object'>)
    # B.__mro__: (<class '__main__.B'>, <class '__main__.A'>, <class 'object'>)

MRO的简单理解

  • Python 2.3 及以后的版本采用 C3 线性化算法来确定 MRO,C3 线性化算法的核心目标是保证 MRO 满足三个重要特性:
    • 单调性:子类必须在父类之前被检查
    • 局部优先性:类定义中父类的顺序会被保留
    • 一致性:如果一个类继承自多个父类,那么 MRO 必须保证所有父类的 MRO 顺序一致
    • 注意:对于相同名称的同一个函数,MRO顺序靠前的生效,靠后的会被靠前的覆盖
  • 示例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    class A:
    pass

    class B(A):
    pass

    class C(A):
    pass

    class D(B, C):
    pass

    # 打印 D 类的 MRO
    print(D.mro())

继承顺序要满足MRO规则

  • 错误示例,下面的GrandChild1不满足MRO规则,会报错:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    class Parent:
    def __init__(self):
    self.value = "I'm from Parent"

    class Child1(Parent):
    def __init__(self):
    self.value = "I'm from Child1"

    class Child2(Parent):
    def __init__(self):
    self.value = "I'm from Child2"

    class GrandChild(Child1, Child2, Parent): # OK
    def __init__(self):
    self.value = "I'm from GrandChild"

    class GrandChild1(Parent, Child1, Child2): # 报错
    def __init__(self):
    self.value = "I'm from GrandChild1"
    # TypeError: Cannot create a consistent method resolution order (MRO) for bases Parent, Child1, Child2

补充:super()函数详细说明

  • super()函数本质是返回了当前类的 MRO 的一个下一个对象,对于单继承模式而言,就是当前类的父类

super()函数的高阶用法

  • super()函数还可以传入参数,示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    class A:
    def method(self):
    print("Method in A")

    class B(A):
    def method(self):
    print("Method in B")

    class C(A):
    def method(self):
    print("Method in C")

    class D(B, C):
    def method(self):
    super(B, self).method() # 调用C类的method方法
    # super(B, self).method1() # 报错,因为C类没有method1方法
    print("Method in D")

    d = D()
    d.method()
    print(D.__mro__)
    print(B.__mro__)

    # Method in C
    # Method in D
    # (<class '__main__.D'>, <class '__main__.B'>, <class '__main__.C'>, <class '__main__.A'>, <class 'object'>)
    # (<class '__main__.B'>, <class '__main__.A'>, <class 'object'>)
    • super(B, self) 是按照方法解析顺序(MRO)来查找 B 类在当前实例 self 的继承关系中的下一个类
    • Python会根据类的MRO来确定方法的调用顺序,MRO是一个列表,它定义了类及其父类的搜索顺序。当使用 super(B, self) 时,它会在 self 所属类的MRO中,从 B 类的下一个位置开始查找方法。例如在多重继承中,通过这种方式可以明确指定从某个类之后的MRO顺序中查找方法,以实现特定的方法调用逻辑
    • 在这个例子中, D 类继承自 B 和 C 类,而 B 和 C 又都继承自 A 类。在 D 类的 method 方法中,使用 super(B, self).method() 明确指定跳过 B 类,调用 C 类中继承自 A 类的 method 方法
  • 注意:在Python的 super() 函数中,默认是调用自身类和对象作为参数的 super 函数,例如:

    1
    2
    3
    4
    class A(B):
    def __init__(self):
    # 等价于调用super().__init__(),
    super(A, self).__init__()
    • 注:Python3中调用super(A, self).__init__() 等价于调用super().__init__(),但在部分Python2旧版本中必须明确指明类和对象
  • 自定义类层次结构中的方法调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    class Base:
    def operation(self):
    print("Base operation")

    class Derived1(Base):
    def operation(self):
    print("Derived1 operation")

    class Derived2(Base):
    def operation(self):
    print("Derived2 operation")

    class Composite:
    def __init__(self):
    self.derived1 = Derived1()
    self.derived2 = Derived2()
    def operation(self):
    super(Derived1, self.derived1).operation() # 调用Base的operation方法
    super(Derived2, self.derived2).operation() # 调用Base的operation方法
    composite = Composite()
    composite.operation()
    print(Composite.__mro__)
    print(Derived1.__mro__)
    print(Derived2.__mro__)

    # Base operation
    # Base operation
    # (<class '__main__.Composite'>, <class 'object'>)
    # (<class '__main__.Derived1'>, <class '__main__.Base'>, <class 'object'>)
    # (<class '__main__.Derived2'>, <class '__main__.Base'>, <class 'object'>)
    • 在 Composite 类中,通过 super(cls, instance) 的方式,分别调用了 Derived1 和 Derived2 类的父类 Base 中的 operation 方法。这种方式可以在自定义的类层次结构中,灵活地控制方法的调用路径
    • 不建议使用过于复杂的操作,非必要也不做多重继承,通常情况下,使用默认的 super() 函数调用方式就能满足大多数的编程需求,但是很多官方的库中,会明确显式知名其自身类和对象,比如PyTorch的 nn.Linear 类的实现:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      class Linear(Module):
      """...comments..."""
      # ...
      def __init__(self, in_features: int, out_features: int, bias: bool = True,
      device=None, dtype=None) -> None:
      factory_kwargs = {'device': device, 'dtype': dtype}
      super(Linear, self).__init__() # 这里是显示调用其父类的初始化函数,等价于 super().__init__()
      self.in_features = in_features
      self.out_features = out_features
      self.weight = Parameter(torch.empty((out_features, in_features), **factory_kwargs))
      if bias:
      self.bias = Parameter(torch.empty(out_features, **factory_kwargs))
      else:
      self.register_parameter('bias', None)
      self.reset_parameters()

Python——抽象基类ABC的使用


Python定义抽象类需要使用ABC

  • 定义抽象类的关键步骤
    • 抽象基类 :需要继承ABC
    • 抽象方法 :需要使用@abstractmethod装饰器标记的抽象方法
  • 这样做的好处:
    • 防止实例化 :抽象基类本身不能被实例化(不继承ABC则不算是抽象基类,其子类不做实例化的检查),只能通过子类继承并实现抽象方法后才能使用
    • 可读性 :继承了ABC类的抽象接口更容易阅读

ABC类的说明及使用

  • 在Python中,ABC类(Abstract Base Class,抽象基类)用于 定义抽象基类。抽象基类的主要目的是为其他类提供一个共同的接口或规范,确保子类实现特定的方法或属性
  • 注意:注意是用于定义抽象基类,不是抽象类,ABC是抽象基类的基类

定义抽象类的最佳实践

  • 为了代码的清晰性和规范性,建议继承ABC 来 定义抽象基类。例如:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    from abc import ABC, abstractmethod

    class Shape(ABC):
    @abstractmethod
    def area(self):
    pass

    @abstractmethod
    def perimeter(self):
    pass

    class Rectangle(Shape):
    def __init__(self, width, height):
    self.width = width
    self.height = height

    def area(self):
    return self.width * self.height

    def perimeter(self):
    return 2 * (self.width + self.height)

    # 实例化子类
    rect = Rectangle(5, 10)
    print(rect.area()) # 输出: 50
    print(rect.perimeter()) # 输出: 30

    # 实例化检查:如果子类未实现抽象方法,实例化时会抛出 TypeError
    class Circle(Shape):
    pass

    circle = Circle() # 抛出 TypeError: Can't instantiate abstract class Circle

抽象基类不继承ABC会怎样?

  • 如果不继承ABC,可以通过以下方式实现类似抽象基类的功能:

替代方法:手动抛出NotImplementedError

  • 一种不继承ABC但实现抽象类的方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    class MyAbstractClass:
    def my_abstract_method(self):
    raise NotImplementedError("Subclasses must implement this method")

    class MyConcreteClass(MyAbstractClass):
    def my_abstract_method(self):
    print("Implemented abstract method")

    # 实例化子类
    obj = MyConcreteClass()
    obj.my_abstract_method() # 输出: Implemented abstract method

    # 如果子类未实现抽象方法,调用时会抛出异常
    class IncompleteClass(MyAbstractClass):
    pass

    obj = IncompleteClass()
    obj.my_abstract_method() # 抛出 NotImplementedError

错误方法示例:使用abc模块但不继承ABC

  • 下面的方法仅使用@abstractmethod,无法实现抽象类,必须要继承ABC才行:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    from abc import abstractmethod

    # 不继承ABC,增加abstractmethod也没用
    class MyAbstractClass:
    @abstractmethod
    def my_abstract_method(self):
    pass

    # 如果子类未实现抽象方法,实例化时也不会抛出异常
    class IncompleteClass(MyAbstractClass):
    pass

    obj = IncompleteClass() # 可以正常初始化

总体结论

  • 继承 ABC 并使用 @abstractmethod 是定义抽象基类的必须方式
  • 如果选择不继承 ABC,可以通过抛出 NotImplementedError 或手动检查方法实现来模拟抽象基类的行为,但这种方式不够优雅且容易出错

附录:抽象方法实现时的参数规则

  • 核心结论:
    • 抽象方法的子类实现,参数可以不一致 ,Python 语法层面完全允许,不会报错
    • 但是 强烈不建议参数不一致 ,违背抽象基类的设计初衷,会造成严重的代码问题和逻辑混乱,比如多态无法使用

语法层面:子类实现抽象方法,参数完全可以不一样(实测验证)

  • Python 是弱语法约束的语言,对于抽象方法的重写,不强制要求子类方法的形参 和 父类抽象方法的形参完全一致 ,包括 参数个数、参数名称、默认参数 都可以不同,代码能正常运行,不会报语法/运行时错误
  • 示例:形参个数、名称完全不同(合法,无报错)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    from abc import ABC, abstractmethod

    # 抽象基类:抽象方法有2个参数
    class Animal(ABC):
    @abstractmethod
    def speak(self, sound, volume):
    pass

    # 子类实现:只有1个参数,参数名也不一样(语法完全允许)
    class Dog(Animal):
    def speak(self, voice):
    print(f"小狗:{voice}")

    # 实例化+调用,完全正常运行
    d = Dog()
    d.speak("汪汪汪~") # 输出:小狗:汪汪汪~

为什么 强烈不建议参数不一致 ?

  • Python 语法允许 不等于 写法合理,参数不一致 会彻底违背我们使用抽象基类的核心目的,主要有3个致命问题:
    • 违背抽象基类的设计初衷
    • 引发多态失效 + 调用报错
    • 造成代码可读性极差 + 协作灾难

附录:父类初始化

  • 如果重写了父类的初始化方法,一定要在子类的 __init__ 函数中 执行 super().__init__() ,以完成父类的初始化
    • __init__ 是 Python 的构造方法,用于初始化实例属性
    • 子类继承父类时,如果子类重写了 __init__ 但不主动调用父类的 __init__ ,父类的构造方法不会被自动执行,进而导致父类中定义的属性 / 初始化逻辑完全失效

DL——混合精度训练

混合精度 (Automatically Mixed Precision, AMP)

数字类型

FP32

  • FP32,Single-precision floating-point
  • 4B, 32位,符号位1位,指数位8位,尾数位23位
    • 符号位:用于表示数值的正负
    • 指数位:用于表示数值的范围
    • 尾数位(Fraction):也称为小数位,用于表示数值的精度
  • 数字表达范围: \([-3e^{38},-1e^{-45}] \bigcup [1e^{-45}, 3e^{38}]\)
  • 下溢精度: \(1e^{-38}\)

FP16

  • FP16,Half-precision floating-point
  • 2B, 16位,符号位1位,指数位5位,尾数位10位
  • 数字表达范围: \([-65504,-5.9e^{-8}] \bigcup [5.9e^{-8}, 65504]\)
  • 下溢精度: \(5.9e^{-8}\)

BF16

  • BF16,Brain Floating Point
  • 2B,16位,符号位1位,指数位8位,尾数位7位
  • 数字表达范围: \([−3e^{38}, -9.2^{-41}],[9.2^{-41}, 3e^{38}]\)
  • 下溢精度: \(9.2^{-41}\)

FP32 vs FP16

  • 能表达的数字范围和精度远小于FP32
  • 浮点数都有个上下溢问题:
    • 上/下溢出:FP16 的表示范围不大,非常容易溢出
    • 超过 \(6.5e^4\) 的数字会上溢出变成 inf,小于 \(5.9e^{-8}\) 的数字会下溢出变成 0

FP16 vs BF16

  • 都是2B存储
  • BF16指数位更多:可以表示更大范围的数值
  • FP16尾数位更多:可以表示更精确的数值,如果都是小数值,用FP16更好
    • 注意:下溢精度不等于精度,下溢精度与指数关系更大

混合精度训练

  • 最早论文:Mixed precision training
    • 作者:百度,英伟达
    • 一次迭代过程
    • 基本思路:保持原始参数还是fp32的情况下,将计算梯度等所有流程都使用fp16进行,节省内存/显存的同时提升训练速度
    • fp16会损失精度,所以在过程中需要用到scaling操作
  • 混合精度训练的优点
    • 减少显存占用:FP16 的显存占用只有 FP32 的一半,这使得我们可以用更大的 batch size;
      • 混合精度训练下,需要存储的变量为:FP16的梯度,FP16的参数,FP32的参数;好像并没有减少显存啊?
    • 加速训练:使用 FP16,模型的训练速度几乎可以提升 1 倍
  • FP16的下溢值这么大,梯度一般都很小,为什么能存储梯度?
    • 通过loss scaling技术,对loss进行缩放(放大很多倍)可以确保梯度不会下溢,在更新时转换成FP32再unscale回去即可(需要FP32)
  • FP32有什么用?(为什么不能只使用fp16呢?)
    • 防止FP16导致误差过大:将模型权重、激活值、梯度等数据用 FP16 来存储,同时维护一份 FP32 的模型权重副本用于更新。在反向传播得到 FP16 的梯度以后,将其转化成 FP32 并 unscale,最后更新 FP32 的模型权重。因为整个更新过程是在 FP32 的环境中进行的,所以不会出现舍入误差。
  • 为了节省存储和加快训练速度,特别是大模型时代,越来越重要

混合精度的使用

  • 更新最新参考资料:由浅入深的混合精度训练教程

IDEA——lombok包使用说明

参考链接:https://blog.csdn.net/sunayn/article/details/85252507

  • 在Java使用lombok包提供Setter和Getter等注解可以简化编程

lombok包使用流程

  • 在pom中导入lombok包依赖

    1
    2
    3
    4
    5
    6
    <!--lombok 注解-->
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.10</version>
    </dependency>
  • 使用注解

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    package com.example.springboot.model;

    import lombok.Getter;
    import lombok.Setter;

    import java.util.Date;

    @Setter
    @Getter
    public class User {
    private Integer id;

    private String name;

    private Integer age;

    private String sex;

    private Date birthday;

    }
    • 上面的例子是为整个类的属性加上了注解,但更多的使用时针对类里面的某个属性单独添加这@Setter和@Getter注解

可能存在的问题

IDEA中无法识别注解

  • 这可能导致代码找不到Getter和Setter方法而报错
  • 此时IDEA需要安装lombok插件才能正常使用

ML——样本不均衡问题


关于样本不均衡的解决

  • 相关实践: 某次比赛中, 训练集中正负样本数量分别为20663和569877,占比分别为3.499%和96.501%, 差别很大

基于数据的解决方案

过采样
  • 增加负样本的数量
    • 简单复制样本多次: 容易过拟合,模型变得复杂
    • 使用SMOTE算法采样: 对少数样本集 \(S_{min}\) 中的每一个样本 \(x\),从他的K近邻中随机选取一个样本 \(y\),然后在 \(x, y\) 之间随机选取一个新的点作为新合成的样本
      • 能降低过拟合风险
      • 需要一个样本间距离定义的函数,且对于少数样本多时选取最近邻的复杂度太大
      • 由于为每个样本都采样了,所以可能会增大类间重叠度(生成的样本可能是符合多数样本类别的,很容易造成生成没有意义甚至是噪声)
    • 使用Borderline-SMOTE算法优化SMOTE算法:
      • 只在分类边界上的少数样本进行采样
    • 使用ADASYN算法优化SMOTE算法:
      • 给不同的少数类样本合成不同个数的新样本
负采样
  • 减少正样本数量,让分类器更重视负样本
    • Easy Ensemble算法: 每次从 \(S_{maj}\) 中随机抽取子集 \(E\),然后用 \(E + S_{min}\),最后融合多个分类模型的结果
      • 融合模型时简单的可以对模型输出做均值(评估方式为AUC分数)或者投票(评估方式为分类精度)
    • Balance Cascade算法: 级联结构, 在每一级中从 \(S_{maj}\) 中随机抽取子集 \(E\),然后用 \(E + S_{min}\) 训练当前级的分类其,然后将 \(S_{maj}\) 能正确被当前分类器正确分类的样本剔除, 剩下不能正确分类的样本进行下一级操作,重复若干次后得到级联结构,最终的输出结果是各级分类器结果的融合
      • 这里有点像是Boosting方法,对样本权重进行修改,使得模型更重视上次分类错误的样本

基于算法的解决方案

  • 修改模型的训练目标函数
    • 比如使用 AUC_ROC 的负数作为损失函数
    • 深度学习中可以使用Focal Loss代替传统的SGD
  • 也可以将问题转化为基于单类学习的异常检测问题
  • 单类学习(One Class Learning), 异常检测(Anomaly Detection) *
    • 单类学习(One Class Learning): 训练数据只有一个类别,学习一个能够远离这个类别样本的Boundary,比如
      • 单类别SVM(One Class SVM)
      • K近邻非参数方法

一个重要的特殊说明

  • 在深度神经网络中,如果样本类别不平衡,不要使用BN, 否则会出现问题

KG——图嵌入综述

本文主要介绍图嵌入(Graph Embedding)的发展和方法,先简单记录,待后续有时间充实


图(Graph)

  • 结点和边的集合
  • 经典图论中的图, 知识图谱, 概率图模型中的图等
  • 传统的图算法包括(图论中的):
    • 最小生成树算法: Prim算法, Kruskal算法等
    • 最短路径算法: Dijkstra算法, Floyed算法等
  • 图神经网络算法包括
    • 图嵌入(Graph Embedding): 基于随机游走(Random Walk)生成路径
    • 图卷积神经网络(Graph CNN, GCN): 基于邻居汇聚实现

图嵌入

  • 用低维的向量来表示结点, 同时要求任意在图中相似的节点在向量空间中也接近.
  • 得到的节点的向量表示可以用来解决节点分类等下游任务

DeepWalk

  • 论文链接: DeepWalk KDD 2014
  • 核心思想: 通过将游走路径(walks)当做句子(sentences), 用从截断随机游走(truncated random walks)中得到的局部信息来学习隐式表示
    • 类似于Word2Vec, node对应word, walks对应sentence
  • 核心方法:
    • 随机游走方法进行采样
    • 使用Skip-Gram方法训练采样的样本
  • 实验结果:
    • 论文中展示了DeepWalk在多个多标签网络分类任务中的隐式表示, 比如BlogCatalog, Flickr和YouTube
    • 在某些实验中,仅仅用60%的训练数据即可到达(超过)所有Baseline方法
    • 在稀疏标记数据上F1分数表现良好

随机游走

  • 随机游走方法: 一种可重复访问(有放回采样)的深度优先遍历算法(DFS)
    • 给定起始访问节点A
    • 从A的邻居中随机采样一个节点B作为下一个节点
    • 从B的邻居中随机采样一个节点C作为下一个节点
    • ….
    • 直到序列长度满足truncated条件, 得到一个walk

Skip-Gram 训练

  • 对随机游走采样到的数据进行Skip-Gram训练
  • 最终得到每个节点的表示向量

Node2Vec

  • 论文链接: Node2Vec KDD 2016
  • 核心思想: 综合考虑DFS和BFS的图嵌入方法, 可以视为DeepWalk的一种扩展(DeepWalk的随机游走仅仅是考虑DFS的,不考虑BFS)

优化目标

  • Node2Vec要解决的问题: 找到一种从节点到embedding向量的映射函数 \(f\),最大化整体后验概率(乘积). 每个节点的后验概率为: 给定某个节点, 相邻节点出现的概率 \(Pr(N_S(u)|f(u))\)
    $$
    \begin{align}
    \max_f\sum_{u \in V} log Pr(N_S(u)|f(u))
    \end{align}
    $$

  • 为了简化上述问题,作者引入两个假设

    • 条件独立性(Conditional independence):
      $$
      \begin{align}
      Pr(N_S(u)|f(u)) = \prod_{n_i \in N_S(u)} Pr(n_i|f(u))
      \end{align}
      $$
    • 特征空间中的对称性(Symmetry in feature space): 假设源节点和邻居节点在特征空间中有对称
      $$
      \begin{align}
      Pr(n_i|f(u)) = \frac{exp(f(n_i)\cdot f(u))}{\prod_{v\in V}exp(f(v)\cdot f(u))}
      \end{align}
      $$
      • 本质上表达的是: 一个节点作为源节点或者邻近节点时都使用同一个特征向量表示
      • 理解: 特征向量的点乘(内积)表示两个点之间的关联程度?

KG——Neo4j使用笔记

本文主要记录Neo4j使用过程中的遇到的问题和解决方案等


安装与启动

安装

  • 安装Neo4j前,一般需要安装Java环境, Ubuntu自带的版本不行的话需要重新下载安装新的版本并设置环境变量
自动安装
  • Ubuntu上安装和卸载命令
    1
    2
    sudo apt-get install neo4j
    sudo apt-get remove neo4j
手动安装
  • 下载neo4j已经编译好的文件, 官网为http://www.neo4j.com/
    • 不要直接点download neo4j, 从官网选中products->neo4j database->拉到最下面选择Community Edition_>选择对应的release版本(.tar文件)下载即可
  • 将下载到的.tar文件解压到安装目录中(一般选择/usr/local/neo4j/)
  • 现已经安装成功,直接进入安装目录即可使用./bin/neo4j console启动neo4j数据库
  • 初始的账户和密码都是neo4j, 第一次登录需要重新设置密码

启动

  • 使用console启动

    1
    ./bin/neo4j console
    • terminal将输出实时运行log信息,关闭terminal,neo4j数据库随之关闭
    • 此时用其他电脑不能访问,只能本地电脑localhost:7474/browser访问
  • 查看启动状态

    1
    ./bin/neo4j status
  • 启动

    1
    ./bin/neo4j start
  • 停止

    1
    ./bin/neo4j stop
  • 默认访问端口: 7474, 比如本地访问网址为: localhost:7474/browser


Neo4J导入Turtle文件

Turtle简介

参考博客: https://blog.csdn.net/u011801161/article/details/78833958

  • Turtle是最常用的RDF序列化方式, 比RDF/XML更紧凑, 可读性比N-Triples更好
  • 其他序列化方式包括:
    • RDF/XML: 用XML格式来表示RDF数据
    • N-Triples: 用多个三元组来表示RDF数据集合,是最直观的表示方法,每一行表示一个三元组,方便机器解析和处理,DBpedia 是按照这个方式来发布数据的
    • RDFa: (The Resource Description Framework in Attributes)
    • JSON-LD

安装neosemantics插件

注意,这里要求neo4j安装方式是手动安装的,(自动安装的neo4j本人找不到/plugins目录),手动安装方式参考前面的安装流程

  • 下载插件release版本, 项目地址: https://github.com/neo4j-labs/neosemantics

    • 注意,下载时一定要查看版本与已经安装的neo4j数据库是否兼容,否则可能造成运行时异常,或者找不到方法名等
  • 按照项目中的README.md安装插件

    • 复制release版本到/plugins目录下

    • 修改/conf/neo4j.conf文件,添加一行

      1
      dbms.unmanaged_extension_classes=semantics.extension=/rdf
    • 重启neo4j服务器

    • 使用call dbms.procedures()测试是否安装成功

导入Turtle文件

  • 导入云端文件

    1
    CALL semantics.importRDF("https://raw.githubusercontent.com/jbarrasa/neosemantics/3.5/docs/rdf/nsmntx.ttl","Turtle")
  • 导入本地文件

    1
    CALL semantics.importRDF("file:///home/jiahong/neosemantics/3.5/docs/rdf/nsmntx.ttl","Turtle")

彻底清空Neo4J数据库

参考链接: https://blog.csdn.net/u012485480/article/details/83088818

使用Cypher语句

  • 直接使用下面的Cypher语句即可
    1
    match (n) detach delete n
特点
  • 无需操作文件
  • 无需停止和启动服务
  • 对于数据量大的情况下删除很慢(需要先查询再删除,内存可能会溢出)

删除数据库文件

  • 停止neo4j服务

    1
    sudo ./bin/neo4j stop
  • 删除./data/databases/graph.db目录

    1
    sudo rm -rf ./data/database/graph.db
  • 启动neo4j服务

    1
    sudo ./bin/neo4j start
特点
  • 需要删除文件操作
  • 需要停止和启动服务
  • 对于数据量大的情况下删除速度也非常快速

Neo4j约束

查看当前数据库中的所有约束

  • Cypher查询语句CQL
    1
    :schema

创建约束

1
create constraint on (p:Person) assert p.name is unique

删除约束

1
drop constraint on (p:Person) assert p.name is unique

节点的唯一性约束

  • 为某个标签在某个属性上创建唯一性约束

    1
    create constraint on (p:Person) assert p.name is unique
    • 创建唯一性约束后会自动为该标签对应的属性创建索引Index
    • 这里的索引为ON :Person(name) ONLINE (for uniqueness constraint)
    • 理解,因为要确保唯一性,所以需要索引加快每次插入节点前检索的效率
    • 手动创建索引(为了加快某个标签的某个属性的检索效率)的方法为:
      1
      create index on :Person(name)
  • 唯一性约束设置后,当写入重复的数据时,会报错

    Neo.ClientError.Schema.ConstraintValidationFailed
    Node(19718935) already exists with label Person and property name = ‘Joe’


节点操作

创建节点

  • 创建结点方式如下,其中p

    1
    create (p:Person{name:'Joe'})
    • 上面的句子创建了一个结点

    • 结点标签为: Person

      • 如果之前没有Person标签则新建Person标签
      • 如果没有添加索引,那么这个标签在所有Person标签的节点都被删除后也会自动消失
      • 如果添加了索引,则删除所有结点和相关索引后该标签会自动消失
    • 结点名称为: p

      • p本质上在这里是一个变量
      • 如果当前执行语句中对当前结点没有更多操作, 甚至可以省略节点名称p
        1
        create (:Person{name: 'Joe'})
    • 结点属性name的值为: ‘Joe’

      • 这个属性很有用, 可以在显示结点的时候直接在结点中显示出来”Joe”, 方便查看
      • 测试: 换成其他属性,比如属性a后, 在Neo4j可视化结点时是不显示的
      • name本身也可以省略
        1
        create (:Person)

删除结点

  • 删除标签为Person且名字为”Joe”的所有结点
    1
    match (p:Person{name:'Joe'}) delete p

标签操作

  • Neo4j中一般为节点创建一个标签即可,通常一些标准的知识图谱还会为同一个节点创建多个标签,说明这个节点属于多个标签
  • 节点的标签数量可以为0个,1个或多个
  • 没有标签的结点可通过id获取到
    1
    match (n) where id(n)=<node-id> return n

直接创建标签

  • 单个标签创建

    1
    create (<node-name>:<label-name>)
  • 多个标签创建

    1
    create (<node-name>:<label-name1>:<label-name2>:...:<label-nameN>)
    • 从很多知识图谱的例子来看,标签之间并不是完全的从属关系
    • 从属关系: Person:Student
    • 并列关系: Man:Student

给已有的节点添加标签

  • 使用set关键字添加标签标签
    1
    match (p:Person{name:'Joe'}) set p:Student

移除已有结点的标签

  • 使用remove关键字删除标签
    1
    match (p:Person{name:'Joe'}) remove p:Student

属性操作

  • 属性操作与标签操作类似, 使用的也是REMOVE和SET指令

直接创建属性

  • 使用CREATE指令
    1
    create (p:Person{name:'Joe'})

添加属性

  • 使用SET指令
    1
    match (p:Person{name:"Joe"}) set p.sex="male"

移除属性

  • 使用REMOVE指令
    1
    match (p:Person{name:"Joe"}) remove p.sex

各种操作命令总结

  • DELETE和CREATE指令用于删除节点和关联关系
  • REMOVE和SET指令用于删除标签和属性

Neo4j同时创建多个数据库

  • Neo4j中无法同时创建多个数据库,但是我们可以通过硬性和软性的方法分别实现等价功能

硬件上实现多个数据库

  • Neo4j的数据库文件为./data/databases/graph.db
    • 我们可以手动修改该文件的名称,然后重新创建文件实现
  • Neo4j的数据库配置文件为./conf/neo4j.conf
    • 可以修改#dbms.active_database=graph.db
    • 修改方法为将注释取消并且修改数据库为对应的数据库名称

软件上实现多个数据库

  • 为不同数据库的每个结点分别指定同一个数据库名称对应的标签
    • 比如”Docker”和”School”分别对应Docker知识图谱和学校知识图谱

Py2neo中结点如何被图识别?

  • 每个Python结点对象都有个唯一的标识符ID
    • 对应属性为identity
    • 对于从Graph中读出的结点,该属性为一个唯一的数值,与图数据库中结点的数值一致
    • 对于Python直接初始化的结点对象,该属性是None
  • 只要identity属性指定了,其他属性与数据库中的结点不同也可以的
    • 使用Graph.push(local_node)可以把本地结点更新到远处数据库中

Neo4j和JVM版本兼容问题

报错:

1
sudo ./bin/neo4j start

ERROR! Neo4j cannot be started using java version 1.8.0_222.

  • Please use Oracle(R) Java(TM) 11, OpenJDK(TM) 11 to run Neo4j.
  • Please see https://neo4j.com/docs/ for Neo4j installation instructions.

解决方案

  • 安装对应版本的Java虚拟机(这里不会修改操作系统中原来的JAVA_HOME)

    1
    2
    sudo yum search jdk
    sudo yum install java-11-openjdk
  • 将对应的JAVA_HONE配置到Neo4j中(不修改原来系统中的JAVA_HOME)

    1
    sudo vim ./conf/neo4j.conf
  • 在文件最后一行添加

JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.5.10-0.el7_7.x86_64

  • 重新启动neo4j

    1
    sudo ./bin/neo4j start
  • 注意: 以上方法都不会影响系统的JAVA_HOME和JAVA环境


Neo4j服务器配置远程访问功能

  • 打开配置文件

    1
    sudo vim ./conf/neo4j.conf
  • 将下面的语句注释取消

dbms.connectors.default_listen_address=0.0.0.0

  • 注意: 如果是服务器有防火墙,则需要把以下端口打开
    • 7474: http
    • 7687: bolt

Neo4j dump数据

  • 参考链接:https://www.jianshu.com/p/8c501b49adb7
  • dump原始数据库为.dump文件
    bin/neo4j-admin dump --database graph.db --to [dir]
    • 将数据库graph.db中的数据dump为.dump文件,文件名字自动生成为对应的数据库名称.dump
  • 将.dump文件导入到库中,(库需要停掉,并且库名不能有相同的名字)
    bin/neo4j-admin load --from graph.db.dump
    • 相当于是dump的逆向操作,数据库文件名称自动生成为前缀(注意不能与已经存在的数据库产生冲突)
  • 当不同版本库相互倒数据时需要把该参数开启,在conf/neo4j.conf中
    dbms.allow_format_migration=true
  • 这个dump命令只有在3.2.0才有的

Neo4j 4.0.0以后

  • 数据库文件夹变化了,不能像之前一样修改文件夹名为graph.db来更改数据库

使用Cypher查询数据库时的效率问题

  • 应该把Cypher语句和MySQL的查询语句联系起来看
  • Cypher从第一句开始匹配,然后依次匹配相关的每一句
  • 如果存在两条匹配过程的路径,而又需要把这两个路径联系起来,那么需要使用WHERE子句
    • WHERE语句非常有用,能避免很多不必要的问题,还能加入与或非的逻辑
  • 但是一定要注意,WHERE语句使用方便,但是容易造成检索慢的问题

举例

  • 尽量不要写出如下语句

    1
    2
    3
    4
    MATCH (image:Image), (base_image:BaseImage) 
    WHERE image.name = "ubuntu:latest" and
    (image)-[:hasBaseImage]->(base_image)
    RETURN base_image
    • 上述语句将匹配所有的Image对象
    • 然后匹配所有BaseImage对象
    • 接着执行WHERE子句过滤
    • 最后再返回
  • 上面的句子可以换成如下语句

    1
    2
    MATCH (image:Image{name:"ubuntu:latest"), (image)-[:hasBaseImage]->(base_image)
    RETURN base_image
    • 上面的句子直接找到名称为”ubuntu:latest“的镜像
    • 然后直接从改对象开始搜索相关关系的BaseImage
    • 最后返回
  • 实验表明:后面一句比前一句速度快很多

Bash——Shell中的环境变量高级解析方法

参考链接:https://www.cnblogs.com/flintlovesam/p/6677037.html


Shell中的${}、##和%%使用范例

  • 假设我们定义了一个变量为:

    1
    file=/dir1/dir2/dir3/my.file.txt
  • 可以用${ }分别替换得到不同的值:

    1
    2
    3
    4
    5
    6
    7
    8
    ${file#*/}: # 删掉第一个 / 及其左边的字符串:dir1/dir2/dir3/my.file.txt
    ${file##*/}: # 删掉最后一个 / 及其左边的字符串:my.file.txt
    ${file#*.}: # 删掉第一个 . 及其左边的字符串:file.txt
    ${file##*.}: # 删掉最后一个 . 及其左边的字符串:txt
    ${file%/*}: # 删掉最后一个 / 及其右边的字符串:/dir1/dir2/dir3
    ${file%%/*}: # 删掉第一个 / 及其右边的字符串:(空值)
    ${file%.*}: # 删掉最后一个 . 及其右边的字符串:/dir1/dir2/dir3/my.file
    ${file%%.*}: # 删掉第一个 . 及其右边的字符串:/dir1/dir2/dir3/my
  • 记忆的方法为:

    • %和#分别在$的右边和左边
    • #是去掉左边(键盘上#在 $ 的左边)
    • %是去掉右边(键盘上% 在$ 的右边)
    • 单一符号是最小匹配;两个符号是最大匹配
      1
      2
      ${file:0:5}: # 提取最左边的 5 个字节:/dir1
      ${file:5:5}: # 提取第 5 个字节右边的连续5个字节:/dir2
  • 也可以对变量值里的字符串作替换:

    1
    2
    ${file/dir/path}: # 将第一个dir 替换为path:/path1/dir2/dir3/my.file.txt
    ${file//dir/path}: # 将全部dir 替换为 path:/path1/path2/path3/my.file.txt
  • 利用 ${ } 还可针对不同的变数状态赋值(沒设定、空值、非空值):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    ${file-my.file.txt} : # 假如 $file 沒有设定,則使用 my.file.txt 作传回值(空值及非空值時不作处理)
    ${file:-my.file.txt} : # 假如 $file 沒有設定或為空值,則使用 my.file.txt 作傳回值(非空值時不作处理)
    ${file+my.file.txt} : # 假如 $file 設為空值或非空值,均使用 my.file.txt 作傳回值(沒設定時不作处理)
    ${file:+my.file.txt} : # 若 $file 為非空值,則使用 my.file.txt 作傳回值(沒設定及空值時不作处理)
    ${file=my.file.txt} : # 若 $file 沒設定,則使用 my.file.txt 作傳回值,同時將 $file 賦值為 my.file.txt (空值及非空值時不作处理)
    ${file:=my.file.txt} : # 若 $file 沒設定或為空值,則使用 my.file.txt 作傳回值,同時將 $file 賦值為my.file.txt (非空值時不作处理)
    ${file?my.file.txt} : # 若 $file 沒設定,則將 my.file.txt 輸出至 STDERR(空值及非空值時不作处理)
    ${file:?my.file.txt} : # 若 $file 没设定或为空值,则将 my.file.txt 输出至 STDERR(非空值時不作处理)
    ${#var} # 可计算出变量值的长度
    ${#file} # 可得到 27 ,因为/dir1/dir2/dir3/my.file.txt 是27个字节

Bash——Shell编程总结


整体说明

  • Shell 脚本是一种用命令行解释器(Shell)来执行的程序
  • 常用语自动化重复性的任务、编写复杂的程序流程

Shell 脚本执行相关基础

  • 通常,Shell 脚本文件的扩展名是 .sh,但这不是强制的

  • 编写并执行一个 shell 脚本的步骤

  • 第一步,创建文件和编写代码:

    1
    2
    3
    4
    #!/bin/bash
    # 这是一个简单的脚本,用于打印 "Hello, Shell!"

    echo "Hello, Shell!"
    • #!/bin/bash: 这被称为 Shebang
      • 这行代码告诉系统使用 /bin/bash 这个解释器来执行这个脚本(建议指定)
    • #: 井号 # 后的内容是注释
    • echo: 用于在终端上打印文本的命令
  • 第二步,赋予执行权限: 默认情况下,新创建的文件没有执行权限,需要使用 chmod 命令来添加它

    1
    chmod +x hello.sh
    • +x 表示添加执行权限
  • 第三步,运行脚本: 现在,可以通过以下方式运行的脚本

    1
    ./hello.sh
    • ./ 表示在当前目录执行
    • 若没有执行权限的脚本可以使用 sh ./hello.sh 来执行(此时不再使用指定的解释器)
      • Shebang 只有在直接运行脚本时才生效

变量的使用

  • 变量用于存储数据,在 Shell 脚本中,定义变量不需要声明类型

  • 定义变量

    1
    2
    name="Gemini"
    age=25
    • 注意 : 赋值号 = 的两边 不能 有空格
  • 使用变量:要使用变量,需要在变量名前面加上美元符号 $

    1
    2
    3
    4
    5
    6
    7
    #!/bin/bash

    name="Alice"
    echo "我的名字是 $name。"

    # 也可以用大括号括起来,这在某些情况下很有用,比如变量名紧跟在其他文本后面
    echo "我的名字是 ${name},今年 ${age} 岁。"
  • 环境变量的读取:环境变量是系统预设的变量,可以在任何地方访问

    • 例如,$PATH 存储了系统命令的搜索路径,$HOME 存储了用户的主目录
      1
      echo "我的主目录是 $HOME"

特殊变量

  • Shell 提供了一些特殊的变量,用于获取脚本运行时的信息

    变量名 含义
    $0 脚本文件名
    $1, $2, … 命令行参数。$1 是第一个参数,$2 是第二个,以此类推。
    $# 传递给脚本的参数个数
    $* 所有的命令行参数,作为一个字符串
    $@ 所有的命令行参数,每个参数是独立的字符串
    $? 上一个命令的退出状态码。0 表示成功,非 0 表示失败。
    $$ 当前脚本的进程 ID
  • 特殊变量使用示例

    1
    2
    3
    4
    5
    6
    #!/bin/bash

    echo "脚本名称: $0"
    echo "参数个数: $#"
    echo "第一个参数: $1"
    echo "所有参数: $*"
    • 保存为 args.sh,然后运行 bash args.sh apple banana cherry,观察输出
      1
      2
      3
      4
      脚本名称: hello.sh
      参数个数: 3
      第一个参数: apple
      所有参数: apple banana cherry

条件判断(if 语句)

  • if 语句用于根据条件执行不同的代码块

  • if 基本语法

    1
    2
    3
    if [ 条件 ]; then
    # 如果条件为真,执行这里的代码
    fi
  • if 完整语法

    1
    2
    3
    4
    5
    6
    7
    if [ 条件 ]; then
    # 如果条件为真
    elif [ 其他条件 ]; then
    # 如果第一个条件为假,且第二个条件为真
    else
    # 所有条件都为假
    fi
    • 特别注意 : [ 和 ] 之间必须有空格

if 语句相关的常用条件运算符号/表达式

  • 字符串比较:
    • = 或 ==: 字符串相等
    • !=: 字符串不相等
    • -z: 字符串为空
    • -n: 字符串不为空
  • 数字比较:
    • -eq: 相等(Equal)
    • -ne: 不相等(Not Equal)
    • -gt: 大于(Greater Than)
    • -lt: 小于(Less Than)
    • -ge: 大于等于(Greater than or Equal)
    • -le: 小于等于(Less than or Equal)
  • 文件测试:
    • -e: 文件或目录存在
    • -f: 是文件
    • -d: 是目录
    • -r: 可读
    • -w: 可写
    • -x: 可执行
  • 常用条件表达式示例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #!/bin/bash

    if [ -f "test.txt" ]; then
    echo "文件 test.txt 存在。"
    else
    echo "文件 test.txt 不存在。"
    fi

    if [ 10 -gt 5 ]; then
    echo "10 大于 5。"
    fi

循环语句

  • 循环用于重复执行一段代码,直到满足特定条件

for 循环

  • for 循环用于遍历列表中的项目
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #!/bin/bash

    # 遍历一个列表
    for fruit in apple banana cherry; do
    echo "水果: $fruit"
    done

    # 遍历数字范围
    for i in {1..5}; do
    echo "数字: $i"
    done

while 循环

  • 当条件为真时,持续执行循环

    1
    2
    3
    4
    5
    6
    7
    8
    #!/bin/bash

    count=1

    while [ $count -le 5 ]; do
    echo "计数: $count"
    count=$((count + 1)) # 算术运算
    done
    • $((...)) 是 Shell 中的算术扩展,用于执行数学运算

函数

  • 定义函数方式一:

    1
    2
    3
    4
    function my_func {
    # 函数体
    echo "这是一个函数。"
    }
  • 定义函数方式二:

    1
    2
    3
    4
    my_func() {
    # 函数体
    echo "这也是一个函数。"
    }
  • 调用函数

    1
    my_func
  • 函数参数:函数内部的参数使用 $1, $2 等来访问,与脚本的命令行参数类似(定义时不需要指定参数)

    1
    2
    3
    4
    5
    6
    7
    8
    #!/bin/bash

    greet() {
    echo "Hello, $1!"
    return 0
    }

    greet "Bob" # 调用函数并传递参数 "Bob"
    • return 命令可以返回一个退出状态码,通常 0 表示成功,非 0 表示失败

附录:最佳实践(持续更新)

常用实践

  • 为了避免因为文件名中含有空格而导致的错误,始终使用双引号来引用变量
    • 如 "$name"
  • 使用 $((...)) 进行算术运算
    • 例如 $((a+b)),要特别注意是$加双括号
  • 使用 $(...) 来执行命令并获取其输出(注意是$加单括号)
    • 例如 current_date=$(date),这比老式的反引号 `` 更推荐使用

脚本开头的常用设置

  • 在 Shell 脚本的开头,通常会看到一些 set 命令,这些设置可以编写更健壮、更可靠的脚本,有效避免一些常见的错误

  • 通常,一个健壮的 Shell 脚本开头会包含以下几行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    #!/bin/bash

    # 遇到错误立即退出
    set -e

    # 如果管道中的任何一个命令失败,整个管道命令就失败
    set -o pipefail

    # 打印所有执行的命令,方便调试
    # set -x

    # 未设置的变量会立即报错退出
    set -u
    • 这些命令可以单独使用,也可以组合在一起,比如 set -e -o pipefail 或更简洁的 set -euo pipefail
  • 对常见组合 set -xeo pipefail 的详解,包含了以下几个命令

    • set -e 或 set -o errexit
      • 含义是如果脚本中的任何命令失败(返回非零退出状态),脚本会立即退出
      • 可以防止脚本在遇到错误后继续执行,从而导致意想不到的后果
        • 例如,正在删除一个目录,但 rm 命令失败了,如果没有 -e,脚本会继续执行下面的命令,可能导致数据不一致或更严重的错误
    • set -x 或 set -o xtrace
      • 含义是在执行每个命令之前,先打印该命令及其所有参数
      • 对于 调试 非常有用,当运行脚本时,你会看到每个命令的完整展开形式,帮助你追踪脚本的执行流程和变量的值
        • 在调试完成后,通常会注释掉或删除 -x
    • set -o pipefail
      • 含义是如果管道(|)中的任何一个命令失败,整个管道命令的退出状态就是失败
        • 注:不论前面的管道命令是否成功,后面的管道都会执行,且整个管道的状态由最后一个状态决定
      • 默认情况下,管道命令的退出状态只取决于最后一个命令
        • 问题在于这意味着 command1 | command2,即使 command1 失败了,只要 command2 成功,整个命令依然被认为是成功的
        • pipefail 解决了这个问题,确保你不会忽略管道中间的错误
    • set -u 或 set -o nounset
      • 含义是如果尝试使用一个未设置的变量,脚本会立即报错并退出(注意:如果不设置这个参数是不会报错,也不会退出的,shell 脚本会跳过这行命令)
      • 这行命令可以避免因拼写错误或变量未正确赋值而引发的问题
        • 例如,如果想使用 $user,但无意中写成了 $userr,set -u 会立即提醒你,而不是让脚本使用一个空值继续执行

脚本输出

  • 将脚本的输出重定向到文件,方便调试和后续查看,例如 my_script.sh > log.txt 2>&1
    • 这个命令将脚本的 标准输出 和 标准错误 都重定向到一个文件中
  • > log.txt 2>&1 这个命令可以分解为以下几个部分来理解:
    • 标准输出 (Standard Output, stdout) :文件描述符为 1
    • 标准错误 (Standard Error, stderr) :文件描述符为 2
    • >:标准输出(stdout)重定向
      • 注意这个操作符的定义是标准输出重定向 ,相当于是 1> 的简写
    • 2> 标准错误(stderr)重定向
    • log.txt:这是重定向的目标文件,标准输出的内容将被写入到这个文件中
    • &1:这是一种特殊的写法,表示将文件描述符 2(标准错误)重定向到与文件描述符 1(标准输出)相同的位置
    • 总体来看,这行命令将有两个效果:
      • 将文件描述符 2(标准错误)重定向到与文件描述符 1(标准输出)相同的位置
      • 将 1(标准输出)重定向到文件
  • TLDR:my_script.sh > log.txt 2>&1 表示 执行 my_script.sh,并将所有成功输出和错误输出都发送到 log.txt 文件中

输出脚本同时打印到屏幕

  • 如果想既将输出和错误记录到文件,又同时在屏幕上看到它们,可以使用 tee 命令
  • tee 命令的作用是“分流”输入,它将标准输入的内容复制一份到标准输出,同时写入一个或多个文件

将所有输出都分流

  • 使用 tee 命令,可以这样写:

    1
    my_script.sh 2>&1 | tee log.txt
    • my_script.sh 2>&1:首先,将脚本的标准错误(2)合并到标准输出(1)
      • 这样,所有的成功信息和错误信息都变成了“标准输出”
    • |:这是一个管道符号,它将前一个命令的标准输出作为下一个命令的标准输入
    • tee log.txt:将管道接收到的所有内容(即脚本的所有输出)打印到屏幕(标准输出),同时将其 写入 log.txt 文件

只将标准输出分流,错误输出只打印到屏幕

  • 如果只希望将成功信息记录到文件,而错误信息只在屏幕上显示,可以这样:

    1
    my_script.sh | tee log.txt
    • 在这种情况下
      • 标准输出会通过管道传输给 tee,并同时显示在屏幕和写入文件;
      • 标准错误不会被管道捕获 ,它会直接打印到屏幕上

eval 命令的使用

  • shell 脚本中,eval 是一个用于执行字符串作为命令的内置命令
    • eval 会先对传入的参数进行二次解析 ,然后执行解析后的结果作为 shell 命令
  • 具体来说,eval 的功能包括:
    • 1)将所有参数拼接成一个字符串
    • 2)shell 会对这个字符串进行再次解析(包括变量替换、通配符扩展等)
    • 3)最后执行解析后的命令

eval 命令使用示例

  • 动态执行命令:当命令需要动态生成时(比如包含变量)

    1
    2
    cmd="ls -l"
    eval $cmd # 相当于直接执行 `ls -l`
  • 处理复杂变量引用:特别是多层变量嵌套时

    1
    2
    3
    var1="hello"
    var2="var1"
    eval echo \$$var2 # 输出 hello,相当于 `echo $var1`,并最终等价于 `echo hello`
  • 动态生成变量名:

    1
    2
    3
    4
    5
    for i in 1 2 3; do
    eval "num$i=$i" # 生成变量 num1=1, num2=2, num3=3
    done
    echo $num1 # 输出 1
    echo $num2 # 输出 2

eval 命令使用注意事项

  • eval 会执行任何解析后的命令,使用不当可能带来安全风险(特别是处理用户输入时)

    • 危险示例:下面的示例中,如果 user_input 包含恶意命令(如 ; rm -rf /),会被执行
      1
      2
      user_input="some input"
      eval "echo $user_input"
  • 由于会进行二次解析,可能导致意想不到的结果,建议谨慎使用

  • 复杂场景下,有时可以用函数或数组替代 eval 实现更安全的逻辑

1…404142…66
Joe Zhou

Joe Zhou

Stay Hungry. Stay Foolish.

659 posts
53 tags
GitHub E-Mail
© 2026 Joe Zhou
Powered by Hexo
|
Theme — NexT.Gemini v5.1.4