news 2026/4/15 17:10:39

Microsoft Agent Framework - Workflow 并行执行

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Microsoft Agent Framework - Workflow 并行执行

Microsoft Agent Framework - Workflow 并行执行

在之前的文章中,我们可能已经熟悉了顺序执行的工作流,任务按部就班地一步步完成。今天,我们将探讨一个更强大、更高效的模式:并行执行(Concurrent Execution)

什么是并行执行?

想象一下,你需要向多位不同领域的专家(例如,一位英语专家和一位日语专家)咨询同一个问题,并希望同时得到他们的答复,而不是一个接一个地等待。并行执行就是为了解决这类场景而设计的。

在 Workflow 中,我们可以将一个任务同时分发给多个 Agent,让它们并行处理,最后再将它们的结果汇总起来。这种“分而治之”再“合而为一”的模式,极大地提高了处理效率,并能轻松实现多视角分析、任务分发等复杂场景。

关键组件:Fan-OutFan-In

为了在WorkflowBuilder中构建并行流程,我们需要了解两个核心概念:

  1. AddFanOutEdge(扇出): 这个方法创建一个“扇出”边,它将单个源节点(通常是一个分发任务的 Executor)的输出,同时发送给多个目标节点(如此处的多个 Agent)。它就像一个任务分发中心。

  2. AddFanInEdge(扇入): 这个方法创建一个“扇入”边,它会收集多个源节点(如此处的多个 Agent)的输出,然后将这些输出作为一个列表,发送给单个目标节点(通常是一个聚合结果的 Executor)。它就像一个结果回收站。

下面这张图清晰地展示了这个流程:

代码示例

让我们通过一个具体的例子来深入理解。下面的代码演示了如何创建一个向“英语专家”和“日语专家”同时提问,并汇总他们回答的 Workflow。

1. 定义并行任务的启动和聚合 Executor

为了管理并行流程,我们创建了两个专门的Executor

  • ConcurrentStartExecutor: 负责启动整个并行流程。它通过context.SendMessageAsync广播用户消息和TurnToken,从而激活所有连接的 Agent,让它们开始处理任务。

  • ConcurrentAggregationExecutor: 负责聚合结果。它会收集所有并行 Agent 返回的消息。在这个例子中,它被设计为当收集到两条消息(来自英语和日语专家)后,将它们格式化并作为最终结果输出。

