当前位置: 首页 > article >正文

Spring WebFlux 中 WebSocket 使用 DataBuffer 的注意事项

以下是修改后的完整文档,包含在多个多线程环境中使用 retain()release() 方法的示例,且确保在 finally 块中调用 release()


在 Spring WebFlux 中,WebSocketMessage 主要用于表示 WebSocket 的消息载体,其中 getPayload() 方法返回 DataBuffer,用于处理二进制数据流。在使用 DataBuffer 时,需要注意其一次性读取特性,以及潜在的内存管理问题。本文将介绍如何正确使用 DataBuffer,避免重复读取和内存泄漏。

1. 避免重复读取 DataBuffer

DataBuffer 设计为一次性读取流数据,因此,一旦被消费,后续读取将无法获取数据。例如:

String firstRead = webSocketMessage.getPayload().toString(StandardCharsets.UTF_8);
String secondRead = webSocketMessage.getPayload().toString(StandardCharsets.UTF_8); // 此处读取会失败

解决方案

如果需要多次使用 DataBuffer 的数据,可以在第一次读取时缓存:

DataBuffer dataBuffer = webSocketMessage.getPayload();
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
String payload = new String(bytes, StandardCharsets.UTF_8);

这样,后续可以安全地使用 payload 变量,而不会影响 DataBuffer


2. 避免阻塞操作

Spring WebFlux 是基于响应式编程的,WebSocket 处理也应保持非阻塞。如果在 DataBuffer 处理中引入了阻塞操作(如同步 I/O 或 Thread.sleep()),可能会导致 Reactor 线程阻塞,影响整体吞吐量。

解决方案

使用 Flux/Mono 进行异步处理,例如:

session.receive().map(WebSocketMessage::getPayloadAsText)  // 避免直接操作 DataBuffer.flatMap(payload -> processMessage(payload)).subscribe();

3. 处理 DataBuffer 可能带来的内存泄漏

Spring WebFlux 采用 Netty 作为默认底层引擎,而 Netty 的 ByteBuf 需要手动释放,否则可能导致内存泄漏。Spring 提供了 DataBufferUtils.release() 方法来避免 DataBuffer 占用资源不被回收。

正确的释放方式

session.receive().doOnNext(message -> {try {String data = message.getPayloadAsText();System.out.println("Received: " + data);} finally {DataBufferUtils.release(message.getPayload());}}).subscribe();

DataBufferUtils.release() 仅在手动管理 DataBuffer 生命周期时才需要,如果直接通过 WebSocketMessage.getPayloadAsText() 处理字符串,不必显式释放。


4. 在 Flux/Mono 组合操作时避免数据丢失

如果 DataBuffermap() 操作多次消费,可能导致数据丢失或 DataBuffer 为空。例如:

