当前位置: 首页 > news >正文

响应式编程详解,带你熟悉Reactor响应式编程

文章目录

  • 一、什么是响应式编程
    • 1、Java的流和响应式流
    • 2、Java中响应式的使用
    • 3、Reactor中响应式流的基本接口
    • 4、Reactor中响应式接口的基本使用
  • 二、初始Reactor
    • 1、Flux和Mono的基本介绍
    • 2、引入Reactor依赖
    • 3、响应式类型的创建
    • 4、响应式类型的组合
      • (1)使用mergeWith合并响应式流
      • (2)使用zip压缩合并响应式流
      • (3)使用zip压缩合并为自定义对象的响应式流
      • (4)选择第⼀个反应式类型进⾏发布
    • 5、转换和过滤反应式流
      • (1)skip操作跳过指定数⽬的消息
      • (2)skip()操作的另⼀种形式
      • (3)take操作只发布第⼀批指定数量的数据项
      • (4)take操作的另一种形式
      • (5)filter操作自定义过滤条件
      • (6)distinct操作去重
      • (7)map操作映射新元素
      • (8)flatMap将流转成新的流
      • (9)buffer操作现将数据流拆分为小块
      • (10)collectList操作也可以将所有数据收集到一个List
      • (11)collectMap 操作产生⼀个发布Map的Mono
    • 6、在反应式类型上执行逻辑操作
      • (1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件
      • (2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件
    • 7、在反应式类型上使用Subscriber订阅
      • (1)使用Subscriber消费消息
      • (2)使用Flux的doOnNext处理数据
    • 8、使用then来处理完成数据返回
  • 写在后面

一、什么是响应式编程

响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

在开发应⽤程序代码时,我们可以编写两种⻛格的代码,即命令式和响应式。

命令式(Imperative)的代码:它由⼀组任务组成,每次只运⾏⼀项任务,每项任务⼜都依赖于前⾯的任务。数据会按批次进⾏处理,在前⼀项任务还没有完成对当前数据批次的处理时,不能将这些数据递交给下⼀项处理任务。

响应式(Reactive)的代码:它定义了⼀组⽤来处理数据的任务,但是这些任务可以并⾏地执⾏。每项任务处理数据的⼀部分⼦集,并将结果交给处理流程中的下⼀项任务,同时继续处理数据的另⼀部分⼦集。

Reactor 是⼀个响应式编程库,同时也是Spring家族的⼀部分。它是Spring 5反应式编程功能的基础。

1、Java的流和响应式流

Java的Stream流通常都是同步的,并且只能处理有限的数据集。从本质上来说,它们只是使⽤函数来对集合进⾏迭代的⼀种⽅式。

响应式流⽀持异步处理任意⼤⼩的数据集,同样也包括⽆限数据集。只要数据就绪,它们就能实时地处理数据,并且能够通过回压来避免压垮数据的消费者。

2、Java中响应式的使用

JDK1.8时,是基于Observer/Observable接口而实现的观察者模式:

ObserverDemo observer = new ObserverDemo();
// 添加观察者
observer.addObserver(new Observer() {@Overridepublic void update(Observable o, Object arg) {System.out.println("发生了变化");}
});
observer.addObserver(new Observer() {@Overridepublic void update(Observable o, Object arg) {System.out.println("收到了通知");}
});
observer.setChanged(); // 数据变化
observer.notifyObservers(); // 通知

JDK9及以后,Observer/Observable接口就被弃用了,取而代之的是Flow类:

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;public class FlowDemo {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();// 2. 定义订阅者Subscriber<Integer> subscriber = new Subscriber<Integer>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 3. 发布者和订阅者 建立订阅关系publiser.subscribe(subscriber);// 4. 生产数据, 并发布// 这里忽略数据生产过程for (int i = 0; i < 1000; i++) {System.out.println("生成数据:" + i);// submit是个block方法publiser.submit(i);}// 5. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);// debug的时候, 下面这行需要有断点// 否则主线程结束无法debugSystem.out.println();}}
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;/**
* 带 process 的 flow demo
*//**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher<String>implements Processor<Integer, String> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("处理器接受到数据: " + item);// 过滤掉小于0的, 然后发布出去if (item > 0) {this.submit("转换后的数据:" + item);}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理器处理完了!");// 关闭发布者this.close();}
}public class FlowDemo2 {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisherSubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();// 2. 定义处理器, 对数据进行过滤, 并转换为String类型MyProcessor processor = new MyProcessor();// 3. 发布者 和 处理器 建立订阅关系publiser.subscribe(processor);// 4. 定义最终订阅者, 消费 String 类型数据Subscriber<String> subscriber = new Subscriber<String>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 5. 处理器 和 最终订阅者 建立订阅关系processor.subscribe(subscriber);// 6. 生产数据, 并发布// 这里忽略数据生产过程publiser.submit(-111);publiser.submit(111);// 7. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);}}

3、Reactor中响应式流的基本接口

响应式流规范可以总结为4个接⼝:Publisher、Subscriber、Subscription和Processor。

Publisher负责⽣成数据,并将数据发送给 Subscription(每个Subscriber对应⼀个Subscription)。

public interface Publisher<T> {// Publisher接⼝声明了⼀个⽅法 subscribe(),Subscriber可以通过该⽅法向 Publisher发起订阅。public void subscribe(Subscriber<? super T> s);
}

⼀旦Subscriber订阅成功,就可以接收来⾃Publisher的事件。

public interface Subscriber<T> {// Subscriber的第⼀个事件是通过对 onSubscribe()⽅法的调⽤接收的。public void onSubscribe(Subscription s);// 每个数据项都会通过该方法处理public void onNext(T t);// 异常处理public void onError(Throwable t);// 结束public void onComplete();
}

Publisher调⽤ onSubscribe() ⽅法时,会将Subscription对象传递给 Subscriber。

通过Subscription,Subscriber可以管理其订阅情况:

public interface Subscription {// Subscriber可以通过调⽤ request()⽅法来请求 Publisher 发送数据,可以传⼊⼀个long类型的数值以表明它愿意接受多少数据// 这也是回压能够发挥作⽤的地⽅,以避免Publisher 发送多于 Subscriber能够处理的数据量public void request(long n);// 调⽤ cancel()⽅法表明它不再对数据感兴趣并且取消订阅public void cancel();
}

Subscriber 请求数据之后,数据就会开始流经响应式流,调用onNext方法。

Processor接⼝,它是Subscriber和Publisher的组合:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

4、Reactor中响应式接口的基本使用

import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;public class ReactorDemo {public static void main(String[] args) {// reactor = jdk8 stream + jdk9 reactive stream// Mono 0-1个元素// Flux 0-N个元素String[] strs = { "1", "2", "3" };// 2. 定义订阅者Subscriber<Integer> subscriber = new Subscriber<Integer>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 这里就是jdk8的streamFlux.fromArray(strs).map(s -> Integer.parseInt(s))// 最终操作// 这里就是jdk9的reactive stream.subscribe(subscriber);}
}

二、初始Reactor

1、Flux和Mono的基本介绍

Reactor中有两个核心类,Mono和Flux。Flux和Mono是Reactor提供的最基础的构建块,⽽这两种响应式类型所提供的操作符则是组合使⽤它们以构建数据流动管线的黏合剂。

这两个类实现接口Publisher,提供丰富操作符。Flux对象实现发布者,返回N个元素Mono实现发布者,返回0或者1个元素。

Flux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种数据信号:元素值、错误信号、完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

Flux和Mono共有500多个操作,这些操作都可以⼤致归类为:创建操作;组合操作;转换操作;逻辑操作。

注意!Mono和Flux的很多操作是相同的,只不过对应的数据数量不同,所以本文更多的操作都是基于Flux的,Mono也同理。

在这里插入图片描述
在这里插入图片描述

2、引入Reactor依赖

需要引入reactor-core核心包和测试包。

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.x.x</version>
</dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><version>3.x.x</version><scope>test</scope>
</dependency>

3、响应式类型的创建

Reactor提供了多种创建Flux和Mono的操作。

// 使⽤Flux或Mono上的静态 just()⽅法来创建⼀个响应式类型
Mono.just(1);
Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 调用just或其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。
// 添加一个订阅者,subscribe的方法参数相当于是一个Consumer
fruitFlux.subscribe(f -> System.out.println("Here's some fruit: " + f)
);// 根据集合创建
String[] fruits = new String[] {"Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux<String> fruitFlux2 = Flux.fromArray(fruits);
List<String> list = Arrays.asList(fruits);
Flux.fromIterable(list); // 集合Stream<String> stream = list.stream();
Flux.fromStream(stream); // stream流// 根据区间创建1-5
Flux<Integer> intervalFlux =Flux.range(1, 5);
intervalFlux.subscribe(f -> System.out.println("data is :" + f)
);
// 每秒发布⼀个值的Flux,通过interval()⽅法创建的Flux会从0开始发布值,并且后续的条⽬依次递增。
// 因为interval()⽅法没有指定最⼤值,所以它可能会永远运⾏。我们也可以使⽤take()⽅法将结果限制为前5个条⽬。
Flux<Long> intervalFlux2 =Flux.interval(Duration.ofSeconds(1)).take(5);
intervalFlux2.subscribe(f -> System.out.println("data2 is :" + f)
);// 阻塞,等待结果
Thread.sleep(100000);

4、响应式类型的组合

(1)使用mergeWith合并响应式流

在这里插入图片描述

Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa").delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples").delaySubscription(Duration.ofMillis(250)) // 订阅后250毫秒后开始发布数据.delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据// 使⽤mergeWith()⽅法,将两个Flux合并,合并过后的Flux数据项发布顺序与源Flux的发布时间⼀致
// Garfield Lasagna Kojak Lollipops Barbossa Apples
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);mergedFlux.subscribe(System.out::println);// 阻塞,等待结果
Thread.sleep(100000);

我们发现,使用mergeWith合并过的两个FLux,并没有严格意义上的先后之分,谁产生了数据就接着消费,与同一个无异。

(2)使用zip压缩合并响应式流

在这里插入图片描述

Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");
// 当两个Flux对象压缩在⼀起的时候,它将会产⽣⼀个新的发布元组的Flux,其中每个元组中都包含了来⾃每个源Flux的数据项
// 这个合并后的Flux发出的每个条⽬都是⼀个Tuple2(⼀个容纳两个其他对象的容器对象)的实例,其中包含了来⾃每个源Flux的数据项,并保持着它们发布的顺序。
Flux<Tuple2<String, String>> zippedFlux =Flux.zip(characterFlux, foodFlux);zippedFlux.subscribe(t -> {System.out.println(t.getT1() + "|" + t.getT2());
});
/*** 执行结果:* Garfield|Lasagna* Kojak|Lollipops* Barbossa|Apples*/

(3)使用zip压缩合并为自定义对象的响应式流

如果你不想使⽤Tuple2,⽽想要使⽤其他类型,就可以为zip()⽅法提供⼀个合并函数来⽣成你想要的任何对象,合并函数会传⼊这两个数据项。
在这里插入图片描述
zip操作的另⼀种形式(从每个传⼊Flux中各取⼀个元素,然后创建消息对象,并产⽣这些消息组成的Flux)

Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");// 压缩成自定义对象
Flux<String> zippedFlux =Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
zippedFlux.subscribe(System.out:: println);/*** 执行结果:* Garfield eats Lasagna* Kojak eats Lollipops* Barbossa eats Apples*/

(4)选择第⼀个反应式类型进⾏发布

假设我们有两个Flux对象,此时我们不想将它们合并在⼀起,⽽是想要创建⼀个新的Flux,让这个新的Flux从第⼀个产⽣值的Flux中发布值。first()操作会在两个Flux对象中选择第⼀个发布值的Flux,并再次发布它的值。
在这里插入图片描述

Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth").delaySubscription(Duration.ofMillis(100)); // 延迟100ms
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
// 选择第⼀个反应式类型进⾏发布
Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
firstFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/*** 执行结果:* hare* cheetah* squirrel*/

5、转换和过滤反应式流

在数据流经⼀个流时,我们通常需要过滤掉某些值并对其他的值进⾏处理。

(1)skip操作跳过指定数⽬的消息

skip操作跳过指定数⽬的消息并将剩下的消息继续在结果Flux上进⾏传递
在这里插入图片描述

// 跳过3个,并创建一个新的Flux
Flux<String> skipFlux = Flux.just("one", "two", "skip a few", "ninety nine", "one hundred").skip(3);
skipFlux.subscribe(System.out::println);
/*** 执行结果* ninety nine* one hundred*/

(2)skip()操作的另⼀种形式

在⼀段时间之内跳过所有的第⼀批数据。
在这里插入图片描述

// 这是skip()操作的另⼀种形式,将会产⽣⼀个新Flux,在发布来⾃源Flux的数据项之前等待指定的⼀段时间
Flux<String> skipFlux = Flux.just("one", "two", "skip a few", "ninety nine", "one hundred").delayElements(Duration.ofSeconds(1)) // 每1秒一个.skip(Duration.ofSeconds(4)); // 4秒前的都跳过
skipFlux.subscribe(System.out::println);// 阻塞,等待结果
Thread.sleep(100000);/*** 执行结果:* ninety nine* one hundred*/

(3)take操作只发布第⼀批指定数量的数据项

根据对skip操作的描述来看,take可以认为是与skip相反的操作。skip操作会跳过前⾯⼏个数据项,⽽take操作只发布第⼀批指定数量的数据项,然后将取消订阅。
在这里插入图片描述

// take操作只发布传⼊Flux中前⾯指定数⽬的数据项,然后将取消订阅
Flux<String> nationalParkFlux = Flux.just("Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton").take(3);
nationalParkFlux.subscribe(System.out::println);
/*** 执行结果:* Yellowstone* Yosemite* Grand Canyon*/

(4)take操作的另一种形式

take()⽅法也有另⼀种替代形式,基于间隔时间⽽不是数据项个数(在指定的时间过期之前,⼀直将消息传递给结果Flux)。它将接受并发布与源Flux⼀样多的数据项,直到某段时间结束,之后Flux将会完成。
在这里插入图片描述

// 在订阅之后的前3.5秒发布数据条⽬。
Flux<String> nationalParkFlux = Flux.just("Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton").delayElements(Duration.ofSeconds(1)).take(Duration.ofMillis(3500));
nationalParkFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/*** 执行结果:* Yellowstone* Yosemite* Grand Canyon*/

(5)filter操作自定义过滤条件

filter操作允许我们根据任何条件进⾏选择性地发布。
在这里插入图片描述

Flux<String> nationalParkFlux = Flux.just("Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton").filter(np -> !np.contains(" ")); // 过滤携带空格的
nationalParkFlux.subscribe(System.out::println);
/*** 执行结果* Yellowstone* Yosemite* Zion*/

(6)distinct操作去重

在这里插入图片描述

Flux<String> animalFlux = Flux.just("dog", "cat", "bird", "dog", "bird", "anteater").distinct();
// 去重
animalFlux.subscribe(System.out::println);
/*** 执行结果:* dog* cat* bird* anteater*/

(7)map操作映射新元素

map将元素映射为新的元素,并创建一个新的Flux。
在这里插入图片描述

// map将元素映射为新的元素,并创建一个新的Flux
Flux<Integer> integerFlux = Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr").map(n -> {String[] split = n.split("\\s");return split.length; // 将String转为Integer});
integerFlux.subscribe(System.out::println);/*** 执行结果:* 2* 2* 2*/

其中重要的⼀点是:在每个数据项被源Flux发布时,map操作是同步执⾏的,如果你想要异步地转换过程,那么你应该考虑使⽤flatMap操作。

(8)flatMap将流转成新的流

flatMap并不像map操作那样简单地将⼀个对象转换到另⼀个对象,⽽是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。当与subscribeOn()⽅法结合使⽤时,flatMap操作可以释放Reactor反应式的异步能⼒。
在这里插入图片描述

// 使⽤flatMap()⽅法和subscribeOn()⽅法
Flux<Integer> integerFlux = Flux.just("Michael", "Scottie Pippen", "Steve Kerr Ob").flatMap(n -> Mono.just(n).map(p -> {String[] split = p.split("\\s");return split.length; // 将String转为Integer}).subscribeOn(Schedulers.parallel()) // 定义异步);
integerFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);

在这里插入图片描述

(9)buffer操作现将数据流拆分为小块

buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
在这里插入图片描述

// buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
Flux<String> fruitFlux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");
// 创建⼀个新的包含List 集合的Flux,其中每个List只有不超过指定数量的元素
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3); // 数据切分为小块,每3个一块
bufferedFlux.subscribe(System.out::println);
/*** 执行结果:* [apple, orange, banana]* [kiwi, strawberry]*/
// 可以分片后并行执行
bufferedFlux.flatMap(x ->Flux.fromIterable(x).map(y -> y.toUpperCase()).subscribeOn(Schedulers.parallel())
).subscribe(l -> {System.out.println(Thread.currentThread().getName() + "线程执行:" + l);
});
/*** 执行结果(因为并行执行,结果可能不一致):* parallel-1线程执行:APPLE* parallel-1线程执行:ORANGE* parallel-1线程执行:BANANA* parallel-2线程执行:KIWI* parallel-2线程执行:STRAWBERRY*/
// 阻塞,等待结果
Thread.sleep(100000);

使⽤不带参数的buffer()⽅法可以将Flux发布的所有数据项都收集到⼀个List中:
在这里插入图片描述

Flux<List<String>> bufferedFlux = fruitFlux.buffer();

(10)collectList操作也可以将所有数据收集到一个List

collectList操作将产⽣⼀个包含传⼊Flux发布的所有消息的Mono。
在这里插入图片描述

Flux<String> fruitFlux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");
// 生成一个Mono,里面包含一个List
Mono<List<String>> fruitListMono = fruitFlux.collectList();

(11)collectMap 操作产生⼀个发布Map的Mono

collectMap操作将会产⽣⼀个Mono(包含了由传⼊Flux所发出的消息产⽣的Map,这个Map的key是从传⼊消息的某些特征衍⽣⽽来的)
在这里插入图片描述

Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =animalFlux.collectMap(a -> a.charAt(0)); // 将第一个字符作为Map的key
animalMapMono.subscribe(System.out::println);
/*** 执行结果:* {a=aardvark, e=eagle, k=kangaroo}*/// 阻塞,等待结果
Thread.sleep(100000);

key相同的,会被覆盖。

6、在反应式类型上执行逻辑操作

(1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件

在这里插入图片描述

Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));

都满足条件会返回true,否则返回false。

(2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件

在这里插入图片描述

Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));

至少有一个满足条件,就为true,都不满足就为false。

7、在反应式类型上使用Subscriber订阅

(1)使用Subscriber消费消息

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");stringFlux.subscribe(new Subscriber<String>() {// 保存订阅关系, 需要用它来给发布者响应private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {System.out.println("订阅者开始订阅");this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("订阅者开始处理数据" + item);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable t) {// 出现了异常(例如处理数据的时候产生了异常)t.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("订阅者处理完了!");}
});
/*** 执行结果:* 订阅者开始订阅* 订阅者开始处理数据Apple* 订阅者开始处理数据Orange* 订阅者开始处理数据Grape* 订阅者开始处理数据Banana* 订阅者开始处理数据Strawberry* 订阅者处理完了!*/// 阻塞
Thread.sleep(10000);

