news 2026/7/2 5:19:17

AI 流式响应实战:从同步等待到实时推送

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
AI 流式响应实战:从同步等待到实时推送

AI 流式响应实战:从同步等待到实时推送

在 IM 系统中集成 AI 时,流式响应能显著提升性能。本文介绍 AQChat 如何实现 AI 流式响应,从同步等待到实时推送。

一、为什么需要流式响应?

同步等待的问题

传统同步方式的问题

// ❌ 同步方式:用户需要等待AI完整响应StringaiResponse=aiService.getAnswer(userMessage);// 如果AI响应需要10秒,用户就要等待10秒sendMessage(aiResponse);

问题:

  1. 等待时间长:AI 生成可能需要 5-10 秒,用户长时间等待
  2. 体验差:无法看到生成过程,感觉卡顿
  3. 资源占用:长时间占用连线和线程

流式响应的优势

  1. 实时反馈:逐字显示,用户可立即看到内容
  2. 体验更好:类似 ChatGPT 的打字机效果
  3. 资源利用:边生成边推送,不阻塞

对比

方式首字延迟完整响应时间用户体验
同步等待10秒10秒
流式响应1-2秒10秒

回调函数模式的设计

统一接口设计

定义统一的 AI 服务接口

publicinterfaceIAiService{/** * 流式调用AI服务 * @param userMsg 用户消息 * @param consumer 回调函数,处理每个数据块 */voidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer);/** * 多轮对话 */defaultvoidchat(Stringmessage,List<MessageRecord>messages,Consumer<AIResult>consumer){}}

关键点

  • 使用Consumer<AIResult>作为回调
  • 每个数据块通过回调处理
  • 支持多轮对话

AIResult 设计

