简介
本文详细介绍了如何使用LangGraph构建Fate Whisper智能命理分析系统,实现多专家(八字、手相、面相)并行分析工作流。文章涵盖状态定义、并行节点设计、流式输出处理和结果汇总等关键技术点,解决了并行执行、状态管理和实时反馈等挑战。通过实战案例展示了LangGraph在复杂工作流中的优势,包括声明式编程、状态机模型和灵活的事件系统,为开发者提供了宝贵的实践经验和踩坑指南。
前言
在之前的系列文章中,我们已经深入探讨了 LangGraph 的基础概念和核心特性。今天,我们将通过一个真实的项目案例——Fate Whisper 智能命理分析系统,来看看如何用 LangGraph 构建一个支持多专家并行分析的工作流。
这个项目的核心需求是:用户可以选择多个命理专家(八字、手相、面相),系统需要并行调用这些专家进行分析,最后生成一份综合报告。这正好是 LangGraph 擅长的场景——状态机工作流 + 并行节点执行。
一、项目需求分析
业务场景
Fate Whisper 是一个智能命理分析平台,用户可以:
- 选择一个或多个命理专家(八字、手相、面相)
- 根据专家要求填写或上传相应信息
- 系统并行调用所选专家进行分析
- 实时流式返回各专家的分析结果
- 最后生成一份综合命理分析报告
技术挑战
并行执行
:多个专家需要同时分析,不能串行等待
状态管理
:需要收集各专家的结果,并合并到最终状态
流式输出
:用户需要实时看到分析进度,不能等全部完成
结果汇聚
:多个专家的结果需要智能整合成综合报告
这些挑战,LangGraph 都能很好地解决。
二、核心架构设计
2.1 状态定义
首先,我们需要定义图的状态结构。在 LangGraph 中,状态使用TypedDict定义:
class FateGraphState(TypedDict): user_data: Dict[str, Any] # 用户输入数据 streaming_chunks: Annotated[List[Dict[str, Any]], merge_lists] # 流式输出块 expert_reports: Annotated[Dict[str, Any], merge_dicts] # 专家分析报告这里有几个关键点:
user_data:存储用户输入的数据,按专家ID组织
streaming_chunks:用于流式输出的数据块列表
expert_reports:各专家的分析结果
注意Annotated的使用,这是 LangGraph 处理并行节点状态更新的关键:
merge_lists:当多个节点同时更新
streaming_chunks时,使用列表合并merge_dicts:当多个节点同时更新
expert_reports时,使用字典合并
2.2 状态合并函数
并行节点执行时,多个节点可能同时更新同一个状态字段。LangGraph 需要知道如何合并这些更新:
def merge_dicts(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]: """合并两个字典,用于并行节点更新 expert_reports""" result = left.copy() result.update(right) return resultdef merge_lists(left: List[Dict[str, Any]], right: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """合并两个列表,用于并行节点更新 streaming_chunks""" return left + right这两个函数会在并行节点执行时自动调用,确保状态正确合并。
三、图构建:并行专家节点
3.1 图的拓扑结构
我们的工作流结构如下:
START ├─> 专家1节点 ─┐ ├─> 专家2节点 ─┤ ├─> 专家3节点 ─┼─> collect节点 ─> END └─> ... ┘所有专家节点从 START 并行开始,完成后都汇聚到collect节点,最后结束。
3.2 构建图的代码
def _build_graph(self) -> CompiledStateGraph: """构建并行专家分析图""" ifnotself.analysis_experts: raise ValueError("专家列表不能为空") workflow = StateGraph(FateGraphState) # 添加专家节点和汇聚节点 expert_node_names = [] for expert_config inself.analysis_experts: expert_id = expert_config.get("id") expert_node = self._create_expert_node_factory(expert_config) workflow.add_node(expert_id, expert_node) expert_node_names.append(expert_id) workflow.add_node("collect", self._collect_node) # 连接节点:START -> 各专家节点 -> collect -> END for node_name in expert_node_names: workflow.add_edge(START, node_name) workflow.add_edge(node_name, "collect") workflow.add_edge("collect", END) return workflow.compile(checkpointer=self.checkpointer)关键点:
动态节点创建
:根据选择的专家动态创建节点
并行连接
:所有专家节点都从 START 开始,实现并行执行
汇聚节点
:所有专家完成后,统一进入
collect节点
3.3 专家节点工厂
每个专家需要独立的节点函数,我们使用工厂模式创建:
def _create_expert_node_factory(self, expert_config: Dict[str, Any]): """创建专家节点工厂函数,绑定专家配置""" async def expert_node(state: FateGraphState) -> FateGraphState: return await self._create_expert_node(state, expert_config) return expert_node这样每个专家节点都有自己独立的配置,但共享相同的执行逻辑。
针对所有自学遇到困难的同学们,我帮大家系统梳理大模型学习脉络,将这份LLM大模型资料分享出来:包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 😝有需要的小伙伴,可以扫描下方二维码领取🆓↓↓↓
![]()
四、专家节点实现
4.1 节点执行逻辑
专家节点的核心逻辑(伪代码):
async def_create_expert_node(state, expert_config): """专家节点函数,执行专家分析""" # 1. 从配置中提取专家信息和必填字段 expert_id = expert_config.get("id") expert_name = expert_config.get("name") required_fields = expert_config.get("required_fields", []) # 2. 构建消息列表 messages = [SystemMessage(content=expert_config.get("prompt"))] # 3. 从状态中提取该专家需要的数据,根据字段类型构建消息 expert_data = state.get("user_data").get(expert_id) for field in required_fields: field_type = field.get("field_type") field_value = expert_data.get(field.get("field_id")) if field_type == "datetime": # 日期类型:计算八字并添加到消息 messages.append(HumanMessage(content=calculate_bazi(field_value))) elif field_type == "image": # 图片类型:添加图片消息(base64编码) messages.append(HumanMessage(content=[{"type": "image", "image": f"data:image/jpeg;base64,{field_value}"}])) else: # 文本类型:直接添加文本消息 messages.append(HumanMessage(content=field_value)) # 4. 根据是否有图片字段选择模型 needs_vision = any(field.get("field_type") == "image"for field in required_fields) llm = vision_llm if needs_vision else fast_llm # 5. 调用 LLM 进行分析 response = await llm.ainvoke(messages) content = parse_response(response, needs_vision) # 6. 返回部分状态更新(只包含该节点的更新) return { "expert_reports": {expert_name: content}, "streaming_chunks": [{"expert_name": expert_name, "expert_report": content}] }4.2 结果处理
每个节点返回的是部分状态更新,而不是完整状态:
def _process_result(self, expert_name, export_report, state): """处理执行结果,返回该节点要添加的部分状态""" chunk = { "expert_name": expert_name, "expert_report": export_report } result = { "expert_reports": {expert_name: export_report}, "streaming_chunks": [chunk] } return result注意这里返回的是字典,LangGraph 会自动与当前状态合并。
4.3 特殊处理:八字计算
对于日期字段,我们需要计算八字:
def caculate_bazi(self, field_name, field_value) -> str: date_str = str(field_value).strip() date_formats = "%Y-%m-%d %H:%M" parsed_datetime = datetime.strptime(date_str, date_formats) year = parsed_datetime.year month = parsed_datetime.month day = parsed_datetime.day hour = parsed_datetime.hour # 计算八字 year_zhu, month_zhu, day_zhu, hour_zhu = tian_gan_di_zhi(year, month, day, hour) bazi_info = f"{field_name}:{field_value}\n" bazi_info += f"八字:{year_zhu} {month_zhu} {day_zhu} {hour_zhu}" bazi_info += f"\n年柱:{year_zhu},月柱:{month_zhu},日柱:{day_zhu},时柱:{hour_zhu}" return bazi_info五、汇聚节点:综合报告生成
所有专家分析完成后,需要生成综合报告:
async def_collect_node(state): """汇聚节点,收集所有专家的分析结果并生成最终报告""" # 1. 收集各专家的分析报告 expert_reports = state.get("expert_reports", {}) # 2. 如果只有一个专家,直接返回该专家的报告 iflen(expert_reports) == 1: final_report = list(expert_reports.values())[0] else: # 3. 多个专家时,构建综合提示词 synthesis_prompt = """你是一个命理分析师,擅长综合多个专家的分析结果。 请整合各专家的观点,找出共同点和差异,生成一份完整的综合命理分析报告。 报告格式:性格、事业、财运、婚姻、健康、未来趋势、建议""" # 4. 将所有专家报告汇总 summary_text = "\n".join([f"# {name}分析\n{content}" for name, content in expert_reports.items()]) # 5. 调用 LLM 生成综合报告 messages = [ SystemMessage(content=synthesis_prompt), HumanMessage(content=f"各专家分析结果:\n{summary_text}\n请生成综合报告") ] response = await fast_llm.ainvoke(messages) final_report = f"# 综合命理分析报告\n\n{response.content}" # 6. 返回综合报告的状态更新 return { "expert_reports": {"命理师综合分析": final_report}, "streaming_chunks": [{"expert_name": "命理师综合分析", "expert_report": final_report}] }六、流式处理:实时输出分析结果
6.1 流式接口
用户需要实时看到分析进度,我们使用 LangGraph 的astream_eventsAPI:
async defchat_with_planning_stream(self, task_id: str, user_data: Dict[str, Dict[str, Any]]) -> AsyncIterator[Dict[str, Any]]: """流式聊天接口""" initial_state = { "user_data": user_data, "streaming_chunks": [], "expert_reports": {} } config = RunnableConfig(configurable={"thread_id": task_id}) events = self.graph.astream_events(initial_state, config=config) asyncfor chunk inself.process_streaming_events(events): yield chunk6.2 事件处理逻辑
astream_events会返回大量事件,我们需要过滤和处理(伪代码):
async defprocess_streaming_events(events): """处理流式事件的公共方法""" sent_experts = set() # 跟踪已发送的专家报告,避免重复 asyncfor event in events: event_type = event.get('event') # 事件类型 event_name = event.get('name') # 节点名称 # 1. 处理单个节点的完成事件(实时发送专家报告) if event_type == "on_chain_end"and event_name != "LangGraph": # 从事件数据中提取 streaming_chunks chunk = event.get("data", {}).get("chunk", {}) streaming_chunks = chunk.get("streaming_chunks", []) # 发送专家报告(排除综合报告) for chunk in streaming_chunks: expert_name = chunk.get("expert_name") if expert_name and"综合"notin expert_name: if expert_name notin sent_experts: sent_experts.add(expert_name) yield { "expert_name": expert_name, "expert_report": chunk.get("expert_report") } # 2. 处理整个图的完成事件(发送综合报告) elif event_type == "on_chain_end"and event_name == "LangGraph": output = event.get("data", {}).get("output", {}) streaming_chunks = output.get("streaming_chunks", []) # 先发送遗漏的专家报告 for chunk in streaming_chunks: expert_name = chunk.get("expert_name") if expert_name and"综合"notin expert_name: if expert_name notin sent_experts: sent_experts.add(expert_name) yield {"expert_name": expert_name, "expert_report": chunk.get("expert_report")} # 最后发送综合报告 for chunk in streaming_chunks: if"综合"in chunk.get("expert_name", ""): yield {"expert_name": chunk.get("expert_name"), "expert_report": chunk.get("expert_report")}关键点:
事件过滤
:只处理
on_chain_end事件,区分节点级别和图级别去重处理
:使用
sent_experts集合避免重复发送顺序控制
:先发送各专家报告,最后发送综合报告
6.3 前端流式接收
后端通过 Server-Sent Events (SSE) 推送数据(伪代码):
def create_streaming_response(stream_generator): """创建流式响应""" asyncdefgenerate_stream(): asyncfor chunk in stream_generator: # 将数据转换为 SSE 格式 yieldf"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" return StreamingResponse( generate_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive" } )前端使用 EventSource 或 fetch 的流式 API 接收数据,实时更新 UI。
七、实战经验与踩坑
7.1 并行节点的状态更新
问题:多个并行节点同时更新状态时,如何避免冲突?
解决:使用Annotated和合并函数。LangGraph 会自动处理并行更新,确保状态正确合并。
7.2 流式事件处理的复杂性
问题:astream_events返回的事件很多,如何准确提取需要的数据?
解决:
- 明确事件类型:只处理
on_chain_end事件 - 区分节点级别和图级别的事件
- 使用去重机制避免重复发送
7.3 视觉模型的内容解析
问题:视觉模型返回的内容可能是数组格式,需要特殊处理。
解决:
def _parse_text_content(self, content: str) -> str: """解析并拼接文本内容""" text_parts = [] for item in content: if isinstance(item, dict) and 'text' in item: text_parts.append(item.get('text')) return ''.join(text_parts) if text_parts else ""7.4 状态序列化问题
问题:自定义对象无法序列化到检查点。
解决:使用自定义序列化器:
from utils.custom_serializer import CustomSerializerself.checkpointer = MemorySaver(serde=CustomSerializer())八、总结
通过这个项目,我们看到了 LangGraph 在实际应用中的强大能力:
并行执行
:轻松实现多专家并行分析
状态管理
:自动处理并行节点的状态合并
流式输出
:通过
astream_events实现实时反馈灵活扩展
:动态创建节点,支持任意数量的专家
LangGraph 的核心优势在于:
声明式编程
:用图结构清晰表达工作流
状态机模型
:天然支持复杂的状态转换
事件系统
:丰富的钩子函数,支持细粒度控制
对于需要复杂工作流、并行执行、状态管理的场景,LangGraph 是一个非常好的选择。
九、如何学习AI大模型?
大模型时代,火爆出圈的LLM大模型让程序员们开始重新评估自己的本领。 “AI会取代那些行业?”“谁的饭碗又将不保了?”等问题热议不断。
不如成为「掌握AI工具的技术人」,毕竟AI时代,谁先尝试,谁就能占得先机!
想正式转到一些新兴的 AI 行业,不仅需要系统的学习AI大模型。同时也要跟已有的技能结合,辅助编程提效,或上手实操应用,增加自己的职场竞争力。
但是LLM相关的内容很多,现在网上的老课程老教材关于LLM又太少。所以现在小白入门就只能靠自学,学习成本和门槛很高
那么针对所有自学遇到困难的同学们,我帮大家系统梳理大模型学习脉络,将这份LLM大模型资料分享出来:包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 😝有需要的小伙伴,可以扫描下方二维码领取🆓↓↓↓
学习路线
第一阶段: 从大模型系统设计入手,讲解大模型的主要方法;
第二阶段: 在通过大模型提示词工程从Prompts角度入手更好发挥模型的作用;
第三阶段: 大模型平台应用开发借助阿里云PAI平台构建电商领域虚拟试衣系统;
第四阶段: 大模型知识库应用开发以LangChain框架为例,构建物流行业咨询智能问答系统;
第五阶段: 大模型微调开发借助以大健康、新零售、新媒体领域构建适合当前领域大模型;
第六阶段: 以SD多模态大模型为主,搭建了文生图小程序案例;
第七阶段: 以大模型平台应用与开发为主,通过星火大模型,文心大模型等成熟大模型构建大模型行业应用。
👉学会后的收获:👈
• 基于大模型全栈工程实现(前端、后端、产品经理、设计、数据分析等),通过这门课可获得不同能力;
• 能够利用大模型解决相关实际项目需求: 大数据时代,越来越多的企业和机构需要处理海量数据,利用大模型技术可以更好地处理这些数据,提高数据分析和决策的准确性。因此,掌握大模型应用开发技能,可以让程序员更好地应对实际项目需求;
• 基于大模型和企业数据AI应用开发,实现大模型理论、掌握GPU算力、硬件、LangChain开发框架和项目实战技能, 学会Fine-tuning垂直训练大模型(数据准备、数据蒸馏、大模型部署)一站式掌握;
• 能够完成时下热门大模型垂直领域模型训练能力,提高程序员的编码能力: 大模型应用开发需要掌握机器学习算法、深度学习框架等技术,这些技术的掌握可以提高程序员的编码能力和分析能力,让程序员更加熟练地编写高质量的代码。
1.AI大模型学习路线图
2.100套AI大模型商业化落地方案
3.100集大模型视频教程
4.200本大模型PDF书籍
5.LLM面试题合集
6.AI产品经理资源合集
👉获取方式:
😝有需要的小伙伴,可以保存图片到wx扫描二v码免费领取【保证100%免费】🆓