(2)使用Flux的doOnNext处理数据

Flux的doOnNext,会添加当Flux发出一个项目时触发的行为(副作用)。

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t)).subscribe(t -> System.out.println("订阅者处理数据:" + t));
/*** 执行结果:* 发布者处理数据:Apple* 订阅者处理数据:Apple* 发布者处理数据:Orange* 订阅者处理数据:Orange* 发布者处理数据:Grape* 订阅者处理数据:Grape* 发布者处理数据:Banana* 订阅者处理数据:Banana* 发布者处理数据:Strawberry* 订阅者处理数据:Strawberry*/// 阻塞
Thread.sleep(10000);

但是!以下写法是不会触发发布者的doOnNext事件的:

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t));
stringFlux.subscribe(t -> System.out.println("订阅者处理数据:" + t));

只有链式调用,才会触发发布者的doOnNext事件。

doOnNext可以写多个,顺序执行:

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者1处理数据:" + t)).doOnNext(t -> System.out.println("发布者2处理数据:" + t)).subscribe(t -> System.out.println("订阅者处理数据:" + t));
/*** 执行结果:* 发布者1处理数据:Apple* 发布者2处理数据:Apple* 订阅者处理数据:Apple* 发布者1处理数据:Orange* 发布者2处理数据:Orange* 订阅者处理数据:Orange* 发布者1处理数据:Grape* 发布者2处理数据:Grape* 订阅者处理数据:Grape* 发布者1处理数据:Banana* 发布者2处理数据:Banana* 订阅者处理数据:Banana* 发布者1处理数据:Strawberry* 发布者2处理数据:Strawberry* 订阅者处理数据:Strawberry*/

