当前位置: 首页 > news >正文

响应式编程Reactor API大全(上)

Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:

pom依赖

   <dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>5.7.2</version><scope>test</scope></dependency></dependencies>

1. 创建 Mono 和 Flux

  • Mono: 用于表示包含零个或一个元素的异步序列。
  • Flux: 用于表示包含零个或多个元素的异步序列。
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;public class ReactorCreateExample {public static void main(String[] args) {// 创建包含单个元素的 MonoMono<String> mono = Mono.just("Hello, Reactor!");// 创建包含多个元素的 FluxFlux<Integer> flux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5});mono.subscribe(System.out::println); // 输出: Hello, Reactor!flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}

2. 转换操作符

使用转换操作符对数据流进行转换或处理。

import reactor.core.publisher.Flux;public class ReactorTransformExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 对每个元素进行平方操作Flux<Integer> squared = source.map(x -> x * x);squared.subscribe(System.out::println); // 输出: 1, 4, 9, 16, 25}
}

3. 过滤操作符

使用过滤操作符筛选数据流中的元素。

import reactor.core.publisher.Flux;public class ReactorFilterExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 筛选偶数Flux<Integer> evenNumbers = source.filter(x -> x % 2 == 0);evenNumbers.subscribe(System.out::println); // 输出: 2, 4}
}

4. 组合操作符

使用组合操作符组合多个数据流。

import reactor.core.publisher.Flux;public class ReactorCombineExample {public static void main(String[] args) {Flux<Integer> source1 = Flux.range(1, 3);Flux<Integer> source2 = Flux.range(4, 3);// 合并两个数据流Flux<Integer> merged = Flux.concat(source1, source2);merged.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5, 6}
}

这些只是 Reactor API 的一小部分示例。Reactor 提供了丰富的操作符和方法,用于处理复杂的异步数据流。开发人员可以根据具体需求选择适当的操作符进行组合,以构建出符合业务逻辑的异步处理链。

5. 错误处理

Reactor 提供了多种处理错误的方式,例如使用 onErrorResume, onErrorReturn, doOnError 等方法。

import reactor.core.publisher.Flux;public class ReactorErrorHandlingExample {public static void main(String[] args) {Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);// 处理除零异常并提供默认值Flux<Integer> result = source.map(x -> 10 / x).onErrorResume(ex -> Flux.just(-1));result.subscribe(System.out::println); // 输出: 10, 5, -1}
}

6. 背压处理

Reactor 提供了背压处理的支持,允许生产者和消费者之间实现合理的数据流控制。使用 onBackpressureBuffer 或者其他背压操作符可以处理高速生产者和慢速消费者之间的数据流。

import reactor.core.publisher.Flux;public class ReactorBackpressureExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 1000);// 设置缓冲区大小Flux<Integer> buffered = source.onBackpressureBuffer(10);buffered.subscribe(data -> {// 模拟慢速消费者try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(data);},error -> System.err.println("Error: " + error),() -> System.out.println("Done"));}
}
  • TODO:未能实现没有背压和有背压的对比

7. 使用 Reactor WebFlux 处理 Web 请求

Reactor 还提供了 WebFlux 模块,用于处理响应式的 Web 请求。以下是一个简单的示例:

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;@RestController
public class WebFluxController {@GetMapping("/hello")public Mono<ResponseEntity<String>> hello() {return Mono.just(ResponseEntity.ok("Hello, Reactor WebFlux!"));}
}

8. Reactor 核心概念

Reactor 中有一些核心概念,了解这些概念有助于更好地使用 Reactor API。

  • Publisher(发布者): 代表一个生产数据的源头,通常是 MonoFlux

  • Subscriber(订阅者): 用于消费数据流的组件。通过 subscribe 方法订阅 Publisher

  • Subscription(订阅): 代表 SubscriberPublisher 之间的连接。Subscriber 可以使用 Subscription 来请求数据,取消订阅等。

  • Processor(处理器): 既是 Publisher 又是 Subscriber,用于在两者之间进行转换和处理。

