Python——多线程和多进程


整体说明

  • 在 Python 里,多线程和多进程都可以实现并行处理
  • Python 还提供了方便使用的 ThreadPoolExecutorProcessPoolExecutor 类用于多线程和多进程并行处理
  • 多线程切换和启动任务开销小,但在 Python 中受到 全局解释器锁(GIL) 限制,导致更适合一些 IO 密集型任务
  • 多进程切换和启动任务开销大,但在 Python 中不受 全局解释器锁(GIL) 限制,更适合一些 CPU 密集型任务

多线程(Multithreading)

  • 多线程是在同一个进程内运行多个线程,这些线程共享进程的内存空间
  • 但在 Python 中,由于 全局解释器锁(GIL) 的存在,在 CPU 密集型任务中,多线程无法充分利用多核 CPU 的优势 ,更适合 I/O 密集型任务,像网络请求、文件读写这类
    • 注:这是 Python 特有的问题( 全局解释器锁(GIL))其他语言没有这个问题
  • 多线程的示例代码(原生形式):
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    import threading
    import time

    def io_task(task_id):
    print(f"Task {task_id} starts")
    time.sleep(1) # 模拟非计算密集型的 I/O 操作
    print(f"Task {task_id} ends")

    if __name__ == "__main__":
    start_time = time.time()
    threads = []

    # 创建并启动线程
    for i in range(3):
    thread = threading.Thread(target=io_task, args=(i,))
    threads.append(thread)
    thread.start() # 启动单个线程

    # 等待所有线程完成
    for thread in threads:
    thread.join() # 等待单个线程完成
    print(f"Total time taken: {time.time() - start_time:.2f} seconds")

    # Task 0 starts
    # Task 1 starts
    # Task 2 starts
    # Task 1 ends
    # Task 2 ends
    # Task 0 ends
    # Total time taken: 1.01 seconds

多进程(Multiprocessing)

  • 多进程是指运行多个独立的进程,每个进程都有自己独立的内存空间
  • 多进程不受 GIL 的限制 ,能够充分发挥多核 CPU 的性能 ,所以更适合 CPU 密集型任务 ,例如科学计算、图像处理等
  • 与多线程相比,除了将 threading.Thread 替换成 multiprocessing.Process,用法几乎一模一样
  • 多进程的示例代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    import multiprocessing
    import time

    def cpu_task(task_id):
    print(f"Task {task_id} starts,Process ID: {multiprocessing.current_process().pid}")
    sum([i * i for i in range(10**7)]) # 模拟 CPU 密集型操作
    print(f"Task {task_id} ends")

    if __name__ == "__main__":
    start_time = time.time()
    processes = []
    # 创建并启动多个进程
    for i in range(3):
    process = multiprocessing.Process(target=cpu_task, args=(i,))
    processes.append(process)
    process.start() # 启动单个进程
    # 等待所有进程完成
    for process in processes:
    process.join() # 等待单个进程完成
    print(f"Total time taken: {time.time() - start_time:.2f} seconds")

    # Task 0 starts,Process ID: 70140
    # Task 1 starts,Process ID: 70141
    # Task 2 starts,Process ID: 70142
    # Task 0 ends
    # Task 1 ends
    # Task 2 ends
    # Total time taken: 2.03 seconds

