当前位置: 首页 > news >正文

使用SSE协议进行服务端向客户端主动发送消息

1.创建一个SSE配置类:

1.1代码如下:
package com.campus.platform.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import java.util.concurrent.TimeUnit;@Configuration
public class SseConfig implements WebMvcConfigurer {@Overridepublic void configureAsyncSupport(AsyncSupportConfigurer configurer) {// 设置异步请求超时时间configurer.setDefaultTimeout(TimeUnit.MINUTES.toMillis(30));}
}
1.2 创建一个SSE事件类:
package com.campus.platform.model;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SseEmitterData {private String id;private String event;private String data;private Long retry;
}
1.3 创建SSE服务接口:
package com.campus.platform.service;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;public interface SseService {/*** 创建SSE连接*/SseEmitter createSseEmitter(String clientId);/*** 发送消息给指定客户端*/void sendMessage(String clientId, String message);/*** 发送消息给所有客户端*/void sendMessageToAll(String message);/*** 移除指定客户端连接*/void removeClient(String clientId);
}

1.4实现SSE服务:

package com.campus.platform.service.impl;import com.campus.platform.model.SseEmitterData;
import com.campus.platform.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Service
public class SseServiceImpl implements SseService {// 用于存储客户端连接private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();// 心跳间隔(30秒)private static final long HEARTBEAT_INTERVAL = 30_000;@Overridepublic SseEmitter createSseEmitter(String clientId) {// 设置超时时间为30分钟SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);// 注册回调emitter.onCompletion(() -> {log.info("SSE连接完成, clientId: {}", clientId);removeClient(clientId);});emitter.onTimeout(() -> {log.info("SSE连接超时, clientId: {}", clientId);removeClient(clientId);});emitter.onError(throwable -> {log.error("SSE连接异常, clientId: {}", clientId, throwable);removeClient(clientId);});// 缓存连接SSE_CACHE.put(clientId, emitter);// 发送连接成功消息try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("connected");data.setData("连接成功");data.setRetry(HEARTBEAT_INTERVAL);emitter.send(data, MediaType.APPLICATION_JSON);// 启动心跳线程startHeartbeat(clientId);} catch (IOException e) {log.error("发送连接消息失败", e);removeClient(clientId);}return emitter;}@Overridepublic void sendMessage(String clientId, String message) {SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter != null) {try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("message");data.setData(message);emitter.send(data, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("发送消息失败, clientId: {}", clientId, e);removeClient(clientId);}}}@Overridepublic void sendMessageToAll(String message) {SSE_CACHE.forEach((clientId, emitter) -> {try {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("message");data.setData(message);emitter.send(data, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("发送消息失败, clientId: {}", clientId, e);removeClient(clientId);}});}@Overridepublic void removeClient(String clientId) {SseEmitter emitter = SSE_CACHE.remove(clientId);if (emitter != null) {try {emitter.complete();} catch (Exception e) {log.error("关闭SSE连接失败", e);}}}/*** 启动心跳线程*/private void startHeartbeat(String clientId) {Thread heartbeatThread = new Thread(() -> {while (SSE_CACHE.containsKey(clientId)) {try {Thread.sleep(HEARTBEAT_INTERVAL);SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter != null) {SseEmitterData data = new SseEmitterData();data.setId(clientId);data.setEvent("heartbeat");data.setData("ping");emitter.send(data, MediaType.APPLICATION_JSON);}} catch (Exception e) {log.error("发送心跳失败", e);removeClient(clientId);break;}}});heartbeatThread.setDaemon(true);heartbeatThread.start();}
}
1.5创建SSE控制器:
package com.campus.platform.controller;import com.campus.platform.service.SseService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@Slf4j
@Api(tags = "SSE长连接接口")
@RestController
@RequestMapping("/api/sse")
public class SseController {@Autowiredprivate SseService sseService;@ApiOperation("建立SSE连接")@GetMapping(value = "/connect/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect(@ApiParam("客户端ID") @PathVariable String clientId) {log.info("客户端[{}]建立SSE连接", clientId);return sseService.createSseEmitter(clientId);}@ApiOperation("发送消息给指定客户端")@PostMapping("/send/{clientId}")public void sendMessage(@ApiParam("客户端ID") @PathVariable String clientId,@ApiParam("消息内容") @RequestParam String message) {log.info("发送消息给客户端[{}]: {}", clientId, message);sseService.sendMessage(clientId, message);}@ApiOperation("发送消息给所有客户端")@PostMapping("/sendAll")public void sendMessageToAll(@ApiParam("消息内容") @RequestParam String message) {log.info("发送消息给所有客户端: {}", message);sseService.sendMessageToAll(message);}@ApiOperation("断开指定客户端连接")@DeleteMapping("/disconnect/{clientId}")public void disconnect(@ApiParam("客户端ID") @PathVariable String clientId) {log.info("断开客户端[{}]的SSE连接", clientId);sseService.removeClient(clientId);}
}
1.6前端JavaScript示例代码:
// 建立SSE连接
function connectSSE(clientId) {const eventSource = new EventSource(`/api/sse/connect/${clientId}`);// 连接打开eventSource.onopen = function() {console.log('SSE连接已建立');};// 接收消息eventSource.onmessage = function(event) {const data = JSON.parse(event.data);console.log('收到消息:', data);// 处理不同类型的事件switch(data.event) {case 'connected':console.log('连接成功');break;case 'message':console.log('收到业务消息:', data.data);break;case 'heartbeat':console.log('收到心跳');break;}};// 连接错误eventSource.onerror = function(error) {console.error('SSE连接错误:', error);eventSource.close();// 可以在这里实现重连逻辑setTimeout(() => {connectSSE(clientId);}, 5000);};return eventSource;
}// 关闭SSE连接
function closeSSE(eventSource) {if (eventSource) {eventSource.close();}
}
1.7 总结
使用说明:
客户端通过调用/api/sse/connect/{clientId}建立SSE连接
服务端可以通过以下方式发送消息:
/api/sse/send/{clientId} 发送消息给指定客户端
/api/sse/sendAll 发送消息给所有客户端
客户端可以通过/api/sse/disconnect/{clientId}主动断开连接
服务端会每30秒发送一次心跳消息,如果发送失败会自动断开连接
如果连接断开,客户端可以实现自动重连逻辑
主要特点:
使用ConcurrentHashMap存储客户端连接,保证线程安全
实现了心跳机制,保持连接活跃
支持向单个或所有客户端推送消息
实现了完整的异常处理和资源释放
支持自定义消息格式和事件类型
这样就实现了一个基于SSE的长连接服务,可以用于实时消息推送、实时通知等场景。
SSE (Server-Sent Events) 主要适用于以下场景:
实时数据推送
股票/基金价格实时更新
体育比赛实时比分
天气数据实时更新
交易数据实时同步
系统监控和告警
服务器性能监控
系统资源使用率实时展示
异常事件实时告警
日志实时查看
社交媒体功能
实时消息通知
新评论/点赞提醒
在线状态更新
好友动态实时推送
协同办公场景
文档实时同步
多人协作状态更新
任务进度实时通知
会议提醒
物联网应用
设备状态实时监控
传感器数据实时展示
智能家居控制反馈
车联网数据实时更新
游戏应用
游戏状态实时同步
排行榜实时更新
对战结果即时通知
在线玩家状态更新
SSE的特点和优势:
1. 单向通信
服务器到客户端的单向数据流
适合数据推送场景
不适合需要客户端频繁发送数据的场景
简单易用
基于HTTP协议
无需额外协议
开发成本低
浏览器原生支持
3. 自动重连
断线自动重连
无需手动处理重连逻辑
提高连接可靠性
实时性好
数据实时推送
延迟低
适合实时性要求不是特别高的场景
资源占用少
相比WebSocket更轻量
服务器压力小
适合大规模连接
跨域支持好
基于HTTP
支持标准的跨域方案
兼容性好
不适用的场景:
1. 双向通信频繁的场景
即时聊天
在线游戏
视频会议
这些场景建议使用WebSocket
低延迟要求的场景
高频交易
实时竞价
这些场景可能需要专用协议
大文件传输
文件上传下载
视频流传输
这些场景建议使用专门的传输协议
客户端写操作频繁的场景
需要客户端频繁发送数据的应用
这种场景使用SSE会增加HTTP请求数量
使用SSE时的注意事项:

相关文章:

使用SSE协议进行服务端向客户端主动发送消息

1.创建一个SSE配置类: 1.1代码如下&#xff1a;package com.campus.platform.config;import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.AsyncS…...

FastAPI 高并发与性能优化

FastAPI 高并发与性能优化 目录 &#x1f680; 高并发应用设计原则&#x1f9d1;‍&#x1f4bb; 异步 I/O 优化 Web 服务响应速度⏳ 在 FastAPI 中优化异步任务执行顺序&#x1f512; 高并发中的共享资源与线程安全问题 1. &#x1f680; 高并发应用设计原则 在构建高并发应…...

DFS+回溯+剪枝(深度优先搜索)——搜索算法

目录 一、递归 1.什么是递归&#xff1f; 2.什么时候使用递归&#xff1f; 3.如何理解递归&#xff1f; 4.如何写好递归&#xff1f; 二、记忆化搜索&#xff08;记忆递归&#xff09; 三、回溯 四、剪枝 五、综合试题 1.N皇后 2.解数独 DFS也就是深度优先搜索&am…...

在cursor/vscode中使用godot C#进行游戏开发

要在 Visual Studio Code(VS Code)中启动 C#Godot 项目&#xff0c;可以按照以下步骤进行配置&#xff1a; 1.安装必要的工具 • 安装 Visual Studio Code&#xff1a;确保你已经安装了最新版本的 VS Code。 • 安装.NET SDK&#xff1a;下载并安装.NET 7.x SDK&#xff08;…...

vant4 van-list组件的使用

<van-listv-if"joblist && joblist.length > 0"v-model:loading"loading":finished"finished":immediate-check"false"finished-text"没有更多了"load"onLoad">// 加载 const loading ref(fals…...

介绍 Liquibase、Flyway、Talend 和 Apache NiFi:选择适合的工具

在现代软件开发中&#xff0c;尤其是在数据库管理和数据集成方面&#xff0c;选择合适的工具至关重要。本文将介绍四个流行的工具&#xff1a;Liquibase、Flyway、Talend 和 Apache NiFi&#xff0c;分析它们的应用、依赖以及如何选择适合的工具。 1. Liquibase 简介&#xff…...

攻防世界33 catcat-new【文件包含/flask_session伪造】

题目&#xff1a; 点击一只猫猫&#xff1a; 看这个url像是文件包含漏洞&#xff0c;试试 dirsearch扫出来/admin&#xff0c;访问也没成功&#xff08;--delay 0.1 -t 5&#xff09; 会的那几招全用不了了哈哈&#xff0c;那就继续看答案 先总结几个知识点 1./etc/passwd&am…...

Git -> Git配置密钥对,并查看公钥

Git密钥对的核心作用 私钥 (id_rsa) 你的数字身份证&#xff1a;存放在本机 ~/.ssh 目录下必须严格保密&#xff08;类似银行卡密码&#xff09;&#xff0c;不可泄露或共享用于 解密 来自服务器的加密信息 公钥 (id_rsa.pub) 可公开的验证锁&#xff1a;需要上传到 Git 服…...

淘宝订单列表Fragment转场动画卡顿解决方案

如何应对产品形态与产品节奏相对确定情况下转变为『在业务需求与产品形态高度不确定性的情况下&#xff0c;如何实现业务交付时间与交付质量的确定性』。我们希望通过混合架构&#xff08;Native 业务容器 Weex 2.0&#xff09;作为未来交易终端架构的重要演进方向&#xff0c…...

【ESP32指向鼠标】——icm20948与esp32通信

【ESP32指向鼠标】——icm20948与esp32通信 ICM-20948介绍 ICM-20948 是一款由 InvenSense&#xff08;现为 TDK 的一部分&#xff09;生产的 9 轴传感器集成电路。它结合了 陀螺仪、加速度计和磁力计。 内置了 DMP&#xff08;Digital Motion Processor&#xff09;即负责执…...

Xcode证书密钥导入

证书干嘛用 渠道定期会给xcode证书&#xff0c;用来给ios打包用&#xff0c;证书里面有记录哪些设备可以打包进去。 怎么换证书 先更新密钥 在钥匙串访问中&#xff0c;选择系统。(选登录也行&#xff0c;反正两个都要导入就是了)。 mac中双击所有 .p12 后缀的密钥&#xff…...

Ubuntu安装PgSQL17

参考官网教程&#xff0c;Ubuntu24 apt在线安装Postgres 17 1. 要手动配置 Apt 存储库 # 导入存储库签名密钥&#xff1a; sudo apt install curl ca-certificates sudo install -d /usr/share/postgresql-common/pgdg sudo curl -o /usr/share/postgresql-common/pgdg/apt…...

K8S容器启动提示:0/2 nodes are available: 2 Insufficient cpu.

问题&#xff1a;K8S的容器启动报错0/2 nodes are available: 2 Insufficient cpu. 原因&#xff1a;Pod的资源请求&#xff08;requests&#xff09;设置不当&#xff1a;在Kubernetes中&#xff0c;调度器根据Pod的requests字段来决定哪个节点可以运行该Pod。如果一个Pod声明…...

LabVIEW外腔二极管激光器稳频实验

本项目利用LabVIEW软件开发了一个用于外腔二极管激光器稳频实验的系统。系统能够实现激光器频率的稳定控制和实时监测&#xff0c;为激光实验提供了重要支持。 项目背景&#xff1a; 系统解决了外腔二极管激光器频率不稳定的问题&#xff0c;以满足对激光器频率稳定性要求较高…...

笔记6——字典dict(dictionary)

文章目录 字典dict(dictionary)定义特点常用操作1.访问值2.添加键值对3.修改值4.删除键值对5.遍历字典6.合并字典 性能应用场景dict和list的区别 字典dict(dictionary) 以 键 - 值对 (key - value pairs)的形式存储数据 定义 字典使用花括号 {} 来定义&#xff0c;键和值之…...

【MySQL】InnoDB单表访问方法

目录 1、背景2、环境3、访问类型【1】const【2】ref【3】ref_or_null【4】range【5】index【6】all 4、总结 1、背景 mysql通过查询条件查询到结果的过程就叫访问方法&#xff0c;一条查询语句的访问方法有很多种&#xff0c;接下来我们就来讲一下各种访问方法。 2、环境 创…...

APP端网络测试与弱网模拟!

当前APP网络环境比较复杂&#xff0c;网络制式有2G、3G、4G网络&#xff0c;还有越来越多的公共Wi-Fi。不同的网络环境和网络制式的差异&#xff0c;都会对用户使用app造成一定影响。另外&#xff0c;当前app使用场景多变&#xff0c;如进地铁、上公交、进电梯等&#xff0c;使…...

【个人开发】deepseed+Llama-factory 本地数据多卡Lora微调

文章目录 1.背景2.微调方式2.1 关键环境版本信息2.2 步骤2.2.1 下载llama-factory2.2.2 准备数据集2.2.3 微调模式2.2.4 微调脚本 2.3 踩坑经验2.3.1 问题一&#xff1a;ValueError: Undefined dataset xxxx in dataset_info.json.2.3.2 问题二&#xff1a; ValueError: Target…...

Redis7.0八种数据结构底层原理

导读 本文介绍redis应用数据结构与物理存储结构,共八种应用数据结构和 一. 内部数据结构 1. sds sds是redis自己设计的字符串结构有以下特点: jemalloc内存管理预分配冗余空间二进制安全(c原生使用\0作为结尾标识,所以无法直接存储\0)动态计数类型(根据字符串长度动态选择…...

Kafka 高吞吐量的底层技术原理

Kafka 之所以能够实现高吞吐量&#xff08;每秒百万级消息处理&#xff09;&#xff0c;主要依赖于其底层设计和多项优化技术。以下是 Kafka 实现高吞吐量的关键技术原理&#xff1a; 1. 顺序读写磁盘 Kafka 利用磁盘的顺序读写特性&#xff0c;避免了随机读写的性能瓶颈。 顺…...

web vue 项目 Docker化部署

Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段&#xff1a; 构建阶段&#xff08;Build Stage&#xff09;&#xff1a…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

学校招生小程序源码介绍

基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码&#xff0c;专为学校招生场景量身打造&#xff0c;功能实用且操作便捷。 从技术架构来看&#xff0c;ThinkPHP提供稳定可靠的后台服务&#xff0c;FastAdmin加速开发流程&#xff0c;UniApp则保障小程序在多端有良好的兼…...

渲染学进阶内容——模型

最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

.Net Framework 4/C# 关键字(非常用,持续更新...)

一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

2023赣州旅游投资集团

单选题 1.“不登高山&#xff0c;不知天之高也&#xff1b;不临深溪&#xff0c;不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...