8、使用then来处理完成数据返回

Flux<String> just = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 返回一个Mono ,在此Flux完成时完成。这将主动忽略序列,只重放完成或错误信号。
just.doOnNext(t -> System.out.println("发布者处理数据:" + t)).then(Mono.defer(() -> {return Mono.just("我完成了");})).subscribe(t -> System.out.println("订阅者处理数据:" + t));
/*** 执行结果:* 发布者处理数据:Apple* 发布者处理数据:Orange* 发布者处理数据:Grape* 发布者处理数据:Banana* 发布者处理数据:Strawberry* 订阅者处理数据:我完成了*/

通常来说,发布者发布完之后,都需要调用then来处理数据,或调用thenEmpty返回一个空的Mono(Mono.empty())。

写在后面

如果本文对你有帮助,请点赞收藏关注一下吧 ~
在这里插入图片描述

相关文章:

响应式编程详解,带你熟悉Reactor响应式编程

文章目录一、什么是响应式编程1、Java的流和响应式流2、Java中响应式的使用3、Reactor中响应式流的基本接口4、Reactor中响应式接口的基本使用二、初始Reactor1、Flux和Mono的基本介绍2、引入Reactor依赖3、响应式类型的创建4、响应式类型的组合&#xff08;1&#xff09;使用m…...

