手把手教你用Python模拟IEC 60870-5-101规约主站与子站通信(附完整代码)
在电力自动化系统中,IEC 60870-5-101规约作为基础通信协议,承担着主站与子站间数据交互的重要职责。对于开发者而言,理解协议规范只是第一步,更重要的是能够将其转化为可执行的代码实现。本文将带你从零开始,用Python构建完整的IEC 101通信模拟环境,涵盖帧构造、链路管理、数据交互等核心环节。
1. 环境准备与基础框架搭建
1.1 协议栈依赖安装
实现IEC 101规约需要以下Python库支持:
pip install pyserial crcmod- pyserial:处理串口通信(RS232/RS485)
- crcmod:计算帧校验和(CS)
1.2 通信基础类设计
首先定义协议常量与基础类:
class IEC101Constants: # 帧格式标识 FIXED_FRAME_START = 0x10 VARIABLE_FRAME_START = 0x68 FRAME_END = 0x16 # 控制域功能码 FC_RESET_LINK = 0x00 FC_REQUEST_LINK_STATUS = 0x09 FC_TOTAL_CALL = 0x03 class IEC101Frame: def __init__(self, control, address, data=None): self.control = control self.address = address self.data = data or b''2. 帧格式解析与构造
2.1 固定帧长格式实现
固定帧长格式用于控制命令传输,结构如下:
| 字段 | 长度 | 说明 |
|---|---|---|
| 启动字符 | 1 | 0x10 |
| 控制域 | 1 | 通信控制信息 |
| 链路地址 | 1-2 | 子站地址 |
| 校验和 | 1 | 算术和(256模) |
| 结束字符 | 1 | 0x16 |
Python实现代码:
def build_fixed_frame(control, address): frame = bytearray() frame.append(IEC101Constants.FIXED_FRAME_START) frame.append(control) # 处理1或2字节地址 if isinstance(address, int): frame.append(address & 0xFF) if address > 0xFF: frame.append((address >> 8) & 0xFF) # 计算校验和 checksum = sum(frame[1:]) % 256 frame.append(checksum) frame.append(IEC101Constants.FRAME_END) return bytes(frame)2.2 可变帧长格式实现
可变帧长用于数据传输,典型结构示例:
def build_variable_frame(control, address, data): length = 2 + len(data) # 控制域+地址域+数据 frame = bytearray() frame.append(IEC101Constants.VARIABLE_FRAME_START) frame.append(length) frame.append(length) # 重复长度 frame.append(IEC101Constants.VARIABLE_FRAME_START) frame.append(control) # 地址处理 if isinstance(address, int): frame.append(address & 0xFF) if address > 0xFF: frame.append((address >> 8) & 0xFF) # 添加数据 frame.extend(data) # 校验和计算 checksum = sum(frame[4:]) % 256 frame.append(checksum) frame.append(IEC101Constants.FRAME_END) return bytes(frame)3. 链路管理实现
3.1 链路建立流程
标准链路建立包含三个阶段:
- 链路测试:主站发送请求链路状态帧(C_RQ_NA_1)
- 链路复位:主站发送复位远方链路帧(C_RL_NA_1)
- 确认响应:子站返回确认帧(M_RL_NA_1)
关键代码实现:
def establish_link(serial_port, station_address): # 步骤1:请求链路状态 request_frame = build_fixed_frame( control=0x49, # PRM=1, FCB=0, FCV=1, FC=9 address=station_address ) serial_port.write(request_frame) # 等待响应(超时3秒) response = wait_for_response(serial_port, timeout=3) if not validate_frame(response): raise IEC101Error("链路状态请求失败") # 步骤2:复位链路 reset_frame = build_fixed_frame( control=0x40, # PRM=1, FCB=0, FCV=0, FC=0 address=station_address ) serial_port.write(reset_frame) # 验证复位确认 ack_frame = wait_for_response(serial_port) if ack_frame[1] != 0x80: # 确认帧控制域应为0x80 raise IEC101Error("链路复位失败")3.2 超时与重传机制
工业通信必须考虑传输可靠性:
class IEC101LinkLayer: def __init__(self, serial_port): self.port = serial_port self.retry_count = 0 self.max_retries = 3 def send_with_retry(self, frame, expect_ack=True): while self.retry_count < self.max_retries: self.port.write(frame) try: response = self._wait_for_ack(timeout=2) self.retry_count = 0 return response except TimeoutError: self.retry_count += 1 raise IEC101Error("超过最大重试次数") def _wait_for_ack(self, timeout): start_time = time.time() while time.time() - start_time < timeout: if self.port.in_waiting: return self._read_frame() raise TimeoutError4. 数据交互实战
4.1 总召唤(总召)实现
总召唤流程代码示例:
def execute_total_call(link_layer, station_addr): # 构建总召唤命令帧 call_frame = build_variable_frame( control=0x73, # PRM=1, FCB=1, FCV=1, FC=3 address=station_addr, data=bytes([ 0x64, # 类型标识100 0x01, # 结构限定词 0x06, # 激活原因 0x00, # 原因高位 station_addr & 0xFF, (station_addr >> 8) & 0xFF if station_addr > 0xFF else 0x00, 0x00, 0x00, # 信息体地址 0x14 # 总召唤限定词20 ]) ) # 发送并等待确认 ack = link_layer.send_with_retry(call_frame) if ack[8] != 0x07: # 检查确认原因 raise IEC101Error("总召唤激活失败") # 处理数据响应(示例仅显示遥测帧) while True: response = link_layer.receive_frame() if response[7] == 0x15: # 遥测类型标识 process_measurement(response) elif response[7] == 0x64 and response[8] == 0x0A: # 结束帧 break4.2 遥测数据处理
解析遥测数据帧的典型实现:
def process_measurement(frame): num_points = frame[9] & 0x7F # 获取遥测数量 start_addr = frame[14] | (frame[15] << 8) values = [] for i in range(num_points): offset = 16 + i*2 value = frame[offset] | (frame[offset+1] << 8) values.append({ 'address': start_addr + i, 'value': value / 32767.0 # 归一化处理 }) return { 'type': 'measurement', 'cause': frame[10], 'values': values }5. 完整通信流程示例
5.1 主站工作流程
def master_workflow(port, station_addr): link = IEC101LinkLayer(port) try: # 1. 建立链路 establish_link(link, station_addr) # 2. 时间同步 sync_time(link, station_addr) # 3. 执行总召唤 execute_total_call(link, station_addr) # 4. 循环召唤1级数据 while True: call_class1_data(link, station_addr) time.sleep(1) except IEC101Error as e: print(f"通信错误: {e}") finally: port.close()5.2 子站模拟实现
子站需要实现的状态机核心逻辑:
class IEC101Slave: STATES = ['IDLE', 'LINK_ACTIVE', 'DATA_TRANSFER'] def __init__(self, address): self.state = 'IDLE' self.address = address self.fcb = 0 def handle_frame(self, frame): if frame[0] == 0x10: # 固定帧 return self._handle_control_frame(frame) elif frame[0] == 0x68: # 可变帧 return self._handle_data_frame(frame) def _handle_control_frame(self, frame): control = frame[1] if (control & 0x40) and not (control & 0x80): # 主站命令 if (control & 0x0F) == 0x09: # 请求链路状态 return self._build_link_status_response() elif (control & 0x0F) == 0x00: # 复位链路 self.state = 'LINK_ACTIVE' return self._build_reset_confirm()6. 调试与故障排查
6.1 常见问题解决
- 校验和错误:确保所有参与计算的字节包括控制域、地址域和数据区
- 超时无响应:检查物理连接和波特率设置(默认9600, 偶校验)
- 帧格式错误:使用十六进制调试工具对比标准帧结构
6.2 调试工具推荐
def hex_dump(frame): return ' '.join(f'{b:02X}' for b in frame) # 示例输出:10 49 01 4A 16实际项目中,建议结合Wireshark的串口插件或专业的规约分析仪进行报文抓取分析。