当前位置: 首页 > news >正文

SpringBoot响应式编程(1)Reactor核心

一、概述

1.1介绍

Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,Webflux 底层使用的也是该框架,其通过流的方式实现了异步相应,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。

Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。

1.2特性

Reactive Streams是JVM面向流的库的标准和规范

1、处理可能无限数量的元素

2、有序

3、在组件之间异步传递元素

4、强制性非阻塞,背压模式

1.3相关概念

Publisher:发布者;产生数据流 Subscriber:

订阅者; 消费数据流 Subscription:

订阅关系; 订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。

Processor:处理器; 处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。

命令式编程:全自定义

响应式编程/声明式编程: 声明流、说清楚要干什么、最终结果是要怎样

响应式编程:

1、底层:基于数据缓冲队列 + 消息驱动模型 + 异步回调机制

2、编码:流式编程 + 链式调用 + 声明式API

3、效果:优雅全异步 + 消息实时处理 + 高吞吐量 + 占用少量资源

线程池、DataBuffer

1.4演示demo

Publisher 数据发布者

Publisher 是数据的发布者,它是一个函数式接口,其中只有一个方法,该方法可以配置其对应的消费者,源码如下:

    @FunctionalInterfacepublic static interface Publisher<T> {public void subscribe(Subscriber<? super T> subscriber);}

Subscriber 数据消费者

Subscriber 负责消费数据,其内部定义了 4 个方法,源码如下:

public static interface Subscriber<T> {public void onSubscribe(Subscription subscription);public void onNext(T item);public void onError(Throwable throwable);public void onComplete();}

onSubscribe():当与消费者绑定成功时调用该方法;

onNext():当接收到一条发布数据时调用该方法;

onError():当发布者或消费者发生异常时调用该方法;

onComplete():当发布者关闭且所有数据已经被全部消费后调用该方法;

Subscription 订阅关系

Subscription 定义了发布者和消费者的订阅关系,可以理解为两者之间的通道,定义源码如下:

    public static interface Subscription {public void request(long n);public void cancel();}

request():该方法用于向通道中请求 n 个数据进行处理;

cancel():该方法用于取消发布者和消费者的绑定关系;

Processor 中间处理器

Processor 是数据发布者与消费者中间的处理器,可以对发布者发布的数据进行预处理后再发送给消费者进行消费。

实际上其既是发布者,又是消费者,即在两者中间进行了一次数据的处理转发,其源码如下:

    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}

消息发布订阅示例代码

package com.yanyu.reactor.flow;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;public class FlowDemo1 {//定义流中间操作处理器; 只用写订阅者的接口static class MyProcessor extends SubmissionPublisher<String>  implements Flow.Processor<String,String> {private Flow.Subscription subscription; //保存绑定关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("processor订阅绑定完成");this.subscription = subscription;subscription.request(1); //找上游要一个数据}@Override //数据到达,触发这个回调public void onNext(String item) {System.out.println("processor拿到数据:"+item);//再加工item += ":哈哈";submit(item);//把我加工后的数据发出去subscription.request(1); //再要新数据}@Overridepublic void onError(Throwable throwable) {// 产生异常后直接取消订阅this.subscription.cancel();}@Overridepublic void onComplete() {// 发布者所有数据全部被接收,且发布者已经关闭System.out.println("数据接收完毕~");}}public static void main(String[] args) throws InterruptedException {//1、定义一个发布者; 发布数据;SubmissionPublisher<String> publisher = new SubmissionPublisher<>();//2、定一个中间操作:  给每个元素加个 哈哈 前缀MyProcessor myProcessor1 = new MyProcessor();MyProcessor myProcessor2 = new MyProcessor();MyProcessor myProcessor3 = new MyProcessor();//3、定义一个订阅者; 订阅者感兴趣发布者的数据;Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);this.subscription = subscription;// 向数据发布者请求数据( //从上游请求一个数据)this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("接收到消息>>>" + item);// 接收数据后可以继续接收或取消订阅if(item.equals("7")){subscription.cancel(); //取消订阅}else {subscription.request(1);}}@Overridepublic void onError(Throwable throwable) {// 产生异常后直接取消订阅this.subscription.cancel();}@Overridepublic void onComplete() {// 发布者所有数据全部被接收,且发布者已经关闭System.out.println("数据接收完毕~");}};// 将订阅者注册到发布者//4、绑定发布者和订阅者publisher.subscribe(myProcessor1); //此时处理器相当于订阅者myProcessor1.subscribe(myProcessor2); //此时处理器相当于发布者myProcessor2.subscribe(myProcessor3);myProcessor3.subscribe(subscriber);  //链表关系绑定出责任链。//绑定操作;就是发布者,记住了所有订阅者都有谁,有数据后,给所有订阅者把数据推送过去。// 发布消息for (int i = 0; i < 10; i++) {// 发送数据if(i == 5){
//                publisher.closeExceptionally(new RuntimeException("5555"));}else {publisher.submit(String.valueOf(i));}//publisher发布的所有数据在它的buffer区;//中断
//            publisher.closeExceptionally();}// 关闭发布者publisher.close();Thread.sleep(20000);// 维持程序保持开启while (true) {}}
}

Backpressure 回压

