当前位置: 首页 > news >正文

Springboot整合SSE实现实时消息推送

SSE详细介绍传送门:SSE实时消息推送

简单描述一下SSE推送在实际项目中应用的常见场景

1,项目页面中有消息通知板块,当信息有变化时,只有手动刷新页面,才会看到最新的数据,这里可以采用SSE技术实时推送最新消息
.
2,大屏数据,这种场景是可以用SSE进行推送,但是需要注意的是SSE是单向的服务端向前端推数据,一般要求的是大屏基本没有查询框条件这种,比较合适。

注意点:如果对于实时数据要求很高并且连接要求做到安全稳定,这里推荐用WebSocket,一般来说对于数据量小,并发连接不是很高要求的情况下,SSE足够,用而且SSE的配置对于前后端都比较简单,但是WebSocket的配置对于后端来说需要花费比较多的时间去完善,而且WebSocket是比较消耗服务器资源和网络带宽资源的,另外一个,如果项目中运维配置了代理服务器的话,可能代理服务器也要配置一些支持WebSocket的属性,总体来说WebSocket配置的位置比较多,容易出现各种坑bug,这里注意一下即可。

话不多说,总结一下Springboot整合SSE需要的步骤如下:

1,编写SSE的服务类:主要包括建立连接、关闭连接、异常连接、心跳检测、推送消息等
.
2,controller层写入SSE连接和关闭接口
.
3,在所需要的业务模块中直接调用SSE服务类中推送消息功能即可

SSE步骤简单,无需导入maven依赖,踩坑bug少,主要是SSE内部支持断线重连,爽爽爽

1,SSE服务类