踩坑篇之WebSocket实现类中无法使用@Autowired注入对象

大家好&#xff0c;我是小简&#xff0c;今天我又大意了&#xff0c;在WebSocket这个类上踩坑了。 接下来我讲讲我踩坑的经历吧&#xff01; package cn.donglifeng.shop.socket.endpoin;import cn.donglifeng.shop.common.context.SpringBeanContext; import cn.donglifeng.s…...

QT CTK插件框架 (一 下载编译)

CTK 为支持生物医学图像计算的公共开发包&#xff0c;其全称为 Common Toolkit。为医学成像提供一组统一的基本功能&#xff1b;促进代码和数据的交互及结合&#xff1b;避免重复开发&#xff1b;在工具包&#xff08;医学成像&#xff09;范围内不断扩展到新任务&#xff0c;而…...

【Java版oj】day10 井字棋、密码强度等级

目录 一、井字棋 &#xff08;1&#xff09;原题再现 &#xff08;2&#xff09;问题分析 &#xff08;3&#xff09;完整代码 二、密码强度等级 &#xff08;1&#xff09;原题再现 &#xff08;2&#xff09;问题分析 &#xff08;3&#xff09;完整代码 一、井字棋 &a…...

JavaScript的事件传播机制

你在学习和编写JavaScript时可能听说过事件冒泡&#xff08;event bubbling&#xff09;。它会发生在多个元素存在嵌套关系&#xff0c;并且这些元素都注册了同一事件(例如click)的监听器时。 但是事件冒泡只是事件机制的一部分。它经常与事件捕获(event capturing)和事件传播…...

