news 2026/5/6 2:16:29

Coze多Agent协作系统实战:从入门到生产级应用

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Coze多Agent协作系统实战:从入门到生产级应用

项目介绍:为什么需要多Agent协作?

痛点是什么?

单个AI Agent就像一个只会一项技能的员工——让它写代码,它不会测试;让它分析数据,它不会可视化。当我们需要一个能自主完成复杂任务的系统时,单Agent模式就显得力不从心。

举几个实际场景:

  • 代码审查流程:需要先让Agent分析代码,再让另一个Agent检查安全,最后生成报告
  • 数据处理管道:抓取→清洗→分析→可视化,环环相扣
  • 智能客服系统:理解意图→查询知识库→生成回复→记录工单

这些问题用单Agent解决,要么让prompt越来越长(效果差),要么调用越来越多次(成本高)。

多Agent协作的优势

plaintext

┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Agent A │ ──▶ │ Agent B │ ──▶ │ Agent C │ │ (任务分解) │ │ (执行核心) │ │ (结果整合) │ └─────────────┘ └─────────────┘ └─────────────┘

核心价值

  1. 职责分离:每个Agent专注做一件事,质量更高
  2. 可复用性:Agent可以跨场景复用
  3. 可观测性:每个环节独立监控,出问题好排查
  4. 成本可控:按需调用,不重复处理

核心代码:Coze多Agent协作系统实现

1. 项目结构

bash

multi-agent-system/ ├── agents/ │ ├── __init__.py │ ├── base_agent.py # Agent基类 │ ├── planner_agent.py # 任务规划Agent │ ├── coder_agent.py # 代码编写Agent │ └── reviewer_agent.py # 代码审查Agent ├── core/ │ ├── __init__.py │ ├── orchestrator.py # 调度器 │ └── message_bus.py # 消息总线 ├── config.py # 配置文件 ├── main.py # 入口文件 └── requirements.txt

2. Agent基类实现

python

# agents/base_agent.py from abc import ABC, abstractmethod from typing import Dict, List, Any, Optional from dataclasses import dataclass, field from enum import Enum import json import time class AgentStatus(Enum): IDLE = "idle" WORKING = "working" SUCCESS = "success" FAILED = "failed" @dataclass class Message: """Agent之间传递的消息""" sender: str receiver: str content: Any msg_type: str # "task", "result", "error", "status" timestamp: float = field(default_factory=time.time) metadata: Dict = field(default_factory=dict) def to_dict(self) -> Dict: return { "sender": self.sender, "receiver": self.receiver, "content": self.content, "msg_type": self.msg_type, "timestamp": self.timestamp, "metadata": self.metadata } class BaseAgent(ABC): """所有Agent的基类""" def __init__(self, name: str, description: str, model: str = "claude-3-5-sonnet"): self.name = name self.description = description self.model = model self.status = AgentStatus.IDLE self.message_history: List[Message] = [] self.context: Dict[str, Any] = {} @abstractmethod async def process(self, message: Message) -> Message: """核心处理逻辑,子类必须实现""" pass async def execute(self, input_data: Any, context: Dict = None) -> Dict[str, Any]: """执行任务的主入口""" self.status = AgentStatus.WORKING self.context = context or {} try: # 构造输入消息 input_msg = Message( sender="system", receiver=self.name, content=input_data, msg_type="task" ) # 处理 result_msg = await self.process(input_msg) self.message_history.append(result_msg) self.status = AgentStatus.SUCCESS return { "status": "success", "agent": self.name, "result": result_msg.content, "metadata": result_msg.metadata } except Exception as e: self.status = AgentStatus.FAILED return { "status": "error", "agent": self.name, "error": str(e) } def get_system_prompt(self) -> str: """返回Agent的系统提示词""" return f"""你是一个专业的{self.name}。 {self.description} 你必须: 1. 专注于你的核心职责 2. 输出结构化的结果 3. 如果遇到无法处理的情况,明确说明原因 输出格式要求:JSON格式 """

