当前位置: 首页 > news >正文

响应式编程-Project Reactor Mono 介绍

响应式编程-Project Reactor Mono 介绍

本文以Mono的角度来介绍Reactor编程,Flux的使用同理。

初体验

Web应用 controller 方法在Spring webmvc 和 Spring webFlux下Controller方法实现示例如下:

Spring webmvc:

    @GetMapping("/test1")

    @ResponseBody

    public String test1(){

        String result =  geterateTest();

        return result;

    }

Spring webFlux

    @GetMapping("/test2")

    @ResponseBody

    public Mono<String> test2(){

        Mono<String> result = Mono.fromSupplier(this:: geterateTest);

        return result;

    }

一个的响应是String对象, 另一个是Mono<String>对象。Reactor Mono表示一个产生0-1元素的异步序列,异步指Mono创建的时候并不会执行任何操作,当Mono发生订阅时才触发Mono序列的运行。非阻塞表示test2方法不会产生任何阻塞,即使genereateTest里面是一个阻塞的操作,因为此时不会执行实际的逻辑,所以不会发生任何阻塞。

NettyHttpServer.onStateChange方法中构建Mono并进行订阅。

HttpServerOperations ops = (HttpServerOperations)connection;

//Web Flux将按照Spring Web中的约定构建一个Publisher(执行过滤器、Controller方//法)

Publisher<Void> publisher = (Publisher)this.handler.apply(ops, ops);

Mono<Void> mono = Mono.deferContextual((ctx) -> {

      ops.currentContext = Context.of(ctx);

      return Mono.fromDirect(publisher);

});

……

//subscribe将触发前面Spring web中封装在Mono构建过程中的业务逻辑的真正执行。

//如果我们按照命令是编程去编写代码,业务逻辑在构建Mono的过程中就执行了。

mono.subscribe(ops.disposeSubscriber());

注: Spring web flux框架下也可以按照传统的命令式编程。

Mono的构建

Reactor编程可以分为 异步序列Mono/Flux的构建和和使用两部分。

Mono的基本构建

Mono类 提供了大量静态方法帮助构建Mono。

  • just(T):返回T类型对象的Mono序列
  • fromFuture(future):Mono序列的元素对象由future产生,订阅时Future产生T并推送至订阅者。其他from方法类似。
  • empty():返回一个订阅时直接完成的异步序列
  • error():返回一个订阅时直接推送错误信号的序列

其他方法详见Mono类API:

如:Mono<String> mono = Mono.just("TEST");

Mono装配

假设我们按照上面示例,将整个程序都以响应式编程的模式进行开发,方法都返回一个异步序列Mono/Flux。当调用者调用某一个方法时,面对返回的Mono/Flux对象有两种选择:1. 订阅(触发执行), 2.装配(Assembly):继续将获取到的异步序列封装到一个新的异步序列中,继续返回给外部调用者。如:Spring Web Flux 则是将Spring web 定义的包括WebFilter、Controller等逻辑组装成一个复合的Mono,最终进行订阅。

图1 Mono装配示例

OptimizableOperator 接口

       OptimizableOperator <IN, OUT>接口提供了指向下一个OptimizableOperator的指针,并且提供了从IN型订阅者获取OUT订阅者的方法,提供了一个Mono串行的组装方法。

图2 OptimizableOperator接口串行组装示意图

要实现一个串行化的Mono组装类通常实现抽象类InternalMonoOperator<I, O>,构造函数传入一个Mono<I>,得到一个新的O型序列。实现subscribeOrReturn方法将O型订阅转化为原I型订阅者,新的I型订阅者实现了基于O性订阅者之上的强化操作。Mono提供了大量InternalMonoOperator<I,O>的实现类。下面对MonoFilter进行分析,解释了如果创建基于InternalMonoOperator实现的装配类和使用方法。

MonoFilter

将原Mono上增加一个过滤Predicate函数,当原Mono产生元素时,只有Predicate测试通过的元素才会传递给最终的订阅者,测试失败将进行过滤,Mono元素直接完成。

final class MonoFilter<T> extends InternalMonoOperator<T, T> {

         final Predicate<? super T> predicate;

         //构造函数必须包含源Mono,和其他附加增加元素,这里是一个Predicate函数

         MonoFilter(Mono<? extends T> source, Predicate<? super T> predicate) {

                  super(source);

                  this.predicate = Objects.requireNonNull(predicate, "predicate");

         }

         /**

         * 实现subscribeOrReturn,接收新Mono类型的订阅者,返回原Mono类型的订阅者。

         * 新的订阅者实现订阅时装配的目的,这里只有通过Predicate函数测试的元素,才会

         * 调用actual.onNext(T)方法推送给最终的订阅者

         **/

         @Override

         @SuppressWarnings("unchecked")

