当前位置: 首页 > news >正文

响应式编程Reactor API大全(上)

Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:

pom依赖

   <dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>5.7.2</version><scope>test</scope></dependency></dependencies>

1. 创建 Mono 和 Flux

  • Mono: 用于表示包含零个或一个元素的异步序列。
  • Flux: 用于表示包含零个或多个元素的异步序列。
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;public class ReactorCreateExample {public static void main(String[] args) {// 创建包含单个元素的 MonoMono<String> mono = Mono.just("Hello, Reactor!");// 创建包含多个元素的 FluxFlux<Integer> flux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5});mono.subscribe(System.out::println); // 输出: Hello, Reactor!flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}

2. 转换操作符

使用转换操作符对数据流进行转换或处理。

import reactor.core.publisher.Flux;public class ReactorTransformExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 对每个元素进行平方操作Flux<Integer> squared = source.map(x -> x * x);squared.subscribe(System.out::println); // 输出: 1, 4, 9, 16, 25}
}

3. 过滤操作符

使用过滤操作符筛选数据流中的元素。

import reactor.core.publisher.Flux;public class ReactorFilterExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 筛选偶数Flux<Integer> evenNumbers = source.filter(x -> x % 2 == 0);evenNumbers.subscribe(System.out::println); // 输出: 2, 4}
}

4. 组合操作符

使用组合操作符组合多个数据流。

import reactor.core.publisher.Flux;public class ReactorCombineExample {public static void main(String[] args) {Flux<Integer> source1 = Flux.range(1, 3);Flux<Integer> source2 = Flux.range(4, 3);// 合并两个数据流Flux<Integer> merged = Flux.concat(source1, source2);merged.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5, 6}
}

这些只是 Reactor API 的一小部分示例。Reactor 提供了丰富的操作符和方法,用于处理复杂的异步数据流。开发人员可以根据具体需求选择适当的操作符进行组合,以构建出符合业务逻辑的异步处理链。

5. 错误处理

Reactor 提供了多种处理错误的方式,例如使用 onErrorResume, onErrorReturn, doOnError 等方法。

import reactor.core.publisher.Flux;public class ReactorErrorHandlingExample {public static void main(String[] args) {Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);// 处理除零异常并提供默认值Flux<Integer> result = source.map(x -> 10 / x).onErrorResume(ex -> Flux.just(-1));result.subscribe(System.out::println); // 输出: 10, 5, -1}
}

6. 背压处理

Reactor 提供了背压处理的支持,允许生产者和消费者之间实现合理的数据流控制。使用 onBackpressureBuffer 或者其他背压操作符可以处理高速生产者和慢速消费者之间的数据流。

import reactor.core.publisher.Flux;public class ReactorBackpressureExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 1000);// 设置缓冲区大小Flux<Integer> buffered = source.onBackpressureBuffer(10);buffered.subscribe(data -> {// 模拟慢速消费者try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(data);},error -> System.err.println("Error: " + error),() -> System.out.println("Done"));}
}
  • TODO:未能实现没有背压和有背压的对比

7. 使用 Reactor WebFlux 处理 Web 请求

Reactor 还提供了 WebFlux 模块,用于处理响应式的 Web 请求。以下是一个简单的示例:

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;@RestController
public class WebFluxController {@GetMapping("/hello")public Mono<ResponseEntity<String>> hello() {return Mono.just(ResponseEntity.ok("Hello, Reactor WebFlux!"));}
}

8. Reactor 核心概念

Reactor 中有一些核心概念,了解这些概念有助于更好地使用 Reactor API。

  • Publisher(发布者): 代表一个生产数据的源头,通常是 MonoFlux

  • Subscriber(订阅者): 用于消费数据流的组件。通过 subscribe 方法订阅 Publisher

  • Subscription(订阅): 代表 SubscriberPublisher 之间的连接。Subscriber 可以使用 Subscription 来请求数据,取消订阅等。

  • Processor(处理器): 既是 Publisher 又是 Subscriber,用于在两者之间进行转换和处理。