package com.bosera.salesioc.home.sse;
import com.alibaba.fastjson.JSONObject;
import com.bosera.salesioc.domain.home.vo.MessageVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;@Slf4j
@Component
public class SseEmitterServer{private static final ConcurrentHashMap<String, Map<String,SseEmitter>> sseEmitterPool = new ConcurrentHashMap<>();private static final ConcurrentHashMap<String, Timer>  headerPool = new ConcurrentHashMap<>();public  static ConcurrentHashMap<String, Map<String, SseEmitter>> getSseEmitterPool(){return sseEmitterPool;}/*** 建立连接*/public  SseEmitter connect(String  userCode, String userId){log.info("******************开始建立连接*****************");//设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常SseEmitter sseemitter = new SseEmitter(0L);//注册回调sseemitter.onCompletion(completionCallBack(userCode,userId));sseemitter.onError(errorCallBack(userCode,userId));sseemitter.onTimeout(timeoutCallBack(userCode,userId));sseEmitterPool.computeIfAbsent(userCode, k -> new ConcurrentHashMap<>()).put(userId, sseemitter);// 开启心跳活跃startHeartbeat(sseemitter,userId);return sseemitter;}/*** 关闭当前连接*/public void complete(String userCode, String userId){Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null)map.get(userId).complete();}/*** 关闭所有连接*/public void completeAll(){if(!sseEmitterPool.isEmpty()){for (Map.Entry<String, Map<String, SseEmitter>> entry : sseEmitterPool.entrySet()) {Map<String, SseEmitter> userIdMap = entry.getValue();if(!userIdMap.isEmpty()){for (Map.Entry<String, SseEmitter> userIdEntry : userIdMap.entrySet()) {userIdEntry.getValue().complete();}}}sseEmitterPool.clear();}}private  Runnable completionCallBack(String userCode, String userId) {return () -> {removeUser(userCode,userId);log.info("{}结束连接:{}",userCode,userId);};}private  Runnable timeoutCallBack(String userCode, String userId){return ()->{removeUser(userCode,userId);log.error("{}连接超時:{}",userCode,userId);};}private  Consumer<Throwable> errorCallBack(String userCode, String userId){return throwable -> {log.error("{}连接异常:{}",userCode,userId);stopHeartbeat(userId);};}/*** 推送消息*/public  void sendMessage(String userCode, MessageVO message){Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null) {for (Map.Entry<String, SseEmitter> entry : map.entrySet()) {try {// 发送事件entry.getValue().send(JSONObject.toJSONString(message));}catch (Exception e){log.error("{}连接信息:{}, 错误消息:{}",userCode,entry.getKey(),e.getMessage());}}}}private void removeUser(String userCode, String userId){try {Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null) {map.remove(userId);// 如果该用户的所有会话都已关闭,则移除整个映射if (map.isEmpty())sseEmitterPool.remove(userCode);}// 关闭心跳stopHeartbeat(userId);}catch (Exception e){log.error("关闭连接异常{}",e.getMessage());}}/*** 开启心跳*/public void startHeartbeat(SseEmitter sseemitter, String userId) {Timer heartbeatTimer = new Timer();headerPool.put(userId,heartbeatTimer);heartbeatTimer.schedule(new TimerTask() {@Overridepublic void run() {if (Objects.nonNull(headerPool.get(userId))) {// 发送心跳:保持长连接try {sseemitter.send("connect active");} catch (Exception e) {log.error("connect active error");}}}}, 25000, 25000);}/*** 关闭心跳* @param userId*/public void stopHeartbeat(String userId) {Timer timer = headerPool.get(userId);if (timer!= null)timer.cancel();headerPool.remove(userId);}
}

推送的消息可以统一定义一个类来封装信息
2,消息推送响应体

/*** @Author xiaozq* @Date 2024/2/21* @Description: 消息推送响应体*/
public class MessageVO<T> {// 主题:不同位置推送的内容不同private String topic;// 推送消息private T data;public void setTopic(String topic) {this.topic = topic;}public void setData(T data) {this.data = data;}public String getTopic() {return topic;}public T getData() {return data;}
}

3,controller层编写连接和关闭接口

@RestController
@RequestMapping("/sse")
@Slf4j
public class SSEController{@Autowiredprivate SseEmitterServer sseEmitterServer;/*** 用于创建连接*/@GetMapping(value = "/connect/{userCode}/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect(@PathVariable("userCode") String userCode, @PathVariable("userId") String userId){return sseEmitterServer.connect(userCode, userId);}/*** 关闭连接*/@GetMapping(value = "/close/{userCode}/{userId}")public void close(@PathVariable("userCode") String userCode,@PathVariable("userId") String userId ) {sseEmitterServer.complete(userCode, userId);}}

4,业务中实际应用:推送消息

@Autowired
SseInfoService  sseInfoService;
private void handlerMessageInform() {ConcurrentHashMap<String, Map<String, SseEmitter>> sessionPool = SseEmitterServer.getSseEmitterPool();for (Map.Entry<String, Map<String, SseEmitter>> entry : sessionPool.entrySet()) {// 封装消息MessageVO<List<MessageNotificationVO>> messageVO = new MessageVO();messageVO.setTopic(TopicTypeEnum.MESSAGE_INFORM.getTopic());messageVO.setData(messageService.getMessageList(request));// 推送消息sseEmitterServer.sendMessage(entry.getKey(), messageVO);}}

在实践过程中存在的问题:

1,报错504 gateway timeout:这里主要是原项目中配置了响应超时时间,不支持长连接,这里的做法是心跳活跃,保证连接不会被掐断,可以写一个定时任务,每天晚上定时去关闭所有连接,第二天用新的连接,这样可以尽量保证内存的连接数不会过多占用内存,因为夜深人静的时候谁还会打开web项目工作啊,哈哈太卷了吧,所以把时间定在晚上最好。
.
如果项目是集群模式的话,上述代码就得改造了,建议是把消息推送这块单独抽出一个微服务模块来,这样子保证所有的连接统一走单独的一个服务,因为SSE不是双向的,既然是单项连接,与后端集群下的其中一个服务建立连接产生的IO流这是只属于当前服务的本地IO,关闭IO只能连接对应的这台服务去关闭,否则关闭失效。总之,考虑的点还有很多,一般情况下,SSE够用啦

总体来说,应用是比较简单的,涉及到消息实时推送相关的业务,可以尝试SSE

相关文章:

Springboot整合SSE实现实时消息推送

SSE详细介绍传送门&#xff1a;SSE实时消息推送 简单描述一下SSE推送在实际项目中应用的常见场景 1&#xff0c;项目页面中有消息通知板块&#xff0c;当信息有变化时&#xff0c;只有手动刷新页面&#xff0c;才会看到最新的数据&#xff0c;这里可以采用SSE技术实时推送最新…...

在pytorch中利用GPU训练神经网络时代码的执行顺序并提高训练效率

在pytorch中利用GPU训练神经网络时代码的执行顺序并提高训练效率 在 PyTorch 中&#xff0c;大多数操作在 GPU 上默认是异步执行的&#xff0c;但这并不意味着它们是并行执行的。要理解代码是同步还是异步执行&#xff0c;以及是串行还是并行执行&#xff0c;我们需要考虑几个…...

vue3学习

距离vue2学习已经一年度了&#xff0c;现在开始vue3的学习。 一、webpack &#xff08;1&#xff09;创建列表隔行变色项目及webpack使用&#xff1a; 新建项目空白目录&#xff0c;并运行npm init -y命令&#xff0c;初始化包管理配置文件package.json&#xff1b; 新建sr…...

毫秒生成的时间戳如何转化成东八区具体时间

假设现在有一个时间是1709101071419L 后端代码实现 Java代码&#xff08;东八区时间&#xff09; 在Java代码中&#xff0c;我们将时区从UTC调整为东八区&#xff08;UTC8&#xff09;&#xff1a; import java.time.Instant; import java.time.ZoneId; import java.time.Z…...

02. Nginx入门-Nginx安装

Nginx安装 yum安装 编辑yum环境 cat > /etc/yum.repos.d/nginx.repo << EOF [nginx-stable] namenginx stable repo baseurlhttp://nginx.org/packages/centos/$releasever/$basearch/ gpgcheck1 enabled1 gpgkeyhttps://nginx.org/keys/nginx_signing.key module_…...

leetcode73. 矩阵置零

链接见&#xff1a;https://leetcode.cn/problems/set-matrix-zeroes/description/ 题目描述 给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 AC代码 class Solution { public:void setZeroes(vec…...

【中间件】RabbitMQ入门

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;中间件 ⛺️稳中求进&#xff0c;晒太阳 MQ的优劣&#xff1a; 优势 应用解耦&#xff1a;提升了系统容错性和可维护性异步提速&#xff1a;提升用户体验和系统吞吐量消峰填谷&#xff1…...

rtt的io设备框架面向对象学习-电阻屏LCD设备

目录 1.8080通信的电阻屏LCD设备1.1 构造流程1.2 使用2.i2c和spi通信的电阻屏LCD 电阻屏LCD通信接口有支持I2c、SPI和8080通信接口的。 1.8080通信的电阻屏LCD设备 lcd这块不像其他设备类&#xff0c;rtt没有实现的设备驱动框架层&#xff0c;那么是在驱动层直接实现的。 以…...

商城免费搭建之java商城 java电子商务Spring Cloud+Spring Boot+mybatis+MQ+VR全景

1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot、Mybatis、Redis 3. 前端框架…...

蓝桥杯刷题--python-16

562. 壁画 - AcWing题库 Tint(input()) j1 while(j<T): N int(input()) ainput() s [0]*(N1) # 求前戳和 for i in range(1, N 1): s[i] int(a[i-1]) s[i - 1] # 枚举 # 区间 max_ float(-inf) k (N 2 - 1) // 2 for i in …...

闰年计算中的计算机Bug

不知道你有没有看过凯瑟琳泽塔琼斯主演的《偷天陷阱》&#xff0c;里面主题思想是用银行结算系统的千年虫bug&#xff0c;精心设计&#xff0c;盗取银行几十亿的精彩动作片。所谓2000 年千禧年的千年虫&#xff0c;其实就是计算机计算闰年的bug。 这个闰年计算的历史源远流长&…...

python水表识别图像识别深度学习 CNN

python水表识别&#xff0c;图像识别深度学习 CNN&#xff0c;Opencv,Keras 重点&#xff1a;项目和文档是本人近期原创所作&#xff01;程序可以将水表图片里面的数据进行深度学习&#xff0c;提取相关信息训练&#xff0c;lw1.3万字重复15%&#xff0c;可以直接上交那种&…...

Java对接快递100实时快递单号查询API接口

目录 1.引入依赖 2.定义配置信息 3.模块结构 4.Controller 5.Service实现类 6.返回数据dto以及dto中的数据dto 7.测试运行 今天也是接到了这个任务&#xff0c;官网有小demo&#xff0c;可以下载下来参考test中代码 官方文档地址&#xff1a; 实时快递查询接口技术文档…...

Redis常见的15个【坑】,避坑指南

一、常见命令 1.1 过期时间意外丢失 原因&#xff1a; SET命令如果不设置过期时间&#xff0c;那么Redis会自动【擦除】这个key的过期时间 1.2 DEL命令阻塞redis key是String类型时&#xff0c;DEL时间复杂度是O(1)key是List/Hash/Set/ZSet类型&#xff0c;DEL时间复杂度是…...

04. Nginx入门-Nginx WEB模块

测试环境 此处使用的yum安装的Nginx路径。 此处域名均在本地配置hosts。 主配置文件 路径&#xff1a;/etc/nginx/nginx.conf user nginx; worker_processes auto;error_log /var/log/nginx/error.log notice; pid /var/run/nginx.pid;events {worker_connection…...

Python在信息安全领域中具有重要的作用

Python在信息安全领域中具有重要的作用。下面是几个方面的说明&#xff1a; 网络安全&#xff1a;Python提供了一系列用于网络安全的库和工具&#xff0c;例如Scapy、Nmap等。这些工具可以应用于漏洞扫描、网络流量分析、数据包嗅探等操作&#xff0c;帮助检测和防御网络攻击。…...

Linux 定时备份文件到另一台服务器

1. 需求 用户要求将 Tomcat 的日志文件定时备份到另一台服务器。同事给我提供了一个写好的 java 框架&#xff0c;但实在不想给用户再维护另一个服务了&#xff0c;所以另寻他法。 2. 问题 使用 scp 等跨服务器传输命令时需要手动输入用户名的密码才可进行文件传输&#xff…...

C++输入输出(I\O)

我们知道C是由C语言发展而来的&#xff0c;几乎完全兼容C语言&#xff0c;换句话说&#xff0c;你可以在C里面编译C语言代码。如下图: C语言是面向过程的语言&#xff0c;C在C语言之上增加了面向对象以及泛型编程机制&#xff0c;因此C更适合中大型程序的开发&#xff0c;然而C…...

基本设计模式

单例模式 ES5 function Duck1(name:string){this.namenamethis.instancenull }Duck1.prototype.getNamefunction(){console.log(this.name) }Duck1.getInstancefunction(name:string){if(!this.instance){this.instance new Duck1(name)} } const aDuck1.getInstance(a) const…...

双通道音频功率放大电路,外接元件少, 通道分离性好,3V 的低压下可正常使用——D2025

D2025 为立体声音频功率放大集成电路&#xff0c;适用于各类袖珍或便携式立体声 收录机中作功率放放大器。 D2025 采用 DIP16 封装形式。 主要特点&#xff1a;  适用于立体声或 BTL 工作模式  外接元件少  通道分离性好  电源电压范围宽&#xff08;3V~12V &#xff…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

使用分级同态加密防御梯度泄漏

抽象 联邦学习 &#xff08;FL&#xff09; 支持跨分布式客户端进行协作模型训练&#xff0c;而无需共享原始数据&#xff0c;这使其成为在互联和自动驾驶汽车 &#xff08;CAV&#xff09; 等领域保护隐私的机器学习的一种很有前途的方法。然而&#xff0c;最近的研究表明&…...

iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版​分享

平时用 iPhone 的时候&#xff0c;难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵&#xff0c;或者买了二手 iPhone 却被原来的 iCloud 账号锁住&#xff0c;这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验&#xff0c;以及大语言模型的分析能力&#xff0c;我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际&#xff0c;我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测&#xff0c;聊作存档。等到明…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

基于matlab策略迭代和值迭代法的动态规划

经典的基于策略迭代和值迭代法的动态规划matlab代码&#xff0c;实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...

Typeerror: cannot read properties of undefined (reading ‘XXX‘)

最近需要在离线机器上运行软件&#xff0c;所以得把软件用docker打包起来&#xff0c;大部分功能都没问题&#xff0c;出了一个奇怪的事情。同样的代码&#xff0c;在本机上用vscode可以运行起来&#xff0c;但是打包之后在docker里出现了问题。使用的是dialog组件&#xff0c;…...

Springboot社区养老保险系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;社区养老保险系统小程序被用户普遍使用&#xff0c;为方…...

R语言速释制剂QBD解决方案之三

本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...

IP如何挑?2025年海外专线IP如何购买?

你花了时间和预算买了IP&#xff0c;结果IP质量不佳&#xff0c;项目效率低下不说&#xff0c;还可能带来莫名的网络问题&#xff0c;是不是太闹心了&#xff1f;尤其是在面对海外专线IP时&#xff0c;到底怎么才能买到适合自己的呢&#xff1f;所以&#xff0c;挑IP绝对是个技…...