importmultiprocessingimporttimeimportrandomfrom typingimportDict,List,Tuple #=====================模拟USB设备通信(可替换为真实USB库)=====================classUSBDeviceSimulator:"""模拟USB设备(真实场景替换为pyusb/pyserial等库的调用)"""def__init__(self,device_id:str):self.device_id=device_id # 设备唯一标识(如USB端口号/设备SN) self.is_connected=False defconnect(self)->bool:"""连接USB设备(真实场景:打开USB端口/建立通信)"""try:# 模拟连接耗时与随机失败(真实场景:调用USB库的连接接口) time.sleep(random.uniform(0.1,0.3))self.is_connected=random.choice([True]*9+[False])#90%连接成功率print(f"[设备{self.device_id}] 连接{'成功' if self.is_connected else '失败'}")returnself.is_connected except Exception as e:print(f"[设备{self.device_id}] 连接异常: {str(e)}")self.is_connected=FalsereturnFalse defsend_command(self,cmd:str)->str:"""发送指令到USB设备(真实场景:USB数据发送)"""ifnotself.is_connected:return"ERROR: 设备未连接"try:# 模拟指令处理耗时 time.sleep(random.uniform(0.05,0.2))# 模拟不同指令的响应(产测常用指令示例) cmd_responses={"GET_VERSION":f"VERSION:V1.0.{self.device_id}","TEST_LED":"LED:OK","TEST_USB_SPEED":f"SPEED:{random.choice(['HIGH', 'FULL'])}","TEST_COMM":"COMM:OK"}# 模拟10%指令响应失败(真实场景:校验USB返回的原始数据)ifrandom.random()<0.1:returnf"ERROR: {cmd}响应超时"returncmd_responses.get(cmd,f"UNKNOWN_CMD:{cmd}")except Exception as e:returnf"ERROR: {str(e)}"defdisconnect(self):"""断开USB设备(真实场景:关闭USB端口)"""self.is_connected=Falseprint(f"[设备{self.device_id}] 断开连接")#=====================单设备测试进程=====================defdevice_test_worker(device_id:str,task_queue:multiprocessing.Queue,result_queue:multiprocessing.Queue):""" 单个USB设备的测试进程:param device_id:设备唯一标识:param task_queue:主进程下发的任务队列:param result_queue:子进程回传的结果队列""" # 初始化设备 usb_device=USBDeviceSimulator(device_id)test_result={"device_id":device_id,"connect_status":False,"test_items":{},"total_pass":False,"error_msg":""}try:#1.连接设备 test_result["connect_status"]=usb_device.connect()ifnottest_result["connect_status"]:test_result["error_msg"]="设备连接失败"result_queue.put(test_result)return#2.执行产测指令(从任务队列获取测试项)whilenottask_queue.empty():test_item=task_queue.get()iftest_item=="EXIT":# 退出信号break# 发送测试指令并记录结果 response=usb_device.send_command(test_item)test_result["test_items"][test_item]={"response":response,"pass":"ERROR"notin response # 简单校验:无ERROR即为通过}#3.判定整体测试结果 test_result["total_pass"]=all([item["pass"]foritem in test_result["test_items"].values()])test_result["error_msg"]=""iftest_result["total_pass"]else"部分测试项失败"except Exception as e:test_result["error_msg"]=f"测试异常: {str(e)}"test_result["total_pass"]=False finally:#4.清理资源并返回结果 usb_device.disconnect()result_queue.put(test_result)print(f"[设备{device_id}] 测试进程结束")#=====================主进程(上位机控制逻辑)=====================classUSBTestHost:"""产测上位机主控制类(一托多USB设备测试)"""def__init__(self,device_ids:List[str]):self.device_ids=device_ids # 待测试的USB设备ID列表 self.task_queues:Dict[str,multiprocessing.Queue]={}# 每个设备的任务队列 self.result_queue=multiprocessing.Queue()# 所有设备的结果队列 self.processes:Dict[str,multiprocessing.Process]={}# 设备对应的进程 definit_tasks(self,test_items:List[str])->None:"""初始化每个设备的测试任务队列"""fordev_id in self.device_ids:task_queue=multiprocessing.Queue()# 放入测试项foritem in test_items:task_queue.put(item)# 放入退出信号(所有任务执行完后退出) task_queue.put("EXIT")self.task_queues[dev_id]=task_queue defstart_test(self)->List[Dict]:"""启动所有设备的测试进程"""#1.初始化测试任务(产测指令列表) test_items=["GET_VERSION","TEST_LED","TEST_USB_SPEED","TEST_COMM"]self.init_tasks(test_items)#2.启动每个设备的测试进程print(f"\n===== 启动{len(self.device_ids)}台USB设备测试 =====")fordev_id in self.device_ids:p=multiprocessing.Process(target=device_test_worker,args=(dev_id,self.task_queues[dev_id],self.result_queue))self.processes[dev_id]=p p.start()print(f"[主进程] 设备{dev_id}测试进程已启动 (PID:{p.pid})")#3.等待所有进程结束并收集结果 test_results=[]fordev_id,p in self.processes.items():p.join(timeout=10)# 设置超时,避免进程卡死ifp.is_alive():p.terminate()# 超时终止进程 test_results.append({"device_id":dev_id,"connect_status":False,"test_items":{},"total_pass":False,"error_msg":"测试进程超时"})#4.从结果队列获取所有测试结果whilenotself.result_queue.empty():test_results.append(self.result_queue.get())returntest_results defprint_test_summary(self,results:List[Dict])->None:"""打印测试汇总报告"""print("\n===== USB设备产测汇总报告 =====")total_devices=len(results)pass_devices=sum([1forres in resultsifres["total_pass"]])connect_fail=sum([1forres in resultsifnotres["connect_status"]])print(f"测试总数: {total_devices} 台")print(f"通过数: {pass_devices} 台")print(f"连接失败: {connect_fail} 台")print(f"通过率: {pass_devices / total_devices * 100:.1f}%")# 打印每台设备的详细结果forres in results:print(f"\n--- 设备{res['device_id']} ---")print(f"连接状态: {'成功' if res['connect_status'] else '失败'}")print(f"整体结果: {'PASS' if res['total_pass'] else 'FAIL'}")ifres["error_msg"]:print(f"错误信息: {res['error_msg']}")# 打印各测试项结果fortest_item,item_res in res["test_items"].items():print(f" {test_item}: {item_res['response']} ({'PASS' if item_res['pass'] else 'FAIL'})")#=====================运行示例=====================if__name__=="__main__":# 模拟5台USB设备(真实场景:枚举系统中的USB设备,获取设备ID/端口号) device_ids=["USB-001","USB-002","USB-003","USB-004","USB-005"]# 初始化上位机并启动测试 test_host=USBTestHost(device_ids)test_results=test_host.start_test()# 打印测试汇总 test_host.print_test_summary(test_results)python 多进程
张小明
前端开发工程师
【毕业设计】基于springboot的学院停车场管理系统(源码+文档+远程调试,全bao定制等)
博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业🚢文撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围:&am…
如何制作一个简单的 .deb Debian 包 ?
制作自定义的 Debian 包(.deb 文件)是一项在 Debian、Ubuntu 等 Linux 发行版上高效分发软件的重要技能。本指南将引导您完成创建一个简单 .deb 包的全过程。 核心概念:.deb 包的结构 一个 .deb 文件本质上是使用 ar命令打包的归档文件&…
如何利用SQL计算ABC库存分类(帕累托分析)?
目录 一、核心概念:ABC分类与帕累托法则 1. ABC库存分类法定义 2. 帕累托法则(二八定律) 3. 为什么用SQL窗口函数实现? 二、SQL实现:完整ABC分类方案 1. 需求明确 2. 数据准备 3. NTILE(n) 函数简介 核心定义 基本语法 特点 分配算法原理 4. 完整SQL代码 5.…
RAG 检索模型如何学习:三种损失函数的机制解析
Agent 系统发展得这么快那么检索模型还重要吗?RAG 本身都已经衍生出 Agentic RAG和 Self-RAG(这些更复杂的变体了。 答案是肯定的,无论 Agent 方法在效率和推理上做了多少改进,底层还是离不开检索。检索模型越准,需要…
【毕业设计】基于springboot的食品安全管理系统(源码+文档+远程调试,全bao定制等)
博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业🚢文撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围:&am…
亿可达×飞书:一键搞定定时群通知,告别人工重复提醒
有没有过这样的职场日常? 每天下午临近下班,都要特意定个闹钟提醒自己:“别忘了发例会通知”“记得同步今日工作小结到飞书群”;每周一早上,总要专门抽5分钟,在部门群推送本周任务清单;甚至节假…