当前位置: 首页 > news >正文

响应式编程Reactor API大全(上)

Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:

pom依赖

   <dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>5.7.2</version><scope>test</scope></dependency></dependencies>

1. 创建 Mono 和 Flux

  • Mono: 用于表示包含零个或一个元素的异步序列。
  • Flux: 用于表示包含零个或多个元素的异步序列。
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;public class ReactorCreateExample {public static void main(String[] args) {// 创建包含单个元素的 MonoMono<String> mono = Mono.just("Hello, Reactor!");// 创建包含多个元素的 FluxFlux<Integer> flux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5});mono.subscribe(System.out::println); // 输出: Hello, Reactor!flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}

2. 转换操作符

使用转换操作符对数据流进行转换或处理。

import reactor.core.publisher.Flux;public class ReactorTransformExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 对每个元素进行平方操作Flux<Integer> squared = source.map(x -> x * x);squared.subscribe(System.out::println); // 输出: 1, 4, 9, 16, 25}
}

3. 过滤操作符

使用过滤操作符筛选数据流中的元素。

import reactor.core.publisher.Flux;public class ReactorFilterExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 筛选偶数Flux<Integer> evenNumbers = source.filter(x -> x % 2 == 0);evenNumbers.subscribe(System.out::println); // 输出: 2, 4}
}

4. 组合操作符

使用组合操作符组合多个数据流。

import reactor.core.publisher.Flux;public class ReactorCombineExample {public static void main(String[] args) {Flux<Integer> source1 = Flux.range(1, 3);Flux<Integer> source2 = Flux.range(4, 3);// 合并两个数据流Flux<Integer> merged = Flux.concat(source1, source2);merged.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5, 6}
}

这些只是 Reactor API 的一小部分示例。Reactor 提供了丰富的操作符和方法,用于处理复杂的异步数据流。开发人员可以根据具体需求选择适当的操作符进行组合,以构建出符合业务逻辑的异步处理链。

5. 错误处理

Reactor 提供了多种处理错误的方式,例如使用 onErrorResume, onErrorReturn, doOnError 等方法。

import reactor.core.publisher.Flux;public class ReactorErrorHandlingExample {public static void main(String[] args) {Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);// 处理除零异常并提供默认值Flux<Integer> result = source.map(x -> 10 / x).onErrorResume(ex -> Flux.just(-1));result.subscribe(System.out::println); // 输出: 10, 5, -1}
}

6. 背压处理

Reactor 提供了背压处理的支持,允许生产者和消费者之间实现合理的数据流控制。使用 onBackpressureBuffer 或者其他背压操作符可以处理高速生产者和慢速消费者之间的数据流。

import reactor.core.publisher.Flux;public class ReactorBackpressureExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 1000);// 设置缓冲区大小Flux<Integer> buffered = source.onBackpressureBuffer(10);buffered.subscribe(data -> {// 模拟慢速消费者try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(data);},error -> System.err.println("Error: " + error),() -> System.out.println("Done"));}
}
  • TODO:未能实现没有背压和有背压的对比

7. 使用 Reactor WebFlux 处理 Web 请求

Reactor 还提供了 WebFlux 模块,用于处理响应式的 Web 请求。以下是一个简单的示例:

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;@RestController
public class WebFluxController {@GetMapping("/hello")public Mono<ResponseEntity<String>> hello() {return Mono.just(ResponseEntity.ok("Hello, Reactor WebFlux!"));}
}

8. Reactor 核心概念

Reactor 中有一些核心概念,了解这些概念有助于更好地使用 Reactor API。

  • Publisher(发布者): 代表一个生产数据的源头,通常是 MonoFlux

  • Subscriber(订阅者): 用于消费数据流的组件。通过 subscribe 方法订阅 Publisher

  • Subscription(订阅): 代表 SubscriberPublisher 之间的连接。Subscriber 可以使用 Subscription 来请求数据,取消订阅等。

  • Processor(处理器): 既是 Publisher 又是 Subscriber,用于在两者之间进行转换和处理。

