news 2026/2/28 11:44:01

DataAgent 接口层分析

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
DataAgent 接口层分析

DataAgent 接口层分析

请关注公众号【碳硅化合物AI】

概要总结

本文档深入分析了 DataAgent 的接口层设计,重点阐述了系统如何支持多 Agent、多模型、能力池构建和多数据库等复杂场景。

接口层主要通过 RESTful API 提供服务,核心入口是GraphController,它提供了流式查询接口。系统通过agentIdthreadId实现多 Agent 的隔离,每个 Agent 拥有独立的数据源配置、知识库和对话上下文。MultiTurnContextManager负责管理每个线程的多轮对话上下文,使用ConcurrentHashMap实现线程安全的上下文存储。

多模型支持通过AiModelRegistry模型注册表实现,采用单例模式管理全局的 Chat 模型和 Embedding 模型,支持运行时热切换。DynamicModelFactory负责根据配置动态创建模型实例,统一使用 OpenAI 兼容接口,通过baseUrl实现多厂商兼容(Qwen、Deepseek、OpenAI 等)。

能力池通过CodePoolExecutorService接口定义统一的代码执行规范,支持 Docker 和本地两种执行模式。DockerCodePoolExecutorService使用 Docker 容器提供隔离的执行环境,LocalCodePoolExecutorService使用本地 Python 环境执行代码。通过工厂模式根据配置创建相应的执行器实例。

多数据库支持通过DatabaseUtil动态获取每个 Agent 的数据源配置,使用 Druid 连接池管理连接,支持 MySQL、PostgreSQL、H2、达梦等多种数据库。整体接口层设计体现了高内聚、低耦合的原则,各个组件职责清晰,便于维护和扩展。

目录

  • 1. 入口类以及说明
    • 1.1 接口层入口
    • 1.2 关键类关系图
  • 2. 多 Agent 支持机制
    • 2.1 Agent 隔离机制
    • 2.2 多轮对话上下文管理
    • 2.3 StreamContext 管理
    • 2.4 多 Agent 管理架构图
  • 3. 多模型支持机制
    • 3.1 模型注册表 (AiModelRegistry)
    • 3.2 动态模型工厂 (DynamicModelFactory)
    • 3.3 模型热切换机制
    • 3.4 多模型注册表架构图
  • 4. 能力池构建
    • 4.1 代码执行服务接口
    • 4.2 Docker 执行器实现
    • 4.3 本地执行器实现
    • 4.4 工厂模式创建执行器
    • 4.5 能力池架构图
  • 5. 多数据库支持
    • 5.1 数据源配置管理
    • 5.2 Druid 连接池
    • 5.3 多数据库支持架构图
  • 6. 实现关键点说明
    • 6.1 线程安全的上下文管理
    • 6.2 模型热切换机制
    • 6.3 能力池的容器管理
    • 6.4 多数据源的动态切换
    • 6.5 统一接口设计
  • 7. 总结说明

1. 入口类以及说明

1.1 接口层入口

DataAgent 的接口层主要通过 RESTful API 提供服务,核心入口是GraphController,它提供了流式查询接口。同时,系统还提供了多个管理接口,用于管理智能体、数据源、模型配置等。

关键代码

