当前位置: 首页 > news >正文

步入响应式编程篇(二)之Reactor API

步入响应式编程篇(二)之Reactor API

  • 前言
  • 回顾响应式编程
  • Reactor API的使用
    • Stream
    • 引入依赖
    • Reactor API的使用
      • 流源头的创建
    • reactor api的背压模式
    • 发布者与订阅者使用的线程
      • 查看弹珠图
      • 查看形成新流的日志

前言

对于响应式编程的基于概念,以及JDK自带的落地实现,可以查看步入响应式编程篇(一)
本篇将介绍Reactor API的使用:

reactor官网介绍,响应式编程是一种与数据流和变化传播相关的异步编程范式。这意味着可以通过所采用的编程语言轻松表达静态(例如数组)或动态(例如时间发射器)数据流;

对比Flow api以及completableFuture,前者编写代码比较麻烦,编写处理器还要自定义一个类,后者还不能满足响应式编程,两者都有其局限性,而Reactor API是基于Stream流操作的,无论是编写还是响应式编程都能满足;

回顾响应式编程

①在面向对象语言中,反应式编程范式通常作为 观察者设计模式的扩展。还可以比较主要的反应流模式与熟悉的迭代器设计模式,因为 Iterable- 所有这些库中的迭代器对。一个主要的区别是,虽然Iterator是基于拉的,但反应流是基于推的。

②使用迭代器是一种命令式编程模式,即使访问值的方法完全由Iterable负责。实际上,由开发人员来选择何时访问序列中的下一个()项。在反应式流中,上述对的等价物是发布者-订阅者。但它是 发布者在新的可用值到来时通知订阅者,这种推送方面是响应的关键。此外,应用于推送值的操作是以声明方式而不是命令方式表达的:程序员表达计算的逻辑,而不是描述其确切的控制流。

③除了推送值之外,还以定义良好的方式涵盖了错误处理和完成方面。发布者可以将新值推送到其订阅者(通过调用onNext),但也可以发出错误信号(通过调用onError)或完成信号(通过调用onComplete)。错误和完成都会终止序列。这可以归纳如下:

onNext x 0…N [onError | onComplete]

这种方法非常灵活。该模式支持没有值、只有一个值或有n个值的用例(包括无限序列的值,例如时钟的连续滴答声)。

Reactor API的使用

Stream

用的就是Java 8引用的Stream API,使用Stream api时也会结合lambda表达式和函数式接口,所以Stream api是包括它们的;
虽然是众所周知,但笔者在此还是提一嘴,Stream API的链式调用,是每个元素完成整个流所有步骤的处理后,再遍历下一个元素的。而不是所有元素完成后,再让流中的下一个步骤处理全部元素,这是一不小心就会陷入的理解误区;

还有诸如flatMap()、map()等参数为函数式接口的种类,总共有四类
//Predicate 有入参且返回值固定为boolean
//Consumer 有入参无返回值
//Function 有一个入参且一个返回值
//Supplier 无入参且有返回值

引入依赖

需引入Reactor API的依赖

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version>
</dependency>

Reactor API的使用

Reactor API的使用是定义流源头,然后往下流,使用链式处理多个步骤,最后流向最后,就是异常或响应信号。

流源头的创建

创建流的方式有Flux和Mono两种,前者是用于创建多个元素的流,后者是创建只有一个元素的流;
在这里插入图片描述

这就分别创建了Flux流和Mono流,很轻易实现了一个全异步的系统。相比Flow流的一顿操作,是不是简便了很多;

而创建流,需指定流的源头,有四种方式
①just(),如上例,穷举出所有元素;
②rang();

//表示流有1到100个元素
Flux.range(1,100);

③generate();
这适用于同步和逐个发射,这意味着接收器是一个SynchronousSink,并且它的next()方法在每次回调调用中最多只能被调用一次。

