LLM 工作流自动化:从 Prompt 拼接到 DAG 编排的工程跃迁
一、Prompt 拼接的天花板:当 LLM 调用链变成意大利面条
LLM 集成的初级形态是"Prompt 拼接"——将用户输入塞进模板,调用 API,返回结果。这在单轮问答场景下足够用。但当业务需要多步骤推理时,问题迅速暴露:第一步的输出是第二步的输入,第二步的分支取决于第一步的判断,第三步需要并行调用两个不同的 LLM。代码中充斥着嵌套的await、散落的try-catch、硬编码的 Prompt 模板字符串。一个 5 步的审批工作流,代码量超过 300 行,修改任意一步都需要通读全部逻辑。更致命的是,这种面条式代码无法可视化、无法回溯中间状态、无法在失败后从断点恢复。核心痛点:LLM 调用链缺乏结构化编排能力,导致复杂工作流的代码可维护性急剧恶化。
二、DAG 编排模型:将工作流从代码中解放出来
解决面条式 LLM 调用链的方案是有向无环图(DAG)编排。将每个 LLM 调用抽象为 DAG 中的一个节点,节点间的数据依赖关系用有向边表示。执行引擎按拓扑排序调度节点,自动处理并行和依赖。
graph TB START([用户输入]) --> A[意图分类节点] A -->|分类:查询| B[知识库检索节点] A -->|分类:操作| C[参数提取节点] B --> D[上下文组装节点] C --> E[工具调用节点] D --> F[回答生成节点] E --> F F --> G{质量校验节点} G -->|通过| END([输出结果]) G -->|不通过| A style A fill:#e1f5fe style F fill:#fff3e0 style G fill:#ffebee上图是一个典型的 LLM 工作流 DAG。关键设计点:
条件分支:意图分类节点的输出决定后续走查询路径还是操作路径。DAG 引擎根据节点输出的路由键选择下游节点。
并行扇出-扇入:知识库检索和参数提取可以并行执行,回答生成节点等待两者都完成后才触发。DAG 引擎自动识别无依赖关系的节点并行调度。
循环重试:质量校验节点不通过时,将结果回传意图分类节点重新处理。DAG 中允许回边,但必须设置最大重试次数防止死循环。
节点抽象:每个节点只关心输入 Schema 和输出 Schema,不关心上下游是谁。这使得节点可以独立测试、独立替换 LLM 提供商。
三、生产级代码实现:轻量 DAG 编排引擎
以下实现不依赖 LangGraph 或 Prefect,用约 150 行 TypeScript 构建一个可用的 DAG 引擎:
// workflow/dag-engine.ts — 轻量 DAG 编排引擎 interface WorkflowNode { id: string; // 节点执行函数:接收上游输出,返回本节点结果 execute: (input: Record<string, unknown>) => Promise<Record<string, unknown>>; // 路由函数:可选,用于条件分支,返回下游节点 ID 列表 route?: (output: Record<string, unknown>) => string[]; } interface WorkflowEdge { from: string; to: string; // 条件谓词:可选,仅当谓词返回 true 时边才激活 condition?: (output: Record<string, unknown>) => boolean; } interface WorkflowDefinition { nodes: WorkflowNode[]; edges: WorkflowEdge[]; entryNode: string; maxIterations: number; // 防止循环导致无限执行 } class DAGEngine { private nodeMap: Map<string, WorkflowNode> = new Map(); private adjacency: Map<string, WorkflowEdge[]> = new Map(); private definition: WorkflowDefinition; constructor(definition: WorkflowDefinition) { this.definition = definition; // 构建节点索引和邻接表,加速运行时查找 definition.nodes.forEach((n) => this.nodeMap.set(n.id, n)); definition.edges.forEach((e) => { const edges = this.adjacency.get(e.from) ?? []; edges.push(e); this.adjacency.set(e.from, edges); }); } async run( initialInput: Record<string, unknown> ): Promise<Record<string, unknown>> { const context: Record<string, Record<string, unknown>> = {}; let currentNodeId: string | null = this.definition.entryNode; let iterations = 0; while (currentNodeId && iterations < this.definition.maxIterations) { const node = this.nodeMap.get(currentNodeId); if (!node) { throw new Error(`节点 ${currentNodeId} 未注册`); } // 聚合上游节点的输出作为当前节点的输入 const upstreamOutputs = this.getUpstreamOutputs(currentNodeId, context); const mergedInput = { ...initialInput, ...upstreamOutputs }; // 执行当前节点,捕获异常并记录到上下文 let output: Record<string, unknown>; try { output = await node.execute(mergedInput); } catch (err) { output = { _error: true, _message: (err as Error).message, _node: currentNodeId }; } context[currentNodeId] = output; iterations++; // 确定下一个节点:优先使用路由函数,否则按边条件过滤 const nextNodes = this.resolveNextNodes(currentNodeId, output); if (nextNodes.length === 0) { // 无下游节点,工作流结束 break; } // 简化处理:线性 DAG 取第一个下游节点 // 完整实现应支持并行扇出,此处从简 currentNodeId = nextNodes[0]; } if (iterations >= this.definition.maxIterations) { throw new Error("工作流超过最大迭代次数,可能存在循环"); } // 返回最后一个节点的输出作为工作流结果 const lastNodeId = Object.keys(context).pop()!; return context[lastNodeId]; } // 获取上游节点的输出,合并为当前输入 private getUpstreamOutputs( nodeId: string, context: Record<string, Record<string, unknown>> ): Record<string, unknown> { const merged: Record<string, unknown> = {}; this.definition.edges .filter((e) => e.to === nodeId) .forEach((e) => { if (context[e.from]) { Object.assign(merged, context[e.from]); } }); return merged; } // 解析下游节点:路由函数 > 边条件 > 默认 private resolveNextNodes( nodeId: string, output: Record<string, unknown> ): string[] { const node = this.nodeMap.get(nodeId)!; // 优先使用节点的路由函数 if (node.route) { return node.route(output); } // 按边条件过滤 const edges = this.adjacency.get(nodeId) ?? []; return edges .filter((e) => !e.condition || e.condition(output)) .map((e) => e.to); } } // 使用示例:构建一个意图分类 + 分支处理的工作流 const workflow = new DAGEngine({ entryNode: "classify", maxIterations: 10, nodes: [ { id: "classify", execute: async (input) => { // 调用 LLM 进行意图分类 const intent = await classifyIntent(input.userMessage as string); return { intent, userMessage: input.userMessage }; }, // 路由函数:根据分类结果决定下游节点 route: (output) => { return output.intent === "query" ? ["retrieve"] : ["extractParams"]; }, }, { id: "retrieve", execute: async (input) => { const docs = await retrieveFromKnowledgeBase(input.userMessage as string); return { ...input, documents: docs }; }, }, { id: "extractParams", execute: async (input) => { const params = await extractParameters(input.userMessage as string); return { ...input, params }; }, }, { id: "generate", execute: async (input) => { const answer = await generateAnswer(input); return { answer }; }, }, ], edges: [ { from: "retrieve", to: "generate" }, { from: "extractParams", to: "generate" }, ], });设计要点:maxIterations防止循环导致的无限执行。节点执行异常不抛出,而是将错误信息写入上下文,让下游节点或路由函数自行决策。route函数让条件分支逻辑内聚在节点内部,而非散落在全局条件判断中。
四、DAG 编排的暗面:状态管理与调试的深层挑战
DAG 编排解决了代码结构问题,但引入了新的复杂性。
状态持久化的缺失。上述轻量引擎将所有中间状态保存在内存中。一旦进程重启,工作流状态全部丢失。对于耗时较长的工作流(如人工审批节点等待数小时),必须将状态持久化到数据库。这需要引入检查点(Checkpoint)机制:每个节点执行完毕后,将上下文快照写入持久存储。恢复时从最近的检查点继续执行。实现检查点并不复杂,但需要处理 Schema 版本兼容——工作流定义更新后,旧检查点的数据结构可能不兼容。
LLM 调用的不确定性。DAG 的路由依赖 LLM 输出,但 LLM 输出具有非确定性。同一个输入,意图分类可能有时返回"query",有时返回"operation"。这导致工作流执行路径不可复现。解决方案是对路由节点的 LLM 调用设置temperature: 0,并在 Prompt 中严格约束输出格式。但即使如此,仍无法 100% 保证路由稳定性。
并行节点的错误传播。当两个并行节点中一个失败时,扇入节点应该如何处理?等待全部完成?忽略失败节点?还是整体回滚?不同业务场景需要不同策略,DAG 引擎必须提供可配置的错误传播语义。
适用边界:DAG 编排适合步骤数 ≥ 3、存在条件分支或并行执行、且需要可视化调试的 LLM 工作流。对于简单的线性调用链(步骤 ≤ 2),直接写await更直观。
五、总结
LLM 工作流自动化的核心跃迁,是从命令式的 Prompt 拼接走向声明式的 DAG 编排。将每个 LLM 调用抽象为 DAG 节点,用边定义数据依赖,让执行引擎自动处理调度、并行和路由。落地路线建议:第一步,用线性await链快速验证业务逻辑的可行性;第二步,当步骤数超过 3 且出现条件分支时,引入 DAG 引擎重构;第三步,为长耗时工作流添加检查点持久化,确保故障恢复能力。编排不是目的,可维护性才是。