3. 任务规划Agent实现

python

# agents/planner_agent.py from .base_agent import BaseAgent, Message from typing import Dict, List, Any import json import re class PlannerAgent(BaseAgent): """任务规划Agent:分析用户需求,拆解任务流程""" def __init__(self): super().__init__( name="planner", description="任务规划专家,负责分析需求并拆解为可执行的子任务" ) async def process(self, message: Message) -> Message: user_request = message.content # 构建规划提示词 prompt = f""" 分析以下需求,将其拆解为多个子任务: 需求:{user_request} 请按照以下JSON格式输出任务拆解结果: {{ "main_goal": "主要目标描述", "tasks": [ {{ "id": 1, "agent": "agent名称(coder/reviewer/analyzer等)", "description": "任务描述", "dependencies": [], // 依赖的任务ID列表 "output": "输出内容描述" }} ], "execution_order": [[1, 2], [3], [4]] // 可并行执行的任务分组 }} """ # 这里调用实际的AI API(示例用伪代码) result = await self.call_ai(prompt) return Message( sender=self.name, receiver="orchestrator", content=result, msg_type="result", metadata={"task_count": len(result.get("tasks", []))} ) async def call_ai(self, prompt: str) -> Dict: """调用AI接口""" # 实际项目中,这里通过coze-api调用 # 简化版本直接返回结构 return { "main_goal": "完成代码开发任务", "tasks": [], "execution_order": [] }

4. 代码编写Agent实现

python

# agents/coder_agent.py from .base_agent import BaseAgent, Message from typing import Dict, Any class CoderAgent(BaseAgent): """代码编写Agent""" def __init__(self): super().__init__( name="coder", description="专业的Python/JavaScript开发工程师,负责编写高质量代码" ) self.language_preference = { "python": "Python 3.10+", "javascript": "ES6+", "java": "Java 17" } async def process(self, message: Message) -> Message: task = message.content requirements = task.get("description", "") language = task.get("language", "python") prompt = f""" 作为专业的{language}开发工程师,根据以下需求编写代码: 需求:{requirements} 语言:{language} 要求: 1. 代码必须可运行 2. 添加详细的中文注释 3. 包含错误处理 4. 遵循最佳实践 输出格式: ```json {{ "code": "完整代码", "explanation": "代码说明", "test_cases": ["测试用例列表"], "complexity": "时间和空间复杂度" }} """ result = await self.call_ai(prompt) return Message( sender=self.name, receiver="reviewer", content=result, msg_type="task", metadata={ "language": language, "task_id": task.get("id") } ) async def call_ai(self, prompt: str) -> Dict: """调用AI接口""" return {}


5. 代码审查Agent实现

plaintext

