Reactor响应式流的核心机制——背压机制
响应式流是什么?
响应式流旨在为无阻塞异步流处理提供一个标准。它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。
响应式流模型存在两种基本的实现机制。一种就是传统开发模式下的“拉”模式,即消费者主动从生产者拉取元素;而另一种就是“推”模式,在这种模式下,生产者将元素推送给消费者。相较于“拉”模式,“推”模式下的数据处理的资源利用率更好,下图所示的就是一种典型的推模式处理流程。

数据流的生产者会持续地生成数据并推送给消费者。这里就引出了流量控制问题,即如果数据的生产者和消费者处理数据的速度是不一致的,我们应该如何确保系统的稳定性呢?
流量控制
生产者生产数据的速率小于消费者的场景
这种场景对于消费者来说没啥压力,正常消费就好了,这里也就不需要所谓的流量控制了。
生产者生产数据的速率大于消费者的场景
生产者生产数据的速率大于消费者的场景,应该是我们业务中经常遇到的场景了,这种场景由于消费者处理不过来导致崩溃,业界通常的做法是在生产者与消费者之间加一个队列做缓冲。我们知道队列具有存储与转发的功能,所以可以用它来进行一定的流量控制。

如何对于流量进行很好的控制?这就转变到了如何设计好一个队列了,目前 Java 业界主流的队列有以下三种:
无界队列 :
界队列在原则上是拥有无线大小容量的队列,可以存放生产者产生的所有消息。

-
优势:确保消费者消费到所有的数据
-
劣势:系统的回弹性降低,任何一个系统不可能拥有无限的资源,一旦内存等资源耗尽,系统就可能会有崩溃的风险。
有界丢弃队列 :
为了避免上面无界队列的弊端,有界丢弃队列采用的是如果队列满了,就会采用丢弃后面传入的值,这里可以设置一些丢弃策略,比如说按照优先级或先进先出等。

-
优势:考虑到资源的限制,适合允许丢消息的业务场景。
-
劣势:消息重要性很高的场景不建议采取这种队列
有界阻塞队列:
像一些支付金融级别的场景,是不允许丢数据的,所以我们引出有界阻塞队列,我们会在队列消息数量达到上限后阻塞生产者,而不是直接丢弃消息。

-
优势:解决了不允许丢数据的业务场景
-
劣势:当队列满了的时候,会阻塞生产者停止生产数据,这种场景不可能实现异步操作的。
所以,无论从回弹性、弹性还是即时响应性出发,上述的队列都不是响应式流的上佳解决办法。
背压机制
上面说的那几种队列纯“推”模式下的数据流量会有很多不可控制的因素,并不能直接应用,而是需要在“推”模式和“拉”模式之间考虑一定的平衡性,从而优雅地实现流量控制。这就需要引出响应式系统中非常重要的一个概念——背压机制(Backpressure)。
什么是背压?简单来说就是下游能够向上游反馈流量请求的机制。通过前面的分析,我们知道如果消费者消费数据的速度赶不上生产者生产数据的速度时,它就会持续消耗系统的资源,直到这些资源被消耗殆尽。
这个时候,就需要有一种机制使得消费者可以根据自身当前的处理能力通知生产者来调整生产数据的速度,这种机制就是背压。采用背压机制,消费者会根据自身的处理能力来请求数据,而生产者也会根据消费者的能力来生产数据,从而在两者之间达成一种动态的平衡,确保系统的即时响应性。
响应式流规范
有了背压机制,我们再来看下响应式流是如何基于这种机制去设计的一套规范,规范详情请参考:Reactive Streams
Java API 的响应式流只定义了四个核心接口:
-
Publisher<T>
-
Subscriber<T>
-
Subscription
-
Processor<T,R>
publisher<`T`>
Publisher 代表的就是一种可以生产无限数据的发布者,接口如下:
public interface Publisher<T> {public void subscribe(Subscriber<? super T> s);
}
可以看到,Publisher 里的 subscribe 方法传入的是 Subscriber 接口,其实这里用的是回调,Publisher 根据收到的请求向当前订阅者 Subscriber 发送元素。
Subscriber<`T`>
Subscriber 代表的是一种可以从发布者那里订阅并接收元素的订阅者,接口如下:
public interface Subscriber<T> {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete();
}
Subscriber 接口定义的这组方法构成了数据流请求和处理的基本流程,其中,onSubscribe() 从命名上看就是一个回调方法,当发布者的 subscribe() 方法被调用时就会触发这个回调。而在该方法中有一个参数 Subscription,可以把这个 Subscription 看作是一种用于订阅的上下文对象。Subscription 对象中包含了这次回调中订阅者想要向发布者请求的数据个数。
当订阅关系已经建立,那么发布者就可以调用订阅者的 onNext() 方法向订阅者发送一个数据。这个过程是持续不断的,直到所发送的数据已经达到 Subscription 对象中所请求的数据个数。这时候 onComplete() 方法就会被触发,代表这个数据流已经全部发送结束。而一旦在这个过程中出现了异常,那么就会触发 onError() 方法,我们可以通过这个方法捕获到具体的异常信息进行处理,而数据流也就自动终止了。
Subscription
Subscription 代表的就是一种订阅上下文对象,它在订阅者和发布者之间进行传输,从而在两者之间形成一种契约关系,接口如下:
public interface Subscription {public void request(long n);public void cancel();
}
这里的 request() 方法用于请求 n 个元素,订阅者可以通过不断调用该方法来向发布者请求数据;而 cancel() 方法显然是用来取消这次订阅。请注意,Subscription 对象是确保生产者和消费者针对数据处理速度达成一种动态平衡的基础,也是流量控制中实现背压机制的关键所在。

