news 2026/3/5 22:35:02

AI聊天助手的SSE流式输出实现过程

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
AI聊天助手的SSE流式输出实现过程

SSE流式输出的实现过程

后端处理

在创建流式会话时,我们要对这个请求设置好SSE所需要的请求头,然后再创建会话,返回会话ID,紧接着就把会话ID传给前端,让前端绑定这个会话,开始准备流式输出

controller层处理新建会话请求

如果是对已存在的会话就不需要新建,从请求体里获取会话ID进行绑定

funcCreateStreamSessionAndSendMessage(c*gin.Context){req:=new(CreateSessionAndSendMessageRequest)userName:=c.GetString("userName")// From JWT middlewareiferr:=c.ShouldBindJSON(req);err!=nil{c.JSON(http.StatusOK,gin.H{"error":"Invalid parameters"})return}// 设置SSE头c.Header("Content-Type","text/event-stream")// SSE协议的标识,告诉客户端这个是流式输出c.Header("Cache-Control","no-cache, no-transform")// 让浏览器不缓存响应,每次都获取新的内容,禁止代理/CDN对响应内容做转换c.Header("Connection","keep-alive")// 保持长连接c.Header("Access-Control-Allow-Origin","*")// 允许跨域c.Header("X-Accel-Buffering","no")// 禁止Nginx的缓存区,如果不设置这个请求头就会出现响应数据被堆积随后再发送给前端的情况,设置为no的话就能让数据实时透传c.Header("Content-Encoding","identity")// 禁止对响应内容进行编码或者压缩,没有这个也会导致消息被堆积// 先创建会话并立即把 sessionId 下发给前端,随后再开始流式输出sessionID,code_:=session.CreateStreamSessionOnly(userName,req.UserQuestion)ifcode_!=code.CodeSuccess{c.SSEvent("error",gin.H{"message":"Failed to create session"})return}// 先把 sessionId 通过 data 事件发送给前端,前端据此绑定当前会话,侧边栏即可出现新标签c.Writer.WriteString(fmt.Sprintf("data: {\"sessionId\": \"%s\"}\n\n",sessionID))c.Writer.Flush()// 然后开始把本次回答进行流式发送(包含最后的 [DONE])code_=session.StreamMessageToExistingSession(userName,sessionID,req.UserQuestion,req.ModelType,http.ResponseWriter(c.Writer))ifcode_!=code.CodeSuccess{c.SSEvent("error",gin.H{"message":"Failed to send message"})return}}
service层处理逻辑

利用Flush把服务端响应的数据直接推送给客户端,不需要等待缓冲区满了才推送。

funcStreamMessageToExistingSession(userNamestring,sessionIDstring,userQuestionstring,modelTypestring,writer http.ResponseWriter)code.Code{log.Printf("[Service] StreamMessageToExistingSession Start. User=%s, Session=%s, Model=%s",userName,sessionID,modelType)// 确保writer支持Flushflusher,ok:=writer.(http.Flusher)// 类型断言if!ok{log.Println("不支持Flush")returncode.CodeServerBusy}//2:获取AIHelper并通过其管理消息manager:=aihelper.GetGlobalManager()config:=map[string]interface{}{"apiKey":"your-api-key",// TODO: 从配置中获取"username":userName,// 用于 RAG 模型获取用户文档}log.Println("[Service] Getting AIHelper...")helper,err:=manager.GetOrCreateAIHelper(userName,sessionID,modelType,config)iferr!=nil{log.Println("StreamMessageToExistingSession GetOrCreateAIHelper error:",err)returncode.AIModelFail}log.Println("[Service] AIHelper Obtained. Starting StreamResponse...")// 定义callback函数来实时flush推送消息cb:=func(msgstring){log.Printf("[SSE] Sending chunk: %s (len=%d)\n",msg,len(msg))payload,er:=json.Marshal(map[string]string){"type":"delta","content":msg,}iferr!=nil{log.Println("[SSE] Marshal error:",err)return}_,err=writer.Write([]byte("data: "+string(payload)+"\n\n"))iferr!=nil{log.Println("[SSE] Write error:",err)return}flusher.Flush()log.Println("[SSE] Flushed")}_,err_:=helper.StreamResponse(userName,ctx,cb,userQuestion)iferr_!=nil{log.Println("StreamMessageToExistingSession StreamResponse error:",err_)returncode.AIModelFail}_,err=writer.Write([]byte("data: [DONE]\n\n"))iferr!=nil{log.Println("StreamMessageToExistingSession write DONE error:",err)returncode.AIModelFail}flusher.Flush()returncode.CodeSuccess}

这里manager和aihelper的执行逻辑就不展示了,这篇blog的目的是要了解流式传输的逻辑。每次生成响应的时候回调cb来将AI生成的消息实时推送给客户端,流响应结束后会额外发一条data:[DONE],前端根据这个来判断此次回答是否结束。

前端处理

关键是通过fetch建立一个长连接,后面从response.body里流式读取数据,直到读到done后break

asyncfunctionhandleStreaming(question){// 占一个位置表示正在回答constaiMessage={role:'assistant',content:'',meta:{status:'streaming'}// mark streaming}constaiMessageIndex=currentMessages.value.length currentMessages.value.push(aiMessage)// 决定用哪个URLconstisDev=window.location.hostname==='localhost'||window.location.hostname==='127.0.0.1'constbackendBase=isDev?`http://${window.location.hostname}:9090/api/v1/AI`:'/api/AI'consturl=tempSession.value?`${backendBase}/chat/send-stream-new-session`:`${backendBase}/chat/send-stream`// 构建请求头请求体constheaders={'Content-Type':'application/json','Authorization':`Bearer${localStorage.getItem('token')||''}`}constbody=tempSession.value?{question:question,modelType:selectedModel.value}:{question:question,modelType:selectedModel.value,sessionId:currentSessionId.value}}try{// 创建 fetch 连接读取 SSE 流constresponse=awaitfetch(url,{method:'POST',headers,body:JSON.stringify(body)})if(!response.ok){loading.value=falsethrownewError('Network response was not ok')}constreader=response.body.getReader()constdecoder=newTextDecoder()letbuffer=''// 读取流数据// eslint-disable-next-line no-constant-conditionwhile(true){const{done,value}=awaitreader.read()if(done)breakconstchunk=decoder.decode(value,{stream:true})buffer+=chunk// 按行分割constlines=buffer.split('\n')buffer=lines.pop()||''// 保留未完成的行for(constlineoflines){constnormalizedLine=line.endsWith('\r')?line.slice(0,-1):lineif(!normalizedLine)continue// 处理 SSE 格式:data: <content>if(normalizedLine.startsWith('data:')){letdata=normalizedLine.slice(5)if(data.startsWith(' '))data=data.slice(1)console.log('[SSE] Received:',data)// 调试日志if(data==='[DONE]'){// 流结束console.log('[SSE] Stream done')loading.value=falsecurrentMessages.value[aiMessageIndex].meta={status:'done'}currentMessages.value=[...currentMessages.value]}elseif(data.startsWith('{')){// 尝试解析 JSON(如 sessionId)try{constparsed=JSON.parse(data)if(parsed.sessionId){constnewSid=String(parsed.sessionId)console.log('[SSE] Session ID:',newSid)if(tempSession.value){lettitle=(question||'').trim()if(!title){title=`会话${newSid}`}elseif(title.length>30){title=`${title.slice(0,30)}...`}sessions.value[newSid]={id:newSid,name:title,messages:[...currentMessages.value]}currentSessionId.value=newSid tempSession.value=false}}elseif(parsed.type==='delta'&&typeofparsed.content==='string'){currentMessages.value[aiMessageIndex].content+=parsed.content}}catch(e){// 不是 JSON,当作普通文本处理currentMessages.value[aiMessageIndex].content+=dataconsole.log('[SSE] Content updated:',currentMessages.value[aiMessageIndex].content.length)}}else{// 普通文本数据,直接追加// 使用数组索引直接更新,强制 Vue 响应式系统检测变化currentMessages.value[aiMessageIndex].content+=dataconsole.log('[SSE] Content updated:',currentMessages.value[aiMessageIndex].content.length)}// 每收到一条数据就立即更新 DOM// 强制更新整个数组以触发响应式currentMessages.value=[...currentMessages.value]// 使用 requestAnimationFrame 强制浏览器重排awaitnewPromise(resolve=>{requestAnimationFrame(()=>{scrollToBottom()resolve()})})}}}// 流读取完成后的处理loading.value=falsecurrentMessages.value[aiMessageIndex].meta={status:'done'}currentMessages.value=[...currentMessages.value]// 同步到 sessions 存储if(!tempSession.value&&currentSessionId.value&&sessions.value[currentSessionId.value]){constsessMsgs=sessions.value[currentSessionId.value].messagesif(Array.isArray(sessMsgs)&&sessMsgs.length){constlastIndex=sessMsgs.length-1if(sessMsgs[lastIndex]&&sessMsgs[lastIndex].role==='assistant'){sessMsgs[lastIndex].content=currentMessages.value[aiMessageIndex].content}}}}catch(err){console.error('Stream error:',err)loading.value=falsecurrentMessages.value[aiMessageIndex].meta={status:'error'}currentMessages.value=[...currentMessages.value]ElMessage.error('流式传输出错')}}
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/4 1:16:03

计算机毕业设计springboot学生闲置品交易平台 Spring Boot框架下高校闲置资源共享平台的设计与开发 基于微服务架构的校园循环经济交易服务平台构建

计算机毕业设计springboot学生闲置品交易平台ax23jts9 &#xff08;配套有源码 程序 mysql数据库 论文&#xff09; 本套源码可以在文本联xi,先看具体系统功能演示视频领取&#xff0c;可分享源码参考。在当今社会&#xff0c;随着经济的快速发展和人们生活水平的提高&#xff…

作者头像 李华
网站建设 2026/3/5 2:31:17

直接上结论:8个降AIGC工具测评,专科生降AI率必备攻略

在当前的学术写作环境中&#xff0c;AI生成内容&#xff08;AIGC&#xff09;已经成为高校和研究机构重点关注的对象。尤其是对于专科生来说&#xff0c;论文中如果存在明显的AI痕迹&#xff0c;不仅会影响查重率&#xff0c;还可能被判定为学术不端行为。因此&#xff0c;如何…

作者头像 李华
网站建设 2026/2/27 20:03:51

大模型驱动的智能体系统:架构设计与技术实现

本文详细探讨了基于大语言模型的智能体系统设计与实现&#xff0c;包括LLM在智能体中的核心作用、三层架构设计、上下文管理与记忆模块&#xff0c;以及与API和向量数据库的集成。介绍了ReAct、Hugging Face和LangChain等关键框架&#xff0c;并通过金融智能客服等实例展示了完…

作者头像 李华
网站建设 2026/2/28 16:10:11

区块链商业价值预测数据分析

摘要&#xff1a;根据Gartner数据&#xff0c;区块链商业价值发展可分为三个阶段&#xff1a;探索期(2018-2022)年均增长22.6%&#xff0c;加速期(2023-2027)年均增长55.2%&#xff0c;成熟期(2028-2030)年均增长51.7%。预计2030年市场规模将达1360亿美元&#xff0c;复合年增长…

作者头像 李华
网站建设 2026/3/5 14:38:35

HCCL Profiling通信耗时埋点与Timeline生成

摘要 在大规模分布式训练中&#xff0c;通信效率直接决定整体性能。HCCL Profiling通过精准的通信操作耗时埋点&#xff0c;生成可视化Timeline&#xff0c;为性能瓶颈定位提供数据支撑。本文将深度解析/hccl/profiler/trace_collector.cpp的实现机制&#xff0c;演示AllReduc…

作者头像 李华