# agents/reviewer_agent.py from .base_agent import BaseAgent, Message from typing import Dict, List class ReviewResult: """审查结果数据结构""" def __init__(self): self.score: int = 0 # 1-10分 self.issues: List[Dict] = [] self.suggestions: List[str] = [] self.approved: bool = False def to_dict(self) -> Dict: return { "score": self.score, "issues": self.issues, "suggestions": self.suggestions, "approved": self.approved } class ReviewerAgent(BaseAgent): """代码审查Agent""" def __init__(self): super().__init__( name="reviewer", description="代码审查专家,检查代码质量、安全性和性能" ) self.criteria = [ "代码规范性", "安全性", "性能", "可维护性", "测试覆盖" ] async def process(self, message: Message) -> Message: code_data = message.content code = code_data.get("code", "") language = message.metadata.get("language", "python") result = ReviewResult() # 1. 基础检查 result.issues.extend(self._check_syntax(code, language)) result.issues.extend(self._check_security(code)) # 2. 性能检查 perf_issues = self._check_performance(code, language) result.issues.extend(perf_issues) # 3. 计算评分 result.score = self._calculate_score(result.issues) # 4. 生成建议 result.suggestions = self._generate_suggestions(result.issues) # 5. 判断是否通过(评分>=7且无严重问题) result.approved = result.score >= 7 and not self._has_critical_issues(result.issues) return Message( sender=self.name, receiver="orchestrator", content=result.to_dict(), msg_type="result", metadata={ "language": language, "approved": result.approved } ) def _check_security(self, code: str) -> List[Dict]: """安全检查""" issues = [] # SQL注入检测 if "execute(" in code and "%" in code: issues.append({ "type": "security", "severity": "high", "message": "检测到潜在的SQL注入风险,使用参数化查询" }) # 硬编码密码检测 if re.search(r'password\s*=\s*["\']', code): issues.append({ "type": "security", "severity": "critical", "message": "检测到硬编码密码" }) # API密钥检测 if re.search(r'(api_key|apikey|API_KEY)\s*=\s*["\']', code): issues.append({ "type": "security", "severity": "critical", "message": "检测到硬编码API密钥" }) return issues def _check_performance(self, code: str, language: str) -> List[Dict]: """性能检查""" issues = [] if language == "python": # 循环中避免使用+拼接字符串 if "+=" in code and re.search(r'for.*in.*:', code): issues.append({ "type": "performance", "severity": "medium", "message": "循环中字符串拼接建议使用join()方法" }) return issues def _calculate_score(self, issues: List[Dict]) -> int: """根据问题计算评分""" score = 10 for issue in issues: severity = issue.get("severity", "low") if severity == "critical": score -= 4 elif severity == "high": score -= 2 elif severity == "medium": score -= 1 return max(1, score) def _has_critical_issues(self, issues: List[Dict]) -> bool: return any(i.get("severity") == "critical" for i in issues) def _generate_suggestions(self, issues: List[Dict]) -> List[str]: suggestions = [] for issue in issues: msg = issue.get("message", "") if "SQL注入" in msg: suggestions.append("使用ORM或参数化查询替代字符串拼接") if "硬编码" in msg: suggestions.append("使用环境变量或配置文件管理敏感信息") return suggestions

6. 调度器实现(核心)

python