@GetMapping(value="/stream/search",produces=MediaType.TEXT_EVENT_STREAM_VALUE)publicFlux<ServerSentEvent<GraphNodeResponse>>streamSearch(@RequestParam("agentId")StringagentId,@RequestParam(value="threadId",required=false)StringthreadId,@RequestParam("query")Stringquery,// ... 其他参数){Sinks.Many<ServerSentEvent<GraphNodeResponse>>sink=Sinks.many().unicast().onBackpressureBuffer();GraphRequestrequest=GraphRequest.builder().agentId(agentId).threadId(threadId).query(query).build();graphService.graphStreamProcess(sink,request);returnsink.asFlux();}

1.2 关键类关系图

2. 多 Agent 支持机制

2.1 Agent 隔离机制

系统通过agentId实现多 Agent 的隔离。每个 Agent 拥有独立的数据源配置、知识库和上下文管理。

关键代码

privatevoidhandleNewProcess(GraphRequestgraphRequest){StringagentId=graphRequest.getAgentId();StringthreadId=graphRequest.getThreadId();// 每个请求都携带 agentId,用于获取该 Agent 的配置Flux<NodeOutput>nodeOutputFlux=compiledGraph.stream(Map.of(AGENT_ID,agentId,INPUT_KEY,query,...),RunnableConfig.builder().threadId(threadId).build());}

2.2 多轮对话上下文管理

MultiTurnContextManager负责管理每个线程(threadId)的多轮对话上下文,实现线程级别的上下文隔离。

关键代码

@ComponentpublicclassMultiTurnContextManager{privatefinalMap<String,Deque<ConversationTurn>>history=newConcurrentHashMap<>();privatefinalMap<String,PendingTurn>pendingTurns=newConcurrentHashMap<>();publicStringbuildContext(StringthreadId){Deque<ConversationTurn>deque=history.get(threadId);if(deque==null||deque.isEmpty()){return"(无)";}returndeque.stream().map(turn->String.format("Q: %s\nA: %s",turn.userQuestion(),turn.plan())).collect(Collectors.joining("\n\n"));}publicvoidbeginTurn(StringthreadId,StringuserQuestion){pendingTurns.put(threadId,newPendingTurn(userQuestion.trim()));}}

实现关键点

  • 使用ConcurrentHashMap实现线程安全的上下文存储
  • 每个threadId维护独立的对话历史
  • 支持上下文长度限制,避免提示词过长
  • 支持重启最后一轮对话(用于人工反馈场景)

2.3 StreamContext 管理

GraphServiceImpl使用ConcurrentHashMap<String, StreamContext>管理每个线程的流式处理上下文,实现多线程并发处理。

关键代码

privatefinalConcurrentHashMap<String,StreamContext>streamContextMap=newConcurrentHashMap<>();publicvoidgraphStreamProcess(Sinks.Many<ServerSentEvent<GraphNodeResponse>>sink,GraphRequestgraphRequest){StringthreadId=graphRequest.getThreadId();StreamContextcontext=streamContextMap.computeIfAbsent(threadId,k->newStreamContext());context.setSink(sink);// 处理工作流执行}

2.4 多 Agent 管理架构图

3. 多模型支持机制

3.1 模型注册表 (AiModelRegistry)

AiModelRegistry是模型管理的核心组件,采用单例模式管理全局的 Chat 模型和 Embedding 模型。

关键代码

@ComponentpublicclassAiModelRegistry{privatevolatileChatClientcurrentChatClient;privatevolatileEmbeddingModelcurrentEmbeddingModel;publicChatClientgetChatClient(){if(currentChatClient==null){synchronized(this){if(currentChatClient==null){ModelConfigDTOconfig=modelConfigDataService.getActiveConfigByType(ModelType.CHAT);if(config!=null){ChatModelchatModel=modelFactory.createChatModel(config);currentChatClient=ChatClient.builder(chatModel).build();}}}}returncurrentChatClient;}publicvoidrefreshChat(){this.currentChatClient=null;log.info("Chat cache cleared.");}}

实现关键点

  • 使用双重检查锁定(Double-Checked Locking)实现线程安全的懒加载
  • 使用volatile关键字保证可见性
  • 支持热切换:通过refreshChat()refreshEmbedding()清除缓存,下次调用时重新加载

3.2 动态模型工厂 (DynamicModelFactory)

DynamicModelFactory负责根据配置动态创建模型实例,统一使用 OpenAI 兼容接口。

关键代码

@ServicepublicclassDynamicModelFactory{publicChatModelcreateChatModel(ModelConfigDTOconfig){// 构建 OpenAiApi,通过 baseUrl 实现多厂商兼容OpenAiApi.BuilderapiBuilder=OpenAiApi.builder().apiKey(config.getApiKey()).baseUrl(config.getBaseUrl());if(StringUtils.hasText(config.getCompletionsPath())){apiBuilder.completionsPath(config.getCompletionsPath());}OpenAiApiopenAiApi=apiBuilder.build();// 构建运行时选项OpenAiChatOptionsopenAiChatOptions=OpenAiChatOptions.builder().model(config.getModelName()).temperature(config.getTemperature()).maxTokens(config.getMaxTokens()).build();returnOpenAiChatModel.builder().openAiApi(openAiApi).defaultOptions(openAiChatOptions).build();}}

实现关键点

  • 统一使用OpenAiChatModelOpenAiEmbeddingModel,通过baseUrl实现多厂商兼容
  • 支持自定义 API 路径(completionsPathembeddingsPath
  • 支持配置模型参数(temperature、maxTokens 等)

3.3 模型热切换机制

系统通过ModelConfigOpsService实现模型配置的热切换,无需重启服务即可切换模型。

关键代码

@Transactional(rollbackFor=Exception.class)publicvoidupdateAndRefresh(ModelConfigDTOdto){// 1. 更新数据库ModelConfigentity=modelConfigDataService.updateConfigInDb(dto);// 2. 检查是否是激活状态if(Boolean.TRUE.equals(entity.getIsActive())){// 3. 刷新内存模型refreshMemoryModel(entity.getModelType());}}privatevoidrefreshMemoryModel(ModelTypetype){if(type==ModelType.CHAT){aiModelRegistry.refreshChat();}elseif(type==ModelType.EMBEDDING){aiModelRegistry.refreshEmbedding();}}

3.4 多模型注册表架构图

4. 能力池构建

4.1 代码执行服务接口

CodePoolExecutorService定义了代码执行的统一接口,支持 Docker 和本地两种执行模式。

关键代码

publicinterfaceCodePoolExecutorService{TaskResponserunTask(TaskRequestrequest);recordTaskRequest(Stringcode,Stringinput,Stringrequirement){}recordTaskResponse(booleanisSuccess,StringstdOut,StringstdErr,StringexceptionMsg){publicstaticTaskResponsesuccess(StringstdOut){returnnewTaskResponse(true,stdOut,null,null);}publicstaticTaskResponsefailure(StringstdOut,StringstdErr){returnnewTaskResponse(false,stdOut,stdErr,"StdErr: "+stdErr);}}}

4.2 Docker 执行器实现

DockerCodePoolExecutorService使用 Docker 容器执行 Python 代码,提供隔离的执行环境。

关键代码

publicclassDockerCodePoolExecutorServiceextendsAbstractCodePoolExecutorService{privatefinalDockerClientdockerClient;@OverrideprotectedStringcreateNewContainer()throwsException{StringcontainerName=this.generateContainerName();PathtempDir=Files.createTempDirectory(containerName);// 创建容器CreateContainerResponsecontainer=dockerClient.createContainerCmd(properties.getImageName()).withName(containerName).withWorkingDir("/app").withHostConfig(hostConfig).withCmd("sh","-c","python3 -u script.py < input_data.txt").exec();returncontainer.getId();}@OverrideprotectedTaskResponseexecTaskInContainer(TaskRequestrequest,StringcontainerId){// 写入代码和输入数据到容器// 启动容器执行// 读取执行结果}}

4.3 本地执行器实现

LocalCodePoolExecutorService使用本地 Python 环境执行代码,适合开发和测试环境。

关键代码

publicclassLocalCodePoolExecutorServiceextendsAbstractCodePoolExecutorService{privatefinalConcurrentHashMap<String,Path>containers;@OverrideprotectedStringcreateNewContainer()throwsException{Pathcontainer=Files.createTempDirectory(this.properties.getContainerNamePrefix());this.containers.put(containerId,container);returncontainerId;}@OverrideprotectedTaskResponseexecTaskInContainer(TaskRequestrequest,StringcontainerId){Pathcontainer=this.containers.get(containerId);PathscriptFile=container.resolve("script.py");Files.write(scriptFile,request.code().getBytes());ProcessBuilderpb=newProcessBuilder("python3",scriptFile.toAbsolutePath().toString());Processprocess=pb.start();// 读取执行结果}}

4.4 工厂模式创建执行器

CodePoolExecutorServiceFactory根据配置创建相应的执行器实例。

关键代码

publicclassCodePoolExecutorServiceFactory{publicstaticCodePoolExecutorServicecreateExecutor(CodeExecutorPropertiesproperties){CodePoolExecutorEnumexecutorType=properties.getExecutorType();switch(executorType){caseDOCKER:returnnewDockerCodePoolExecutorService(properties);caseLOCAL:returnnewLocalCodePoolExecutorService(properties);caseAI_SIMULATION:returnnewAISimulationCodePoolExecutorService(properties);default:thrownewIllegalArgumentException("Unsupported executor type: "+executorType);}}}

4.5 能力池架构图

5. 多数据库支持

5.1 数据源配置管理

系统通过DatabaseUtil动态获取每个 Agent 的数据源配置,支持多数据源。

关键代码

publicDbConfigBOgetAgentDbConfig(IntegeragentId){AgentDatasourceagentDatasource=agentDatasourceService.getCurrentAgentDatasource(agentId);IntegerdatasourceId=agentDatasource.getDatasourceId();DataSourcedataSource=datasourceService.findById(datasourceId);returnDbConfigBO.builder().url(dataSource.getConnectionUrl()).username(dataSource.getUsername()).password(dataSource.getPassword()).driverClassName(dataSource.getDriverClassName()).build();}

5.2 Druid 连接池

系统使用 Druid 作为数据库连接池,提供连接管理、监控等功能。

关键配置

@BeanpublicDataSourcedataSource(){DruidDataSourcedataSource=newDruidDataSource();dataSource.setUrl(jdbcUrl);dataSource.setUsername(username);dataSource.setPassword(password);dataSource.setDriverClassName(driverClassName);// 连接池配置dataSource.setInitialSize(initialSize);dataSource.setMaxActive(maxActive);dataSource.setMinIdle(minIdle);returndataSource;}

5.3 多数据库支持架构图

6. 实现关键点说明

6.1 线程安全的上下文管理

系统使用ConcurrentHashMap实现线程安全的上下文管理:

  • StreamContext 管理:每个threadId维护独立的StreamContext,支持并发访问
  • 多轮对话历史:使用ConcurrentHashMap<String, Deque<ConversationTurn>>存储对话历史,Deque操作使用synchronized保证线程安全
  • 待处理轮次:使用ConcurrentHashMap<String, PendingTurn>存储待处理的轮次

6.2 模型热切换机制

模型热切换通过清除缓存实现:

  1. 更新配置:通过ModelConfigOpsService.updateAndRefresh()更新数据库配置
  2. 清除缓存:调用AiModelRegistry.refreshChat()refreshEmbedding()清除缓存
  3. 懒加载:下次调用getChatClient()getEmbeddingModel()时,重新从数据库加载配置并创建模型实例

6.3 能力池的容器管理

能力池通过容器池管理执行环境:

  • Docker 模式:每个任务创建独立的 Docker 容器,执行完成后清理
  • 本地模式:每个任务使用独立的临时目录,执行完成后清理
  • 容器复用:通过AbstractCodePoolExecutorService实现容器池管理,支持容器复用和清理

6.4 多数据源的动态切换

系统支持每个 Agent 配置独立的数据源:

  • 配置存储:数据源配置存储在数据库中,支持动态添加和修改
  • 动态获取:执行 SQL 时,通过agentId动态获取对应的数据源配置
  • 连接管理:通过 Druid 连接池管理连接,支持连接复用和监控

6.5 统一接口设计

系统通过接口和工厂模式实现灵活的扩展:

  • 模型接口:统一使用 OpenAI 兼容接口,通过baseUrl支持多厂商
  • 执行器接口CodePoolExecutorService接口支持多种实现(Docker、本地、AI 模拟)
  • 工厂模式:通过工厂类根据配置创建相应的实现实例

7. 总结说明

DataAgent 的接口层设计充分考虑了多 Agent、多模型、多数据源等复杂场景。通过线程安全的上下文管理、模型热切换机制、能力池构建和多数据源支持,系统能够灵活地支持不同的业务需求。

多 Agent 支持通过agentIdthreadId实现隔离,每个 Agent 拥有独立的数据源、知识库和对话上下文。多模型支持通过模型注册表和动态工厂实现,支持运行时热切换。能力池通过接口和工厂模式实现,支持 Docker、本地等多种执行模式。多数据源支持通过动态配置和连接池实现,支持 MySQL、PostgreSQL、H2、达梦等多种数据库。

整体接口层设计体现了高内聚、低耦合的原则,各个组件职责清晰,便于维护和扩展。这种设计为后续的功能扩展和性能优化提供了良好的基础。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/18 19:24:35

亲测SenseVoiceSmall,AI情感识别真实体验分享

亲测SenseVoiceSmall&#xff0c;AI情感识别真实体验分享 最近在做语音交互项目时&#xff0c;一直在寻找一款既能准确转写语音&#xff0c;又能理解说话人情绪的模型。市面上大多数语音识别工具还停留在“听清你说什么”的阶段&#xff0c;而我需要的是能“读懂你的情绪”的能…

作者头像 李华
网站建设 2026/2/18 6:11:18

快速启动YOLOE容器,GPU环境配置一步到位

快速启动YOLOE容器&#xff0c;GPU环境配置一步到位 你是否也经历过这样的场景&#xff1a;好不容易找到一个前沿的开放词汇检测模型&#xff0c;结果卡在环境配置上——CUDA版本不匹配、PyTorch编译失败、CLIP依赖冲突、Gradio端口起不来……折腾半天&#xff0c;连第一张图片…

作者头像 李华
网站建设 2026/2/27 2:34:40

Docker build缓存失效真相:87%的“强制更新”其实根本没生效!用docker image history -v反向验证你的每一层是否真被重建(附自动化校验工具)

第一章&#xff1a;Docker build缓存失效的真相与认知误区Docker 构建缓存并非“智能记忆”&#xff0c;而是严格基于构建上下文、指令顺序与内容哈希的确定性机制。许多开发者误以为只要 Dockerfile 未修改&#xff0c;缓存就必然复用&#xff1b;实则任意上游层&#xff08;如…

作者头像 李华
网站建设 2026/2/27 12:46:07

三分钟掌握m3u8视频下载神器:MediaGo深度体验指南

三分钟掌握m3u8视频下载神器&#xff1a;MediaGo深度体验指南 【免费下载链接】m3u8-downloader m3u8 视频在线提取工具 流媒体下载 m3u8下载 桌面客户端 windows mac 项目地址: https://gitcode.com/gh_mirrors/m3u8/m3u8-downloader 还在为网页视频无法保存而烦恼吗&a…

作者头像 李华
网站建设 2026/2/27 23:29:18

YOLOE模型推理提速秘诀,官方镜像真香

YOLOE模型推理提速秘诀&#xff0c;官方镜像真香 在智能安防、工业质检和自动驾驶等实时性要求极高的场景中&#xff0c;目标检测与分割的“快”与“准”始终是一对难以调和的矛盾。传统方案往往依赖高性能GPU集群才能勉强满足帧率需求&#xff0c;部署成本居高不下。而随着YO…

作者头像 李华
网站建设 2026/2/23 3:53:56

DeepSeek-Coder-V2:让编程效率翻倍的智能代码助手

DeepSeek-Coder-V2&#xff1a;让编程效率翻倍的智能代码助手 【免费下载链接】DeepSeek-Coder-V2 项目地址: https://gitcode.com/GitHub_Trending/de/DeepSeek-Coder-V2 你是不是经常在夜深人静的时候&#xff0c;对着屏幕上的bug百思不得其解&#xff1f;或者在学习…

作者头像 李华