Spring Boot WebFlux 中 WebSocket 生命周期解析
Spring Boot WebFlux 中的 WebSocket 提供了一种高效、异步的方式来处理客户端与服务器之间的双向通信。WebSocket 连接的生命周期包括连接建立、消息传输、连接关闭以及资源清理等过程。此外,为了确保 WebSocket 连接的稳定性和可靠性,我们可以加入重试机制,以处理断开或网络问题时自动重新连接。
1. WebSocket 连接建立
WebSocket 的连接是通过 HTTP 的 Upgrade 机制从普通的 HTTP/HTTPS 请求升级而来的。具体流程如下:
1.1 客户端请求 WebSocket 连接
客户端通过 ws:// 或 wss:// 协议来访问 WebSocket 服务器,并发送 HTTP Upgrade 请求头,要求服务器将连接升级为 WebSocket 协议:
GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: random-generated-key
Sec-WebSocket-Version: 13
1.2 服务器端处理 WebSocket 连接
Spring WebFlux 通过 WebSocketHandler 来处理 WebSocket 请求。以下是一个简单的 WebSocketHandler 实现:
@Component
public class MyWebSocketHandler implements WebSocketHandler {@Overridepublic Mono<Void> handle(WebSocketSession session) {return session.receive().doOnNext(message -> System.out.println("Received: " + message.getPayloadAsText())).then();}
}
当服务器收到 HTTP Upgrade 请求后,它会检查 Sec-WebSocket-Key 并返回 Sec-WebSocket-Accept 进行握手,建立连接。
1.3 握手成功,连接建立
如果握手成功,服务器会返回 101 Switching Protocols 响应,表示 WebSocket 连接已建立:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: (calculated key)
2. WebSocket 消息处理
连接建立后,WebSocket 进入消息传输阶段,包括消息的接收和发送。
2.1 消息接收
服务器端可以通过 WebSocketSession.receive() 方法来接收客户端发送的消息:
session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(msg -> System.out.println("Received: " + msg)).then();
session.receive() 返回一个 Flux<WebSocketMessage>,可以处理流式消息,每次接收到新消息时执行 doOnNext() 中的处理逻辑。
2.2 消息发送
服务器端可以通过 WebSocketSession.send() 方法发送消息给客户端:
Flux<String> messages = Flux.interval(Duration.ofSeconds(1)).map(i -> "Message " + i);
return session.send(messages.map(session::textMessage));
send() 方法接收一个 Publisher<WebSocketMessage>,可以使用 Flux 来生成消息流。textMessage() 方法用于创建文本消息。
3. WebSocket 连接关闭
WebSocket 连接可以由客户端、服务器或网络异常等原因主动关闭。连接关闭的主要方式如下:
3.1 正常关闭
- 客户端主动关闭:客户端可以通过调用
WebSocket.close()发送 Close Frame,服务器接收到后会关闭连接。 - 服务器主动关闭:服务器通过
WebSocketSession.close()关闭连接:session.close(CloseStatus.NORMAL);
3.2 异常关闭
- 网络异常:如网络断开或客户端崩溃等,连接会被强制关闭。
- 心跳超时:如果使用 ping/pong 机制检测 WebSocket 是否存活,超时未收到 pong 响应时,连接会关闭。
session.send(Flux.just(session.pingMessage(ByteBuffer.wrap(new byte[0]))));
3.3 连接关闭后的处理
服务器可以使用 session.receive().doOnTerminate() 监听连接关闭事件,执行清理操作:
session.receive().doOnTerminate(() -> System.out.println("WebSocket connection closed")).then();
4. WebSocket 生命周期总结
| 阶段 | 说明 |
|---|---|
| 连接建立 | 客户端发起 WebSocket 连接请求,服务器接受并返回 101 Switching Protocols 响应,连接建立。 |
| 消息传输 | 服务器和客户端可以双向传输文本或二进制消息。 |
| 连接关闭 | 连接可由客户端、服务器、网络异常等原因关闭。 |
| 资源清理 | 连接关闭后需要进行资源清理操作,如取消订阅、清理状态等。 |
5. 完整示例:WebFlux WebSocket 服务器
以下是一个完整的 WebSocket 服务器配置示例,展示了如何在 Spring Boot WebFlux 中配置 WebSocket:
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerMapping;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Map;@Configuration
public class WebSocketConfig {@Beanpublic WebSocketHandler webSocketHandler() {return session -> {Flux<String> output = Flux.interval(Duration.ofSeconds(1)).map(time -> "Server time: " + time);return session.send(output.map(session::textMessage));};}@Beanpublic WebSocketHandlerMapping handlerMapping(WebSocketHandler handler) {return new WebSocketHandlerMapping(Map.of("/ws", handler));}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
说明:
WebSocketHandler处理 WebSocket 连接,发送定时消息。WebSocketHandlerMapping将/ws端点映射到 WebSocketHandler。WebSocketHandlerAdapter用于适配 WebSocket 处理器。
6. 服务器端发起 WebSocket 连接
如果你希望服务器主动连接到其他 WebSocket 服务器,可以使用 WebSocketClient。Spring WebFlux 提供了 ReactorNettyWebSocketClient 来发起 WebSocket 连接。
6.1 示例:服务器端发起 WebSocket 连接
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import reactor.core.publisher.Mono;
import java.net.URI;@Service
public class WebSocketClientService {private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();public Mono<Void> connectToWebSocketServer() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).subscribe();return sendMessage;});}
}
6.2 在 Spring Boot 启动时自动连接
通过在 @PostConstruct 中调用连接方法,可以确保 WebSocket 客户端在 Spring Boot 启动时自动连接:
import jakarta.annotation.PostConstruct;
import org.springframework.stereotype.Component;@Component
public class WebSocketClientInitializer {private final WebSocketClientService webSocketClientService;public WebSocketClientInitializer(WebSocketClientService webSocketClientService) {this.webSocketClientService = webSocketClientService;}@PostConstructpublic void init() {webSocketClientService.connectToWebSocketServer().subscribe();}
}
7. WebSocket 连接重试机制
在 WebSocket 的生命周期中,由于网络问题或服务器错误,WebSocket 连接可能会中断。为了提高 WebSocket 连接的可靠性,我们可以为 WebSocket 客户端添加重试机制,以确保断开后能够重新连接。
7.1 使用 retry() 方法重试连接
WebFlux 提供了 retry() 方法来自动重试操作。以下是一个简单的重试机制示例:
import reactor.core.publisher.Mono;public class WebSocketClientService {private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();public Mono<Void> connectToWebSocketServer() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).subscribe();return sendMessage;}).retry(5); // 最大重试5次}
}
在这个例子中,retry(5) 表示如果 WebSocket 连接失败,最多会重试 5 次。
7.2 使用 retryWhen() 实现自定义重试逻辑
我们还可以通过 retryWhen() 来实现更复杂的重试策略,例如设置重试间隔时间或根据错误类型决定是否重试:
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import java.time.Duration;public class WebSocketClientService {private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();public Mono<Void> connectToWebSocketServer() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).subscribe();return sendMessage;}).retryWhen(errors ->errors.zipWith(Flux.range(1, 5), (error, count) -> count) // 重试次数.flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount))) // 增加重试间隔);}
}
在这个例子中,retryWhen() 会根据错误进行自定义重试逻辑,设置每次重试间隔递增。
8. 连接关闭后的重试机制
为了确保连接在关闭后重新建立,我们可以监听连接关闭事件并尝试重试:
session.receive().doOnTerminate(() -> {System.out.println("WebSocket connection closed");reconnect(); // 重新连接}).then();private void reconnect() {connectToWebSocketServer().retry(3) // 重试3次.subscribe();
}
8.1 完整的客户端重试代码
public Mono<Void> connectWithRetry() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).doOnTerminate(() -> reconnect()) // 连接关闭后重试.subscribe();return sendMessage;}).retryWhen(errors ->errors.zipWith(Flux.range(1, 5), (error, count) -> count).flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount))));
}
9. 结论
Spring Boot WebFlux 中 WebSocket 的生命周期包括:
- 连接建立:通过 HTTP Upgrade 握手建立 WebSocket 连接。
- 消息收发:服务器和客户端之间通过
receive()和send()方法进行消息交换。 - 连接关闭:连接可以通过正常关闭、异常关闭或主动关闭的方式结束。
- 资源清理:连接关闭后需要进行资源清理操作,确保系统稳定。
- 重试机制:通过
retry()和retryWhen()方法为 WebSocket 连接添加自动重试机制,提高连接的可靠性。
通过 WebSocket,Spring Boot WebFlux 提供了高效的异步通信方式,特别适合用于实时数据流应用。
相关文章:
Spring Boot WebFlux 中 WebSocket 生命周期解析
Spring Boot WebFlux 中的 WebSocket 提供了一种高效、异步的方式来处理客户端与服务器之间的双向通信。WebSocket 连接的生命周期包括连接建立、消息传输、连接关闭以及资源清理等过程。此外,为了确保 WebSocket 连接的稳定性和可靠性,我们可以加入重试…...
PostgreSQL中的事务隔离
1. 事务隔离的概念 在数据库管理系统中,事务隔离是一项重要的功能,它能确保在并发访问数据库时事务之间能够独立运行,不会相互干扰。数据库系统通常支持不同级别的事务隔离,用来满足不同应用程序之间的需求。 2. 事务隔离的种类…...
基于Rye的Django项目通过Pyinstaller用Github工作流简单打包
前言 Rye的介绍和安装 Ryehttps://rye.astral.sh/Rye 完整使用教程_安装rye-CSDN博客https://blog.csdn.net/zhenndbc/article/details/144544692 正文 项目建立 配置好环境后 新建文件夹 新建文件夹,进入项目 初始化 rye init下载依赖 rye syncpycharm 打…...
ubuntu 20.04 C++ 源码编译 cuda版本 opencv4.5.0
前提条件是安装好了cuda和cudnn 点击下载: opencv_contrib4.5.0 opencv 4.5.0 解压重命名后 进入opencv目录,创建build目录 “CUDA_ARCH_BIN ?” 这里要根据显卡查询一下,我的cuda是11,显卡1650,所以是7.5 查询链接:…...
【VUE】第一期——初使用、基本语法
目录 0 前言 1 准备工作 1.1 创建vue实例 1.2 vue开发者工具 2 插值表达式 2.1 基本用法 3 常用指令 3.1 内容渲染指令 3.1.1 v-text 3.1.2 v-html 3.2 条件渲染指令 3.2.1 v-show 3.2.2 v-if 3.2.3 v-else 和 v-else-if 3.3 事件绑定指令 3.3.1 内联语句 3.3…...
计算光学成像与光学计算概论
计算光学成像所涉及研究的内容非常广泛,虽然计算光学成像的研究内容是发散的,但目的都是一致的:如何让相机记录到客观实物更丰富的信息,延伸并扩展人眼的视觉感知。总的来说,计算光学成像现阶段已经取得了很多令人振奋…...
开启科创服务新篇章:八月瓜科技CRM数字化管理系统成功上线
近日,北京八月瓜科技有限公司(以下简称 “八月瓜科技”)与纷享销客达成深度战略合作,成功部署并上线CRM数字化管理系统。此次合作是八月瓜科技在数字化转型进程中的重要里程碑,标志着其在科技创新服务领域的数字化变革…...
AI提示词(Prompt)的理解和学习指南
AI提示词(Prompt)的理解和学习指南 一、什么是AI提示词? AI提示词(Prompt)是用户输入给人工智能模型的指令或问题,用于引导模型生成特定类型的回答或内容。它如同与AI沟通的“钥匙”,设计得当…...
记录一些面试遇到的问题
重载和重写的区别 重载是overload,覆盖是override 重载属于编译时多态,覆盖属于运行时多态 运行时多态和编译时多态 运行时多态指的是在运行的时候才知道要调用哪一个函数,编译时多态是指在编译的时候就知道调用哪一个函数。 运行时多态…...
OpenHarmony4.0_Linux环境搭建
查看链接:OpenHarmony4.0_Linux环境搭建https://www.yuque.com/xinzaigeek/jishu/fs9msruqhd5nhw4i...
DeepSeek开源Day5:3FSsmallpond技术详解
2 月 24 日,DeepSeek 启动 “开源周”,第四个开源的代码库为 3FS&smallpond(又是一下发布了两个)。 3FS(Fire-Flyer File System)是 DeepSeek 内部开发的一款高性能分布式文件系统,旨在为 A…...
Java集合面试篇
目录 1.概念 1.1.数组与集合的区别,用过哪些? 1.2.说说Java中的集合? 1.3.Java中的线程安全的集合是什么? 1.4.集合遍历的方法有哪些? 2.List 2.1.list可以一边遍历一边修改元素吗? 2.2.Arraylist和…...
plt和cv2有不同的图像表示方式和颜色通道顺序
在处理图像时,matplotlib.pyplot (简称 plt) 和 OpenCV (简称 cv2) 有不同的图像表示方式和颜色通道顺序。了解这些区别对于正确处理和显示图像非常重要。 1. 图像形状和颜色通道顺序 matplotlib.pyplot (plt) 形状:plt 通常使用 (height, width, cha…...
Sqlserver安全篇之_手工创建TLS用到的pfx证书文件
Sqlserver官方提供的Windows Powershell脚本 https://learn.microsoft.com/zh-cn/sql/database-engine/configure-windows/configure-sql-server-encryption?viewsql-server-ver16 # Define parameters $certificateParams {Type "SSLServerAuthentication"Subje…...
基于RapidOCR与DeepSeek的智能表格转换技术实践
基于RapidOCR与DeepSeek的智能表格转换技术实践 一、技术背景与需求场景 在金融分析、数据报表处理等领域,存在大量图片格式的表格数据需要结构化处理。本文介绍基于开源RapidOCR表格识别与DeepSeek大模型的智能转换方案,实现以下典型场景: …...
创建阿里云CDN
创建阿里云CDN CDN域名管理 SSL证书上传...
tomcat的web管理
进入到conf cd /usr/local/tomcat/conf/备份tomcat-users.xml cp tomcat-users.xml{.,bak}编辑tomcat-users.xml vim tomcat-users.xml增加以下内容 配置tomcat-users.xml <role rolename"manager-gui"/><role rolename"admin-gui"/><use…...
【Linux系统】-----进程初相识:原理与概念全解析
Linux系列 文章目录 Linux系列前言一、进程的概念二、进程的管理三、Linux操作系统的进程管理3.1、进程标识符3.2、查看进程3.3、查看进程的PID和PPID 前言 经过前两篇文章的铺垫,我们对操作系统的管理方式已经有了比较完整的认识,今天我们将学习Linux比…...
分布式系统设计(架构能力)
一、微服务架构 服务治理 Nacos 注册中心(AP模式) CAP选择:Nacos 默认采用 AP 模式(可用性 分区容忍性),通过心跳检测实现服务健康管理。服务发现:客户端定时拉取服务列表,支持权重…...
171. Excel 表列序号
Excel 表列序号 题目描述尝试做法推荐做法 题目描述 给你一个字符串 columnTitle ,表示 Excel 表格中的列名称。返回 该列名称对应的列序号 。 例如: A -> 1 B -> 2 C -> 3 … Z -> 26 AA -> 27 AB -> 28 … 示例 1: 输入: colum…...
深度解析Kosmosaos:定制Linux系统镜像的构建、部署与自动化实践
1. 项目概述:一个面向未来的操作系统镜像最近在开源社区里,一个名为chasefort/kosmosaos的项目镜像引起了我的注意。乍一看这个名字,可能会觉得有些陌生,甚至有点“缝合”的感觉——它似乎融合了“Kosmos”和“AOS”的概念。但当你…...
通达信主力进场洗盘拉升出货副图指标公式源码
以下是指标365网整理的通达信主力进场洗盘拉升出货副图指标公式的源码:指标核心逻辑:1、紫色表示主力进场吸筹阶段;2、红色表示试盘洗盘阶段;3、黄色表示拉升阶段;4、绿色表示出货阶段;5、柱子长短表示各阶…...
3小时从零掌握yuzu:在PC上畅玩任天堂Switch游戏的完整指南
3小时从零掌握yuzu:在PC上畅玩任天堂Switch游戏的完整指南 【免费下载链接】yuzu 任天堂 Switch 模拟器 项目地址: https://gitcode.com/GitHub_Trending/yu/yuzu 想在Windows、Linux或Android设备上免费体验任天堂Switch游戏吗?yuzu模拟器正是你…...
别再写一堆CASE WHEN了!PostgreSQL里COALESCE和NULLIF这两个函数,帮你把SQL写得又短又稳
告别冗长SQL:用PostgreSQL的COALESCE和NULLIF重构条件逻辑 在数据处理的世界里,SQL就像是我们与数据库对话的语言。但你是否经常遇到这样的情况:为了处理各种空值和边界条件,你的SQL查询变成了一个由无数CASE WHEN语句组成的庞然大…...
心理学实验小白必看:用E-Prime 3.0从零设计你的第一个Stroop任务(附完整流程)
心理学实验入门:用E-Prime 3.0构建专业级Stroop实验全指南 第一次打开E-Prime时,满屏的控件和属性面板可能让你感到无从下手——这几乎是每个心理学研究生的必经之路。作为认知心理学最经典的实验范式之一,Stroop任务不仅能验证注意与自动加…...
初创团队如何利用Taotoken控制AI实验成本并快速迭代产品
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 初创团队如何利用Taotoken控制AI实验成本并快速迭代产品 对于资源有限的初创团队而言,在开发AI功能原型时,…...
情绪语音落地难?ElevenLabs新版本上线首周,92%开发者忽略的3个TTS情感对齐关键阈值,你踩雷了吗?
更多请点击: https://intelliparadigm.com 第一章:ElevenLabs正式情绪语音发布全景与行业意义 ElevenLabs 于2024年第三季度正式推出「Emotion Voice API」,标志着AI语音合成从“可听”迈向“可感”的关键跃迁。该能力支持在TTS输出中动态注…...
QT开发避坑指南:用setWindowFlags搞定自定义标题栏,别再为窗口移动发愁了
QT自定义标题栏实战:从事件重写到优雅封装的完整解决方案 当开发者决定为QT应用打造一套独特的视觉风格时,第一个拦路虎往往是系统默认标题栏的去除与自定义实现。这看似简单的需求背后,隐藏着窗口管理、事件处理、用户体验等一系列技术挑战。…...
Gemini3.1Pro评估ViT平移不变性:4周MVP路线图
利用 Gemini 3.1 Pro 评估视觉 Transformer 的平移不变性:从机制刻画、对照验证到门控降级与4周MVP路线图“平移不变性(Translation Invariance)”是视觉 Transformer(ViT 等)稳健性的核心指标之一:当图像在…...
从雷达、声呐到5G和Wi-Fi 7:波束成形技术的前世今生与应用实战
从雷达、声呐到5G和Wi-Fi 7:波束成形技术的前世今生与应用实战 想象一下,你正站在一个嘈杂的鸡尾酒会上,周围人声鼎沸,但你却能清晰地听到对面朋友的每一句话——这就像波束成形技术为现代通信系统带来的"超能力"。这项…...
