SpringCloud源码探析(十)-Web消息推送
1.概述
消息推送在日常使用中的场景比较多,比如有人点赞了我的博客或者关注了我,这时我就会收到一条推送消息,以此来吸引我点击或者打开应用。消息推送的方式主要分为两种:web消息推送和移动端消息推送。它将所要发送的信息,发送至用户当前访问的网页或者移动设备。本文主要分析在web端进行消息推送的几种方式,实现用户在web端接收推送消息。
2.消息推送几种方式
web消息推送的方式主要分为两种:一种是主动向服务端请求(简单理解为客户端pull消息)、一种是服务端推送(简单理解为服务端push消息)。两种方式各有利弊:主动向服务端请求会按照一定周期不断去请求服务器,如果客户端数量庞大会对服务端造成极大压力,并且数据具有一定延时性;服务端推送实时性较好,但服务端需要存储客户端会话信息,如果客户端数量较多,服务端查询对应会话压力较大。
2.1 Pull消息
Pull消息主要是客户端发起的操作,定时向服务端进行轮询获取消息,轮询可分为短轮询和长轮询。
短轮询:指定时间间隔,由应用浏览器发送http请求,服务器实时返回消息至客户端,浏览器进行展示;短轮询在前端一般通过JS定时器定时发送请求来实现;
长轮询:是对短轮询的一种优化,客户端发起请求,服务器不会立即返回请求结果,而是将请求挂起等待一段时间,如果此时间段内数据变更,立即响应客户端请求,若是一直无变化则等到指定的超时时间响应请求,客户端重新发起长连接;长轮询在nacos、Kafka、RocketMQ队列中使用较多。
2.2 Push消息
服务端向客户端推送,在一定程度上能节约一部分资源,常用的方式有WebSocket、SSE等,还有一些通过中间件RabbitMQ来实现等。本文主要介绍利用SSE方式和WebSocket方式来推送消息,具体如下:
2.2.1 SSE
SSE(Server-sent events)是一种用于实现服务器向客户端实时推送数据的Web技术。与传统的短轮询和长轮询相比,SSE提供了更高效和实时的数据推送机制。SSE基于HTTP协议,允许服务器将数据以事件流(Event Stream)的形式发送给客户端。客户端通过建立持久的HTTP连接,并监听事件流,可以实时接收服务器推送的数据。SSE的主要特点包括:
简单易用:SSE使用基于文本的数据格式,如纯文本、JSON等,使得数据的发送和解析都相对简单;
单向通信:SSE支持服务器向客户端的单向通信,服务器可以主动推送数据给客户端,而客户端只能接收数据;
实时性:SSE建立长时间的连接,使得服务器可以实时地将数据推送给客户端,而无需客户端频繁地发起请求。
SSE的整体实现思路如下,它的原理其实类似于在线视频播放,视频流会连续不断的推送到浏览器,图如下:

