整体说明
- 在 Python 里,多线程和多进程都可以实现并行处理
- Python 还提供了方便使用的
ThreadPoolExecutor和ProcessPoolExecutor类用于多线程和多进程并行处理 - 多线程切换和启动任务开销小,但在 Python 中受到 全局解释器锁(GIL) 限制,导致更适合一些 IO 密集型任务
- 多进程切换和启动任务开销大,但在 Python 中不受 全局解释器锁(GIL) 限制,更适合一些 CPU 密集型任务
多线程(Multithreading)
- 多线程是在同一个进程内运行多个线程,这些线程共享进程的内存空间
- 但在 Python 中,由于 全局解释器锁(GIL) 的存在,在 CPU 密集型任务中,多线程无法充分利用多核 CPU 的优势 ,更适合 I/O 密集型任务,像网络请求、文件读写这类
- 注:这是 Python 特有的问题( 全局解释器锁(GIL))其他语言没有这个问题
- 多线程的示例代码(原生形式):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30import threading
import time
def io_task(task_id):
print(f"Task {task_id} starts")
time.sleep(1) # 模拟非计算密集型的 I/O 操作
print(f"Task {task_id} ends")
if __name__ == "__main__":
start_time = time.time()
threads = []
# 创建并启动线程
for i in range(3):
thread = threading.Thread(target=io_task, args=(i,))
threads.append(thread)
thread.start() # 启动单个线程
# 等待所有线程完成
for thread in threads:
thread.join() # 等待单个线程完成
print(f"Total time taken: {time.time() - start_time:.2f} seconds")
# Task 0 starts
# Task 1 starts
# Task 2 starts
# Task 1 ends
# Task 2 ends
# Task 0 ends
# Total time taken: 1.01 seconds
多进程(Multiprocessing)
- 多进程是指运行多个独立的进程,每个进程都有自己独立的内存空间
- 多进程不受 GIL 的限制 ,能够充分发挥多核 CPU 的性能 ,所以更适合 CPU 密集型任务 ,例如科学计算、图像处理等
- 与多线程相比,除了将
threading.Thread替换成multiprocessing.Process,用法几乎一模一样 - 多进程的示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28import multiprocessing
import time
def cpu_task(task_id):
print(f"Task {task_id} starts,Process ID: {multiprocessing.current_process().pid}")
sum([i * i for i in range(10**7)]) # 模拟 CPU 密集型操作
print(f"Task {task_id} ends")
if __name__ == "__main__":
start_time = time.time()
processes = []
# 创建并启动多个进程
for i in range(3):
process = multiprocessing.Process(target=cpu_task, args=(i,))
processes.append(process)
process.start() # 启动单个进程
# 等待所有进程完成
for process in processes:
process.join() # 等待单个进程完成
print(f"Total time taken: {time.time() - start_time:.2f} seconds")
# Task 0 starts,Process ID: 70140
# Task 1 starts,Process ID: 70141
# Task 2 starts,Process ID: 70142
# Task 0 ends
# Task 1 ends
# Task 2 ends
# Total time taken: 2.03 seconds
ProcessPoolExecutor 和 ThreadPoolExecutor 的使用
这两个类都位于
concurrent.futures模块中,为我们提供了更高级的异步执行接口- ThreadPoolExecutor :用于多线程编程
- ProcessPoolExecutor :用于多进程编程
它们都提供了
submit()和map()方法,还能通过with语句来自动管理资源submit():单个任务提交map():批量任务提交
下面是使用这两个类的示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123import concurrent.futures
import time
def io_task(args):
task_id, content = args
print(f"IO Task {task_id} starts")
time.sleep(1)
print(f"IO Task received the content: '{content}'")
print(f"IO Task {task_id} ends")
return args
def cpu_task(args):
task_id, content = args
print(f"CPU Task {task_id} starts")
sum([i * i for i in range(10**7)])
print(f"IO Task received the content: '{content}'")
print(f"CPU Task {task_id} ends")
return args
def io_task_multi_args(task_id, content):
print(f"IO Task {task_id} starts")
time.sleep(1)
print(f"IO Task received the content: '{content}'")
print(f"IO Task {task_id} ends")
return task_id, content
def cpu_task_multi_args(task_id, content):
print(f"CPU Task {task_id} starts")
sum([i * i for i in range(10**7)])
print(f"IO Task received the content: '{content}'")
print(f"CPU Task {task_id} ends")
return task_id, content
if __name__ == "__main__":
print("== 单参数形式:")
print("=== ThreadPoolExecutor Demo ===")
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
params = [(1, "content1"), (2, "content2"), (3, "content3")]
results = list(executor.map(io_task, params, timeout=10))
print(f"Total time taken for IO tasks: {time.time() - start_time:.2f} seconds, results={results}")
print("\n=== ProcessPoolExecutor Demo ===")
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
params = [(1, "content1"), (2, "content2"), (3, "content3")]
results = list(executor.map(cpu_task, params, timeout=10))
print(f"Total time taken for CPU tasks: {time.time() - start_time:.2f} seconds, results={results}")
print("== 多参数形式:")
print("=== ThreadPoolExecutor Demo ===")
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
params = [(1, "content1"), (2, "content2"), (3, "content3")]
futures = list(executor.submit(io_task_multi_args, *param) for param in params) # 需要使用 lambda 关键字定义新函数
for future in concurrent.futures.as_completed(futures, timeout=10):
print(f"future result: {future.result()}")
print(f"Total time taken for IO tasks: {time.time() - start_time:.2f} seconds")
print("\n=== ProcessPoolExecutor Demo ===")
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
params = [(1, "content1"), (2, "content2"), (3, "content3")]
futures = list(executor.submit(io_task_multi_args, *param) for param in params) # 需要使用 lambda 关键字定义新函数
for future in concurrent.futures.as_completed(futures, timeout=10):
print(f"future result: {future.result()}")
print(f"Total time taken for CPU tasks: {time.time() - start_time:.2f} seconds")
# == 单参数形式:
# === ThreadPoolExecutor Demo ===
# IO Task 1 startsIO Task 2 starts
#
# IO Task 3 starts
# IO Task received the content: 'content1'IO Task received the content: 'content2'
# IO Task 2 ends
# IO Task received the content: 'content3'
# IO Task 3 ends
#
# IO Task 1 ends
# Total time taken for IO tasks: 1.01 seconds, results=[(1, 'content1'), (2, 'content2'), (3, 'content3')]
#
# === ProcessPoolExecutor Demo ===
# CPU Task 1 starts
# CPU Task 2 starts
# CPU Task 3 starts
# IO Task received the content: 'content2'
# CPU Task 2 ends
# IO Task received the content: 'content3'
# CPU Task 3 ends
# IO Task received the content: 'content1'
# CPU Task 1 ends
# Total time taken for CPU tasks: 1.13 seconds, results=[(1, 'content1'), (2, 'content2'), (3, 'content3')]
# == 多参数形式:
# === ThreadPoolExecutor Demo ===
# IO Task 1 starts
# IO Task 2 starts
# IO Task 3 starts
# IO Task received the content: 'content3'
# IO Task 3 ends
# future result: (3, 'content3')
# IO Task received the content: 'content2'
# IO Task 2 ends
# future result: (2, 'content2')
# IO Task received the content: 'content1'
# IO Task 1 ends
# future result: (1, 'content1')
# Total time taken for IO tasks: 1.01 seconds
#
# === ProcessPoolExecutor Demo ===
# IO Task 1 starts
# IO Task 2 starts
# IO Task 3 starts
# IO Task received the content: 'content1'
# IO Task 1 ends
# future result: (1, 'content1')
# IO Task received the content: 'content2'
# IO Task 2 ends
# IO Task received the content: 'content3'
# IO Task 3 ends
# future result: (2, 'content2')
# future result: (3, 'content3')
# Total time taken for CPU tasks: 1.46 seconds特别说明:
concurrent.futures.as_completed()是 Python 标准库中的一个函数,用于处理异步执行的多个任务- 输入:接收一个包含
Future对象的可迭代对象(如列表) - 返回:一个迭代器,该迭代器会在每个
Future完成时立即返回它的结果(按完成顺序,而非提交顺序)
- 输入:接收一个包含
multiprocessing.spawn 用法
multiprocessing.spawn是 Pythonmultiprocessing模块中用于启动子进程的方法,适用于需要在新进程中执行函数的场景,尤其在分布式训练(如 PyTorch 多GPU训练)中常用spawn会创建全新的Python解释器进程,每个子进程独立运行,拥有独立的内存空间,避免了多线程中的全局解释器锁(GIL)限制基本用法示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import multiprocessing as mp
# 子进程要执行的函数
def worker_function(rank, world_size, shared_arg):
print(f"子进程 {rank}/{world_size} 启动,参数: {shared_arg}")
# 子进程的具体逻辑...
def main():
# 总进程数(例如等于GPU数量)
world_size = 4
# 共享参数(会被传递给每个子进程)
shared_arg = "hello from main"
# 启动子进程
# args 是传递给 worker_function 的参数元组(除了 rank 之外的参数)
mp.spawn(
worker_function, # 子进程执行的函数
args=(world_size, shared_arg), # 传递给函数的参数(第一个参数固定为 rank)
nprocs=world_size, # 子进程数量
join=True # 是否等待所有子进程结束后再继续
)
print("所有子进程执行完毕")
if __name__ == "__main__":
# 在Windows系统中必须放在 if __name__ == "__main__" 下
mp.set_start_method("spawn") # 显式指定启动方法(可选,默认可能为fork)
main()fn:子进程要执行的函数(第一个参数必须是rank,表示进程编号,从0开始)args:传递给函数的额外参数(元组形式)nprocs:要启动的子进程数量join:若为True,主进程会等待所有子进程执行完毕再继续daemon:是否将子进程设为守护进程(主进程退出时自动终止子进程)
注意:多线程没有完全对应的API ,只能通过
threading.Thread实现类似的多线程启动逻辑
使用 subprocess.Popen 函数启动进程
subprocess.Popen是 Python 标准库subprocess模块中的一个类,用于启动一个新的进程subprocess.Popen可以连接到其输入/输出/错误管道,并获得其返回码subprocess.Popen为更复杂的进程管理提供了灵活的接口,是替代os.system、os.spawn*、os.popen*等旧有方法的推荐方式
基本用法即常用参数说明
用法说明
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import subprocess
p = subprocess.Popen(
args,
bufsize=-1,
executable=None,
stdin=None,
stdout=None,
stderr=None,
preexec_fn=None,
close_fds=True,
shell=False,
cwd=None,
env=None,
universal_newlines=False,
startupinfo=None,
creationflags=0
)subprocess.Popen常用参数详细说明见下文args- 字符串或序列(如列表)
- 指定要执行的命令及其参数- 当
shell=False时,推荐使用列表,如['ls', '-l']- 当
shell=True时,通常传递字符串,如"ls -l"
- 当
- 注:如果命令中包含空格或特殊字符,建议使用列表方式,避免命令解析错误
bufsize- 整数
- 设置缓冲策略
0:无缓冲(直接读写)1:行缓冲(文本模式下有效)- 其他正整数:指定缓冲区大小(以字节为单位)
-1(默认):使用系统默认缓冲策略
- 通常用法 :一般保持默认即可,特殊需求时才设置
executable- 字符串
- 指定要执行的程序的路径(用于替换默认可执行文件)
- 例如,
executable="/usr/bin/python3"可以强制使用指定的解释器
- 例如,
stdin, stdout, stderr- 取值可以是文件对象、文件描述符、
subprocess.PIPE、subprocess.DEVNULL、None、subprocess.STDOUT - 分别指定子进程的标准输入、输出、错误
None(默认):继承父进程的对应流subprocess.PIPE:创建管道,允许父进程与子进程通信subprocess.DEVNULL:丢弃输入/输出- 文件对象:如
open('output.txt', 'w'),将输出写入文件
- 举例:
stdout=subprocess.PIPE表示捕获标准输出,后续可通过p.communicate()返回读取;stderr=subprocess.PIPE表示捕获标准错误stdout=fp将标准输出写到指定文件中(常用a追加形式打开文件fp)stderr=subprocess.STDOUT将子进程的标准错误(stderr)也重定向到标准输出(stdout),即错误信息也写入日志文件
- 取值可以是文件对象、文件描述符、
preexec_fn- 可调用对象(函数)
- 在子进程启动前执行指定的函数(仅限Unix)
- 比如设置进程组、修改环境等
- 注:在Windows平台无效
close_fds- 布尔值
- 是否在子进程中关闭除
stdin/stdout/stderr以外的所有文件描述符- 默认:
True(Unix),False(Windows)
- 默认:
- 通常建议保持默认,除非有特殊文件句柄传递需求
- 默认定义是:
close_fds: bool = ...表示bool类型的占位,在实际调用subprocess.Popen时,close_fds的默认值由具体的实现决定(如在 Unix 下默认True,Windows 下默认False) - 也可以在调用时显式传递
close_fds=True或close_fds=False
- 默认定义是:
shell- 布尔值
- 是否通过 shell 运行命令
True:命令通过 shell 解析(如/bin/sh或cmd.exe),可用 shell 特性(如重定向、管道)False(默认):直接执行指定的程序
- 安全提示 :
shell=True存在命令注入风险,处理外部输入时需谨慎
cwd- 字符串
- 指定子进程的工作目录
- 举例:
cwd="/tmp"表示子进程在/tmp目录下运行
env- 字典
- 设置子进程的环境变量
- 若为
None,则继承父进程环境 - 可自定义环境变量,如:
env={"PATH": "/usr/bin", "USER": "test"}
- 若为
- 注:未指定的变量将丢失,需包含必需的环境变量
universal_newlines/text- 布尔值
- 指明子进程是否以文本模式处理输入输出(自动编码/解码)
True:与子进程通信时,输入输出为字符串(str),自动处理换行False(默认):以字节流处理(bytes)
- 处理文本数据时设为
True或text=True - 注:Python 3.7 以后,建议使用
text参数替代universal_newlines参数- 虽然
universal_newlines依然还在,但不建议使用 text和universal_newlines是等价参数,不能同时设置;如果同时传递,会抛出 ValueError 异常
- 虽然
startupinfo,creationflags(仅Windows)startupinfo:用于指定进程启动信息(如窗口显示方式)creationflags:用于指定进程创建标志(如subprocess.CREATE_NEW_CONSOLE)
其他参数
restore_signals(3.2+):是否恢复信号处理(Unix)start_new_session(3.2+):是否在新会话中启动进程(Unix)
用法示例
最简单的用法:
1
2
3import subprocess
p = subprocess.Popen(['ls', '-l']) # 启动进程并执行,此外不做任何操作捕获输出:
1
2
3
4
5import subprocess
p = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE) # 启动进程并执行,同时创建管道,允许父进程与子进程通信
out, err = p.communicate() # 与主进程通信,并返回执行结果信息
print(out.decode()) # 若没有 stdout=subprocess.PIPE,这里的 out 是 None,不可以执行 decode() 命令通过 shell 运行命令:
1
2
3
4import subprocess
p = subprocess.Popen("echo Hello World", shell=True) # 启动进程并执行,此外不作任何操作,这里没有任何输出
out, err = p.communicate() # 与主进程通信,由于没有 stdout=subprocess.PIPE 和 stderr=subprocess.PIPE,out 和 err 均为 None传递输入:
1
2
3
4
5import subprocess
p = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) # 启动进程并执行,cat命令启动进入等待输入,cat 命令启动后,接下来每次输入数据都会被原样输出
out, err = p.communicate(input=b'Hello\n') # 与主进程通信,并返回执行结果信息
print(out.decode())管道操作示例:
1
2
3
4
5
6
7import subprocess
p1 = subprocess.Popen(['ls'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'py'], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
out, err = p2.communicate()
print(out.decode())
Popen 类对象相关常用方法汇总
communicate(input=None, timeout=None):- 与子进程交互,发送数据到 stdin,读取 stdout 和 stderr,等待进程结束
wait(timeout=None):- 等待子进程结束,返回退出码
poll():- 检查子进程是否结束,未结束返回 None
terminate():- 终止子进程(发送 SIGTERM)
kill():- 强制杀死子进程(发送 SIGKILL)
pid:- 子进程的进程号
returncode:- 子进程的返回码
使用注意事项
- 如果需要捕获输出,记得设置
stdout=subprocess.PIPE和/或stderr=subprocess.PIPE,否则无法通过communicate()获取输出 - 使用
shell=True时,命令通常以字符串形式传递,并注意安全风险(如命令注入) - 如果子进程输出很大,建议及时读取输出,避免死锁,详细理解见附录
- Python 3.5+ 推荐用
subprocess.run简化常见用法,但Popen适合更复杂的场景
附录:subprocess.Popen 死锁情况分析
问题说明:当子进程输出数据量很大时,父进程必须及时读取这些数据,否则操作系统管道缓冲区会被写满,导致子进程和父进程互相等待,程序陷入死锁
管道缓冲区有限 ,当用
subprocess.PIPE捕获子进程的标准输出(stdout)或标准错误(stderr)时,父进程和子进程之间通过一个操作系统管道通信这个管道是有缓冲区大小限制的(通常几 KB 到几十 KB,依赖操作系统)
如果子进程输出的数据量超过了缓冲区大小,而父进程没有及时读取这些数据,缓冲区会被写满
当缓冲区被写满后,子进程会被阻塞,无法继续写入新的输出(即子进程暂停在写操作)
如果此时父进程又在等待子进程结束(比如调用
wait()或communicate()),但没有及时读取管道内容,父子进程就会互相等待,导致死锁 :- 子进程等着缓冲区有空间继续输出;
- 父进程等着子进程结束,却没读取缓冲区内容;
- 结果两者都无法继续执行,程序卡死
问题代码示例:
1
2
3
4
5import subprocess
p = subprocess.Popen(['cat ./big_document.txt'], stdout=subprocess.PIPE)
p.wait() # 只等子进程结束,不读取输出
output = p.stdout.read() # 这一步可能永远无法执行到正确用法建议:及时读取输出 ,常用的做法是用
communicate(),它会在等待子进程结束的同时,自动持续读取所有输出,防止缓冲区写满:1
2
3
4import subprocess
p = subprocess.Popen(['cat ./big_document.txt'], stdout=subprocess.PIPE)
output, _ = p.communicate() # 推荐做法,自动避免死锁如果需要实时处理输出,可以循环读取:
1
2
3
4
5
6import subprocess
p = subprocess.Popen(['cat ./big_document.txt'], stdout=subprocess.PIPE)
for line in p.stdout:
process(line) # 逐行处理,及时清空缓冲区
p.wait()
附录:使用 subprocess.call 函数启动进程
subprocess.call可用于启动进程执行外部命令,是高层封装的函数,本质上是Popen的简化接口- 它内部会创建
Popen对象并等待命令执行完成,返回命令的退出码 - 适用于简单场景,只需知道是否成功(退出码),无需复杂交互
- 它内部会创建
subprocess.Popen是底层核心类,提供最完整的功能和灵活性- 它直接创建进程对象,允许用户与子进程进行复杂交互(如输入/输出处理、异步执行等)
- 适合需要精细控制的场景
subprocess.call 的使用示例
call()是阻塞式的:调用后会等待命令执行完毕才返回,返回值是命令的退出码(0 表示成功),示例:1
2import subprocess
ret_code = subprocess.call(["echo", "hello"]) # 等待命令完成,返回 0注:
Popen默认是非阻塞式的:创建进程后立即返回Popen对象,不会等待命令结束(需显式调用wait()或communicate()等待完成),示例如下:1
2
3import subprocess
proc = subprocess.Popen(["echo", "hello"]) # 立即返回,不等待
ret_code = proc.wait() # 手动等待命令完成,获取退出码注:
Popen支持更多高级操作,而call()不支持,比如Popen支持输入/输出重定向(通过stdin/stdout/stderr与子进程交互),但call不支持:1
2
3# 捕获命令输出
proc = subprocess.Popen(["ls"], stdout=subprocess.PIPE)
output, _ = proc.communicate() # 获取输出Popen支持异步执行,可以在命令运行时做其他事情,再回头处理结果Popen支持信号处理,可通过send_signal()向子进程发送信号(如终止进程)Popen支持管道操作,可多个Popen对象可通过管道连接(类似 Linux 管道|)
使用 subprocess.check_all 启动进程并监控失败异常
subprocess.check_call(args, ...)执行指定的命令,等待命令运行结果的返回码(return code),同时还具备call没有的抛出异常功能- 如果命令执行成功(返回码为 0),则无返回值(或说返回 0);
- 如果命令执行失败(返回码非 0),则会抛出
subprocess.CalledProcessError异常(call不会抛出异常) - 示例:
1
2
3
4
5
6import subprocess
try:
subprocess.check_call(["ls", "-l"]) # 执行 ls -l 命令
print("命令执行成功")
except subprocess.CalledProcessError as e:
print(f"命令执行失败,返回码:{e.returncode}")
使用 subprocess.check_output 启动进程、监控失败异常,并读取输出结果
subprocess.check_output(args, ...)执行指定的命令,并返回命令的输出结果(stdout),同时还具备call没有的抛出异常功能- 如果命令执行成功(返回码为 0),返回输出内容(字节串,可通过
text=True参数转为字符串); - 如果命令执行失败(返回码非 0),则会抛出
subprocess.CalledProcessError异常 - 示例:
1
2
3
4
5
6import subprocess
try:
result = subprocess.check_output(["echo", "hello"], text=True)
print(f"命令输出:{result.strip()}") # 输出:hello
except subprocess.CalledProcessError as e:
print(f"命令执行失败,返回码:{e.returncode}")
- 如果命令执行成功(返回码为 0),返回输出内容(字节串,可通过
与
subprocess.check_call(args, ...)的区别为,subprocess.check_output(args, ...)返回值为输出结果而不是执行状态(返回码, return code)
附录:multiprocessing.Pool 的用法
- 使用
multiprocessing.Pool的示例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73import multiprocessing
import time
import random
# 定义一个待并行执行的任务函数
def process_task(x):
time.sleep(random.uniform(0.1, 0.5)) # 模拟任务耗时
result = x * x
print(f"处理 {x} -> {result} (进程ID: {multiprocessing.current_process().pid})")
return result
def main():
# 生成待处理的数据
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(f"原始数据: {data}\n")
# 创建进程池(默认使用 CPU 核心数,也可指定 processes 参数)
with multiprocessing.Pool(processes=4) as pool: # 推荐使用 with 语句管理进程池的资源
# 使用 map:阻塞式,返回列表(按输入顺序)
print("=== 使用 pool.map ===")
map_results = pool.map(process_task, data)
print(f"map 结果: {map_results}\n")
# 使用 imap:非阻塞式,返回迭代器(按输入顺序,逐步获取结果),注:返回的迭代器只能遍历一次,且遍历过程中会阻塞等待对应任务完成
print("=== 使用 pool.imap ===")
imap_iter = pool.imap(process_task, data)
# 迭代获取结果(每次迭代会阻塞直到对应任务完成,完成一个返回一个)
imap_results = [res for res in imap_iter] # 注:返回结果是有序的,与 data 的序一致
print(f"imap 结果: {imap_results}\n")
# imap 高阶用法,可指定单次分配给进程的任务数量,可避免多次分配 (下面的代码每批给进程池分配 50 个任务)
imap_iter = pool.imap(process_task, data, chunksize=50) # 默认 chunksize=1
# for 循环读取方式(读取到的结果是有序的,与输入数据 data 的顺序一致),注:pool.imap(process_task, data) 的结果也可以这样读取
for i, result in enumerate(imap_iter):
if i < 5:
print(result) # 依次输出:1,2,3,4,5
else:
break
# 使用 imap_unordered:非阻塞式,返回迭代器(按任务完成顺序),注:返回的迭代器只能遍历一次,且遍历过程中会阻塞等待对应任务完成
print("=== 使用 pool.imap_unordered ===")
imap_unordered_iter = pool.imap_unordered(process_task, data)
unordered_results = [res for res in imap_unordered_iter]
print(f"imap_unordered 结果(无序): {unordered_results}\n")
# 使用 apply:单次提交任务(阻塞式,适合单个任务)
print("=== 使用 pool.apply ===")
single_result = pool.apply(process_task, args=(100,)) # 传入单个参数
print(f"apply 单个结果: {single_result}\n")
# 使用 starmap:处理多参数任务(类似 map,但支持元组拆包)
print("=== 使用 pool.starmap ===")
# 定义一个多参数函数
def multi_param_task(a, b):
return a + b
# 数据为元组列表(每个元组对应一组参数)
multi_data = [(1, 2), (3, 4), (5, 6)]
starmap_results = pool.starmap(multi_param_task, multi_data)
print(f"starmap 结果: {starmap_results}")
# 使用 starmap_async:异步版本的starmap(非阻塞)
print("=== 使用 pool.starmap_async ===")
# 提交异步任务,立即返回AsyncResult对象,不阻塞主进程
async_result = pool.starmap_async(multi_param_task, multi_data)
# 可以在这里执行其他操作(演示非阻塞特性)
print("等待异步任务完成...")
time.sleep(0.5) # 模拟主进程其他工作
# 获取结果(get()方法会阻塞直到任务完成)
starmap_async_results = async_result.get()
print(f"starmap_async 结果: {starmap_async_results}")
if __name__ == "__main__":
main()
相关核心函数说明
pool.map(func, iterable)- 阻塞式:等待所有任务完成后返回结果列表
- 结果顺序与输入
iterable一致 - 适合简单的单参数任务
pool.imap(func, iterable)- 非阻塞式:返回一个迭代器,可逐步获取结果(迭代时会阻塞直到对应任务完成)
- 结果顺序与输入一致,适合处理大量数据时节省内存(无需等待全部完成)
pool.imap_unordered(func, iterable)- 非阻塞式:返回迭代器,但结果顺序与任务完成顺序一致(不保证输入顺序)
- 适合对结果顺序无要求的场景,可更快获取部分结果
pool.apply(func, args)- 阻塞式:单次提交一个任务,
args为函数参数 - 适合偶尔提交单个任务,效率较低(不建议批量使用)
- 阻塞式:单次提交一个任务,
pool.starmap(func, iterable_of_tuples)- 类似
map,但支持多参数函数:iterable_of_tuples中的每个元组会被拆分为函数的参数(如(a,b)对应func(a,b))
- 类似
注意事项
- 进程池使用
with语句可自动关闭,无需手动调用pool.close()和pool.join() imap和imap_unordered返回的迭代器只能遍历一次,且遍历过程中会阻塞等待对应任务完成