Backpressure 回压是指消费能力低于生产能力时,Subscriber 会将 Publisher 发布的数据缓存在 Subscription 中,其长度默认为256,源码如下:

    static final int DEFAULT_BUFFER_SIZE = 256;public static int defaultBufferSize() {return DEFAULT_BUFFER_SIZE;}

当 Subscription 存满时,生产者将根据消费者的消费能力动态的调整数据发布的速度,以实现消费者对生产者的反向控制。

二、Reactor – Mono/Flux API

2.1概述

Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:

  • 可编排性(Composability) 以及 可读性(Readability)
  • 使用丰富的 操作符 来处理形如 的数据
  • 订阅(subscribe) 之前什么都不会发生
  • 背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力
  • 高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果

Mono和Flux

Mono: 0|1 数据流

Flux: N数据流

响应式流:元素(内容) + 信号(完成/异常);

Mono是一种特殊类型的Publisher。Mono对象表示单个或空值。这意味着它最多只能是onnext() 请求发出一个值,然后以oncomplete()信号终止。如果失败,它只会发出oneror()信号。

Flux是标准的Publisher,代表 0 到 N 异步序列值。这意味着它可以发出 0 对于多个值,onnnext()请求可能是无限值,然后以完成或错误信号终止。

2.2创建 Mono/Flux

常见创建方法

  • just():使用已知内容创建;

  • fromIterable():通过可迭代对象创建;

  • fromStream():从集合流中创建;

  • range():通过范围迭代创建;

  • interval():按照从 0 递增的方式自动创建;

  • delayElements():数据流延时发送方法;

  //测试Fluxpublic static   void  flux() throws IOException {
//        Mono: 0|1个元素的流
//        Flux: N个元素的流;  N>1//发布者发布数据流:源头// 1. 创建 Flux/Mono// 1.1 使用已知内容创建 FluxFlux.just(1, 2, 3, 4, "hello", "world").subscribe(e -> System.out.println("e1 = " + e));// 1.2 通过可迭代对象创建 FluxFlux.fromIterable(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);// 1.3 从集合流中创建 FluxFlux.fromStream(Stream.of(1,2,3,4)).subscribe(System.out::println);// 1.4 通过范围迭代创建 FluxFlux.range(0,10).subscribe(System.out::println);// 2. 创建时常用的方法// 2.1 interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列Flux.interval(Duration.ofMillis(100))// 限制执行10次.take(10).subscribe(System.out::println);// 2.2 delayElements() 方法延时发送Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5)).delayElements(Duration.ofMillis(1000L)).subscribe(System.out::println);//3.企业级用法//3.1、多元素的流Flux<Integer> just = Flux.just(1, 2, 3, 4, 5); ////流不消费就没用; 消费:订阅just.subscribe(e -> System.out.println("e1 = " + e));//一个数据流可以有很多消费者just.subscribe(e -> System.out.println("e2 = " + e));//对于每个消费者来说流都是一样的;  广播模式;System.out.println("==========");Flux<Long> flux = Flux.interval(Duration.ofSeconds(1));//每秒产生一个从0开始的递增数字flux.subscribe(System.out::println);System.in.read();}

数据中间操作

  • map():对序列中的每个元素应用一个函数,并返回一个新的序列;
  • flatMap():对序列中的每个元素应用一个异步函数,并将多个 Publisher 合并成一个序列(将流中的每一个元素看作一个新的流进行处理并按实际生产顺序进行合并)。

  • flatMapSequential():按订阅顺序进行合并;

  • filter() :过滤序列中的元素,只保留满足条件的元素。

  • handle():对序列中的每个元素进行处理,可以选择性地发出零个或多个元素。

  • mapNotNull():对序列中的每个元素应用一个函数,并过滤掉结果为 null 的元素。

  • switchIfEmpty():如果序列为空,则切换到另一个 Publisher

  • defaultIfEmpty():如果序列为空,则发出一个默认值。

