news 2026/1/11 15:54:37

【珍藏干货】用LangGraph构建多专家并行分析系统:Fate Whisper项目实战!

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【珍藏干货】用LangGraph构建多专家并行分析系统:Fate Whisper项目实战!

简介

本文详细介绍了如何使用LangGraph构建Fate Whisper智能命理分析系统,实现多专家(八字、手相、面相)并行分析工作流。文章涵盖状态定义、并行节点设计、流式输出处理和结果汇总等关键技术点,解决了并行执行、状态管理和实时反馈等挑战。通过实战案例展示了LangGraph在复杂工作流中的优势,包括声明式编程、状态机模型和灵活的事件系统,为开发者提供了宝贵的实践经验和踩坑指南。


前言

在之前的系列文章中,我们已经深入探讨了 LangGraph 的基础概念和核心特性。今天,我们将通过一个真实的项目案例——Fate Whisper 智能命理分析系统,来看看如何用 LangGraph 构建一个支持多专家并行分析的工作流。

这个项目的核心需求是:用户可以选择多个命理专家(八字、手相、面相),系统需要并行调用这些专家进行分析,最后生成一份综合报告。这正好是 LangGraph 擅长的场景——状态机工作流 + 并行节点执行

一、项目需求分析

业务场景

Fate Whisper 是一个智能命理分析平台,用户可以:

  1. 选择一个或多个命理专家(八字、手相、面相)
  2. 根据专家要求填写或上传相应信息
  3. 系统并行调用所选专家进行分析
  4. 实时流式返回各专家的分析结果
  5. 最后生成一份综合命理分析报告

技术挑战

  1. 并行执行

    :多个专家需要同时分析,不能串行等待

  2. 状态管理

    :需要收集各专家的结果,并合并到最终状态

  3. 流式输出

    :用户需要实时看到分析进度,不能等全部完成

  4. 结果汇聚

    :多个专家的结果需要智能整合成综合报告

这些挑战,LangGraph 都能很好地解决。

二、核心架构设计

2.1 状态定义

首先,我们需要定义图的状态结构。在 LangGraph 中,状态使用TypedDict定义:

class FateGraphState(TypedDict): user_data: Dict[str, Any] # 用户输入数据 streaming_chunks: Annotated[List[Dict[str, Any]], merge_lists] # 流式输出块 expert_reports: Annotated[Dict[str, Any], merge_dicts] # 专家分析报告

这里有几个关键点:

  1. user_data

    :存储用户输入的数据,按专家ID组织

  2. streaming_chunks

    :用于流式输出的数据块列表

  3. expert_reports

    :各专家的分析结果

注意Annotated的使用,这是 LangGraph 处理并行节点状态更新的关键:

  • merge_lists

    :当多个节点同时更新streaming_chunks时,使用列表合并

  • merge_dicts

    :当多个节点同时更新expert_reports时,使用字典合并

2.2 状态合并函数

并行节点执行时,多个节点可能同时更新同一个状态字段。LangGraph 需要知道如何合并这些更新:

