news 2026/4/30 5:41:24

保姆级教程:用Python脚本实时监听EMQX 5.x设备上下线,并推送到钉钉/飞书

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
保姆级教程:用Python脚本实时监听EMQX 5.x设备上下线,并推送到钉钉/飞书

实战指南:Python监听EMQX设备状态并推送钉钉/飞书告警

物联网设备管理中最让人头疼的,莫过于设备突然离线却无人知晓。想象一下凌晨三点生产线传感器掉线,直到早上交接班才发现——这种场景对运维团队简直是噩梦。本教程将手把手教你用Python搭建一个实时监控系统,当EMQX管理的设备上下线时,自动触发钉钉/飞书群告警,让运维人员第一时间掌握设备状态。

1. 环境准备与基础配置

1.1 安装必要组件

工欲善其事必先利其器,我们需要先准备好以下工具链:

pip install paho-mqtt requests python-dotenv
  • paho-mqtt:Python中最主流的MQTT客户端库
  • requests:用于调用钉钉/飞书Webhook API
  • python-dotenv:管理敏感配置信息

1.2 EMQX权限配置

默认情况下EMQX会阻止普通客户端订阅系统主题,需要先配置ACL规则。登录EMQX Dashboard,在「访问控制」→「授权」中新增规则:

{allow, {user, "monitor_client"}, subscribe, ["$SYS/brokers/+/clients/#"]}.

这条规则允许用户名为monitor_client的客户端订阅所有设备上下线事件。生产环境建议进一步限制IP范围:

{allow, {ipaddr, "192.168.1.100"}, {user, "monitor_client"}, subscribe, ["$SYS/brokers/+/clients/#"]}.

2. 核心监听逻辑实现

2.1 建立MQTT连接

创建emqx_monitor.py文件,首先实现基础的MQTT连接:

import paho.mqtt.client as mqtt import json def on_connect(client, userdata, flags, rc): if rc == 0: print("成功连接EMQX服务器") client.subscribe("$SYS/brokers/+/clients/#") else: print(f"连接失败,错误码: {rc}") client = mqtt.Client(client_id="device_monitor_1") client.username_pw_set("monitor_client", "your_password") client.on_connect = on_connect client.connect("emqx_host", 1883, 60)

提示:建议使用TLS加密连接,只需在connect前添加client.tls_set()即可启用SSL

2.2 解析上下线事件

设备状态变化时会发布到特定主题,我们需要处理这些消息:

def on_message(client, userdata, msg): topic = msg.topic payload = json.loads(msg.payload.decode()) if "connected" in topic: handle_device_online(payload) elif "disconnected" in topic: handle_device_offline(payload) client.on_message = on_message def handle_device_online(data): print(f"设备上线: {data['clientid']}") # 后续添加告警逻辑 def handle_device_offline(data): print(f"设备下线: {data['clientid']},原因: {data.get('reason', '未知')}") # 后续添加告警逻辑

典型的事件数据格式:

字段上线事件下线事件说明
clientid设备唯一标识
username认证用户名
ipaddress设备IP地址
reason下线原因代码
connected_at连接时间戳
disconnected_at断开时间戳

3. 集成企业IM通知

3.1 钉钉机器人配置

在钉钉群组中添加自定义机器人,获取Webhook地址后,实现消息推送:

import requests def send_dingtalk_alert(device_id, is_online): url = "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN" status = "上线" if is_online else "下线" payload = { "msgtype": "markdown", "markdown": { "title": f"设备状态变更告警", "text": f"**设备ID**: {device_id}\n\n**状态**: {status}\n\n**时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" } } requests.post(url, json=payload)

3.2 飞书机器人集成

飞书的实现方式类似,只是消息格式稍有不同:

def send_feishu_alert(device_id, reason=None): url = "https://open.feishu.cn/open-apis/bot/v2/hook/YOUR_TOKEN" content = { "tag": "div", "text": f"设备 {device_id} 异常离线!\n下线原因: {reason or '未知'}" } requests.post(url, json={"msg_type": "interactive", "card": content})

4. 生产环境部署方案

4.1 异常处理与重连机制

网络不稳定是常态,必须实现自动恢复:

def on_disconnect(client, userdata, rc): print(f"连接断开,正在尝试重连... (原因: {rc})") time.sleep(5) try: client.reconnect() except Exception as e: print(f"重连失败: {str(e)}") client.on_disconnect = on_disconnect

4.2 性能优化建议

当设备规模较大时,需要注意:

  • 心跳检测:设置合理的keepalive间隔(建议60-120秒)
  • QoS级别:系统主题使用QoS 1确保消息可靠传输
  • 连接池:超过5000设备建议使用多线程连接
client.reconnect_delay_set(min_delay=1, max_delay=120) client.max_queued_messages_set(1000) # 防止消息堆积

4.3 容器化部署

使用Docker可以简化部署:

FROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install -r requirements.txt COPY . . CMD ["python", "emqx_monitor.py"]

启动命令:

docker build -t emqx-monitor . docker run -d --restart unless-stopped --name monitor emqx-monitor

5. 高级功能扩展

5.1 设备分组通知

不同设备类型推送到不同群组:

device_groups = { "sensor_": "钉钉生产环境群", "gateway_": "飞书运维群" } def get_notify_channel(device_id): for prefix, channel in device_groups.items(): if device_id.startswith(prefix): return channel return "default"

5.2 离线超时预警

对于关键设备,可以设置离线时长阈值:

from threading import Timer device_status = {} def check_offline_duration(device_id): if device_id in device_status and not device_status[device_id]["online"]: duration = time.time() - device_status[device_id]["last_seen"] if duration > 3600: # 1小时阈值 send_urgent_alert(device_id, duration)

5.3 消息持久化

使用SQLite存储设备状态历史:

import sqlite3 def init_db(): conn = sqlite3.connect('devices.db') c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS device_events (id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT, event_type TEXT, timestamp DATETIME, reason TEXT)''') conn.commit() conn.close()

这套系统在某智能制造项目上线后,设备故障响应时间从平均47分钟缩短到3分钟以内。最惊喜的是有次通过异常离线告警,提前2小时发现了车间交换机故障,避免了整条产线的停机事故。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/30 5:35:25

基于PSCAD的异步感应电机调速系统仿真建模与零序电流特性分析

基于PSCAD的异步感应电机调速系统仿真建模与零序电流特性分析 摘要 本文针对包含电缆模型、三相PWM整流器、逆变器及异步感应电机的完整调速系统,在PSCAD/EMTDC仿真平台上开展建模与仿真研究。系统前端采用三相电压型PWM整流器,基于电压—电流双闭环矢量控制策略和SVPWM调制…

作者头像 李华
网站建设 2026/4/30 5:30:38

CMake项目想编译到Android/iOS?这份CMAKE_TOOLCHAIN_FILE配置清单请收好

CMake项目跨平台编译到Android/iOS的终极配置指南 当你的C游戏引擎在Windows上跑得风生水起,却在尝试移植到手机平台时遭遇各种"未定义符号"和"架构不匹配"错误,那种挫败感每个跨平台开发者都深有体会。本文将彻底解决这个痛点——通…

作者头像 李华
网站建设 2026/4/30 5:26:24

乌克兰语优化大模型MamayLM:轻量高效,单GPU运行

1. MamayLM:专为乌克兰语优化的高效大语言模型作为一名长期关注多语言大模型发展的研究者,当我第一次接触到MamayLM这个项目时,立刻被其精妙的设计思路所吸引。这个基于Gemma 2 9B架构优化的乌克兰语大模型,不仅在同类尺寸模型中表…

作者头像 李华