news 2026/2/12 7:21:19

ResponseBodyEmitter 实时异步流式推送

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
ResponseBodyEmitter 实时异步流式推送

ResponseBodyEmitter:SpringBoot的流式推送方案

ResponseBodyEmitter是Spring框架提供的流式响应工具,专门为服务器向客户端推送数据而设计:

  • 流式传输:支持持续数据推送

  • 异步处理:非阻塞式数据发送

  • 连接管理:自动处理连接状态

  • 错误处理:完善的异常处理机制

  • 内存友好:避免大数据量占用内存

ResponseBodyEmitter vs 其他方案对比

ResponseBodyEmitter优势

  • 比WebSocket简单,无需额外协议

  • 比SSE更灵活,支持复杂数据格式

  • 比轮询高效,减少无效请求

  • 基于HTTP,兼容性好

适用场景

  • 实时日志查看

  • 进度条更新

  • 数据流推送

  • 实时监控数据

SpringBoot集成ResponseBodyEmitter实战

1. 基础流式推送控制器

@RestController publicclass StreamController { @GetMapping(value = "/stream/logs", produces = MediaType.TEXT_PLAIN_VALUE) public ResponseBodyEmitter streamLogs() { ResponseBodyEmitter emitter = new ResponseBodyEmitter(); // 设置超时时间(-1表示永不超时) emitter.setTimeout(Long.MAX_VALUE); // 添加连接成功事件 try { emitter.send("连接建立成功\n"); } catch (IOException e) { emitter.completeWithError(e); return emitter; } // 添加连接关闭回调 emitter.onCompletion(() -> { log.info("流式连接关闭"); }); emitter.onTimeout(() -> { log.warn("流式连接超时"); emitter.complete(); }); // 启动异步日志推送 startLogStreaming(emitter); return emitter; } private void startLogStreaming(ResponseBodyEmitter emitter) { // 模拟日志数据推送 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { try { String logLine = String.format("[%s] %s\n", LocalDateTime.now(), "这是实时日志内容 " + System.currentTimeMillis()); emitter.send(logLine); } catch (IOException e) { log.error("日志推送失败", e); emitter.completeWithError(e); scheduler.shutdown(); } }, 0, 1, TimeUnit.SECONDS); } }

2. 实时日志推送服务

@Service publicclass RealTimeLogService { // 存储所有活跃的流式连接 privatefinal Map<String, ResponseBodyEmitter> activeEmitters = new ConcurrentHashMap<>(); public ResponseBodyEmitter createLogStream(String userId, String logType) { String streamId = userId + ":" + logType; ResponseBodyEmitter emitter = new ResponseBodyEmitter(); // 设置超时时间 emitter.setTimeout(Long.MAX_VALUE); // 存储连接 activeEmitters.put(streamId, emitter); // 连接建立成功 try { emitter.send("实时日志流建立成功\n"); } catch (IOException e) { emitter.completeWithError(e); } // 连接关闭时清理 emitter.onCompletion(() -> { activeEmitters.remove(streamId); log.info("日志流连接关闭: {}", streamId); }); emitter.onTimeout(() -> { emitter.complete(); }); return emitter; } public void pushLogToUser(String userId, String logType, String logMessage) { String streamId = userId + ":" + logType; ResponseBodyEmitter emitter = activeEmitters.get(streamId); if (emitter != null) { try { emitter.send(logMessage + "\n"); } catch (IOException e) { // 发送失败,移除连接 activeEmitters.remove(streamId); log.error("日志推送失败", e); } } } public void pushLogToAll(String logMessage) { activeEmitters.forEach((streamId, emitter) -> { try { emitter.send(logMessage + "\n"); } catch (IOException e) { activeEmitters.remove(streamId); } }); } }

3. 进度条实时更新

@RestController publicclass ProgressController { @Autowired private ProgressService progressService; @GetMapping("/stream/progress/{taskId}") public ResponseBodyEmitter streamProgress(@PathVariable String taskId) { ResponseBodyEmitter emitter = new ResponseBodyEmitter(); emitter.setTimeout(Long.MAX_VALUE); // 发送初始进度 try { emitter.send("开始任务\n"); } catch (IOException e) { emitter.completeWithError(e); return emitter; } // 启动进度监控 progressService.startMonitoring(taskId, progress -> { try { String progressMessage = String.format("进度: %d%% - %s\n", progress.getPercentage(), progress.getDescription()); emitter.send(progressMessage); if (progress.isCompleted()) { emitter.complete(); } } catch (IOException e) { log.error("进度推送失败", e); emitter.completeWithError(e); } }); return emitter; } }

4. 数据流推送

@RestController publicclass DataStreamController { @Autowired private DataStreamService dataStreamService; @GetMapping("/stream/data/{streamId}") public ResponseBodyEmitter streamData(@PathVariable String streamId, @RequestParam(defaultValue = "1000") long intervalMs) { ResponseBodyEmitter emitter = new ResponseBodyEmitter(); emitter.setTimeout(Long.MAX_VALUE); // 启动数据流推送 dataStreamService.startDataStreaming(streamId, intervalMs, data -> { try { // 将数据转换为JSON格式并发送 String jsonData = JSON.toJSONString(data); emitter.send(jsonData + "\n"); } catch (IOException e) { log.error("数据流推送失败", e); emitter.completeWithError(e); } }); return emitter; } }

高级特性实现

1. 连接池管理

@Service publicclass StreamConnectionPool { privatefinal Map<String, ResponseBodyEmitter> emitters = new ConcurrentHashMap<>(); privatefinal Map<String, ScheduledExecutorService> schedulers = new ConcurrentHashMap<>(); public ResponseBodyEmitter createStream(String streamId, StreamConfig config) { ResponseBodyEmitter emitter = new ResponseBodyEmitter(); emitter.setTimeout(config.getTimeout()); // 存储连接 emitters.put(streamId, emitter); // 创建调度器 ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); schedulers.put(streamId, scheduler); // 连接管理回调 emitter.onCompletion(() -> { emitters.remove(streamId); ScheduledExecutorService removedScheduler = schedulers.remove(streamId); if (removedScheduler != null) { removedScheduler.shutdown(); } }); emitter.onTimeout(() -> { emitter.complete(); }); return emitter; } public boolean sendToStream(String streamId, String data) { ResponseBodyEmitter emitter = emitters.get(streamId); if (emitter != null) { try { emitter.send(data + "\n"); returntrue; } catch (IOException e) { emitters.remove(streamId); returnfalse; } } returnfalse; } }

2. 缓存与批量推送

@Service publicclass BatchStreamService { privatefinal Map<String, List<String>> messageBuffers = new ConcurrentHashMap<>(); privatefinal ScheduledExecutorService batchScheduler = Executors.newScheduledThreadPool(5); public void addToBuffer(String streamId, String message) { messageBuffers.computeIfAbsent(streamId, k -> new ArrayList<>()).add(message); } public void startBatchProcessing(String streamId, int batchSize, long intervalMs) { batchScheduler.scheduleAtFixedRate(() -> { List<String> messages = messageBuffers.get(streamId); if (messages != null && !messages.isEmpty()) { synchronized (messages) { if (!messages.isEmpty()) { List<String> batch = new ArrayList<>(); int size = Math.min(batchSize, messages.size()); for (int i = 0; i < size; i++) { batch.add(messages.remove(0)); } // 批量发送 sendBatchToStream(streamId, batch); } } } }, 0, intervalMs, TimeUnit.MILLISECONDS); } private void sendBatchToStream(String streamId, List<String> messages) { String batchData = JSON.toJSONString(messages); // 发送到对应的流 // 实现逻辑... } }

3. 断线重连机制

@Service publicclass ReconnectStreamService { privatefinal Map<String, StreamSession> activeSessions = new ConcurrentHashMap<>(); public ResponseBodyEmitter createReconnectableStream(String userId, String sessionId, long lastEventId) { ResponseBodyEmitter emitter = new ResponseBodyEmitter(); emitter.setTimeout(Long.MAX_VALUE); // 创建会话 StreamSession session = new StreamSession(userId, sessionId, emitter, lastEventId); activeSessions.put(sessionId, session); // 发送历史数据(如果需要) if (lastEventId > 0) { sendHistoricalData(emitter, userId, lastEventId); } emitter.onCompletion(() -> { activeSessions.remove(sessionId); }); return emitter; } private void sendHistoricalData(ResponseBodyEmitter emitter, String userId, long lastEventId) { // 获取历史数据并发送 List<DataEvent> history = dataService.getEventsAfter(userId, lastEventId); history.forEach(event -> { try { emitter.send(JSON.toJSONString(event) + "\n"); } catch (IOException e) { log.error("发送历史数据失败", e); } }); } }

性能优化建议

1. 内存管理

@Component publicclass StreamMemoryManager { privatefinal AtomicLong totalConnections = new AtomicLong(0); privatefinal AtomicLong maxConnections = new AtomicLong(1000); // 最大连接数限制 public boolean canAcceptNewConnection() { long current = totalConnections.get(); return current < maxConnections.get(); } public void registerConnection() { totalConnections.incrementAndGet(); } public void unregisterConnection() { totalConnections.decrementAndGet(); } @Scheduled(fixedRate = 60000) // 每分钟检查一次 public void cleanupIdleConnections() { // 清理长时间无活动的连接 // 实现逻辑... } }

2. 异步处理优化

@Service publicclass AsyncStreamService { @Autowired private TaskExecutor streamExecutor; public void sendAsync(String streamId, String data) { streamExecutor.execute(() -> { ResponseBodyEmitter emitter = emitters.get(streamId); if (emitter != null) { try { emitter.send(data + "\n"); } catch (IOException e) { emitters.remove(streamId); } } }); } }

客户端实现

JavaScript客户端

// 创建流式连接 function createStreamConnection(url) { const xhr = new XMLHttpRequest(); xhr.open('GET', url, true); xhr.onreadystatechange = function() { if (xhr.readyState === 3 || xhr.readyState === 4) { const newData = xhr.responseText.substring(xhr.lastIndex || 0); if (newData) { const lines = newData.split('\n'); lines.forEach(line => { if (line.trim()) { handleStreamData(line); } }); xhr.lastIndex = xhr.responseText.length; } } }; xhr.onerror = function() { console.error('流式连接错误'); // 尝试重连 setTimeout(() => createStreamConnection(url), 5000); }; xhr.send(); } // 或者使用fetch API asyncfunction createFetchStream(url) { const response = await fetch(url); const reader = response.body.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = decoder.decode(value); const lines = chunk.split('\n'); lines.forEach(line => { if (line.trim()) { handleStreamData(line); } }); } }

安全考虑

1. 认证授权

@RestController publicclass SecureStreamController { @GetMapping("/stream/secure/{userId}") public ResponseBodyEmitter createSecureStream(@PathVariable String userId, @RequestHeader("Authorization") String token) { // 验证token if (!tokenService.validateToken(token, userId)) { thrownew UnauthorizedException("认证失败"); } return streamService.createStream(userId); } }

2. 速率限制

@Service publicclass RateLimitedStreamService { privatefinal Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>(); public boolean canSend(String userId, String streamType) { RateLimiter limiter = rateLimiters.computeIfAbsent( userId + ":" + streamType, k -> RateLimiter.create(10) // 每秒最多10条消息 ); return limiter.tryAcquire(); } public void sendWithRateLimit(String userId, String streamType, String data) { if (canSend(userId, streamType)) { sendToStream(userId, data); } else { log.warn("用户 {} 发送频率过高", userId); } } }

监控与运维

1. 连接监控

@Component publicclass StreamMetricsCollector { privatefinal MeterRegistry meterRegistry; public void recordConnection(String userId) { Counter.builder("stream_connections_total") .tag("user_id", userId) .register(meterRegistry) .increment(); } public void recordDataSent(String userId, long dataSize) { Counter.builder("stream_data_sent_bytes_total") .tag("user_id", userId) .register(meterRegistry) .increment(dataSize); } public int getActiveConnections() { return streamService.getActiveConnectionCount(); } }

最佳实践

1. 连接管理

  • 合理设置超时时间:避免长时间占用连接

  • 定期清理无效连接:防止内存泄漏

  • 连接数限制:防止恶意连接

2. 数据格式

  • 统一数据格式:便于客户端处理

  • 消息大小控制:避免传输大消息

  • 编码处理:确保字符编码正确

3. 错误处理

  • 优雅降级:连接失败时回退到轮询

  • 重试机制:连接失败时自动重试

  • 异常监控:及时发现和处理异常

总结

ResponseBodyEmitter是实现服务器推送数据的高效方案,相比传统的轮询方式,它具有以下优势:

  1. 高效传输:减少无效请求

  2. 实时性好:数据变更立即推送

  3. 实现简单:基于HTTP协议

  4. 内存友好:流式处理大数据

记住,ResponseBodyEmitter适合单向服务器推送的场景,对于需要双向通信的场景,还是需要WebSocket。但对大多数实时数据推送需求,ResponseBodyEmitter绝对是更优的选择!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/10 10:12:59

Python ezdxf库实战指南:5个DXF文件处理技巧助你高效工作

Python ezdxf库实战指南&#xff1a;5个DXF文件处理技巧助你高效工作 【免费下载链接】ezdxf Python interface to DXF 项目地址: https://gitcode.com/gh_mirrors/ez/ezdxf 在工程设计和制造领域&#xff0c;DXF文件作为CAD数据交换的标准格式&#xff0c;其自动化处理…

作者头像 李华
网站建设 2026/2/9 23:52:08

经典游戏兼容性终极解决方案:Windows 11完美运行指南

经典游戏兼容性终极解决方案&#xff1a;Windows 11完美运行指南 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 还在为经典游戏在Windows 11上频繁崩…

作者头像 李华
网站建设 2026/2/8 18:45:29

句法分析十年演进(2015–2025)

句法分析十年演进&#xff08;2015–2025&#xff09; 一句话总论&#xff1a; 2015年句法分析还是“基于统计的条件随机场&#xff08;CRF&#xff09;手工特征树库依赖解析”的规则时代&#xff0c;2025年已进化成“端到端VLA大模型多模态语义依存量子鲁棒自进化实时意图级句…

作者头像 李华
网站建设 2026/2/11 13:46:10

词性标注十年演进(2015–2025)

词性标注十年演进&#xff08;2015–2025&#xff09; 一句话总论&#xff1a; 2015年词性标注还是“BiLSTM-CRF手工特征固定词表”的序列标注时代&#xff0c;2025年已进化成“端到端VLA多模态大模型实时意图级标注量子鲁棒自进化全语言/口语/多语种统一”的通用智能时代&…

作者头像 李华
网站建设 2026/2/11 20:55:48

揭秘R语言与GPT协同处理数据:3步实现智能格式转换

第一章&#xff1a;R语言与GPT协同处理数据的核心价值在现代数据分析流程中&#xff0c;R语言以其强大的统计计算与可视化能力成为科研与商业分析的首选工具。与此同时&#xff0c;GPT类大语言模型凭借其自然语言理解与代码生成能力&#xff0c;正在重塑开发者与数据科学家的工…

作者头像 李华
网站建设 2026/2/10 15:05:53

隐形掠夺者:VVS窃密木马如何滥用军事级混淆技术劫持Discord账户

网络安全研究团队Unit 42最新分析报告披露&#xff0c;一种基于Python开发的新型窃密木马VVS Stealer&#xff08;或称"VVS $tealer"&#xff09;正通过军事级混淆技术在网络犯罪地下市场传播&#xff0c;专门针对Discord聊天平台的庞大用户群体。该恶意软件滥用合法…

作者头像 李华