队列的定义及基本操作实现(链式)

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️数据结构与算法】 学习名言&#xff1a;天子重英豪&#xff0c;文章教儿曹。万般皆下品&#xff0c;惟有读书高 系列文章目录 第一章 ❤️ 学前知识 第二章 ❤️ 单向链表 第三章 ❤️ 递归 文章目录…...

集成方法!

目录 关注降低variance,选择bias较小的基学习器 Bagging Stacking Random Forest 关注降低bias,选择variance较小的基学习器 Adaboost Boosting 关注降低variance,选择bias较小的基学习器 Bagging 给定m个样本的数据集&#xff0c;利用有放回的随机采样法&#xff0c;得…...

20年程序员生涯,读了200多本技术书,挑了几本精华好书分享给大家

不知不觉已经又走过了20个年头了&#xff0c;今年已经44了&#xff0c;虽然我已经退休在家&#xff0c;但一直都保持着读书的习惯&#xff0c;我每年平均要读10本技术书籍&#xff0c;保持不让自己的技术落伍。 这些年读的技术书不下200本&#xff0c;很多好书我都会保存在家&a…...

C++ 手写一个WebServer

文章目录 前言一、WebServer的原理刨析二、HTTP协议基础三、C++代码实战四、运行测试前言 本文由:我不会画饼呀 提供建议 大家如果有什么想看的文章(想了解的知识点),都可以在本专栏文章底部评论,或者私信我,在有能力的前提下,我都会尽量给大家写出来,供大家学习参考 …...

