Spring WebFlux之流式输出
🎉🎉🎉🎉🎉🎉
欢迎访问的个人博客:https://swzbk.site/,加好友,拉你入福利群
🎉🎉🎉🎉🎉🎉
流式输出(Streaming Output)是指将数据分块逐步发送给客户端,而不是一次性发送所有数据。这种方式特别适合处理大文件、实时数据或需要逐步展示的场景(如deepseek响应、语音、视频、日志等)。在springboot中通过Spring WebFlux实现。
1. Flux是什么?
- 定义:Flux是一个异步数据流处理库,用于生成、操作和消费数据流(类似集合,但支持异步和非阻塞操作)。
- 核心特点:
- 背压(Backpressure):消费者可以控制生产者的速度,避免数据过载。
- 函数式编程:通过链式操作(如
map、filter、flatMap)处理数据流。 - 异步非阻塞:基于Reactor Netty实现高性能I/O,适合高并发场景。
2. Flux在后端Java中的作用
(1)处理异步与高并发
- 场景:微服务通信、实时数据处理(如消息队列、日志监控)、长连接(WebSocket)。
- 优势:通过异步非阻塞方式减少线程资源消耗,提升系统吞吐量。
(2)响应式Web开发
- 与Spring WebFlux结合:构建响应式REST API,支持HTTP/2和Server-Sent Events(SSE)。
- 示例代码:
(每秒推送一个事件给客户端)@GetMapping("/events") public Flux<Event> getEvents() {return Flux.interval(Duration.ofSeconds(1)).map(sequence -> new Event("Event " + sequence)); }
(3)背压管理
- 避免内存溢出:当消费者处理速度慢于生产者时,Flux通过背压机制暂停生产,确保系统稳定。
(4)数据流操作
- 丰富的操作符:支持过滤、转换、合并、重试等复杂逻辑。
Flux.just(1, 2, 3).map(n -> n * 2).filter(n -> n % 3 == 0).subscribe(System.out::println); // 输出:6
3. 为什么选择Flux?
- 与Spring生态集成:无缝衔接Spring Boot、Spring Cloud,适合企业级应用。
- 轻量高效:相比传统阻塞IO,资源占用更少,适合云原生环境。
- 对比RxJava:Flux是Spring官方推荐的响应式库,更注重与Java生态的兼容性。
4. 案例实现
在 Java 中使用 OkHttp3 发送请求,并通过 Project Reactor 的 Flux 获取实时响应,通常适用于处理流式数据,比如服务器发送的实时更新或者大型数据块的逐步传输。以下为你详细介绍实现步骤并给出示例代码。
实现思路
- 引入依赖:需要在项目中引入 OkHttp3 和 Project Reactor 的依赖。
- 发送请求:使用 OkHttp3 发送 HTTP 请求。
- 处理响应:将 OkHttp3 的响应流转换为
Flux进行响应式处理。
4.1 添加依赖
如果你使用的是 Maven 项目,在 pom.xml 中添加以下依赖:
<dependencies><!-- OkHttp3 --><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.9.3</version></dependency><!--1: webflux 会默认引入下面的单独reactor-core--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!-- 2:Project Reactor Core、单独引入 --><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.4.16</version></dependency>
</dependencies>
4.2 编写代码
你提供的代码片段是结合了 OkHttp3 发起异步请求和 Project Reactor 的 Flux 来处理响应结果。下面为你完善这个示例,并详细解释代码逻辑。
示例代码
import okhttp3.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;import java.io.IOException;public class OkHttpFluxExample {private static final OkHttpClient client = new OkHttpClient();public static Flux<String> makeApiCall(Request apiRequest) {return Flux.create(emitter -> {try {// 发起异步请求client.newCall(apiRequest).enqueue(new Callback() {@Overridepublic void onFailure(Call call, IOException e) {// 处理请求失败的情况System.err.println("请求 API 失败: " + e.getMessage());emitter.error(e);}@Overridepublic void onResponse(Call call, Response response) throws IOException {try (ResponseBody responseBody = response.body()) {if (response.isSuccessful() && responseBody != null) {// 假设响应是逐行文本数据,逐行发布到 Flux 中String[] lines = responseBody.string().split("\n");for (String line : lines) {emitter.next(line);}// 数据发布完成,发送完成信号emitter.complete();} else {// 处理响应不成功的情况String errorMessage = "请求 API 失败,响应码: " + response.code();System.err.println(errorMessage);emitter.error(new IOException(errorMessage));}}}});} catch (Exception e) {// 处理请求过程中发生的异常System.err.println("请求 API 时发生异常: " + e.getMessage());emitter.error(e);}});}public static void main(String[] args) {// 构建请求Request apiRequest = new Request.Builder().url("https://example.com/api") // 替换为实际的 API 地址.build();// 调用 makeApiCall 方法获取 FluxFlux<String> responseFlux = makeApiCall(apiRequest);// 订阅 Flux 并处理响应数据responseFlux.subscribe(// 处理每个接收到的元素line -> System.out.println("Received: " + line),// 处理错误error -> System.err.println("Error: " + error.getMessage()),// 处理完成信号() -> System.out.println("API 请求处理完成"));}
}
代码解释
makeApiCall方法:- 该方法接收一个
Request对象作为参数,返回一个Flux<String>对象。 - 使用
Flux.create创建Flux,在其回调中使用 OkHttp3 的enqueue方法发起异步请求。 onFailure方法:当请求失败时,打印错误信息并调用emitter.error方法将错误信号发送给Flux的订阅者。onResponse方法:- 若响应成功且响应体不为空,将响应体按行分割,逐行调用
emitter.next方法将每行数据发布到Flux中。 - 数据发布完成后,调用
emitter.complete方法发送完成信号。 - 若响应不成功,打印错误信息并调用
emitter.error方法发送错误信号。
- 若响应成功且响应体不为空,将响应体按行分割,逐行调用
- 该方法接收一个
main方法:- 构建一个
Request对象,指定要请求的 API 地址。 - 调用
makeApiCall方法获取Flux。 - 使用
subscribe方法订阅Flux,处理接收到的元素、错误和完成信号。
- 构建一个
注意事项
- 请将
https://example.com/api替换为实际的 API 地址。 - 该示例假设响应是逐行文本数据,你可以根据实际情况调整数据处理逻辑。
- 在实际应用中,需要注意资源管理,例如关闭 OkHttp 客户端等。
🎉🎉🎉🎉🎉🎉
欢迎访问的个人博客:https://swzbk.site/,加好友,拉你入福利群
🎉🎉🎉🎉🎉🎉
相关文章:
Spring WebFlux之流式输出
🎉🎉🎉🎉🎉🎉 欢迎访问的个人博客:https://swzbk.site/,加好友,拉你入福利群 🎉🎉🎉🎉🎉🎉 流式输…...
基于springboot医疗平台系统(源码+lw+部署文档+讲解),源码可白嫖!
摘要 信息化时代,各行各业都以网络为基础飞速发展,而医疗服务行业的发展却进展缓慢,传统的医疗服务行业已经逐渐不满足民众的需求,有些还在以线下预约挂号的方式接待病人,为此设计一个医疗平台系统很有必要。此类系统…...
Stable Diffusion lora训练(一)
一、不同维度的LoRA训练步数建议 2D风格训练 数据规模:建议20-50张高质量图片(分辨率≥10241024),覆盖多角度、多表情的平面风格。步数范围:总步数控制在1000-2000步,公式为 总步数 Repeat Image Epoch …...
网络空间安全(37)获取webshell方法总结
一、直接上传获取Webshell 这是最常见且直接的方法,利用网站对上传文件的过滤不严或存在漏洞,直接上传Webshell文件。 常见场景: 许多PHP和JSP程序存在此类漏洞。例如,一些论坛系统允许用户上传头像或心情图标,攻击者可…...
第十三次CCF-CSP认证(含C++源码)
第十三次CCF-CSP认证 跳一跳满分题解 碰撞的小球满分题解遇到的问题 棋局评估满分题解 跳一跳 题目链接 满分题解 没什么好说的 基本思路就是如何用代码翻译题目所给的一些限制,以及变量应该如何更新,没像往常一样给一个n,怎么读入数据&…...
【Agent】OpenManus-Prompt组件详细分析
1. 提示词架构概述 OpenManus 的提示词组件采用了模块化设计,为不同类型的智能体提供专门的提示词模板。每个提示词模块通常包含两种核心提示词:系统提示词(System Prompt)和下一步提示词(Next Step Prompt࿰…...
swagger ui 界面清除登录信息的办法
我们在开发过程中,用swagger ui 测试接口的时候,可能会要修改当前登录的用户。 但是如果我们在谷歌中对调试的本地swagger ui 登录地址存储过账户密码,每次启动项目调试之后,都会自动登录swagger ui ,登录界面一闪就…...
TensorFlow 的基本概念和使用场景
TensorFlow 是一个由 Google 开发的开源机器学习框架,主要用于构建和训练深度学习模型。下面是一些 TensorFlow 的基本概念和使用场景: 基本概念: 张量(Tensor):在 TensorFlow 中,数据以张量的…...
基于x11vnc的ubuntu远程桌面
1、安装VNC服务 sudo apt install x11vnc -y2、创建连接密码 sudo x11vnc -storepasswd3、安装lightdm服务 x11vnc 在 默认的 GDM3 中不起作用,因此需要使用 lightdm 桌面管理环境 sudo apt install lightdm -y切换至lightdm,上一步已经切换则跳过该…...
Cursor解锁Claude Max,助力AI编程新突破!
Cursor 最新推出的 Claude Max 模型,以其卓越的性能和创新的能力,正在重新定义我们对 AI 辅助编程的认知。这款搭载 Claude3.7 大脑的超级模型,不仅具备超强智能,还凭借一系列技术突破,向传统 AI 编程工具发起了挑战。…...
created在vue3 script setup中的写法
在 Vue 2 里,created 是一个生命周期钩子函数,会在实例已经创建完成之后被调用,主要用于在实例初始化之后、数据观测和 event/watcher 事件配置之前执行代码。而在 Vue 3 的 <script setup> 语法糖里,不再有像 Vue 2 那样直…...
GenICam标准
GenICam的目标是为所有类型的相机提供一个统一的编程接口。无论相机使用的是哪种传输协议或实现了哪些功能,编程接口(API)都是一样的。 GenICam(Generic Interface for Cameras)是一个为工业相机和图像采集设备设计的…...
ESP8266 与 ARM7 接口-LPC2148 创建 Web 服务器以控制 LED
ESP8266 与 ARM7 接口-LPC2148 创建 Web 服务器以控制 LED ESP8266 Wi-Fi 收发器提供了一种将微控制器连接到网络的方法。它被广泛用于物联网项目,因为它便宜、体积小且易于使用。 在本教程中,我们将 ESP8266 Wi-Fi 模块与 ARM7-LPC2148 微控制器连接,并创建一个 Web 服务…...
智享三代 AI 无人直播系统:颠覆传统,重塑直播新格局
在当今数字化浪潮席卷全球的时代,直播行业作为互联网经济的重要组成部分,正以前所未有的速度蓬勃发展。从最初的娱乐直播兴起,到如今电商直播、知识付费直播等多元业态百花齐放,直播已然成为人们生活和商业活动中不可或缺的一环。…...
通过C#脚本更改材质球的参数
// 设置贴图Texture mTexture Resources.Load("myTexture", typeof(Texture )) as Texture;material.SetTexture("_MainTex", mTexture );// 设置整数material.SetInt("_Int", 1);// 设置浮点material.SetFloat("_Float", 0.1f);// 设…...
FPGA管脚约束
目录 前言 一、IO约束 二、延迟约束 前言 IO约束包括管脚约束和延迟约束。 一、IO约束 对管脚进行约束,对应的约束语句: set_property -dict {PACKAGE_PIN AJ16 IOSTANDARD LVCMOS18} [get_ports "led[0]" ] 上面是单端的管脚&…...
已在此计算机上安装相同或更高版本的 .NET Framework 4”安装报错问题
安裝低版本的 .netFramework會被拒絕 需要做兩件事 1,允許windows安裝低版本的.net framework “已在此计算机上安装相同或更高版本的 .NET Framework 4”安装报错问题-CSDN博客 2,設置完成後重新安裝低版本的 .net framework,要用對應開發版本的 Win10 电脑安…...
如何判断 MSF 的 Payload 是 Staged 还是 Stageless(含 Meterpreter 与普通 Shell 对比)
在渗透测试领域,Metasploit Framework(MSF)的 msfvenom 工具是生成 Payload(载荷)的核心利器。然而,当我们选择 Payload 时,经常会遇到一个问题:这个 Payload 是 Staged(…...
【万字总结】前端全方位性能优化指南(一)——Brotli压缩、CDN智能分发、资源合并
前言 2025年前端技术前沿呈现三大核心趋势:AI深度赋能开发全流程,智能工具如GitHub Copilot X和Cursor实现代码生成、实时协作与自动化审查,开发效率提升3倍以上;性能与架构革新,WebAssembly 2.0支持多线程与Rust内存优化,边缘计算将渲染延迟压至50ms内,微前端Module …...
二.使用ffmpeg对原始音频数据重采样并进行AAC编码
重采样:将音频三元组【采样率 采样格式 通道数】之中的任何一个或者多个值改变。 一.为什么要进行重采样? 1.原始音频数据和编码器的数据格式不一致 2.播放器要求的和获取的数据不一致 3.方便运算 二.本次编码流程 1.了解自己本机麦克风参数&#x…...
实现前端.ttf字体包的压缩
前言 平常字体包都有1M的大小,所以网络请求耗时会比较长,所以对字体包的压缩也是前端优化的一个点。但是前端如果想要特点字符打包成字体包,网上查阅资料后,都是把前端代码里面的字符获取,但是对于动态的内容…...
uni-app集成保利威直播、点播SDK经验FQ(二)|小程序直播/APP直播开发适用
通过uniapp集成保利威直播、点播SDK来开发小程序/APP的视频直播能力,在实际开发中可能会遇到的疑问和解决方案,下篇。更多疑问请咨询19924784795。 1.ios不能后台挂起uniapp插件 ios端使用后台音频播放和画中画功能,没有在 manifest.json 进…...
Spring Framework 中 BeanDefinition 是什么
BeanDefinition 是 Spring Framework 中一个核心的接口,它描述了一个 Bean 的定义。你可以把它看作是 Spring IoC 容器中 Bean 的“蓝图”或“配置元数据”。它包含了 Spring 容器创建、配置和管理 Bean 所需的所有信息。 BeanDefinition 中包含的信息:…...
Sensodrive机器人力控关节模组SensoJoint在海洋垃圾清理机器人中的拓展应用
海洋污染已成为全球性的环境挑战,其中海底垃圾的清理尤为困难。据研究,海洋中约有2600万至6600万吨垃圾,超过90%沉积在海底。传统上,潜水员收集海底垃圾不仅成本高昂,而且充满风险。为解决这一问题,欧盟资助…...
MyBatis 配置文件解析使用了哪些设计模式
MyBatis 配置文件解析过程中,主要运用了以下几种设计模式 1. 建造者模式 (Builder Pattern): 应用场景: SqlSessionFactoryBuilder 和 XMLConfigBuilder 类都体现了建造者模式。模式描述: 建造者模式将一个复杂对象的构建过程与其表示分离,使得同样的构…...
sentinel限流算法
限流算法:固定窗口算法、滑动时间窗口、令牌桶和漏桶这四种常见限流算法的原理: 限流算法原理 固定窗口: 固定窗口算法将时间划分为固定大小的窗口,并在每个窗口内限制请求的数量。在每个窗口开始时,计数器重置&#…...
Git的基本指令
一、回滚 1.git init 在项目文件夹中打开bash生成一个.git的子目录,产生一个仓库 2.git status 查看当前目录下的所有文件的状态 3.git add . 将该目录下的所有文件提交到暂存区 4.git add 文件名 将该目录下的指定文件提交到暂存区 5.git commit -m 备注信…...
github上传本地文件到远程仓库(空仓库/已有文件的仓库)
今天搞自己本地训练的代码到仓库留个档,结果遇到了好多问题,到腾了半天才搞明白整个过程,留在这里记录一下。 远程空仓库 主要根据官方教程:Adding locally hosted code to GitHub - GitHub Docs #1. cd到你需要上传的文件夹&a…...
Git 分支使用规范全解(多人协作开发适用)
🚀 Git 分支使用规范全解(多人协作开发适用) 本文将为你梳理一套清晰、标准、适合企业/团队使用的 Git 分支管理策略,适用于前后端、边缘端、AI项目等多种场景。 🧩 为什么要规范分支管理? 防止多人协作混乱、冲突频发清晰区分:开发中 / 待发布 / 已上线 的版本快速定…...
Vitis 2024.1 无法正常编译custom ip的bug(因为Makefile里的wildcard)
现象:如果在vivado中,添加了自己的custom IP,比如AXI4 IP,那么在Vitis(2024.1)编译导出的原本的.xsa的时候,会构建build失败。报错代码是: "Compiling blank_test_ip..."…...
