使用SSE协议进行服务端向客户端主动发送消息
1.创建一个SSE配置类:
1.1代码如下:
package com.campus.platform.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import java.util.concurrent.TimeUnit;@Configuration
public class SseConfig implements WebMvcConfigurer {@Overridepublic void configureAsyncSupport(AsyncSupportConfigurer configurer) {// 设置异步请求超时时间configurer.setDefaultTimeout(TimeUnit.MINUTES.toMillis(30));}
}
1.2 创建一个SSE事件类:
package com.campus.platform.model;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SseEmitterData {private String id;private String event;private String data;private Long retry;
}
1.3 创建SSE服务接口:
package com.campus.platform.service;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;public interface SseService {/*** 创建SSE连接*/SseEmitter createSseEmitter(String clientId);/*** 发送消息给指定客户端*/void sendMessage(String clientId, String message);/*** 发送消息给所有客户端*/void sendMessageToAll(String message);/*** 移除指定客户端连接*/void removeClient(String clientId);
}
1.4实现SSE服务:
package com.campus.platform.service.impl;import com.campus.platform.model.SseEmitterData;
import com.campus.platform.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Service
public class SseServiceImpl implements SseService {// 用于存储客户端连接private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();// 心跳间隔(30秒)private static final long HEARTBEAT_INTERVAL = 30_000;@Overridepublic SseEmitter createSseEmitter(String clientId) {// 设置超时时间为30分钟SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);// 注册回调emitter.onCompletion(() -> {log.info("SSE连接完成, clientId: {}", clientId);removeClient(clientId);});emitter.onTimeout(() -> {log.info("SSE连接超时, clientId: {}", clientId);removeClient(clientId);});emitter.onError(throwable -> {log.error("SSE连接异常, clientId: {}", clientId, throwable);removeClient(clientId);});// 缓存连接SSE_CACHE.put(clientId, emitter);// 发送连接成功消息try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("connected");data.setData("连接成功");data.setRetry(HEARTBEAT_INTERVAL);emitter.send(data, MediaType.APPLICATION_JSON);// 启动心跳线程startHeartbeat(clientId);} catch (IOException e) {log.error("发送连接消息失败", e);removeClient(clientId);}return emitter;}@Overridepublic void sendMessage(String clientId, String message) {SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter != null) {try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("message");data.setData(message);emitter.send(data, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("发送消息失败, clientId: {}", clientId, e);removeClient(clientId);}}}@Overridepublic void sendMessageToAll(String message) {SSE_CACHE.forEach((clientId, emitter) -> {try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("message");data.setData(message);emitter.send(data, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("发送消息失败, clientId: {}", clientId, e);removeClient(clientId);}});}@Overridepublic void removeClient(String clientId) {SseEmitter emitter = SSE_CACHE.remove(clientId);if (emitter != null) {try {emitter.complete();} catch (Exception e) {log.error("关闭SSE连接失败", e);}}}/*** 启动心跳线程*/private void startHeartbeat(String clientId) {Thread heartbeatThread = new Thread(() -> {while (SSE_CACHE.containsKey(clientId)) {try {Thread.sleep(HEARTBEAT_INTERVAL);SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter != null) {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("heartbeat");data.setData("ping");emitter.send(data, MediaType.APPLICATION_JSON);}} catch (Exception e) {log.error("发送心跳失败", e);removeClient(clientId);break;}}});heartbeatThread.setDaemon(true);heartbeatThread.start();}
}
1.5创建SSE控制器:
package com.campus.platform.controller;import com.campus.platform.service.SseService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@Slf4j
@Api(tags = "SSE长连接接口")
@RestController
@RequestMapping("/api/sse")
public class SseController {@Autowiredprivate SseService sseService;@ApiOperation("建立SSE连接")@GetMapping(value = "/connect/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect(@ApiParam("客户端ID") @PathVariable String clientId) {log.info("客户端[{}]建立SSE连接", clientId);return sseService.createSseEmitter(clientId);}@ApiOperation("发送消息给指定客户端")@PostMapping("/send/{clientId}")public void sendMessage(@ApiParam("客户端ID") @PathVariable String clientId,@ApiParam("消息内容") @RequestParam String message) {log.info("发送消息给客户端[{}]: {}", clientId, message);sseService.sendMessage(clientId, message);}@ApiOperation("发送消息给所有客户端")@PostMapping("/sendAll")public void sendMessageToAll(@ApiParam("消息内容") @RequestParam String message) {log.info("发送消息给所有客户端: {}", message);sseService.sendMessageToAll(message);}@ApiOperation("断开指定客户端连接")@DeleteMapping("/disconnect/{clientId}")public void disconnect(@ApiParam("客户端ID") @PathVariable String clientId) {log.info("断开客户端[{}]的SSE连接", clientId);sseService.removeClient(clientId);}
}
1.6前端JavaScript示例代码:
// 建立SSE连接
function connectSSE(clientId) {const eventSource = new EventSource(`/api/sse/connect/${clientId}`);// 连接打开eventSource.onopen = function() {console.log('SSE连接已建立');};// 接收消息eventSource.onmessage = function(event) {const data = JSON.parse(event.data);console.log('收到消息:', data);// 处理不同类型的事件switch(data.event) {case 'connected':console.log('连接成功');break;case 'message':console.log('收到业务消息:', data.data);break;case 'heartbeat':console.log('收到心跳');break;}};// 连接错误eventSource.onerror = function(error) {console.error('SSE连接错误:', error);eventSource.close();// 可以在这里实现重连逻辑setTimeout(() => {connectSSE(clientId);}, 5000);};return eventSource;
}// 关闭SSE连接
function closeSSE(eventSource) {if (eventSource) {eventSource.close();}
}
1.7 总结
使用说明:
客户端通过调用/api/sse/connect/{clientId}建立SSE连接
服务端可以通过以下方式发送消息:
/api/sse/send/{clientId} 发送消息给指定客户端
/api/sse/sendAll 发送消息给所有客户端
客户端可以通过/api/sse/disconnect/{clientId}主动断开连接
服务端会每30秒发送一次心跳消息,如果发送失败会自动断开连接
如果连接断开,客户端可以实现自动重连逻辑
主要特点:
使用ConcurrentHashMap存储客户端连接,保证线程安全
实现了心跳机制,保持连接活跃
支持向单个或所有客户端推送消息
实现了完整的异常处理和资源释放
支持自定义消息格式和事件类型
这样就实现了一个基于SSE的长连接服务,可以用于实时消息推送、实时通知等场景。
SSE (Server-Sent Events) 主要适用于以下场景:
实时数据推送
股票/基金价格实时更新
体育比赛实时比分
天气数据实时更新
交易数据实时同步
系统监控和告警
服务器性能监控
系统资源使用率实时展示
异常事件实时告警
日志实时查看
社交媒体功能
实时消息通知
新评论/点赞提醒
在线状态更新
好友动态实时推送
协同办公场景
文档实时同步
多人协作状态更新
任务进度实时通知
会议提醒
物联网应用
设备状态实时监控
传感器数据实时展示
智能家居控制反馈
车联网数据实时更新
游戏应用
游戏状态实时同步
排行榜实时更新
对战结果即时通知
在线玩家状态更新
SSE的特点和优势:
1. 单向通信
服务器到客户端的单向数据流
适合数据推送场景
不适合需要客户端频繁发送数据的场景
简单易用
基于HTTP协议
无需额外协议
开发成本低
浏览器原生支持
3. 自动重连
断线自动重连
无需手动处理重连逻辑
提高连接可靠性
实时性好
数据实时推送
延迟低
适合实时性要求不是特别高的场景
资源占用少
相比WebSocket更轻量
服务器压力小
适合大规模连接
跨域支持好
基于HTTP
支持标准的跨域方案
兼容性好
不适用的场景:
1. 双向通信频繁的场景
即时聊天
在线游戏
视频会议
这些场景建议使用WebSocket
低延迟要求的场景
高频交易
实时竞价
这些场景可能需要专用协议
大文件传输
文件上传下载
视频流传输
这些场景建议使用专门的传输协议
客户端写操作频繁的场景
需要客户端频繁发送数据的应用
这种场景使用SSE会增加HTTP请求数量
使用SSE时的注意事项:
相关文章:
使用SSE协议进行服务端向客户端主动发送消息
1.创建一个SSE配置类: 1.1代码如下:package com.campus.platform.config;import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.AsyncS…...
FastAPI 高并发与性能优化
FastAPI 高并发与性能优化 目录 🚀 高并发应用设计原则🧑💻 异步 I/O 优化 Web 服务响应速度⏳ 在 FastAPI 中优化异步任务执行顺序🔒 高并发中的共享资源与线程安全问题 1. 🚀 高并发应用设计原则 在构建高并发应…...
DFS+回溯+剪枝(深度优先搜索)——搜索算法
目录 一、递归 1.什么是递归? 2.什么时候使用递归? 3.如何理解递归? 4.如何写好递归? 二、记忆化搜索(记忆递归) 三、回溯 四、剪枝 五、综合试题 1.N皇后 2.解数独 DFS也就是深度优先搜索&am…...
在cursor/vscode中使用godot C#进行游戏开发
要在 Visual Studio Code(VS Code)中启动 C#Godot 项目,可以按照以下步骤进行配置: 1.安装必要的工具 • 安装 Visual Studio Code:确保你已经安装了最新版本的 VS Code。 • 安装.NET SDK:下载并安装.NET 7.x SDK(…...
vant4 van-list组件的使用
<van-listv-if"joblist && joblist.length > 0"v-model:loading"loading":finished"finished":immediate-check"false"finished-text"没有更多了"load"onLoad">// 加载 const loading ref(fals…...
介绍 Liquibase、Flyway、Talend 和 Apache NiFi:选择适合的工具
在现代软件开发中,尤其是在数据库管理和数据集成方面,选择合适的工具至关重要。本文将介绍四个流行的工具:Liquibase、Flyway、Talend 和 Apache NiFi,分析它们的应用、依赖以及如何选择适合的工具。 1. Liquibase 简介ÿ…...
攻防世界33 catcat-new【文件包含/flask_session伪造】
题目: 点击一只猫猫: 看这个url像是文件包含漏洞,试试 dirsearch扫出来/admin,访问也没成功(--delay 0.1 -t 5) 会的那几招全用不了了哈哈,那就继续看答案 先总结几个知识点 1./etc/passwd&am…...
Git -> Git配置密钥对,并查看公钥
Git密钥对的核心作用 私钥 (id_rsa) 你的数字身份证:存放在本机 ~/.ssh 目录下必须严格保密(类似银行卡密码),不可泄露或共享用于 解密 来自服务器的加密信息 公钥 (id_rsa.pub) 可公开的验证锁:需要上传到 Git 服…...
淘宝订单列表Fragment转场动画卡顿解决方案
如何应对产品形态与产品节奏相对确定情况下转变为『在业务需求与产品形态高度不确定性的情况下,如何实现业务交付时间与交付质量的确定性』。我们希望通过混合架构(Native 业务容器 Weex 2.0)作为未来交易终端架构的重要演进方向,…...
【ESP32指向鼠标】——icm20948与esp32通信
【ESP32指向鼠标】——icm20948与esp32通信 ICM-20948介绍 ICM-20948 是一款由 InvenSense(现为 TDK 的一部分)生产的 9 轴传感器集成电路。它结合了 陀螺仪、加速度计和磁力计。 内置了 DMP(Digital Motion Processor)即负责执…...
Xcode证书密钥导入
证书干嘛用 渠道定期会给xcode证书,用来给ios打包用,证书里面有记录哪些设备可以打包进去。 怎么换证书 先更新密钥 在钥匙串访问中,选择系统。(选登录也行,反正两个都要导入就是了)。 mac中双击所有 .p12 后缀的密钥ÿ…...
Ubuntu安装PgSQL17
参考官网教程,Ubuntu24 apt在线安装Postgres 17 1. 要手动配置 Apt 存储库 # 导入存储库签名密钥: sudo apt install curl ca-certificates sudo install -d /usr/share/postgresql-common/pgdg sudo curl -o /usr/share/postgresql-common/pgdg/apt…...
K8S容器启动提示:0/2 nodes are available: 2 Insufficient cpu.
问题:K8S的容器启动报错0/2 nodes are available: 2 Insufficient cpu. 原因:Pod的资源请求(requests)设置不当:在Kubernetes中,调度器根据Pod的requests字段来决定哪个节点可以运行该Pod。如果一个Pod声明…...
LabVIEW外腔二极管激光器稳频实验
本项目利用LabVIEW软件开发了一个用于外腔二极管激光器稳频实验的系统。系统能够实现激光器频率的稳定控制和实时监测,为激光实验提供了重要支持。 项目背景: 系统解决了外腔二极管激光器频率不稳定的问题,以满足对激光器频率稳定性要求较高…...
笔记6——字典dict(dictionary)
文章目录 字典dict(dictionary)定义特点常用操作1.访问值2.添加键值对3.修改值4.删除键值对5.遍历字典6.合并字典 性能应用场景dict和list的区别 字典dict(dictionary) 以 键 - 值对 (key - value pairs)的形式存储数据 定义 字典使用花括号 {} 来定义,键和值之…...
【MySQL】InnoDB单表访问方法
目录 1、背景2、环境3、访问类型【1】const【2】ref【3】ref_or_null【4】range【5】index【6】all 4、总结 1、背景 mysql通过查询条件查询到结果的过程就叫访问方法,一条查询语句的访问方法有很多种,接下来我们就来讲一下各种访问方法。 2、环境 创…...
APP端网络测试与弱网模拟!
当前APP网络环境比较复杂,网络制式有2G、3G、4G网络,还有越来越多的公共Wi-Fi。不同的网络环境和网络制式的差异,都会对用户使用app造成一定影响。另外,当前app使用场景多变,如进地铁、上公交、进电梯等,使…...
【个人开发】deepseed+Llama-factory 本地数据多卡Lora微调
文章目录 1.背景2.微调方式2.1 关键环境版本信息2.2 步骤2.2.1 下载llama-factory2.2.2 准备数据集2.2.3 微调模式2.2.4 微调脚本 2.3 踩坑经验2.3.1 问题一:ValueError: Undefined dataset xxxx in dataset_info.json.2.3.2 问题二: ValueError: Target…...
Redis7.0八种数据结构底层原理
导读 本文介绍redis应用数据结构与物理存储结构,共八种应用数据结构和 一. 内部数据结构 1. sds sds是redis自己设计的字符串结构有以下特点: jemalloc内存管理预分配冗余空间二进制安全(c原生使用\0作为结尾标识,所以无法直接存储\0)动态计数类型(根据字符串长度动态选择…...
Kafka 高吞吐量的底层技术原理
Kafka 之所以能够实现高吞吐量(每秒百万级消息处理),主要依赖于其底层设计和多项优化技术。以下是 Kafka 实现高吞吐量的关键技术原理: 1. 顺序读写磁盘 Kafka 利用磁盘的顺序读写特性,避免了随机读写的性能瓶颈。 顺…...
基于MCP协议构建YouTube视频AI分析工具:原理、部署与应用
1. 项目概述:一个连接AI与YouTube的“翻译官”如果你正在探索如何让AI助手,比如Claude、Cursor或者GPTs,直接帮你处理YouTube视频内容——比如总结一个长达两小时的科技讲座、提取某个教程的所有操作步骤,或者分析某个频道近期的内…...
为LLM智能体构建主动防御:Agent Shield架构解析与实战部署
1. 项目概述:Agent Shield 是什么,以及它为何重要 最近在开源社区里,一个名为 agent-shield 的项目引起了我的注意。这个由 Shahar Dagan 发起的项目,直译过来是“智能体护盾”,其核心目标非常明确:为基于…...
加法器优化:从并行前缀到AXON框架的技术演进
1. 加法器优化:从经典架构到AXON框架的演进在数字电路设计中,加法器作为最基础的算术运算单元,其性能直接影响整个系统的时钟频率和能效表现。传统加法器设计面临一个核心矛盾:如何在延迟(Delay)、功耗&…...
从Andru充电器看情感化硬件设计:EDA工具如何实现功能与体验融合
1. 项目概述:从“无聊”到“有趣”的设计哲学 昨天,我还在想,给手机、相机充个电能有什么花样?无非就是找个充电头,插上线,然后等着。这大概是世界上最“无聊”但又最必需的任务之一了。如果有人跑过来跟我…...
K8s网络插件Flannel与Calico:从原理到实战的选型与部署指南
1. Kubernetes网络插件基础认知 刚接触Kubernetes时,最让我头疼的就是容器网络问题。为什么Pod之间需要通信?为什么有的服务跨节点就访问不了?这些问题的答案都藏在CNI(Container Network Interface)插件里。Flannel和…...
AwesomeQRCode源码阅读笔记:深入理解二维码渲染核心技术
AwesomeQRCode源码阅读笔记:深入理解二维码渲染核心技术 【免费下载链接】AwesomeQRCode An awesome QR code generator for Android. 项目地址: https://gitcode.com/gh_mirrors/aw/AwesomeQRCode 想要为你的Android应用添加炫酷的二维码生成功能吗…...
留学生避坑指南:我实测了4种方法,成功将英文论文AI率从97%降到8%
大家最近都在为英文降aigc率发愁吧,作为研三党,我太懂这种痛了,之前我自己写英文初稿,写完直接拿去查重,结果turnitin检测ai率飙到了89%,当时看着报告整个人都懵了。 怎么给英文降ai?对于非母语…...
示波器有效位数(ENOB)实战指南:从原理到选型与应用
1. 从“看见”到“看清”:示波器有效位数(ENOB)的实战解读在电子工程师的日常里,示波器就是我们观察电路世界的“眼睛”。它能让我们直观地看到信号在连接器、线缆、PCB走线和元器件之间穿梭的模样。但就像视力有1.0和1.5的区别一…...
抖音下载器终极指南:3种场景下的高效内容获取方案
抖音下载器终极指南:3种场景下的高效内容获取方案 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fallback support. …...
OpenClaw工作空间管理工具:自动化配置维护与AI Agent开发效率提升
1. 项目概述:一个为OpenClaw工作空间量身打造的“管家”如果你正在使用OpenClaw,或者对AI Agent、Claude这类工具构建的自动化工作流感兴趣,那你大概率和我一样,经历过一个甜蜜的烦恼:随着项目越来越复杂,工…...