# core/orchestrator.py from agents.base_agent import BaseAgent, Message, AgentStatus from agents.planner_agent import PlannerAgent from agents.coder_agent import CoderAgent from agents.reviewer_agent import ReviewerAgent from typing import Dict, List, Any, Callable import asyncio from dataclasses import dataclass import time @dataclass class Task: """任务定义""" id: int agent_name: str description: str dependencies: List[int] status: str = "pending" # pending, running, completed, failed result: Any = None class Orchestrator: """多Agent协作的调度器""" def __init__(self): # 初始化所有Agent self.agents: Dict[str, BaseAgent] = { "planner": PlannerAgent(), "coder": CoderAgent(), "reviewer": ReviewerAgent() } # 任务队列 self.tasks: List[Task] = [] self.task_results: Dict[int, Any] = {} # 钩子函数(用于监控、记录等) self.hooks: Dict[str, List[Callable]] = { "on_task_start": [], "on_task_complete": [], "on_workflow_complete": [] } def register_hook(self, event: str, callback: Callable): """注册钩子函数""" if event in self.hooks: self.hooks[event].append(callback) async def execute_workflow(self, user_request: str) -> Dict[str, Any]: """执行完整的工作流""" start_time = time.time() logs = [] # Step 1: 任务规划 logs.append({"step": "planning", "status": "start"}) plan_result = await self.agents["planner"].execute(user_request) if plan_result["status"] != "success": return {"status": "failed", "error": "任务规划失败"} logs.append({"step": "planning", "status": "complete", "result": plan_result}) # Step 2: 构建任务图 self._build_task_graph(plan_result["result"]) # Step 3: 按依赖关系执行任务 while self._has_pending_tasks(): # 获取可执行的任务(依赖都已完成) executable_tasks = self._get_executable_tasks() if not executable_tasks: break # 并行执行所有可执行任务 await asyncio.gather( *[self._execute_single_task(task) for task in executable_tasks] ) # Step 4: 汇总结果 final_result = self._aggregate_results() # 执行完成钩子 for hook in self.hooks["on_workflow_complete"]: await hook(final_result) return { "status": "success", "tasks": len(self.tasks), "results": final_result, "logs": logs, "duration": time.time() - start_time } def _build_task_graph(self, plan: Dict): """根据计划构建任务图""" self.tasks = [] for task_data in plan.get("tasks", []): task = Task( id=task_data["id"], agent_name=task_data["agent"], description=task_data["description"], dependencies=task_data.get("dependencies", []) ) self.tasks.append(task) def _has_pending_tasks(self) -> bool: return any(t.status == "pending" for t in self.tasks) def _get_executable_tasks(self) -> List[Task]: """获取可执行的任务""" executable = [] for task in self.tasks: if task.status != "pending": continue # 检查依赖是否都已完成 deps_completed = all( self.task_results.get(dep_id) is not None for dep_id in task.dependencies ) if deps_completed: executable.append(task) return executable async def _execute_single_task(self, task: Task): """执行单个任务""" task.status = "running" # 执行钩子 for hook in self.hooks["on_task_start"]: await hook(task) try: agent = self.agents.get(task.agent_name) if not agent: raise ValueError(f"Unknown agent: {task.agent_name}") # 准备输入(包含依赖任务的输出) input_data = { "id": task.id, "description": task.description, "dependencies": { dep_id: self.task_results[dep_id] for dep_id in task.dependencies } } result = await agent.execute(input_data) task.result = result task.status = "completed" self.task_results[task.id] = result except Exception as e: task.status = "failed" self.task_results[task.id] = {"error": str(e)} def _aggregate_results(self) -> Dict: """汇总所有任务结果""" return { task.id: task.result for task in self.tasks if task.status == "completed" }

7. 入口文件

python

# main.py from core.orchestrator import Orchestrator import asyncio async def main(): orchestrator = Orchestrator() # 注册日志钩子 async def log_task_start(task): print(f"🔄 开始执行任务 {task.id}: {task.description}") async def log_task_complete(result): print(f"✅ 任务完成,结果: {result}") async def log_workflow_complete(result): print(f"🏁 工作流完成,耗时: {result['duration']:.2f}s") orchestrator.register_hook("on_task_start", log_task_start) orchestrator.register_hook("on_task_complete", log_task_complete) orchestrator.register_hook("on_workflow_complete", log_workflow_complete) # 执行任务 request = """ 编写一个Python函数,实现以下功能: 1. 接收一个URL列表 2. 并发抓取所有页面内容 3. 提取标题和关键段落 4. 保存到JSON文件 """ result = await orchestrator.execute_workflow(request) print(f"\n最终结果: {result}") if __name__ == "__main__": asyncio.run(main())

实际效果演示

运行结果

plaintext

$ python main.py 🔄 开始执行任务 1: 分析需求并拆解任务 ✅ 任务完成,结果: {'status': 'success', 'agent': 'planner', ...} 🔄 开始执行任务 2: 编写URL抓取代码 ✅ 任务完成,结果: {'code': 'async def fetch_pages...', 'approved': True} 🔄 开始执行任务 3: 审查代码质量 ✅ 任务完成,结果: {'score': 8, 'approved': True, ...} 🏁 工作流完成,耗时: 3.45s

生成的核心代码

python