public class ReactorCoreConceptsExample {public static void main(String[] args) {// 创建发布者Flux<Integer> source = Flux.range(1, 5);// 创建处理器,并进行数据处理UnicastProcessor<Integer> processor = UnicastProcessor.create();source.map(value -> value * 2)  // Example: doubling the values.subscribe(processor);// 创建订阅者CustomSubscriber<Integer> subscriber = new CustomSubscriber<>();// 订阅并处理数据processor.subscribe(subscriber);}// 自定义订阅者static class CustomSubscriber<T> extends BaseSubscriber<T> {@Overrideprotected void hookOnNext(T value) {System.out.println("Processed Value: " + value);}@Overrideprotected void hookOnError(Throwable throwable) {System.err.println("Error: " + throwable);}@Overrideprotected void hookOnComplete() {System.out.println("Done");}}
}
  • UnicastProcessor.create()已弃用,可以使用Sinks.many().unicast().onBackpressureBuffer()

9. Reactor 调度器

Reactor 提供了多种调度器,用于控制异步操作的执行线程。例如,Schedulers.boundedElastic() 创建了一个弹性线程池,可以根据需要动态调整线程数。

public class ReactorSchedulersExample {public static void main(String[] args) {Flux.range(1, 5).publishOn(Schedulers.boundedElastic())  // 在弹性线程池上发布.map(x -> x * x).subscribeOn(Schedulers.parallel())  // 在并行线程池上订阅.subscribe(System.out::println);}
}
  • 经测试,大概率只使用了一个线程

11. 组合多个 Mono 或 Flux

使用 zip 操作符可以组合多个 MonoFlux,将它们的元素进行组合。

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactorZipExample {public static void main(String[] args) {Mono<String> mono1 = Mono.just("Hello");Mono<String> mono2 = Mono.just("Reactor");// 将两个 Mono 合并为一个 FluxFlux<String> result = Flux.zip(mono1, mono2).map(tuple -> tuple.getT1() + " " + tuple.getT2());result.subscribe(System.out::println); // 输出: Hello Reactor}
}

12. 超时操作

使用 timeout 操作符可以在指定的时间内等待数据流产生结果,如果超时,则触发错误。

public class ReactorTimeoutExample {public static void main(String[] args) throws InterruptedException {Flux<Integer> source = Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(2)); // 模拟延迟// 在指定时间内等待数据流产生结果,否则触发超时source.timeout(Duration.ofSeconds(1)).subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error),() -> System.out.println("Done"));//睡一会,等待任务执行完成Thread.sleep(3333);}
}

13. 并行操作

使用 parallel 操作符可以将一个数据流并行处理,提高处理速度。

public class ReactorParallelExample {public static void main(String[] args) throws InterruptedException {Flux.range(1, 10).parallel().runOn(Schedulers.parallel()).map(x -> x * x).sequential().subscribe(System.out::println);//睡一会,等待任务执行完成Thread.sleep(1111);}
}

14. 与 Java Stream 集成

Reactor 与 Java Stream 可以方便地进行集成。

import reactor.core.publisher.Flux;
import java.util.stream.Stream;public class ReactorJavaStreamIntegrationExample {public static void main(String[] args) {Flux<Integer> flux = Flux.fromStream(Stream.of(1, 2, 3, 4, 5));flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}

15. 使用 Mono 和 Flux 进行条件操作

Reactor 提供了条件操作符,例如 switchIfEmptyfilter,用于根据条件处理数据流。

public class ReactorConditionalOperatorsExample {public static void main(String[] args) {Flux<Integer> empty = Flux.range(1, 0);Flux<Integer> source = Flux.range(1, 5);// 如果数据流为空,则切换到另一个数据流empty.switchIfEmpty(Flux.range(6, 3)).subscribe(System.out::println); // 输出: 6,7,8// 使用 filter 过滤元素source.filter(x -> x % 2 == 0).subscribe(System.out::println); // 输出: 2, 4}
}

16. 使用 Reactor StepVerifier 进行测试

代码需要写在test测试目录下!!!

Reactor 提供了 StepVerifier 类,用于测试异步操作的行为。

public class ReactorTestingExample {public static void main(String[] args) {Flux<Integer> flux = Flux.range(1, 5);// 使用 StepVerifier 验证数据流的行为StepVerifier.create(flux).expectNext(1, 1, 3, 4, 5)//正确顺序应该是12345.expectComplete().verify();}
}

17. 使用 Mono 和 Flux 进行重试

Reactor 提供了 retryWhen 方法,结合 Backoff 操作符,用于在发生错误时进行重试。

public class ReactorRetryExample {public static void main(String[] args) throws InterruptedException {Mono<Object> source = Mono.fromCallable(() -> {throw new RuntimeException("Simulated error");})//最大重试次数为3次,初始重试间隔为1秒,并且采用指数回退策略,直到达到最大的回退时间(这里是5秒)。.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)));source.subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error.getMessage()));//得多睡会儿,让它跑完最大重试时间Thread.sleep(999999);}
}

19. 使用 Reactor Context 进行上下文传递

Reactor 提供了 Context 类,用于在操作链中传递上下文信息。这对于在异步操作中共享信息非常有用。

import reactor.core.publisher.Mono;
import reactor.util.context.Context;public class ReactorContextExample {public static void main(String[] args) {Mono<String> mono = Mono.deferContextual(contextView ->Mono.just("Hello, " + contextView.get("user")));String result = mono.contextWrite(Context.of("user", "John")).block();System.out.println(result); // 输出: Hello, John}
}

20. 使用 Reactor 的 doOn 方法进行副作用处理

doOn 系列方法允许在数据流的不同生命周期阶段执行副作用操作,如日志记录、统计等。

import reactor.core.publisher.Flux;public class ReactorDoOnExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);source.doOnNext(value -> System.out.println("Processing element: " + value)).doOnComplete(() -> System.out.println("Processing complete")).subscribe(System.out::println);}
}

21. 使用 Reactor 的 transform 方法进行操作链重用

transform 方法允许对操作链进行重用,将一系列操作组合为一个新的 Function

import reactor.core.publisher.Flux;import java.util.function.Function;public class ReactorTransformExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 定义一个操作链Function<Flux<Integer>, Flux<Integer>> customTransform = flux ->flux.filter(x -> x % 2 == 0).map(x -> x * 2);// 使用 transform 应用自定义操作链source.transform(customTransform).subscribe(System.out::println); // 输出: 4, 8}
}

学习打卡:Java学习笔记-day06-响应式编程Reactor API大全(上)

相关文章:

响应式编程Reactor API大全(上)

Reactor 是一个基于响应式编程的库&#xff0c;主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API&#xff0c;包括创建、转换、过滤、组合等操作符&#xff0c;用于处理异步数据流。以下是一些 Reactor 的主要 API 示例&#xff1a; pom依赖 <dependencyMan…...

vue3自定义指令

一个自定义指令由一个包含类似组件生命周期钩子的对象来定义。钩子函数会接收到指令所绑定元素作为其参数。 页面内创建自定义指令 下面是一个自定义指令的例子&#xff0c;当一个 input 元素被 Vue 插入到 DOM 中后&#xff0c;它会被自动聚焦&#xff1a; <script setu…...

ECharts 多季度连续显示到一个图中。

效果图 二.相关option 以下option可以复制到 echarts的编辑器 进行查看修改 const site test1; const site2 test2;const qtrlyOption function (data: any, titleText: string): any {//获取最大值 。最大最小值的目的是&#xff1a;使左右里边的所有bar使用同一个指标let …...

【Microsoft Copilot】手机端发布 ——GPT-4, DALL-E3 免费用

Microsoft Copilot 关于Microsoft CopilotMicrosoft Copilot 的特点1. 可以在手机端使用&#xff1a;2. 可以免费使用GPT-4。3. 可以无限制地使用GPT-4。4. 可以使用DALL-E3生成图片。5. 搜索功能6. 图像识别 Microsoft Copilot的缺点和注意事项1. 非常容易报错2. 不支持长篇聊…...

[蓝桥杯 2013 省 AB] 错误票据

题目背景 某涉密单位下发了某种票据&#xff0c;并要在年终全部收回。 题目描述 每张票据有唯一的 ID 号&#xff0c;全年所有票据的 ID 号是连续的&#xff0c;但 ID 的开始数码是随机选定的。因为工作人员疏忽&#xff0c;在录入 ID 号的时候发生了一处错误&#xff0c;造…...

IDEA GitHub令牌原理(Personal Access Token)

1.IDEA的add github account 是什么原理&#xff1f; 在IntelliJ IDEA中添加GitHub账户&#xff0c;主要是为了让IDEA能够与GitHub进行交互&#xff0c;如克隆GitHub上的仓库&#xff0c;提交代码到GitHub等。其基本原理如下&#xff1a; 用户在IDEA中输入GitHub的用户名和密…...

[开发语言][python][c++]:C++中的this指针和Python中的Self -- 26岁生日

C中的this指针和Python中的Self 1. python中的Self2. C中的this指针3. C中的this指针和Python中self的异同点&#xff1a; 以朋友的新岁祝福开篇&#xff0c;祝笔者也祝大家☺️&#xff1a; 一岁一礼 一寸欢喜且喜且乐 且以永日​ From VardoZ癸卯年十一月廿六(兔年)之…...

Android Traceview 定位卡顿问题

Traceview 是一个 Android 性能分析工具&#xff0c;用于时间性能分析&#xff0c;主要帮助开发者了解应用程序中各个方法的执行时间和调用关系。Traceview 可以通过图形化界面查看应用程序的代码执行细节&#xff0c;包括每个方法的调用次数、方法调用的时间消耗、方法调用堆栈…...

第三方 Cookie 被禁用?企业该如何实现用户精准运营和管理?

从 1 月 4 日开始&#xff0c;谷歌 Chrome 浏览器将逐步禁用第三方 Cookie 。作为全球最大的浏览器之一&#xff0c;Chrome 的这一动作无疑将引发行业内的重大变革。一直以来&#xff0c;第三方 Cookie 都是网络营销和广告的重要工具。然而&#xff0c;随着人们对隐私保护的日益…...

Autosar PNC网络管理配置(2)-基于ETAS软件

文章目录 BswM初始化PNC对PDU的控制BswMModeRequestPortBswMModeConditionBswMLogicalExpressionBswMRuleBswMActionListEcuMEcuMWakeupSourceEcuMShutdownCauseEcuMRbAlSwitchOffCalloutEcuMRbOnGoOff...

【SpringMVC快速使用】1.@RestController @RequestMapping 2.logback的使用

背景&#xff1a;为何从这个最简单的 例子写起呢&#xff1f; 那是因为我们的管理后台之类的都是别人写的&#xff0c;我也听说了大家说&#xff1a;只用Post请求就足够了&#xff0c;但是却发现&#xff0c;在浏览器中测试时&#xff0c;默认是GET请求&#xff0c;如果直接写…...

C2593 operator << 不明确

错误 C2593 “operator <<” 不明确&#xff0c;通常出现在C代码中&#xff0c;当你尝试使用<<运算符&#xff08;通常用于输出或位移运算&#xff09;时&#xff0c;编译器无法确定使用哪个重载版本的运算符。这个错误可能由几个原因引起&#xff1a; 多个重载冲突…...

vue:使用【3.0】:条件模块

一、条件层级效果图 二、代码 <template><ContentWrap><!-- 添加条件分支:level1 --><div class"btnBox" v-if"isEdit"><el-button type"primary" click"add">添加条件分支</el-button></div…...

Kafka与RabbitMQ的区别

消息队列介绍 消息队列&#xff08;Message Queue&#xff09;是一种在分布式系统中进行异步通信的机制。它允许一个或多个生产者在发送消息时暂时将消息存储在队列中&#xff0c;然后由一个或多个消费者按顺序读取并处理这些消息。 消息队列具有以下特点&#xff1a; 异步通…...

C++力扣题目538--把二叉搜索树转换为累加树

给出二叉 搜索 树的根节点&#xff0c;该树的节点值各不相同&#xff0c;请你将其转换为累加树&#xff08;Greater Sum Tree&#xff09;&#xff0c;使每个节点 node 的新值等于原树中大于或等于 node.val 的值之和。 提醒一下&#xff0c;二叉搜索树满足下列约束条件&#…...

曲线生成 | 图解贝塞尔曲线生成原理(附ROS C++/Python/Matlab仿真)

目录 0 专栏介绍1 贝塞尔曲线的应用2 图解贝塞尔曲线3 贝塞尔曲线的性质4 算法仿真4.1 ROS C仿真4.2 Python仿真4.3 Matlab仿真 0 专栏介绍 &#x1f525;附C/Python/Matlab全套代码&#x1f525;课程设计、毕业设计、创新竞赛必备&#xff01;详细介绍全局规划(图搜索、采样法…...

【一万字干货】一篇给你讲清楚智慧城市——附送智慧系列开发项目合集

智慧城市的概念 智慧城市&#xff08;Smart City&#xff09;起源于传媒领域&#xff0c;是指利用各种信息技术或创新概念&#xff0c;将城市的系统和服务打通、集成&#xff0c;以提升资源运用的效率&#xff0c;优化城市管理和服务&#xff0c;以及改善市民生活质量。 中国…...

关于如何禁用、暂停或退出OneDrive等操作,看这篇文件就够了

​想知道如何禁用OneDrive?你可以暂停OneDrive的文件同步,退出应用程序,阻止它在启动时打开,或者永远从你的机器上删除该应用程序。我们将向你展示如何在Windows计算机上完成所有这些操作。 如何在Windows上关闭OneDrive 有多种方法可以防止OneDrive在你的电脑上妨碍你。…...

Vue3-46-Pinia-获取全局状态变量的方式

使用说明 在 Pinia 中&#xff0c;获取状态变量的方式非常的简单 &#xff1a; 就和使用对象一样。 使用思路 &#xff1a; 1、导入Store&#xff1b;2、声明Store对象&#xff1b;3、使用对象。 在逻辑代码中使用 但是 Option Store 和 Setup Store 两种方式定义的全局状态变量…...

数据库——DAY1(Linux上安装MySQL8.0.35(网络仓库安装))

一、环境部署 1、Red Hat Enterprise Linux 9.3 64 位 2、删除之前安装过本地镜像版本的MySQL软件&#xff08;以前未安装过&#xff0c;请跳过此步骤&#xff09; [rootlocalhost ~]# dnf remove mysql-server -y [rootlocalhost ~]# rm -rf /var/lib/mysql [rootlocalhost …...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案

随着新能源汽车的快速普及&#xff0c;充电桩作为核心配套设施&#xff0c;其安全性与可靠性备受关注。然而&#xff0c;在高温、高负荷运行环境下&#xff0c;充电桩的散热问题与消防安全隐患日益凸显&#xff0c;成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...

前端开发面试题总结-JavaScript篇(一)

文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包&#xff08;Closure&#xff09;&#xff1f;闭包有什么应用场景和潜在问题&#xff1f;2.解释 JavaScript 的作用域链&#xff08;Scope Chain&#xff09; 二、原型与继承3.原型链是什么&#xff1f;如何实现继承&a…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...

【C++】纯虚函数类外可以写实现吗?

1. 答案 先说答案&#xff0c;可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...