当前位置: 首页 > news >正文

使用SSE协议进行服务端向客户端主动发送消息

1.创建一个SSE配置类:

1.1代码如下:
package com.campus.platform.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import java.util.concurrent.TimeUnit;@Configuration
public class SseConfig implements WebMvcConfigurer {@Overridepublic void configureAsyncSupport(AsyncSupportConfigurer configurer) {// 设置异步请求超时时间configurer.setDefaultTimeout(TimeUnit.MINUTES.toMillis(30));}
}
1.2 创建一个SSE事件类:
package com.campus.platform.model;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SseEmitterData {private String id;private String event;private String data;private Long retry;
}
1.3 创建SSE服务接口:
package com.campus.platform.service;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;public interface SseService {/*** 创建SSE连接*/SseEmitter createSseEmitter(String clientId);/*** 发送消息给指定客户端*/void sendMessage(String clientId, String message);/*** 发送消息给所有客户端*/void sendMessageToAll(String message);/*** 移除指定客户端连接*/void removeClient(String clientId);
}

1.4实现SSE服务:

package com.campus.platform.service.impl;import com.campus.platform.model.SseEmitterData;
import com.campus.platform.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Service
public class SseServiceImpl implements SseService {// 用于存储客户端连接private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();// 心跳间隔(30秒)private static final long HEARTBEAT_INTERVAL = 30_000;@Overridepublic SseEmitter createSseEmitter(String clientId) {// 设置超时时间为30分钟SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);// 注册回调emitter.onCompletion(() -> {log.info("SSE连接完成, clientId: {}", clientId);removeClient(clientId);});emitter.onTimeout(() -> {log.info("SSE连接超时, clientId: {}", clientId);removeClient(clientId);});emitter.onError(throwable -> {log.error("SSE连接异常, clientId: {}", clientId, throwable);removeClient(clientId);});// 缓存连接SSE_CACHE.put(clientId, emitter);// 发送连接成功消息try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("connected");data.setData("连接成功");data.setRetry(HEARTBEAT_INTERVAL);emitter.send(data, MediaType.APPLICATION_JSON);// 启动心跳线程startHeartbeat(clientId);} catch (IOException e) {log.error("发送连接消息失败", e);removeClient(clientId);}return emitter;}@Overridepublic void sendMessage(String clientId, String message) {SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter != null) {try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("message");data.setData(message);emitter.send(data, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("发送消息失败, clientId: {}", clientId, e);removeClient(clientId);}}}@Overridepublic void sendMessageToAll(String message) {SSE_CACHE.forEach((clientId, emitter) -> {try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("message");data.setData(message);emitter.send(data, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("发送消息失败, clientId: {}", clientId, e);removeClient(clientId);}});}@Overridepublic void removeClient(String clientId) {SseEmitter emitter = SSE_CACHE.remove(clientId);if (emitter != null) {try {emitter.complete();} catch (Exception e) {log.error("关闭SSE连接失败", e);}}}/*** 启动心跳线程*/private void startHeartbeat(String clientId) {Thread heartbeatThread = new Thread(() -> {while (SSE_CACHE.containsKey(clientId)) {try {Thread.sleep(HEARTBEAT_INTERVAL);SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter != null) {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("heartbeat");data.setData("ping");emitter.send(data, MediaType.APPLICATION_JSON);}} catch (Exception e) {log.error("发送心跳失败", e);removeClient(clientId);break;}}});heartbeatThread.setDaemon(true);heartbeatThread.start();}
}
1.5创建SSE控制器:
package com.campus.platform.controller;import com.campus.platform.service.SseService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@Slf4j
@Api(tags = "SSE长连接接口")
@RestController
@RequestMapping("/api/sse")
public class SseController {@Autowiredprivate SseService sseService;@ApiOperation("建立SSE连接")@GetMapping(value = "/connect/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect(@ApiParam("客户端ID") @PathVariable String clientId) {log.info("客户端[{}]建立SSE连接", clientId);return sseService.createSseEmitter(clientId);}@ApiOperation("发送消息给指定客户端")@PostMapping("/send/{clientId}")public void sendMessage(@ApiParam("客户端ID") @PathVariable String clientId,@ApiParam("消息内容") @RequestParam String message) {log.info("发送消息给客户端[{}]: {}", clientId, message);sseService.sendMessage(clientId, message);}@ApiOperation("发送消息给所有客户端")@PostMapping("/sendAll")public void sendMessageToAll(@ApiParam("消息内容") @RequestParam String message) {log.info("发送消息给所有客户端: {}", message);sseService.sendMessageToAll(message);}@ApiOperation("断开指定客户端连接")@DeleteMapping("/disconnect/{clientId}")public void disconnect(@ApiParam("客户端ID") @PathVariable String clientId) {log.info("断开客户端[{}]的SSE连接", clientId);sseService.removeClient(clientId);}
}
1.6前端JavaScript示例代码:
// 建立SSE连接
function connectSSE(clientId) {const eventSource = new EventSource(`/api/sse/connect/${clientId}`);// 连接打开eventSource.onopen = function() {console.log('SSE连接已建立');};// 接收消息eventSource.onmessage = function(event) {const data = JSON.parse(event.data);console.log('收到消息:', data);// 处理不同类型的事件switch(data.event) {case 'connected':console.log('连接成功');break;case 'message':console.log('收到业务消息:', data.data);break;case 'heartbeat':console.log('收到心跳');break;}};// 连接错误eventSource.onerror = function(error) {console.error('SSE连接错误:', error);eventSource.close();// 可以在这里实现重连逻辑setTimeout(() => {connectSSE(clientId);}, 5000);};return eventSource;
}// 关闭SSE连接
function closeSSE(eventSource) {if (eventSource) {eventSource.close();}
}
1.7 总结
使用说明:
客户端通过调用/api/sse/connect/{clientId}建立SSE连接
服务端可以通过以下方式发送消息:
/api/sse/send/{clientId} 发送消息给指定客户端
/api/sse/sendAll 发送消息给所有客户端
客户端可以通过/api/sse/disconnect/{clientId}主动断开连接
服务端会每30秒发送一次心跳消息,如果发送失败会自动断开连接
如果连接断开,客户端可以实现自动重连逻辑
主要特点:
使用ConcurrentHashMap存储客户端连接,保证线程安全
实现了心跳机制,保持连接活跃
支持向单个或所有客户端推送消息
实现了完整的异常处理和资源释放
支持自定义消息格式和事件类型
这样就实现了一个基于SSE的长连接服务,可以用于实时消息推送、实时通知等场景。
SSE (Server-Sent Events) 主要适用于以下场景:
实时数据推送
股票/基金价格实时更新
体育比赛实时比分
天气数据实时更新
交易数据实时同步
系统监控和告警
服务器性能监控
系统资源使用率实时展示
异常事件实时告警
日志实时查看
社交媒体功能
实时消息通知
新评论/点赞提醒
在线状态更新
好友动态实时推送
协同办公场景
文档实时同步
多人协作状态更新
任务进度实时通知
会议提醒
物联网应用
设备状态实时监控
传感器数据实时展示
智能家居控制反馈
车联网数据实时更新
游戏应用
游戏状态实时同步
排行榜实时更新
对战结果即时通知
在线玩家状态更新
SSE的特点和优势:
1. 单向通信
服务器到客户端的单向数据流
适合数据推送场景
不适合需要客户端频繁发送数据的场景
简单易用
基于HTTP协议
无需额外协议
开发成本低
浏览器原生支持
3. 自动重连
断线自动重连
无需手动处理重连逻辑
提高连接可靠性
实时性好
数据实时推送
延迟低
适合实时性要求不是特别高的场景
资源占用少
相比WebSocket更轻量
服务器压力小
适合大规模连接
跨域支持好
基于HTTP
支持标准的跨域方案
兼容性好
不适用的场景:
1. 双向通信频繁的场景
即时聊天
在线游戏
视频会议
这些场景建议使用WebSocket
低延迟要求的场景
高频交易
实时竞价
这些场景可能需要专用协议
大文件传输
文件上传下载
视频流传输
这些场景建议使用专门的传输协议
客户端写操作频繁的场景
需要客户端频繁发送数据的应用
这种场景使用SSE会增加HTTP请求数量
使用SSE时的注意事项:

相关文章:

使用SSE协议进行服务端向客户端主动发送消息

1.创建一个SSE配置类: 1.1代码如下&#xff1a;package com.campus.platform.config;import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.AsyncS…...

FastAPI 高并发与性能优化

FastAPI 高并发与性能优化 目录 &#x1f680; 高并发应用设计原则&#x1f9d1;‍&#x1f4bb; 异步 I/O 优化 Web 服务响应速度⏳ 在 FastAPI 中优化异步任务执行顺序&#x1f512; 高并发中的共享资源与线程安全问题 1. &#x1f680; 高并发应用设计原则 在构建高并发应…...

DFS+回溯+剪枝(深度优先搜索)——搜索算法

目录 一、递归 1.什么是递归&#xff1f; 2.什么时候使用递归&#xff1f; 3.如何理解递归&#xff1f; 4.如何写好递归&#xff1f; 二、记忆化搜索&#xff08;记忆递归&#xff09; 三、回溯 四、剪枝 五、综合试题 1.N皇后 2.解数独 DFS也就是深度优先搜索&am…...

在cursor/vscode中使用godot C#进行游戏开发

要在 Visual Studio Code(VS Code)中启动 C#Godot 项目&#xff0c;可以按照以下步骤进行配置&#xff1a; 1.安装必要的工具 • 安装 Visual Studio Code&#xff1a;确保你已经安装了最新版本的 VS Code。 • 安装.NET SDK&#xff1a;下载并安装.NET 7.x SDK&#xff08;…...

vant4 van-list组件的使用

<van-listv-if"joblist && joblist.length > 0"v-model:loading"loading":finished"finished":immediate-check"false"finished-text"没有更多了"load"onLoad">// 加载 const loading ref(fals…...

介绍 Liquibase、Flyway、Talend 和 Apache NiFi:选择适合的工具

在现代软件开发中&#xff0c;尤其是在数据库管理和数据集成方面&#xff0c;选择合适的工具至关重要。本文将介绍四个流行的工具&#xff1a;Liquibase、Flyway、Talend 和 Apache NiFi&#xff0c;分析它们的应用、依赖以及如何选择适合的工具。 1. Liquibase 简介&#xff…...

攻防世界33 catcat-new【文件包含/flask_session伪造】

题目&#xff1a; 点击一只猫猫&#xff1a; 看这个url像是文件包含漏洞&#xff0c;试试 dirsearch扫出来/admin&#xff0c;访问也没成功&#xff08;--delay 0.1 -t 5&#xff09; 会的那几招全用不了了哈哈&#xff0c;那就继续看答案 先总结几个知识点 1./etc/passwd&am…...

Git -> Git配置密钥对,并查看公钥

Git密钥对的核心作用 私钥 (id_rsa) 你的数字身份证&#xff1a;存放在本机 ~/.ssh 目录下必须严格保密&#xff08;类似银行卡密码&#xff09;&#xff0c;不可泄露或共享用于 解密 来自服务器的加密信息 公钥 (id_rsa.pub) 可公开的验证锁&#xff1a;需要上传到 Git 服…...

淘宝订单列表Fragment转场动画卡顿解决方案

如何应对产品形态与产品节奏相对确定情况下转变为『在业务需求与产品形态高度不确定性的情况下&#xff0c;如何实现业务交付时间与交付质量的确定性』。我们希望通过混合架构&#xff08;Native 业务容器 Weex 2.0&#xff09;作为未来交易终端架构的重要演进方向&#xff0c…...

【ESP32指向鼠标】——icm20948与esp32通信

【ESP32指向鼠标】——icm20948与esp32通信 ICM-20948介绍 ICM-20948 是一款由 InvenSense&#xff08;现为 TDK 的一部分&#xff09;生产的 9 轴传感器集成电路。它结合了 陀螺仪、加速度计和磁力计。 内置了 DMP&#xff08;Digital Motion Processor&#xff09;即负责执…...

Xcode证书密钥导入

证书干嘛用 渠道定期会给xcode证书&#xff0c;用来给ios打包用&#xff0c;证书里面有记录哪些设备可以打包进去。 怎么换证书 先更新密钥 在钥匙串访问中&#xff0c;选择系统。(选登录也行&#xff0c;反正两个都要导入就是了)。 mac中双击所有 .p12 后缀的密钥&#xff…...

Ubuntu安装PgSQL17

参考官网教程&#xff0c;Ubuntu24 apt在线安装Postgres 17 1. 要手动配置 Apt 存储库 # 导入存储库签名密钥&#xff1a; sudo apt install curl ca-certificates sudo install -d /usr/share/postgresql-common/pgdg sudo curl -o /usr/share/postgresql-common/pgdg/apt…...

K8S容器启动提示:0/2 nodes are available: 2 Insufficient cpu.

问题&#xff1a;K8S的容器启动报错0/2 nodes are available: 2 Insufficient cpu. 原因&#xff1a;Pod的资源请求&#xff08;requests&#xff09;设置不当&#xff1a;在Kubernetes中&#xff0c;调度器根据Pod的requests字段来决定哪个节点可以运行该Pod。如果一个Pod声明…...

LabVIEW外腔二极管激光器稳频实验

本项目利用LabVIEW软件开发了一个用于外腔二极管激光器稳频实验的系统。系统能够实现激光器频率的稳定控制和实时监测&#xff0c;为激光实验提供了重要支持。 项目背景&#xff1a; 系统解决了外腔二极管激光器频率不稳定的问题&#xff0c;以满足对激光器频率稳定性要求较高…...

笔记6——字典dict(dictionary)

文章目录 字典dict(dictionary)定义特点常用操作1.访问值2.添加键值对3.修改值4.删除键值对5.遍历字典6.合并字典 性能应用场景dict和list的区别 字典dict(dictionary) 以 键 - 值对 (key - value pairs)的形式存储数据 定义 字典使用花括号 {} 来定义&#xff0c;键和值之…...

【MySQL】InnoDB单表访问方法

目录 1、背景2、环境3、访问类型【1】const【2】ref【3】ref_or_null【4】range【5】index【6】all 4、总结 1、背景 mysql通过查询条件查询到结果的过程就叫访问方法&#xff0c;一条查询语句的访问方法有很多种&#xff0c;接下来我们就来讲一下各种访问方法。 2、环境 创…...

APP端网络测试与弱网模拟!

当前APP网络环境比较复杂&#xff0c;网络制式有2G、3G、4G网络&#xff0c;还有越来越多的公共Wi-Fi。不同的网络环境和网络制式的差异&#xff0c;都会对用户使用app造成一定影响。另外&#xff0c;当前app使用场景多变&#xff0c;如进地铁、上公交、进电梯等&#xff0c;使…...

【个人开发】deepseed+Llama-factory 本地数据多卡Lora微调

文章目录 1.背景2.微调方式2.1 关键环境版本信息2.2 步骤2.2.1 下载llama-factory2.2.2 准备数据集2.2.3 微调模式2.2.4 微调脚本 2.3 踩坑经验2.3.1 问题一&#xff1a;ValueError: Undefined dataset xxxx in dataset_info.json.2.3.2 问题二&#xff1a; ValueError: Target…...

Redis7.0八种数据结构底层原理

导读 本文介绍redis应用数据结构与物理存储结构,共八种应用数据结构和 一. 内部数据结构 1. sds sds是redis自己设计的字符串结构有以下特点: jemalloc内存管理预分配冗余空间二进制安全(c原生使用\0作为结尾标识,所以无法直接存储\0)动态计数类型(根据字符串长度动态选择…...

Kafka 高吞吐量的底层技术原理

Kafka 之所以能够实现高吞吐量&#xff08;每秒百万级消息处理&#xff09;&#xff0c;主要依赖于其底层设计和多项优化技术。以下是 Kafka 实现高吞吐量的关键技术原理&#xff1a; 1. 顺序读写磁盘 Kafka 利用磁盘的顺序读写特性&#xff0c;避免了随机读写的性能瓶颈。 顺…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)

一、数据处理与分析实战 &#xff08;一&#xff09;实时滤波与参数调整 基础滤波操作 60Hz 工频滤波&#xff1a;勾选界面右侧 “60Hz” 复选框&#xff0c;可有效抑制电网干扰&#xff08;适用于北美地区&#xff0c;欧洲用户可调整为 50Hz&#xff09;。 平滑处理&…...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容

基于 ​UniApp + WebSocket​实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配​微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

pam_env.so模块配置解析

在PAM&#xff08;Pluggable Authentication Modules&#xff09;配置中&#xff0c; /etc/pam.d/su 文件相关配置含义如下&#xff1a; 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块&#xff0c;负责验证用户身份&am…...

第25节 Node.js 断言测试

Node.js的assert模块主要用于编写程序的单元测试时使用&#xff0c;通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试&#xff0c;通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

HTML前端开发:JavaScript 常用事件详解

作为前端开发的核心&#xff0c;JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例&#xff1a; 1. onclick - 点击事件 当元素被单击时触发&#xff08;左键点击&#xff09; button.onclick function() {alert("按钮被点击了&#xff01;&…...

【Linux系统】Linux环境变量:系统配置的隐形指挥官

。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量&#xff1a;setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...

FFmpeg avformat_open_input函数分析

函数内部的总体流程如下&#xff1a; avformat_open_input 精简后的代码如下&#xff1a; int avformat_open_input(AVFormatContext **ps, const char *filename,ff_const59 AVInputFormat *fmt, AVDictionary **options) {AVFormatContext *s *ps;int i, ret 0;AVDictio…...

GraphQL 实战篇:Apollo Client 配置与缓存

GraphQL 实战篇&#xff1a;Apollo Client 配置与缓存 上一篇&#xff1a;GraphQL 入门篇&#xff1a;基础查询语法 依旧和上一篇的笔记一样&#xff0c;主实操&#xff0c;没啥过多的细节讲解&#xff0c;代码具体在&#xff1a; https://github.com/GoldenaArcher/graphql…...