def merge_dicts(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]: """合并两个字典,用于并行节点更新 expert_reports""" result = left.copy() result.update(right) return resultdef merge_lists(left: List[Dict[str, Any]], right: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """合并两个列表,用于并行节点更新 streaming_chunks""" return left + right

这两个函数会在并行节点执行时自动调用,确保状态正确合并。

三、图构建:并行专家节点

3.1 图的拓扑结构

我们的工作流结构如下:

START ├─> 专家1节点 ─┐ ├─> 专家2节点 ─┤ ├─> 专家3节点 ─┼─> collect节点 ─> END └─> ... ┘

所有专家节点从 START 并行开始,完成后都汇聚到collect节点,最后结束。

3.2 构建图的代码

def _build_graph(self) -> CompiledStateGraph: """构建并行专家分析图""" ifnotself.analysis_experts: raise ValueError("专家列表不能为空") workflow = StateGraph(FateGraphState) # 添加专家节点和汇聚节点 expert_node_names = [] for expert_config inself.analysis_experts: expert_id = expert_config.get("id") expert_node = self._create_expert_node_factory(expert_config) workflow.add_node(expert_id, expert_node) expert_node_names.append(expert_id) workflow.add_node("collect", self._collect_node) # 连接节点:START -> 各专家节点 -> collect -> END for node_name in expert_node_names: workflow.add_edge(START, node_name) workflow.add_edge(node_name, "collect") workflow.add_edge("collect", END) return workflow.compile(checkpointer=self.checkpointer)

关键点:

  1. 动态节点创建

    :根据选择的专家动态创建节点

  2. 并行连接

    :所有专家节点都从 START 开始,实现并行执行

  3. 汇聚节点

    :所有专家完成后,统一进入collect节点

3.3 专家节点工厂

每个专家需要独立的节点函数,我们使用工厂模式创建:

def _create_expert_node_factory(self, expert_config: Dict[str, Any]): """创建专家节点工厂函数,绑定专家配置""" async def expert_node(state: FateGraphState) -> FateGraphState: return await self._create_expert_node(state, expert_config) return expert_node

这样每个专家节点都有自己独立的配置,但共享相同的执行逻辑。

针对所有自学遇到困难的同学们,我帮大家系统梳理大模型学习脉络,将这份LLM大模型资料分享出来:包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 😝有需要的小伙伴,可以扫描下方二维码领取🆓↓↓↓


四、专家节点实现

4.1 节点执行逻辑

专家节点的核心逻辑(伪代码):

async def_create_expert_node(state, expert_config): """专家节点函数,执行专家分析""" # 1. 从配置中提取专家信息和必填字段 expert_id = expert_config.get("id") expert_name = expert_config.get("name") required_fields = expert_config.get("required_fields", []) # 2. 构建消息列表 messages = [SystemMessage(content=expert_config.get("prompt"))] # 3. 从状态中提取该专家需要的数据,根据字段类型构建消息 expert_data = state.get("user_data").get(expert_id) for field in required_fields: field_type = field.get("field_type") field_value = expert_data.get(field.get("field_id")) if field_type == "datetime": # 日期类型:计算八字并添加到消息 messages.append(HumanMessage(content=calculate_bazi(field_value))) elif field_type == "image": # 图片类型:添加图片消息(base64编码) messages.append(HumanMessage(content=[{"type": "image", "image": f"data:image/jpeg;base64,{field_value}"}])) else: # 文本类型:直接添加文本消息 messages.append(HumanMessage(content=field_value)) # 4. 根据是否有图片字段选择模型 needs_vision = any(field.get("field_type") == "image"for field in required_fields) llm = vision_llm if needs_vision else fast_llm # 5. 调用 LLM 进行分析 response = await llm.ainvoke(messages) content = parse_response(response, needs_vision) # 6. 返回部分状态更新(只包含该节点的更新) return { "expert_reports": {expert_name: content}, "streaming_chunks": [{"expert_name": expert_name, "expert_report": content}] }

4.2 结果处理

每个节点返回的是部分状态更新,而不是完整状态:

def _process_result(self, expert_name, export_report, state): """处理执行结果,返回该节点要添加的部分状态""" chunk = { "expert_name": expert_name, "expert_report": export_report } result = { "expert_reports": {expert_name: export_report}, "streaming_chunks": [chunk] } return result

注意这里返回的是字典,LangGraph 会自动与当前状态合并。

4.3 特殊处理:八字计算

对于日期字段,我们需要计算八字:

def caculate_bazi(self, field_name, field_value) -> str: date_str = str(field_value).strip() date_formats = "%Y-%m-%d %H:%M" parsed_datetime = datetime.strptime(date_str, date_formats) year = parsed_datetime.year month = parsed_datetime.month day = parsed_datetime.day hour = parsed_datetime.hour # 计算八字 year_zhu, month_zhu, day_zhu, hour_zhu = tian_gan_di_zhi(year, month, day, hour) bazi_info = f"{field_name}:{field_value}\n" bazi_info += f"八字:{year_zhu} {month_zhu} {day_zhu} {hour_zhu}" bazi_info += f"\n年柱:{year_zhu},月柱:{month_zhu},日柱:{day_zhu},时柱:{hour_zhu}" return bazi_info

五、汇聚节点:综合报告生成

所有专家分析完成后,需要生成综合报告:

async def_collect_node(state): """汇聚节点,收集所有专家的分析结果并生成最终报告""" # 1. 收集各专家的分析报告 expert_reports = state.get("expert_reports", {}) # 2. 如果只有一个专家,直接返回该专家的报告 iflen(expert_reports) == 1: final_report = list(expert_reports.values())[0] else: # 3. 多个专家时,构建综合提示词 synthesis_prompt = """你是一个命理分析师,擅长综合多个专家的分析结果。 请整合各专家的观点,找出共同点和差异,生成一份完整的综合命理分析报告。 报告格式:性格、事业、财运、婚姻、健康、未来趋势、建议""" # 4. 将所有专家报告汇总 summary_text = "\n".join([f"# {name}分析\n{content}" for name, content in expert_reports.items()]) # 5. 调用 LLM 生成综合报告 messages = [ SystemMessage(content=synthesis_prompt), HumanMessage(content=f"各专家分析结果:\n{summary_text}\n请生成综合报告") ] response = await fast_llm.ainvoke(messages) final_report = f"# 综合命理分析报告\n\n{response.content}" # 6. 返回综合报告的状态更新 return { "expert_reports": {"命理师综合分析": final_report}, "streaming_chunks": [{"expert_name": "命理师综合分析", "expert_report": final_report}] }

六、流式处理:实时输出分析结果

6.1 流式接口

用户需要实时看到分析进度,我们使用 LangGraph 的astream_eventsAPI:

async defchat_with_planning_stream(self, task_id: str, user_data: Dict[str, Dict[str, Any]]) -> AsyncIterator[Dict[str, Any]]: """流式聊天接口""" initial_state = { "user_data": user_data, "streaming_chunks": [], "expert_reports": {} } config = RunnableConfig(configurable={"thread_id": task_id}) events = self.graph.astream_events(initial_state, config=config) asyncfor chunk inself.process_streaming_events(events): yield chunk

6.2 事件处理逻辑

astream_events会返回大量事件,我们需要过滤和处理(伪代码):

async defprocess_streaming_events(events): """处理流式事件的公共方法""" sent_experts = set() # 跟踪已发送的专家报告,避免重复 asyncfor event in events: event_type = event.get('event') # 事件类型 event_name = event.get('name') # 节点名称 # 1. 处理单个节点的完成事件(实时发送专家报告) if event_type == "on_chain_end"and event_name != "LangGraph": # 从事件数据中提取 streaming_chunks chunk = event.get("data", {}).get("chunk", {}) streaming_chunks = chunk.get("streaming_chunks", []) # 发送专家报告(排除综合报告) for chunk in streaming_chunks: expert_name = chunk.get("expert_name") if expert_name and"综合"notin expert_name: if expert_name notin sent_experts: sent_experts.add(expert_name) yield { "expert_name": expert_name, "expert_report": chunk.get("expert_report") } # 2. 处理整个图的完成事件(发送综合报告) elif event_type == "on_chain_end"and event_name == "LangGraph": output = event.get("data", {}).get("output", {}) streaming_chunks = output.get("streaming_chunks", []) # 先发送遗漏的专家报告 for chunk in streaming_chunks: expert_name = chunk.get("expert_name") if expert_name and"综合"notin expert_name: if expert_name notin sent_experts: sent_experts.add(expert_name) yield {"expert_name": expert_name, "expert_report": chunk.get("expert_report")} # 最后发送综合报告 for chunk in streaming_chunks: if"综合"in chunk.get("expert_name", ""): yield {"expert_name": chunk.get("expert_name"), "expert_report": chunk.get("expert_report")}

关键点:

  1. 事件过滤

    :只处理on_chain_end事件,区分节点级别和图级别

  2. 去重处理

    :使用sent_experts集合避免重复发送

  3. 顺序控制

    :先发送各专家报告,最后发送综合报告

6.3 前端流式接收

后端通过 Server-Sent Events (SSE) 推送数据(伪代码):

def create_streaming_response(stream_generator): """创建流式响应""" asyncdefgenerate_stream(): asyncfor chunk in stream_generator: # 将数据转换为 SSE 格式 yieldf"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" return StreamingResponse( generate_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive" } )

前端使用 EventSource 或 fetch 的流式 API 接收数据,实时更新 UI。

七、实战经验与踩坑

7.1 并行节点的状态更新

问题:多个并行节点同时更新状态时,如何避免冲突?

解决:使用Annotated和合并函数。LangGraph 会自动处理并行更新,确保状态正确合并。

7.2 流式事件处理的复杂性

问题astream_events返回的事件很多,如何准确提取需要的数据?

解决

  1. 明确事件类型:只处理on_chain_end事件
  2. 区分节点级别和图级别的事件
  3. 使用去重机制避免重复发送

7.3 视觉模型的内容解析

问题:视觉模型返回的内容可能是数组格式,需要特殊处理。

解决

def _parse_text_content(self, content: str) -> str: """解析并拼接文本内容""" text_parts = [] for item in content: if isinstance(item, dict) and 'text' in item: text_parts.append(item.get('text')) return ''.join(text_parts) if text_parts else ""

7.4 状态序列化问题

问题:自定义对象无法序列化到检查点。

解决:使用自定义序列化器:

from utils.custom_serializer import CustomSerializerself.checkpointer = MemorySaver(serde=CustomSerializer())

八、总结

通过这个项目,我们看到了 LangGraph 在实际应用中的强大能力:

  1. 并行执行

    :轻松实现多专家并行分析

  2. 状态管理

    :自动处理并行节点的状态合并

  3. 流式输出

    :通过astream_events实现实时反馈

  4. 灵活扩展

    :动态创建节点,支持任意数量的专家

LangGraph 的核心优势在于:

  • 声明式编程

    :用图结构清晰表达工作流

  • 状态机模型

    :天然支持复杂的状态转换

  • 事件系统

    :丰富的钩子函数,支持细粒度控制

对于需要复杂工作流、并行执行、状态管理的场景,LangGraph 是一个非常好的选择。

九、如何学习AI大模型?

大模型时代,火爆出圈的LLM大模型让程序员们开始重新评估自己的本领。 “AI会取代那些行业?”“谁的饭碗又将不保了?”等问题热议不断。

不如成为「掌握AI工具的技术人」,毕竟AI时代,谁先尝试,谁就能占得先机!

想正式转到一些新兴的 AI 行业,不仅需要系统的学习AI大模型。同时也要跟已有的技能结合,辅助编程提效,或上手实操应用,增加自己的职场竞争力。

但是LLM相关的内容很多,现在网上的老课程老教材关于LLM又太少。所以现在小白入门就只能靠自学,学习成本和门槛很高

那么针对所有自学遇到困难的同学们,我帮大家系统梳理大模型学习脉络,将这份LLM大模型资料分享出来:包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 😝有需要的小伙伴,可以扫描下方二维码领取🆓↓↓↓

学习路线

第一阶段: 从大模型系统设计入手,讲解大模型的主要方法;

第二阶段: 在通过大模型提示词工程从Prompts角度入手更好发挥模型的作用;

第三阶段: 大模型平台应用开发借助阿里云PAI平台构建电商领域虚拟试衣系统;

第四阶段: 大模型知识库应用开发以LangChain框架为例,构建物流行业咨询智能问答系统;

第五阶段: 大模型微调开发借助以大健康、新零售、新媒体领域构建适合当前领域大模型;

第六阶段: 以SD多模态大模型为主,搭建了文生图小程序案例;

第七阶段: 以大模型平台应用与开发为主,通过星火大模型,文心大模型等成熟大模型构建大模型行业应用。

👉学会后的收获:👈

• 基于大模型全栈工程实现(前端、后端、产品经理、设计、数据分析等),通过这门课可获得不同能力;

• 能够利用大模型解决相关实际项目需求: 大数据时代,越来越多的企业和机构需要处理海量数据,利用大模型技术可以更好地处理这些数据,提高数据分析和决策的准确性。因此,掌握大模型应用开发技能,可以让程序员更好地应对实际项目需求;

• 基于大模型和企业数据AI应用开发,实现大模型理论、掌握GPU算力、硬件、LangChain开发框架和项目实战技能, 学会Fine-tuning垂直训练大模型(数据准备、数据蒸馏、大模型部署)一站式掌握;

• 能够完成时下热门大模型垂直领域模型训练能力,提高程序员的编码能力: 大模型应用开发需要掌握机器学习算法、深度学习框架等技术,这些技术的掌握可以提高程序员的编码能力和分析能力,让程序员更加熟练地编写高质量的代码。

1.AI大模型学习路线图
2.100套AI大模型商业化落地方案
3.100集大模型视频教程
4.200本大模型PDF书籍
5.LLM面试题合集
6.AI产品经理资源合集

👉获取方式:
😝有需要的小伙伴,可以保存图片到wx扫描二v码免费领取【保证100%免费】🆓

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/9 22:52:24

抖音小程序开发(uniapp)

1、下载抖音开发者工具 抖音开发者工具下载地址: https://developer.open-douyin.com/docs/resource/zh-CN/mini-app/develop/dev-tools/developer-instrument/download/developer-instrument-update-and-download 2、启动项目 选择如图运行到抖音开发者工具 如…

作者头像 李华
网站建设 2026/1/8 23:46:49

错过再等一年!Dify工作流重试机制内部资料曝光(附源码级解析)

第一章:错过再等一年!Dify工作流重试机制全貌揭秘在构建高可用的AI应用时,网络波动、模型超时或临时性服务异常难以避免。Dify 工作流引擎内置了智能重试机制,确保关键任务在短暂失败后仍能自动恢复执行,极大提升系统鲁…

作者头像 李华
网站建设 2025/12/24 7:16:01

js未授权简介

一、什么是未授权? 首先理解什么是未授权漏洞 未授权字面上理解是未获得授权,对于正常的业务来说,有些功能点需要经过登录之后才能进行,那么如果我们通过一些绕过,无需登录也可以完成此类操作,那么便是未授权访问漏洞了。 二、常见的未授权访问漏洞 常见的未授权漏洞一…

作者头像 李华
网站建设 2025/12/23 15:26:07

方舟引擎如何打破性能枷锁,铸造“超级隐私模式”的实现之道

摘要: 在数字时代,用户隐私与应用性能似乎陷入了一场零和博弈。我们渴望极致的隐私保护,却又无法忍受由此带来的性能下降和体验割裂。本文将跳出传统浏览器“无痕模式”的局限,构想一种系统级的“超级隐私模式”,并深入…

作者头像 李华
网站建设 2025/12/24 14:26:11

为什么你的Shiny应用越跑越慢?(多模态缓存缺失的代价)

第一章:为什么你的Shiny应用越跑越慢?当你最初部署 Shiny 应用时,响应迅速、交互流畅。但随着用户量增加或数据规模扩大,应用逐渐变得卡顿甚至无响应。性能下降通常并非单一原因所致,而是多个潜在瓶颈累积的结果。无效…

作者头像 李华
网站建设 2025/12/24 11:22:03

7、Linux 文件共享与查找全攻略

Linux 文件共享与查找全攻略 在 Linux 系统中,文件共享和查找是非常重要的操作,掌握这些操作可以帮助我们更好地管理和使用文件。下面将详细介绍 Linux 中文件共享和查找的相关知识和操作方法。 1. 文件共享 1.1 分组协作 在 Linux 里,组是为了实现文件共享和促进协作而…

作者头像 李华