当前位置: 首页 > news >正文

步入响应式编程篇(二)之Reactor API

步入响应式编程篇(二)之Reactor API

  • 前言
  • 回顾响应式编程
  • Reactor API的使用
    • Stream
    • 引入依赖
    • Reactor API的使用
      • 流源头的创建
    • reactor api的背压模式
    • 发布者与订阅者使用的线程
      • 查看弹珠图
      • 查看形成新流的日志

前言

对于响应式编程的基于概念,以及JDK自带的落地实现,可以查看步入响应式编程篇(一)
本篇将介绍Reactor API的使用:

reactor官网介绍,响应式编程是一种与数据流和变化传播相关的异步编程范式。这意味着可以通过所采用的编程语言轻松表达静态(例如数组)或动态(例如时间发射器)数据流;

对比Flow api以及completableFuture,前者编写代码比较麻烦,编写处理器还要自定义一个类,后者还不能满足响应式编程,两者都有其局限性,而Reactor API是基于Stream流操作的,无论是编写还是响应式编程都能满足;

回顾响应式编程

①在面向对象语言中,反应式编程范式通常作为 观察者设计模式的扩展。还可以比较主要的反应流模式与熟悉的迭代器设计模式,因为 Iterable- 所有这些库中的迭代器对。一个主要的区别是,虽然Iterator是基于拉的,但反应流是基于推的。

②使用迭代器是一种命令式编程模式,即使访问值的方法完全由Iterable负责。实际上,由开发人员来选择何时访问序列中的下一个()项。在反应式流中,上述对的等价物是发布者-订阅者。但它是 发布者在新的可用值到来时通知订阅者,这种推送方面是响应的关键。此外,应用于推送值的操作是以声明方式而不是命令方式表达的:程序员表达计算的逻辑,而不是描述其确切的控制流。

③除了推送值之外,还以定义良好的方式涵盖了错误处理和完成方面。发布者可以将新值推送到其订阅者(通过调用onNext),但也可以发出错误信号(通过调用onError)或完成信号(通过调用onComplete)。错误和完成都会终止序列。这可以归纳如下:

onNext x 0…N [onError | onComplete]

这种方法非常灵活。该模式支持没有值、只有一个值或有n个值的用例(包括无限序列的值,例如时钟的连续滴答声)。

Reactor API的使用

Stream

用的就是Java 8引用的Stream API,使用Stream api时也会结合lambda表达式和函数式接口,所以Stream api是包括它们的;
虽然是众所周知,但笔者在此还是提一嘴,Stream API的链式调用,是每个元素完成整个流所有步骤的处理后,再遍历下一个元素的。而不是所有元素完成后,再让流中的下一个步骤处理全部元素,这是一不小心就会陷入的理解误区;

还有诸如flatMap()、map()等参数为函数式接口的种类,总共有四类
//Predicate 有入参且返回值固定为boolean
//Consumer 有入参无返回值
//Function 有一个入参且一个返回值
//Supplier 无入参且有返回值

引入依赖

需引入Reactor API的依赖

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version>
</dependency>

Reactor API的使用

Reactor API的使用是定义流源头,然后往下流,使用链式处理多个步骤,最后流向最后,就是异常或响应信号。

流源头的创建

创建流的方式有Flux和Mono两种,前者是用于创建多个元素的流,后者是创建只有一个元素的流;
在这里插入图片描述

这就分别创建了Flux流和Mono流,很轻易实现了一个全异步的系统。相比Flow流的一顿操作,是不是简便了很多;

而创建流,需指定流的源头,有四种方式
①just(),如上例,穷举出所有元素;
②rang();

//表示流有1到100个元素
Flux.range(1,100);

③generate();
这适用于同步和逐个发射,这意味着接收器是一个SynchronousSink,并且它的next()方法在每次回调调用中最多只能被调用一次。