public class ReactorCoreConceptsExample {public static void main(String[] args) {// 创建发布者Flux<Integer> source = Flux.range(1, 5);// 创建处理器,并进行数据处理UnicastProcessor<Integer> processor = UnicastProcessor.create();source.map(value -> value * 2)  // Example: doubling the values.subscribe(processor);// 创建订阅者CustomSubscriber<Integer> subscriber = new CustomSubscriber<>();// 订阅并处理数据processor.subscribe(subscriber);}// 自定义订阅者static class CustomSubscriber<T> extends BaseSubscriber<T> {@Overrideprotected void hookOnNext(T value) {System.out.println("Processed Value: " + value);}@Overrideprotected void hookOnError(Throwable throwable) {System.err.println("Error: " + throwable);}@Overrideprotected void hookOnComplete() {System.out.println("Done");}}
}
  • UnicastProcessor.create()已弃用,可以使用Sinks.many().unicast().onBackpressureBuffer()

9. Reactor 调度器

Reactor 提供了多种调度器,用于控制异步操作的执行线程。例如,Schedulers.boundedElastic() 创建了一个弹性线程池,可以根据需要动态调整线程数。

public class ReactorSchedulersExample {public static void main(String[] args) {Flux.range(1, 5).publishOn(Schedulers.boundedElastic())  // 在弹性线程池上发布.map(x -> x * x).subscribeOn(Schedulers.parallel())  // 在并行线程池上订阅.subscribe(System.out::println);}
}
  • 经测试,大概率只使用了一个线程

11. 组合多个 Mono 或 Flux

使用 zip 操作符可以组合多个 MonoFlux,将它们的元素进行组合。

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactorZipExample {public static void main(String[] args) {Mono<String> mono1 = Mono.just("Hello");Mono<String> mono2 = Mono.just("Reactor");// 将两个 Mono 合并为一个 FluxFlux<String> result = Flux.zip(mono1, mono2).map(tuple -> tuple.getT1() + " " + tuple.getT2());result.subscribe(System.out::println); // 输出: Hello Reactor}
}

12. 超时操作

使用 timeout 操作符可以在指定的时间内等待数据流产生结果,如果超时,则触发错误。

public class ReactorTimeoutExample {public static void main(String[] args) throws InterruptedException {Flux<Integer> source = Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(2)); // 模拟延迟// 在指定时间内等待数据流产生结果,否则触发超时source.timeout(Duration.ofSeconds(1)).subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error),() -> System.out.println("Done"));//睡一会,等待任务执行完成Thread.sleep(3333);}
}

13. 并行操作

使用 parallel 操作符可以将一个数据流并行处理,提高处理速度。

public class ReactorParallelExample {public static void main(String[] args) throws InterruptedException {Flux.range(1, 10).parallel().runOn(Schedulers.parallel()).map(x -> x * x).sequential().subscribe(System.out::println);//睡一会,等待任务执行完成Thread.sleep(1111);}
}

14. 与 Java Stream 集成

Reactor 与 Java Stream 可以方便地进行集成。

import reactor.core.publisher.Flux;
import java.util.stream.Stream;public class ReactorJavaStreamIntegrationExample {public static void main(String[] args) {Flux<Integer> flux = Flux.fromStream(Stream.of(1, 2, 3, 4, 5));flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}

15. 使用 Mono 和 Flux 进行条件操作

Reactor 提供了条件操作符,例如 switchIfEmptyfilter,用于根据条件处理数据流。

public class ReactorConditionalOperatorsExample {public static void main(String[] args) {Flux<Integer> empty = Flux.range(1, 0);Flux<Integer> source = Flux.range(1, 5);// 如果数据流为空,则切换到另一个数据流empty.switchIfEmpty(Flux.range(6, 3)).subscribe(System.out::println); // 输出: 6,7,8// 使用 filter 过滤元素source.filter(x -> x % 2 == 0).subscribe(System.out::println); // 输出: 2, 4}
}

16. 使用 Reactor StepVerifier 进行测试

代码需要写在test测试目录下!!!

Reactor 提供了 StepVerifier 类,用于测试异步操作的行为。

public class ReactorTestingExample {public static void main(String[] args) {Flux<Integer> flux = Flux.range(1, 5);// 使用 StepVerifier 验证数据流的行为StepVerifier.create(flux).expectNext(1, 1, 3, 4, 5)//正确顺序应该是12345.expectComplete().verify();}
}

17. 使用 Mono 和 Flux 进行重试

Reactor 提供了 retryWhen 方法,结合 Backoff 操作符,用于在发生错误时进行重试。

public class ReactorRetryExample {public static void main(String[] args) throws InterruptedException {Mono<Object> source = Mono.fromCallable(() -> {throw new RuntimeException("Simulated error");})//最大重试次数为3次,初始重试间隔为1秒,并且采用指数回退策略,直到达到最大的回退时间(这里是5秒)。.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)));source.subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error.getMessage()));//得多睡会儿,让它跑完最大重试时间Thread.sleep(999999);}
}

19. 使用 Reactor Context 进行上下文传递

Reactor 提供了 Context 类,用于在操作链中传递上下文信息。这对于在异步操作中共享信息非常有用。

import reactor.core.publisher.Mono;
import reactor.util.context.Context;public class ReactorContextExample {public static void main(String[] args) {Mono<String> mono = Mono.deferContextual(contextView ->Mono.just("Hello, " + contextView.get("user")));String result = mono.contextWrite(Context.of("user", "John")).block();System.out.println(result); // 输出: Hello, John}
}

20. 使用 Reactor 的 doOn 方法进行副作用处理

doOn 系列方法允许在数据流的不同生命周期阶段执行副作用操作,如日志记录、统计等。

import reactor.core.publisher.Flux;public class ReactorDoOnExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);source.doOnNext(value -> System.out.println("Processing element: " + value)).doOnComplete(() -> System.out.println("Processing complete")).subscribe(System.out::println);}
}

21. 使用 Reactor 的 transform 方法进行操作链重用

transform 方法允许对操作链进行重用,将一系列操作组合为一个新的 Function

import reactor.core.publisher.Flux;import java.util.function.Function;public class ReactorTransformExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 定义一个操作链Function<Flux<Integer>, Flux<Integer>> customTransform = flux ->flux.filter(x -> x % 2 == 0).map(x -> x * 2);// 使用 transform 应用自定义操作链source.transform(customTransform).subscribe(System.out::println); // 输出: 4, 8}
}

学习打卡:Java学习笔记-day06-响应式编程Reactor API大全(上)

相关文章:

响应式编程Reactor API大全(上)

Reactor 是一个基于响应式编程的库&#xff0c;主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API&#xff0c;包括创建、转换、过滤、组合等操作符&#xff0c;用于处理异步数据流。以下是一些 Reactor 的主要 API 示例&#xff1a; pom依赖 <dependencyMan…...

vue3自定义指令

一个自定义指令由一个包含类似组件生命周期钩子的对象来定义。钩子函数会接收到指令所绑定元素作为其参数。 页面内创建自定义指令 下面是一个自定义指令的例子&#xff0c;当一个 input 元素被 Vue 插入到 DOM 中后&#xff0c;它会被自动聚焦&#xff1a; <script setu…...

ECharts 多季度连续显示到一个图中。

效果图 二.相关option 以下option可以复制到 echarts的编辑器 进行查看修改 const site test1; const site2 test2;const qtrlyOption function (data: any, titleText: string): any {//获取最大值 。最大最小值的目的是&#xff1a;使左右里边的所有bar使用同一个指标let …...

【Microsoft Copilot】手机端发布 ——GPT-4, DALL-E3 免费用

Microsoft Copilot 关于Microsoft CopilotMicrosoft Copilot 的特点1. 可以在手机端使用&#xff1a;2. 可以免费使用GPT-4。3. 可以无限制地使用GPT-4。4. 可以使用DALL-E3生成图片。5. 搜索功能6. 图像识别 Microsoft Copilot的缺点和注意事项1. 非常容易报错2. 不支持长篇聊…...

[蓝桥杯 2013 省 AB] 错误票据

题目背景 某涉密单位下发了某种票据&#xff0c;并要在年终全部收回。 题目描述 每张票据有唯一的 ID 号&#xff0c;全年所有票据的 ID 号是连续的&#xff0c;但 ID 的开始数码是随机选定的。因为工作人员疏忽&#xff0c;在录入 ID 号的时候发生了一处错误&#xff0c;造…...

IDEA GitHub令牌原理(Personal Access Token)

1.IDEA的add github account 是什么原理&#xff1f; 在IntelliJ IDEA中添加GitHub账户&#xff0c;主要是为了让IDEA能够与GitHub进行交互&#xff0c;如克隆GitHub上的仓库&#xff0c;提交代码到GitHub等。其基本原理如下&#xff1a; 用户在IDEA中输入GitHub的用户名和密…...

[开发语言][python][c++]:C++中的this指针和Python中的Self -- 26岁生日

C中的this指针和Python中的Self 1. python中的Self2. C中的this指针3. C中的this指针和Python中self的异同点&#xff1a; 以朋友的新岁祝福开篇&#xff0c;祝笔者也祝大家☺️&#xff1a; 一岁一礼 一寸欢喜且喜且乐 且以永日​ From VardoZ癸卯年十一月廿六(兔年)之…...

Android Traceview 定位卡顿问题

Traceview 是一个 Android 性能分析工具&#xff0c;用于时间性能分析&#xff0c;主要帮助开发者了解应用程序中各个方法的执行时间和调用关系。Traceview 可以通过图形化界面查看应用程序的代码执行细节&#xff0c;包括每个方法的调用次数、方法调用的时间消耗、方法调用堆栈…...

第三方 Cookie 被禁用?企业该如何实现用户精准运营和管理?

从 1 月 4 日开始&#xff0c;谷歌 Chrome 浏览器将逐步禁用第三方 Cookie 。作为全球最大的浏览器之一&#xff0c;Chrome 的这一动作无疑将引发行业内的重大变革。一直以来&#xff0c;第三方 Cookie 都是网络营销和广告的重要工具。然而&#xff0c;随着人们对隐私保护的日益…...

Autosar PNC网络管理配置(2)-基于ETAS软件

文章目录 BswM初始化PNC对PDU的控制BswMModeRequestPortBswMModeConditionBswMLogicalExpressionBswMRuleBswMActionListEcuMEcuMWakeupSourceEcuMShutdownCauseEcuMRbAlSwitchOffCalloutEcuMRbOnGoOff...

【SpringMVC快速使用】1.@RestController @RequestMapping 2.logback的使用

背景&#xff1a;为何从这个最简单的 例子写起呢&#xff1f; 那是因为我们的管理后台之类的都是别人写的&#xff0c;我也听说了大家说&#xff1a;只用Post请求就足够了&#xff0c;但是却发现&#xff0c;在浏览器中测试时&#xff0c;默认是GET请求&#xff0c;如果直接写…...

C2593 operator << 不明确

错误 C2593 “operator <<” 不明确&#xff0c;通常出现在C代码中&#xff0c;当你尝试使用<<运算符&#xff08;通常用于输出或位移运算&#xff09;时&#xff0c;编译器无法确定使用哪个重载版本的运算符。这个错误可能由几个原因引起&#xff1a; 多个重载冲突…...

vue:使用【3.0】:条件模块

一、条件层级效果图 二、代码 <template><ContentWrap><!-- 添加条件分支:level1 --><div class"btnBox" v-if"isEdit"><el-button type"primary" click"add">添加条件分支</el-button></div…...

Kafka与RabbitMQ的区别

消息队列介绍 消息队列&#xff08;Message Queue&#xff09;是一种在分布式系统中进行异步通信的机制。它允许一个或多个生产者在发送消息时暂时将消息存储在队列中&#xff0c;然后由一个或多个消费者按顺序读取并处理这些消息。 消息队列具有以下特点&#xff1a; 异步通…...

C++力扣题目538--把二叉搜索树转换为累加树

给出二叉 搜索 树的根节点&#xff0c;该树的节点值各不相同&#xff0c;请你将其转换为累加树&#xff08;Greater Sum Tree&#xff09;&#xff0c;使每个节点 node 的新值等于原树中大于或等于 node.val 的值之和。 提醒一下&#xff0c;二叉搜索树满足下列约束条件&#…...

曲线生成 | 图解贝塞尔曲线生成原理(附ROS C++/Python/Matlab仿真)

目录 0 专栏介绍1 贝塞尔曲线的应用2 图解贝塞尔曲线3 贝塞尔曲线的性质4 算法仿真4.1 ROS C仿真4.2 Python仿真4.3 Matlab仿真 0 专栏介绍 &#x1f525;附C/Python/Matlab全套代码&#x1f525;课程设计、毕业设计、创新竞赛必备&#xff01;详细介绍全局规划(图搜索、采样法…...

【一万字干货】一篇给你讲清楚智慧城市——附送智慧系列开发项目合集

智慧城市的概念 智慧城市&#xff08;Smart City&#xff09;起源于传媒领域&#xff0c;是指利用各种信息技术或创新概念&#xff0c;将城市的系统和服务打通、集成&#xff0c;以提升资源运用的效率&#xff0c;优化城市管理和服务&#xff0c;以及改善市民生活质量。 中国…...

关于如何禁用、暂停或退出OneDrive等操作,看这篇文件就够了

​想知道如何禁用OneDrive?你可以暂停OneDrive的文件同步,退出应用程序,阻止它在启动时打开,或者永远从你的机器上删除该应用程序。我们将向你展示如何在Windows计算机上完成所有这些操作。 如何在Windows上关闭OneDrive 有多种方法可以防止OneDrive在你的电脑上妨碍你。…...

Vue3-46-Pinia-获取全局状态变量的方式

使用说明 在 Pinia 中&#xff0c;获取状态变量的方式非常的简单 &#xff1a; 就和使用对象一样。 使用思路 &#xff1a; 1、导入Store&#xff1b;2、声明Store对象&#xff1b;3、使用对象。 在逻辑代码中使用 但是 Option Store 和 Setup Store 两种方式定义的全局状态变量…...

数据库——DAY1(Linux上安装MySQL8.0.35(网络仓库安装))

一、环境部署 1、Red Hat Enterprise Linux 9.3 64 位 2、删除之前安装过本地镜像版本的MySQL软件&#xff08;以前未安装过&#xff0c;请跳过此步骤&#xff09; [rootlocalhost ~]# dnf remove mysql-server -y [rootlocalhost ~]# rm -rf /var/lib/mysql [rootlocalhost …...

系统设计 --- MongoDB亿级数据查询优化策略

系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log&#xff0c;共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题&#xff0c;不能使用ELK只能使用…...

质量体系的重要

质量体系是为确保产品、服务或过程质量满足规定要求&#xff0c;由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面&#xff1a; &#x1f3db;️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限&#xff0c;形成层级清晰的管理网络&#xf…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

04-初识css

一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?

在大数据处理领域&#xff0c;Hive 作为 Hadoop 生态中重要的数据仓库工具&#xff0c;其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式&#xff0c;很多开发者常常陷入选择困境。本文将从底…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

MySQL:分区的基本使用

目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区&#xff08;Partitioning&#xff09;是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分&#xff08;分区&#xff09;可以独立存储、管理和优化&#xff0c;…...

6个月Python学习计划 Day 16 - 面向对象编程(OOP)基础

第三周 Day 3 &#x1f3af; 今日目标 理解类&#xff08;class&#xff09;和对象&#xff08;object&#xff09;的关系学会定义类的属性、方法和构造函数&#xff08;init&#xff09;掌握对象的创建与使用初识封装、继承和多态的基本概念&#xff08;预告&#xff09; &a…...