上周我打通了Milk-V Duo S和机器人的串口控制的脉络,同时制作了两个动作组:下蹲、自由飞翔。
| 阶段 | 状态 |
|---|---|
| 环境搭建 | ✅ 完成 |
| 网络配置 | ✅ 完成 |
| 串口控制 | ✅ 完成 |
| 动作调度 | ✅ 完成 |
| 序列执行控制 | ✅ 完成 |
| 时间控制 | ✅ 完成 |
| 部分动作组制作 | ✅ 完成 |
| 线程执行锁 | ✅ 完成 |
| 云端接入 | ⏳ 下一步 |
| 复杂动作组制作 | ⏳ 下一步 |
| … | ⏳ 下一步 |
本周我将进行Milk-V Duo S开发板与云端服务器进行通信模块的开发,接入云端,以从云端接收数据,从而进一步实现后端执行层与AI分析层之间的连接 。
云端接入
经过我的调研,找到了两种方案如下:
第一种通过MQTT建立与云端的通信
让机器人从“本地脚本控制”跃迁到“云端实时指挥”,核心就是在 Milk‑V Duo S 上架起一座 MQTT 消息桥。我本周开发的通信模块mqtt_control.py扮演了云端指令的接收者和执行调度器。我们使用paho.mqtt.client连接到 MQTT 服务器,订阅robot/control主题,一旦收到 JSON 格式的动作序列(如{"actions": [1, 7, 6, 2, 0]}),就立刻调用之前已经调试成熟的run_sequence函数,驱动真实的舵机表演。
提示词
请生成一个 Python 脚本,文件保存为 mqtt_control.py,用于在 Milk-V Duo S 上通过 MQTT 接收云端指令,并控制机器人执行动作序列。要求如下: - 导入 paho.mqtt.client、json、threading 模块。 - 从 seq_control 模块导入 RobotController、run_sequence、ACTION_DURATION。 串口默认 "/dev/ttyS0",波特率 115200。 - 定义一个全局锁,用于保护串口通信。 - 定义 connect 回调函数 - 定义 message 回调函数 - 将接收到的消息载荷用 json.loads 解析为字典 data。 - 打印 "收到云端指令:" 和 data。 - 否则,使用 threading.Thread 启动一个新线程,目标函数为 run_sequence - 创建 MQTT 客户端对象将 connect 和 message 分别赋值给 client.on_connect 和 client.on_message。 - 设置云服务器地址: SERVER = "47.93.27.196" PORT = 1883 - 打印 "等待云端指令...",然后调用 client.loop_forever() 进入消息循环。 请输出完整的 Python 代码。LLM产出code
import paho.mqtt.client as mqtt import json import threading from seq_control import RobotController, run_sequence, ACTION_DURATION # ---------- 机器人控制器初始化 ---------- robot = RobotController(port="/dev/ttyS0", baudrate=115200) # ---------- 锁,保护串口通信 ---------- lock = threading.Lock() def on_connect(client, userdata, flags, rc): print(f"MQTT 连接成功,状态码: {rc}") client.subscribe("robot/control") def on_message(client, userdata, msg): try: data = json.loads(msg.payload.decode()) print("收到云端指令:", data) sequence = data.get("actions", []) if not sequence: print("指令中无动作序列,忽略") return # 在新线程中执行动作序列,避免阻塞 MQTT 消息接收 threading.Thread( target=run_sequence, args=(robot, sequence), daemon=True ).start() except Exception as e: print("解析或执行错误:", e) # ---------- MQTT 客户端配置 ---------- client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message # 你的云服务器地址(MQTT 默认端口 1883) SERVER = "47.93.27.196" PORT = 1883 client.connect(SERVER, PORT, 60) print("等待云端指令...") client.loop_forever()这里最关键的升级是多线程执行。因为动作组里有长达 72 秒的舞蹈,如果直接在on_message回调里运行,机器人会在这段时间内完全“失聪”,无法响应任何新指令。我们通过threading.Thread将动作序列放到后台线程,并设置为 Daemon 模式,这样主 MQTT 循环不会被阻塞,随时可以接收新的云端命令,甚至能够被更新、中断或重新编排。配合seq_control.py中已有的threading.Lock,两个线程对串口的访问也得到了保护,不会出现指令交错。至此,机器人不再只是一段预设脚本的播放器,而是一个可以实时接收、解析、执行云端动作指令的智能终端,为后续接入 NLP 情绪分析、远程控制等场景铺平了道路。
第二种通过HTTP轮询建立与云端的通信
提示词
生成一个 HTTP 轮询机器人控制脚本 poll_control.py, 从 seq_control 导入 RobotController 和 run_sequence。 服务器地址 http://47.93.27.196:8765,客户端 ID 为 milk_duos_001, 每 2 秒轮询一次 /poll/<id> 获取动作序列, 有新的 command_id 时执行 run_sequence, 并向 /ack/<id> 发送确认。 包含异常处理和 Ctrl+C 停止。 输出完整Python代码.LLM产出code
import requests import time from robot_control import RobotController, run_sequence # ========= 配置 ========= SERVER = "http://47.93.27.196:8765" CLIENT_ID = "milk_duos_001" POLL_INTERVAL = 2 robot = RobotController() def poll_action(): url = f"{SERVER}/poll/{CLIENT_ID}" try: r = requests.get(url, timeout=5) return r.json() except Exception as e: print("轮询失败:", e) return None def ack_action(command_id): url = f"{SERVER}/ack/{CLIENT_ID}" try: requests.post(url, json={"command_id": command_id}) print("已确认:", command_id) except Exception as e: print("确认失败:", e) def main(): print("🚀 Duo S 已连接云端,开始轮询...") last_id = None while True: result = poll_action() if result: # 有动作 if result.get("action_sequence"): cmd_id = result.get("command_id") # 防止重复执行 if cmd_id != last_id: sequence = result["action_sequence"] print("收到动作:", sequence) # ⭐ 核心:执行真实机器人动作 run_sequence(robot, sequence) # ⭐ 执行完告诉云端 ack_action(cmd_id) last_id = cmd_id elif result.get("status") == "no_action": pass else: print("未知响应:", result) time.sleep(POLL_INTERVAL) if __name__ == "__main__": try: main() except KeyboardInterrupt: robot.stop() robot.close()采用HTTP 轮询的方式让机器人从云端获取动作指令。Doo S 周期性地向服务器发起 GET 请求,查询当前是否有待执行的动作序列;如果返回了新的command_id,就调用已有的run_sequence执行真实机器人动作,执行完毕再通过 POST 请求向服务器确认已完成。这种方式实现简单,无需额外的中间件,仅靠 HTTP 就能在有限资源下跑通。但它本质上是一种“拉”模式:机器人每隔 2 秒主动查一次,动作的响应会有最多 2 秒的延迟,不适合需要即时反应的交互场景。不过在 Duo S 这种嵌入式板卡上,该方案不依赖持久的连接,对网络波动容忍度更高,且容易调试和扩展,很适合初期原型验证。
| 对比 | MQTT | HTTP 轮询 |
|---|---|---|
| 消息模式 | “推” 模式,云端有新指令立刻推送给机器人 | “拉” 模式,由机器人定时主动查询云端指令 |
| 实时性 | 实时性高,指令几乎无延迟 | 存在固定查询间隔(如 2 秒),响应不够及时 |
| 连接依赖 | 需要维持长连接,对网络稳定性要求高,断网需重连 | 每次请求独立、无状态,网络适应性强 |
| 资源开销 | 长连接省流量,但客户端需维护心跳 | 频繁发起请求,即便无新指令也会持续消耗带宽和 CPU |
| 适用场景 | 交互频繁、要求低延迟的云控场景 | 指令稀疏、开发早期快速验证场景 |
根据测试情况外加与组长和组员的交流,我们在测试过程中发现,HTTP轮询的查询间隔(2s)延迟并不明显,于是决定先使用HTTP轮询的方式快速进行项目原型验证。
云端&Duo S&机器人端联调
1.确认云服务器状态正常
curl http://47.93.27.196:8765/health{"status":"ok"}表明云服务器状态正常,且Duo S已经成功连上云服务器
2.Duo S在终端运行同时云端发送情绪
curl -X POST http://47.93.27.196:8765/emotion \ -H "Content-Type: application/json" \ -d "{\"content\":\"我人工智能导论只考了95分差一分就能拿到A+了,现在没有达成目标感觉好难过\"}"刚开始我们是失败状态的,
后来发现犯了一个低级错误,Duo S wifi配置自动执行文件的默认账号密码之前在借给队友时候修改过了,然后现在还没修改为我的热点。
到目录/etc/wpa_supplicant.conf文件 修改完成之后,才实现HTTP正常连接,
同时云服务器再次发送emotion
演示视频2