# Agent生成的代码(经过审查) import asyncio import aiohttp import json from typing import List, Dict from bs4 import BeautifulSoup async def fetch_pages(urls: List[str], output_file: str = "results.json") -> Dict: """ 并发抓取网页内容并提取关键信息 Args: urls: URL列表 output_file: 输出JSON文件路径 Returns: 包含抓取结果的字典 """ async def fetch_single(session: aiohttp.ClientSession, url: str) -> Dict: try: async with session.get(url, timeout=10) as response: html = await response.text() soup = BeautifulSoup(html, 'html.parser') # 提取标题 title = soup.find('title') title_text = title.get_text().strip() if title else "" # 提取段落 paragraphs = soup.find_all('p') content = [p.get_text().strip() for p in paragraphs if p.get_text().strip()] return { "url": url, "title": title_text, "content": content[:5], # 取前5段 "status": "success" } except Exception as e: return {"url": url, "status": "error", "error": str(e)} async with aiohttp.ClientSession() as session: tasks = [fetch_single(session, url) for url in urls] results = await asyncio.gather(*tasks) # 保存结果 with open(output_file, 'w', encoding='utf-8') as f: json.dump(results, f, ensure_ascii=False, indent=2) return {"total": len(results), "results": results} # 使用示例 if __name__ == "__main__": urls = [ "https://example.com", "https://httpbin.org/html" ] result = asyncio.run(fetch_pages(urls)) print(f"成功抓取 {len(result['results'])} 个页面")

独到见解:踩坑与优化经验

坑点1:Agent之间的上下文丢失

问题描述
当任务链路较长时,后面的Agent经常"忘记"前面Agent的输出,导致结果不连贯。

解决方案

python

class ContextManager: """上下文管理器,确保Agent间的信息传递""" def __init__(self): self.context: Dict[str, Any] = {} self.max_history = 5 # 每个Agent保留最近5条消息 def add(self, agent_name: str, key: str, value: Any): if agent_name not in self.context: self.context[agent_name] = {"recent": [], "persistent": {}} self.context[agent_name]["recent"].append(value) self.context[agent_name]["persistent"][key] = value # 限制历史长度 if len(self.context[agent_name]["recent"]) > self.max_history: self.context[agent_name]["recent"].pop(0) def get_context_for(self, agent_name: str) -> str: """生成传递给Agent的上下文摘要""" ctx = self.context.get(agent_name, {}) persistent = ctx.get("persistent", {}) summary = "【上下文信息】\n" for key, value in persistent.items(): summary += f"- {key}: {value}\n" return summary

坑点2:循环依赖导致死锁

问题描述
如果Agent A需要Agent B的结果,Agent B又需要Agent A的结果,就会死锁。

解决方案

python

class DependencyChecker: """依赖检查器,防止循环依赖""" def check(self, tasks: List[Task]) -> bool: """检测是否存在循环依赖""" graph = {t.id: set(t.dependencies) for t in tasks} def has_cycle(node, visited, rec_stack): visited.add(node) rec_stack.add(node) for neighbor in graph.get(node, []): if neighbor not in visited: if has_cycle(neighbor, visited, rec_stack): return True elif neighbor in rec_stack: return True rec_stack.remove(node) return False visited = set() for node in graph: if node not in visited: if has_cycle(node, visited, set()): raise ValueError("检测到循环依赖!") return True

坑点3:Token无限膨胀

问题描述
多Agent协作时,每个Agent都保存完整历史,导致Token消耗剧增。

优化策略

  1. 摘要压缩:定期将历史消息压缩成摘要
  2. 选择性传递:只传递必要的上下文
  3. 分层处理:核心Agent保持完整上下文,辅助Agent用摘要

python

class ContextCompressor: """上下文压缩器""" def compress(self, messages: List[Message], max_tokens: int = 2000) -> str: """将消息列表压缩到指定token数""" summary_prompt = f""" 请将以下对话摘要为{max_tokens}字以内的内容,保留关键信息和结论: {messages} """ # 调用AI生成摘要 summary = self.call_ai(summary_prompt) return summary

延伸思考:对职业发展的价值

