一、模块简介
concurrent.futures
是 Python 标准库提供的 高级并发接口,用来执行多线程或多进程任务。
特点:
-
简化线程/进程管理;
-
提供统一接口
Executor
; -
支持异步结果
Future
对象; -
支持任务异常捕获和超时控制。
核心执行器类:
执行器类 | 说明 |
---|---|
ThreadPoolExecutor |
使用线程池并发执行,适合 I/O 密集型任务(如网络请求、文件操作) |
ProcessPoolExecutor |
使用进程池并发执行,适合 CPU 密集型任务(如数据计算、图像处理) |
二、核心组件
1️⃣ Executor(执行器)
Executor
是一个抽象类,不能直接使用。
它有两个主要子类:
类名 | 说明 | 适用场景 |
---|---|---|
ThreadPoolExecutor |
线程池执行器 | I/O 密集型任务(网络请求、文件操作) |
ProcessPoolExecutor |
进程池执行器 | CPU 密集型任务(数据计算、图像处理) |
Executor 方法
方法 | 说明 |
---|---|
submit(fn, *args, **kwargs) |
提交函数异步执行,返回 Future 对象 |
map(func, *iterables) |
并发执行多个任务,返回结果迭代器 |
shutdown(wait=True) |
关闭池,释放资源,通常用 with 自动调用 |
2️⃣ Future(未来对象)
Future
对象代表一个异步执行的任务结果。当任务提交后立即返回一个 Future
,结果可能还没准备好。我们可以用它来查询、等待或获取任务执行的结果。
常用方法:
方法 | 说明 |
---|---|
.result(timeout=None) |
获取结果(阻塞直到完成) |
.done() |
检查任务是否完成 |
.running() |
检查任务是否正在执行 |
.cancel() |
尝试取消任务 |
.add_done_callback(fn) |
任务完成后自动调用回调函数 |
exception() |
获取任务执行中抛出的异常 |
3️⃣ 辅助函数
函数 | 说明 |
---|---|
as_completed(fs) |
按任务完成顺序返回 Future 对象 |
wait(fs, timeout=None) |
等待所有或部分 Future 完成 |
三、ThreadPoolExecutor 示例(多线程)
🌟 示例 1:简单线程池执行
from concurrent.futures import ThreadPoolExecutor
import timedef download_file(name):print(f"{name} 开始下载...")time.sleep(2)print(f"{name} 下载完成!")return f"{name} 完成"# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:futures = [executor.submit(download_file, f"文件{i}") for i in range(5)]# 等待任务结果for f in futures:print(f.result())
输出:
文件0 开始下载...
文件1 开始下载...
文件2 开始下载...
文件3 开始下载...
文件4 开始下载...
文件0 下载完成!
文件1 下载完成!
...
🧩 说明:
-
max_workers
:同时运行的最大线程数。 -
submit(func, *args)
:提交任务,立即返回一个Future
。 -
result()
:阻塞直到任务完成,返回函数的返回值。
🌟 示例 2:使用 map()
简化写法
from concurrent.futures import ThreadPoolExecutor
import timedef square(n):time.sleep(1)return n * nwith ThreadPoolExecutor(max_workers=4) as executor:results = executor.map(square, [1, 2, 3, 4, 5])for r in results:print(r)
🧩 特点:
-
map()
自动调度任务,无需手动管理Future
。 -
返回一个迭代器,按任务提交顺序返回结果。
🌟 示例 3:线程池并发计算
from concurrent.futures import ThreadPoolExecutor, as_completed
import timedef compute_square(n):time.sleep(1) # 模拟耗时任务return n * nnums = [1, 2, 3, 4, 5]
results = {}with ThreadPoolExecutor(max_workers=3) as executor:# 提交任务future_to_num = {executor.submit(compute_square, n): n for n in nums}# 按完成顺序获取结果for future in as_completed(future_to_num):num = future_to_num[future] # 找到当前任务对应的输入值try:results[num] = future.result() # 获取任务返回值并保存到字典except Exception as e:print(f'Task {num} generated an exception: {e}')print(results)
🔍 关键点解析
1️⃣ future_to_num[future]
future_to_num
是一个字典:
{Future1: 1, Future2: 2, Future3: 3, ...}
-
key:提交任务的
Future
对象 -
value:任务对应的标识(这里是输入数字)
-
用途:按任务完成顺序识别是哪一个任务完成。
2️⃣ results[num] = future.result()
-
future.result()
:阻塞等待当前任务完成,获取返回值 -
将结果保存到
results
字典中,用任务输入作为 key。 -
如果任务执行失败,会抛出异常;通过
try/except
捕获。
3️⃣ as_completed(future_to_num)
-
按任务 完成顺序 返回
Future
对象,而不是提交顺序 -
可实现 最快先处理 的策略,提高响应效率
🔁 逻辑流程图
提交任务:
Future1 → compute_square(1)
Future2 → compute_square(2)
Future3 → compute_square(3)as_completed 遍历顺序(可能不同于提交顺序):
Future2 → Future1 → Future3循环:
future = Future2
num = 2
results[2] = Future2.result() # 4最终 results:
{1: 1, 2: 4, 3: 9}
四、ProcessPoolExecutor 示例(多进程)
用于CPU 密集型任务(例如:图像处理、大数据计算、加密解密)。
from concurrent.futures import ProcessPoolExecutor
import math, timedef intensive_task(n):print(f"计算 {n} 的阶乘")return math.factorial(n)if __name__ == '__main__':with ProcessPoolExecutor(max_workers=4) as executor:results = executor.map(intensive_task, [100000, 120000, 150000, 200000])for r in results:print(len(str(r))) # 输出结果长度
🧩 注意:
-
必须放在
if __name__ == '__main__':
保护下(Windows 系统必须,否则进程会无限递归创建)。 -
每个子进程是独立的 Python 进程,不共享内存。
五、回调函数使用(add_done_callback
)
from concurrent.futures import ThreadPoolExecutor
import timedef task(n):time.sleep(1)return n * 2def callback(future):print(f"任务完成,结果:{future.result()}")with ThreadPoolExecutor() as executor:future = executor.submit(task, 10)future.add_done_callback(callback)
🧩 回调函数常用于:
-
异步通知任务完成
-
自动写日志、更新UI、触发后续操作等
六、等待任务:as_completed()
与 wait()
1️⃣ as_completed
(谁先完成谁先返回)
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, randomdef work(x):time.sleep(random.randint(1, 3))return x * xwith ThreadPoolExecutor(max_workers=3) as executor:futures = [executor.submit(work, i) for i in range(5)]for f in as_completed(futures):print("完成:", f.result())
🧩 输出顺序不固定,取决于任务完成时间。
2️⃣ wait
(等待所有或部分完成)
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import timedef task(x):time.sleep(x)return xwith ThreadPoolExecutor(max_workers=3) as executor:futures = [executor.submit(task, i) for i in [1, 2, 3]]done, not_done = wait(futures, return_when=FIRST_COMPLETED)print("已完成任务数:", len(done))
七、实战案例:多接口数据抓取(模拟网络请求)
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import randomdef fetch_api_data(api_name):"""模拟 API 请求"""time.sleep(random.uniform(0.5, 2))if random.random() < 0.2: # 模拟请求失败raise ValueError("API Error")return f"{api_name} data"api_list = ["users", "posts", "comments"]
results = {}with ThreadPoolExecutor(max_workers=5) as executor:future_to_api = {executor.submit(fetch_api_data, api): api for api in api_list}for future in as_completed(future_to_api):api_name = future_to_api[future]try:results[api_name] = future.result()print(f"{api_name} 请求成功")except Exception as e:print(f"{api_name} 请求失败: {e}")results[api_name] = Noneprint(results)
-
future_to_api
= 任务映射表 -
future.result()
= 获取返回值或抛出异常 -
通过字典存储结果,保证每个接口都能被记录,即使失败也不会中断程序
八、注意事项与优化建议
问题 | 原因 | 解决方案 |
---|---|---|
任务执行顺序与提交顺序不一致 | as_completed 按完成顺序返回 |
正常现象,非错误 |
CPU 密集型任务低效 | 使用线程池,受 GIL 限制 | 改用 ProcessPoolExecutor |
任务异常未捕获 | future.result() 抛出异常 |
使用 try/except 捕获 |
任务过多导致系统负荷 | max_workers 过大 |
控制线程/进程数,合理设置池大小 |
结果管理混乱 | 没有映射 Future → 任务标识 | 使用字典映射 (future_to_func ) |
九、总结
-
concurrent.futures
提供了 线程池/进程池 + Future 的统一接口 -
核心思想:
-
提交任务 →
executor.submit()
-
获取 Future →
future
-
按完成顺序获取结果 →
as_completed()
-
异常处理 + 保存结果 →
future.result()
+try/except
-
-
小技巧:
-
future_to_func
/future_to_api
映射字典是管理多个异步任务的最佳实践 -
遇到 I/O 密集型用线程池,CPU 密集型用进程池
-
异常捕获保证程序健壮性
-