publicinterfaceAIResult{StringgetContent();// 当前数据块的内容intgetStatus();// 状态:WAIT(0-进行中)、END(1-结束)、FAIL(2-失败)}

状态枚举

publicenumAIMessageStatusEnum{WAIT(0,"wait"),// 流式响应进行中END(1,"end"),// 流式响应结束FAIL(2,"fail");// 流式响应失败}

三、WebSocket 实时推送的实现

整体流程

用户发送消息 ↓RocketMQ异步处理 ↓ AI服务流式调用 ↓ 回调函数处理每个数据块 ↓ 封装为 STREAM_MSG_NOTIFY ↓WebSocket实时推送

代码实现

  1. RocketMQ 消费者接收消息
@ComponentpublicclassAIHelperReceiverimplementsInitializingBean{@ResourceprivateIAiServiceaiService;@ResourceprivateGlobalChannelHolderglobalChannelHolder;publicvoidinitConsumer(){defaultMQPushConsumer.setMessageListener((MessageListenerConcurrently)(messageExtList,context)->{for(MessageExtmessageExt:messageExtList){MessageDtomessageDto=JSONObject.parseObject(msgStr,MessageDto.class);// 提交到独立线程池,不阻塞MQ消费线程threadPoolUtil.submitTask(()->{StringBuilderfullContent=newStringBuilder();try{// 流式调用AI服务aiService.streamCallWithMessage(messageDto.getMessageContent(),aiResult->{// 回调函数:处理每个数据块AIMessageDtoaiMessageDto=newAIMessageDto();aiMessageDto.setMessageId(messageDto.getMessageId());aiMessageDto.setRoomId(messageDto.getRoomId());aiMessageDto.setContent(aiResult.getContent());aiMessageDto.setStatus(aiResult.getStatus());// 实时推送globalChannelHolder.sendBroadcastAIMessage(aiMessageDto,AQBusinessConstant.AI_HELPER_ID);// 累积完整内容fullContent.append(aiResult.getContent());});}catch(Exceptione){// 错误处理LOGGER.error("AI助手处理消息失败",e);AIMessageDtofailMessage=newAIMessageDto();failMessage.setStatus(AIMessageStatusEnum.FAIL.getCode());globalChannelHolder.sendBroadcastAIMessage(failMessage,AQBusinessConstant.AI_HELPER_ID);}finally{// 流式响应结束后,持久化完整消息MessageDtostoreMessage=buildStoreMessage(messageDto,fullContent);messageService.saveMessage(storeMessage);}});}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});}}
  1. 封装流式消息并推送
@ComponentpublicclassGlobalChannelHolder{publicvoidsendBroadcastAIMessage(AIMessageDtoaiMessageDto,StringaiId){// 1. 获取AI助手信息UserGlobalInfoDtouserInfo=userHolder.getUserInfo(aiId);// 2. 构建流式消息AQChatMsgProtocol.StreamMsgNotifystreamMsgNotify=AQChatMsgProtocol.StreamMsgNotify.newBuilder().setUser(userBuilder).setMsgId(aiMessageDto.getMessageId()).setRoomId(aiMessageDto.getRoomId()).setContent(aiMessageDto.getContent()==null?"":aiMessageDto.getContent()).setStreamType(aiMessageDto.getStatus())// 0-进行中,1-结束,2-失败.build();// 3. 广播到房间内所有用户messageBroadcaster.broadcast(aiMessageDto.getRoomId(),streamMsgNotify);}}
  1. 消息广播
@ComponentpublicclassMessageBroadcaster{privatefinalMap<String,ChannelGroup>channelGroupMap=newConcurrentHashMap<>();public<TextendsGeneratedMessageV3>voidbroadcast(StringroomId,Tmsg){ChannelGroupchannelGroup=channelGroupMap.get(roomId);if(channelGroup!=null){// 批量发送,高效channelGroup.writeAndFlush(msg);}}}

四、流式消息的封装(STREM_MSG_NOTIFY)

Protobuf 消息定义

// 流式消息通知messageStreamMsgNotify{string roomId=1;// 房间IDstring msgId=2;// 消息IDUseruser=3;// AI助手信息int32 streamType=4;// 流类型:0-进行中,1-结束,2-失败string content=5;// 当前数据块内容}

消息类型

enumMsgCommand{// ...STREAM_MSG_NOTIFY=32;// 流式消息通知// ...}

消息状态流转

用户发送消息 ↓ STREAM_MSG_NOTIFY(streamType=0,content="你")← 第一个数据块 ↓ STREAM_MSG_NOTIFY(streamType=0,content="好")← 第二个数据块 ↓ STREAM_MSG_NOTIFY(streamType=0,content=",")← 第三个数据块 ↓...↓ STREAM_MSG_NOTIFY(streamType=1,content="")← 结束标志

前端处理示例(伪代码)

websocket.onmessage=(event)=>{constmessage=JSON.parse(event.data);if(message.command==='STREAM_MSG_NOTIFY'){if(message.streamType===0){// 进行中:追加内容appendContent(message.content);}elseif(message.streamType===1){// 结束:显示完整消息showCompleteMessage();}elseif(message.streamType===2){// 失败:显示错误提示showErrorMessage();}}};

五、多 AI 平台集成的统一接口设计

问题:不同 AI 平台的 API 不同

  • 阿里百炼:使用Flowable<GenerationResult>
  • Gitee AI:使用MessageHandler<String>
  • 其他平台:可能有不同的流式接口

解决方案:统一接口 + 适配器模式

  1. 统一接口定义
publicinterfaceIAiService{/** * 流式调用,统一使用 Consumer<AIResult> 回调 */voidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer);}
  1. 阿里百炼实现
@Service@PrimarypublicclassQWAiServiceimplementsIAiService{@OverridepublicvoidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer){Generationgen=newGeneration();Messagemessage=Message.builder().role(Role.USER.getValue()).content(userMsg).build();// 调用阿里百炼流式APIFlowable<GenerationResult>result=gen.streamCall(generationParam);// 转换为统一格式result.blockingForEach(r->{Stringcontent=r.getOutput().getChoices().get(0).getMessage().getContent();StringfinishReason=r.getOutput().getChoices().get(0).getFinishReason();QWResultqwResult=newQWResult();qwResult.setContent(content);// 判断是否结束qwResult.setStatus("stop".equals(finishReason)?AIMessageStatusEnum.END.getCode():AIMessageStatusEnum.WAIT.getCode());// 调用统一回调consumer.accept(qwResult);});}}
  1. Gitee AI 实现
@ServicepublicclassGiteeAIServiceimplementsIAiService{@ResourceprivateGiteeAIClientgiteeAIClient;@OverridepublicvoidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer){// 调用Gitee AI流式APIgiteeAIClient.streamChat(message,messageList,data->{JSONObjectparse=JSONObject.parseObject(data);JSONArraychoices=parse.getJSONArray("choices");JSONObjectchoicesIn=choices.getJSONObject(0);StringfinishReason=choicesIn.getString("finish_reason");if(finishReason!=null&&finishReason.equals("stop")){// 结束GiteeResultgiteeResult=newGiteeResult();giteeResult.setStatus(AIMessageStatusEnum.END.getCode());consumer.accept(giteeResult);return;}// 进行中JSONObjectdelta=choicesIn.getJSONObject("delta");Stringcontent=delta.getString("content");if(content!=null&&!content.isEmpty()){GiteeResultgiteeResult=newGiteeResult();giteeResult.setContent(content);giteeResult.setStatus(AIMessageStatusEnum.WAIT.getCode());consumer.accept(giteeResult);}});}}

统一接口的优势

  1. 业务代码无需关心具体平台
  2. 易于扩展新平台
  3. 便于切换平台(通过@Prime注解)

使用示例

// 业务代码只需要调用统一接口@ResourceprivateIAiServiceaiService;// Spring会自动注入@Primary的实现aiService.streamCallWithMessage(userMsg,aiResult->{// 处理流式响应,不关心是哪个AI平台sendBroadcastAIMessage(aiResult);});

六、性能优化

  1. 独立线程池
// AI处理在独立线程池中执行,不阻塞MQ消费线程threadPoolUtil.submitTask(()->{aiService.streamCallWithMessage(userMsg,consumer);});

优势

  • 不阻塞 RocketMQ 消费线程
  • AI 处理失败不影响其他消息
  • 可控制并发数
  1. 异步处理
// 消息发送到RocketMQ,异步处理mqSendingAgent.aiHelper(messageDto);// 立即返回,不等待AI响应

优势

  • 用户发送消息后立即返回
  • AI 响应通过 WebSocket 实时推送
  • 提升响应速度

七、总结

关键点

  1. 流式响应:使用回调函数模式,实时推送每个数据块
  2. 统一接口:IAiservice统一不同 AI 平台的接口
  3. WebSocket 推送:通过STREAM_MSG_NOTIFY实时推送
  4. 异步处理:使用 RocketMQ + 独立线程池,不阻塞主流程

优化效果

指标同步流式响应提升
首字延迟10秒1-2秒5-10倍
用户体验显著提升
资源占用降低

经验总结

  1. 流式响应能显著提升性能
  2. 统一接口便于多平台集成
  3. 异步处理避免阻塞
  4. 回调函数模式适合流式场景

通过以上实现,AQChat 实现了类似 ChatGPT 的流式响应效果,提升了用户体验。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/30 8:24:21

GPU资源不足?如何在低成本虚拟机上成功部署Open-AutoGLM,抢占AI先机

第一章&#xff1a;GPU资源不足&#xff1f;重新定义低成本AI部署策略在AI模型日益庞大的今天&#xff0c;高性能GPU已成为训练和推理的标配。然而&#xff0c;对于中小团队或个人开发者而言&#xff0c;获取充足的GPU资源往往面临成本与可及性的双重挑战。面对这一现实&#x…

作者头像 李华
网站建设 2026/6/29 15:44:53

Open-AutoGLM环境搭建全攻略:手把手教你10分钟完成Python依赖部署

第一章&#xff1a;Open-AutoGLM环境搭建前的准备工作 在开始部署 Open-AutoGLM 之前&#xff0c;必须确保开发环境满足其运行依赖和系统要求。合理的前期准备不仅能提升安装成功率&#xff0c;还能避免后续调试过程中出现兼容性问题。 系统与硬件要求 操作系统&#xff1a;推…

作者头像 李华
网站建设 2026/7/1 15:58:01

Open-AutoGLM低配适配实战(性能提升800%的秘密武器)

第一章&#xff1a;Open-AutoGLM低配适配实战概述在资源受限的设备上部署大型语言模型&#xff08;LLM&#xff09;是当前AI工程化的重要挑战。Open-AutoGLM作为一款支持自动化量化与轻量化推理的开源框架&#xff0c;专为低配置环境设计&#xff0c;能够在CPU或低显存GPU上实现…

作者头像 李华
网站建设 2026/7/1 6:03:10

从零到一:麒麟操作系统学习之旅,国产系统的实用探索​

在数字化浪潮席卷全球的今天&#xff0c;操作系统作为信息技术的核心基石&#xff0c;其自主可控性愈发关键。从国家信息安全战略到企业数字化转型&#xff0c;再到个人日常使用&#xff0c;操作系统的选择直接关系到数据安全、业务稳定与使用体验。麒麟操作系统&#xff08;Ky…

作者头像 李华
网站建设 2026/7/1 23:55:01

【Open-AutoGLM离线部署终极指南】:手把手教你零联网配置AI大模型环境

第一章&#xff1a;Open-AutoGLM离线部署概述Open-AutoGLM 是一个基于 GLM 架构的开源自动化语言模型系统&#xff0c;支持本地化部署与私有化调用&#xff0c;适用于对数据隐私和响应延迟有严格要求的企业级应用场景。通过离线部署&#xff0c;用户可在无公网连接的环境中完成…

作者头像 李华
网站建设 2026/7/1 22:34:58

Open-AutoGLM与安卓14兼容性问题终极解决方案:资深工程师亲授调优秘方

第一章&#xff1a;Open-AutoGLM 安卓 14 优化设置为充分发挥 Open-AutoGLM 在安卓 14 系统下的性能潜力&#xff0c;需对系统底层参数与应用运行环境进行针对性调优。合理的配置不仅能提升模型推理速度&#xff0c;还能有效降低资源占用&#xff0c;增强设备稳定性。启用高性能…

作者头像 李华