news 2026/7/1 13:58:34

LLM 工作流自动化:从 Prompt 拼接到 DAG 编排的工程跃迁

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
LLM 工作流自动化:从 Prompt 拼接到 DAG 编排的工程跃迁

LLM 工作流自动化:从 Prompt 拼接到 DAG 编排的工程跃迁

一、Prompt 拼接的天花板:当 LLM 调用链变成意大利面条

LLM 集成的初级形态是"Prompt 拼接"——将用户输入塞进模板,调用 API,返回结果。这在单轮问答场景下足够用。但当业务需要多步骤推理时,问题迅速暴露:第一步的输出是第二步的输入,第二步的分支取决于第一步的判断,第三步需要并行调用两个不同的 LLM。代码中充斥着嵌套的await、散落的try-catch、硬编码的 Prompt 模板字符串。一个 5 步的审批工作流,代码量超过 300 行,修改任意一步都需要通读全部逻辑。更致命的是,这种面条式代码无法可视化、无法回溯中间状态、无法在失败后从断点恢复。核心痛点:LLM 调用链缺乏结构化编排能力,导致复杂工作流的代码可维护性急剧恶化。

二、DAG 编排模型:将工作流从代码中解放出来

解决面条式 LLM 调用链的方案是有向无环图(DAG)编排。将每个 LLM 调用抽象为 DAG 中的一个节点,节点间的数据依赖关系用有向边表示。执行引擎按拓扑排序调度节点,自动处理并行和依赖。

graph TB START([用户输入]) --> A[意图分类节点] A -->|分类:查询| B[知识库检索节点] A -->|分类:操作| C[参数提取节点] B --> D[上下文组装节点] C --> E[工具调用节点] D --> F[回答生成节点] E --> F F --> G{质量校验节点} G -->|通过| END([输出结果]) G -->|不通过| A style A fill:#e1f5fe style F fill:#fff3e0 style G fill:#ffebee

上图是一个典型的 LLM 工作流 DAG。关键设计点:

条件分支:意图分类节点的输出决定后续走查询路径还是操作路径。DAG 引擎根据节点输出的路由键选择下游节点。

并行扇出-扇入:知识库检索和参数提取可以并行执行,回答生成节点等待两者都完成后才触发。DAG 引擎自动识别无依赖关系的节点并行调度。

循环重试:质量校验节点不通过时,将结果回传意图分类节点重新处理。DAG 中允许回边,但必须设置最大重试次数防止死循环。

节点抽象:每个节点只关心输入 Schema 和输出 Schema,不关心上下游是谁。这使得节点可以独立测试、独立替换 LLM 提供商。

三、生产级代码实现:轻量 DAG 编排引擎

以下实现不依赖 LangGraph 或 Prefect,用约 150 行 TypeScript 构建一个可用的 DAG 引擎:

// workflow/dag-engine.ts — 轻量 DAG 编排引擎 interface WorkflowNode { id: string; // 节点执行函数:接收上游输出,返回本节点结果 execute: (input: Record<string, unknown>) => Promise<Record<string, unknown>>; // 路由函数:可选,用于条件分支,返回下游节点 ID 列表 route?: (output: Record<string, unknown>) => string[]; } interface WorkflowEdge { from: string; to: string; // 条件谓词:可选,仅当谓词返回 true 时边才激活 condition?: (output: Record<string, unknown>) => boolean; } interface WorkflowDefinition { nodes: WorkflowNode[]; edges: WorkflowEdge[]; entryNode: string; maxIterations: number; // 防止循环导致无限执行 } class DAGEngine { private nodeMap: Map<string, WorkflowNode> = new Map(); private adjacency: Map<string, WorkflowEdge[]> = new Map(); private definition: WorkflowDefinition; constructor(definition: WorkflowDefinition) { this.definition = definition; // 构建节点索引和邻接表,加速运行时查找 definition.nodes.forEach((n) => this.nodeMap.set(n.id, n)); definition.edges.forEach((e) => { const edges = this.adjacency.get(e.from) ?? []; edges.push(e); this.adjacency.set(e.from, edges); }); } async run( initialInput: Record<string, unknown> ): Promise<Record<string, unknown>> { const context: Record<string, Record<string, unknown>> = {}; let currentNodeId: string | null = this.definition.entryNode; let iterations = 0; while (currentNodeId && iterations < this.definition.maxIterations) { const node = this.nodeMap.get(currentNodeId); if (!node) { throw new Error(`节点 ${currentNodeId} 未注册`); } // 聚合上游节点的输出作为当前节点的输入 const upstreamOutputs = this.getUpstreamOutputs(currentNodeId, context); const mergedInput = { ...initialInput, ...upstreamOutputs }; // 执行当前节点,捕获异常并记录到上下文 let output: Record<string, unknown>; try { output = await node.execute(mergedInput); } catch (err) { output = { _error: true, _message: (err as Error).message, _node: currentNodeId }; } context[currentNodeId] = output; iterations++; // 确定下一个节点:优先使用路由函数,否则按边条件过滤 const nextNodes = this.resolveNextNodes(currentNodeId, output); if (nextNodes.length === 0) { // 无下游节点,工作流结束 break; } // 简化处理:线性 DAG 取第一个下游节点 // 完整实现应支持并行扇出,此处从简 currentNodeId = nextNodes[0]; } if (iterations >= this.definition.maxIterations) { throw new Error("工作流超过最大迭代次数,可能存在循环"); } // 返回最后一个节点的输出作为工作流结果 const lastNodeId = Object.keys(context).pop()!; return context[lastNodeId]; } // 获取上游节点的输出,合并为当前输入 private getUpstreamOutputs( nodeId: string, context: Record<string, Record<string, unknown>> ): Record<string, unknown> { const merged: Record<string, unknown> = {}; this.definition.edges .filter((e) => e.to === nodeId) .forEach((e) => { if (context[e.from]) { Object.assign(merged, context[e.from]); } }); return merged; } // 解析下游节点:路由函数 > 边条件 > 默认 private resolveNextNodes( nodeId: string, output: Record<string, unknown> ): string[] { const node = this.nodeMap.get(nodeId)!; // 优先使用节点的路由函数 if (node.route) { return node.route(output); } // 按边条件过滤 const edges = this.adjacency.get(nodeId) ?? []; return edges .filter((e) => !e.condition || e.condition(output)) .map((e) => e.to); } } // 使用示例:构建一个意图分类 + 分支处理的工作流 const workflow = new DAGEngine({ entryNode: "classify", maxIterations: 10, nodes: [ { id: "classify", execute: async (input) => { // 调用 LLM 进行意图分类 const intent = await classifyIntent(input.userMessage as string); return { intent, userMessage: input.userMessage }; }, // 路由函数:根据分类结果决定下游节点 route: (output) => { return output.intent === "query" ? ["retrieve"] : ["extractParams"]; }, }, { id: "retrieve", execute: async (input) => { const docs = await retrieveFromKnowledgeBase(input.userMessage as string); return { ...input, documents: docs }; }, }, { id: "extractParams", execute: async (input) => { const params = await extractParameters(input.userMessage as string); return { ...input, params }; }, }, { id: "generate", execute: async (input) => { const answer = await generateAnswer(input); return { answer }; }, }, ], edges: [ { from: "retrieve", to: "generate" }, { from: "extractParams", to: "generate" }, ], });

设计要点:maxIterations防止循环导致的无限执行。节点执行异常不抛出,而是将错误信息写入上下文,让下游节点或路由函数自行决策。route函数让条件分支逻辑内聚在节点内部,而非散落在全局条件判断中。

四、DAG 编排的暗面:状态管理与调试的深层挑战

DAG 编排解决了代码结构问题,但引入了新的复杂性。