其实可以简单地理解为它是一种单向实时通信技术,一旦客户端与服务端建立连接,只能接收服务端信息,不能向服务端发送信息,且拥有自动重连机制,客户端与服务端断开会进行自动重连,websocket断开不能自动重连,这是SSE优于websocket的地方。
Springboot使用SSE功能发送信息代码如下,由于springboot内嵌sse模块,因此不需要引入额外包:
package com.eckey.lab.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @Author: Marin* @CreateTime: 2023-10-08 14:29* @Description: TODO* @Version: 1.0*/
@Slf4j
@RestController
@CrossOrigin //此注解是为了解决测试过程中的跨域问题
@RequestMapping("/sse")
public class SSEController {/*** 使用Map对象来存放userId和对应的会话*/private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();/*** @description: 浏览器端注册,将会话信息存入Map,这种方式会导致一个userId只能与服务器建立一个会话,生产环境慎用这种方式* @author: Marin* @date: 2023/10/9 16:51* @param: [userId]* @return: org.springframework.web.servlet.mvc.method.annotation.SseEmitter**/@GetMapping(path = "/subscribe/{userId}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})public SseEmitter subscribe(@PathVariable("userId") String userId) throws IOException {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(30_000L);// 设置前端的重试时间为15s,如果不加这个发送一下,前端就不会显示连接成功sseEmitter.send("连接成功");// 注册回调sseEmitter.onCompletion(() -> {log.info("连接结束:{}", userId);});sseEmitter.onError((Throwable throwable) -> {log.error("连接异常:{}", throwable.getMessage());});sseEmitter.onTimeout(() -> {log.warn("连接超时:{}", userId);});//以userId为key,如果一个用户多个设备连接,会不准确sseEmitterMap.put(userId, sseEmitter);log.info("创建新的sse连接,当前用户:{}", userId);return sseEmitter;}/*** @description: 向指定用户发送指定信息* @author: Marin* @date: 2023/10/9 16:53* @param: [userId, message]* @return: void**/@GetMapping(path = "/sendMessage")public void sendMessage(String userId, String message) {if (sseEmitterMap.containsKey(userId)) {try {log.info("向用户:{},发送消息:{}", userId, message);sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("用户[{}]推送异常:{}", userId, e.getMessage());removeUser(userId);}}}/*** @description: 移除用户* @author: Marin* @date: 2023/10/9 16:53* @param: [userId]* @return: void**/private void removeUser(String userId) {sseEmitterMap.remove(userId);log.info("移除用户成功:{}", userId);}/*** @description: 删除与指定用户会话* @author: Marin* @date: 2023/10/9 16:54* @param: [userId]* @return: void**/@GetMapping("/close/{userId}")public void close(@PathVariable("userId") String userId) {removeUser(userId);log.info("关闭连接成功:{}", userId);}
}
前端测试代码如下:
<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8"><title>SseEmitter</title>
</head><body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>let source = null;// 用时间戳模拟登录用户const userId = new Date().getTime();if (window.EventSource) {// 建立连接source = new EventSource('http://127.0.0.1:9090/sse/subscribe/' + userId);setMessageInnerHTML("连接用户=" + userId);source.addEventListener('open', function(e) {setMessageInnerHTML("建立连接。。。");}, false);source.addEventListener('message', function(e) {setMessageInnerHTML(e.data);});source.addEventListener('error', function(e) {if (e.readyState === EventSource.CLOSED) {setMessageInnerHTML("连接关闭");} else if (e.target.readyState === EventSource.CONNECTING) { console.log('Connecting...');}else {console.log(e);}}, false);} else {setMessageInnerHTML("你的浏览器不支持SSE");}// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据window.onbeforeunload = function() {closeSse();};// 关闭Sse连接function closeSse() {source.close();const httpRequest = new XMLHttpRequest();httpRequest.open('GET', 'http://127.0.0.1:9090/sse/close/'+ userId, true);httpRequest.send();console.log("close");}// 将消息显示在网页上function setMessageInnerHTML(innerHTML) {document.getElementById('message').innerHTML += innerHTML + '<br/>';}
</script></html>
打开前端页面,会出现连接信息,如下所示:

调用信息发送接口,跟据用户id发送指定消息,如下:

发送成功后前端接收并显示在页面上,如下:

2.2.2 WebSocket
WebSocket是一种用于实现实时双向通信的Web技术,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。它与SSE在某些方面有所不同。下面是SSE和WebSocket之间的比较:
数据推送方向:SSE是服务器向客户端的单向通信,服务器可以主动推送数据给客户端。而WebSocket是双向通信,允许服务器和客户端之间进行实时的双向数据交换;
连接建立:SSE使用基于HTTP的长连接,通过普通的HTTP请求和响应来建立连接,从而实现数据的实时推送。WebSocket使用自定义的协议,通过建立WebSocket连接来实现双向通信;
兼容性:由于SSE基于HTTP协议,它可以在大多数现代浏览器中使用,并且不需要额外的协议升级。WebSocket在绝大多数现代浏览器中也得到了支持,但在某些特殊的网络环境下可能会遇到问题;
适用场景:SSE适用于服务器向客户端实时推送数据的场景,如股票价格更新、新闻实时推送等。WebSocket适用于需要实时双向通信的场景,如聊天应用、多人协同编辑等。
WebSocket原理图如下所示:

Springboot整合websocket如下:
1.引入pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
2.编写socket配置
package com.eckey.lab.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;import javax.servlet.ServletContext;
import javax.servlet.ServletException;/*** @Author: Marin* @CreateTime: 2023-10-08 16:26* @Description: TODO* @Version: 1.0*/
@Slf4j
@Configuration
public class WebSocketConfig implements ServletContextInitializer {/*** 这个bean的注册,用于扫描带有@ServerEndpoint的注解成为websocket,如果你使用外置的tomcat就不需要该配置文件*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}@Overridepublic void onStartup(ServletContext servletContext) throws ServletException {String serverInfo = servletContext.getServerInfo();log.info("serverInfo:{}", serverInfo);}}
3.编写SocketServer代码
package com.eckey.lab.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;/*** @Author: Marin* @CreateTime: 2023-10-08 15:49* @Description: TODO* @Version: 1.0*/
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {//与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();// 用来存在线连接数private static final Map<String, Session> sessionPool = new HashMap<String, Session>();/*** 链接成功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam(value = "userId") String userId) {try {this.session = session;webSockets.add(this);sessionPool.put(userId, session);log.info("websocket消息: 有新的连接,总数为:" + webSockets.size());} catch (Exception e) {log.error("");}}/*** 收到客户端消息后调用的方法*/@OnMessagepublic void onMessage(String message) {log.info("websocket消息: 收到客户端消息:" + message);}/*** 此为单点消息*/public void sendOneMessage(String userId, String message) {Session session = sessionPool.get(userId);if (session != null && session.isOpen()) {try {log.info("websocket发送单点消息:" + message);session.getAsyncRemote().sendText(message);} catch (Exception e) {e.printStackTrace();}}}
}
4.编写controller代码
package com.eckey.lab.controller;import com.alibaba.fastjson.JSON;
import com.eckey.lab.config.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.io.IOException;
import java.util.HashMap;/*** @Author: Marin* @CreateTime: 2023-10-08 15:24* @Description: TODO* @Version: 1.0*/
@Slf4j
@RestController
@CrossOrigin
@RequestMapping("/socket")
public class WebSocketController {@Autowiredprivate WebSocketServer webSocketServer;@GetMapping(path = "/publish/{userId}")public String publish(@PathVariable("userId") String userId, String message) throws IOException {webSocketServer.sendOneMessage(userId, message);log.info("信息发送成功!userId:{},message:{}", userId, message);HashMap maps = new HashMap();maps.put("code", "0");maps.put("msg", "success");return JSON.toJSONString(maps);}}
5.测试
客户端发送消息,服务接收到消息,具体如下:


调用服务端接口发送消息,客户端接收到消息,具体如下:


3.小结
1.SSE方式是一种基于TCP协议的单向数据传输方式,当连接建立完成,只能由服务端向客户端发送信息;
2.WebSocket是一种双向通信技术,能够实现客户端和服务端的全双工通信,它在建立连接时使用HTTP协议,其它时候都是直接基于TCP协议进行通信;
3.在选择SSE或者WebSocket时,需要跟据场景、性能损耗进行综合考虑,合理的技术选型能够有效增强服务的健壮性。
4.参考文献
1.https://zhuanlan.zhihu.com/p/634581294
2.https://juejin.cn/post/7122014462181113887
3.https://javaguide.cn/system-design/web-real-time-message-push.html
5.附录
https://gitee.com/Marinc/nacos.git
相关文章:
SpringCloud源码探析(十)-Web消息推送
1.概述 消息推送在日常使用中的场景比较多,比如有人点赞了我的博客或者关注了我,这时我就会收到一条推送消息,以此来吸引我点击或者打开应用。消息推送的方式主要分为两种:web消息推送和移动端消息推送。它将所要发送的信息&…...
Vue、React和小程序中的组件通信:父传子和子传父
在前端开发中,组件化是一种常见的开发模式,它可以将复杂的用户界面拆分成多个可重用的组件。在Vue、React和小程序中,组件之间的数据和事件传递是非常关键的,其中父传子和子传父是常见的通信方式。本文将介绍在Vue、React和小程序…...
安卓玩机----展讯芯片机型解锁 读写分区工具 操作步骤解析
国内机型大都使用高通和MTK芯片。展讯芯片使用的较少。相对来说高通和mtk机型解锁以及读取分区工具较多。展讯的几乎没有。目前有大佬开发出了一款展讯芯片解锁 与读写分区工具.开源的tools 官方分享说明: 是一款专为 Windows 计算机设计的免费、用户友好的工具&am…...
微软放大招!Bing支持DALL-E3,免费AI绘画等你来体验!
最近 OpenAI 发布了DALL-E3模型,出图效果和Midjourney不相上下,不过要使用它有些门槛,必须是 ChatGPT Plus 账户,而且还要排队,怎么等都等不到,搞得大家都比较焦虑。 不过现在微软在Bing上也支持 DALL-E3 …...
tp5访问的时候必须加index.php,TP5配置隐藏入口index.php文件
PS:这里说的入口文件指的是public/index.php,配置文件就在这个目录下 可以去掉URL地址里面的入口文件index.php,但是需要额外配置WEB服务器的重写规则。 以Apache为例,需要在入口文件的同级添加.htaccess文件(官方默认自带了该文件)&#x…...
16k面试中的10个问题
你好,我是田哥 节前,有位朋友跟我反馈面试中一些问题,这位朋友的基本情况: 坐标:上海,年限:3年不到,期望薪资;16k 下面我们来看看具体问题: 01:请…...
STM32单片机入门学习(六)-光敏传感器控制LED
光敏传感器模块和LED接线 LED负极接B12,正极接VCC 光敏传感模块一DO端接B13,GND接GND,VCC接VCC,AO不接。 如图: 主程序代码:main.c #include "stm32f10x.h" #include "Delay.h" //delay函数所在头文件 #include …...
MFC 鼠标悬停提示框
MFC 鼠标悬停提示框 运行效果 在MFC窗口中添加一个控件 工具栏中拖拽List Box到MFC窗口给List Box添加变量 CListBox m_listbox 增加成员变量 CWnd* m_tip_parent_wnd; CToolTipCtrl m_tip;给m_listbox创建提示框 void create_tip_window(CWnd* tip_wnd, CToolTipCtrl* ti…...
大数据学习,涉及哪些技术?
学习大数据需要涉及多种技术和概念,因为大数据领域非常广泛,涵盖了数据的采集、存储、处理、分析和可视化等多个方面。以下是学习大数据时需要考虑的一些关键技术和概念: 1、数据采集和存储: 数据库管理系统(DBMS&am…...
Clion中使用C/C++开发stm32程序
前言 从刚开始学习阶段,一直是用的keil5开发stm32程序,自从看到稚晖君推荐的CLion开发嵌入式程序后,这次尝试在CLion上开发stm32程序。 1、配置CLion用于STM32开发的环境 这里我就不详细写了,没必要重新写,网上教程很多…...
JavaScript Web APIs第五天笔记
Web APIs - 第5天笔记 目标: 能够利用JS操作浏览器,具备利用本地存储实现学生就业表的能力 BOM操作综合案例 js组成 JavaScript的组成 ECMAScript: 规定了js基础语法核心知识。比如:变量、分支语句、循环语句、对象等等 Web APIs : DOM 文档对象模型&…...
[ICCV-23] Paper List - 3D Generation-related
ICCV-23 paper list 目录 Oral Papers 3D from multi-view and sensors Generative AI Poster Papers 3D Generation (Neural generative models) 3D from a single image and shape-from-x 3D Editing Face and gestures Stylization Dataset Oral Papers 3D from …...
Transformer为什么如此有效 | 通用建模能力,并行
目录 1 更强更通用的建模能力 2 并行计算 3 大规模训练数据 4 多训练技巧的集成 Transformer是一种基于自注意力机制的网络,在最近一两年年可谓是大放异彩,我23年入坑CV的时候,我看到的CV工作似乎还没有一个不用到Transformer里的一些组…...
【初识Jmeter】【接口自动化】
jmeter的使用笔记1 Jmeter介绍与下载安装介绍安装配置配置与扩展组件 jmeter的使用基本功能元素登陆请求与提取cookie其他请求接口关联Cookie-响应成功聚合报告查看 Jmeter介绍与下载安装 介绍 jmeter是apache公司基于java开发的一款开源压力测试工具,体积小&…...
C:数组传值调用和传地址调用
传地址调用 对数组进行修改:排序… #include <stdio.h>// 函数用于交换两个整数的值 void swap(int *a, int *b) {int temp *a;*a *b;*b temp; }// 函数用于对整数数组进行升序排序 void sortArray(int *arr, int size) {for (int i 0; i < size - 1…...
Python数据容器——字典的常用操作(增、删、改、查)
作者:Insist-- 个人主页:insist--个人主页 本文专栏:Python专栏 专栏介绍:本专栏为免费专栏,并且会持续更新python基础知识,欢迎各位订阅关注. 目录 一、理解字典 1. Python字典是什么? 2. 字…...
JavaScript入门——(5)函数
1、为什么需要函数 函数:function,是被设计为执行特定任务的代码块 说明:函数可以把具有相同或相似逻辑的代码“包裹”起来,通过函数调用执行这些被“包裹”的代码逻辑,有利于精简代码方便复用。 比如之前使用的ale…...
数据库sql查询成绩第二高
select * from propro; #查询成绩第二高 select max(id) from propro where id <(select max(id) from propro); #查询成绩第二高的第二种方式 select * from (select * from propro order by id desc limit 2) as b order by id asc limit 1;...
十五、异常(5)
本章概要 异常限制构造器 异常限制 当覆盖方法的时候,只能抛出在基类方法的异常说明里列出的那些异常。这个限制很有用,因为这意味着与基类一起工作的代码,也能和导出类一起正常工作(这是面向对象的基本概念)&#…...
途虎养车上市、京东养车“震虎”,如何突围汽车后市场?
“汽车后市场第一股”终于来了! 赶在十一黄金周之前,途虎养车股份有限公司(09690.HK,下称“途虎养车”)于9月26日挂牌港交所,开盘价为28港元/股,与发行价持平;IPO首日报收29.50港元/股,涨幅5.3…...
Rustup工具链管理深度解析:多版本Rust环境实战指南
Rustup工具链管理深度解析:多版本Rust环境实战指南 【免费下载链接】rustup The Rust toolchain installer 项目地址: https://gitcode.com/gh_mirrors/ru/rustup Rustup作为Rust语言的官方工具链管理器,为开发者提供了稳定、测试版和夜间版多版本…...
DAMOYOLO-S边缘端部署指南:STM32F103C8T6嵌入式平台推理优化
DAMOYOLO-S边缘端部署指南:STM32F103C8T6嵌入式平台推理优化 1. 引言 如果你正在为一个资源极其有限的嵌入式设备寻找一个能跑起来的目标检测方案,比如用一块小小的STM32F103C8T6开发板,那么这篇文章就是为你准备的。你可能已经尝试过一些经…...
SPI Flash时序参数详解:如何用Synopsys VIP验证Micron芯片的HOLD时序
SPI Flash时序验证实战:Synopsys VIP在Micron芯片HOLD时序分析中的应用 当硬件验证工程师面对SPI Flash芯片时,时序参数的精确验证往往是项目成败的关键。Micron作为主流存储芯片供应商,其SPI Flash产品广泛应用于嵌入式系统和FPGA设计中&…...
Chandra OCR真实测评:对比GPT-4o,开源OCR模型表现如何
Chandra OCR真实测评:对比GPT-4o,开源OCR模型表现如何 最近在整理一堆扫描版的实验报告和学术论文,里面混杂着复杂的表格、手写注释和数学公式,真是让人头疼。传统的OCR工具,比如Tesseract,处理这种文档就…...
RTX4090D大模型推理专用镜像体验:Qwen-Image预装环境,一键启动图文对话
RTX4090D大模型推理专用镜像体验:Qwen-Image预装环境,一键启动图文对话 1. 镜像概述与核心优势 1.1 为什么选择专用镜像 在本地部署大语言模型时,环境配置往往是最耗时的环节。以Qwen-VL这样的视觉语言模型为例,需要处理CUDA版…...
手把手教你部署VibeVoice:基于Python的实时TTS系统,300ms超低延迟体验
手把手教你部署VibeVoice:基于Python的实时TTS系统,300ms超低延迟体验 你有没有遇到过这样的场景:开发一个智能助手,用户问完问题,屏幕上的文字回复瞬间就出来了,但语音却要等上好几秒才开始播放ÿ…...
从草图到文档:我用这5个Miro/PlantUML模板,高效搞定团队架构设计评审
从草图到文档:5个高效架构设计模板与团队协作实战指南 在敏捷开发环境中,架构设计往往陷入两难困境——既要快速响应需求变化,又要保证设计文档的准确性与可维护性。Tech Lead们经常面临这样的场景:在白板前与团队激情讨论出的架构…...
3步快速修复Netgear路由器变砖的终极解决方案
3步快速修复Netgear路由器变砖的终极解决方案 【免费下载链接】nmrpflash Netgear Unbrick Utility 项目地址: https://gitcode.com/gh_mirrors/nmr/nmrpflash 路由器变砖是许多网络设备用户最头疼的问题之一,特别是当固件升级失败或意外断电导致设备无法启动…...
AI辅助web开发新体验:让快马智能生成实时Markdown编辑器应用
今天想和大家分享一个特别实用的开发体验——用AI辅助快速构建一个实时Markdown编辑器。作为一个经常需要写技术文档的开发者,我一直希望能有个简洁高效的编辑器工具,这次尝试用InsCode(快马)平台的AI能力来实现这个需求,整个过程出乎意料的顺…...
零代码实现YouTube视频翻译:Hugging Face大语言模型实战教程
零代码实现YouTube视频翻译:Hugging Face大语言模型实战教程 在全球化内容消费的今天,语言障碍成为许多人获取知识的隐形门槛。想象一下,当你发现一个精彩的英文技术讲座视频,却因为语言问题无法充分理解;或是需要将中…...
