步入响应式编程篇(二)之Reactor API
步入响应式编程篇(二)之Reactor API
- 前言
- 回顾响应式编程
- Reactor API的使用
- Stream
- 引入依赖
- Reactor API的使用
- 流源头的创建
- reactor api的背压模式
- 发布者与订阅者使用的线程
- 查看弹珠图
- 查看形成新流的日志
前言
对于响应式编程的基于概念,以及JDK自带的落地实现,可以查看步入响应式编程篇(一)
本篇将介绍Reactor API的使用:
reactor官网介绍,响应式编程是一种与数据流和变化传播相关的异步编程范式。这意味着可以通过所采用的编程语言轻松表达静态(例如数组)或动态(例如时间发射器)数据流;
对比Flow api以及completableFuture,前者编写代码比较麻烦,编写处理器还要自定义一个类,后者还不能满足响应式编程,两者都有其局限性,而Reactor API是基于Stream流操作的,无论是编写还是响应式编程都能满足;
回顾响应式编程
①在面向对象语言中,反应式编程范式通常作为 观察者设计模式的扩展。还可以比较主要的反应流模式与熟悉的迭代器设计模式,因为 Iterable- 所有这些库中的迭代器对。一个主要的区别是,虽然Iterator是基于拉的,但反应流是基于推的。
②使用迭代器是一种命令式编程模式,即使访问值的方法完全由Iterable负责。实际上,由开发人员来选择何时访问序列中的下一个()项。在反应式流中,上述对的等价物是发布者-订阅者。但它是 发布者在新的可用值到来时通知订阅者,这种推送方面是响应的关键。此外,应用于推送值的操作是以声明方式而不是命令方式表达的:程序员表达计算的逻辑,而不是描述其确切的控制流。
③除了推送值之外,还以定义良好的方式涵盖了错误处理和完成方面。发布者可以将新值推送到其订阅者(通过调用onNext),但也可以发出错误信号(通过调用onError)或完成信号(通过调用onComplete)。错误和完成都会终止序列。这可以归纳如下:
onNext x 0…N [onError | onComplete]
这种方法非常灵活。该模式支持没有值、只有一个值或有n个值的用例(包括无限序列的值,例如时钟的连续滴答声)。
Reactor API的使用
Stream
用的就是Java 8引用的Stream API,使用Stream api时也会结合lambda表达式和函数式接口,所以Stream api是包括它们的;
虽然是众所周知,但笔者在此还是提一嘴,Stream API的链式调用,是每个元素完成整个流所有步骤的处理后,再遍历下一个元素的。而不是所有元素完成后,再让流中的下一个步骤处理全部元素,这是一不小心就会陷入的理解误区;
还有诸如flatMap()、map()等参数为函数式接口的种类,总共有四类
//Predicate 有入参且返回值固定为boolean
//Consumer 有入参无返回值
//Function 有一个入参且一个返回值
//Supplier 无入参且有返回值
引入依赖
需引入Reactor API的依赖
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version>
</dependency>
Reactor API的使用
Reactor API的使用是定义流源头,然后往下流,使用链式处理多个步骤,最后流向最后,就是异常或响应信号。
流源头的创建
创建流的方式有Flux和Mono两种,前者是用于创建多个元素的流,后者是创建只有一个元素的流;