ProcessPoolExecutor 和 ThreadPoolExecutor 的使用

  • 这两个类都位于 concurrent.futures 模块中,为我们提供了更高级的异步执行接口

    • ThreadPoolExecutor :用于多线程编程
    • ProcessPoolExecutor :用于多进程编程
  • 它们都提供了 submit()map() 方法,还能通过 with 语句来自动管理资源

    • submit():单个任务提交
    • map():批量任务提交
  • 下面是使用这两个类的示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    import concurrent.futures
    import time

    def io_task(args):
    task_id, content = args
    print(f"IO Task {task_id} starts")
    time.sleep(1)
    print(f"IO Task received the content: '{content}'")
    print(f"IO Task {task_id} ends")
    return args

    def cpu_task(args):
    task_id, content = args
    print(f"CPU Task {task_id} starts")
    sum([i * i for i in range(10**7)])
    print(f"IO Task received the content: '{content}'")
    print(f"CPU Task {task_id} ends")
    return args

    def io_task_multi_args(task_id, content):
    print(f"IO Task {task_id} starts")
    time.sleep(1)
    print(f"IO Task received the content: '{content}'")
    print(f"IO Task {task_id} ends")
    return task_id, content

    def cpu_task_multi_args(task_id, content):
    print(f"CPU Task {task_id} starts")
    sum([i * i for i in range(10**7)])
    print(f"IO Task received the content: '{content}'")
    print(f"CPU Task {task_id} ends")
    return task_id, content

    if __name__ == "__main__":
    print("== 单参数形式:")
    print("=== ThreadPoolExecutor Demo ===")
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    params = [(1, "content1"), (2, "content2"), (3, "content3")]
    results = list(executor.map(io_task, params, timeout=10))
    print(f"Total time taken for IO tasks: {time.time() - start_time:.2f} seconds, results={results}")

    print("\n=== ProcessPoolExecutor Demo ===")
    start_time = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
    params = [(1, "content1"), (2, "content2"), (3, "content3")]
    results = list(executor.map(cpu_task, params, timeout=10))
    print(f"Total time taken for CPU tasks: {time.time() - start_time:.2f} seconds, results={results}")


    print("== 多参数形式:")
    print("=== ThreadPoolExecutor Demo ===")
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    params = [(1, "content1"), (2, "content2"), (3, "content3")]
    futures = list(executor.submit(io_task_multi_args, *param) for param in params) # 需要使用 lambda 关键字定义新函数
    for future in concurrent.futures.as_completed(futures, timeout=10):
    print(f"future result: {future.result()}")
    print(f"Total time taken for IO tasks: {time.time() - start_time:.2f} seconds")

    print("\n=== ProcessPoolExecutor Demo ===")
    start_time = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
    params = [(1, "content1"), (2, "content2"), (3, "content3")]
    futures = list(executor.submit(io_task_multi_args, *param) for param in params) # 需要使用 lambda 关键字定义新函数
    for future in concurrent.futures.as_completed(futures, timeout=10):
    print(f"future result: {future.result()}")
    print(f"Total time taken for CPU tasks: {time.time() - start_time:.2f} seconds")

    # == 单参数形式:
    # === ThreadPoolExecutor Demo ===
    # IO Task 1 startsIO Task 2 starts
    #
    # IO Task 3 starts
    # IO Task received the content: 'content1'IO Task received the content: 'content2'
    # IO Task 2 ends
    # IO Task received the content: 'content3'
    # IO Task 3 ends
    #
    # IO Task 1 ends
    # Total time taken for IO tasks: 1.01 seconds, results=[(1, 'content1'), (2, 'content2'), (3, 'content3')]
    #
    # === ProcessPoolExecutor Demo ===
    # CPU Task 1 starts
    # CPU Task 2 starts
    # CPU Task 3 starts
    # IO Task received the content: 'content2'
    # CPU Task 2 ends
    # IO Task received the content: 'content3'
    # CPU Task 3 ends
    # IO Task received the content: 'content1'
    # CPU Task 1 ends
    # Total time taken for CPU tasks: 1.13 seconds, results=[(1, 'content1'), (2, 'content2'), (3, 'content3')]
    # == 多参数形式:
    # === ThreadPoolExecutor Demo ===
    # IO Task 1 starts
    # IO Task 2 starts
    # IO Task 3 starts
    # IO Task received the content: 'content3'
    # IO Task 3 ends
    # future result: (3, 'content3')
    # IO Task received the content: 'content2'
    # IO Task 2 ends
    # future result: (2, 'content2')
    # IO Task received the content: 'content1'
    # IO Task 1 ends
    # future result: (1, 'content1')
    # Total time taken for IO tasks: 1.01 seconds
    #
    # === ProcessPoolExecutor Demo ===
    # IO Task 1 starts
    # IO Task 2 starts
    # IO Task 3 starts
    # IO Task received the content: 'content1'
    # IO Task 1 ends
    # future result: (1, 'content1')
    # IO Task received the content: 'content2'
    # IO Task 2 ends
    # IO Task received the content: 'content3'
    # IO Task 3 ends
    # future result: (2, 'content2')
    # future result: (3, 'content3')
    # Total time taken for CPU tasks: 1.46 seconds
  • 特别说明:concurrent.futures.as_completed() 是 Python 标准库中的一个函数,用于处理异步执行的多个任务

    • 输入:接收一个包含 Future 对象的可迭代对象(如列表)
    • 返回:一个迭代器,该迭代器会在每个 Future 完成时立即返回它的结果(按完成顺序,而非提交顺序)