using System.ClientModel; using Microsoft.Extensions.AI; using OpenAI; using Microsoft.Agents.AI.Workflows; using OpenAI.Chat; namespaceMSAgentFramework.Learn.workflow { /// <summary> /// Executor that starts the concurrent processing by sending messages to the agents. /// </summary> internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor") { /// <summary> /// Starts the concurrent processing by sending messages to the agents. /// </summary> /// <param name="message">The user message to process</param> /// <param name="context">Workflow context for accessing workflow services and adding events</param> /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. /// The default is <see cref="CancellationToken.None"/>.</param> /// <returns>A task representing the asynchronous operation</returns> public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) { // Broadcast the message to all connected agents. Receiving agents will queue // the message but will not start processing until they receive a turn token. await context.SendMessageAsync(new Microsoft.Extensions.AI.ChatMessage(ChatRole.User, message), cancellationToken); // Broadcast the turn token to kick off the agents. await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken); } } /// <summary> /// Executor that aggregates the results from the concurrent agents. /// </summary> internal sealed class ConcurrentAggregationExecutor() : Executor<List<Microsoft.Extensions.AI.ChatMessage>>("ConcurrentAggregationExecutor") { privatereadonly List<Microsoft.Extensions.AI.ChatMessage> _messages = []; /// <summary> /// Handles incoming messages from the agents and aggregates their responses. /// </summary> /// <param name="message">The message from the agent</param> /// <param name="context">Workflow context for accessing workflow services and adding events</param> /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. /// The default is <see cref="CancellationToken.None"/>.</param> /// <returns>A task representing the asynchronous operation</returns> public override async ValueTask HandleAsync(List<Microsoft.Extensions.AI.ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default) { this._messages.AddRange(message); if (this._messages.Count == 2) { var formattedMessages = string.Join(Environment.NewLine, this._messages.Select(m => $"{m.AuthorName}: {m.Text}")); await context.YieldOutputAsync(formattedMessages, cancellationToken); } } } // ... }
2. 构建并运行 Workflow

接下来,我们定义 Agent,并使用WorkflowBuilder将所有部分连接起来。

  1. 定义 Agent: 我们创建english_expertjapan_expert两个 Agent,并分别给予它们不同的指令。

  2. 构建工作流:

  • startExecutor作为起点。

  • 使用AddFanOutEdgestartExecutorenglish_expertjapan_expert连接起来,实现任务分发。

  • 使用AddFanInEdgeenglish_expertjapan_expert的输出连接到aggregationExecutor,实现结果聚合。

  • 最后,指定aggregationExecutor的输出为整个工作流的最终输出。

// ... internalclassConcurrent { public async Task Run() { var endpoint = "https://api.deepseek.com/v1"; var apiKey = "sk-xxx"; var english_expert = new OpenAIClient( new ApiKeyCredential(apiKey) , new OpenAIClientOptions() { Endpoint = new Uri(endpoint) } ) .GetChatClient("deepseek-chat") .CreateAIAgent(instructions: "你是一个英语专家,你总是使用英文回答问题.", name: "english_expert"); var japan_expert = new OpenAIClient( new ApiKeyCredential(apiKey) , new OpenAIClientOptions() { Endpoint = new Uri(endpoint) } ) .GetChatClient("deepseek-chat") .CreateAIAgent(instructions: "你是一个日语专家,你总是使用日语回答问题.", name: "japan_expert"); var startExecutor = new ConcurrentStartExecutor(); var aggregationExecutor = new ConcurrentAggregationExecutor(); // Build the workflow by adding executors and connecting them var workflow = new WorkflowBuilder(startExecutor) .AddFanOutEdge(startExecutor, targets: [english_expert, japan_expert]) .AddFanInEdge(aggregationExecutor, sources: [english_expert, japan_expert]) .WithOutputFrom(aggregationExecutor) .Build(); // Run the workflow awaitusing StreamingRun run = await InProcessExecution.StreamAsync(workflow, "西瓜可以吃吗?"); awaitforeach (WorkflowEvent evt in run.WatchStreamAsync()) { if (evt is WorkflowOutputEvent output) { Console.WriteLine($"Workflow completed with results:\n{output.Data}"); } } } }
3. 查看结果

当我们用问题“西瓜可以吃吗?”运行此工作流时,english_expertjapan_expert会同时开始思考并生成答案。aggregationExecutor收集到两个答案后,将它们组合并输出。最终,你会看到类似下面的结果:

Workflow completed with results: english_expert: Yes, absolutely! Watermelon is not only safe to eat, but it's also a delicious and nutritious fruit. It's rich in vitamins (like vitamin A and C), antioxidants, and has a high water content, making it hydrating. Just be sure to wash the rind before cutting if you plan to slice it, and avoid eating the seeds if you prefer seedless varieties. Enjoy! japan_expert: はい、スイカは食べられます。スイカは夏の定番フルーツで、甘くてみずみずしい果肉が特徴です。生でそのまま食べるほか、サラダやスムージー、デザートなどにも使われます。種を取り除いて食べるのが一般的ですが、種ごと食べられる品種もあります。栄養面では水分が多く、ビタミンCやカリウムなどを含んでいます。ただし、体を冷やす作用があるので、食べ過ぎには注意しましょう。

总结

通过AddFanOutEdgeAddFanInEdge,Microsoft Agent Framework 的 Workflow 功能为我们提供了一种简洁而强大的方式来编排并行任务。这不仅限于多语言翻译,还可以应用于:

  • 多角度分析:让不同角色的 Agent(如“代码审查员”、“安全分析师”、“性能优化师”)同时分析一段代码。

  • 分布式数据处理:将一个大数据集切片,分发给多个 Agent 并行处理。

  • 工具并行调用:同时调用多个外部 API 或工具,并等待所有结果返回。

希望这篇博客能帮助你理解并开始在你的项目中使用并行工作流,以构建更高效、更智能的 Agent 应用。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/13 5:31:14

Anaconda Prompt替代方案:Miniconda-Python3.10终端快捷激活

Miniconda-Python3.10&#xff1a;轻量级终端环境的高效激活实践 在数据科学与AI开发日益普及的今天&#xff0c;一个常见却令人头疼的问题是&#xff1a;为什么同一个Python脚本&#xff0c;在同事电脑上运行正常&#xff0c;到了自己机器上却报错“模块未找到”或“版本不兼容…

作者头像 李华
网站建设 2026/4/13 2:21:31

STM32CubeMX安装包与IDE集成入门操作指南

从零开始搭建STM32开发环境&#xff1a;CubeMX实战入门与IDE无缝集成 你是不是也经历过这样的场景&#xff1f;刚拿到一块STM32开发板&#xff0c;满怀激情打开数据手册&#xff0c;翻到时钟树那一页——密密麻麻的PLL、分频器、倍频路径看得头晕眼花。配错了&#xff0c;系统…

作者头像 李华
网站建设 2026/4/12 23:33:31

信息安全篇---密钥生成、加密、解密

&#x1f4e6; 故事设定小红想接收秘密信件&#xff0c;她要做三件事&#xff1a;造一套魔法锁具&#xff08;生成密钥对&#xff09;把“魔法锁”发给朋友&#xff08;公布公钥&#xff09;用“魔法钥匙”开锁读信&#xff08;私钥解密&#xff09;朋友小明要给小红寄信&#…

作者头像 李华
网站建设 2026/4/2 7:52:28

DownKyi视频下载神器:B站无限下载终极指南

还在为无法离线观看B站精彩内容而烦恼吗&#xff1f;DownKyi作为专业的B站视频下载工具&#xff0c;为你提供全格式视频下载解决方案。这款开源软件支持从标准画质到8K超高清、HDR、杜比视界等高级视频格式&#xff0c;满足各种场景下的下载需求。 【免费下载链接】downkyi 哔哩…

作者头像 李华
网站建设 2026/4/15 14:52:02

LeagueAkari:英雄联盟智能助手完整使用指南

LeagueAkari&#xff1a;英雄联盟智能助手完整使用指南 【免费下载链接】LeagueAkari ✨兴趣使然的&#xff0c;功能全面的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/LeagueAkari LeagueAkari是一款基…

作者头像 李华
网站建设 2026/4/15 19:01:31

LeaguePrank深度评测:游戏数据展示工具的边界探索

在现代游戏生态中&#xff0c;游戏数据展示工具始终处于技术与道德的交叉地带。LeaguePrank作为一款基于LCUAPI的本地化定制工具&#xff0c;为《英雄联盟》玩家提供了展示层数据展示的可能性。这款工具能否在安全合规的前提下满足用户的个性化需求&#xff1f;让我们从技术解析…

作者头像 李华