背景痛点:当“多人同时问”撞上“单点大脑”
做 AI 对话产品最怕的不是模型答不好,而是“答串了”。想象一个场景:教育 SaaS 里 30 名学生同时打开 ChatGPT 界面做口语练习,如果后台把 A 同学的语音转写结果推送给 B 同学,老师当场社死。更麻烦的是,大模型推理本身吃 GPU,连接数一多就出现资源竞争——显存爆满、队列堆积,用户侧感受就是“转圈 10 秒才有声音”。因此,在动手调模型之前,先把“谁是谁”“谁能连”“连多久”三件事捋顺,是新手入门的第一关。
技术对比:Session Token vs JWT vs OAuth
先给三种主流认证方案画个速查表,方便按场景取用。
| 维度 | Session Token(中心化) | JWT(自包含) | OAuth(授权码) |
|---|---|---|---|
| 状态存储 | 服务端 Redis | 无状态 | 服务端+授权服务器 |
| 扩容难度 | 需共享存储 | 天然横向扩容 | 需统一授权中心 |
| 吊销速度 | 实时删除 Key | 等过期或维护黑名单 | 刷新令牌失效 |
| 适合场景 | 后台可运维,低延迟吊销 | 无状态 API、边缘节点 | 第三方登录、SSO |
| 并发风险 | Redis 成单点 | 令牌被盗即裸奔 | Code 换 Token 步骤多 |
结论:纯内部闭环、需要“随时踢人”的 AI 对话系统,优先 Session Token;若想边缘节点无状态,可 JWT+短有效期+Redis 白名单兜底;OAuth 则留给外部账号快捷登录那一层,别让它直接进 WebSocket 链路。
核心实现:ChatGPT 的会话隔离与并发控制
1. 连接池管理示意图
客户端1 ---WS1---> 网关层 --映射--> 连接池[Conn-A] --> 业务实例-A 客户端2 ---WS2---> 网关层 --映射--> 连接池[Conn-B] --> 业务实例-B 客户端3 ---WS3---> 网关层 --映射--> 连接池[Conn-C] --> 业务实例-C关键点:网关只做“路由+心跳”,不碰推理;业务实例里每个连接对象持有 user_id+session_id,保证上下文隔离。
2. Python 代码示例(基于 FastAPI + WebSocket)
# ws_server.py import asyncio import uuid import json import time from typing import Dict from fastapi import FastAPI, WebSocket, WebSocketDisconnect app = FastAPI() POOL: Dict[str, WebSocket] = {} # 简单本机池,生产换 Redis USER_MAP: Dict[str, str] = {} # user_id -> session_id async def heartbeat(ws: WebSocket, session_id: str): """每 30s ping 一次,超 5s 无 pong 即踢掉""" try: while True: await ws.send_text('ping') await asyncio.wait_for(ws.receive_text(), timeout=5) await asyncio.sleep(30) except asyncio.TimeoutError: await disconnect(session_id) async def disconnect(session_id: str): """清理双层映射,防止幽灵连接""" if session_id in POOL: await POOL[session_id].close() del POOL[session_id] # 反向清理 USER_MAP uid = [k for k, v in USER_MAP.items() if v == session_id] for u in uid: USER_MAP.pop(u, None) @app.websocket("/ws/{user_id}") async def ws_endpoint(ws: WebSocket, user_id: str): await ws.accept() # 签发一次性 session_id session_id = str(uuid.uuid4()) POOL[session_id] = ws USER_MAP[user_id] = session_id # 启动心跳 hb_task = asyncio.create_task(heartbeat(ws, session_id)) try: while True: data = await ws.receive_text() msg = json.loads(data) # 仅回写给当前 session,防止串线 await ws.send_text(json.dumps({ "echo": msg.get("text", ""), "session": session_id }, ensure_ascii=False)) except WebSocketDisconnect: hb_task.cancel() await disconnect(session_id). 3 Node.js 代码示例(基于 ws 库)
// ws_server.js const WebSocket = require('ws'); const { v4: uuid } = require('uuid'); const wss = new WebSocket.Server({ port: 8080 }); const POOL = new Map(); // sessionId -> ws const USER_MAP = new Map(); // userId -> sessionId function heartbeat(ws, sessionId) { const i = setInterval(() => { if (ws.readyState === WebSocket.OPEN) { ws.ping(); // 发送 ping 帧 } else { clearInterval(i); cleanup(sessionId); } }, 30000); ws.on('pong', () => { /* 刷新存活时间 */ }); ws.on('close', () => { clearInterval(i); cleanup(sessionId); }); } function cleanup(sessionId) { POOL.delete(sessionId); for (const [uid, sid] of USER_MAP.entries()) { if (sid === sessionId) USER_MAP.delete(uid); } } wss.on('connection', (ws, req) => { const userId = new URL(req.url, 'http://localhost').searchParams.get('userId'); const sessionId = uuid(); POOL.set(sessionId, ws); USER_MAP.set(userId, sessionId); heartbeat(ws, sessionId); ws.on('message', data => { // 原样回声,确保隔离 ws.send(JSON.stringify({ echo: data.toString(), session: sessionId })); }); });性能考量:万级并发下的资源账本
内存占用
一条 WebSocket 连接在 Python 约 20 kB、Node.js 约 12 kB,万并发≈200 MB,还没算业务对象。把“用户上下文”拆成 Redis Hash,单机可扛;若 GPU 推理实例是瓶颈,用“队列+异步回写”模型,让 WebSocket 层只负责收发,不阻塞显存。响应延迟
公网环境下,TLS 握手 + 回环 RTT 经常 200 ms 起跳。把心跳包大小压到 4 字节,关闭 Nagle(TCP_NODELAY),边缘节点开启 WebSocket 压缩扩展 (permessage-deflate),可把空载下行延迟压到 30 ms 以内。水平扩容
网关层无状态,直接 K8s HPA 根据连接数扩容;业务实例(GPU Pod)用 KEDA 按队列长度伸缩,防止“人多连得上却排不上推理”。
避坑指南:生产环境 3 大常见病
Token 泄露导致会话劫持
症状:用户 A 看到用户 B 的历史消息。
解法:WebSocket 建立时再用一次“短期 Token”校验,绑定 IP+UserAgent 指纹,异常立刻重签。幽灵连接堆积
症状:监控显示在线 1 w,实际只有 8 k 活跃。
解法:心跳超时必须双向清理,网关+业务层都对齐close(),并在 Redis 记录“最后 pong 时间”,定时扫描。GPU 推理阻塞导致掉线
症状:大模型生成 20 s 不回包,网关层面超时断连。
解法:WebSocket 只推“任务 ID”,推理完通过 MQ 回包,前端轮询或新开回调 WS,保持通道轻量。
延伸思考:设计一个可动态扩容的会话管理系统
如果把上面的小池子演进到云原生,可以玩这些脑洞:
- 把“会话”抽象成 CRD:
Session对象含 userId、createTime、gpuNode、status 字段,K8s 控制器监听 Session 数量,自动申请 GPU Pod; - 用 NATS/JetStream 做“回包总线”,WebSocket 网关订阅
reply.{sessionId}主题,推理节点发布结果,彻底解耦; - 引入“会话迁移”:GPU 节点维护成本过高时,把 Session 状态快照到 Redis,再在新节点热加载,实现用户无感切换;
- 在池化之上再封装“房间”语义,支持多人同频道语音聊天,给教育、会议场景留接口。
动手实验是检验理解的唯一标准。若你也想从零撸一套“能听会说”的实时 AI,不妨试跑下这个在线动手营——从0打造个人豆包实时通话AI。我跟着步骤 30 分钟就搭出了可对话的 Web 页面,改两行 JSON 就能换音色,小白也能顺利体验。先跑通,再回来对照本文的并发模型做二次改造,相信你会更有体感。