状态持久化的缺失。上述轻量引擎将所有中间状态保存在内存中。一旦进程重启,工作流状态全部丢失。对于耗时较长的工作流(如人工审批节点等待数小时),必须将状态持久化到数据库。这需要引入检查点(Checkpoint)机制:每个节点执行完毕后,将上下文快照写入持久存储。恢复时从最近的检查点继续执行。实现检查点并不复杂,但需要处理 Schema 版本兼容——工作流定义更新后,旧检查点的数据结构可能不兼容。

LLM 调用的不确定性。DAG 的路由依赖 LLM 输出,但 LLM 输出具有非确定性。同一个输入,意图分类可能有时返回"query",有时返回"operation"。这导致工作流执行路径不可复现。解决方案是对路由节点的 LLM 调用设置temperature: 0,并在 Prompt 中严格约束输出格式。但即使如此,仍无法 100% 保证路由稳定性。

并行节点的错误传播。当两个并行节点中一个失败时,扇入节点应该如何处理?等待全部完成?忽略失败节点?还是整体回滚?不同业务场景需要不同策略,DAG 引擎必须提供可配置的错误传播语义。

适用边界:DAG 编排适合步骤数 ≥ 3、存在条件分支或并行执行、且需要可视化调试的 LLM 工作流。对于简单的线性调用链(步骤 ≤ 2),直接写await更直观。

五、总结

LLM 工作流自动化的核心跃迁,是从命令式的 Prompt 拼接走向声明式的 DAG 编排。将每个 LLM 调用抽象为 DAG 节点,用边定义数据依赖,让执行引擎自动处理调度、并行和路由。落地路线建议:第一步,用线性await链快速验证业务逻辑的可行性;第二步,当步骤数超过 3 且出现条件分支时,引入 DAG 引擎重构;第三步,为长耗时工作流添加检查点持久化,确保故障恢复能力。编排不是目的,可维护性才是。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/7/1 13:55:36

Mythos漏洞挖掘模型:可规模化自主发现RCE的AI安全新范式

1. 这不是一次普通模型发布&#xff1a;Mythos 的真实分量与行业震感你可能已经刷到过“Anthropic 发布 Claude Mythos”这条新闻&#xff0c;标题里带着“Preview”“Gated Release”这类字眼&#xff0c;很容易被当成又一场科技公司的例行发布会。但如果你真这么想&#xff0…

作者头像 李华
网站建设 2026/7/1 13:54:37

大模型推理部署实战:从 GPU 显存管理到高并发服务化的全链路设计

大模型推理部署实战&#xff1a;从 GPU 显存管理到高并发服务化的全链路设计一、Token 吞吐与显存瓶颈&#xff1a;LLM 部署的工程困境 大模型推理部署的核心矛盾在于&#xff1a;GPU 显存是有限的&#xff0c;而模型参数和 KV Cache 的显存需求几乎是无上限的。一个 70B 参数的…

作者头像 李华
网站建设 2026/7/1 13:52:43

ICM-45605与TM4C1294NCPDT在工业IMU系统中的应用与优化

1. 为什么选择ICM-45605与TM4C1294NCPDT这对组合 在工业级惯性测量领域&#xff0c;传感器与处理器的搭配往往决定了整个系统的性能上限。ICM-45605作为TDK InvenSense新一代6DOF IMU&#xff0c;其核心优势在于16g的加速度计量程和2000dps的陀螺仪范围&#xff0c;配合0.375mg…

作者头像 李华
网站建设 2026/7/1 13:52:01

在平湖看牙,我注意到数字化设备在日常诊疗中的应用

今年陪家人看牙&#xff0c;在平湖跑了三家机构&#xff0c;注意到了一些数字化设备在日常诊疗中的实际应用。写下来记录一下。第一次去公立医院拍了X光片&#xff0c;是那种传统的二维片子。医生看了看&#xff0c;说“可以处理”&#xff0c;就让我们去约时间了。过程很快&am…

作者头像 李华