news 2026/6/9 18:49:06

can(6) canopen python库使用

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
can(6) canopen python库使用

1.发送消息

# canopen_sender.py - CANopen 发送端(主节点) import canopen import can import time def start_canopen_sender(): """启动 CANopen 发送端,连接虚拟总线并发送消息""" # 统一的虚拟总线配置(必须与 bus_creator.py 一致) VIRTUAL_CHANNEL = "virtual_canopen_bus" BITRATE = 500000 MASTER_NODE_ID = 1 # 主节点 ID SLAVE_NODE_ID = 2 # 接收端(从节点)ID # 1. 初始化 CANopen 网络,连接虚拟总线 network = canopen.Network() network.connect( interface="virtual", channel=VIRTUAL_CHANNEL, bitrate=BITRATE ) print(f"✅ CANopen 发送端已连接虚拟总线:{VIRTUAL_CHANNEL}") master_node = network.add_node(MASTER_NODE_ID, object_dictionary=None) # 2. 发送 CANopen 心跳报文(主节点自身心跳) print("\n===== 发送主节点心跳报文 =====") heartbeat_id = 0x700 + MASTER_NODE_ID # CANopen 心跳 ID 规则:0x700 + 节点ID heartbeat_msg = can.Message( arbitration_id=heartbeat_id, data=[0x05], # 0x05=运行状态 is_extended_id=False, channel=VIRTUAL_CHANNEL ) network.bus.send(heartbeat_msg) print(f"📤 发送心跳报文:ID=0x{heartbeat_id:X},数据=0x05(运行状态)") # 3. 发送 SDO 请求(向从节点读取设备名称 0x1008:0x00) print("\n===== 发送 SDO 读取请求 =====") sdo_request_id = 0x600 + SLAVE_NODE_ID # SDO 请求 ID 规则:0x600 + 从节点ID # SDO 读取请求数据(格式:0x40 + 索引高字节 + 索引低字节 + 子索引 + 保留) sdo_request_data = b'\x40\x08\x10\x00\x00\x00\x00\x00' # 读取 0x1008:0x00 sdo_msg = can.Message( arbitration_id=sdo_request_id, data=sdo_request_data, is_extended_id=False, channel=VIRTUAL_CHANNEL ) network.bus.send(sdo_msg) print(f"📤 发送 SDO 请求:ID=0x{sdo_request_id:X},数据={sdo_request_data.hex()}") # 4. 监听是否有从节点响应(可选) print("\n===== 监听从节点响应(5秒)=====") start_time = time.time() while time.time() - start_time < 5: resp_msg = network.bus.recv(timeout=0.5) if resp_msg: print(f"📥 收到响应:ID=0x{resp_msg.arbitration_id:X},数据={resp_msg.data.hex()}") network.disconnect() if __name__ == "__main__": start_canopen_sender()

2.接收

# canopen_receiver.py - CANopen 接收端(从节点) import canopen import can import time import threading class CANopenReceiver: """CANopen 接收端,监听并响应虚拟总线消息""" def __init__(self): # 统一的虚拟总线配置(与 bus_creator.py 一致) self.VIRTUAL_CHANNEL = "virtual_canopen_bus" self.BITRATE = 500000 self.SLAVE_NODE_ID = 2 # 从节点 ID self.network: canopen.Network = None self.running = False def start(self): """启动接收端,连接虚拟总线并监听消息""" try: # 1. 连接虚拟总线 self.network = canopen.Network() self.network.connect( interface="virtual", channel=self.VIRTUAL_CHANNEL, bitrate=self.BITRATE ) print(f"✅ CANopen 接收端已连接虚拟总线:{self.VIRTUAL_CHANNEL}") self.running = True # 2. 启动监听线程 listen_thread = threading.Thread(target=self._listen_loop, daemon=True) listen_thread.start() print(f"ℹ️ 开始监听虚拟总线消息(节点 ID={self.SLAVE_NODE_ID}),按 Ctrl+C 退出") # 3. 保持脚本运行 while self.running: time.sleep(1) except KeyboardInterrupt: print("\n⚠️ 接收到退出信号,停止监听...") except Exception as e: print(f"❌ 接收端启动失败:{e}") finally: self.running = False self.network.disconnect() print("✅ 接收端已断开虚拟总线连接") def _listen_loop(self): """循环监听虚拟总线消息,并响应 SDO 请求""" while self.running: msg = self.network.bus.recv(timeout=0.1) # 非阻塞接收 if msg: self._process_message(msg) def _process_message(self, msg: can.Message): """解析并处理收到的 CANopen 消息""" arb_id = msg.arbitration_id data = msg.data.hex() print(f"\n📥 收到消息:ID=0x{arb_id:X},数据={data}") # 识别并响应 SDO 请求(0x600 + 从节点ID) if arb_id == 0x600 + self.SLAVE_NODE_ID: print(" → 收到 SDO 读取请求,准备响应...") self._respond_sdo_request(msg) # 识别心跳报文(0x700 + 节点ID) elif 0x700 <= arb_id < 0x800: node_id = arb_id - 0x700 state = msg.data[0] if len(msg.data) > 0 else 0 state_map = {0: "初始化", 5: "运行", 127: "故障"} state_desc = state_map.get(state, f"未知({state})") print(f" → 心跳报文:节点{node_id} 状态={state_desc}") def _respond_sdo_request(self, req_msg: can.Message): """响应 SDO 读取请求(模拟从节点返回设备名称)""" # SDO 响应 ID 规则:0x580 + 从节点ID sdo_resp_id = 0x580 + self.SLAVE_NODE_ID # 模拟返回设备名称 "VirtualCAN"(截断为 4 字节示例) resp_data = b'\x43\x08\x10\x00\x08\x56\x69\x72' # 响应格式 + "Vir..." resp_msg = can.Message( arbitration_id=sdo_resp_id, data=resp_data, is_extended_id=False, channel=self.VIRTUAL_CHANNEL ) self.network.bus.send(resp_msg) print(f"📤 发送 SDO 响应:ID=0x{sdo_resp_id:X},数据={resp_data.hex()}") if __name__ == "__main__": receiver = CANopenReceiver() receiver.start()

