当前位置: 首页 > news >正文

多进程

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from pydantic import BaseModel
import multiprocessing as mp
import os
import signalclass SharedData(BaseModel):value: int = 0def worker(shared_value, data_dict):# 真实场景中的任务,没有循环检查stop_eventfor i in range(30):data_dict['value'] = ishared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')time.sleep(1)return data_dict['value'] + shared_value.valuedef init_worker():"""设置子进程的信号处理"""signal.signal(signal.SIGTERM, lambda sig, frame: os._exit(0))if __name__ == '__main__':manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})with ProcessPoolExecutor(initializer=init_worker) as executor:future = executor.submit(worker, shared_value, data_dict)try:# 主进程监控5秒for count in range(5):print(f'\n第 {count + 1} 次检查 - 状态: {"运行中" if future.running() else "完成" if future.done() else "等待"}')print(f'主进程 shared_value:{shared_value.value} data_dict:{data_dict["value"]}')time.sleep(1)# 检查是否完成if not future.done():print("\n任务超时,强制终止子进程...")# 获取子进程PID并发送终止信号for pid, process in executor._processes.items():if process.is_alive():os.kill(pid, signal.SIGTERM)raise TimeoutError("任务超时")result = future.result()print(f"Result: {result}")except TimeoutError:print("子进程已被强制终止")future.cancel()finally:executor.shutdown(wait=True)print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")

  

 

 

 

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from pydantic import BaseModel
import multiprocessing as mp
import signal
import osclass SharedData(BaseModel):value: int = 0def worker(shared_value, data_dict, stop_event):print('子进程PID', os.getpid())i = 0while i < 30 and not stop_event.is_set():# data_dict['value'] = i# shared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')i += 1time.sleep(1)return data_dict['value'] + shared_value.valueif __name__ == '__main__':print('主进程PID', os.getpid())manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})stop_event = manager.Event()with ProcessPoolExecutor() as executor:future = executor.submit(worker, shared_value, data_dict, stop_event)try:# 主进程监控5秒for _ in range(5):print("future.running() =>", future.running())print(f'主进程 shared_value:{shared_value.value} data_dict:{data_dict["value"]}')time.sleep(1)# 检查子进程是否完成result = future.result(timeout=0.1)print(f"Result: {result}")except TimeoutError:print("任务超时,通知子进程停止...")stop_event.set()  # 通知子进程优雅停止try:# 给子进程一些时间进行清理result = future.result(timeout=0.1)print(f"子进程已优雅停止,Result: {result}")except TimeoutError:print("子进程未及时响应,强制取消...")future.cancel()# 正常关闭执行器,等待所有进程完成executor.shutdown(wait=True)print("future.running() =>", future.running())print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")

  

http://www.hskmm.com/?act=detail&tid=7522

相关文章:

  • 93. 递归实现组合型枚举
  • Sort方法学习(伪代码记录)
  • 深入解析:【每日一问】运算放大器与比较器有什么区别?
  • 9.17支配对问题专题总结
  • Xじゃないか
  • 开源收银体系_大型收银系统源码_OctShop
  • XXL-JOB(2)
  • P9753 [CSP-S 2023] 消消乐
  • 9.16 CSP-S模拟22 改题记录
  • 记录知识
  • AT_agc058_b [AGC058B] Adjacent Chmax
  • Jenkins CVE-2018-1000600漏洞利用与SSRF攻击分析
  • NOIP 集训日记(学术)
  • linux中mysql如何远程连接
  • 详细介绍:Python:OpenCV 教程——从传统视觉到深度学习:YOLOv8 与 OpenCV DNN 模块协同实现工业缺陷检测
  • 深入解析:PYcharm——pyqt音乐播放器
  • Day02
  • 专题:Python实现贝叶斯线性回归与MCMC采样数据可视化分析2实例|附代码数据
  • 威联通NAS如何导入本地docker镜像
  • 一种将离散化状态方程映射为并行多处理器计算机的方法
  • 基本数据类型题目
  • 一种基于动作指令交互的动态活体检测技术,提升人脸识别安全性
  • 2025.9.17
  • mysql库缺失
  • flask简单路由(视图函数)
  • 【学习笔记】拉格朗日插值
  • [系统] Windows 已有office版本和visio不兼容的解决方案
  • CF 2127F Hamed and AghaBalaSar
  • AT_agc055_b [AGC055B] ABC Supremacy
  • “Sequential Thinking MCP Server 和codex等AI工具本身任务拆解功能对比