public static   void  flux1() throws IOException {
//        Mono: 0|1个元素的流
//        Flux: N个元素的流;  N>1//发布者发布数据流:源头//        作用:对序列中的每个元素应用一个函数,并返回一个新的序列。
//        使用场景:转化数据元素。Flux.just(1, 2, 3).map(i -> i * 2).subscribe(System.out::println);// 输出: 2, 4, 6
//        作用:对序列中的每个元素应用一个异步函数,并将多个 Publisher 合并成一个序列。
//        使用场景:异步处理和合并结果。Flux.just(1, 2, 3).flatMap(i -> Flux.just(i * 2)).subscribe(System.out::println);Flux.just("zhang san", "li si").flatMap(v -> {String[] s = v.split(" ");return Flux.fromArray(s); //把数据包装成多元素流}).log().subscribe();//两个人的名字,按照空格拆分,打印出所有的姓与名
//        作用:过滤序列中的元素,只保留满足条件的元素。
//        使用场景:筛选数据。Flux.just(1, 2, 3, 4).filter(i -> i % 2 == 0).subscribe(System.out::println); // 输出: 2, 4
//        作用:对序列中的每个元素进行处理,可以选择性地发出零个或多个元素。
//        使用场景:复杂的元素转换或过滤。Flux.just(1, 2, 3, 4).handle((i, sink) -> {if (i % 2 == 0) {sink.next(i * 2);}}).subscribe(System.out::println); // 输出: 4, 8
//        作用:对序列中的每个元素应用一个函数,并过滤掉结果为 null 的元素。
//        使用场景:转换数据并去除 null 值。Flux.just(1, 2, 3, 4).mapNotNull(i -> i % 2 == 0 ? i * 2 : null).subscribe(System.out::println); // 输出: 4, 8
//        作用:如果序列为空,则切换到另一个 Publisher。
//        使用场景:提供备用数据源。Flux.empty().switchIfEmpty(Flux.just(1, 2, 3)).subscribe(System.out::println); // 输出: 1, 2, 3
//        作用:如果序列为空,则发出一个默认值。
//        使用场景:提供默认值。
//        Flux.empty()
//                .defaultIfEmpty(1)
//                .subscribe(System.out::println); // 输出: 1
//        Flux.range(1, 100)
//                .map(x -> {
//                    return x / 2;
//                })
//                .subscribe(System.out::println);Flux.just(5, 10).flatMap(x ->Flux.interval(Duration.ofMillis(x * 10))).subscribe(System.out::println);Flux.just(5, 10).flatMapSequential(x ->Flux.interval(Duration.ofMillis(x * 10)).take(x)).subscribe(System.out::println);Flux.just(5, 10).flatMapSequential(x ->Flux.interval(Duration.ofMillis(x * 10)).take(10)).subscribe(System.out::println);}
  • merge():按照所有流的实际产生顺序进行合并;

  • mergeSequential():按照流合并的次序进行合并,先消费第一个,在消费第二个;

  • zip():zip() 方法用于将多个流(Publisher)合并成一个流。

  • zipWith():把流中的元素与另一个流中对应元素进行合并,多余元素将被抛弃;

    • 第二个参数可以指定合并的规则
  public static void  flux2() throws IOException {
//        Flux.merge(Flux.interval(Duration.ofMillis(10)).take(5),
//                        Flux.interval(Duration.ofMillis(10)).take(3))
//                .subscribe(System.out::println);Flux.mergeSequential(Flux.interval(Duration.ofMillis(10)).take(5),Flux.interval(Duration.ofMillis(10)).take(3)).subscribe(System.out::println);Flux<Integer> numbers = Flux.just(1, 2, 3);Flux<String> letters = Flux.just("A", "B", "C");Flux.zip(numbers, letters, (number, letter) -> number + letter).subscribe(System.out::println);Flux.just(1, 2).zipWith(Flux.just(3, 4, 5 ), (s1, s2) -> s1 + "," + s2).subscribe(System.out::println);System.in.read();}

concat()

concat() 方法用于将两个或多个流按顺序连接在一起。它会等待前一个流完成后,再订阅下一个流。

Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);Flux.concat(flux1, flux2).subscribe(System.out::println);

concatWith()

concatWith() 方法是 concat() 的实例方法,用于将当前流与另一个流按顺序连接在一起。

Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);flux1.concatWith(flux2).subscribe(System.out::println);

concatMap()

concatMap() 方法将流中的每个元素映射为一个新的流,并按顺序连接这些流。

Flux.just(1, 2, 3).concatMap(i -> Flux.just(i * 10, i * 20)).subscribe(System.out::println);

  • buffer():将流中的元素收集为集合;

    • 可以传入两个参数,分别为 maxSize 和 skip。其中,maxSize 代表数据切割后每个集合的最大长度;skip 代表每一次切割后切割器切割起点跳跃的元素个数
  • bufferTimeout():按照时间间隔切割对流中的数据进行收集;

    • 可以传入两个参数,分别为 maxSize 和 maxTime。其中,maxSize 代表数据切割后每个集合的最大长度;maxTime 代表切割的最大时间间隔
  • bufferWhile():当 Predicate 为 true 时才收集当前元素;

  • bufferUntil():直到 Predicate 为 true 时才收集一次所有元素;

    • 当第二个参数为 true 时,满足条件的元素将作为集合的首个元素;若为 false,则满足条件的元素为最后一个元素(默认)
 Flux.range(1, 10).buffer(3, 3).subscribe(System.out::println);Flux.interval(Duration.ofMillis(100L)).bufferTimeout(9, Duration.ofMillis(1000L)).subscribe(System.out::println);Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);Flux.range(1, 10).bufferUntil(i -> i % 2 == 0, false).subscribe(System.out::println);System.in.read();
  • take():提取指定数量的元素或按时间间隔提取元素;

  • takeLast():提取最后 n 个元素;

  • takeWhile():当 Predicate 返回 true 时才进行提取;

  • takeUntil():提取元素直到 Predicate 返回 true;

  • skip():跳过指定条数或跳过指定时间间隔;
  • last():取流中的最后一个元素;

  • next():取流中的第一个元素;

 Flux.range(1, 1000).take(10).subscribe(System.out::println);Flux.interval(Duration.ofMillis(11)).take(Duration.ofMillis(100)).subscribe(System.out::println);Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);Flux.range(1, 1000).takeWhile(i -> i < 20).subscribe(System.out::println);Flux.range(1, 1000).takeUntil(i -> i > 100).subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5, 6, 7).skip(2).subscribe(System.out::println);Flux.interval(Duration.ofMillis(100)).skip(Duration.ofMillis(300)).subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5).last().subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5).next().subscribe(System.out::println););System.in.read();
  • reduce():对流中数据进行规约聚合;

  • reduceWith():对流中数据进行规约聚合,第一个参数可以通过 Supplier 设置初始值;

  • groupBy()  通过一个策略 key 将一个 Flux分割为多个组。

        Flux.range(1, 100).reduce(Integer::sum).subscribe(System.out::println);Flux.range(1, 100).reduceWith(() -> 100, Integer::sum).subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9).groupBy(i -> i % 2 == 0 ? "even" : "odd").concatMap(i -> i.defaultIfEmpty(-1).map(String::valueOf).startWith(i.key())).subscribe(System.out::println);