session.receive().map(message -> {DataBuffer payload = message.getPayload();DataBufferUtils.release(payload); // 这里释放后,后续的 map() 操作会读取不到数据return payload;}).map(buffer -> buffer.toString(StandardCharsets.UTF_8)) // 这里可能会失败.subscribe();

正确的方式

  • 确保 DataBuffer 只在最终消费时释放。
  • 处理 DataBuffer 时,转换为 byte[] 以避免流式数据的重复读取。
session.receive().map(WebSocketMessage::getPayload).map(dataBuffer -> {byte[] bytes = new byte[dataBuffer.readableByteCount()];dataBuffer.read(bytes);DataBufferUtils.release(dataBuffer);  // 读取完毕后释放return new String(bytes, StandardCharsets.UTF_8);}).subscribe(System.out::println);

5. retain()release() 方法的补充

Spring WebFlux 中,WebSocketMessage 还提供了 retain()release() 方法,用于管理 DataBuffer 的引用计数和释放资源。下面介绍如何在多线程环境中正确使用这些方法。

retain() 方法

retain() 方法确保 DataBuffer 的引用计数增加,以便在需要时能够安全使用:

public WebSocketMessage retain() {if (reactorNetty2Present) {return ReactorNetty2Helper.retain(this);}DataBufferUtils.retain(this.payload);return this;
}

retain() 方法会增加 DataBuffer 的引用计数,防止在处理过程中被提前释放。这对于需要多个组件共享同一 DataBuffer 实例的情况非常重要。

release() 方法

release() 方法用于释放 DataBuffer,减少引用计数,释放底层资源,防止内存泄漏:

public void release() {DataBufferUtils.release(this.payload);
}

release() 方法通常在处理完成后调用,确保底层的 DataBuffer 被正确释放。

使用示例:在多线程环境中使用 retain() 和 release()

在 WebSocket 消息处理时,确保在多线程环境中正确管理 DataBuffer 的生命周期。示例如下,使用 retain() 保证资源被正确引用,并在 finally 块中调用 release() 确保即使出现异常时也会释放资源:

session.receive().doOnNext(message -> {// 在多线程环境中保留引用message.retain();try {String data = message.getPayloadAsText();System.out.println("Received: " + data);// 模拟处理过程,可能会涉及多线程操作// 例如:通过某个线程池处理消息processMessageAsync(data);} finally {// 确保释放资源message.release();  // 释放资源}}).subscribe();

在上面的示例中,retain() 确保了 DataBuffer 在多个线程中可以安全访问,直到最终的 release() 被调用来释放资源。无论操作成功与否,finally 块中的 release() 都会被执行,确保不会发生内存泄漏。


6. 总结

在 Spring WebFlux 中使用 WebSocketMessageDataBuffer 需要注意以下几点:

  1. 避免重复读取 DataBuffer,建议在读取后缓存数据。
  2. 避免阻塞操作,尽量使用 Flux/Mono 进行异步处理。
  3. 防止内存泄漏,在手动管理 DataBuffer 生命周期时使用 DataBufferUtils.release() 释放资源。
  4. 确保 DataBuffer 只在最终消费时释放,避免 Flux 流程中数据丢失。
  5. 使用 retain()release() 方法 来管理 DataBuffer 的引用计数,确保资源的正确释放,特别是在多线程环境中,确保在 finally 中释放资源。

通过遵循这些实践,可以有效地管理 WebSocket 消息的内存使用,并提高应用的性能和可靠性。


相关文章:

Spring WebFlux 中 WebSocket 使用 DataBuffer 的注意事项

以下是修改后的完整文档,包含在多个多线程环境中使用 retain() 和 release() 方法的示例,且确保在 finally 块中调用 release(): 在 Spring WebFlux 中,WebSocketMessage 主要用于表示 WebSocket 的消息载体,其中 getP…...

Android ChatOn-v1.66.536-598-[构建于ChatGPT和GPT-4o之上]

ChatOn 链接:https://pan.xunlei.com/s/VOKYnq-i3C83CK-HJ1gfLf4gA1?pwdwzwc# 添加了最大无限积分 删除了所有调试信息 语言:全语言支持...

游戏树搜索与优化策略:Alpha-Beta剪枝及其实例分析

1.Alpha-Beta搜索 Alpha-Beta 搜索是一种用于对抗性游戏(比如象棋、围棋)的智能算法,目的是帮助计算机快速找到“最优走法”,同时避免不必要的计算。它的核心思想是:通过剪掉明显糟糕的分支,大幅减少需要计…...

基于Qwen-VL的手机智能体开发

先上Demo: vl_agent_demo 代码如下: 0 设置工作目录: 你的工作目录需要如下: 其中utils文件夹和qwenvl_agent.py均参考自 GitHub - QwenLM/Qwen2.5-VL: Qwen2.5-VL is the multimodal large language model series developed by …...

记录一次Spring事务失效导致的生产问题

一、背景介绍 公司做的是“聚合支付”业务,对接了微信、和包、数字人民币等等多家支付机构,我们提供统一的支付、退款、自动扣款签约、解约等能力给全国的省公司、机构、商户等。 同时,需要做对账功能,即支付机构将对账文件给到…...

深度学习实战:用TensorFlow构建高效CNN的完整指南

一、为什么每个开发者都要掌握CNN? 在自动驾驶汽车识别路标的0.1秒里,在医疗AI诊断肺部CT片的精准分析中,甚至在手机相册自动分类宠物的日常场景里,卷积神经网络(CNN)正悄然改变着我们的世界。本文将以工业…...

算法 之 贪心思维训练!

文章目录 从最大/最小开始贪心2279.装满石头的背包的最大数量2971.找到最大周长的多边形 从最左、最右开始贪心2712.使所有字符相等的最小成本 划分型贪心1221.分割平衡字符串 贪心策略在处理一些题目的时候能够带来意想不到的效果 从最小/最大开始贪心,优先考虑最小…...

从0到1构建AI深度学习视频分析系统--基于YOLO 目标检测的动作序列检查系统:(1)视频信息的获取与转发

文章大纲 基于YOLO的动作序列检查系统架构设计系统架构图实时视频传输协议技术对比视频流 常见协议对比表三、WebSocket内网传输设计方案四、样例程序(Python + JavaScript)五、性能优化建议新兴技术预警参考文献提示词参考基于YOLO的动作序列检查系统架构设计 系统架构图 #…...

大语言模型学习--LangChain

LangChain基本概念 ReAct学习资料 https://zhuanlan.zhihu.com/p/660951271 LangChain官网地址 Introduction | 🦜️🔗 LangChain LangChain是一个基于语言模型开发应用程序的框架。它可以实现以下应用程序: 数据感知:将语言模型…...

【PCIe 总线及设备入门学习专栏 4.5 -- PCIe 中断 MSI 与 MSI-X 机制介绍】

文章目录 PCI 设备中断机制PCIe 设备中断机制PCIe MSI 中断机制MSI CapabilityMSI-X 中断机制MSI-X capabilityMSI-X TablePBAMSI-X capability 解析MSI/MSI-X 操作流程扫描设备配置设备MSI 配置MSI-X 配置中断触发与处理PCI 设备中断机制 以前的PCI 设备是支持 物理上的 INTA…...

wxWidgets GUI 跨平台 入门学习笔记

准备 参考 https://wiki.wxwidgets.org/Microsoft_Visual_C_NuGethttps://wiki.wxwidgets.org/Tools#Rapid_Application_Development_.2F_GUI_Buildershttps://docs.wxwidgets.org/3.2/https://docs.wxwidgets.org/latest/overview_helloworld.htmlhttps://wizardforcel.gitb…...

valgrind 检测多线程 bug,检测 并发 bug concurrent bug parallel bug

valgrind --toolhelgrind ./your_program 如果检测的对象是大型程序,可以设定仅在某些函数中开启 valgrind 的检测: Valgrind 提供了一些客户请求(client requests),可以在代码中插入特定的宏来控制 Valgrind 的行为。…...

OpenMCU(一):STM32F407 FreeRTOS移植

概述 本文主要描述了STM32F407移植FreeRTOS的简要步骤。移植描述过程中,忽略了Keil软件的部分使用技巧。默认读者熟练使用Keil软件。本文的描述是基于OpenMCU_FreeRTOS这个工程,该工程已经下载放好了移植stm32f407 FreeRTOS的所有文件 OpenMCU_FreeRTOS工…...

割平面法的理解

割平面法的理解 1. 简介 割平面法(Cutting Plane Method)用于求解整数规划问题,通过逐步添加线性约束(割平面)逼近整数解。本文以Gomory割平面法为例,结合简单示例拆解核心步骤。 2. 示例详解 问题描述 …...

[自动驾驶-传感器融合] 多激光雷达的外参标定

文章目录 引言外参标定原理ICP匹配示例参考文献 引言 多激光雷达系统通常用于自动驾驶或机器人,每个雷达的位置和姿态不同,需要将它们的数据统一到同一个坐标系下。多激光雷达外参标定的核心目标是通过计算不同雷达坐标系之间的刚性变换关系&#xff08…...

C++ 学习(八)(模板,可变参数模板,模板专业化(完整模板专业化,部分模板专业化),类型 Traits,SFINAE(替换失败不是错误),)

C 模板 C 中的模板是一项强大的功能,允许您编写通用代码,这意味着您可以编写可以处理不同数据类型的单个函数或类。这意味着您无需为要使用的每种数据类型编写单独的函数或类。 模板函数 要创建模板函数,请使用 关键字,后跟类型…...

AI---DevOps常备工具(‌AI-Integrated DevOps Essential Tools)

AI---DevOps常备工具 技术领域正在迅速发展,随着我们步入 2025 年,有一点是明确的:人工智能(AI)不再只是一个流行词,它是每个 DevOps 工程师都需要掌握的工具。随着云环境的复杂性增加、对更快部署的需求以…...

题目梳理2025[长期更新]

题目梳理 组合类题目(2025年3月5日) 组合总数1,组合总数2,组合总数3 -> 递归回溯的思想 组合总数4 -> 爬楼的思想,动态规划,确定递归边界,确定递归入口,最后一步怎么走的思想...

Maven 中 SNAPSHOT 版本与 RELEASE 版本的区别

Maven 仓库分为 Snapshot 快照仓库和 Release 发行仓库两种类型的仓库。Snapshot 快照仓库用于保存 SNAPSHOT 版本,Release 发行仓库用于保存 RELEASE 版本。 SNAPSHOT 是一种特殊的版本标识,主要用于表示项目的不稳定、正在开发中的版本,而…...

JavaScript 知识点整理

1. 什么是AST?它在前端有哪些应用场景? AST Abstract Syntax Tree抽象语法树,用于表达源码的树形结构 应用: Babel:一个广泛使用的 JS 编译器,将ES6 或 JSX 等现代语法转换为兼容性较好的 ES5 代码。Esl…...

迷你世界脚本出生点接口:Spawnport

出生点接口:Spawnport 彼得兔 更新时间: 2023-04-26 10:19:56 具体函数名及描述如下: 序号 函数名 函数描述 1 getSpawnPoint(...) 获取默认出生点 2 setSpawnPoint(...) 设置出生点位置 3 getChunkValidSpawnPos(...) 获取区块有效刷新点…...

鸿蒙与DeepSeek深度整合:构建下一代智能操作系统生态

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 https://www.captainbed.cn/north 目录 技术融合背景与价值鸿蒙分布式架构解析DeepSeek技术体系剖析核心整合架构设计智能调度系统实现…...

利用行波展开法测量横观各向同性生物组织的生物力学特性|文献速递-医学影像人工智能进展

Title 题目 Measurement of biomechanical properties of transversely isotropic biological tissue using traveling wave expansion 利用行波展开法测量横观各向同性生物组织的生物力学特性 01 文献速递介绍 纤维嵌入结构在自然界中普遍存在。从脑白质(罗曼…...

AR配置静态IP双链路负载分担示例

AR配置静态IP双链路负载分担示例 适用于大部分企业网络出口 业务需求: 运营商1分配的接口IP为100.100.1.2,子网掩码为255.255.255.252,网关IP为100.100.1.1。 运营商2分配的接口IP为200.200.1.2,子网掩码为255.255.255.248&am…...

文件操作(详细讲解)(1/2)

你好这里是我说风俗,希望各位客官点点赞,收收藏,关关注,各位对我的支持是我持续更新的动力!!!!第二期会马上更的关注我获得最新消息哦!!!&#xf…...

[AI]从零开始的so-vits-svc歌声推理及混音教程

一、前言 在之前的教程中已经为大家讲解了如何安装so-vits-svc以及使用现有的模型进行文本转语音。可能有的小伙伴就要问了,那么我们应该怎么使用so-vits-svc来进行角色歌曲的创作呢?其实歌曲的创作会相对麻烦一些,会使用到好几个软件&#x…...

华为OD机试-停车场最大距离(Java 2024 E卷 100分)

题目描述 停车场有一排车位,用 0 表示空位,1 表示已停车。至少有一辆车停在车位上,也至少有一个空位。为了防剐蹭,需要找到一个空位,使得该空位与最近的车辆之间的距离最大。返回这个最大距离。 输入描述 一个用半角逗号分隔的停车标识字符串,停车标识为 0 或 1,0 表示…...

SpringMVC控制器定义:@Controller注解详解

文章目录 引言一、Controller注解基础二、RequestMapping与请求映射三、参数绑定与数据校验四、RestController与RESTful API五、控制器建议与全局处理六、控制器测试策略总结 引言 在SpringMVC框架中,控制器(Controller)是整个Web应用的核心组件,负责处…...

免费分享一个软件SKUA-GOCAD-2022版本

若有需要,可以下载。 下载地址 通过网盘分享的文件:Paradigm SKUA-GOCAD 22 build 2022.06.20 (x64).rar 链接: https://pan.baidu.com/s/10plenNcMDftzq3V-ClWpBg 提取码: tm3b 安装教程 Paradigm SKUA-GOCAD 2022版本v2022.06.20安装和破解教程-CS…...

学习threejs,使用LineBasicMaterial基础线材质

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️THREE.LineBasicMaterial1.…...