DataAgent 接口层分析
请关注公众号【碳硅化合物AI】
概要总结
本文档深入分析了 DataAgent 的接口层设计,重点阐述了系统如何支持多 Agent、多模型、能力池构建和多数据库等复杂场景。
接口层主要通过 RESTful API 提供服务,核心入口是GraphController,它提供了流式查询接口。系统通过agentId和threadId实现多 Agent 的隔离,每个 Agent 拥有独立的数据源配置、知识库和对话上下文。MultiTurnContextManager负责管理每个线程的多轮对话上下文,使用ConcurrentHashMap实现线程安全的上下文存储。
多模型支持通过AiModelRegistry模型注册表实现,采用单例模式管理全局的 Chat 模型和 Embedding 模型,支持运行时热切换。DynamicModelFactory负责根据配置动态创建模型实例,统一使用 OpenAI 兼容接口,通过baseUrl实现多厂商兼容(Qwen、Deepseek、OpenAI 等)。
能力池通过CodePoolExecutorService接口定义统一的代码执行规范,支持 Docker 和本地两种执行模式。DockerCodePoolExecutorService使用 Docker 容器提供隔离的执行环境,LocalCodePoolExecutorService使用本地 Python 环境执行代码。通过工厂模式根据配置创建相应的执行器实例。
多数据库支持通过DatabaseUtil动态获取每个 Agent 的数据源配置,使用 Druid 连接池管理连接,支持 MySQL、PostgreSQL、H2、达梦等多种数据库。整体接口层设计体现了高内聚、低耦合的原则,各个组件职责清晰,便于维护和扩展。
目录
- 1. 入口类以及说明
- 1.1 接口层入口
- 1.2 关键类关系图
- 2. 多 Agent 支持机制
- 2.1 Agent 隔离机制
- 2.2 多轮对话上下文管理
- 2.3 StreamContext 管理
- 2.4 多 Agent 管理架构图
- 3. 多模型支持机制
- 3.1 模型注册表 (AiModelRegistry)
- 3.2 动态模型工厂 (DynamicModelFactory)
- 3.3 模型热切换机制
- 3.4 多模型注册表架构图
- 4. 能力池构建
- 4.1 代码执行服务接口
- 4.2 Docker 执行器实现
- 4.3 本地执行器实现
- 4.4 工厂模式创建执行器
- 4.5 能力池架构图
- 5. 多数据库支持
- 5.1 数据源配置管理
- 5.2 Druid 连接池
- 5.3 多数据库支持架构图
- 6. 实现关键点说明
- 6.1 线程安全的上下文管理
- 6.2 模型热切换机制
- 6.3 能力池的容器管理
- 6.4 多数据源的动态切换
- 6.5 统一接口设计
- 7. 总结说明
1. 入口类以及说明
1.1 接口层入口
DataAgent 的接口层主要通过 RESTful API 提供服务,核心入口是GraphController,它提供了流式查询接口。同时,系统还提供了多个管理接口,用于管理智能体、数据源、模型配置等。
关键代码:
@GetMapping(value="/stream/search",produces=MediaType.TEXT_EVENT_STREAM_VALUE)publicFlux<ServerSentEvent<GraphNodeResponse>>streamSearch(@RequestParam("agentId")StringagentId,@RequestParam(value="threadId",required=false)StringthreadId,@RequestParam("query")Stringquery,// ... 其他参数){Sinks.Many<ServerSentEvent<GraphNodeResponse>>sink=Sinks.many().unicast().onBackpressureBuffer();GraphRequestrequest=GraphRequest.builder().agentId(agentId).threadId(threadId).query(query).build();graphService.graphStreamProcess(sink,request);returnsink.asFlux();}1.2 关键类关系图
2. 多 Agent 支持机制
2.1 Agent 隔离机制
系统通过agentId实现多 Agent 的隔离。每个 Agent 拥有独立的数据源配置、知识库和上下文管理。
关键代码:
privatevoidhandleNewProcess(GraphRequestgraphRequest){StringagentId=graphRequest.getAgentId();StringthreadId=graphRequest.getThreadId();// 每个请求都携带 agentId,用于获取该 Agent 的配置Flux<NodeOutput>nodeOutputFlux=compiledGraph.stream(Map.of(AGENT_ID,agentId,INPUT_KEY,query,...),RunnableConfig.builder().threadId(threadId).build());}2.2 多轮对话上下文管理
MultiTurnContextManager负责管理每个线程(threadId)的多轮对话上下文,实现线程级别的上下文隔离。
关键代码:
@ComponentpublicclassMultiTurnContextManager{privatefinalMap<String,Deque<ConversationTurn>>history=newConcurrentHashMap<>();privatefinalMap<String,PendingTurn>pendingTurns=newConcurrentHashMap<>();publicStringbuildContext(StringthreadId){Deque<ConversationTurn>deque=history.get(threadId);if(deque==null||deque.isEmpty()){return"(无)";}returndeque.stream().map(turn->String.format("Q: %s\nA: %s",turn.userQuestion(),turn.plan())).collect(Collectors.joining("\n\n"));}publicvoidbeginTurn(StringthreadId,StringuserQuestion){pendingTurns.put(threadId,newPendingTurn(userQuestion.trim()));}}实现关键点:
- 使用
ConcurrentHashMap实现线程安全的上下文存储 - 每个
threadId维护独立的对话历史 - 支持上下文长度限制,避免提示词过长
- 支持重启最后一轮对话(用于人工反馈场景)
2.3 StreamContext 管理
GraphServiceImpl使用ConcurrentHashMap<String, StreamContext>管理每个线程的流式处理上下文,实现多线程并发处理。
关键代码:
privatefinalConcurrentHashMap<String,StreamContext>streamContextMap=newConcurrentHashMap<>();publicvoidgraphStreamProcess(Sinks.Many<ServerSentEvent<GraphNodeResponse>>sink,GraphRequestgraphRequest){StringthreadId=graphRequest.getThreadId();StreamContextcontext=streamContextMap.computeIfAbsent(threadId,k->newStreamContext());context.setSink(sink);// 处理工作流执行}2.4 多 Agent 管理架构图
3. 多模型支持机制
3.1 模型注册表 (AiModelRegistry)
AiModelRegistry是模型管理的核心组件,采用单例模式管理全局的 Chat 模型和 Embedding 模型。
关键代码:
@ComponentpublicclassAiModelRegistry{privatevolatileChatClientcurrentChatClient;privatevolatileEmbeddingModelcurrentEmbeddingModel;publicChatClientgetChatClient(){if(currentChatClient==null){synchronized(this){if(currentChatClient==null){ModelConfigDTOconfig=modelConfigDataService.getActiveConfigByType(ModelType.CHAT);if(config!=null){ChatModelchatModel=modelFactory.createChatModel(config);currentChatClient=ChatClient.builder(chatModel).build();}}}}returncurrentChatClient;}publicvoidrefreshChat(){this.currentChatClient=null;log.info("Chat cache cleared.");}}实现关键点:
- 使用双重检查锁定(Double-Checked Locking)实现线程安全的懒加载
- 使用
volatile关键字保证可见性 - 支持热切换:通过
refreshChat()和refreshEmbedding()清除缓存,下次调用时重新加载
3.2 动态模型工厂 (DynamicModelFactory)
DynamicModelFactory负责根据配置动态创建模型实例,统一使用 OpenAI 兼容接口。
关键代码:
@ServicepublicclassDynamicModelFactory{publicChatModelcreateChatModel(ModelConfigDTOconfig){// 构建 OpenAiApi,通过 baseUrl 实现多厂商兼容OpenAiApi.BuilderapiBuilder=OpenAiApi.builder().apiKey(config.getApiKey()).baseUrl(config.getBaseUrl());if(StringUtils.hasText(config.getCompletionsPath())){apiBuilder.completionsPath(config.getCompletionsPath());}OpenAiApiopenAiApi=apiBuilder.build();// 构建运行时选项OpenAiChatOptionsopenAiChatOptions=OpenAiChatOptions.builder().model(config.getModelName()).temperature(config.getTemperature()).maxTokens(config.getMaxTokens()).build();returnOpenAiChatModel.builder().openAiApi(openAiApi).defaultOptions(openAiChatOptions).build();}}实现关键点:
- 统一使用
OpenAiChatModel和OpenAiEmbeddingModel,通过baseUrl实现多厂商兼容 - 支持自定义 API 路径(
completionsPath、embeddingsPath) - 支持配置模型参数(temperature、maxTokens 等)
3.3 模型热切换机制
系统通过ModelConfigOpsService实现模型配置的热切换,无需重启服务即可切换模型。
关键代码:
@Transactional(rollbackFor=Exception.class)publicvoidupdateAndRefresh(ModelConfigDTOdto){// 1. 更新数据库ModelConfigentity=modelConfigDataService.updateConfigInDb(dto);// 2. 检查是否是激活状态if(Boolean.TRUE.equals(entity.getIsActive())){// 3. 刷新内存模型refreshMemoryModel(entity.getModelType());}}privatevoidrefreshMemoryModel(ModelTypetype){if(type==ModelType.CHAT){aiModelRegistry.refreshChat();}elseif(type==ModelType.EMBEDDING){aiModelRegistry.refreshEmbedding();}}3.4 多模型注册表架构图
4. 能力池构建
4.1 代码执行服务接口
CodePoolExecutorService定义了代码执行的统一接口,支持 Docker 和本地两种执行模式。
关键代码:
publicinterfaceCodePoolExecutorService{TaskResponserunTask(TaskRequestrequest);recordTaskRequest(Stringcode,Stringinput,Stringrequirement){}recordTaskResponse(booleanisSuccess,StringstdOut,StringstdErr,StringexceptionMsg){publicstaticTaskResponsesuccess(StringstdOut){returnnewTaskResponse(true,stdOut,null,null);}publicstaticTaskResponsefailure(StringstdOut,StringstdErr){returnnewTaskResponse(false,stdOut,stdErr,"StdErr: "+stdErr);}}}4.2 Docker 执行器实现
DockerCodePoolExecutorService使用 Docker 容器执行 Python 代码,提供隔离的执行环境。
关键代码:
publicclassDockerCodePoolExecutorServiceextendsAbstractCodePoolExecutorService{privatefinalDockerClientdockerClient;@OverrideprotectedStringcreateNewContainer()throwsException{StringcontainerName=this.generateContainerName();PathtempDir=Files.createTempDirectory(containerName);// 创建容器CreateContainerResponsecontainer=dockerClient.createContainerCmd(properties.getImageName()).withName(containerName).withWorkingDir("/app").withHostConfig(hostConfig).withCmd("sh","-c","python3 -u script.py < input_data.txt").exec();returncontainer.getId();}@OverrideprotectedTaskResponseexecTaskInContainer(TaskRequestrequest,StringcontainerId){// 写入代码和输入数据到容器// 启动容器执行// 读取执行结果}}4.3 本地执行器实现
LocalCodePoolExecutorService使用本地 Python 环境执行代码,适合开发和测试环境。
关键代码:
publicclassLocalCodePoolExecutorServiceextendsAbstractCodePoolExecutorService{privatefinalConcurrentHashMap<String,Path>containers;@OverrideprotectedStringcreateNewContainer()throwsException{Pathcontainer=Files.createTempDirectory(this.properties.getContainerNamePrefix());this.containers.put(containerId,container);returncontainerId;}@OverrideprotectedTaskResponseexecTaskInContainer(TaskRequestrequest,StringcontainerId){Pathcontainer=this.containers.get(containerId);PathscriptFile=container.resolve("script.py");Files.write(scriptFile,request.code().getBytes());ProcessBuilderpb=newProcessBuilder("python3",scriptFile.toAbsolutePath().toString());Processprocess=pb.start();// 读取执行结果}}4.4 工厂模式创建执行器
CodePoolExecutorServiceFactory根据配置创建相应的执行器实例。
关键代码:
publicclassCodePoolExecutorServiceFactory{publicstaticCodePoolExecutorServicecreateExecutor(CodeExecutorPropertiesproperties){CodePoolExecutorEnumexecutorType=properties.getExecutorType();switch(executorType){caseDOCKER:returnnewDockerCodePoolExecutorService(properties);caseLOCAL:returnnewLocalCodePoolExecutorService(properties);caseAI_SIMULATION:returnnewAISimulationCodePoolExecutorService(properties);default:thrownewIllegalArgumentException("Unsupported executor type: "+executorType);}}}4.5 能力池架构图
5. 多数据库支持
5.1 数据源配置管理
系统通过DatabaseUtil动态获取每个 Agent 的数据源配置,支持多数据源。
关键代码:
publicDbConfigBOgetAgentDbConfig(IntegeragentId){AgentDatasourceagentDatasource=agentDatasourceService.getCurrentAgentDatasource(agentId);IntegerdatasourceId=agentDatasource.getDatasourceId();DataSourcedataSource=datasourceService.findById(datasourceId);returnDbConfigBO.builder().url(dataSource.getConnectionUrl()).username(dataSource.getUsername()).password(dataSource.getPassword()).driverClassName(dataSource.getDriverClassName()).build();}5.2 Druid 连接池
系统使用 Druid 作为数据库连接池,提供连接管理、监控等功能。
关键配置:
@BeanpublicDataSourcedataSource(){DruidDataSourcedataSource=newDruidDataSource();dataSource.setUrl(jdbcUrl);dataSource.setUsername(username);dataSource.setPassword(password);dataSource.setDriverClassName(driverClassName);// 连接池配置dataSource.setInitialSize(initialSize);dataSource.setMaxActive(maxActive);dataSource.setMinIdle(minIdle);returndataSource;}5.3 多数据库支持架构图
6. 实现关键点说明
6.1 线程安全的上下文管理
系统使用ConcurrentHashMap实现线程安全的上下文管理:
- StreamContext 管理:每个
threadId维护独立的StreamContext,支持并发访问 - 多轮对话历史:使用
ConcurrentHashMap<String, Deque<ConversationTurn>>存储对话历史,Deque操作使用synchronized保证线程安全 - 待处理轮次:使用
ConcurrentHashMap<String, PendingTurn>存储待处理的轮次
6.2 模型热切换机制
模型热切换通过清除缓存实现:
- 更新配置:通过
ModelConfigOpsService.updateAndRefresh()更新数据库配置 - 清除缓存:调用
AiModelRegistry.refreshChat()或refreshEmbedding()清除缓存 - 懒加载:下次调用
getChatClient()或getEmbeddingModel()时,重新从数据库加载配置并创建模型实例
6.3 能力池的容器管理
能力池通过容器池管理执行环境:
- Docker 模式:每个任务创建独立的 Docker 容器,执行完成后清理
- 本地模式:每个任务使用独立的临时目录,执行完成后清理
- 容器复用:通过
AbstractCodePoolExecutorService实现容器池管理,支持容器复用和清理
6.4 多数据源的动态切换
系统支持每个 Agent 配置独立的数据源:
- 配置存储:数据源配置存储在数据库中,支持动态添加和修改
- 动态获取:执行 SQL 时,通过
agentId动态获取对应的数据源配置 - 连接管理:通过 Druid 连接池管理连接,支持连接复用和监控
6.5 统一接口设计
系统通过接口和工厂模式实现灵活的扩展:
- 模型接口:统一使用 OpenAI 兼容接口,通过
baseUrl支持多厂商 - 执行器接口:
CodePoolExecutorService接口支持多种实现(Docker、本地、AI 模拟) - 工厂模式:通过工厂类根据配置创建相应的实现实例
7. 总结说明
DataAgent 的接口层设计充分考虑了多 Agent、多模型、多数据源等复杂场景。通过线程安全的上下文管理、模型热切换机制、能力池构建和多数据源支持,系统能够灵活地支持不同的业务需求。
多 Agent 支持通过agentId和threadId实现隔离,每个 Agent 拥有独立的数据源、知识库和对话上下文。多模型支持通过模型注册表和动态工厂实现,支持运行时热切换。能力池通过接口和工厂模式实现,支持 Docker、本地等多种执行模式。多数据源支持通过动态配置和连接池实现,支持 MySQL、PostgreSQL、H2、达梦等多种数据库。
整体接口层设计体现了高内聚、低耦合的原则,各个组件职责清晰,便于维护和扩展。这种设计为后续的功能扩展和性能优化提供了良好的基础。