         public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {

                  if (actual instanceof ConditionalSubscriber) {

                          return new FluxFilter.FilterConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);

                  }

                  return new FluxFilter.FilterSubscriber<>(actual, predicate);

         }

    ......

}

Mono内置了大量的InternalMonoOperator实现类,如MonoFilter,但Reactor框架并不对外暴露这些类,(这些实现类都是包内可见的),而是通过Mono方法的形式去方便获取各个可实现类的对象,并且统一以Mono类型的对外暴露。抽象统一的Mono使用范式比起暴露各种各样的实现细节显得简洁清晰。

我们可以使用Mono内置的InternalMonoOperator实现类,也可以实现自己的InternalMonoOperator类,但应和Reactor框架保持统一的用法, 在Mono的使用上统一以Mono类型和协议进行操作,不对外暴露具体的实现细节。

Mono 提供的装配方法

       Reactor框架并不暴露具体的装配类细节,而是提供了大量静态或实例方法来对Mono进行装配,返回装配后的新Mono。如上节所述的MonoFilter使用方法如下:

Mono.just(2).filter( (v -> v % 2 != 0)).subscribe(i -> System.out.println(i),

                error -> System.err.println("Error: " + error),

                ()-> System.out.println("complete"));

Mono filter方法返回了一个可以对原序列元素进行检测的增强Mono,上述例子因Mono.just(2) 中的元素值2 无法通过(v -> v % 2 != 0)的测试,将被过滤掉,无法传给最终的订阅者,而只能接受到原序列的结束信号, 因此只会打印“complete“。

Filter方法显示实际是返回的MonoFilter对象。

public final Mono<T> filter(final Predicate<? super T> tester) {

         ……

         return onAssembly(new MonoFilter<>(this, tester));

}

其他Mono装配方法:

  • Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)

:将一个T1类型元素的Mono和一个T2类型元素的Mono中的元素组合成一个Tuple2<T1,T2>元素的Mono. Mono还提供了zip的多种版本,满足各种情况的Mono组合模式。

  • public final Mono<T> timeout(Duration timeout): 当原序列产生一个T类型元素后,如果没有在指定的时间内完成,则将触发一个错误。如果在限期内完成则没有任何影响,该实现使用了MonoTimeout<T, U, V> extends InternalMonoOperator<T, T>。
  • doOnXXXX系列方法,如doOnCancel,  doOnNext, doOnError等, 返回在特定事件上加入行为的增强Mono。

更多Mono的装配方法详见Mono API。

Mono的使用

Mono的使用其实只有一种就是对Mono进行订阅, 但是Mono类也提供了其他传统的接口来进行Mono的使用。

Mono的订阅

订阅Mono很简单,调用Mono对象的subscribe方法,传入一个CoreSubscriber的实现对象即可。

Mono.subscribe.源码中展示了对Mono装配后的复合Mono进行订阅的处理逻辑。

public final void subscribe(Subscriber<? super T> actual) {

    //获取最后一个装配的Mono corePublisher

         CorePublisher publisher = Operators.onLastAssembly(this);

         CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

         ......

             //如果最后一个装配的publisher 实现了OptmizableOperator接口,一路组装

             //增强的Subscriber,按照循序后去下一个OptmizableOperator

                  if (publisher instanceof OptimizableOperator) {

                          OptimizableOperator operator = (OptimizableOperator) publisher;

                          while (true) {

                                   subscriber = operator.subscribeOrReturn(subscriber);

                                   if (subscriber == null) {

                                            return;

                                   }

                                   OptimizableOperator newSource = operator.nextOptimizableSource();

                                   if (newSource == null) {

                                            publisher = operator.source();

                                            break;

                                   }

                                   operator = newSource;

                          }

                  }

             //直到最底层的CorePublisher,使用最终转换所得的subscriber进行订阅,

             //原始序列产生的序号,将在一些列增强subscriber的增强下,或丢弃、或加工后传给

             //实际的订阅者

                  publisher.subscribe(subscriber);

}

Mono的简化使用

       Mono 提供了一些方法简化Mono的订阅操作,如block() 阻塞当前线程知道Mono序列返回元素或完成/异常信号

PublishOn和SubscribeOn

       publishOn 和 SubscribeOn 传入Scheduler对象,将Mono的行为交由Scheduler的现成执行。其中publishOn调用之后的序列行为在新的执行线程执行,而SubscribeOn则是整个序列的执行都在新的现成中执行。

final Flux<String> flux = Flux

    .range(1, 2)

    .map(i -> 10 + i) 

    .publishOn(s) 

.map(i -> "value " + i);

flux.subscribe(System.out::println)

final Flux<String> flux = Flux

    .range(1, 2)

    .map(i -> 10 + i) 

    .subscribeOn(s) 

    .map(i -> "value " + i);

