SpringBoot响应式编程(1)Reactor核心
一、概述
1.1介绍
Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,Webflux 底层使用的也是该框架,其通过流的方式实现了异步相应,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。
Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。
1.2特性
Reactive Streams是JVM面向流的库的标准和规范
1、处理可能无限数量的元素
2、有序
3、在组件之间异步传递元素
4、强制性非阻塞,背压模式
1.3相关概念
Publisher:发布者;产生数据流 Subscriber:
订阅者; 消费数据流 Subscription:
订阅关系; 订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。
Processor:处理器; 处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。
命令式编程:全自定义
响应式编程/声明式编程: 声明流、说清楚要干什么、最终结果是要怎样
响应式编程:
1、底层:基于数据缓冲队列 + 消息驱动模型 + 异步回调机制
2、编码:流式编程 + 链式调用 + 声明式API
3、效果:优雅全异步 + 消息实时处理 + 高吞吐量 + 占用少量资源
线程池、DataBuffer
1.4演示demo
Publisher 数据发布者
Publisher 是数据的发布者,它是一个函数式接口,其中只有一个方法,该方法可以配置其对应的消费者,源码如下:
@FunctionalInterfacepublic static interface Publisher<T> {public void subscribe(Subscriber<? super T> subscriber);}
Subscriber 数据消费者
Subscriber 负责消费数据,其内部定义了 4 个方法,源码如下:
public static interface Subscriber<T> {public void onSubscribe(Subscription subscription);public void onNext(T item);public void onError(Throwable throwable);public void onComplete();}
onSubscribe():当与消费者绑定成功时调用该方法;
onNext():当接收到一条发布数据时调用该方法;
onError():当发布者或消费者发生异常时调用该方法;
onComplete():当发布者关闭且所有数据已经被全部消费后调用该方法;
Subscription 订阅关系
Subscription 定义了发布者和消费者的订阅关系,可以理解为两者之间的通道,定义源码如下:
public static interface Subscription {public void request(long n);public void cancel();}
request():该方法用于向通道中请求 n 个数据进行处理;
cancel():该方法用于取消发布者和消费者的绑定关系;
Processor 中间处理器
Processor 是数据发布者与消费者中间的处理器,可以对发布者发布的数据进行预处理后再发送给消费者进行消费。
实际上其既是发布者,又是消费者,即在两者中间进行了一次数据的处理转发,其源码如下:
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
消息发布订阅示例代码
package com.yanyu.reactor.flow;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;public class FlowDemo1 {//定义流中间操作处理器; 只用写订阅者的接口static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String> {private Flow.Subscription subscription; //保存绑定关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("processor订阅绑定完成");this.subscription = subscription;subscription.request(1); //找上游要一个数据}@Override //数据到达,触发这个回调public void onNext(String item) {System.out.println("processor拿到数据:"+item);//再加工item += ":哈哈";submit(item);//把我加工后的数据发出去subscription.request(1); //再要新数据}@Overridepublic void onError(Throwable throwable) {// 产生异常后直接取消订阅this.subscription.cancel();}@Overridepublic void onComplete() {// 发布者所有数据全部被接收,且发布者已经关闭System.out.println("数据接收完毕~");}}public static void main(String[] args) throws InterruptedException {//1、定义一个发布者; 发布数据;SubmissionPublisher<String> publisher = new SubmissionPublisher<>();//2、定一个中间操作: 给每个元素加个 哈哈 前缀MyProcessor myProcessor1 = new MyProcessor();MyProcessor myProcessor2 = new MyProcessor();MyProcessor myProcessor3 = new MyProcessor();//3、定义一个订阅者; 订阅者感兴趣发布者的数据;Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);this.subscription = subscription;// 向数据发布者请求数据( //从上游请求一个数据)this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("接收到消息>>>" + item);// 接收数据后可以继续接收或取消订阅if(item.equals("7")){subscription.cancel(); //取消订阅}else {subscription.request(1);}}@Overridepublic void onError(Throwable throwable) {// 产生异常后直接取消订阅this.subscription.cancel();}@Overridepublic void onComplete() {// 发布者所有数据全部被接收,且发布者已经关闭System.out.println("数据接收完毕~");}};// 将订阅者注册到发布者//4、绑定发布者和订阅者publisher.subscribe(myProcessor1); //此时处理器相当于订阅者myProcessor1.subscribe(myProcessor2); //此时处理器相当于发布者myProcessor2.subscribe(myProcessor3);myProcessor3.subscribe(subscriber); //链表关系绑定出责任链。//绑定操作;就是发布者,记住了所有订阅者都有谁,有数据后,给所有订阅者把数据推送过去。// 发布消息for (int i = 0; i < 10; i++) {// 发送数据if(i == 5){
// publisher.closeExceptionally(new RuntimeException("5555"));}else {publisher.submit(String.valueOf(i));}//publisher发布的所有数据在它的buffer区;//中断
// publisher.closeExceptionally();}// 关闭发布者publisher.close();Thread.sleep(20000);// 维持程序保持开启while (true) {}}
}
Backpressure 回压
Backpressure 回压是指消费能力低于生产能力时,Subscriber 会将 Publisher 发布的数据缓存在 Subscription 中,其长度默认为256,源码如下:
static final int DEFAULT_BUFFER_SIZE = 256;public static int defaultBufferSize() {return DEFAULT_BUFFER_SIZE;}
当 Subscription 存满时,生产者将根据消费者的消费能力动态的调整数据发布的速度,以实现消费者对生产者的反向控制。
二、Reactor – Mono/Flux API
2.1概述
Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:
- 可编排性(Composability) 以及 可读性(Readability)
- 使用丰富的 操作符 来处理形如 流 的数据
- 在 订阅(subscribe) 之前什么都不会发生
- 背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力
- 高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果
Mono和Flux
Mono: 0|1 数据流
Flux: N数据流
响应式流:元素(内容) + 信号(完成/异常);
Mono是一种特殊类型的Publisher。Mono对象表示单个或空值。这意味着它最多只能是onnext() 请求发出一个值,然后以oncomplete()信号终止。如果失败,它只会发出oneror()信号。
Flux是标准的Publisher,代表 0 到 N 异步序列值。这意味着它可以发出 0 对于多个值,onnnext()请求可能是无限值,然后以完成或错误信号终止。
2.2创建 Mono/Flux
常见创建方法
just():使用已知内容创建;
fromIterable():通过可迭代对象创建;
fromStream():从集合流中创建;
range():通过范围迭代创建;
interval():按照从 0 递增的方式自动创建;
delayElements():数据流延时发送方法;
//测试Fluxpublic static void flux() throws IOException {
// Mono: 0|1个元素的流
// Flux: N个元素的流; N>1//发布者发布数据流:源头// 1. 创建 Flux/Mono// 1.1 使用已知内容创建 FluxFlux.just(1, 2, 3, 4, "hello", "world").subscribe(e -> System.out.println("e1 = " + e));// 1.2 通过可迭代对象创建 FluxFlux.fromIterable(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);// 1.3 从集合流中创建 FluxFlux.fromStream(Stream.of(1,2,3,4)).subscribe(System.out::println);// 1.4 通过范围迭代创建 FluxFlux.range(0,10).subscribe(System.out::println);// 2. 创建时常用的方法// 2.1 interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列Flux.interval(Duration.ofMillis(100))// 限制执行10次.take(10).subscribe(System.out::println);// 2.2 delayElements() 方法延时发送Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5)).delayElements(Duration.ofMillis(1000L)).subscribe(System.out::println);//3.企业级用法//3.1、多元素的流Flux<Integer> just = Flux.just(1, 2, 3, 4, 5); ////流不消费就没用; 消费:订阅just.subscribe(e -> System.out.println("e1 = " + e));//一个数据流可以有很多消费者just.subscribe(e -> System.out.println("e2 = " + e));//对于每个消费者来说流都是一样的; 广播模式;System.out.println("==========");Flux<Long> flux = Flux.interval(Duration.ofSeconds(1));//每秒产生一个从0开始的递增数字flux.subscribe(System.out::println);System.in.read();}
数据中间操作
map():对序列中的每个元素应用一个函数,并返回一个新的序列;flatMap():对序列中的每个元素应用一个异步函数,并将多个 Publisher 合并成一个序列(将流中的每一个元素看作一个新的流进行处理并按实际生产顺序进行合并)。
flatMapSequential():按订阅顺序进行合并;
filter():过滤序列中的元素,只保留满足条件的元素。
handle():对序列中的每个元素进行处理,可以选择性地发出零个或多个元素。
mapNotNull():对序列中的每个元素应用一个函数,并过滤掉结果为null的元素。
switchIfEmpty():如果序列为空,则切换到另一个Publisher
defaultIfEmpty():如果序列为空,则发出一个默认值。
public static void flux1() throws IOException {
// Mono: 0|1个元素的流
// Flux: N个元素的流; N>1//发布者发布数据流:源头// 作用:对序列中的每个元素应用一个函数,并返回一个新的序列。
// 使用场景:转化数据元素。Flux.just(1, 2, 3).map(i -> i * 2).subscribe(System.out::println);// 输出: 2, 4, 6
// 作用:对序列中的每个元素应用一个异步函数,并将多个 Publisher 合并成一个序列。
// 使用场景:异步处理和合并结果。Flux.just(1, 2, 3).flatMap(i -> Flux.just(i * 2)).subscribe(System.out::println);Flux.just("zhang san", "li si").flatMap(v -> {String[] s = v.split(" ");return Flux.fromArray(s); //把数据包装成多元素流}).log().subscribe();//两个人的名字,按照空格拆分,打印出所有的姓与名
// 作用:过滤序列中的元素,只保留满足条件的元素。
// 使用场景:筛选数据。Flux.just(1, 2, 3, 4).filter(i -> i % 2 == 0).subscribe(System.out::println); // 输出: 2, 4
// 作用:对序列中的每个元素进行处理,可以选择性地发出零个或多个元素。
// 使用场景:复杂的元素转换或过滤。Flux.just(1, 2, 3, 4).handle((i, sink) -> {if (i % 2 == 0) {sink.next(i * 2);}}).subscribe(System.out::println); // 输出: 4, 8
// 作用:对序列中的每个元素应用一个函数,并过滤掉结果为 null 的元素。
// 使用场景:转换数据并去除 null 值。Flux.just(1, 2, 3, 4).mapNotNull(i -> i % 2 == 0 ? i * 2 : null).subscribe(System.out::println); // 输出: 4, 8
// 作用:如果序列为空,则切换到另一个 Publisher。
// 使用场景:提供备用数据源。Flux.empty().switchIfEmpty(Flux.just(1, 2, 3)).subscribe(System.out::println); // 输出: 1, 2, 3
// 作用:如果序列为空,则发出一个默认值。
// 使用场景:提供默认值。
// Flux.empty()
// .defaultIfEmpty(1)
// .subscribe(System.out::println); // 输出: 1
// Flux.range(1, 100)
// .map(x -> {
// return x / 2;
// })
// .subscribe(System.out::println);Flux.just(5, 10).flatMap(x ->Flux.interval(Duration.ofMillis(x * 10))).subscribe(System.out::println);Flux.just(5, 10).flatMapSequential(x ->Flux.interval(Duration.ofMillis(x * 10)).take(x)).subscribe(System.out::println);Flux.just(5, 10).flatMapSequential(x ->Flux.interval(Duration.ofMillis(x * 10)).take(10)).subscribe(System.out::println);}
merge():按照所有流的实际产生顺序进行合并;
mergeSequential():按照流合并的次序进行合并,先消费第一个,在消费第二个;zip():
zip()方法用于将多个流(Publisher)合并成一个流。
zipWith():把流中的元素与另一个流中对应元素进行合并,多余元素将被抛弃;
- 第二个参数可以指定合并的规则
public static void flux2() throws IOException {
// Flux.merge(Flux.interval(Duration.ofMillis(10)).take(5),
// Flux.interval(Duration.ofMillis(10)).take(3))
// .subscribe(System.out::println);Flux.mergeSequential(Flux.interval(Duration.ofMillis(10)).take(5),Flux.interval(Duration.ofMillis(10)).take(3)).subscribe(System.out::println);Flux<Integer> numbers = Flux.just(1, 2, 3);Flux<String> letters = Flux.just("A", "B", "C");Flux.zip(numbers, letters, (number, letter) -> number + letter).subscribe(System.out::println);Flux.just(1, 2).zipWith(Flux.just(3, 4, 5 ), (s1, s2) -> s1 + "," + s2).subscribe(System.out::println);System.in.read();}
concat()
concat()方法用于将两个或多个流按顺序连接在一起。它会等待前一个流完成后,再订阅下一个流。Flux<Integer> flux1 = Flux.just(1, 2, 3); Flux<Integer> flux2 = Flux.just(4, 5, 6);Flux.concat(flux1, flux2).subscribe(System.out::println);
concatWith()
concatWith()方法是concat()的实例方法,用于将当前流与另一个流按顺序连接在一起。Flux<Integer> flux1 = Flux.just(1, 2, 3); Flux<Integer> flux2 = Flux.just(4, 5, 6);flux1.concatWith(flux2).subscribe(System.out::println);
concatMap()
concatMap()方法将流中的每个元素映射为一个新的流,并按顺序连接这些流。Flux.just(1, 2, 3).concatMap(i -> Flux.just(i * 10, i * 20)).subscribe(System.out::println);
buffer():将流中的元素收集为集合;
- 可以传入两个参数,分别为 maxSize 和 skip。其中,maxSize 代表数据切割后每个集合的最大长度;skip 代表每一次切割后切割器切割起点跳跃的元素个数
bufferTimeout():按照时间间隔切割对流中的数据进行收集;
- 可以传入两个参数,分别为 maxSize 和 maxTime。其中,maxSize 代表数据切割后每个集合的最大长度;maxTime 代表切割的最大时间间隔
bufferWhile():当 Predicate 为 true 时才收集当前元素;
bufferUntil():直到 Predicate 为 true 时才收集一次所有元素;
- 当第二个参数为 true 时,满足条件的元素将作为集合的首个元素;若为 false,则满足条件的元素为最后一个元素(默认)
Flux.range(1, 10).buffer(3, 3).subscribe(System.out::println);Flux.interval(Duration.ofMillis(100L)).bufferTimeout(9, Duration.ofMillis(1000L)).subscribe(System.out::println);Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);Flux.range(1, 10).bufferUntil(i -> i % 2 == 0, false).subscribe(System.out::println);System.in.read();
take():提取指定数量的元素或按时间间隔提取元素;
takeLast():提取最后 n 个元素;
takeWhile():当 Predicate 返回 true 时才进行提取;
takeUntil():提取元素直到 Predicate 返回 true;skip():跳过指定条数或跳过指定时间间隔;
last():取流中的最后一个元素;
next():取流中的第一个元素;
Flux.range(1, 1000).take(10).subscribe(System.out::println);Flux.interval(Duration.ofMillis(11)).take(Duration.ofMillis(100)).subscribe(System.out::println);Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);Flux.range(1, 1000).takeWhile(i -> i < 20).subscribe(System.out::println);Flux.range(1, 1000).takeUntil(i -> i > 100).subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5, 6, 7).skip(2).subscribe(System.out::println);Flux.interval(Duration.ofMillis(100)).skip(Duration.ofMillis(300)).subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5).last().subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5).next().subscribe(System.out::println););System.in.read();
reduce():对流中数据进行规约聚合;
reduceWith():对流中数据进行规约聚合,第一个参数可以通过 Supplier 设置初始值;groupBy() 通过一个策略 key 将一个 Flux分割为多个组。
Flux.range(1, 100).reduce(Integer::sum).subscribe(System.out::println);Flux.range(1, 100).reduceWith(() -> 100, Integer::sum).subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9).groupBy(i -> i % 2 == 0 ? "even" : "odd").concatMap(i -> i.defaultIfEmpty(-1).map(String::valueOf).startWith(i.key())).subscribe(System.out::println);
flatMapMany()
flatMapMany()方法用于将单值流(Mono)转换为多值流(Flux)。它将Mono中的元素映射为一个Publisher,并将这些Publisher的元素合并成一个Flux。Mono<String> mono = Mono.just("Hello");mono.flatMapMany(s -> Flux.just(s.split(""))).subscribe(System.out::println);
transform()
transform()方法用于将流(Flux或Mono)转换为另一种流。它接受一个转换函数,该函数接收一个流并返回一个新的流。这种方法通常用于将通用的转换逻辑封装在一个函数中,以便在多个地方重用。Function<Flux<String>, Flux<String>> transformToUpperCase = flux -> flux.map(String::toUpperCase);Flux<String> flux = Flux.just("a", "b", "c");flux.transform(transformToUpperCase).subscribe(System.out::println);@Test//把流变形成新数据void transform() {AtomicInteger atomic = new AtomicInteger(0);Flux<String> flux = Flux.just("a", "b", "c").transform(values -> {// ++atomicif (atomic.incrementAndGet() == 1) {//如果是:第一次调用,老流中的所有元素转成大写return values.map(String::toUpperCase);} else {//如果不是第一次调用,原封不动返回return values;}});//transform 无defer,不会共享外部变量的值。 无状态转换; 原理,无论多少个订阅者,transform只执行一次//transform 有defer,会共享外部变量的值。 有状态转换; 原理,无论多少个订阅者,每个订阅者transform都只执行一次flux.subscribe(v -> System.out.println("订阅者1:v = " + v));flux.subscribe(v -> System.out.println("订阅者2:v = " + v));}
2.3事件感知
信号类型
在响应式编程中,信号是数据流的核心概念,主要包括正常信号和异常信号。
SignalType枚举类定义了各种信号类型:
- SUBSCRIBE:被订阅。
- REQUEST:请求了 N 个元素。
- CANCEL:流被取消。
- ON_SUBSCRIBE:在订阅的时候。
- ON_NEXT:在元素到达的时候。
- ON_ERROR:在流发生错误的时候。
- ON_COMPLETE:在流正常完成的时候。
- AFTER_TERMINATE:中断以后。
- CURRENT_CONTEXT:当前上下文。
- ON_CONTEXT:感知上下文。
doOnXxxAPI 触发时机* 1、doOnNext:每个数据(流的数据)到达的时候触发 * 2、doOnEach:每个元素(流的数据和信号)到达的时候触发 * 3、doOnRequest: 消费者请求流元素的时候 * 4、doOnError:流发生错误 * 5、doOnSubscribe: 流被订阅的时候 * 6、doOnTerminate: 发送取消/异常信号中断了流 * 7、doOnCancle: 流被取消 * 8、doOnDiscard:流中元素被忽略的时候
//Mono<Integer>: 只有一个Integer//Flux<Integer>: 有很多Integerpublic void fluxDoOn(String[] args) throws IOException, InterruptedException {
// Mono<Integer> just = Mono.just(1);
//
// just.subscribe(System.out::println);//空流: 链式API中,下面的操作符,操作的是上面的流。// 事件感知API:当流发生什么事的时候,触发一个回调,系统调用提前定义好的钩子函数(Hook【钩子函数】);doOnXxx;Flux<Integer> flux = Flux.range(1, 7).delayElements(Duration.ofSeconds(1)).doOnComplete(() -> {System.out.println("流正常结束...");}).doOnCancel(() -> {System.out.println("流已被取消...");}).doOnError(throwable -> {System.out.println("流出错..." + throwable);}).doOnNext(integer -> {System.out.println("doOnNext..." + integer);}); //有一个信号:此时代表完成信号flux.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("订阅者和发布者绑定好了:" + subscription);request(1); //背压}@Overrideprotected void hookOnNext(Integer value) {System.out.println("元素到达:" + value);if (value < 5) {request(1);if (value == 3) {int i = 10 / 0;}} else {cancel();//取消订阅}; //继续要元素}@Overrideprotected void hookOnComplete() {System.out.println("数据流结束");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("数据流异常");}@Overrideprotected void hookOnCancel() {System.out.println("数据流被取消");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("结束信号:" + type);// 正常、异常
// try {
// //业务
// }catch (Exception e){
//
// }finally {
// //结束
// }}});Thread.sleep(2000);// Flux<Integer> range = Flux.range(1, 7);System.in.read();}
public void doOnXxxx(String[] args) {// 关键:doOnNext:表示流中某个元素到达以后触发我一个回调// doOnXxx要感知某个流的事件,写在这个流的后面,新流的前面Flux.just(1, 2, 3, 4, 5, 6, 7, 0, 5, 6).doOnNext(integer -> System.out.println("元素到达:" + integer)) //元素到达得到时候触发.doOnEach(integerSignal -> { //each封装的详细System.out.println("doOnEach.." + integerSignal);})//1,2,3,4,5,6,7,0.map(integer -> 10 / integer) //10,5,3,.doOnError(throwable -> {System.out.println("数据库已经保存了异常:" + throwable.getMessage());}).map(integer -> 100 / integer).doOnNext(integer -> System.out.println("元素到哈:" + integer)).subscribe(System.out::println);}
2.4Exception 异常处理
-
doOnError():异常监听,监听到异常的处理逻辑; -
onErrorReturn():产生异常时返回消息给订阅者;
Flux.just("1", "2", "3")// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception("手动模拟异常..."))).doOnError(Throwable::printStackTrace).onErrorReturn("产生异常,返回 500...").subscribe(System.out::println);
-
subscribe():可以通过传入参数指定异常处理-
参数1:定义正常消费逻辑;
-
参数2:定义异常处理逻辑;
-
参数3:定义消费完成的逻辑;
-
Flux.just("1", "2", "3")// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception("手动模拟异常..."))).subscribe(System.out::println,System.err::println,() -> System.out.println("完成..."));
onErrorResume():产生异常后重新产生新的流
Flux.just("1", "2", "3")// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception("手动模拟异常..."))).onErrorResume(throwable -> {System.out.println(throwable.getMessage());;return Flux.just("1", "1", "1");}).subscribe(System.out::println);
retry():产生异常后进行重试,参数为重试次数
Flux.just(1).delayElements(Duration.ofSeconds(3)).log().timeout(Duration.ofSeconds(2)).retry(2) // 把流从头到尾重新请求一次.onErrorReturn(2).map(i-> i+"haha").subscribe(v-> System.out.println("v = " + v));System.in.read();
2.5流日志
Flux.range(1, 7)
// .log() //日志 onNext(1~7).filter(i -> i > 3) //挑出>3的元素
// .log() //onNext(4~7).map(i -> "haha-" + i).log() // onNext(haha-4 ~ 7).subscribe(System.out::println);
2.6背压和请求重塑
1、buffer:缓冲
Flux<List<Integer>> flux = Flux.range(1, 10) //原始流10个.buffer(3).log();//缓冲区:缓冲3个元素: 消费一次最多可以拿到三个元素; 凑满数批量发给消费者
//
// //一次发一个,一个一个发;
// 10元素,buffer(3);消费者请求4次,数据消费完成
2、limit:限流
Flux.range(1, 1000).log()//限流触发,看上游是怎么限流获取数据的.limitRate(100) //一次预取30个元素; 第一次 request(100),以后request(75).subscribe();
2.7以编程方式创建序列-Sink
在响应式编程中,
Sink是一种用于手动推送数据到流中的工具。Reactor 提供了Sinks类来创建不同类型的Sink,例如单值的Sink.One、多值的Sink.Many和空值的Sink.Empty等。下面我们将介绍如何使用Sink来以编程方式创建序列。
Sinks.many(); //发送Flux数据。
Sinks.one(); //发送Mono数据
Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的 S
inks.many().unicast(); //单播: 这个管道只能绑定单个订阅者(消费者) Sinks.many().multicast();//多播: 这个管道能绑定多个订阅者
Sinks.many().replay();//重放: 这个管道能重放元素。 是否给后来的订阅者把之前的元素依然发给它;
@Testvoid sinks() throws InterruptedException, IOException {// Flux.create(fluxSink -> {
// fluxSink.next("111")
// })// Sinks.many(); //发送Flux数据。
// Sinks.one(); //发送Mono数据// Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的//Sinks.many().unicast(); //单播: 这个管道只能绑定单个订阅者(消费者)//Sinks.many().multicast();//多播: 这个管道能绑定多个订阅者//Sinks.many().replay();//重放: 这个管道能重放元素。 是否给后来的订阅者把之前的元素依然发给它;// 从头消费还是从订阅的那一刻消费;// Sinks.Many<Object> many = Sinks.many()
// .multicast() //多播
// .onBackpressureBuffer(); //背压队列//默认订阅者,从订阅的那一刻开始接元素//发布者数据重放; 底层利用队列进行缓存之前数据
// Sinks.Many<Object> many = Sinks.many().replay().limit(3);
//
// new Thread(()->{
// for (int i = 0; i < 10; i++) {
// many.tryEmitNext("a-"+i);
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// }).start();
//
//
//
// //订阅
// many.asFlux().subscribe(v-> System.out.println("v1 = " + v));
//
// new Thread(()->{
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// many.asFlux().subscribe(v-> System.out.println("v2 = " + v));
// }).start();Flux<Integer> cache = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)) //不调缓存默认就是缓存所有.cache(1); //缓存两个元素; 默认全部缓存cache.subscribe();//缓存元素;// 最定义订阅者new Thread(()->{try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}cache.subscribe(v-> System.out.println("v = " + v));}).start();System.in.read();}
2.8自定义元素处理
handle()方法是 Reactor 中一个非常强大的操作符,它允许你同时处理和转换流中的元素。它的功能类似于map()和filter()的结合体,能够在处理元素的同时决定是否将其传递给下游。
import reactor.core.publisher.Flux;public class HandleExample {public static void main(String[] args) {Flux.range(1, 10).handle((value, sink) -> {System.out.println("拿到的值:" + value);if (value % 2 == 0) {sink.next("张三:" + value); // 仅向下游发送偶数}// 可以在这里添加更多的逻辑,例如错误处理或完成信号}).log() // 日志.subscribe(System.out::println,error -> System.err.println("Error: " + error),() -> System.out.println("Completed"));}
}
2.9线程调度
在响应式编程中,调度器(Schedulers)用于控制操作符执行的线程和调度策略。Reactor 提供了一些内置的调度器,帮助你管理并发和异步操作。
常见的调度器类型
- Schedulers.immediate():在当前线程中执行。
- Schedulers.single():在一个单独的线程中执行。
- Schedulers.elastic():在弹性线程池中执行,适用于 I/O 密集型操作。
- Schedulers.parallel():在固定大小的线程池中执行,适用于 CPU 密集型操作。
- Schedulers.boundedElastic():在弹性线程池中执行,适用于需要大量线程的 I/O 密集型操作。
- Schedulers.fromExecutor(Executor executor):使用自定义的
Executor。- Schedulers.newSingle():创建一个新的单线程调度器。
- Schedulers.newParallel():创建一个新的并行调度器。
使用调度器
你可以使用
subscribeOn()和publishOn()方法来指定操作符在不同的调度器上执行。
subscribeOn(Scheduler scheduler):指定订阅操作的调度器。publishOn(Scheduler scheduler):指定接下来操作的调度器。
// 百万数据,8个线程,每个线程处理100,进行分批处理一直处理结束Flux.range(1,10000).buffer(100).parallel(8).runOn(Schedulers.newParallel("yy")).log().flatMap(list->Flux.fromIterable(list)).collectSortedList(Integer::compareTo).subscribe(v-> System.out.println("v = " + v));System.in.read();
2.10Context-API
//Context-API: https://projectreactor.io/docs/core/release/reference/#context@Test //ThreadLocal在响应式编程中无法使用。//响应式中,数据流期间共享数据,Context API: Context:读写 ContextView:只读;void threadlocal(){//支持Context的中间操作Flux.just(1,2,3).transformDeferredContextual((flux,context)->{System.out.println("flux = " + flux);System.out.println("context = " + context);return flux.map(i->i+"==>"+context.get("prefix"));})//上游能拿到下游的最近一次数据.contextWrite(Context.of("prefix","哈哈"))//ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游.subscribe(v-> System.out.println("v = " + v));//以前 命令式编程// controller -- service -- dao//响应式编程 dao(10:数据源) --> service(10) --> controller(10); 从下游反向传播}
相关文章:
SpringBoot响应式编程(1)Reactor核心
一、概述 1.1介绍 Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,Webflux 底层使用的也是该框架,其通过流的方式实现了异步相应,具备高效的需求管理(即对 “背压(backpressure)”的控制)…...
Java后端处理前端字符串与 JSON 数据:安全拼接与转义技巧
在现代 Web 开发中,前后端数据交互是家常便饭。我们经常需要处理前端传递的字符串和 JSON 数据,并在后端进行加工处理后发送到其他服务。本文将以 Spring Boot 为例,探讨如何安全地拼接字符串和 JSON 数据,并介绍如何避免 JSON 特…...
一文搞懂bfs,dfs和高级图算法
你以为BFS(广度优先搜索)和DFS(深度优先搜索)这两种基础算法,简单到小学数学就能搞定?但真的是这样吗?很多人都这么认为,但真的对吗?今天,我们不只是走马观花…...
【Rust光年纪】Rust异步编程利器:异步DNS、高性能Web服务器一网打尽
构建高效网络应用必备:解读Rust异步编程神器 前言 Rust 是一种快速流行的系统编程语言,它以其内存安全和并发性能而闻名。在 Rust 生态系统中,有许多优秀的库和框架可以帮助开发者构建高性能、可靠的应用程序。本文将介绍几个在 Rust 中备受…...
04学生管理系统(栈)
文章目录 预处理菜单结构体主函数函数声明栈操作功能实现 预处理 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<stdlib.h> #include<windows.h> #include<conio.h>#define OVERFLOW -2 #define FALSE 0 #define TRUE 1 #define OK 1 …...
我们如何在centos上部署批量管理工具ansible
1)我们先准备环境、设备 #我们准备一台服务机 (192.168.61.140) #然后准备几天客户机(192.168.61.141 192.168.61.142)这里我们准备两台2)然后我们在客服务机里面添加域名 vi /etc/hosts #添加如下内容 192.…...
如何评估前端代码审查培训计划的有效性?
评估前端代码审查培训计划的有效性可以通过以下方法: 培训前后测试: 在培训前后对学员进行测试,比较结果以评估知识增长。 学员反馈: 通过问卷调查、访谈或开放式反馈收集学员对培训内容、方式和效果的看法。 参与度:…...
使用nvm切换Node.js版本
一、安装nvm nvm(Node Version Manager)是一个用于管理Node.js版本的工具,它允许你在同一台机器上安装和切换多个Node.js版本。 1.安装nvm https://github.com/coreybutler/nvm-windows 访问以上链接到github去下载 点击releases 下载下图…...
x264 编码器 PSNR算法源码分析
PSNR PSNR(Peak Signal-to-Noise Ratio,峰值信噪比)是一种常用的图像质量评价指标,用于衡量图像或视频的清晰度和质量。PSNR是基于信号的最大可能功率与影响信号的噪声功率之间的比率。在图像处理领域,PSNR通常用来评估图像压缩或图像增强算法的效果。 PSNR的计算公式是…...
开源web版3D展示工具Online3DViewer
Online3DViewer是一个免费且开源的Web解决方案,它允许用户在浏览器中直接预览和探索3D模型。 以下是关于Online3DViewer的详细介绍: 一、基本概述 定义:Online3DViewer是一个在线3D模型查看器,支持多种3D文件格式,用…...
白骑士的Matlab教学实战项目篇 4.2 信号与图像处理项目
系列目录 上一篇:白骑士的Matlab教学实战项目篇 4.1 数据分析与可视化 信号处理和图像处理是 MATLAB 的重要应用领域,广泛应用于医学、工程、科学研究等领域。以下内容将介绍信号滤波与频域分析、图像增强与分割的基本概念和方法,并通过一个…...
复现、并改进open-mmlab的mmpose详细细节
复现open-mmlab的mmpose详细细节 1.配置环境2.数据处理3.训练4.改进mmpose4.1 快速调试技巧4.2 快速定位4.3 改进backbone4.3.1 使用说明4.3.2 改进案例4.3.2.1 复现mmpose原配置文件4.3.2.2 复现开源项目4.3.2.3 修改配置文件4.3.2.4 修改新模型 4.4 添加auxiliary_head4.4.1 …...
编写兼容Python2.x与3.x代码
编写兼容Python2.x与3.x代码 当我们正处于Python2.x到Python3.x的过渡期时,你可能想过是否可以在不修改任何代码的前提下能同时运行在Python2和3中。这看起来还真是一个合理的诉求,但如何开始呢?哪些Python2代码在3.x解释器执行时容易出状况…...
比特币8.12学习问题
疑问:什么是过滤,什么是offset 没有投钱的情况下,怎么用api 公式:单币分配金额 总资金 / 2/ offset/选币数量,其中2 表示多空 买入滑点(Slippage)是指在执行交易订单时,实际成交…...
解析 Vue 中的app.version、 app.provide 与 app.runWithContext :原理、应用与实例剖析
目录 app.provide app.runWithContext app.version 非 VIP 用户能够通过积分下载博文资源 app.provide 在 Vue 3.0 中,app.provide充当着在应用层级提供全局共享数据或者服务的关键角色。 app.provide(key, value) 这一方法接收两个关键参数,其中 …...
Ubuntu server 命令行跑selenium
背景 自动化测试都是在本机win上使用selenium 跑自动化脚本,但是服务器都是命令行的没有web界面 依赖包部署 apt-get install zlib1g-dev zlib1g## 安装谷歌浏览器 ## 跳到底部,选择其他平台 https://www.google.com/chrome/## ubuntu # dpkg -i google-chrome-stable_…...
刚刚,模糊测试平台SFuzz受到行业认可
近日,中国网络安全产业联盟(CCIA)正式发布了“2024年网络安全优秀创新成果大赛-安全严选专题赛”评选结果,开源网安模糊测试平台SFuzz凭借重大创新能力,得到组委会认可,获本次大赛创新产品优胜奖。 2024年网…...
数据结构与算法——DFS(深度优先搜索)
算法介绍: 深度优先搜索(Depth-First Search,简称DFS)是一种用于遍历或搜索树或图的算法。这种算法会尽可能深地搜索图的分支,直到找到目标节点或达到叶节点(没有子节点的节点),然后…...
基于lambda简化设计模式
写在文章开头 本文将演示基于函数式编程的理念,优化设计模式中繁琐的模板化编码开发,以保证用尽可能少的代码做尽可能多的事,希望对你有帮助。 Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ÿ…...
揭秘! 经纬恒润“车路云一体化”方案研发服务背后的科技驱动力
随着高级别智能驾驶技术的飞速发展,自动驾驶与路侧基础设施协同合作已成为行业内的又一热点。我国率先提出以“车路云一体化”为核心的战略布局,国家政策密集出台,地方试点积极推进,行业标准日趋完善,智能网联汽车“车…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...
第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...
vscode里如何用git
打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...
多场景 OkHttpClient 管理器 - Android 网络通信解决方案
下面是一个完整的 Android 实现,展示如何创建和管理多个 OkHttpClient 实例,分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...
iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...
【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)
要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况,可以通过以下几种方式模拟或触发: 1. 增加CPU负载 运行大量计算密集型任务,例如: 使用多线程循环执行复杂计算(如数学运算、加密解密等)。运行图…...
c#开发AI模型对话
AI模型 前面已经介绍了一般AI模型本地部署,直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型,但是目前国内可能使用不多,至少实践例子很少看见。开发训练模型就不介绍了&am…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
springboot整合VUE之在线教育管理系统简介
可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生,小白用户,想学习知识的 有点基础,想要通过项…...
Python 训练营打卡 Day 47
注意力热力图可视化 在day 46代码的基础上,对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...
