ProcessPoolExecutor VS ThreadPoolExecutor 进程池对比线程池
示例一: I/O 场景——10 个网页并发下载 + 实时进度
结果
多线程: 100%|██████████| 10/10 [00:07<00:00, 1.41it/s]
【多线程】I/O 并发总耗时:7.10s
多进程: 100%|██████████| 10/10 [00:06<00:00, 1.66it/s]
【多进程】I/O 并发总耗时:6.09s
示例代码
# -*- coding: utf-8 -*-
# ProcessPoolExecutor VS ThreadPoolExecutor 进程池对比线程池
import os
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import timeimport requests
from tqdm import tqdm# 核心 : concurrent.futures 标准库# 示例一 I/O 场景——10 个网页并发下载 + 实时进度urls = [f"https://httpbin.org/delay/2?i={i}" for i in range(10)]def fetch(url):response = requests.get(url)return url, len(response.text)def io_demo(func, desc=""):with func(max_workers=os.cpu_count()) as executor:futures = [executor.submit(fetch, u) for u in urls]for f in tqdm(futures, total=len(urls), desc=desc):url, size = f.result()# print(f"URL:{url} 长度:{size}")if __name__ == '__main__':start_time = time.time()io_demo(ThreadPoolExecutor, desc='多线程')print(f"【多线程】I/O 并发总耗时:{time.time() - start_time:.2f}s")start_time = time.time()io_demo(ProcessPoolExecutor, desc='多进程')print(f"【多进程】I/O 并发总耗时:{time.time() - start_time:.2f}s")
示例二: CPU 场景——12 核并行计算 π 的猛烈逼近
结果
【多线程】π ≈ 3.141650
【多线程】CPU 并发总耗时:2.18s
【多进程】π ≈ 3.141946
【多进程】CPU 并发总耗时:0.70s
示例代码
# -*- coding: utf-8 -*-
# ProcessPoolExecutor VS ThreadPoolExecutor 进程池对比线程池
import os
import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import timeimport requests
from tqdm import tqdm# 核心 : concurrent.futures 标准库# 示例二:CPU 场景。 12 核并行计算 π 的猛烈逼近# 蒙特卡洛法近似计算 π
def pi_partial(samples):inside = 0rnd = random.Random()for _ in range(samples):x = rnd.random()y = rnd.random()if x * x + y * y < 1:inside += 1return insidedef cpu_demo(func, desc=""):workers = os.cpu_count() # 使用CPU核心数samples_per_worker = 5_000_000 # 每个核心计算5百万样本with func(max_workers=workers) as executor:futures = [executor.submit(pi_partial, samples_per_worker) for _ in range(workers)]inside_total = sum(f.result() for f in futures)# 等价于:4 * (总命中数) / (总样本数)pi_estimate = 4 * inside_total / (samples_per_worker * workers)return pi_estimateif __name__ == '__main__':start_time = time.time()thread_res = cpu_demo(ThreadPoolExecutor, desc='多线程')print(f"【多线程】π ≈ {thread_res:.6f}")print(f"【多线程】CPU 并发总耗时:{time.time() - start_time:.2f}s")start_time = time.time()process_res = cpu_demo(ProcessPoolExecutor, desc='多进程')print(f"【多进程】π ≈ {process_res:.6f}")print(f"【多进程】CPU 并发总耗时:{time.time() - start_time:.2f}s")
concurrent.futures 核心 API
方法 | 用途 | 细节要点 |
---|---|---|
submit(fn, *args, **kwargs) | 提交单任务,返回 Future | 立即返回,不阻塞 |
map(fn, iterable, timeout=None, chunksize=1) | 批量任务,结果按输入顺序返回 | 阻塞直到全部完成 |
as_completed(fs, timeout=None) | 谁先完成先拿结果 | 适合进度条、实时反馈 |
wait(fs, return_when=...) | 阻塞到全部/任一/首个任务完成 | 与 threading.Event 类似 |
Future.result() | 获取任务结果 / 抛异常 | 可加 timeout |
Future.cancel() | 取消未开始的任务 | 已运行不会终止 |
上下文管理器 | with Executor(...) as ex: | 自动 shutdown(wait=True) |
|记住:全部都围绕 Future 这一轻量对象。Future 包含 状态、结果、异常、取消接口,让主线程放心拿回信息。
参考
https://zhuanlan.zhihu.com/p/1923728223172265104