使用SSE协议进行服务端向客户端主动发送消息
1.创建一个SSE配置类:
1.1代码如下:
package com.campus.platform.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import java.util.concurrent.TimeUnit;@Configuration
public class SseConfig implements WebMvcConfigurer {@Overridepublic void configureAsyncSupport(AsyncSupportConfigurer configurer) {// 设置异步请求超时时间configurer.setDefaultTimeout(TimeUnit.MINUTES.toMillis(30));}
}
1.2 创建一个SSE事件类:
package com.campus.platform.model;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SseEmitterData {private String id;private String event;private String data;private Long retry;
}
1.3 创建SSE服务接口:
package com.campus.platform.service;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;public interface SseService {/*** 创建SSE连接*/SseEmitter createSseEmitter(String clientId);/*** 发送消息给指定客户端*/void sendMessage(String clientId, String message);/*** 发送消息给所有客户端*/void sendMessageToAll(String message);/*** 移除指定客户端连接*/void removeClient(String clientId);
}
1.4实现SSE服务:
package com.campus.platform.service.impl;import com.campus.platform.model.SseEmitterData;
import com.campus.platform.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Service
public class SseServiceImpl implements SseService {// 用于存储客户端连接private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();// 心跳间隔(30秒)private static final long HEARTBEAT_INTERVAL = 30_000;@Overridepublic SseEmitter createSseEmitter(String clientId) {// 设置超时时间为30分钟SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);// 注册回调emitter.onCompletion(() -> {log.info("SSE连接完成, clientId: {}", clientId);removeClient(clientId);});emitter.onTimeout(() -> {log.info("SSE连接超时, clientId: {}", clientId);removeClient(clientId);});emitter.onError(throwable -> {log.error("SSE连接异常, clientId: {}", clientId, throwable);removeClient(clientId);});// 缓存连接SSE_CACHE.put(clientId, emitter);// 发送连接成功消息try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("connected");data.setData("连接成功");data.setRetry(HEARTBEAT_INTERVAL);emitter.send(data, MediaType.APPLICATION_JSON);// 启动心跳线程startHeartbeat(clientId);} catch (IOException e) {log.error("发送连接消息失败", e);removeClient(clientId);}return emitter;}@Overridepublic void sendMessage(String clientId, String message) {SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter != null) {try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("message");data.setData(message);emitter.send(data, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("发送消息失败, clientId: {}", clientId, e);removeClient(clientId);}}}@Overridepublic void sendMessageToAll(String message) {SSE_CACHE.forEach((clientId, emitter) -> {try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("message");data.setData(message);emitter.send(data, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("发送消息失败, clientId: {}", clientId, e);removeClient(clientId);}});}@Overridepublic void removeClient(String clientId) {SseEmitter emitter = SSE_CACHE.remove(clientId);if (emitter != null) {try {emitter.complete();} catch (Exception e) {log.error("关闭SSE连接失败", e);}}}/*** 启动心跳线程*/private void startHeartbeat(String clientId) {Thread heartbeatThread = new Thread(() -> {while (SSE_CACHE.containsKey(clientId)) {try {Thread.sleep(HEARTBEAT_INTERVAL);SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter != null) {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("heartbeat");data.setData("ping");emitter.send(data, MediaType.APPLICATION_JSON);}} catch (Exception e) {log.error("发送心跳失败", e);removeClient(clientId);break;}}});heartbeatThread.setDaemon(true);heartbeatThread.start();}
}
1.5创建SSE控制器:
package com.campus.platform.controller;import com.campus.platform.service.SseService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@Slf4j
@Api(tags = "SSE长连接接口")
@RestController
@RequestMapping("/api/sse")
public class SseController {@Autowiredprivate SseService sseService;@ApiOperation("建立SSE连接")@GetMapping(value = "/connect/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect(@ApiParam("客户端ID") @PathVariable String clientId) {log.info("客户端[{}]建立SSE连接", clientId);return sseService.createSseEmitter(clientId);}@ApiOperation("发送消息给指定客户端")@PostMapping("/send/{clientId}")public void sendMessage(@ApiParam("客户端ID") @PathVariable String clientId,@ApiParam("消息内容") @RequestParam String message) {log.info("发送消息给客户端[{}]: {}", clientId, message);sseService.sendMessage(clientId, message);}@ApiOperation("发送消息给所有客户端")@PostMapping("/sendAll")public void sendMessageToAll(@ApiParam("消息内容") @RequestParam String message) {log.info("发送消息给所有客户端: {}", message);sseService.sendMessageToAll(message);}@ApiOperation("断开指定客户端连接")@DeleteMapping("/disconnect/{clientId}")public void disconnect(@ApiParam("客户端ID") @PathVariable String clientId) {log.info("断开客户端[{}]的SSE连接", clientId);sseService.removeClient(clientId);}
}
1.6前端JavaScript示例代码:
// 建立SSE连接
function connectSSE(clientId) {const eventSource = new EventSource(`/api/sse/connect/${clientId}`);// 连接打开eventSource.onopen = function() {console.log('SSE连接已建立');};// 接收消息eventSource.onmessage = function(event) {const data = JSON.parse(event.data);console.log('收到消息:', data);// 处理不同类型的事件switch(data.event) {case 'connected':console.log('连接成功');break;case 'message':console.log('收到业务消息:', data.data);break;case 'heartbeat':console.log('收到心跳');break;}};// 连接错误eventSource.onerror = function(error) {console.error('SSE连接错误:', error);eventSource.close();// 可以在这里实现重连逻辑setTimeout(() => {connectSSE(clientId);}, 5000);};return eventSource;
}// 关闭SSE连接
function closeSSE(eventSource) {if (eventSource) {eventSource.close();}
}
1.7 总结
使用说明:
客户端通过调用/api/sse/connect/{clientId}建立SSE连接
服务端可以通过以下方式发送消息:
/api/sse/send/{clientId} 发送消息给指定客户端
/api/sse/sendAll 发送消息给所有客户端
客户端可以通过/api/sse/disconnect/{clientId}主动断开连接
服务端会每30秒发送一次心跳消息,如果发送失败会自动断开连接
如果连接断开,客户端可以实现自动重连逻辑
主要特点:
使用ConcurrentHashMap存储客户端连接,保证线程安全
实现了心跳机制,保持连接活跃
支持向单个或所有客户端推送消息
实现了完整的异常处理和资源释放
支持自定义消息格式和事件类型
这样就实现了一个基于SSE的长连接服务,可以用于实时消息推送、实时通知等场景。
SSE (Server-Sent Events) 主要适用于以下场景:
实时数据推送
股票/基金价格实时更新
体育比赛实时比分
天气数据实时更新
交易数据实时同步
系统监控和告警
服务器性能监控
系统资源使用率实时展示
异常事件实时告警
日志实时查看
社交媒体功能
实时消息通知
新评论/点赞提醒
在线状态更新
好友动态实时推送
协同办公场景
文档实时同步
多人协作状态更新
任务进度实时通知
会议提醒
物联网应用
设备状态实时监控
传感器数据实时展示
智能家居控制反馈
车联网数据实时更新
游戏应用
游戏状态实时同步
排行榜实时更新
对战结果即时通知
在线玩家状态更新
SSE的特点和优势:
1. 单向通信
服务器到客户端的单向数据流
适合数据推送场景
不适合需要客户端频繁发送数据的场景
简单易用
基于HTTP协议
无需额外协议
开发成本低
浏览器原生支持
3. 自动重连
断线自动重连
无需手动处理重连逻辑
提高连接可靠性
实时性好
数据实时推送
延迟低
适合实时性要求不是特别高的场景
资源占用少
相比WebSocket更轻量
服务器压力小
适合大规模连接
跨域支持好
基于HTTP
支持标准的跨域方案
兼容性好
不适用的场景:
1. 双向通信频繁的场景
即时聊天
在线游戏
视频会议
这些场景建议使用WebSocket
低延迟要求的场景
高频交易
实时竞价
这些场景可能需要专用协议
大文件传输
文件上传下载
视频流传输
这些场景建议使用专门的传输协议
客户端写操作频繁的场景
需要客户端频繁发送数据的应用
这种场景使用SSE会增加HTTP请求数量
使用SSE时的注意事项:
相关文章:
使用SSE协议进行服务端向客户端主动发送消息
1.创建一个SSE配置类: 1.1代码如下:package com.campus.platform.config;import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.AsyncS…...
FastAPI 高并发与性能优化
FastAPI 高并发与性能优化 目录 🚀 高并发应用设计原则🧑💻 异步 I/O 优化 Web 服务响应速度⏳ 在 FastAPI 中优化异步任务执行顺序🔒 高并发中的共享资源与线程安全问题 1. 🚀 高并发应用设计原则 在构建高并发应…...
DFS+回溯+剪枝(深度优先搜索)——搜索算法
目录 一、递归 1.什么是递归? 2.什么时候使用递归? 3.如何理解递归? 4.如何写好递归? 二、记忆化搜索(记忆递归) 三、回溯 四、剪枝 五、综合试题 1.N皇后 2.解数独 DFS也就是深度优先搜索&am…...
在cursor/vscode中使用godot C#进行游戏开发
要在 Visual Studio Code(VS Code)中启动 C#Godot 项目,可以按照以下步骤进行配置: 1.安装必要的工具 • 安装 Visual Studio Code:确保你已经安装了最新版本的 VS Code。 • 安装.NET SDK:下载并安装.NET 7.x SDK(…...
vant4 van-list组件的使用
<van-listv-if"joblist && joblist.length > 0"v-model:loading"loading":finished"finished":immediate-check"false"finished-text"没有更多了"load"onLoad">// 加载 const loading ref(fals…...
介绍 Liquibase、Flyway、Talend 和 Apache NiFi:选择适合的工具
在现代软件开发中,尤其是在数据库管理和数据集成方面,选择合适的工具至关重要。本文将介绍四个流行的工具:Liquibase、Flyway、Talend 和 Apache NiFi,分析它们的应用、依赖以及如何选择适合的工具。 1. Liquibase 简介ÿ…...
攻防世界33 catcat-new【文件包含/flask_session伪造】
题目: 点击一只猫猫: 看这个url像是文件包含漏洞,试试 dirsearch扫出来/admin,访问也没成功(--delay 0.1 -t 5) 会的那几招全用不了了哈哈,那就继续看答案 先总结几个知识点 1./etc/passwd&am…...
Git -> Git配置密钥对,并查看公钥
Git密钥对的核心作用 私钥 (id_rsa) 你的数字身份证:存放在本机 ~/.ssh 目录下必须严格保密(类似银行卡密码),不可泄露或共享用于 解密 来自服务器的加密信息 公钥 (id_rsa.pub) 可公开的验证锁:需要上传到 Git 服…...
淘宝订单列表Fragment转场动画卡顿解决方案
如何应对产品形态与产品节奏相对确定情况下转变为『在业务需求与产品形态高度不确定性的情况下,如何实现业务交付时间与交付质量的确定性』。我们希望通过混合架构(Native 业务容器 Weex 2.0)作为未来交易终端架构的重要演进方向,…...
【ESP32指向鼠标】——icm20948与esp32通信
【ESP32指向鼠标】——icm20948与esp32通信 ICM-20948介绍 ICM-20948 是一款由 InvenSense(现为 TDK 的一部分)生产的 9 轴传感器集成电路。它结合了 陀螺仪、加速度计和磁力计。 内置了 DMP(Digital Motion Processor)即负责执…...
Xcode证书密钥导入
证书干嘛用 渠道定期会给xcode证书,用来给ios打包用,证书里面有记录哪些设备可以打包进去。 怎么换证书 先更新密钥 在钥匙串访问中,选择系统。(选登录也行,反正两个都要导入就是了)。 mac中双击所有 .p12 后缀的密钥ÿ…...
Ubuntu安装PgSQL17
参考官网教程,Ubuntu24 apt在线安装Postgres 17 1. 要手动配置 Apt 存储库 # 导入存储库签名密钥: sudo apt install curl ca-certificates sudo install -d /usr/share/postgresql-common/pgdg sudo curl -o /usr/share/postgresql-common/pgdg/apt…...
K8S容器启动提示:0/2 nodes are available: 2 Insufficient cpu.
问题:K8S的容器启动报错0/2 nodes are available: 2 Insufficient cpu. 原因:Pod的资源请求(requests)设置不当:在Kubernetes中,调度器根据Pod的requests字段来决定哪个节点可以运行该Pod。如果一个Pod声明…...
LabVIEW外腔二极管激光器稳频实验
本项目利用LabVIEW软件开发了一个用于外腔二极管激光器稳频实验的系统。系统能够实现激光器频率的稳定控制和实时监测,为激光实验提供了重要支持。 项目背景: 系统解决了外腔二极管激光器频率不稳定的问题,以满足对激光器频率稳定性要求较高…...
笔记6——字典dict(dictionary)
文章目录 字典dict(dictionary)定义特点常用操作1.访问值2.添加键值对3.修改值4.删除键值对5.遍历字典6.合并字典 性能应用场景dict和list的区别 字典dict(dictionary) 以 键 - 值对 (key - value pairs)的形式存储数据 定义 字典使用花括号 {} 来定义,键和值之…...
【MySQL】InnoDB单表访问方法
目录 1、背景2、环境3、访问类型【1】const【2】ref【3】ref_or_null【4】range【5】index【6】all 4、总结 1、背景 mysql通过查询条件查询到结果的过程就叫访问方法,一条查询语句的访问方法有很多种,接下来我们就来讲一下各种访问方法。 2、环境 创…...
APP端网络测试与弱网模拟!
当前APP网络环境比较复杂,网络制式有2G、3G、4G网络,还有越来越多的公共Wi-Fi。不同的网络环境和网络制式的差异,都会对用户使用app造成一定影响。另外,当前app使用场景多变,如进地铁、上公交、进电梯等,使…...
【个人开发】deepseed+Llama-factory 本地数据多卡Lora微调
文章目录 1.背景2.微调方式2.1 关键环境版本信息2.2 步骤2.2.1 下载llama-factory2.2.2 准备数据集2.2.3 微调模式2.2.4 微调脚本 2.3 踩坑经验2.3.1 问题一:ValueError: Undefined dataset xxxx in dataset_info.json.2.3.2 问题二: ValueError: Target…...
Redis7.0八种数据结构底层原理
导读 本文介绍redis应用数据结构与物理存储结构,共八种应用数据结构和 一. 内部数据结构 1. sds sds是redis自己设计的字符串结构有以下特点: jemalloc内存管理预分配冗余空间二进制安全(c原生使用\0作为结尾标识,所以无法直接存储\0)动态计数类型(根据字符串长度动态选择…...
Kafka 高吞吐量的底层技术原理
Kafka 之所以能够实现高吞吐量(每秒百万级消息处理),主要依赖于其底层设计和多项优化技术。以下是 Kafka 实现高吞吐量的关键技术原理: 1. 顺序读写磁盘 Kafka 利用磁盘的顺序读写特性,避免了随机读写的性能瓶颈。 顺…...
基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》
引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...
AI Agent与Agentic AI:原理、应用、挑战与未来展望
文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例:使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例:使用OpenAI GPT-3进…...
3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...
基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...
人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...
Proxmox Mail Gateway安装指南:从零开始配置高效邮件过滤系统
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storms…...
用鸿蒙HarmonyOS5实现中国象棋小游戏的过程
下面是一个基于鸿蒙OS (HarmonyOS) 的中国象棋小游戏的实现代码。这个实现使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chinesechess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├──…...
Spring AOP代理对象生成原理
代理对象生成的关键类是【AnnotationAwareAspectJAutoProxyCreator】,这个类继承了【BeanPostProcessor】是一个后置处理器 在bean对象生命周期中初始化时执行【org.springframework.beans.factory.config.BeanPostProcessor#postProcessAfterInitialization】方法时…...
【iOS】 Block再学习
iOS Block再学习 文章目录 iOS Block再学习前言Block的三种类型__ NSGlobalBlock____ NSMallocBlock____ NSStackBlock__小结 Block底层分析Block的结构捕获自由变量捕获全局(静态)变量捕获静态变量__block修饰符forwarding指针 Block的copy时机block作为函数返回值将block赋给…...