这就分别创建了Flux流和Mono流,很轻易实现了一个全异步的系统。相比Flow流的一顿操作,是不是简便了很多;
而创建流,需指定流的源头,有四种方式
①just(),如上例,穷举出所有元素;
②rang();
//表示流有1到100个元素
Flux.range(1,100);
③generate();
这适用于同步和逐个发射,这意味着接收器是一个SynchronousSink,并且它的next()方法在每次回调调用中最多只能被调用一次。
//入参是泛型为SynchronousSink的Consumer接口
Flux.generate(sink->{//查库存sink.next(queryStock("1"));sink.complete();
}).subscribe(System.out::println);//但同一次回调中循环调用next()会报错,所以先初始化state值为0
Flux.generate(()->0,(state,sink)->{if (state<=10) {//小于10,则继续调用,里面的逻辑是每次将state+1sink.next(state);}else{sink.complete();}return state+1;}).subscribe(System.out::println);
④create方式,create是一种更高级的Flux程序化创建形式,它适合于每轮多个发射,甚至来自多个线程。create无需指定初始值而且可以多次执行sink#next():
Flux.create(sink->{for (int i=0;i<2;i++) {sink.next(queryStock("1"));}sink.complete();
}).subscribe(System.out::println);
在业务上通常使用后两种,例如可以从缓存或DB查到数据,然后类似于观察者模式,往后推送处理;更多使用create,因为generate需设置初始值,而且每次回调只能调用一次sink#next()
reactor api的背压模式
使用背压模式的原因——响应式编程的思想,主要在于推,推送到每个处理器操作,如果不管当前处理能力就很容易处理出问题,类似于MQ消费者控制从队列中获取数据量,要控制消费能力,于是引出背压模式,接收时通过request告诉上游,所以在Reactor中实现背压时,消费者压力传播回源的方式是向上游操作员发送请求。当前请求的总和有时被称为当前“需求”或“待决请求”。需求的上限为Long.MAX_VALUE,表示一个无限的请求(意思是“尽可能快地生产”-基本上禁用反压)。
如下,与Flow API类似,也是在自定义订阅者中指定背压请求request()
Flux.range(1, 10).doOnRequest(r -> System.out.println("request of " + r)).subscribe(new BaseSubscriber<Integer>() {@Overridepublic void hookOnSubscribe(Subscription subscription) {request(1);}@Overridepublic void hookOnNext(Integer integer) {System.out.println("Cancelling after having received " + integer);cancel();}});
发布者与订阅者使用的线程
查看弹珠图
Flux.create(sink->{for (int i=0;i<2;i++) {sink.next(i*queryStock("1"));}//代表推送给下一个操作形成新流sink.complete();//过滤不等于0的数
}).filter(Predicate.not(Predicate.isEqual(0))).subscribe(System.out::println);
鼠标点到filter操作,就可以看到元素(这些不同形状的方块就是了)经过filter后,变少了,点到每个操作map、flatmap等都可以看到对应的弹珠图,可根据弹珠图快速理解形成新流的操作

查看形成新流的日志

在每个流后面使用log(),查看该流形成每个元素的日志,如都是request无限元素,然后create后,就调onNext(0),形成元素0,后面filter就过滤了0,就没有再调onNext(0),继续create元素1时,就调 onNext(1),形成元素1,后面filter后,形成的新流中仍然有元素1,所以还会调用onNext(1),以此类推。
默认情况下,发布者使用的线程就是订阅者的线程,那就证明一下:
如上用log()打印出,订阅者和发布者使用的线程都是main,那如果订阅者开启新的线程,下图也能看到发布者回调onNext操作也是使用订阅者的线程:

下图,使用publishOn()里指定新线程Schedulers.single(),代表过滤后,发布者使用新线程会回调onNext()

图中①是filter之前,发布者还是用的订阅者的线程回调onNext,②是filter之后,发布者使用新的线程回调onNext()。
总之,上面只是介绍reactor api操作的冰山一角,至于更多操作可以查看官网哈
如有需要收藏的看官,顺便也用发财的小手点点赞哈,如有错漏,也欢迎各位在评论区评论!
参考官网:https://projectreactor.io/docs/core/release/reference/aboutDoc.html
相关文章:
步入响应式编程篇(二)之Reactor API
步入响应式编程篇(二)之Reactor API 前言回顾响应式编程Reactor API的使用Stream引入依赖Reactor API的使用流源头的创建 reactor api的背压模式发布者与订阅者使用的线程查看弹珠图查看形成新流的日志 前言 对于响应式编程的基于概念,以及J…...
Oracle SQL: TRANSLATE 和 REGEXP_LIKE 的知识点详细分析
目录 前言1. TRANSLATE2. REGEXP_LIKE3. 实战 前言 🤟 找工作,来万码优才:👉 #小程序://万码优才/r6rqmzDaXpYkJZF 1. TRANSLATE TRANSLATE 用于替换字符串中指定字符集的每个字符,返回替换后的字符串 逐一映射输入字…...
RabbitMQ 在实际应用时要注意的问题
1. 幂等性保障 1.1 幂等性介绍 幂等性是数学和计算机科学中某些运算的性质,它们可以被多次应⽤,⽽不会改变初始应⽤的结果. 应⽤程序的幂等性介绍 在应⽤程序中,幂等性就是指对⼀个系统进⾏重复调⽤(相同参数),不论请求多少次,这些请求对系统的影响都是相同的效果. ⽐如数据库…...
算法日记8:StarryCoding60(单调栈)
一、题目 二、解题思路: 题意为让我们找到每个元素的左边第一个比它小的元素,若不存在则输出-1 2.1法一:暴力(0n2) 首先,我们可以想到最朴素的算法:直接暴力两层for达成目标核心代码如下&…...
大象机器人发布首款穿戴式数据采集器myController S570,助力具身智能数据收集!
myController S570 具有较高的数据采集速度和远程控制能力,大大简化了人形机器人的编程。 myController S570 是一款可移动的轻量级外骨骼,具有 14 个关节、2 个操纵杆和 2 个按钮,它提供高数据采集速度,出色的兼容性,…...
【银河麒麟高级服务器操作系统】业务访问慢网卡丢包现象分析及处理过程
了解更多银河麒麟操作系统全新产品,请点击访问 麒麟软件产品专区:product.kylinos.cn 开发者专区:developer.kylinos.cn 文档中心:document.kylinos.cn 交流论坛:forum.kylinos.cn 服务器环境以及配置 【内核版本…...
C语言之饭店外卖信息管理系统
🌟 嗨,我是LucianaiB! 🌍 总有人间一两风,填我十万八千梦。 🚀 路漫漫其修远兮,吾将上下而求索。 C语言之饭店外卖信息管理系统 目录 设计题目设计目的设计任务描述设计要求输入和输出要求验…...
记一次 .NET某数字化协同管理系统 内存暴涨分析
一:背景 1. 讲故事 高级调试训练营里的一位朋友找到我,说他们跑在linux上的.NET程序出现了内存泄露的情况,上windbg观察发现内存都是IMAGE给吃掉了,那些image都标记了 doublemapper__deleted_ 字样,问我为啥会这样&a…...
部门管理查询部门,nginx反向代理,前端如何访问到后端Tomcat 注解@RequestParam
接口开发 增删改通常是不用返回data数据,返回null 列表查询-结果封装,时间 前后端联调测试 nginx反向代理,前端如何访问到后端Tomcat服务器 删除部门...
JS通过ASCII码值实现随机字符串的生成(可指定长度以及解决首位不出现数值)
在之前写过一篇“JS实现随机生成字符串(可指定长度)”,当时写的过于简单和传统,比较粗放。此次针对此问题,对随机生成字符串的功能进行优化处理,对随机取到的字符都通过程序自动来完成。 在写之前ÿ…...
速通Docker === 快速部署Redis主从集群
目录 镜像仓库介绍 持久化你的数据库 连接到其他容器 创建自定义网络 部署主节点 部署从节点 验证部署 总结 在现代应用架构中,Redis作为一个高性能的内存数据库,被广泛应用于缓存、会话存储、实时分析等多个领域。为了提高Redis的可用性和数据的…...
论文笔记(六十三)Understanding Diffusion Models: A Unified Perspective(一)
Understanding Diffusion Models: A Unified Perspective(一) 文章概括引言:生成模型背景:ELBO、VAE 和分层 VAE证据下界(Evidence Lower Bound)变分自编码器 (Variational Autoencoders&#x…...
stm32使用MDK5.35时遇到*** TOOLS.INI: TOOLCHAIN NOT INSTALLED
mdk5.35出现*** TOOLS.INI: TOOLCHAIN NOT INSTALLED的问题!!!! 以管理员身份重新打开MDK5.35.0.0,用keygen破解密码,但是一直提示我是没有破解成功。 解决办法: target 改成ARM...
在Ubuntu上安装RabbitMQ教程
1、安装erlang 因为rabbitmq是基于erlang开发的,所以要安装rabbitmq,首先需要安装erlang运行环境 apt-get install erlang执行命令查是否安装成功:erl,疯狂 Ctrlc 就能退出命令行 2、安装rabbitmq 1、查看erlang与rabbitmq版本…...
【算法】集合List和队列
阿华代码,不是逆风,就是我疯 你们的点赞收藏是我前进最大的动力!! 希望本文内容能够帮助到你!! 目录 零:集合,队列的用法 一:字母异位词分组 二:二叉树的锯…...
uniapps使用HTML5的io模块拷贝文件目录
最近在集成sqlite到uniapp的过程中,因为要将sqlite数据库预加载,所以需要使用HTML5的plus.io模块。使用过程中遇到了许多问题,比如文件路径总是解析不到等。尤其是应用私有文档目录’_doc’。 根据官方文档: 为了安全管理应用的…...
css‘s hover VS mobile
.animation {animation: 30s move infinite linear;/* &:hover {animation-play-state: paused;*/ }原本写的好好的,测试说:“移动端点击滚动条,跳转到其他页面后,返回当前页面,滚动条不滚动;可以优化位…...
工业制造离不开的BOM
在制造业的浩瀚星空中,物料清单(BOM)犹如“北极星”,牢牢指引着产品从设计蓝图迈向实物诞生的全过程。 BOM的分类 按照设计制造的不同阶段,将BOM划分为设计BOM、工艺BOM、制造BOM三种类型。 设计BOM Engineering BO…...
HTML中的`<!DOCTYPE html>`是什么意思?
诸神缄默不语-个人CSDN博文目录 在学习HTML时,我们经常会看到HTML文档的开头出现<!DOCTYPE html>,它是HTML文件的第一行。很多初学者可能会疑惑,为什么需要这行代码?它到底有什么作用呢?在这篇文章中࿰…...
C语言之斗地主游戏
🌟 嗨,我是LucianaiB! 🌍 总有人间一两风,填我十万八千梦。 🚀 路漫漫其修远兮,吾将上下而求索。 C语言之斗地主游戏 目录 程序概述程序设计 Card类CardGroup类Player类LastCards类Land…...
华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...
C++初阶-list的底层
目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...
java_网络服务相关_gateway_nacos_feign区别联系
1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】
1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件(System Property Definition File),用于声明和管理 Bluetooth 模块相…...
现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
Spring AI 入门:Java 开发者的生成式 AI 实践之路
一、Spring AI 简介 在人工智能技术快速迭代的今天,Spring AI 作为 Spring 生态系统的新生力量,正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务(如 OpenAI、Anthropic)的无缝对接&…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成
一个面向 Java 开发者的 Sring-Ai 示例工程项目,该项目是一个 Spring AI 快速入门的样例工程项目,旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计,每个模块都专注于特定的功能领域,便于学习和…...
华为OD机试-最短木板长度-二分法(A卷,100分)
此题是一个最大化最小值的典型例题, 因为搜索范围是有界的,上界最大木板长度补充的全部木料长度,下界最小木板长度; 即left0,right10^6; 我们可以设置一个候选值x(mid),将木板的长度全部都补充到x,如果成功…...