multiprocessing.spawn 用法

  • multiprocessing.spawn 是 Python multiprocessing 模块中用于启动子进程的方法,适用于需要在新进程中执行函数的场景,尤其在分布式训练(如 PyTorch 多GPU训练)中常用

  • spawn 会创建全新的Python解释器进程,每个子进程独立运行,拥有独立的内存空间,避免了多线程中的全局解释器锁(GIL)限制

  • 基本用法示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import multiprocessing as mp

    # 子进程要执行的函数
    def worker_function(rank, world_size, shared_arg):
    print(f"子进程 {rank}/{world_size} 启动,参数: {shared_arg}")
    # 子进程的具体逻辑...

    def main():
    # 总进程数(例如等于GPU数量)
    world_size = 4
    # 共享参数(会被传递给每个子进程)
    shared_arg = "hello from main"

    # 启动子进程
    # args 是传递给 worker_function 的参数元组(除了 rank 之外的参数)
    mp.spawn(
    worker_function, # 子进程执行的函数
    args=(world_size, shared_arg), # 传递给函数的参数(第一个参数固定为 rank)
    nprocs=world_size, # 子进程数量
    join=True # 是否等待所有子进程结束后再继续
    )
    print("所有子进程执行完毕")

    if __name__ == "__main__":
    # 在Windows系统中必须放在 if __name__ == "__main__" 下
    mp.set_start_method("spawn") # 显式指定启动方法(可选,默认可能为fork)
    main()
    • fn:子进程要执行的函数(第一个参数必须是 rank,表示进程编号,从0开始)
    • args:传递给函数的额外参数(元组形式)
    • nprocs:要启动的子进程数量
    • join:若为 True,主进程会等待所有子进程执行完毕再继续
    • daemon:是否将子进程设为守护进程(主进程退出时自动终止子进程)
  • 注意:多线程没有完全对应的API ,只能通过 threading.Thread 实现类似的多线程启动逻辑


使用 subprocess.Popen 函数启动进程

  • subprocess.Popen 是 Python 标准库 subprocess 模块中的一个类,用于启动一个新的进程
  • subprocess.Popen 可以连接到其输入/输出/错误管道,并获得其返回码
  • subprocess.Popen 为更复杂的进程管理提供了灵活的接口,是替代 os.systemos.spawn*os.popen* 等旧有方法的推荐方式