//入参是泛型为SynchronousSink的Consumer接口
Flux.generate(sink->{//查库存sink.next(queryStock("1"));sink.complete();
}).subscribe(System.out::println);//但同一次回调中循环调用next()会报错,所以先初始化state值为0
Flux.generate(()->0,(state,sink)->{if (state<=10) {//小于10,则继续调用,里面的逻辑是每次将state+1sink.next(state);}else{sink.complete();}return state+1;}).subscribe(System.out::println);

④create方式,create是一种更高级的Flux程序化创建形式,它适合于每轮多个发射,甚至来自多个线程。create无需指定初始值而且可以多次执行sink#next():

Flux.create(sink->{for (int i=0;i<2;i++) {sink.next(queryStock("1"));}sink.complete();
}).subscribe(System.out::println);

在业务上通常使用后两种,例如可以从缓存或DB查到数据,然后类似于观察者模式,往后推送处理;更多使用create,因为generate需设置初始值,而且每次回调只能调用一次sink#next()

reactor api的背压模式

使用背压模式的原因——响应式编程的思想,主要在于推,推送到每个处理器操作,如果不管当前处理能力就很容易处理出问题,类似于MQ消费者控制从队列中获取数据量,要控制消费能力,于是引出背压模式,接收时通过request告诉上游,所以在Reactor中实现背压时,消费者压力传播回源的方式是向上游操作员发送请求。当前请求的总和有时被称为当前“需求”或“待决请求”。需求的上限为Long.MAX_VALUE,表示一个无限的请求(意思是“尽可能快地生产”-基本上禁用反压)。
如下,与Flow API类似,也是在自定义订阅者中指定背压请求request()

Flux.range(1, 10).doOnRequest(r -> System.out.println("request of " + r)).subscribe(new BaseSubscriber<Integer>() {@Overridepublic void hookOnSubscribe(Subscription subscription) {request(1);}@Overridepublic void hookOnNext(Integer integer) {System.out.println("Cancelling after having received " + integer);cancel();}});

发布者与订阅者使用的线程

查看弹珠图

Flux.create(sink->{for (int i=0;i<2;i++) {sink.next(i*queryStock("1"));}//代表推送给下一个操作形成新流sink.complete();//过滤不等于0的数
}).filter(Predicate.not(Predicate.isEqual(0))).subscribe(System.out::println);

鼠标点到filter操作,就可以看到元素(这些不同形状的方块就是了)经过filter后,变少了,点到每个操作map、flatmap等都可以看到对应的弹珠图,可根据弹珠图快速理解形成新流的操作
在这里插入图片描述

查看形成新流的日志

在这里插入图片描述
在每个流后面使用log(),查看该流形成每个元素的日志,如都是request无限元素,然后create后,就调onNext(0),形成元素0,后面filter就过滤了0,就没有再调onNext(0),继续create元素1时,就调 onNext(1),形成元素1,后面filter后,形成的新流中仍然有元素1,所以还会调用onNext(1),以此类推。

默认情况下,发布者使用的线程就是订阅者的线程,那就证明一下:
如上用log()打印出,订阅者和发布者使用的线程都是main,那如果订阅者开启新的线程,下图也能看到发布者回调onNext操作也是使用订阅者的线程:

在这里插入图片描述

下图,使用publishOn()里指定新线程Schedulers.single(),代表过滤后,发布者使用新线程会回调onNext()
在这里插入图片描述
图中①是filter之前,发布者还是用的订阅者的线程回调onNext,②是filter之后,发布者使用新的线程回调onNext()。

总之,上面只是介绍reactor api操作的冰山一角,至于更多操作可以查看官网哈

如有需要收藏的看官,顺便也用发财的小手点点赞哈,如有错漏,也欢迎各位在评论区评论!

参考官网:https://projectreactor.io/docs/core/release/reference/aboutDoc.html

相关文章:

步入响应式编程篇(二)之Reactor API

步入响应式编程篇&#xff08;二&#xff09;之Reactor API 前言回顾响应式编程Reactor API的使用Stream引入依赖Reactor API的使用流源头的创建 reactor api的背压模式发布者与订阅者使用的线程查看弹珠图查看形成新流的日志 前言 对于响应式编程的基于概念&#xff0c;以及J…...

Oracle SQL: TRANSLATE 和 REGEXP_LIKE 的知识点详细分析

目录 前言1. TRANSLATE2. REGEXP_LIKE3. 实战 前言 &#x1f91f; 找工作&#xff0c;来万码优才&#xff1a;&#x1f449; #小程序://万码优才/r6rqmzDaXpYkJZF 1. TRANSLATE TRANSLATE 用于替换字符串中指定字符集的每个字符&#xff0c;返回替换后的字符串 逐一映射输入字…...

RabbitMQ 在实际应用时要注意的问题

1. 幂等性保障 1.1 幂等性介绍 幂等性是数学和计算机科学中某些运算的性质,它们可以被多次应⽤,⽽不会改变初始应⽤的结果. 应⽤程序的幂等性介绍 在应⽤程序中,幂等性就是指对⼀个系统进⾏重复调⽤(相同参数),不论请求多少次,这些请求对系统的影响都是相同的效果. ⽐如数据库…...

算法日记8:StarryCoding60(单调栈)

一、题目 二、解题思路&#xff1a; 题意为让我们找到每个元素的左边第一个比它小的元素&#xff0c;若不存在则输出-1 2.1法一&#xff1a;暴力&#xff08;0n2&#xff09; 首先&#xff0c;我们可以想到最朴素的算法&#xff1a;直接暴力两层for达成目标核心代码如下&…...

大象机器人发布首款穿戴式数据采集器myController S570,助力具身智能数据收集!

myController S570 具有较高的数据采集速度和远程控制能力&#xff0c;大大简化了人形机器人的编程。 myController S570 是一款可移动的轻量级外骨骼&#xff0c;具有 14 个关节、2 个操纵杆和 2 个按钮&#xff0c;它提供高数据采集速度&#xff0c;出色的兼容性&#xff0c…...

【银河麒麟高级服务器操作系统】业务访问慢网卡丢包现象分析及处理过程

了解更多银河麒麟操作系统全新产品&#xff0c;请点击访问 麒麟软件产品专区&#xff1a;product.kylinos.cn 开发者专区&#xff1a;developer.kylinos.cn 文档中心&#xff1a;document.kylinos.cn 交流论坛&#xff1a;forum.kylinos.cn 服务器环境以及配置 【内核版本…...

C语言之饭店外卖信息管理系统

&#x1f31f; 嗨&#xff0c;我是LucianaiB&#xff01; &#x1f30d; 总有人间一两风&#xff0c;填我十万八千梦。 &#x1f680; 路漫漫其修远兮&#xff0c;吾将上下而求索。 C语言之饭店外卖信息管理系统 目录 设计题目设计目的设计任务描述设计要求输入和输出要求验…...

记一次 .NET某数字化协同管理系统 内存暴涨分析

一&#xff1a;背景 1. 讲故事 高级调试训练营里的一位朋友找到我&#xff0c;说他们跑在linux上的.NET程序出现了内存泄露的情况&#xff0c;上windbg观察发现内存都是IMAGE给吃掉了&#xff0c;那些image都标记了 doublemapper__deleted_ 字样&#xff0c;问我为啥会这样&a…...

部门管理查询部门,nginx反向代理,前端如何访问到后端Tomcat 注解@RequestParam

接口开发 增删改通常是不用返回data数据&#xff0c;返回null 列表查询-结果封装&#xff0c;时间 前后端联调测试 nginx反向代理&#xff0c;前端如何访问到后端Tomcat服务器 删除部门...

JS通过ASCII码值实现随机字符串的生成(可指定长度以及解决首位不出现数值)

在之前写过一篇“JS实现随机生成字符串&#xff08;可指定长度&#xff09;”&#xff0c;当时写的过于简单和传统&#xff0c;比较粗放。此次针对此问题&#xff0c;对随机生成字符串的功能进行优化处理&#xff0c;对随机取到的字符都通过程序自动来完成。 在写之前&#xff…...

速通Docker === 快速部署Redis主从集群

目录 镜像仓库介绍 持久化你的数据库 连接到其他容器 创建自定义网络 部署主节点 部署从节点 验证部署 总结 在现代应用架构中&#xff0c;Redis作为一个高性能的内存数据库&#xff0c;被广泛应用于缓存、会话存储、实时分析等多个领域。为了提高Redis的可用性和数据的…...

论文笔记(六十三)Understanding Diffusion Models: A Unified Perspective(一)

Understanding Diffusion Models: A Unified Perspective&#xff08;一&#xff09; 文章概括引言&#xff1a;生成模型背景&#xff1a;ELBO、VAE 和分层 VAE证据下界&#xff08;Evidence Lower Bound&#xff09;变分自编码器 &#xff08;Variational Autoencoders&#x…...

stm32使用MDK5.35时遇到*** TOOLS.INI: TOOLCHAIN NOT INSTALLED

mdk5.35出现*** TOOLS.INI: TOOLCHAIN NOT INSTALLED的问题&#xff01;&#xff01;&#xff01;&#xff01; 以管理员身份重新打开MDK5.35.0.0&#xff0c;用keygen破解密码&#xff0c;但是一直提示我是没有破解成功。 解决办法&#xff1a; target 改成ARM...

在Ubuntu上安装RabbitMQ教程

1、安装erlang 因为rabbitmq是基于erlang开发的&#xff0c;所以要安装rabbitmq&#xff0c;首先需要安装erlang运行环境 apt-get install erlang执行命令查是否安装成功&#xff1a;erl&#xff0c;疯狂 Ctrlc 就能退出命令行 2、安装rabbitmq 1、查看erlang与rabbitmq版本…...

【算法】集合List和队列

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 零&#xff1a;集合&#xff0c;队列的用法 一&#xff1a;字母异位词分组 二&#xff1a;二叉树的锯…...

uniapps使用HTML5的io模块拷贝文件目录

最近在集成sqlite到uniapp的过程中&#xff0c;因为要将sqlite数据库预加载&#xff0c;所以需要使用HTML5的plus.io模块。使用过程中遇到了许多问题&#xff0c;比如文件路径总是解析不到等。尤其是应用私有文档目录’_doc’。 根据官方文档&#xff1a; 为了安全管理应用的…...

css‘s hover VS mobile

.animation {animation: 30s move infinite linear;/* &:hover {animation-play-state: paused;*/ }原本写的好好的&#xff0c;测试说&#xff1a;“移动端点击滚动条&#xff0c;跳转到其他页面后&#xff0c;返回当前页面&#xff0c;滚动条不滚动&#xff1b;可以优化位…...

工业制造离不开的BOM

在制造业的浩瀚星空中&#xff0c;物料清单&#xff08;BOM&#xff09;犹如“北极星”&#xff0c;牢牢指引着产品从设计蓝图迈向实物诞生的全过程。 BOM的分类 按照设计制造的不同阶段&#xff0c;将BOM划分为设计BOM、工艺BOM、制造BOM三种类型。 设计BOM Engineering BO…...

HTML中的`<!DOCTYPE html>`是什么意思?

诸神缄默不语-个人CSDN博文目录 在学习HTML时&#xff0c;我们经常会看到HTML文档的开头出现<!DOCTYPE html>&#xff0c;它是HTML文件的第一行。很多初学者可能会疑惑&#xff0c;为什么需要这行代码&#xff1f;它到底有什么作用呢&#xff1f;在这篇文章中&#xff0…...

C语言之斗地主游戏

&#x1f31f; 嗨&#xff0c;我是LucianaiB&#xff01; &#x1f30d; 总有人间一两风&#xff0c;填我十万八千梦。 &#x1f680; 路漫漫其修远兮&#xff0c;吾将上下而求索。 ​ C语言之斗地主游戏 目录 程序概述程序设计 Card类CardGroup类Player类LastCards类Land…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)

参考官方文档&#xff1a;https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java&#xff08;供 Kotlin 使用&#xff09; 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...

OPENCV形态学基础之二腐蚀

一.腐蚀的原理 (图1) 数学表达式&#xff1a;dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一&#xff0c;腐蚀跟膨胀属于反向操作&#xff0c;膨胀是把图像图像变大&#xff0c;而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...

代码随想录刷题day30

1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币&#xff0c;另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额&#xff0c;返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...

R语言速释制剂QBD解决方案之三

本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...

【Go语言基础【12】】指针:声明、取地址、解引用

文章目录 零、概述&#xff1a;指针 vs. 引用&#xff08;类比其他语言&#xff09;一、指针基础概念二、指针声明与初始化三、指针操作符1. &&#xff1a;取地址&#xff08;拿到内存地址&#xff09;2. *&#xff1a;解引用&#xff08;拿到值&#xff09; 四、空指针&am…...

【从零学习JVM|第三篇】类的生命周期(高频面试题)

前言&#xff1a; 在Java编程中&#xff0c;类的生命周期是指类从被加载到内存中开始&#xff0c;到被卸载出内存为止的整个过程。了解类的生命周期对于理解Java程序的运行机制以及性能优化非常重要。本文会深入探寻类的生命周期&#xff0c;让读者对此有深刻印象。 目录 ​…...

uniapp 开发ios, xcode 提交app store connect 和 testflight内测

uniapp 中配置 配置manifest 文档&#xff1a;manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号&#xff1a;4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...