flatMapMany()

flatMapMany() 方法用于将单值流(Mono)转换为多值流(Flux)。它将 Mono 中的元素映射为一个 Publisher,并将这些 Publisher 的元素合并成一个 Flux

Mono<String> mono = Mono.just("Hello");mono.flatMapMany(s -> Flux.just(s.split(""))).subscribe(System.out::println);

transform()

transform() 方法用于将流(Flux 或 Mono)转换为另一种流。它接受一个转换函数,该函数接收一个流并返回一个新的流。这种方法通常用于将通用的转换逻辑封装在一个函数中,以便在多个地方重用。

Function<Flux<String>, Flux<String>> transformToUpperCase = flux -> flux.map(String::toUpperCase);Flux<String> flux = Flux.just("a", "b", "c");flux.transform(transformToUpperCase).subscribe(System.out::println);@Test//把流变形成新数据void transform() {AtomicInteger atomic = new AtomicInteger(0);Flux<String> flux = Flux.just("a", "b", "c").transform(values -> {// ++atomicif (atomic.incrementAndGet() == 1) {//如果是:第一次调用,老流中的所有元素转成大写return values.map(String::toUpperCase);} else {//如果不是第一次调用,原封不动返回return values;}});//transform 无defer,不会共享外部变量的值。 无状态转换; 原理,无论多少个订阅者,transform只执行一次//transform 有defer,会共享外部变量的值。   有状态转换; 原理,无论多少个订阅者,每个订阅者transform都只执行一次flux.subscribe(v -> System.out.println("订阅者1:v = " + v));flux.subscribe(v -> System.out.println("订阅者2:v = " + v));}

2.3事件感知

信号类型

在响应式编程中,信号是数据流的核心概念,主要包括正常信号和异常信号。SignalType 枚举类定义了各种信号类型:

  • SUBSCRIBE:被订阅。
  • REQUEST:请求了 N 个元素。
  • CANCEL:流被取消。
  • ON_SUBSCRIBE:在订阅的时候。
  • ON_NEXT:在元素到达的时候。
  • ON_ERROR:在流发生错误的时候。
  • ON_COMPLETE:在流正常完成的时候。
  • AFTER_TERMINATE:中断以后。
  • CURRENT_CONTEXT:当前上下文。
  • ON_CONTEXT:感知上下文。

doOnXxx API 触发时机

