当前位置: 首页 > news >正文

使用SSE协议进行服务端向客户端主动发送消息

1.创建一个SSE配置类:

1.1代码如下:
package com.campus.platform.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import java.util.concurrent.TimeUnit;@Configuration
public class SseConfig implements WebMvcConfigurer {@Overridepublic void configureAsyncSupport(AsyncSupportConfigurer configurer) {// 设置异步请求超时时间configurer.setDefaultTimeout(TimeUnit.MINUTES.toMillis(30));}
}
1.2 创建一个SSE事件类:
package com.campus.platform.model;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SseEmitterData {private String id;private String event;private String data;private Long retry;
}
1.3 创建SSE服务接口:
package com.campus.platform.service;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;public interface SseService {/*** 创建SSE连接*/SseEmitter createSseEmitter(String clientId);/*** 发送消息给指定客户端*/void sendMessage(String clientId, String message);/*** 发送消息给所有客户端*/void sendMessageToAll(String message);/*** 移除指定客户端连接*/void removeClient(String clientId);
}

1.4实现SSE服务:

package com.campus.platform.service.impl;import com.campus.platform.model.SseEmitterData;
import com.campus.platform.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Service
public class SseServiceImpl implements SseService {// 用于存储客户端连接private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();// 心跳间隔(30秒)private static final long HEARTBEAT_INTERVAL = 30_000;@Overridepublic SseEmitter createSseEmitter(String clientId) {// 设置超时时间为30分钟SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);// 注册回调emitter.onCompletion(() -> {log.info("SSE连接完成, clientId: {}", clientId);removeClient(clientId);});emitter.onTimeout(() -> {log.info("SSE连接超时, clientId: {}", clientId);removeClient(clientId);});emitter.onError(throwable -> {log.error("SSE连接异常, clientId: {}", clientId, throwable);removeClient(clientId);});// 缓存连接SSE_CACHE.put(clientId, emitter);// 发送连接成功消息try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("connected");data.setData("连接成功");data.setRetry(HEARTBEAT_INTERVAL);emitter.send(data, MediaType.APPLICATION_JSON);// 启动心跳线程startHeartbeat(clientId);} catch (IOException e) {log.error("发送连接消息失败", e);removeClient(clientId);}return emitter;}@Overridepublic void sendMessage(String clientId, String message) {SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter != null) {try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("message");data.setData(message);emitter.send(data, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("发送消息失败, clientId: {}", clientId, e);removeClient(clientId);}}}@Overridepublic void sendMessageToAll(String message) {SSE_CACHE.forEach((clientId, emitter) -> {try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("message");data.setData(message);emitter.send(data, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("发送消息失败, clientId: {}", clientId, e);removeClient(clientId);}});}@Overridepublic void removeClient(String clientId) {SseEmitter emitter = SSE_CACHE.remove(clientId);if (emitter != null) {try {emitter.complete();} catch (Exception e) {log.error("关闭SSE连接失败", e);}}}/*** 启动心跳线程*/private void startHeartbeat(String clientId) {Thread heartbeatThread = new Thread(() -> {while (SSE_CACHE.containsKey(clientId)) {try {Thread.sleep(HEARTBEAT_INTERVAL);SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter != null) {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("heartbeat");data.setData("ping");emitter.send(data, MediaType.APPLICATION_JSON);}} catch (Exception e) {log.error("发送心跳失败", e);removeClient(clientId);break;}}});heartbeatThread.setDaemon(true);heartbeatThread.start();}
}
1.5创建SSE控制器:
package com.campus.platform.controller;import com.campus.platform.service.SseService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@Slf4j
@Api(tags = "SSE长连接接口")
@RestController
@RequestMapping("/api/sse")
public class SseController {@Autowiredprivate SseService sseService;@ApiOperation("建立SSE连接")@GetMapping(value = "/connect/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect(@ApiParam("客户端ID") @PathVariable String clientId) {log.info("客户端[{}]建立SSE连接", clientId);return sseService.createSseEmitter(clientId);}@ApiOperation("发送消息给指定客户端")@PostMapping("/send/{clientId}")public void sendMessage(@ApiParam("客户端ID") @PathVariable String clientId,@ApiParam("消息内容") @RequestParam String message) {log.info("发送消息给客户端[{}]: {}", clientId, message);sseService.sendMessage(clientId, message);}@ApiOperation("发送消息给所有客户端")@PostMapping("/sendAll")public void sendMessageToAll(@ApiParam("消息内容") @RequestParam String message) {log.info("发送消息给所有客户端: {}", message);sseService.sendMessageToAll(message);}@ApiOperation("断开指定客户端连接")@DeleteMapping("/disconnect/{clientId}")public void disconnect(@ApiParam("客户端ID") @PathVariable String clientId) {log.info("断开客户端[{}]的SSE连接", clientId);sseService.removeClient(clientId);}
}
1.6前端JavaScript示例代码:
// 建立SSE连接
function connectSSE(clientId) {const eventSource = new EventSource(`/api/sse/connect/${clientId}`);// 连接打开eventSource.onopen = function() {console.log('SSE连接已建立');};// 接收消息eventSource.onmessage = function(event) {const data = JSON.parse(event.data);console.log('收到消息:', data);// 处理不同类型的事件switch(data.event) {case 'connected':console.log('连接成功');break;case 'message':console.log('收到业务消息:', data.data);break;case 'heartbeat':console.log('收到心跳');break;}};// 连接错误eventSource.onerror = function(error) {console.error('SSE连接错误:', error);eventSource.close();// 可以在这里实现重连逻辑setTimeout(() => {connectSSE(clientId);}, 5000);};return eventSource;
}// 关闭SSE连接
function closeSSE(eventSource) {if (eventSource) {eventSource.close();}
}
1.7 总结
使用说明:
客户端通过调用/api/sse/connect/{clientId}建立SSE连接
服务端可以通过以下方式发送消息:
/api/sse/send/{clientId} 发送消息给指定客户端
/api/sse/sendAll 发送消息给所有客户端
客户端可以通过/api/sse/disconnect/{clientId}主动断开连接
服务端会每30秒发送一次心跳消息,如果发送失败会自动断开连接
如果连接断开,客户端可以实现自动重连逻辑
主要特点:
使用ConcurrentHashMap存储客户端连接,保证线程安全
实现了心跳机制,保持连接活跃
支持向单个或所有客户端推送消息
实现了完整的异常处理和资源释放
支持自定义消息格式和事件类型
这样就实现了一个基于SSE的长连接服务,可以用于实时消息推送、实时通知等场景。
SSE (Server-Sent Events) 主要适用于以下场景:
实时数据推送
股票/基金价格实时更新
体育比赛实时比分
天气数据实时更新
交易数据实时同步
系统监控和告警
服务器性能监控
系统资源使用率实时展示
异常事件实时告警
日志实时查看
社交媒体功能
实时消息通知
新评论/点赞提醒
在线状态更新
好友动态实时推送
协同办公场景
文档实时同步
多人协作状态更新
任务进度实时通知
会议提醒
物联网应用
设备状态实时监控
传感器数据实时展示
智能家居控制反馈
车联网数据实时更新
游戏应用
游戏状态实时同步
排行榜实时更新
对战结果即时通知
在线玩家状态更新
SSE的特点和优势:
1. 单向通信
服务器到客户端的单向数据流
适合数据推送场景
不适合需要客户端频繁发送数据的场景
简单易用
基于HTTP协议
无需额外协议
开发成本低
浏览器原生支持
3. 自动重连
断线自动重连
无需手动处理重连逻辑
提高连接可靠性
实时性好
数据实时推送
延迟低
适合实时性要求不是特别高的场景
资源占用少
相比WebSocket更轻量
服务器压力小
适合大规模连接
跨域支持好
基于HTTP
支持标准的跨域方案
兼容性好
不适用的场景:
1. 双向通信频繁的场景
即时聊天
在线游戏
视频会议
这些场景建议使用WebSocket
低延迟要求的场景
高频交易
实时竞价
这些场景可能需要专用协议
大文件传输
文件上传下载
视频流传输
这些场景建议使用专门的传输协议
客户端写操作频繁的场景
需要客户端频繁发送数据的应用
这种场景使用SSE会增加HTTP请求数量
使用SSE时的注意事项:

相关文章:

使用SSE协议进行服务端向客户端主动发送消息

1.创建一个SSE配置类: 1.1代码如下&#xff1a;package com.campus.platform.config;import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.AsyncS…...

FastAPI 高并发与性能优化

FastAPI 高并发与性能优化 目录 &#x1f680; 高并发应用设计原则&#x1f9d1;‍&#x1f4bb; 异步 I/O 优化 Web 服务响应速度⏳ 在 FastAPI 中优化异步任务执行顺序&#x1f512; 高并发中的共享资源与线程安全问题 1. &#x1f680; 高并发应用设计原则 在构建高并发应…...

DFS+回溯+剪枝(深度优先搜索)——搜索算法

目录 一、递归 1.什么是递归&#xff1f; 2.什么时候使用递归&#xff1f; 3.如何理解递归&#xff1f; 4.如何写好递归&#xff1f; 二、记忆化搜索&#xff08;记忆递归&#xff09; 三、回溯 四、剪枝 五、综合试题 1.N皇后 2.解数独 DFS也就是深度优先搜索&am…...

在cursor/vscode中使用godot C#进行游戏开发

要在 Visual Studio Code(VS Code)中启动 C#Godot 项目&#xff0c;可以按照以下步骤进行配置&#xff1a; 1.安装必要的工具 • 安装 Visual Studio Code&#xff1a;确保你已经安装了最新版本的 VS Code。 • 安装.NET SDK&#xff1a;下载并安装.NET 7.x SDK&#xff08;…...

vant4 van-list组件的使用

<van-listv-if"joblist && joblist.length > 0"v-model:loading"loading":finished"finished":immediate-check"false"finished-text"没有更多了"load"onLoad">// 加载 const loading ref(fals…...

介绍 Liquibase、Flyway、Talend 和 Apache NiFi:选择适合的工具

在现代软件开发中&#xff0c;尤其是在数据库管理和数据集成方面&#xff0c;选择合适的工具至关重要。本文将介绍四个流行的工具&#xff1a;Liquibase、Flyway、Talend 和 Apache NiFi&#xff0c;分析它们的应用、依赖以及如何选择适合的工具。 1. Liquibase 简介&#xff…...

攻防世界33 catcat-new【文件包含/flask_session伪造】

题目&#xff1a; 点击一只猫猫&#xff1a; 看这个url像是文件包含漏洞&#xff0c;试试 dirsearch扫出来/admin&#xff0c;访问也没成功&#xff08;--delay 0.1 -t 5&#xff09; 会的那几招全用不了了哈哈&#xff0c;那就继续看答案 先总结几个知识点 1./etc/passwd&am…...

Git -> Git配置密钥对,并查看公钥

Git密钥对的核心作用 私钥 (id_rsa) 你的数字身份证&#xff1a;存放在本机 ~/.ssh 目录下必须严格保密&#xff08;类似银行卡密码&#xff09;&#xff0c;不可泄露或共享用于 解密 来自服务器的加密信息 公钥 (id_rsa.pub) 可公开的验证锁&#xff1a;需要上传到 Git 服…...

淘宝订单列表Fragment转场动画卡顿解决方案

如何应对产品形态与产品节奏相对确定情况下转变为『在业务需求与产品形态高度不确定性的情况下&#xff0c;如何实现业务交付时间与交付质量的确定性』。我们希望通过混合架构&#xff08;Native 业务容器 Weex 2.0&#xff09;作为未来交易终端架构的重要演进方向&#xff0c…...

【ESP32指向鼠标】——icm20948与esp32通信

【ESP32指向鼠标】——icm20948与esp32通信 ICM-20948介绍 ICM-20948 是一款由 InvenSense&#xff08;现为 TDK 的一部分&#xff09;生产的 9 轴传感器集成电路。它结合了 陀螺仪、加速度计和磁力计。 内置了 DMP&#xff08;Digital Motion Processor&#xff09;即负责执…...

Xcode证书密钥导入

证书干嘛用 渠道定期会给xcode证书&#xff0c;用来给ios打包用&#xff0c;证书里面有记录哪些设备可以打包进去。 怎么换证书 先更新密钥 在钥匙串访问中&#xff0c;选择系统。(选登录也行&#xff0c;反正两个都要导入就是了)。 mac中双击所有 .p12 后缀的密钥&#xff…...

Ubuntu安装PgSQL17

参考官网教程&#xff0c;Ubuntu24 apt在线安装Postgres 17 1. 要手动配置 Apt 存储库 # 导入存储库签名密钥&#xff1a; sudo apt install curl ca-certificates sudo install -d /usr/share/postgresql-common/pgdg sudo curl -o /usr/share/postgresql-common/pgdg/apt…...

K8S容器启动提示:0/2 nodes are available: 2 Insufficient cpu.

问题&#xff1a;K8S的容器启动报错0/2 nodes are available: 2 Insufficient cpu. 原因&#xff1a;Pod的资源请求&#xff08;requests&#xff09;设置不当&#xff1a;在Kubernetes中&#xff0c;调度器根据Pod的requests字段来决定哪个节点可以运行该Pod。如果一个Pod声明…...

LabVIEW外腔二极管激光器稳频实验

本项目利用LabVIEW软件开发了一个用于外腔二极管激光器稳频实验的系统。系统能够实现激光器频率的稳定控制和实时监测&#xff0c;为激光实验提供了重要支持。 项目背景&#xff1a; 系统解决了外腔二极管激光器频率不稳定的问题&#xff0c;以满足对激光器频率稳定性要求较高…...

笔记6——字典dict(dictionary)

文章目录 字典dict(dictionary)定义特点常用操作1.访问值2.添加键值对3.修改值4.删除键值对5.遍历字典6.合并字典 性能应用场景dict和list的区别 字典dict(dictionary) 以 键 - 值对 (key - value pairs)的形式存储数据 定义 字典使用花括号 {} 来定义&#xff0c;键和值之…...

【MySQL】InnoDB单表访问方法

目录 1、背景2、环境3、访问类型【1】const【2】ref【3】ref_or_null【4】range【5】index【6】all 4、总结 1、背景 mysql通过查询条件查询到结果的过程就叫访问方法&#xff0c;一条查询语句的访问方法有很多种&#xff0c;接下来我们就来讲一下各种访问方法。 2、环境 创…...

APP端网络测试与弱网模拟!

当前APP网络环境比较复杂&#xff0c;网络制式有2G、3G、4G网络&#xff0c;还有越来越多的公共Wi-Fi。不同的网络环境和网络制式的差异&#xff0c;都会对用户使用app造成一定影响。另外&#xff0c;当前app使用场景多变&#xff0c;如进地铁、上公交、进电梯等&#xff0c;使…...

【个人开发】deepseed+Llama-factory 本地数据多卡Lora微调

文章目录 1.背景2.微调方式2.1 关键环境版本信息2.2 步骤2.2.1 下载llama-factory2.2.2 准备数据集2.2.3 微调模式2.2.4 微调脚本 2.3 踩坑经验2.3.1 问题一&#xff1a;ValueError: Undefined dataset xxxx in dataset_info.json.2.3.2 问题二&#xff1a; ValueError: Target…...

Redis7.0八种数据结构底层原理

导读 本文介绍redis应用数据结构与物理存储结构,共八种应用数据结构和 一. 内部数据结构 1. sds sds是redis自己设计的字符串结构有以下特点: jemalloc内存管理预分配冗余空间二进制安全(c原生使用\0作为结尾标识,所以无法直接存储\0)动态计数类型(根据字符串长度动态选择…...

Kafka 高吞吐量的底层技术原理

Kafka 之所以能够实现高吞吐量&#xff08;每秒百万级消息处理&#xff09;&#xff0c;主要依赖于其底层设计和多项优化技术。以下是 Kafka 实现高吞吐量的关键技术原理&#xff1a; 1. 顺序读写磁盘 Kafka 利用磁盘的顺序读写特性&#xff0c;避免了随机读写的性能瓶颈。 顺…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》

引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

3.3.1_1 检错编码(奇偶校验码)

从这节课开始&#xff0c;我们会探讨数据链路层的差错控制功能&#xff0c;差错控制功能的主要目标是要发现并且解决一个帧内部的位错误&#xff0c;我们需要使用特殊的编码技术去发现帧内部的位错误&#xff0c;当我们发现位错误之后&#xff0c;通常来说有两种解决方案。第一…...

基于 TAPD 进行项目管理

起因 自己写了个小工具&#xff0c;仓库用的Github。之前在用markdown进行需求管理&#xff0c;现在随着功能的增加&#xff0c;感觉有点难以管理了&#xff0c;所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD&#xff0c;需要提供一个企业名新建一个项目&#…...

人机融合智能 | “人智交互”跨学科新领域

本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...

Proxmox Mail Gateway安装指南:从零开始配置高效邮件过滤系统

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storms…...

用鸿蒙HarmonyOS5实现中国象棋小游戏的过程

下面是一个基于鸿蒙OS (HarmonyOS) 的中国象棋小游戏的实现代码。这个实现使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chinesechess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├──…...

Spring AOP代理对象生成原理

代理对象生成的关键类是【AnnotationAwareAspectJAutoProxyCreator】&#xff0c;这个类继承了【BeanPostProcessor】是一个后置处理器 在bean对象生命周期中初始化时执行【org.springframework.beans.factory.config.BeanPostProcessor#postProcessAfterInitialization】方法时…...

【iOS】 Block再学习

iOS Block再学习 文章目录 iOS Block再学习前言Block的三种类型__ NSGlobalBlock____ NSMallocBlock____ NSStackBlock__小结 Block底层分析Block的结构捕获自由变量捕获全局(静态)变量捕获静态变量__block修饰符forwarding指针 Block的copy时机block作为函数返回值将block赋给…...