Processor<`T,R`>
Processor 代表的就是订阅者和发布者的处理阶段,Processor 接口继承了 Publisher 和 Subscriber 接口。它用于转换发布者——订阅者管道中的元素。Processor订阅类型 T 的数据元素,接收并转换为类型 R 的数据,并发布变换后的数据。接口如下:
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
图显示了处理者在发布者——订阅和管道中作为转换器的作用,可以拥有多个处理者。
![]()
总结
-
响应式流规范定义的很简洁,但实现起来并不简单,发布者和订阅者之间的所有交互的异步性质以及背压机制使得实现变得复杂。
-
响应式流规范非常灵活,还可以提供独立的“推”模型和“拉”模型。如果为了实现纯“推”模型,我们可以考虑一次请求足够多的元素;而对于纯“拉”模型,相当于就是在每次调用 Subscriber 的 onNext() 方法时只请求一个新元素。
-
JDK 9 中提供了 Flow 响应式流接口,与响应式流兼容的接口,可以看得出,JDK 团队后续的发展趋势也是想往响应式流这块靠近。
相关文章:
Reactor响应式流的核心机制——背压机制
响应式流是什么? 响应式流旨在为无阻塞异步流处理提供一个标准。它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。 响应式流模型存在两种基本的实现机制。一种就是传统…...
[数据结构]栈的深入学习-java实现
CSDN的各位uu们你们好,今天千泽带来了栈的深入学习,我们会简单的用代码实现一下栈, 接下来让我们一起进入栈的神奇小世界吧!0.速览文章一、栈的定义1. 栈的概念2. 栈的图解二、栈的模拟实现三.栈的经典使用场景-逆波兰表达式总结一、栈的定义 1. 栈的概念 栈:一种…...
网络编程基础
1 互联网的本质硬件设备有了操作系统,然后装上软件之后我们就能够正常使用了,然后也只能自己使用。像这样,每个人都拥有一台自己的机器,然而彼此孤立。如何才能和大家一起愉快的玩耍?什么是网络?简单来说&a…...
华为OD机试题 - 数列还原(JavaScript)| 机考必刷
更多题库,搜索引擎搜 梦想橡皮擦华为OD 👑👑👑 更多华为OD题库,搜 梦想橡皮擦 华为OD 👑👑👑 更多华为机考题库,搜 梦想橡皮擦华为OD 👑👑👑 华为OD机试题 最近更新的博客使用说明本篇题解:数列还原题目输入输出示例一输入输出Code代码解析版权说明华为O…...
10-Oracle存储过程(创建,修改,使用及管理)
本章内容 1、我们为什么要用存储过程? 2、存储过程是如何定义和维护的? 3、我们如何调用存储过程? 4、存储过程中常用的复合数据处理方式及CTE 5、存储过程如何进行异常处理? 6、存储过程如何进行事务处理? 7、我们应如何优化存储过程? 1、我们为什么要用存储过程…...
创建线程的三种方法
文章目录1、创建一个类实现Runnable接口,并重写run方法。2、创建一个类继承Thread类,并重写run方法。3、实现Callable接口,重写call()方法,这种方式可以通过FutureTask获取任务执行的返回值。4、run()方法和start()方法有什么区别…...
第一章---Pytorch快速入门---第三节---pytorch中的数据操作和预处理
目录 1.高维数组 1.1 回归数据准备 1.2 分类数据准备 2. 图像数据 1.torchvision.datasets模块导入数据并预处理 2.从文件夹中导入数据并进行预处理 pytorch中torch.utils.data模块包含着一些常用的数据预处理的操作,主要用于数据的读取、切分、准备等。 常用…...
【代码随想录训练营】【Day38】第九章|动态规划|理论基础|509. 斐波那契数|70. 爬楼梯|746. 使用最小花费爬楼梯
理论基础 动态规划与贪心的区别并不是学习动态规划所必须了解的,所以并不重要。 想要了解动态规划算法题的特点,可以直接做下面三道入门简单题练练手感,找找感觉,很快就能体会到动态规划的解题思想。 总结成一句话就是…...
华为OD机试题 - 快递货车(JavaScript)| 机考必刷
更多题库,搜索引擎搜 梦想橡皮擦华为OD 👑👑👑 更多华为OD题库,搜 梦想橡皮擦 华为OD 👑👑👑 更多华为机考题库,搜 梦想橡皮擦华为OD 👑👑👑 华为OD机试题 最近更新的博客使用说明本篇题解:快递货车题目输入输出示例一输入输出Code解题思路版权说明华为O…...
前端——7.图像标签和路径
这篇文章,我们来讲解一下图像标签 目录 1.图像标签 1.1介绍 1.2实际展示 1.3图像标签的属性 1.3.1 alt属性 1.3.2 title属性 1.3.3 width / height 属性 1.3.4 border属性 1.4注意事项 2.文件夹 2.1目录文件夹和根目录 2.2 VSCode打开目录文件夹 3.路…...
java -- stream流
写在前面: stream流一直在使用,但是感觉还不够精通,现在深入研究一下。 stream这个章节中,会用到 函数式接口–lambda表达式–方法引用的相关知识 介绍 是jdk8引进的新特性。 stream流是类似一条流水线一样的操作,每次对数据进…...
【Spring6】| Bean的四种获取方式(实例化)
目录 一:Bean的实例化方式 1. 通过构造方法实例化 2. 通过简单工厂模式实例化 3. 通过factory-bean实例化 4. 通过FactoryBean接口实例化 5. BeanFactory和FactoryBean的区别(面试题) 6. 使用FactoryBean注入自定义Date 一:…...
01: 新手学SpringCloud前需知道的5点
目录 第一点: 什么是微服务架构 第二点:为什么需要学习Spring Cloud 第三点: Spring Cloud 是什么 第四点: SpringCloud的优缺点 1、SpringCloud优点 2、SpringCloud缺点 第五点: SpringCloud由什么组成 1&…...
ubuntu apt安装arm交叉编译工具
查找查找编译目标为32位的gcc-arm交叉编译器命令apt-cache search arm|awk index($1,"arm")!0 {print}|grep gcc-arm\|g-arm #或者 apt-cache search arm|awk index($1,"arm")!0 {print}|grep -E gcc-arm|g\\-arm输出如下g-arm-linux-gnueabihf - GNU C co…...
阿里云一面经历
文章目录 ES 查询方式都有哪些?1 基于词项的查询term & terms 查询Fuzzy QueryWildcard Query2 基于全文的查询Match QueryQuery String QueryMatch Phrase Query3 复合查询Bool QueryElasticsearch 删除原理ES 大文章怎么存arthas 常用命令arthas 排查问题过程arthas 工作…...
Java Stream中 用List集合统计 求和 最大值 最小值 平均值
对集合数据的统计,是开发中常用的功能,掌握好Java Stream提供的方法,避免自己写代码统计,可以提高工作效率。 先造点数据: pigs.add(new Pig(1, "猪爸爸", 31, "M", false)); pigs.add(new Pig(…...
【Linux】多线程---线程控制
进程在前面已经讲过了,所以这次我们来讨论一下多线程。前言:线程的背景进程是Linux中资源及事物管理的基本单位,是系统进行资源分配和调度的一个独立单位。但是实现进程间通信需要借助操作系统中专门的通信机制,但是只这些机制将占…...
秒杀高并发解决方案
秒杀高并发解决方案 1.秒杀/高并发方案-介绍 秒杀/高并发 其实主要解决两个问题,一个是并发读,一个是并发写并发读的核心优化理念是尽量减少用户到 DB 来"读"数据,或者让他们读更少的数据, 并 发写的处理原则也一样针对秒杀系统需…...
【每日一题】蓝桥杯加练 | Day07
文章目录一、三角回文数1、问题描述2、解题思路3、AC代码一、三角回文数 原题链接:三角回文数 1、问题描述 对于正整数 n, 如果存在正整数 k 使得n123⋯k k(k1)2\frac{k(k1)}{2}2k(k1) , 则 n 称为三角数。例如, 66066 是一个三角数, 因为 66066123⋯363 。 如果一…...
条件语句(分支语句)——“Python”
各位CSDN的uu们你们好呀,最近总是感觉特别特别忙,但是却又不知道到底干了些什么,好像啥也没有做,还忙得莫名其妙,言归正传,今天,小雅兰的内容还是Python呀,介绍一些顺序结构的知识点…...
如何用NanoMsg的6种通信模式搞定分布式系统开发?附代码示例
如何用NanoMsg的6种通信模式构建高可靠分布式系统?实战代码解析 在分布式系统开发中,通信模式的选择往往决定了整个架构的扩展性和可靠性。NanoMsg作为轻量级高性能通信库,提供了6种经过验证的通信模式,每种都对应着特定的应用场景…...
从实验室到产品:脑机接口(BCI)开发中,EEG实时预处理流程设计与避坑指南
从实验室到产品:脑机接口(BCI)开发中EEG实时预处理流程设计与避坑指南 在咖啡馆见到那位渐冻症患者用脑电波操控机械臂喝咖啡时,我意识到脑机接口技术正从实验室走向真实世界。但鲜有人提及的是,这套酷炫系统背后藏着怎样的信号处理炼狱——当…...
stm32开发新手福音:告别复杂安装,用快马ai生成带详解的hal库基础代码
作为一名刚接触STM32开发的新手,我最近在尝试用HAL库控制GPIO时遇到了不少麻烦。从下载安装STM32CubeMX到配置工程,每一步都让我这个小白手忙脚乱。直到发现了InsCode(快马)平台,整个过程变得简单多了——不需要自己搭建环境,AI就…...
实战指南:基于快马生成电商订单自动化n8n工作流,无缝衔接shopify与crm
实战指南:基于快马生成电商订单自动化n8n工作流,无缝衔接shopify与crm 最近在帮朋友优化他们电商业务的后台流程,发现手动处理订单实在太费时间了。特别是遇到大促期间,订单量暴增,人工操作不仅效率低还容易出错。于是…...
避坑指南:Dify知识库数据清洗的5个常见错误与正则表达式优化技巧
避坑指南:Dify知识库数据清洗的5个常见错误与正则表达式优化技巧 在企业级知识库构建过程中,数据清洗环节往往成为影响LLM问答质量的关键瓶颈。许多团队投入大量资源进行知识库建设后,仍面临"清洗了数据但召回率低"的困境。本文将揭…...
嵌入式开发中的静态代码分析工具与应用
嵌入式代码静态分析工具深度解析1. 静态代码分析技术概述1.1 传统编译器的局限性标准C语言编译器通常只能检测代码中的语法错误和部分潜在缺陷,对于程序架构设计和逻辑层面的问题往往无能为力。这种局限性在嵌入式开发中尤为明显,因为嵌入式系统对代码质…...
【超全】基于Springboot多维分类的知识管理系统【包括源码+文档+调试】
💕💕发布人: 码上青云 💕💕各类成品Java毕设 。javaweb,ssm,springboot等项目,欢迎咨询。 💕💕程序开发、技术解答、代码讲解、文档, ἱ…...
League-Toolkit:英雄联盟智能辅助工具全方位评测
League-Toolkit:英雄联盟智能辅助工具全方位评测 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 在快节奏的英雄联盟对…...
YOLOv11分割模型实战:从预测到训练,我的完整避坑与调优记录
YOLOv11分割模型实战:从预测到训练,我的完整避坑与调优记录 第一次接触YOLOv11分割任务时,我本以为会像使用常规检测模型那样顺利。直到实际跑通整个流程才发现,从环境配置到训练调优,每个环节都藏着意想不到的"坑…...
staticFunctional:嵌入式零堆内存的std::function替代方案
1. staticFunctional:嵌入式系统中零动态内存开销的 std::function 替代方案1.1 设计动因与工程痛点在资源受限的嵌入式系统(如 ARM Cortex-M0/M4、AVR、ESP32、Teensy 系列)中,std::function的标准实现存在根本性兼容障碍。其典型…...