*      1、doOnNext:每个数据(流的数据)到达的时候触发
*      2、doOnEach:每个元素(流的数据和信号)到达的时候触发
*      3、doOnRequest: 消费者请求流元素的时候
*      4、doOnError:流发生错误
*      5、doOnSubscribe: 流被订阅的时候
*      6、doOnTerminate: 发送取消/异常信号中断了流
*      7、doOnCancle: 流被取消
*      8、doOnDiscard:流中元素被忽略的时候
//Mono<Integer>: 只有一个Integer//Flux<Integer>: 有很多Integerpublic void fluxDoOn(String[] args) throws IOException, InterruptedException {
//        Mono<Integer> just = Mono.just(1);
//
//        just.subscribe(System.out::println);//空流:  链式API中,下面的操作符,操作的是上面的流。// 事件感知API:当流发生什么事的时候,触发一个回调,系统调用提前定义好的钩子函数(Hook【钩子函数】);doOnXxx;Flux<Integer> flux = Flux.range(1, 7).delayElements(Duration.ofSeconds(1)).doOnComplete(() -> {System.out.println("流正常结束...");}).doOnCancel(() -> {System.out.println("流已被取消...");}).doOnError(throwable -> {System.out.println("流出错..." + throwable);}).doOnNext(integer -> {System.out.println("doOnNext..." + integer);}); //有一个信号:此时代表完成信号flux.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("订阅者和发布者绑定好了:" + subscription);request(1); //背压}@Overrideprotected void hookOnNext(Integer value) {System.out.println("元素到达:" + value);if (value < 5) {request(1);if (value == 3) {int i = 10 / 0;}} else {cancel();//取消订阅}; //继续要元素}@Overrideprotected void hookOnComplete() {System.out.println("数据流结束");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("数据流异常");}@Overrideprotected void hookOnCancel() {System.out.println("数据流被取消");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("结束信号:" + type);// 正常、异常
//                try {
//                    //业务
//                }catch (Exception e){
//
//                }finally {
//                    //结束
//                }}});Thread.sleep(2000);//        Flux<Integer> range = Flux.range(1, 7);System.in.read();}
 public void doOnXxxx(String[] args) {// 关键:doOnNext:表示流中某个元素到达以后触发我一个回调// doOnXxx要感知某个流的事件,写在这个流的后面,新流的前面Flux.just(1, 2, 3, 4, 5, 6, 7, 0, 5, 6).doOnNext(integer -> System.out.println("元素到达:" + integer)) //元素到达得到时候触发.doOnEach(integerSignal -> { //each封装的详细System.out.println("doOnEach.." + integerSignal);})//1,2,3,4,5,6,7,0.map(integer -> 10 / integer) //10,5,3,.doOnError(throwable -> {System.out.println("数据库已经保存了异常:" + throwable.getMessage());}).map(integer -> 100 / integer).doOnNext(integer -> System.out.println("元素到哈:" + integer)).subscribe(System.out::println);}

2.4Exception 异常处理

  • doOnError():异常监听,监听到异常的处理逻辑;

  • onErrorReturn():产生异常时返回消息给订阅者;

        Flux.just("1", "2", "3")// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception("手动模拟异常..."))).doOnError(Throwable::printStackTrace).onErrorReturn("产生异常,返回 500...").subscribe(System.out::println);
  • subscribe():可以通过传入参数指定异常处理

    • 参数1:定义正常消费逻辑;

    • 参数2:定义异常处理逻辑;

    • 参数3:定义消费完成的逻辑;

        Flux.just("1", "2", "3")// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception("手动模拟异常..."))).subscribe(System.out::println,System.err::println,() -> System.out.println("完成..."));
  • onErrorResume():产生异常后重新产生新的流
        Flux.just("1", "2", "3")// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception("手动模拟异常..."))).onErrorResume(throwable -> {System.out.println(throwable.getMessage());;return Flux.just("1", "1", "1");}).subscribe(System.out::println);
  • retry():产生异常后进行重试,参数为重试次数
    Flux.just(1).delayElements(Duration.ofSeconds(3)).log().timeout(Duration.ofSeconds(2)).retry(2) // 把流从头到尾重新请求一次.onErrorReturn(2).map(i-> i+"haha").subscribe(v-> System.out.println("v = " + v));System.in.read();

2.5流日志

 Flux.range(1, 7)
//                .log() //日志   onNext(1~7).filter(i -> i > 3) //挑出>3的元素
//                .log() //onNext(4~7).map(i -> "haha-" + i).log()  // onNext(haha-4 ~ 7).subscribe(System.out::println);

2.6背压和请求重塑

1、buffer:缓冲

Flux<List<Integer>> flux = Flux.range(1, 10)  //原始流10个.buffer(3).log();//缓冲区:缓冲3个元素: 消费一次最多可以拿到三个元素; 凑满数批量发给消费者
//
//        //一次发一个,一个一个发;
// 10元素,buffer(3);消费者请求4次,数据消费完成

2、limit:限流

Flux.range(1, 1000).log()//限流触发,看上游是怎么限流获取数据的.limitRate(100) //一次预取30个元素; 第一次 request(100),以后request(75).subscribe();

2.7以编程方式创建序列-Sink

在响应式编程中,Sink 是一种用于手动推送数据到流中的工具。Reactor 提供了 Sinks 类来创建不同类型的 Sink,例如单值的 Sink.One、多值的 Sink.Many 和空值的 Sink.Empty 等。下面我们将介绍如何使用 Sink 来以编程方式创建序列。

Sinks.many(); //发送Flux数据。

Sinks.one(); //发送Mono数据

Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的 S

inks.many().unicast(); //单播: 这个管道只能绑定单个订阅者(消费者) Sinks.many().multicast();//多播: 这个管道能绑定多个订阅者

Sinks.many().replay();//重放: 这个管道能重放元素。 是否给后来的订阅者把之前的元素依然发给它;

 @Testvoid sinks() throws InterruptedException, IOException {//       Flux.create(fluxSink -> {
//           fluxSink.next("111")
//       })//        Sinks.many(); //发送Flux数据。
//        Sinks.one(); //发送Mono数据// Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的//Sinks.many().unicast(); //单播:  这个管道只能绑定单个订阅者(消费者)//Sinks.many().multicast();//多播: 这个管道能绑定多个订阅者//Sinks.many().replay();//重放: 这个管道能重放元素。 是否给后来的订阅者把之前的元素依然发给它;// 从头消费还是从订阅的那一刻消费;//        Sinks.Many<Object> many = Sinks.many()
//                .multicast() //多播
//                .onBackpressureBuffer(); //背压队列//默认订阅者,从订阅的那一刻开始接元素//发布者数据重放; 底层利用队列进行缓存之前数据
//        Sinks.Many<Object> many = Sinks.many().replay().limit(3);
//
//        new Thread(()->{
//            for (int i = 0; i < 10; i++) {
//                many.tryEmitNext("a-"+i);
//                try {
//                    Thread.sleep(1000);
//                } catch (InterruptedException e) {
//                    throw new RuntimeException(e);
//                }
//            }
//        }).start();
//
//
//
//        //订阅
//        many.asFlux().subscribe(v-> System.out.println("v1 = " + v));
//
//        new Thread(()->{
//            try {
//                Thread.sleep(5000);
//            } catch (InterruptedException e) {
//                throw new RuntimeException(e);
//            }
//            many.asFlux().subscribe(v-> System.out.println("v2 = " + v));
//        }).start();Flux<Integer> cache = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)) //不调缓存默认就是缓存所有.cache(1);   //缓存两个元素; 默认全部缓存cache.subscribe();//缓存元素;// 最定义订阅者new Thread(()->{try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}cache.subscribe(v-> System.out.println("v = " + v));}).start();System.in.read();}

2.8自定义元素处理

handle() 方法是 Reactor 中一个非常强大的操作符,它允许你同时处理和转换流中的元素。它的功能类似于 map() 和 filter() 的结合体,能够在处理元素的同时决定是否将其传递给下游。

import reactor.core.publisher.Flux;public class HandleExample {public static void main(String[] args) {Flux.range(1, 10).handle((value, sink) -> {System.out.println("拿到的值:" + value);if (value % 2 == 0) {sink.next("张三:" + value); // 仅向下游发送偶数}// 可以在这里添加更多的逻辑,例如错误处理或完成信号}).log() // 日志.subscribe(System.out::println,error -> System.err.println("Error: " + error),() -> System.out.println("Completed"));}
}

2.9线程调度

在响应式编程中,调度器(Schedulers)用于控制操作符执行的线程和调度策略。Reactor 提供了一些内置的调度器,帮助你管理并发和异步操作。

常见的调度器类型

  1. Schedulers.immediate():在当前线程中执行。
  2. Schedulers.single():在一个单独的线程中执行。
  3. Schedulers.elastic():在弹性线程池中执行,适用于 I/O 密集型操作。
  4. Schedulers.parallel():在固定大小的线程池中执行,适用于 CPU 密集型操作。
  5. Schedulers.boundedElastic():在弹性线程池中执行,适用于需要大量线程的 I/O 密集型操作。
  6. Schedulers.fromExecutor(Executor executor):使用自定义的 Executor
  7. Schedulers.newSingle():创建一个新的单线程调度器。
  8. Schedulers.newParallel():创建一个新的并行调度器。

使用调度器

你可以使用 subscribeOn() 和 publishOn() 方法来指定操作符在不同的调度器上执行。

  • subscribeOn(Scheduler scheduler):指定订阅操作的调度器。
  • publishOn(Scheduler scheduler):指定接下来操作的调度器。
  // 百万数据,8个线程,每个线程处理100,进行分批处理一直处理结束Flux.range(1,10000).buffer(100).parallel(8).runOn(Schedulers.newParallel("yy")).log().flatMap(list->Flux.fromIterable(list)).collectSortedList(Integer::compareTo).subscribe(v-> System.out.println("v = " + v));System.in.read();

2.10Context-API

  //Context-API: https://projectreactor.io/docs/core/release/reference/#context@Test //ThreadLocal在响应式编程中无法使用。//响应式中,数据流期间共享数据,Context API: Context:读写 ContextView:只读;void threadlocal(){//支持Context的中间操作Flux.just(1,2,3).transformDeferredContextual((flux,context)->{System.out.println("flux = " + flux);System.out.println("context = " + context);return flux.map(i->i+"==>"+context.get("prefix"));})//上游能拿到下游的最近一次数据.contextWrite(Context.of("prefix","哈哈"))//ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游.subscribe(v-> System.out.println("v = " + v));//以前 命令式编程//        controller -- service -- dao//响应式编程 dao(10:数据源) --> service(10) --> controller(10); 从下游反向传播}

相关文章:

SpringBoot响应式编程(1)Reactor核心

一、概述 1.1介绍 Reactor 是一个用于JVM的完全非阻塞的响应式编程框架&#xff0c;Webflux 底层使用的也是该框架&#xff0c;其通过流的方式实现了异步相应&#xff0c;具备高效的需求管理&#xff08;即对 “背压&#xff08;backpressure&#xff09;”的控制&#xff09…...

Java后端处理前端字符串与 JSON 数据:安全拼接与转义技巧

在现代 Web 开发中&#xff0c;前后端数据交互是家常便饭。我们经常需要处理前端传递的字符串和 JSON 数据&#xff0c;并在后端进行加工处理后发送到其他服务。本文将以 Spring Boot 为例&#xff0c;探讨如何安全地拼接字符串和 JSON 数据&#xff0c;并介绍如何避免 JSON 特…...

一文搞懂bfs,dfs和高级图算法

你以为BFS&#xff08;广度优先搜索&#xff09;和DFS&#xff08;深度优先搜索&#xff09;这两种基础算法&#xff0c;简单到小学数学就能搞定&#xff1f;但真的是这样吗&#xff1f;很多人都这么认为&#xff0c;但真的对吗&#xff1f;今天&#xff0c;我们不只是走马观花…...

【Rust光年纪】Rust异步编程利器:异步DNS、高性能Web服务器一网打尽

构建高效网络应用必备&#xff1a;解读Rust异步编程神器 前言 Rust 是一种快速流行的系统编程语言&#xff0c;它以其内存安全和并发性能而闻名。在 Rust 生态系统中&#xff0c;有许多优秀的库和框架可以帮助开发者构建高性能、可靠的应用程序。本文将介绍几个在 Rust 中备受…...

04学生管理系统(栈)

文章目录 预处理菜单结构体主函数函数声明栈操作功能实现 预处理 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<stdlib.h> #include<windows.h> #include<conio.h>#define OVERFLOW -2 #define FALSE 0 #define TRUE 1 #define OK 1 …...

我们如何在centos上部署批量管理工具ansible

1&#xff09;我们先准备环境、设备 #我们准备一台服务机 &#xff08;192.168.61.140&#xff09; ​#然后准备几天客户机&#xff08;192.168.61.141 192.168.61.142&#xff09;这里我们准备两台2)然后我们在客服务机里面添加域名 vi /etc/hosts ​ #添加如下内容 192.…...

如何评估前端代码审查培训计划的有效性?

评估前端代码审查培训计划的有效性可以通过以下方法&#xff1a; 培训前后测试&#xff1a; 在培训前后对学员进行测试&#xff0c;比较结果以评估知识增长。 学员反馈&#xff1a; 通过问卷调查、访谈或开放式反馈收集学员对培训内容、方式和效果的看法。 参与度&#xff1a…...

使用nvm切换Node.js版本

一、安装nvm nvm&#xff08;Node Version Manager&#xff09;是一个用于管理Node.js版本的工具&#xff0c;它允许你在同一台机器上安装和切换多个Node.js版本。 1.安装nvm https://github.com/coreybutler/nvm-windows 访问以上链接到github去下载 点击releases 下载下图…...

x264 编码器 PSNR算法源码分析

PSNR PSNR(Peak Signal-to-Noise Ratio,峰值信噪比)是一种常用的图像质量评价指标,用于衡量图像或视频的清晰度和质量。PSNR是基于信号的最大可能功率与影响信号的噪声功率之间的比率。在图像处理领域,PSNR通常用来评估图像压缩或图像增强算法的效果。 PSNR的计算公式是…...

开源web版3D展示工具Online3DViewer

Online3DViewer是一个免费且开源的Web解决方案&#xff0c;它允许用户在浏览器中直接预览和探索3D模型。 以下是关于Online3DViewer的详细介绍&#xff1a; 一、基本概述 定义&#xff1a;Online3DViewer是一个在线3D模型查看器&#xff0c;支持多种3D文件格式&#xff0c;用…...

白骑士的Matlab教学实战项目篇 4.2 信号与图像处理项目

系列目录 上一篇&#xff1a;白骑士的Matlab教学实战项目篇 4.1 数据分析与可视化 信号处理和图像处理是 MATLAB 的重要应用领域&#xff0c;广泛应用于医学、工程、科学研究等领域。以下内容将介绍信号滤波与频域分析、图像增强与分割的基本概念和方法&#xff0c;并通过一个…...

复现、并改进open-mmlab的mmpose详细细节

复现open-mmlab的mmpose详细细节 1.配置环境2.数据处理3.训练4.改进mmpose4.1 快速调试技巧4.2 快速定位4.3 改进backbone4.3.1 使用说明4.3.2 改进案例4.3.2.1 复现mmpose原配置文件4.3.2.2 复现开源项目4.3.2.3 修改配置文件4.3.2.4 修改新模型 4.4 添加auxiliary_head4.4.1 …...

编写兼容Python2.x与3.x代码

编写兼容Python2.x与3.x代码 当我们正处于Python2.x到Python3.x的过渡期时&#xff0c;你可能想过是否可以在不修改任何代码的前提下能同时运行在Python2和3中。这看起来还真是一个合理的诉求&#xff0c;但如何开始呢&#xff1f;哪些Python2代码在3.x解释器执行时容易出状况…...

比特币8.12学习问题

疑问&#xff1a;什么是过滤&#xff0c;什么是offset 没有投钱的情况下&#xff0c;怎么用api 公式&#xff1a;单币分配金额 总资金 / 2/ offset/选币数量&#xff0c;其中2 表示多空 买入滑点&#xff08;Slippage&#xff09;是指在执行交易订单时&#xff0c;实际成交…...

解析 Vue 中的app.version、 app.provide 与 app.runWithContext :原理、应用与实例剖析

目录 app.provide app.runWithContext ​​​​​​​app.version 非 VIP 用户能够通过积分下载博文资源 app.provide 在 Vue 3.0 中,app.provide充当着在应用层级提供全局共享数据或者服务的关键角色。 app.provide(key, value) 这一方法接收两个关键参数,其中 …...

Ubuntu server 命令行跑selenium

背景 自动化测试都是在本机win上使用selenium 跑自动化脚本,但是服务器都是命令行的没有web界面 依赖包部署 apt-get install zlib1g-dev zlib1g## 安装谷歌浏览器 ## 跳到底部,选择其他平台 https://www.google.com/chrome/## ubuntu # dpkg -i google-chrome-stable_…...

刚刚,模糊测试平台SFuzz受到行业认可

近日&#xff0c;中国网络安全产业联盟&#xff08;CCIA&#xff09;正式发布了“2024年网络安全优秀创新成果大赛-安全严选专题赛”评选结果&#xff0c;开源网安模糊测试平台SFuzz凭借重大创新能力&#xff0c;得到组委会认可&#xff0c;获本次大赛创新产品优胜奖。 2024年网…...

数据结构与算法——DFS(深度优先搜索)

算法介绍&#xff1a; 深度优先搜索&#xff08;Depth-First Search&#xff0c;简称DFS&#xff09;是一种用于遍历或搜索树或图的算法。这种算法会尽可能深地搜索图的分支&#xff0c;直到找到目标节点或达到叶节点&#xff08;没有子节点的节点&#xff09;&#xff0c;然后…...

基于lambda简化设计模式

写在文章开头 本文将演示基于函数式编程的理念&#xff0c;优化设计模式中繁琐的模板化编码开发&#xff0c;以保证用尽可能少的代码做尽可能多的事&#xff0c;希望对你有帮助。 Hi&#xff0c;我是 sharkChili &#xff0c;是个不断在硬核技术上作死的 java coder &#xff…...

揭秘! 经纬恒润“车路云一体化”方案研发服务背后的科技驱动力

随着高级别智能驾驶技术的飞速发展&#xff0c;自动驾驶与路侧基础设施协同合作已成为行业内的又一热点。我国率先提出以“车路云一体化”为核心的战略布局&#xff0c;国家政策密集出台&#xff0c;地方试点积极推进&#xff0c;行业标准日趋完善&#xff0c;智能网联汽车“车…...

Redis操作--RedisTemplate(二)StringRedisTemplate

一、介绍 1、简介 由于存储在 Redis 中的 key 和 value 通常是很常见的 String 类型&#xff0c;Redis模块提供了 RedisConnection 和 RedisTemplate 的扩展&#xff0c;分是 StringRedisConnection 和 StringRedisTemplate&#xff0c;作为字符串操作的解决方案。 通过源码…...

【自动驾驶】ROS中自定义格式的服务通信,含命令行动态传参(c++)

目录 通信流程创建服务器端及客户端新建服务通讯文件修改service的xml及cmakelistCMakeLists.txt编辑 msg 相关配置编译消息相关头文件在cmakelist中包含头文件的路径在service包下编写service.cpp在client包下编写client.cpp测试运行查询服务的相关指令列出目前的所有服务&…...

优思学院|PDCA和DMAIC之间如何选择?

在现代组织中&#xff0c;提升方法、质量和效率是企业追求卓越、保持竞争力的核心目标。在这条道路上&#xff0c;DMAIC&#xff08;定义、测量、分析、改进、控制&#xff09;和PDCA&#xff08;计划、执行、检查、行动&#xff09;被广泛应用于持续改进和问题解决。这两者虽然…...

5 款最佳 Micro SD 卡恢复软件,助您恢复文件

您是否对数据恢复存在某些疑问&#xff0c;并想知道如何恢复 Micro SD 卡上的文件&#xff1f;如果是&#xff0c;那么在本文中您将找到答案。网上有许多专门用于从 Micro SD 卡或格式化的 Micro 卡恢复已删除文件而设计的软件。因此&#xff0c;在本文中&#xff0c;我们将向您…...

【使用教程】CiA402中的“原点回归模式”和“轮廓位置模式”搭配使用操作实例

使用“原点回归模式”配合“轮廓位置模式”是步进或伺服电机使用过程中最常用的方法&#xff0c;其对于提高自动化生产线的准确性和效率具有重要意义&#xff0c;本文将对正常使用控制电机中发送的命令及顺序进行简要说明。 说明&#xff1a;“原点回归”以“堵转回原点”的方式…...

服务器网络不通排查方案

服务器网络不通排查方案 最近遇到了服务器上服务已经启动&#xff0c;但是在浏览器上无法访问的问题&#xff0c;记录一下排查流程 文章目录 服务器网络不通排查方案netstart排查网络连接信息netstat 命令netstat -aptn 命令 iptables总结 netstart排查网络连接信息 netstat …...

Spring Boot + Vue 跨域配置(CORS)问题解决历程

在使用 Spring Boot 和 Vue 开发前后端分离的项目时&#xff0c;跨域资源共享&#xff08;CORS&#xff09;问题是一个常见的挑战。接下来&#xff0c;我将分享我是如何一步步解决这个问题的&#xff0c;包括中间的一些试错过程&#xff0c;希望能够帮助到正在经历类似问题的你…...

Think | 大模型迈向AGI的探索和对齐

注&#xff1a;节选自我于24年初所写的「融合RL与LLM思想探寻世界模型以迈向AGI」散文式风格文章&#xff0c;感兴趣的小伙伴儿可以访问我的主页置顶或专栏收录&#xff0c;并制作了电子书供大家参考&#xff0c;有需要的小伙伴可以关注私信我&#xff0c;因为属于技术散文风格…...

为什么选择在Facebook投放广告?

2024年了你还没对 Facebook 广告产生兴趣&#xff1f;那你可就亏大了&#xff01; 今天这篇文章&#xff0c;我们会分享它对你扩大业务的好处。要知道&#xff0c;Facebook 广告凭借它庞大的用户群和先进的定位选项&#xff0c;已经是企业主们有效接触目标受众的必备神器。接下…...

10 ARM 体系

10 ARM 体系 ARM体系1、基本概念1.1 常见的处理器1.2 ARM7三级指令流水线1.3 初识PC寄存器 2、 ARM核的七种工作模式3、ARM核七种异常 ARM体系 1、基本概念 1.1 常见的处理器 PowerPC处理器&#xff1a;飞思卡尔MPC系列 DSP:TI达芬奇系列 FPGA&#xff1a;Xilinx赛灵思的ZYN…...