flux.subscribe(System.out::println)

总结

       本文对Reactor的Mono编程进行了初步的介绍,体现了响应式编程的核心在于异步序列的构建(Mono/Flux)和订阅使用。 其中构建时对Mono/Flux的装配(Assembly)是整个编程模型的核心。

相关文章:

响应式编程-Project Reactor Mono 介绍

响应式编程-Project Reactor Mono 介绍 本文以Mono的角度来介绍Reactor编程&#xff0c;Flux的使用同理。 初体验 Web应用 controller 方法在Spring webmvc 和 Spring webFlux下Controller方法实现示例如下&#xff1a; Spring webmvc: GetMapping("/test1") …...

R语言实操记录——导出高清图片(矢量图)

R语言 R语言实操记录——导出高清图片&#xff08;矢量图&#xff09; 文章目录 R语言一、起因&#xff08;闲聊&#xff0c;可跳过&#xff09;二、如何在R中导出高清图片&#xff08;矢量图&#xff09;2.1、保存为EPS图片格式后转AI编辑2.2、保存为PDF格式&#xff08;推荐…...

Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库

背景介绍 Apache Doris是一个基于MPP架构的易于使用&#xff0c;高性能和实时的分析数据库&#xff0c;以其极高的速度和易用性而闻名。海量数据下返回查询结果仅需亚秒级响应时间&#xff0c;不仅可以支持高并发点查询场景&#xff0c;还可以支持高通量复杂分析场景。 这些都…...

webgoat-Request Forgeries 请求伪造

(A8:2013) Request Forgeries Cross-Site Request Forgeries 跨站请求伪造&#xff0c;又称一键攻击或会话骑乘&#xff0c;简称CSRF &#xff08;有时发音为 sea-surf&#xff09;或 XSRF&#xff0c;是一种恶意利用网站&#xff0c;其中传输未经授权的命令 来自网站信任的用…...

【flask跨域问题】解决它

大概7-8年前&#xff0c;前后端还没开始分离或者刚开始分离的之前&#xff0c;跨域问题很多。 后来我就没在遇到过了&#xff0c;这次做一个小项目&#xff0c;又遇到了&#xff0c;记录下。 现在前端的脚手架都自己能解决了。 1. 跨域 是因为出于浏览器的同源策略限制。同源…...

虚幻引擎:如何在工程里面添加插件

1.在自己的项目中安装插件 在content目录下创建一个Plugins的文件,将插件文件放进去即可 2.在软件上安装,这样所有创建的项目都会带有此插件 将插件放在自己软件的这个目录下就好了...

SpringCloud Alibaba 【四】Openfeign

Openfeign配置与使用 前言介绍openfeign使用openfeign导入依赖启动类正式使用测试结果 前言 在springcloud中消费者项目需要调用提供者项目的接口&#xff0c;一开始用的是RestTemplate中的方法。但是RestTemplate进行远程调用时&#xff0c;直接调用controller层的接口&#…...

语音信号的线性预测分析、合成及MATLAB编程设计实现

需要的基础&#xff1a;AR模型、列文森-杜宾递推法 推荐阅读&#xff1a; 基于线性预测的语音编码原理解析 基于线性预测的语音编码原理解析 这篇文章和上一篇类似 语音信号的线性预测分析及其Matlab源码 这篇文章是要付费看的&#xff0c;但是他能预览的那部分写的确实好 语…...

rabbitMQ rascal/amqplib报错 Error: Unexpected close 排查

以下是一些可能导致此 RabbitMQ 客户端或任何其他 RabbitMQ 客户端中的套接字读取或写入失败的常见场景 1.错过&#xff08;客户端&#xff09;心跳 第一个常见原因是RabbitMQ 检测到心跳丢失。发生这种情况时&#xff0c;RabbitMQ 将添加一个有关它的日志条目&#xff0c;然…...

一文1600字使用Postman搞定各种接口token实战(建议收藏)

现在许多项目都使用jwt来实现用户登录和数据权限&#xff0c;校验过用户的用户名和密码后&#xff0c;会向用户响应一段经过加密的token&#xff0c;在这段token中可能储存了数据权限等&#xff0c;在后期的访问中&#xff0c;需要携带这段token&#xff0c;后台解析这段token才…...

Vue自定义组件学习笔记

专业描述: vue关于自定义组件的描述中&#xff0c;父子组件是相对的概念,父组件表示引用当前组件的组件,子组件就是当前组件&#xff1b; 1)关于props和emits选项的理解 1.props:我们平时写的.vue文件实际上就是一个自定义组件,只是一般不会考虑复用性,不会去设置props选项,…...

王道p18 第12题假设 A中的 n个元素保存在一个一维数组中,请设计一个尽可能高效的算法,找出A的主元素。若存在主元素,则输出该元素:否则输出-1