基本用法即常用参数说明

  • 用法说明

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import subprocess

    p = subprocess.Popen(
    args,
    bufsize=-1,
    executable=None,
    stdin=None,
    stdout=None,
    stderr=None,
    preexec_fn=None,
    close_fds=True,
    shell=False,
    cwd=None,
    env=None,
    universal_newlines=False,
    startupinfo=None,
    creationflags=0
    )
  • subprocess.Popen 常用参数详细说明见下文

  • args

    • 字符串或序列(如列表)
    • 指定要执行的命令及其参数- 当 shell=False 时,推荐使用列表,如 ['ls', '-l']
      • shell=True 时,通常传递字符串,如 "ls -l"
    • 注:如果命令中包含空格或特殊字符,建议使用列表方式,避免命令解析错误
  • bufsize

    • 整数
    • 设置缓冲策略
      • 0:无缓冲(直接读写)
      • 1:行缓冲(文本模式下有效)
      • 其他正整数:指定缓冲区大小(以字节为单位)
      • -1(默认):使用系统默认缓冲策略
    • 通常用法 :一般保持默认即可,特殊需求时才设置
  • executable

    • 字符串
    • 指定要执行的程序的路径(用于替换默认可执行文件)
      • 例如,executable="/usr/bin/python3" 可以强制使用指定的解释器
  • stdin, stdout, stderr

    • 取值可以是文件对象、文件描述符、subprocess.PIPEsubprocess.DEVNULLNonesubprocess.STDOUT
    • 分别指定子进程的标准输入、输出、错误
      • None(默认):继承父进程的对应流
      • subprocess.PIPE:创建管道,允许父进程与子进程通信
      • subprocess.DEVNULL:丢弃输入/输出
      • 文件对象:如 open('output.txt', 'w'),将输出写入文件
    • 举例:
      • stdout=subprocess.PIPE 表示捕获标准输出,后续可通过 p.communicate() 返回读取;
      • stderr=subprocess.PIPE 表示捕获标准错误
      • stdout=fp 将标准输出写到指定文件中(常用 a 追加形式打开文件 fp
      • stderr=subprocess.STDOUT 将子进程的标准错误(stderr)也重定向到标准输出(stdout),即错误信息也写入日志文件
  • preexec_fn

    • 可调用对象(函数)
    • 在子进程启动前执行指定的函数(仅限Unix)
    • 比如设置进程组、修改环境等
    • 注:在Windows平台无效
  • close_fds

    • 布尔值
    • 是否在子进程中关闭除 stdin/stdout/stderr 以外的所有文件描述符
      • 默认:True(Unix),False(Windows)
    • 通常建议保持默认,除非有特殊文件句柄传递需求
      • 默认定义是:close_fds: bool = ... 表示 bool 类型的占位,在实际调用 subprocess.Popen 时,close_fds 的默认值由具体的实现决定(如在 Unix 下默认 True,Windows 下默认 False
      • 也可以在调用时显式传递 close_fds=Trueclose_fds=False
  • shell

    • 布尔值
    • 是否通过 shell 运行命令
      • True:命令通过 shell 解析(如 /bin/shcmd.exe),可用 shell 特性(如重定向、管道)
      • False(默认):直接执行指定的程序
    • 安全提示shell=True 存在命令注入风险,处理外部输入时需谨慎
  • cwd

    • 字符串
    • 指定子进程的工作目录
    • 举例:cwd="/tmp" 表示子进程在 /tmp 目录下运行
  • env

    • 字典
    • 设置子进程的环境变量
      • 若为 None,则继承父进程环境
      • 可自定义环境变量,如:env={"PATH": "/usr/bin", "USER": "test"}
    • 注:未指定的变量将丢失,需包含必需的环境变量
  • universal_newlines / text

    • 布尔值
    • 指明子进程是否以文本模式处理输入输出(自动编码/解码)
      • True:与子进程通信时,输入输出为字符串(str),自动处理换行
      • False(默认):以字节流处理(bytes
    • 处理文本数据时设为 Truetext=True
    • 注:Python 3.7 以后,建议使用 text 参数替代 universal_newlines 参数
      • 虽然 universal_newlines 依然还在,但不建议使用
      • textuniversal_newlines 是等价参数,不能同时设置;如果同时传递,会抛出 ValueError 异常
  • startupinfo, creationflags(仅Windows)

    • startupinfo:用于指定进程启动信息(如窗口显示方式)
    • creationflags:用于指定进程创建标志(如 subprocess.CREATE_NEW_CONSOLE
  • 其他参数

    • restore_signals(3.2+):是否恢复信号处理(Unix)
    • start_new_session(3.2+):是否在新会话中启动进程(Unix)

用法示例

  • 最简单的用法:

    1
    2
    3
    import subprocess

    p = subprocess.Popen(['ls', '-l']) # 启动进程并执行,此外不做任何操作
  • 捕获输出:

    1
    2
    3
    4
    5
    import subprocess

    p = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE) # 启动进程并执行,同时创建管道,允许父进程与子进程通信
    out, err = p.communicate() # 与主进程通信,并返回执行结果信息
    print(out.decode()) # 若没有 stdout=subprocess.PIPE,这里的 out 是 None,不可以执行 decode() 命令
  • 通过 shell 运行命令:

    1
    2
    3
    4
    import subprocess

    p = subprocess.Popen("echo Hello World", shell=True) # 启动进程并执行,此外不作任何操作,这里没有任何输出
    out, err = p.communicate() # 与主进程通信,由于没有 stdout=subprocess.PIPE 和 stderr=subprocess.PIPE,out 和 err 均为 None
  • 传递输入:

    1
    2
    3
    4
    5
    import subprocess

    p = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) # 启动进程并执行,cat命令启动进入等待输入,cat 命令启动后,接下来每次输入数据都会被原样输出
    out, err = p.communicate(input=b'Hello\n') # 与主进程通信,并返回执行结果信息
    print(out.decode())
  • 管道操作示例:

    1
    2
    3
    4
    5
    6
    7
    import subprocess

    p1 = subprocess.Popen(['ls'], stdout=subprocess.PIPE)
    p2 = subprocess.Popen(['grep', 'py'], stdin=p1.stdout, stdout=subprocess.PIPE)
    p1.stdout.close()
    out, err = p2.communicate()
    print(out.decode())

Popen 类对象相关常用方法汇总

  • communicate(input=None, timeout=None)
    • 与子进程交互,发送数据到 stdin,读取 stdout 和 stderr,等待进程结束
  • wait(timeout=None)
    • 等待子进程结束,返回退出码
  • poll()
    • 检查子进程是否结束,未结束返回 None
  • terminate()
    • 终止子进程(发送 SIGTERM)
  • kill()
    • 强制杀死子进程(发送 SIGKILL)
  • pid
    • 子进程的进程号
  • returncode
    • 子进程的返回码

使用注意事项

  • 如果需要捕获输出,记得设置 stdout=subprocess.PIPE 和/或 stderr=subprocess.PIPE,否则无法通过 communicate() 获取输出
  • 使用 shell=True 时,命令通常以字符串形式传递,并注意安全风险(如命令注入)
  • 如果子进程输出很大,建议及时读取输出,避免死锁,详细理解见附录
  • Python 3.5+ 推荐用 subprocess.run 简化常见用法,但 Popen 适合更复杂的场景

附录:subprocess.Popen 死锁情况分析

  • 问题说明:当子进程输出数据量很大时,父进程必须及时读取这些数据,否则操作系统管道缓冲区会被写满,导致子进程和父进程互相等待,程序陷入死锁

  • 管道缓冲区有限 ,当用 subprocess.PIPE 捕获子进程的标准输出(stdout)或标准错误(stderr)时,父进程和子进程之间通过一个操作系统管道通信

  • 这个管道是有缓冲区大小限制的(通常几 KB 到几十 KB,依赖操作系统)

  • 如果子进程输出的数据量超过了缓冲区大小,而父进程没有及时读取这些数据,缓冲区会被写满

  • 当缓冲区被写满后,子进程会被阻塞,无法继续写入新的输出(即子进程暂停在写操作)

  • 如果此时父进程又在等待子进程结束(比如调用 wait()communicate()),但没有及时读取管道内容,父子进程就会互相等待,导致死锁

    • 子进程等着缓冲区有空间继续输出;
    • 父进程等着子进程结束,却没读取缓冲区内容;
    • 结果两者都无法继续执行,程序卡死
  • 问题代码示例:

    1
    2
    3
    4
    5
    import subprocess

    p = subprocess.Popen(['cat ./big_document.txt'], stdout=subprocess.PIPE)
    p.wait() # 只等子进程结束,不读取输出
    output = p.stdout.read() # 这一步可能永远无法执行到
  • 正确用法建议:及时读取输出 ,常用的做法是用 communicate(),它会在等待子进程结束的同时,自动持续读取所有输出,防止缓冲区写满:

    1
    2
    3
    4
    import subprocess

    p = subprocess.Popen(['cat ./big_document.txt'], stdout=subprocess.PIPE)
    output, _ = p.communicate() # 推荐做法,自动避免死锁
  • 如果需要实时处理输出,可以循环读取:

    1
    2
    3
    4
    5
    6
    import subprocess

    p = subprocess.Popen(['cat ./big_document.txt'], stdout=subprocess.PIPE)
    for line in p.stdout:
    process(line) # 逐行处理,及时清空缓冲区
    p.wait()

附录:使用 subprocess.call 函数启动进程

  • subprocess.call 可用于启动进程执行外部命令,是高层封装的函数,本质上是 Popen 的简化接口
    • 它内部会创建 Popen 对象并等待命令执行完成,返回命令的退出码
    • 适用于简单场景,只需知道是否成功(退出码),无需复杂交互
  • subprocess.Popen 是底层核心类,提供最完整的功能和灵活性
    • 它直接创建进程对象,允许用户与子进程进行复杂交互(如输入/输出处理、异步执行等)
    • 适合需要精细控制的场景

subprocess.call 的使用示例

  • call()阻塞式的:调用后会等待命令执行完毕才返回,返回值是命令的退出码(0 表示成功),示例:

    1
    2
    import subprocess
    ret_code = subprocess.call(["echo", "hello"]) # 等待命令完成,返回 0
  • 注:Popen 默认是非阻塞式的:创建进程后立即返回 Popen 对象,不会等待命令结束(需显式调用 wait()communicate() 等待完成),示例如下:

    1
    2
    3
    import subprocess
    proc = subprocess.Popen(["echo", "hello"]) # 立即返回,不等待
    ret_code = proc.wait() # 手动等待命令完成,获取退出码
  • 注:Popen 支持更多高级操作,而 call() 不支持,比如

    • Popen 支持输入/输出重定向(通过 stdin/stdout/stderr 与子进程交互),但 call 不支持:

      1
      2
      3
      # 捕获命令输出
      proc = subprocess.Popen(["ls"], stdout=subprocess.PIPE)
      output, _ = proc.communicate() # 获取输出
    • Popen 支持异步执行,可以在命令运行时做其他事情,再回头处理结果

    • Popen 支持信号处理,可通过 send_signal() 向子进程发送信号(如终止进程)

    • Popen 支持管道操作,可多个 Popen 对象可通过管道连接(类似 Linux 管道 |

使用 subprocess.check_all 启动进程并监控失败异常

  • subprocess.check_call(args, ...) 执行指定的命令,等待命令运行结果的返回码(return code),同时还具备 call 没有的抛出异常功能
    • 如果命令执行成功(返回码为 0),则无返回值(或说返回 0);
    • 如果命令执行失败(返回码非 0),则会抛出 subprocess.CalledProcessError 异常(call 不会抛出异常)
    • 示例:
      1
      2
      3
      4
      5
      6
      import subprocess
      try:
      subprocess.check_call(["ls", "-l"]) # 执行 ls -l 命令
      print("命令执行成功")
      except subprocess.CalledProcessError as e:
      print(f"命令执行失败,返回码:{e.returncode}")

使用 subprocess.check_output 启动进程、监控失败异常,并读取输出结果

  • subprocess.check_output(args, ...) 执行指定的命令,并返回命令的输出结果(stdout),同时还具备 call 没有的抛出异常功能

    • 如果命令执行成功(返回码为 0),返回输出内容(字节串,可通过 text=True 参数转为字符串);
    • 如果命令执行失败(返回码非 0),则会抛出 subprocess.CalledProcessError 异常
    • 示例:
      1
      2
      3
      4
      5
      6
      import subprocess
      try:
      result = subprocess.check_output(["echo", "hello"], text=True)
      print(f"命令输出:{result.strip()}") # 输出:hello
      except subprocess.CalledProcessError as e:
      print(f"命令执行失败,返回码:{e.returncode}")
  • subprocess.check_call(args, ...) 的区别为,subprocess.check_output(args, ...) 返回值为输出结果而不是执行状态(返回码, return code)


附录:multiprocessing.Pool 的用法

  • 使用 multiprocessing.Pool 的示例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    import multiprocessing
    import time
    import random

    # 定义一个待并行执行的任务函数
    def process_task(x):
    time.sleep(random.uniform(0.1, 0.5)) # 模拟任务耗时
    result = x * x
    print(f"处理 {x} -> {result} (进程ID: {multiprocessing.current_process().pid})")
    return result

    def main():
    # 生成待处理的数据
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    print(f"原始数据: {data}\n")

    # 创建进程池(默认使用 CPU 核心数,也可指定 processes 参数)
    with multiprocessing.Pool(processes=4) as pool: # 推荐使用 with 语句管理进程池的资源
    # 使用 map:阻塞式,返回列表(按输入顺序)
    print("=== 使用 pool.map ===")
    map_results = pool.map(process_task, data)
    print(f"map 结果: {map_results}\n")

    # 使用 imap:非阻塞式,返回迭代器(按输入顺序,逐步获取结果),注:返回的迭代器只能遍历一次,且遍历过程中会阻塞等待对应任务完成
    print("=== 使用 pool.imap ===")
    imap_iter = pool.imap(process_task, data)
    # 迭代获取结果(每次迭代会阻塞直到对应任务完成,完成一个返回一个)
    imap_results = [res for res in imap_iter] # 注:返回结果是有序的,与 data 的序一致
    print(f"imap 结果: {imap_results}\n")

    # imap 高阶用法,可指定单次分配给进程的任务数量,可避免多次分配 (下面的代码每批给进程池分配 50 个任务)
    imap_iter = pool.imap(process_task, data, chunksize=50) # 默认 chunksize=1
    # for 循环读取方式(读取到的结果是有序的,与输入数据 data 的顺序一致),注:pool.imap(process_task, data) 的结果也可以这样读取
    for i, result in enumerate(imap_iter):
    if i < 5:
    print(result) # 依次输出:1,2,3,4,5
    else:
    break

    # 使用 imap_unordered:非阻塞式,返回迭代器(按任务完成顺序),注:返回的迭代器只能遍历一次,且遍历过程中会阻塞等待对应任务完成
    print("=== 使用 pool.imap_unordered ===")
    imap_unordered_iter = pool.imap_unordered(process_task, data)
    unordered_results = [res for res in imap_unordered_iter]
    print(f"imap_unordered 结果(无序): {unordered_results}\n")

    # 使用 apply:单次提交任务(阻塞式,适合单个任务)
    print("=== 使用 pool.apply ===")
    single_result = pool.apply(process_task, args=(100,)) # 传入单个参数
    print(f"apply 单个结果: {single_result}\n")

    # 使用 starmap:处理多参数任务(类似 map,但支持元组拆包)
    print("=== 使用 pool.starmap ===")
    # 定义一个多参数函数
    def multi_param_task(a, b):
    return a + b
    # 数据为元组列表(每个元组对应一组参数)
    multi_data = [(1, 2), (3, 4), (5, 6)]
    starmap_results = pool.starmap(multi_param_task, multi_data)
    print(f"starmap 结果: {starmap_results}")

    # 使用 starmap_async:异步版本的starmap(非阻塞)
    print("=== 使用 pool.starmap_async ===")
    # 提交异步任务,立即返回AsyncResult对象,不阻塞主进程
    async_result = pool.starmap_async(multi_param_task, multi_data)
    # 可以在这里执行其他操作(演示非阻塞特性)
    print("等待异步任务完成...")
    time.sleep(0.5) # 模拟主进程其他工作
    # 获取结果(get()方法会阻塞直到任务完成)
    starmap_async_results = async_result.get()
    print(f"starmap_async 结果: {starmap_async_results}")

    if __name__ == "__main__":
    main()

相关核心函数说明

  • pool.map(func, iterable)
    • 阻塞式:等待所有任务完成后返回结果列表
    • 结果顺序与输入 iterable 一致
    • 适合简单的单参数任务
  • pool.imap(func, iterable)
    • 非阻塞式:返回一个迭代器,可逐步获取结果(迭代时会阻塞直到对应任务完成)
    • 结果顺序与输入一致,适合处理大量数据时节省内存(无需等待全部完成)
  • pool.imap_unordered(func, iterable)
    • 非阻塞式:返回迭代器,但结果顺序与任务完成顺序一致(不保证输入顺序)
    • 适合对结果顺序无要求的场景,可更快获取部分结果
  • pool.apply(func, args)
    • 阻塞式:单次提交一个任务,args 为函数参数
    • 适合偶尔提交单个任务,效率较低(不建议批量使用)
  • pool.starmap(func, iterable_of_tuples)
    • 类似 map,但支持多参数函数:iterable_of_tuples 中的每个元组会被拆分为函数的参数(如 (a,b) 对应 func(a,b)

注意事项

  • 进程池使用 with 语句可自动关闭,无需手动调用 pool.close()pool.join()
  • imapimap_unordered 返回的迭代器只能遍历一次,且遍历过程中会阻塞等待对应任务完成