Elasticsearch 简介与安装

简介 Elasticsearch 是一个开源的搜索引擎&#xff0c;建立在一个全文搜索引擎库 Apache Lucene™ 基础之上。 Lucene 可以说是当下最先进、高性能、全功能的搜索引擎库—​无论是开源还是私有。 但是 Lucene 仅仅只是一个库。为了充分发挥其功能&#xff0c;你需要使用 Java…...

Qt5.12实战之QByteArray与字符指针及字符串转换

示例源码:#include <QCoreApplication> #include <QDebug> #include <QTextStream> static QTextStream cout (stdout,QIODevice::WriteOnly); #include <iostream> #include <QtGlobal> #include <QByteArray>void test() {qDebug() <…...

二、ElasticSearch基础语法

目录一、简单了解ik分词器(分词效果)1.standard(单字分词器&#xff0c;es默认分词器)2.ik_smart分词(粗粒度的拆分)3.ik_max_word分词器&#xff08;最细粒度拆分&#xff09;二、指定默认分词器1.为索引指定默认分词器三、ES操作数据1.概述2.创建索引3.查询索引4.删除索引5.添…...

Yolov8详解与实战

文章目录摘要模型详解C2F模块Losshead部分模型实战训练COCO数据集下载数据集COCO转yolo格式数据集&#xff08;适用V4&#xff0c;V5&#xff0c;V6&#xff0c;V7&#xff0c;V8&#xff09;配置yolov8环境训练测试训练自定义数据集Labelme数据集摘要 YOLOv8 是 ultralytics …...

多线程案例——阻塞队列

目录 一、阻塞队列 1. 生产者消费者模型 &#xff08;1&#xff09;解耦合 &#xff08;2&#xff09;“削峰填谷” 2. 标准库中的阻塞队列 3. 自己实现一个阻塞队列&#xff08;代码&#xff09; 4. 自己实现生产者消费者模型&#xff08;代码&#xff09; 一、阻塞队列…...

学习优秀博文(【国产MCU移植】手把手教你使用RT-Thread制作GD32系列BSP)有感 | 文末赠书5本

学习优秀博文&#xff08;【guo产MCU移植】手把手教你使用RT-Thread制作GD32系列BSP&#xff09;有感 一篇优秀的博文是什么样的&#xff1f;它有什么规律可循吗&#xff1f;优秀的guo产32位单片机处理器是否真的能成功替换掉stm32的垄断地位&#xff1f; 本文博主以亲身经历聊…...

写用例写的焦头烂额?看看摸鱼5年的老点工是怎么写的...

给你个需求&#xff0c;你要怎么转变成最终的用例&#xff1f; 直接把需求文档翻译一下就完事了。 老点工拿到需求后的标准操作&#xff1a; 第一步&#xff1a;解析需求 先解析需求-找出所有需求中的动词&#xff0c;再列出所有测试点。测试点过程不断发散&#xff0c;对于…...

基于深度学习的鸟类检测识别系统(含UI界面,Python代码)

摘要&#xff1a;鸟类识别是深度学习和机器视觉领域的一个热门应用&#xff0c;本文详细介绍基于YOLOv5的鸟类检测识别系统&#xff0c;在介绍算法原理的同时&#xff0c;给出Python的实现代码以及PyQt的UI界面。在界面中可以选择各种鸟类图片、视频以及开启摄像头进行检测识别…...

零基础搭建Tomcat集群(超详细)

&#x1f497;推荐阅读文章&#x1f497; &#x1f338;JavaSE系列&#x1f338;&#x1f449;1️⃣《JavaSE系列教程》&#x1f33a;MySQL系列&#x1f33a;&#x1f449;2️⃣《MySQL系列教程》&#x1f340;JavaWeb系列&#x1f340;&#x1f449;3️⃣《JavaWeb系列教程》…...

机器学习自学笔记——聚类

聚类的基本概念 聚类&#xff0c;顾名思义&#xff0c;就是将一个数据集中各个样本点聚集成不同的“类”。每个类中的样本点都有某些相似的特征。比如图书馆中&#xff0c;会把成百上千的书分成不同的类别&#xff1a;科普书、漫画书、科幻书等等&#xff0c;方便人们查找。每…...

注意下C语言整形提升

C语言整形提升 C语言整形提升是指在表达式中使用多种类型的数据时&#xff0c;编译器会自动将较小的类型转换为较大的类型&#xff0c;以便进行运算。在C语言中&#xff0c;整型提升规则如下&#xff1a; 如果表达式中存在short类型&#xff0c;则将其自动转换为int类型。 如…...

JavaSec-RCE

简介 RCE(Remote Code Execution)&#xff0c;可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景&#xff1a;Groovy代码注入 Groovy是一种基于JVM的动态语言&#xff0c;语法简洁&#xff0c;支持闭包、动态类型和Java互操作性&#xff0c…...

线程与协程

1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指&#xff1a;像函数调用/返回一样轻量地完成任务切换。 举例说明&#xff1a; 当你在程序中写一个函数调用&#xff1a; funcA() 然后 funcA 执行完后返回&…...

基于服务器使用 apt 安装、配置 Nginx

&#x1f9fe; 一、查看可安装的 Nginx 版本 首先&#xff0c;你可以运行以下命令查看可用版本&#xff1a; apt-cache madison nginx-core输出示例&#xff1a; nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...

push [特殊字符] present

push &#x1f19a; present 前言present和dismiss特点代码演示 push和pop特点代码演示 前言 在 iOS 开发中&#xff0c;push 和 present 是两种不同的视图控制器切换方式&#xff0c;它们有着显著的区别。 present和dismiss 特点 在当前控制器上方新建视图层级需要手动调用…...

Golang——6、指针和结构体

指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...

SpringAI实战:ChatModel智能对话全解

一、引言&#xff1a;Spring AI 与 Chat Model 的核心价值 &#x1f680; 在 Java 生态中集成大模型能力&#xff0c;Spring AI 提供了高效的解决方案 &#x1f916;。其中 Chat Model 作为核心交互组件&#xff0c;通过标准化接口简化了与大语言模型&#xff08;LLM&#xff0…...

数据结构第5章:树和二叉树完全指南(自整理详细图文笔记)

名人说&#xff1a;莫道桑榆晚&#xff0c;为霞尚满天。——刘禹锡&#xff08;刘梦得&#xff0c;诗豪&#xff09; 原创笔记&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 上一篇&#xff1a;《数据结构第4章 数组和广义表》…...