3.代码解释

3.1 interface="virtual", -- CAN 总线的通信接口类型

其他类型

  • Windows:interface='pcan'(PCAN 硬件)、interface='kvaser'(Kvaser 硬件);
  • 虚拟总线(跨平台):interface='virtual'(python-can 内置虚拟总线)。
  • interface = ‘socketcan'Linux 系统下 Python 操作 CAN 总线的专属接口类型,依赖 Linux 的 SocketCAN 协议栈,需配合 Linux 的 CAN 接口(物理 / 虚拟)使用
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/9 0:33:51

## 基于MATLAB的五次谐波滤波器系统设计(源码+万字报告+讲解)(支持资料、图片参考_相关定制)

基于MATLAB的五次谐波滤波器系统设计 带通滤波器通过其独特的电路设计&#xff0c;能够精确地控制信号的传输范围。它利用电感和电容的组合&#xff0c;形成一个特定的频率响应曲线&#xff0c;使得只有在特定频率范围内的信号能够顺利通过&#xff0c;而其他频率的信号则被大幅…

作者头像 李华
网站建设 2026/6/9 1:31:50

企业AI Agent的伦理设计与道德决策框架

企业AI Agent的伦理设计与道德决策框架关键词&#xff1a;企业AI Agent、伦理设计、道德决策框架、人工智能伦理、企业应用摘要&#xff1a;本文聚焦于企业AI Agent的伦理设计与道德决策框架。随着人工智能在企业领域的广泛应用&#xff0c;AI Agent的伦理问题日益凸显。文章首…

作者头像 李华
网站建设 2026/6/9 1:44:33

2026年最好用的降AI率工具Top5:学长学姐都在用

“用降AI率工具的话&#xff0c;哪个比较好&#xff1f;” 这个问题我被问了不下十遍。作为一个帮过无数学弟学妹处理论文的"老学长"&#xff0c;今天就来分享一下2026降AI工具的使用心得&#xff0c;都是我和周围学长学姐们亲测过的。 为什么学长学姐的推荐更靠谱&…

作者头像 李华
网站建设 2026/6/9 18:42:16

学霸同款10个AI论文平台,助你轻松搞定研究生论文!

学霸同款10个AI论文平台&#xff0c;助你轻松搞定研究生论文&#xff01; AI 工具助你轻松应对论文写作难题 在研究生阶段&#xff0c;论文写作往往成为最让人头疼的环节。无论是选题、文献综述&#xff0c;还是撰写初稿、修改润色&#xff0c;每一个步骤都可能耗费大量时间和精…

作者头像 李华
网站建设 2026/6/9 20:09:15

2026必备!8个AI论文写作软件,助你轻松搞定本科生毕业论文!

2026必备&#xff01;8个AI论文写作软件&#xff0c;助你轻松搞定本科生毕业论文&#xff01; 论文写作的“神助攻”来了&#xff0c;AI 工具让学术之路更轻松 随着人工智能技术的不断进步&#xff0c;越来越多的本科生开始借助 AI 工具来提升论文写作效率。尤其是在当前 AIGC&…

作者头像 李华