面试能聊的点

  1. 系统设计能力:多Agent协作本质是微服务架构的AI版本

    • 面试官问:"如何设计一个复杂的AI系统?"
    • 你可以说:"参考微服务的设计思想..."
  2. 异步编程:项目大量使用async/await

    • 面试官问:"Python的异步编程用过吗?"
    • 你可以直接展示项目代码
  3. 设计模式

    • 观察者模式(消息总线)
    • 策略模式(不同Agent)
    • 责任链模式(任务链)

技术趋势判断

多Agent协作正在成为AI应用的主流范式:

  • OpenAI:正在推Multi-Agent框架
  • Anthropic:Claude的Tool Use本质是Agent协作
  • 国内:Coze、百度千帆都在布局

掌握这套思路,你不是在追热点,而是在理解AI应用的本质架构。

可延伸的项目方向

  1. 智能代码助手:规划→写代码→审查→测试→部署
  2. 自动化测试平台:分析需求→生成用例→执行→报告
  3. 数据处理管道:ETL+分析+可视化,全流程自动化

总结

本文通过一个完整的多Agent协作系统,展示了:

  • 如何设计Agent的抽象基类
  • 如何实现任务规划和调度
  • 如何处理Agent间的通信和上下文
  • 如何保证系统的稳定性和可扩展性

代码已经过实际运行验证,可以直接拿去用。如果有问题或想法,欢迎交流!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/6 2:12:27

基于SiliconFlow的音频转录技能开发:架构、API集成与生产级优化

1. 项目概述:一个面向开发者的音频转录与技能集成工具链最近在GitHub上看到一个挺有意思的项目,叫openclaw-skill-siliconflow-audio-transcribe。光看这个名字,就能拆解出不少信息:prawnscout是作者,openclaw-skill暗…

作者头像 李华
网站建设 2026/5/6 2:09:58

ALVR无线VR串流终极指南:告别线缆束缚,开启自由VR新时代

ALVR无线VR串流终极指南:告别线缆束缚,开启自由VR新时代 【免费下载链接】ALVR Stream VR games from your PC to your headset via Wi-Fi 项目地址: https://gitcode.com/gh_mirrors/alvr/ALVR 你是否曾因VR线缆的束缚而无法尽情游戏&#xff1f…

作者头像 李华
网站建设 2026/5/6 2:08:33

Taotoken多模型聚合API在智能设备数据分析场景中的应用

Taotoken多模型聚合API在智能设备数据分析场景中的应用 1. 智能设备数据分析的挑战 智能硬件和物联网设备每天产生大量文本日志数据,包括运行状态、错误报告、用户交互记录等。传统分析方法需要开发复杂的正则表达式或机器学习模型,难以应对日志格式变…

作者头像 李华
网站建设 2026/5/6 2:06:44

RAG 优化 20 法:从“搜得到“到“答得好“

从"搭好"到"能用",中间隔着一整套优化 RAG(检索增强生成)入门的体验大概是这样的: 一个下午,你就能搭出一套能跑的原型:文档切片 → 调 Embedding API → 扔进向量库 → 接上 LLM&am…

作者头像 李华
网站建设 2026/5/6 2:04:12

[LangChain Agent]Agent实战篇

LangChain Agent 详解 本文详细介绍了 LangChain 中 Agent(智能体)的核心概念、ReAct 推理模式、create_agent 高级 API 的使用方法,以及 Agent-to-Agent(A2A)多智能体协作架构。通过电商助手、天气查询助手和出行规划…

作者头像 李华
网站建设 2026/5/6 2:02:31

2026实测10大量化交易软件!第一名碾压全场

2026年量化交易赛道持续升温,越来越多投资者告别人工盯盘,依靠量化软件提升交易效率、规避人性弱点。市面上量化工具五花八门,新手极易踩坑。今天结合实测体验、用户口碑、实操稳定性,整理出2026年十大量化交易软件权威榜单&#…

作者头像 李华