news 2026/6/9 22:25:51

使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

在开发实时聊天、AI 助手或者协作应用时,我们经常需要SSE(Server-Sent Events)实现服务端向前端持续推送数据。本文将分享一个Go SSE 打字机式输出实现,并附上上游模拟示例、curl 测试和前端实时渲染示例。


功能特点

  1. 使用SSE推送消息流,前端无需轮询。
  2. 对消息进行逐字符打字机式输出,模拟 AI 打字效果。
  3. 支持上游 SSE 模拟,方便本地测试。
  4. 可轻松扩展为真实 AI 聊天接口的代理服务。

技术栈

  • Go 1.21+
  • CloudWeGo Hertz 作为 HTTP 框架
  • resty 用于上游 SSE 请求
  • SSE 流式推送(使用hertz-contrib/sse

完整示例代码

packagemainimport("bufio""context""encoding/json""fmt""math/rand""strings""time""github.com/cloudwego/hertz/pkg/app""github.com/cloudwego/hertz/pkg/app/server""github.com/cloudwego/hertz/pkg/common/hlog""github.com/hertz-contrib/sse""resty.dev/v3")funcmain(){h:=server.Default(server.WithHostPorts(":8380"))h.POST("/v3/chat",CozeParseThenTypeWriter)h.GET("/upstream",MockUpstreamSSE)hlog.Info("🚀 Coze SSE parse + typewriter proxy running at :8380")h.Spin()}// 核心逻辑:解析 conversation.message.delta → 打字机式输出funcCozeParseThenTypeWriter(ctx context.Context,c*app.RequestContext){// SSE Headerc.SetStatusCode(200)h:=c.Response.Header h.Set("Content-Type","text/event-stream; charset=utf-8")h.Set("Cache-Control","no-cache, no-store, must-revalidate")h.Set("Pragma","no-cache")h.Set("Connection","keep-alive")h.Set("X-Accel-Buffering","no")stream:=sse.NewStream(c)// Resty 上游请求client:=resty.New().SetTimeout(0)resp,err:=client.R().SetContext(ctx).SetDoNotParseResponse(true).SetHeader("Accept","text/event-stream").Get("http://localhost:8380/upstream")iferr!=nil||resp.RawResponse==nil||resp.RawResponse.Body==nil{hlog.Error("upstream connect failed")return}deferresp.RawResponse.Body.Close()// 打字机准备r:=rand.New(rand.NewSource(time.Now().UnixNano()))scanner:=bufio.NewScanner(resp.RawResponse.Body)varcurrentEventstring// SSE 解析循环forscanner.Scan(){select{case<-ctx.Done():hlog.Warn("client disconnected")returndefault:}line:=scanner.Text()ifline==""{currentEvent=""continue}ifstrings.HasPrefix(line,"event:"){currentEvent=strings.TrimSpace(strings.TrimPrefix(line,"event:"))continue}ifstrings.HasPrefix(line,"data:"){payload:=strings.TrimSpace(strings.TrimPrefix(line,"data:"))ifpayload=="[DONE]"{stream.Publish(&sse.Event{Data:[]byte("[DONE]")})return}ifcurrentEvent=="conversation.message.delta"{vardstruct{Contentstring`json:"content"`}iferr:=json.Unmarshal([]byte(payload),&d);err==nil{typeWriter(stream,r,d.Content)}}ifcurrentEvent=="conversation.message.completed"{stream.Publish(&sse.Event{Event:"conversation.message.completed",Data:[]byte(`{"status":"completed"}`),})}}}}// 打字机逐字符输出functypeWriter(stream*sse.Stream,r*rand.Rand,textstring){fori,ch:=rangetext{time.Sleep(getSleepDuration(r,ch))data:=map[string]any{"id":fmt.Sprintf("char_%d_%d",i,time.Now().UnixNano()%100000),"role":"assistant","type":"answer","content":string(ch),"created_at":time.Now().UnixMilli(),}b,_:=json.Marshal(data)_=stream.Publish(&sse.Event{Event:"conversation.message.delta",ID:fmt.Sprintf("char_%d",i),Data:b,})}}// 延迟策略funcgetSleepDuration(r*rand.Rand,chrune)time.Duration{switch{casech=='\n'||ch=='。'||ch=='!'||ch=='?':returntime.Duration(300+r.Intn(200))*time.Millisecondcasech=='、'||ch==' '||ch=='-'||ch==':'||ch==',':returntime.Duration(150+r.Intn(100))*time.Millisecondcasech=='#'||ch=='*'||ch=='>':returntime.Duration(200+r.Intn(150))*time.Milliseconddefault:returntime.Duration(60+r.Intn(60))*time.Millisecond}}// 上游 Mock(模拟 Coze SSE)funcMockUpstreamSSE(ctx context.Context,c*app.RequestContext){c.SetStatusCode(200)c.Header("Content-Type","text/event-stream")c.Header("Cache-Control","no-cache")c.Header("Connection","keep-alive")c.Header("X-Accel-Buffering","no")c.Flush()send:=func(event,datastring){c.Write([]byte("event: "+event+"\n"+"data: "+data+"\n\n",))c.Flush()}deltas:=[]string{"你","好",",","这","是"," Coze"," SSE"}for_,ch:=rangedeltas{send("conversation.message.delta",`{"content":"`+ch+`"}`)time.Sleep(80*time.Millisecond)}send("conversation.message.completed",`{}`)c.Write([]byte("data: [DONE]\n\n"))c.Flush()}

使用方法

  1. 启动服务
go run main.go
  1. 访问接口
  • 上游 SSE 测试(浏览器可直接访问):
    http://localhost:8380/upstream

  • 打字机代理接口(POST):
    http://localhost:8380/v3/chat


使用curl测试 SSE

# 测试上游 SSEcurl-N http://localhost:8380/upstream# 测试打字机代理 SSEcurl-N -X POST http://localhost:8380/v3/chat

参数说明:

  • -N/--no-buffer:禁用输出缓存,实时显示流式数据。
  • -X POST:因为代理接口是 POST。

运行后,你会在终端看到类似打字机逐字符输出:

event: conversation.message.delta data: {"id":"char_0_12345","role":"assistant","type":"answer","content":"你","created_at":1700000000000} event: conversation.message.delta data: {"id":"char_1_67890","role":"assistant","type":"answer","content":"好","created_at":1700000000050} ... event: conversation.message.completed data: {"status":"completed"} data: [DONE]

前端实时渲染打字机效果示例

在前端,可以使用EventSource监听 SSE 并动态显示内容:

<divid="chat"></div><script>constchatDiv=document.getElementById("chat");constevtSource=newEventSource("http://localhost:8380/v3/chat");evtSource.addEventListener("conversation.message.delta",e=>{constdata=JSON.parse(e.data);chatDiv.innerHTML+=data.content;});evtSource.addEventListener("conversation.message.completed",e=>{console.log("消息完成");});evtSource.onopen=()=>console.log("连接已打开");evtSource.onerror=()=>console.log("连接错误或关闭");</script>

效果:消息逐字符显示,模拟 AI 打字机输出。


核心解析

  1. SSE Header 设置
h.Set("Content-Type","text/event-stream; charset=utf-8")h.Set("Cache-Control","no-cache, no-store, must-revalidate")h.Set("Connection","keep-alive")h.Set("X-Accel-Buffering","no")

保证浏览器或代理实时接收流式数据。

  1. 打字机效果

根据字符类型不同设置不同延迟:

casech=='\n'||ch=='。'||ch=='!'||ch=='?':returntime.Duration(300+r.Intn(200))*time.Millisecond
  1. 上游 SSE 模拟

方便本地测试,无需真实 AI 接口即可验证前端打字机效果。


总结

通过本文示例,你可以快速实现:

  • Go SSE 服务端代理
  • AI 聊天消息打字机式输出
  • 上游 SSE 模拟
  • curl 测试和前端实时渲染
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/9 19:56:31

【实践原创】使用 FastAPI 实现 Coze 流式聊天 SSE 接口

使用 FastAPI 实现 Coze 流式聊天 SSE 接口 在开发 AI 助手或聊天应用时&#xff0c;我们通常希望服务端能够 实时向前端推送消息&#xff0c;让用户看到逐字打字效果。本文演示如何使用 FastAPI Coze Python SDK&#xff08;cozepy&#xff09; 实现 流式聊天 SSE 接口&…

作者头像 李华
网站建设 2026/6/9 19:57:26

MCP DP-420图Agent性能骤降?7个关键指标必须立即检查

第一章&#xff1a;MCP DP-420图Agent性能骤降的典型现象在部署MCP DP-420图Agent的实际生产环境中&#xff0c;部分用户反馈其数据处理吞吐量出现显著下降&#xff0c;响应延迟从正常的200ms上升至超过2秒&#xff0c;严重影响上层业务调用。该问题通常出现在高并发或长时间运…

作者头像 李华
网站建设 2026/6/9 6:30:19

不用花钱不用等!SSL 证书快速到手

还在为网站 “不安全” 警告发愁&#xff1f;还在纠结付费 SSL 证书的高昂费用&#xff0c;或是被传统申请流程的复杂命令行劝退&#xff1f;其实 2025 年的免费 SSL 证书早已实现 “零成本 极速化”&#xff0c;无需专业技术&#xff0c;不用漫长等待&#xff0c;普通人也能 …

作者头像 李华
网站建设 2026/6/9 19:55:33

5分钟快速上手:用ant-design-x-vue构建专业级AI对话界面

5分钟快速上手&#xff1a;用ant-design-x-vue构建专业级AI对话界面 【免费下载链接】ant-design-x-vue Ant Design X For Vue.&#xff08;WIP&#xff09; 疯狂研发中&#x1f525; 项目地址: https://gitcode.com/gh_mirrors/an/ant-design-x-vue 还在为开发智能对话…

作者头像 李华
网站建设 2026/6/9 21:25:26

1988-2025年上市公司工业互联网技术专利数据

数据简介 工业互联网作为“工业4.0”与数字经济深度融合的核心载体&#xff0c;是推动制造业转型升级、实现产业高质量发展的关键支撑技术&#xff0c;其专利布局直接反映企业在工业数字化、网络化、智能化领域的核心竞争力。本数据聚焦上市公司工业互联网技术创新成果的精准量…

作者头像 李华
网站建设 2026/6/9 19:06:38

基于多Agent的负荷预测系统设计(电力AI建模稀缺方案曝光)

第一章&#xff1a;基于多Agent的负荷预测系统设计在现代电力系统中&#xff0c;负荷预测是保障电网稳定运行与能源高效调度的关键环节。传统集中式预测模型难以应对大规模、分布式数据源带来的动态性和异构性挑战。为此&#xff0c;基于多Agent系统的负荷预测架构应运而生&…

作者头像 李华