//入参是泛型为SynchronousSink的Consumer接口
Flux.generate(sink->{//查库存sink.next(queryStock("1"));sink.complete();
}).subscribe(System.out::println);//但同一次回调中循环调用next()会报错,所以先初始化state值为0
Flux.generate(()->0,(state,sink)->{if (state<=10) {//小于10,则继续调用,里面的逻辑是每次将state+1sink.next(state);}else{sink.complete();}return state+1;}).subscribe(System.out::println);

④create方式,create是一种更高级的Flux程序化创建形式,它适合于每轮多个发射,甚至来自多个线程。create无需指定初始值而且可以多次执行sink#next():

Flux.create(sink->{for (int i=0;i<2;i++) {sink.next(queryStock("1"));}sink.complete();
}).subscribe(System.out::println);

在业务上通常使用后两种,例如可以从缓存或DB查到数据,然后类似于观察者模式,往后推送处理;更多使用create,因为generate需设置初始值,而且每次回调只能调用一次sink#next()

reactor api的背压模式

使用背压模式的原因——响应式编程的思想,主要在于推,推送到每个处理器操作,如果不管当前处理能力就很容易处理出问题,类似于MQ消费者控制从队列中获取数据量,要控制消费能力,于是引出背压模式,接收时通过request告诉上游,所以在Reactor中实现背压时,消费者压力传播回源的方式是向上游操作员发送请求。当前请求的总和有时被称为当前“需求”或“待决请求”。需求的上限为Long.MAX_VALUE,表示一个无限的请求(意思是“尽可能快地生产”-基本上禁用反压)。
如下,与Flow API类似,也是在自定义订阅者中指定背压请求request()

Flux.range(1, 10).doOnRequest(r -> System.out.println("request of " + r)).subscribe(new BaseSubscriber<Integer>() {@Overridepublic void hookOnSubscribe(Subscription subscription) {request(1);}@Overridepublic void hookOnNext(Integer integer) {System.out.println("Cancelling after having received " + integer);cancel();}});

发布者与订阅者使用的线程

查看弹珠图

Flux.create(sink->{for (int i=0;i<2;i++) {sink.next(i*queryStock("1"));}//代表推送给下一个操作形成新流sink.complete();//过滤不等于0的数
}).filter(Predicate.not(Predicate.isEqual(0))).subscribe(System.out::println);

鼠标点到filter操作,就可以看到元素(这些不同形状的方块就是了)经过filter后,变少了,点到每个操作map、flatmap等都可以看到对应的弹珠图,可根据弹珠图快速理解形成新流的操作
在这里插入图片描述

查看形成新流的日志

在这里插入图片描述
在每个流后面使用log(),查看该流形成每个元素的日志,如都是request无限元素,然后create后,就调onNext(0),形成元素0,后面filter就过滤了0,就没有再调onNext(0),继续create元素1时,就调 onNext(1),形成元素1,后面filter后,形成的新流中仍然有元素1,所以还会调用onNext(1),以此类推。

默认情况下,发布者使用的线程就是订阅者的线程,那就证明一下:
如上用log()打印出,订阅者和发布者使用的线程都是main,那如果订阅者开启新的线程,下图也能看到发布者回调onNext操作也是使用订阅者的线程:

在这里插入图片描述

下图,使用publishOn()里指定新线程Schedulers.single(),代表过滤后,发布者使用新线程会回调onNext()
在这里插入图片描述
图中①是filter之前,发布者还是用的订阅者的线程回调onNext,②是filter之后,发布者使用新的线程回调onNext()。

总之,上面只是介绍reactor api操作的冰山一角,至于更多操作可以查看官网哈

如有需要收藏的看官,顺便也用发财的小手点点赞哈,如有错漏,也欢迎各位在评论区评论!

参考官网:https://projectreactor.io/docs/core/release/reference/aboutDoc.html

相关文章:

步入响应式编程篇(二)之Reactor API

步入响应式编程篇&#xff08;二&#xff09;之Reactor API 前言回顾响应式编程Reactor API的使用Stream引入依赖Reactor API的使用流源头的创建 reactor api的背压模式发布者与订阅者使用的线程查看弹珠图查看形成新流的日志 前言 对于响应式编程的基于概念&#xff0c;以及J…...

Oracle SQL: TRANSLATE 和 REGEXP_LIKE 的知识点详细分析

目录 前言1. TRANSLATE2. REGEXP_LIKE3. 实战 前言 &#x1f91f; 找工作&#xff0c;来万码优才&#xff1a;&#x1f449; #小程序://万码优才/r6rqmzDaXpYkJZF 1. TRANSLATE TRANSLATE 用于替换字符串中指定字符集的每个字符&#xff0c;返回替换后的字符串 逐一映射输入字…...

RabbitMQ 在实际应用时要注意的问题

1. 幂等性保障 1.1 幂等性介绍 幂等性是数学和计算机科学中某些运算的性质,它们可以被多次应⽤,⽽不会改变初始应⽤的结果. 应⽤程序的幂等性介绍 在应⽤程序中,幂等性就是指对⼀个系统进⾏重复调⽤(相同参数),不论请求多少次,这些请求对系统的影响都是相同的效果. ⽐如数据库…...

算法日记8:StarryCoding60(单调栈)

一、题目 二、解题思路&#xff1a; 题意为让我们找到每个元素的左边第一个比它小的元素&#xff0c;若不存在则输出-1 2.1法一&#xff1a;暴力&#xff08;0n2&#xff09; 首先&#xff0c;我们可以想到最朴素的算法&#xff1a;直接暴力两层for达成目标核心代码如下&…...

大象机器人发布首款穿戴式数据采集器myController S570,助力具身智能数据收集!

myController S570 具有较高的数据采集速度和远程控制能力&#xff0c;大大简化了人形机器人的编程。 myController S570 是一款可移动的轻量级外骨骼&#xff0c;具有 14 个关节、2 个操纵杆和 2 个按钮&#xff0c;它提供高数据采集速度&#xff0c;出色的兼容性&#xff0c…...

【银河麒麟高级服务器操作系统】业务访问慢网卡丢包现象分析及处理过程

了解更多银河麒麟操作系统全新产品&#xff0c;请点击访问 麒麟软件产品专区&#xff1a;product.kylinos.cn 开发者专区&#xff1a;developer.kylinos.cn 文档中心&#xff1a;document.kylinos.cn 交流论坛&#xff1a;forum.kylinos.cn 服务器环境以及配置 【内核版本…...

C语言之饭店外卖信息管理系统

&#x1f31f; 嗨&#xff0c;我是LucianaiB&#xff01; &#x1f30d; 总有人间一两风&#xff0c;填我十万八千梦。 &#x1f680; 路漫漫其修远兮&#xff0c;吾将上下而求索。 C语言之饭店外卖信息管理系统 目录 设计题目设计目的设计任务描述设计要求输入和输出要求验…...

记一次 .NET某数字化协同管理系统 内存暴涨分析

一&#xff1a;背景 1. 讲故事 高级调试训练营里的一位朋友找到我&#xff0c;说他们跑在linux上的.NET程序出现了内存泄露的情况&#xff0c;上windbg观察发现内存都是IMAGE给吃掉了&#xff0c;那些image都标记了 doublemapper__deleted_ 字样&#xff0c;问我为啥会这样&a…...

部门管理查询部门,nginx反向代理,前端如何访问到后端Tomcat 注解@RequestParam

接口开发 增删改通常是不用返回data数据&#xff0c;返回null 列表查询-结果封装&#xff0c;时间 前后端联调测试 nginx反向代理&#xff0c;前端如何访问到后端Tomcat服务器 删除部门...

JS通过ASCII码值实现随机字符串的生成(可指定长度以及解决首位不出现数值)

在之前写过一篇“JS实现随机生成字符串&#xff08;可指定长度&#xff09;”&#xff0c;当时写的过于简单和传统&#xff0c;比较粗放。此次针对此问题&#xff0c;对随机生成字符串的功能进行优化处理&#xff0c;对随机取到的字符都通过程序自动来完成。 在写之前&#xff…...

速通Docker === 快速部署Redis主从集群

目录 镜像仓库介绍 持久化你的数据库 连接到其他容器 创建自定义网络 部署主节点 部署从节点 验证部署 总结 在现代应用架构中&#xff0c;Redis作为一个高性能的内存数据库&#xff0c;被广泛应用于缓存、会话存储、实时分析等多个领域。为了提高Redis的可用性和数据的…...

论文笔记(六十三)Understanding Diffusion Models: A Unified Perspective(一)

Understanding Diffusion Models: A Unified Perspective&#xff08;一&#xff09; 文章概括引言&#xff1a;生成模型背景&#xff1a;ELBO、VAE 和分层 VAE证据下界&#xff08;Evidence Lower Bound&#xff09;变分自编码器 &#xff08;Variational Autoencoders&#x…...

stm32使用MDK5.35时遇到*** TOOLS.INI: TOOLCHAIN NOT INSTALLED

mdk5.35出现*** TOOLS.INI: TOOLCHAIN NOT INSTALLED的问题&#xff01;&#xff01;&#xff01;&#xff01; 以管理员身份重新打开MDK5.35.0.0&#xff0c;用keygen破解密码&#xff0c;但是一直提示我是没有破解成功。 解决办法&#xff1a; target 改成ARM...

在Ubuntu上安装RabbitMQ教程

1、安装erlang 因为rabbitmq是基于erlang开发的&#xff0c;所以要安装rabbitmq&#xff0c;首先需要安装erlang运行环境 apt-get install erlang执行命令查是否安装成功&#xff1a;erl&#xff0c;疯狂 Ctrlc 就能退出命令行 2、安装rabbitmq 1、查看erlang与rabbitmq版本…...

【算法】集合List和队列

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 零&#xff1a;集合&#xff0c;队列的用法 一&#xff1a;字母异位词分组 二&#xff1a;二叉树的锯…...

uniapps使用HTML5的io模块拷贝文件目录

最近在集成sqlite到uniapp的过程中&#xff0c;因为要将sqlite数据库预加载&#xff0c;所以需要使用HTML5的plus.io模块。使用过程中遇到了许多问题&#xff0c;比如文件路径总是解析不到等。尤其是应用私有文档目录’_doc’。 根据官方文档&#xff1a; 为了安全管理应用的…...

css‘s hover VS mobile

.animation {animation: 30s move infinite linear;/* &:hover {animation-play-state: paused;*/ }原本写的好好的&#xff0c;测试说&#xff1a;“移动端点击滚动条&#xff0c;跳转到其他页面后&#xff0c;返回当前页面&#xff0c;滚动条不滚动&#xff1b;可以优化位…...

工业制造离不开的BOM

在制造业的浩瀚星空中&#xff0c;物料清单&#xff08;BOM&#xff09;犹如“北极星”&#xff0c;牢牢指引着产品从设计蓝图迈向实物诞生的全过程。 BOM的分类 按照设计制造的不同阶段&#xff0c;将BOM划分为设计BOM、工艺BOM、制造BOM三种类型。 设计BOM Engineering BO…...

HTML中的`<!DOCTYPE html>`是什么意思?

诸神缄默不语-个人CSDN博文目录 在学习HTML时&#xff0c;我们经常会看到HTML文档的开头出现<!DOCTYPE html>&#xff0c;它是HTML文件的第一行。很多初学者可能会疑惑&#xff0c;为什么需要这行代码&#xff1f;它到底有什么作用呢&#xff1f;在这篇文章中&#xff0…...

C语言之斗地主游戏

&#x1f31f; 嗨&#xff0c;我是LucianaiB&#xff01; &#x1f30d; 总有人间一两风&#xff0c;填我十万八千梦。 &#x1f680; 路漫漫其修远兮&#xff0c;吾将上下而求索。 ​ C语言之斗地主游戏 目录 程序概述程序设计 Card类CardGroup类Player类LastCards类Land…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

谷歌浏览器插件

项目中有时候会用到插件 sync-cookie-extension1.0.0&#xff1a;开发环境同步测试 cookie 至 localhost&#xff0c;便于本地请求服务携带 cookie 参考地址&#xff1a;https://juejin.cn/post/7139354571712757767 里面有源码下载下来&#xff0c;加在到扩展即可使用FeHelp…...

React hook之useRef

React useRef 详解 useRef 是 React 提供的一个 Hook&#xff0c;用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途&#xff0c;下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...

在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module

1、为什么要修改 CONNECT 报文&#xff1f; 多租户隔离&#xff1a;自动为接入设备追加租户前缀&#xff0c;后端按 ClientID 拆分队列。零代码鉴权&#xff1a;将入站用户名替换为 OAuth Access-Token&#xff0c;后端 Broker 统一校验。灰度发布&#xff1a;根据 IP/地理位写…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

css的定位(position)详解:相对定位 绝对定位 固定定位

在 CSS 中&#xff0c;元素的定位通过 position 属性控制&#xff0c;共有 5 种定位模式&#xff1a;static&#xff08;静态定位&#xff09;、relative&#xff08;相对定位&#xff09;、absolute&#xff08;绝对定位&#xff09;、fixed&#xff08;固定定位&#xff09;和…...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX&#xff08;不访问内存&#xff09;XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表

##鸿蒙核心技术##运动开发##Sensor Service Kit&#xff08;传感器服务&#xff09;# 前言 在运动类应用中&#xff0c;运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据&#xff0c;如配速、距离、卡路里消耗等&#xff0c;用户可以更清晰…...

系统掌握PyTorch:图解张量、Autograd、DataLoader、nn.Module与实战模型

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文通过代码驱动的方式&#xff0c;系统讲解PyTorch核心概念和实战技巧&#xff0c;涵盖张量操作、自动微分、数据加载、模型构建和训练全流程&#…...

【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅

目录 前言 操作系统与驱动程序 是什么&#xff0c;为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中&#xff0c;我们在使用电子设备时&#xff0c;我们所输入执行的每一条指令最终大多都会作用到硬件上&#xff0c;比如下载一款软件最终会下载到硬盘上&am…...