视频讲解在&#xff1a;&#x1f447; p18 第12题 c语言实现王道数据结构课后习题_哔哩哔哩_bilibili 从前向后扫描数组元素&#xff0c;标记出一个可能成为主元素的元素 Num。然后重新计数&#xff0c;确认 Num 是否是主元素。 我们可分为以下两步: 1.选取候选的主元素。依…...

OpenTiny Vue 3.11.0 发布:增加富文本、ColorPicker等4个新组件,迎来了贡献者大爆发!

非常高兴跟大家宣布&#xff0c;2023年10月24日&#xff0c;OpenTiny Vue 发布了 v3.11.0 &#x1f389;。 OpenTiny 每次大版本发布&#xff0c;都会给大家带来一些实用的新特性&#xff0c;8.14 我们发布了 v3.10.0 版本&#xff0c;增加了4个新组件&#xff0c;组件 Demo 支…...

vivado查看报告和消息5

1、可配置报告策略 “ Configurable Report Strategies ” &#xff08; 可配置报告策略 &#xff09; 支持在 Vivado 工程模式下运行综合与实现的每个步骤之后选择 要运行的报告命令。根据设计阶段、设计复杂性和用户首选项&#xff0c; 需自动生成一组不同的报告以供频繁查…...

基于javaweb+mysql的jsp+servlet学生成绩管理系统(管理员、教师、学生)

博主24h在线&#xff0c;想要源码文档部署视频直接私聊&#xff0c;9.9元拿走&#xff01; 基于javawebmysql的jspservlet学生成绩管理系统(管理员、教师、学生)(javajspservletjavabeanmysqltomcat) 运行环境 Java≥8、MySQL≥5.7、Tomcat≥8 开发工具 eclipse/idea/myecl…...

基于卷积优化算法的无人机航迹规划-附代码

基于卷积优化算法的无人机航迹规划 文章目录 基于卷积优化算法的无人机航迹规划1.卷积优化搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要&#xff1a;本文主要介绍利用卷积优化算法来优化无人机航迹规划。 …...

科技云报道:不卷自研大模型,金山办公如何创新生成式AI?

科技云报道原创。 过去大半年里&#xff0c;很多人对大模型的前景寄予厚望。主流观点认为&#xff0c;每个行业、每款产品都可以通过大模型“重做一遍”。 “重做一遍”听起来想象空间很大&#xff0c;但实际上多数大模型产品需要漫长的训练周期和海量资源投入&#xff0c;落…...

3BHE022291R0101 PCD230A 专注于制造卓越人工智能

3BHE022291R0101 PCD230A 专注于制造卓越人工智能 BISTelligence是BISTel的一个分支&#xff0c;BISTel是为全球半导体和FPD制造商提供工程和软件自动化产品的领先供应商。半导体产品集团上个月被卖给了新思科技。在出售给Synopsys之后&#xff0c;Bisetlliegnce成立了两个部门…...

小程序 scroll-view 性能问题

先说使用场景&#xff0c;一次加载很多数据造成小程序卡顿的问题 &#xff0c;找了好多都没有好的解决办法&#xff0c;要么太过复杂&#xff0c;然后研究了两天通过简单的办法实现&#xff0c;先根据数量把高度撑开&#xff0c;然后根据滚动位置渲染指定的数据就可以了&#x…...

【移远QuecPython】EC800M物联网开发板的硬件PWM和PWM输出BUG

【移远QuecPython】EC800M物联网开发板的硬件PWM和PWM输出BUG 文章目录 导入库初始化PWM开启PWMPWM硬件BUG硬件BUG复现原因附录&#xff1a;列表的赋值类型和py打包列表赋值BUG复现代码改进优化总结 py打包 导入库 from misc import PWM_V2或者 from misc import PWM但我觉得…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止

<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet&#xff1a; https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

现代密码学 | 椭圆曲线密码学—附py代码

Elliptic Curve Cryptography 椭圆曲线密码学&#xff08;ECC&#xff09;是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础&#xff0c;例如椭圆曲线数字签…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

CSS设置元素的宽度根据其内容自动调整

width: fit-content 是 CSS 中的一个属性值&#xff0c;用于设置元素的宽度根据其内容自动调整&#xff0c;确保宽度刚好容纳内容而不会超出。 效果对比 默认情况&#xff08;width: auto&#xff09;&#xff1a; 块级元素&#xff08;如 <div>&#xff09;会占满父容器…...

蓝桥杯 冶炼金属

原题目链接 &#x1f527; 冶炼金属转换率推测题解 &#x1f4dc; 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V&#xff0c;是一个正整数&#xff0c;表示每 V V V 个普通金属 O O O 可以冶炼出 …...