public class ReactorCoreConceptsExample {public static void main(String[] args) {// 创建发布者Flux<Integer> source = Flux.range(1, 5);// 创建处理器,并进行数据处理UnicastProcessor<Integer> processor = UnicastProcessor.create();source.map(value -> value * 2)  // Example: doubling the values.subscribe(processor);// 创建订阅者CustomSubscriber<Integer> subscriber = new CustomSubscriber<>();// 订阅并处理数据processor.subscribe(subscriber);}// 自定义订阅者static class CustomSubscriber<T> extends BaseSubscriber<T> {@Overrideprotected void hookOnNext(T value) {System.out.println("Processed Value: " + value);}@Overrideprotected void hookOnError(Throwable throwable) {System.err.println("Error: " + throwable);}@Overrideprotected void hookOnComplete() {System.out.println("Done");}}
}
  • UnicastProcessor.create()已弃用,可以使用Sinks.many().unicast().onBackpressureBuffer()

9. Reactor 调度器

Reactor 提供了多种调度器,用于控制异步操作的执行线程。例如,Schedulers.boundedElastic() 创建了一个弹性线程池,可以根据需要动态调整线程数。

public class ReactorSchedulersExample {public static void main(String[] args) {Flux.range(1, 5).publishOn(Schedulers.boundedElastic())  // 在弹性线程池上发布.map(x -> x * x).subscribeOn(Schedulers.parallel())  // 在并行线程池上订阅.subscribe(System.out::println);}
}
  • 经测试,大概率只使用了一个线程

11. 组合多个 Mono 或 Flux

使用 zip 操作符可以组合多个 MonoFlux,将它们的元素进行组合。

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactorZipExample {public static void main(String[] args) {Mono<String> mono1 = Mono.just("Hello");Mono<String> mono2 = Mono.just("Reactor");// 将两个 Mono 合并为一个 FluxFlux<String> result = Flux.zip(mono1, mono2).map(tuple -> tuple.getT1() + " " + tuple.getT2());result.subscribe(System.out::println); // 输出: Hello Reactor}
}

12. 超时操作

使用 timeout 操作符可以在指定的时间内等待数据流产生结果,如果超时,则触发错误。

public class ReactorTimeoutExample {public static void main(String[] args) throws InterruptedException {Flux<Integer> source = Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(2)); // 模拟延迟// 在指定时间内等待数据流产生结果,否则触发超时source.timeout(Duration.ofSeconds(1)).subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error),() -> System.out.println("Done"));//睡一会,等待任务执行完成Thread.sleep(3333);}
}

13. 并行操作

使用 parallel 操作符可以将一个数据流并行处理,提高处理速度。

public class ReactorParallelExample {public static void main(String[] args) throws InterruptedException {Flux.range(1, 10).parallel().runOn(Schedulers.parallel()).map(x -> x * x).sequential().subscribe(System.out::println);//睡一会,等待任务执行完成Thread.sleep(1111);}
}

14. 与 Java Stream 集成

Reactor 与 Java Stream 可以方便地进行集成。

import reactor.core.publisher.Flux;
import java.util.stream.Stream;public class ReactorJavaStreamIntegrationExample {public static void main(String[] args) {Flux<Integer> flux = Flux.fromStream(Stream.of(1, 2, 3, 4, 5));flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}

15. 使用 Mono 和 Flux 进行条件操作

Reactor 提供了条件操作符,例如 switchIfEmptyfilter,用于根据条件处理数据流。

public class ReactorConditionalOperatorsExample {public static void main(String[] args) {Flux<Integer> empty = Flux.range(1, 0);Flux<Integer> source = Flux.range(1, 5);// 如果数据流为空,则切换到另一个数据流empty.switchIfEmpty(Flux.range(6, 3)).subscribe(System.out::println); // 输出: 6,7,8// 使用 filter 过滤元素source.filter(x -> x % 2 == 0).subscribe(System.out::println); // 输出: 2, 4}
}

16. 使用 Reactor StepVerifier 进行测试

代码需要写在test测试目录下!!!

Reactor 提供了 StepVerifier 类,用于测试异步操作的行为。

public class ReactorTestingExample {public static void main(String[] args) {Flux<Integer> flux = Flux.range(1, 5);// 使用 StepVerifier 验证数据流的行为StepVerifier.create(flux).expectNext(1, 1, 3, 4, 5)//正确顺序应该是12345.expectComplete().verify();}
}

17. 使用 Mono 和 Flux 进行重试

Reactor 提供了 retryWhen 方法,结合 Backoff 操作符,用于在发生错误时进行重试。

public class ReactorRetryExample {public static void main(String[] args) throws InterruptedException {Mono<Object> source = Mono.fromCallable(() -> {throw new RuntimeException("Simulated error");})//最大重试次数为3次,初始重试间隔为1秒,并且采用指数回退策略,直到达到最大的回退时间(这里是5秒)。.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)));source.subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error.getMessage()));//得多睡会儿,让它跑完最大重试时间Thread.sleep(999999);}
}

19. 使用 Reactor Context 进行上下文传递

Reactor 提供了 Context 类,用于在操作链中传递上下文信息。这对于在异步操作中共享信息非常有用。

import reactor.core.publisher.Mono;
import reactor.util.context.Context;public class ReactorContextExample {public static void main(String[] args) {Mono<String> mono = Mono.deferContextual(contextView ->Mono.just("Hello, " + contextView.get("user")));String result = mono.contextWrite(Context.of("user", "John")).block();System.out.println(result); // 输出: Hello, John}
}

20. 使用 Reactor 的 doOn 方法进行副作用处理

doOn 系列方法允许在数据流的不同生命周期阶段执行副作用操作,如日志记录、统计等。

import reactor.core.publisher.Flux;public class ReactorDoOnExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);source.doOnNext(value -> System.out.println("Processing element: " + value)).doOnComplete(() -> System.out.println("Processing complete")).subscribe(System.out::println);}
}

21. 使用 Reactor 的 transform 方法进行操作链重用

transform 方法允许对操作链进行重用,将一系列操作组合为一个新的 Function

import reactor.core.publisher.Flux;import java.util.function.Function;public class ReactorTransformExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 定义一个操作链Function<Flux<Integer>, Flux<Integer>> customTransform = flux ->flux.filter(x -> x % 2 == 0).map(x -> x * 2);// 使用 transform 应用自定义操作链source.transform(customTransform).subscribe(System.out::println); // 输出: 4, 8}
}

学习打卡:Java学习笔记-day06-响应式编程Reactor API大全(上)

相关文章:

响应式编程Reactor API大全(上)

Reactor 是一个基于响应式编程的库&#xff0c;主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API&#xff0c;包括创建、转换、过滤、组合等操作符&#xff0c;用于处理异步数据流。以下是一些 Reactor 的主要 API 示例&#xff1a; pom依赖 <dependencyMan…...

vue3自定义指令

一个自定义指令由一个包含类似组件生命周期钩子的对象来定义。钩子函数会接收到指令所绑定元素作为其参数。 页面内创建自定义指令 下面是一个自定义指令的例子&#xff0c;当一个 input 元素被 Vue 插入到 DOM 中后&#xff0c;它会被自动聚焦&#xff1a; <script setu…...

ECharts 多季度连续显示到一个图中。

效果图 二.相关option 以下option可以复制到 echarts的编辑器 进行查看修改 const site test1; const site2 test2;const qtrlyOption function (data: any, titleText: string): any {//获取最大值 。最大最小值的目的是&#xff1a;使左右里边的所有bar使用同一个指标let …...

【Microsoft Copilot】手机端发布 ——GPT-4, DALL-E3 免费用

Microsoft Copilot 关于Microsoft CopilotMicrosoft Copilot 的特点1. 可以在手机端使用&#xff1a;2. 可以免费使用GPT-4。3. 可以无限制地使用GPT-4。4. 可以使用DALL-E3生成图片。5. 搜索功能6. 图像识别 Microsoft Copilot的缺点和注意事项1. 非常容易报错2. 不支持长篇聊…...

[蓝桥杯 2013 省 AB] 错误票据

题目背景 某涉密单位下发了某种票据&#xff0c;并要在年终全部收回。 题目描述 每张票据有唯一的 ID 号&#xff0c;全年所有票据的 ID 号是连续的&#xff0c;但 ID 的开始数码是随机选定的。因为工作人员疏忽&#xff0c;在录入 ID 号的时候发生了一处错误&#xff0c;造…...

IDEA GitHub令牌原理(Personal Access Token)

1.IDEA的add github account 是什么原理&#xff1f; 在IntelliJ IDEA中添加GitHub账户&#xff0c;主要是为了让IDEA能够与GitHub进行交互&#xff0c;如克隆GitHub上的仓库&#xff0c;提交代码到GitHub等。其基本原理如下&#xff1a; 用户在IDEA中输入GitHub的用户名和密…...

[开发语言][python][c++]:C++中的this指针和Python中的Self -- 26岁生日

C中的this指针和Python中的Self 1. python中的Self2. C中的this指针3. C中的this指针和Python中self的异同点&#xff1a; 以朋友的新岁祝福开篇&#xff0c;祝笔者也祝大家☺️&#xff1a; 一岁一礼 一寸欢喜且喜且乐 且以永日​ From VardoZ癸卯年十一月廿六(兔年)之…...

Android Traceview 定位卡顿问题

Traceview 是一个 Android 性能分析工具&#xff0c;用于时间性能分析&#xff0c;主要帮助开发者了解应用程序中各个方法的执行时间和调用关系。Traceview 可以通过图形化界面查看应用程序的代码执行细节&#xff0c;包括每个方法的调用次数、方法调用的时间消耗、方法调用堆栈…...

第三方 Cookie 被禁用?企业该如何实现用户精准运营和管理?

从 1 月 4 日开始&#xff0c;谷歌 Chrome 浏览器将逐步禁用第三方 Cookie 。作为全球最大的浏览器之一&#xff0c;Chrome 的这一动作无疑将引发行业内的重大变革。一直以来&#xff0c;第三方 Cookie 都是网络营销和广告的重要工具。然而&#xff0c;随着人们对隐私保护的日益…...

Autosar PNC网络管理配置(2)-基于ETAS软件

文章目录 BswM初始化PNC对PDU的控制BswMModeRequestPortBswMModeConditionBswMLogicalExpressionBswMRuleBswMActionListEcuMEcuMWakeupSourceEcuMShutdownCauseEcuMRbAlSwitchOffCalloutEcuMRbOnGoOff...

【SpringMVC快速使用】1.@RestController @RequestMapping 2.logback的使用

背景&#xff1a;为何从这个最简单的 例子写起呢&#xff1f; 那是因为我们的管理后台之类的都是别人写的&#xff0c;我也听说了大家说&#xff1a;只用Post请求就足够了&#xff0c;但是却发现&#xff0c;在浏览器中测试时&#xff0c;默认是GET请求&#xff0c;如果直接写…...

C2593 operator << 不明确

错误 C2593 “operator <<” 不明确&#xff0c;通常出现在C代码中&#xff0c;当你尝试使用<<运算符&#xff08;通常用于输出或位移运算&#xff09;时&#xff0c;编译器无法确定使用哪个重载版本的运算符。这个错误可能由几个原因引起&#xff1a; 多个重载冲突…...

vue:使用【3.0】:条件模块

一、条件层级效果图 二、代码 <template><ContentWrap><!-- 添加条件分支:level1 --><div class"btnBox" v-if"isEdit"><el-button type"primary" click"add">添加条件分支</el-button></div…...

Kafka与RabbitMQ的区别

消息队列介绍 消息队列&#xff08;Message Queue&#xff09;是一种在分布式系统中进行异步通信的机制。它允许一个或多个生产者在发送消息时暂时将消息存储在队列中&#xff0c;然后由一个或多个消费者按顺序读取并处理这些消息。 消息队列具有以下特点&#xff1a; 异步通…...

C++力扣题目538--把二叉搜索树转换为累加树

给出二叉 搜索 树的根节点&#xff0c;该树的节点值各不相同&#xff0c;请你将其转换为累加树&#xff08;Greater Sum Tree&#xff09;&#xff0c;使每个节点 node 的新值等于原树中大于或等于 node.val 的值之和。 提醒一下&#xff0c;二叉搜索树满足下列约束条件&#…...

曲线生成 | 图解贝塞尔曲线生成原理(附ROS C++/Python/Matlab仿真)

目录 0 专栏介绍1 贝塞尔曲线的应用2 图解贝塞尔曲线3 贝塞尔曲线的性质4 算法仿真4.1 ROS C仿真4.2 Python仿真4.3 Matlab仿真 0 专栏介绍 &#x1f525;附C/Python/Matlab全套代码&#x1f525;课程设计、毕业设计、创新竞赛必备&#xff01;详细介绍全局规划(图搜索、采样法…...

【一万字干货】一篇给你讲清楚智慧城市——附送智慧系列开发项目合集

智慧城市的概念 智慧城市&#xff08;Smart City&#xff09;起源于传媒领域&#xff0c;是指利用各种信息技术或创新概念&#xff0c;将城市的系统和服务打通、集成&#xff0c;以提升资源运用的效率&#xff0c;优化城市管理和服务&#xff0c;以及改善市民生活质量。 中国…...

关于如何禁用、暂停或退出OneDrive等操作,看这篇文件就够了

​想知道如何禁用OneDrive?你可以暂停OneDrive的文件同步,退出应用程序,阻止它在启动时打开,或者永远从你的机器上删除该应用程序。我们将向你展示如何在Windows计算机上完成所有这些操作。 如何在Windows上关闭OneDrive 有多种方法可以防止OneDrive在你的电脑上妨碍你。…...

Vue3-46-Pinia-获取全局状态变量的方式

使用说明 在 Pinia 中&#xff0c;获取状态变量的方式非常的简单 &#xff1a; 就和使用对象一样。 使用思路 &#xff1a; 1、导入Store&#xff1b;2、声明Store对象&#xff1b;3、使用对象。 在逻辑代码中使用 但是 Option Store 和 Setup Store 两种方式定义的全局状态变量…...

数据库——DAY1(Linux上安装MySQL8.0.35(网络仓库安装))

一、环境部署 1、Red Hat Enterprise Linux 9.3 64 位 2、删除之前安装过本地镜像版本的MySQL软件&#xff08;以前未安装过&#xff0c;请跳过此步骤&#xff09; [rootlocalhost ~]# dnf remove mysql-server -y [rootlocalhost ~]# rm -rf /var/lib/mysql [rootlocalhost …...

HTML 语义化

目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案&#xff1a; 语义化标签&#xff1a; <header>&#xff1a;页头<nav>&#xff1a;导航<main>&#xff1a;主要内容<article>&#x…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

深入理解JavaScript设计模式之单例模式

目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式&#xff08;Singleton Pattern&#…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢

随着互联网技术的飞速发展&#xff0c;消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁&#xff0c;不仅优化了客户体验&#xff0c;还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用&#xff0c;并…...

将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?

Otsu 是一种自动阈值化方法&#xff0c;用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理&#xff0c;能够自动确定一个阈值&#xff0c;将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

HBuilderX安装(uni-app和小程序开发)

下载HBuilderX 访问官方网站&#xff1a;https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本&#xff1a; Windows版&#xff08;推荐下载标准版&#xff09; Windows系统安装步骤 运行安装程序&#xff1a; 双击下载的.exe安装文件 如果出现安全提示&…...

12.找到字符串中所有字母异位词

&#x1f9e0; 题目解析 题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义&#xff1a; 若两个字符串包含的字符种类和出现次数完全相同&#xff0c;顺序无所谓&#xff0c;则互为…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...