Reactive 编程-Project Reactor
Reactive 编程与 Project Reactor
Reactive 编程是一种编程范式,主要用于处理异步数据流。它旨在通过声明式的编程方式处理事件驱动的非阻塞任务,特别适合于构建响应式、可扩展、高并发的应用。随着互联网应用规模的扩大和响应速度的提升需求,Reactive 编程变得越来越流行。
在 Java 生态中,Project Reactor 是支持 Reactive 编程的核心库之一,基于 Reactive Streams 规范,并被 Spring 5 中的 WebFlux 采用为核心反应式框架。它提供了强大的 API 用于处理异步数据流,同时保持良好的性能和响应性。
一、Reactive 编程的核心概念
在传统的编程模型中,异步处理通常会涉及回调函数、线程池和复杂的同步控制,这样的代码不仅难以维护,还容易出现阻塞和性能瓶颈。Reactive 编程的出现解决了这些问题,它通过响应式的数据流,允许程序按需、非阻塞地处理数据和事件。
Reactive 编程的几个重要特点:
-
异步与非阻塞:程序可以异步处理任务,而不会因为等待结果而阻塞线程。这种非阻塞机制非常适合处理 I/O 密集型任务。
-
事件驱动:通过数据流来处理一系列事件。当有新数据产生时,系统会自动响应和处理这些数据。
-
流式处理:处理数据流中的数据项,类似于 Java 8 中的
Stream
API,但不同的是,Reactive 流可以处理动态、无限的数据流。 -
背压(Backpressure):Reactive Streams 规范提供了背压机制,允许消费者根据自身的能力,按需请求数据,避免生产者产生过多的数据而导致内存溢出。
二、Project Reactor 概述
Project Reactor 是一个支持 Reactive Streams 规范的响应式编程库。它是构建在 Reactive Streams 基础上的高性能框架,提供了几种关键的异步数据流处理类型:Mono
和 Flux
。
- Mono:表示 0 或 1 个元素的异步数据流。适用于返回单个结果的场景,例如 HTTP 请求的响应。
- Flux:表示 0 到 N 个元素的异步数据流,适用于处理多个结果或无限流的场景。
Project Reactor 中的 Mono
和 Flux
提供了强大的操作符(类似于 Java 的 Stream
API 中的操作符),用于组合、转换、过滤和操作异步数据流。
三、Mono 和 Flux 的基本用法
1. Mono 的使用
Mono
代表一个包含最多一个元素的异步流。它可以用于表示单个异步任务的结果,如数据库查询、HTTP 请求等。
import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {// 创建一个包含数据的 MonoMono<String> mono = Mono.just("Hello, Reactive World!");// 订阅并消费 Monomono.subscribe(System.out::println);}
}
在上述示例中,Mono.just("Hello, Reactive World!")
创建了一个包含单个元素的 Mono。当调用 subscribe()
方法时,Mono 开始执行并将数据输出到控制台。
2. Flux 的使用
Flux
是表示 0 到 N 个元素的异步流,适合处理多个数据项或无限的数据流。
import reactor.core.publisher.Flux;public class FluxExample {public static void main(String[] args) {// 创建一个包含多个元素的 FluxFlux<String> flux = Flux.just("Spring", "Reactor", "WebFlux");// 订阅并消费 Fluxflux.subscribe(System.out::println);}
}
在这个例子中,Flux.just()
创建了一个包含多个元素的 Flux,当 subscribe()
被调用时,Flux 会依次发射每个元素并输出它们。
四、背压(Backpressure)机制
背压是 Reactive Streams 中的一个关键概念,旨在解决生产者与消费者之间速率不匹配的问题。当生产者产生数据的速度快于消费者处理的速度时,背压允许消费者按自己的能力请求数据,避免数据积压导致内存问题。
在 Project Reactor 中,背压由订阅者(即消费者)通过 request(n)
方法来控制。例如,Flux 通过 onBackpressureBuffer()
或 onBackpressureDrop()
来处理背压。
import reactor.core.publisher.Flux;public class BackpressureExample {public static void main(String[] args) {Flux.range(1, 1000).onBackpressureBuffer(10) // 当消费者处理不过来时,使用缓冲区.subscribe(data -> {System.out.println("Processing " + data);try {Thread.sleep(100); // 模拟处理时间} catch (InterruptedException e) {e.printStackTrace();}},error -> System.err.println("Error: " + error),() -> System.out.println("Complete"));}
}
在该示例中,Flux 将发射 1000 个元素,但消费者处理的速度较慢,因此我们使用 onBackpressureBuffer(10)
将多余的数据存放到缓冲区,以便消费者可以按需处理数据。
五、常用的操作符
Project Reactor 提供了许多操作符,用于处理和转换数据流。以下是一些常用的操作符示例:
1. map()
map()
操作符用于将每个元素进行转换,类似于 Java 8 的 Stream.map()
。
Flux<Integer> numbers = Flux.just(1, 2, 3, 4);
numbers.map(n -> n * 2).subscribe(System.out::println); // 输出 2, 4, 6, 8
2. flatMap()
flatMap()
可以用于异步转换,并返回新的 Mono
或 Flux
。
Flux<String> names = Flux.just("Tom", "Jerry", "Spike");
names.flatMap(name -> Flux.just(name.toUpperCase())).subscribe(System.out::println); // 输出 TOM, JERRY, SPIKE
3. filter()
filter()
用于过滤掉不满足条件的元素。
Flux<Integer> numbers = Flux.range(1, 10);
numbers.filter(n -> n % 2 == 0).subscribe(System.out::println); // 输出 2, 4, 6, 8, 10
4. reduce()
reduce()
用于将多个值组合成一个值,类似于 Stream.reduce()
。
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.reduce((a, b) -> a + b).subscribe(System.out::println); // 输出 15
5. zip()
zip()
操作符用于合并多个流,并将其元素“打包”成一个对象或元组。
Flux<String> names = Flux.just("Tom", "Jerry");
Flux<Integer> ages = Flux.just(5, 6);
Flux.zip(names, ages).subscribe(tuple -> System.out.println("Name: " + tuple.getT1() + ", Age: " + tuple.getT2()));
六、Project Reactor 的错误处理
Reactive 编程中的错误处理非常重要,因为异步流程中的异常不会像同步代码那样直接抛出。在 Project Reactor 中,有几种方式处理错误:
1. onErrorReturn()
在发生错误时返回默认值。
Flux<Integer> flux = Flux.just(1, 2, 0, 4).map(i -> 10 / i) // 会产生除以 0 的异常.onErrorReturn(-1);
flux.subscribe(System.out::println); // 输出 10, 5, -1
2. onErrorResume()
onErrorResume()
允许你在发生错误时切换到一个新的 Flux
或 Mono
。
Flux<Integer> flux = Flux.just(1, 2, 0, 4).map(i -> 10 / i).onErrorResume(e -> Flux.just(-1, -2));
flux.subscribe(System.out::println); // 输出 10, 5, -1, -2
七、Reactive 编程的优势与适用场景
Reactive 编程在处理高并发、I/O 密集型任务时表现尤为出色,特别适用于以下场景:
-
微服务架构:在微服务中,服务之间的通信常常需要通过非阻塞 I/O,Reactive 编程能够显著提升系统的吞吐量。
-
**高并发的
Web 应用**:Reactive 编程可以处理大量同时进行的用户请求,而不会因为线程阻塞而限制系统性能。
- 事件驱动系统:如物联网(IoT)系统,Reactive 编程可以很好地处理流式数据和异步事件。
八、总结
Reactive 编程为构建高性能、响应式系统提供了一种全新的方式,而 Project Reactor 则是 Java 生态中实现 Reactive Streams 规范的强大工具。通过 Mono
和 Flux
的流式 API,开发者可以简洁高效地处理异步任务,并借助背压机制避免过度生产数据导致的资源问题。
Project Reactor 的丰富操作符、错误处理机制和与 Spring WebFlux 的无缝集成,使其成为开发现代高并发应用的得力助手。掌握 Reactive 编程及其在 Project Reactor 中的实现,能够显著提高应用的扩展性、性能和响应性。
相关文章:

Reactive 编程-Project Reactor
Reactive 编程与 Project Reactor Reactive 编程是一种编程范式,主要用于处理异步数据流。它旨在通过声明式的编程方式处理事件驱动的非阻塞任务,特别适合于构建响应式、可扩展、高并发的应用。随着互联网应用规模的扩大和响应速度的提升需求࿰…...

splice用法
天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…...

Redis - 缓存
文章目录 目录 文章目录 1. 什么是缓存? 2. 使用Redis作为缓存 2.1 关系型数据库的缺点 3. 缓存的更新策略 3.1 定期生成 3.2 实时生成 缓存淘汰策略 4. 缓存预热, 缓存穿透, 缓存雪崩 和 缓存击穿 缓存预热 缓存穿透 缓存雪崩 缓存击穿 总结 1. 什么…...

基于SpringBoot+Vue的养老院管理系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于JavaSpringBootVueMySQL的…...

多线程爬虫接入代理IP:高效数据抓取的秘诀
在现代网络环境中,爬虫已经成为获取信息的利器。然而,随着网站反爬措施的不断升级,单线程爬虫往往无法满足需求。多线程爬虫与代理IP的结合,不仅能提高效率,还能有效规避IP封禁问题。本文将详细探讨多线程爬虫接入代理…...

[网络][CISCO]Cisco-PIX配置详解
Cisco PIX防火墙配置指南 任何企业安全策略的一个主要部分都是实现和维护防火墙,因此防火墙在网络安全的实现当中扮演着重要的角色。防火墙通常位于企业网络的边缘,使内部网络与Internet之间或与其他外部网络互相隔离,并限制网络互访&#x…...

拒绝千篇一律,AI帮你定制独一无二的个人写真
每个女人都渴望展现最美的自己,你是否厌倦了拍出千篇一律的照片?今天,我要告诉你一个秘密,用简单三步,即可打造属于你的独一无二个人写真!文生图、蒙版换脸、图生图,三步化身超级模特࿰…...

在云服务器上安装 RabbitMQ:从零到一的最佳实践
🛠 1. RabbitMQ 简介 RabbitMQ 是一个开源的消息代理中间件,广泛应用于高并发、异步任务队列的场景中。在分布式系统架构中,RabbitMQ 可以充当消息的中转站,帮助不同服务之间进行高效的消息通信。 在这篇文章中,我们…...

【nginx】搭配okhttp 配置反向代理
nginx的默认是一个反向代理。 nginx会默认把输入的请求,转向其他的服务器执行。 这些转向的服务器与客户端发起的服务器不是同一个。 客户端只认识nginx,不知道ngiix转向何方。 正向代理修改okhttp的proxy,实际上很多代理都是正向的。 反向代理修改请求路径到nginx。 感觉还…...

Android V 广播注册和配置注意事项问题
现象 在Android V平台上,应用注册非Protected广播时,如果没有加导出flag会抛出异常导致进程crash。 E/AndroidRuntime: FATAL EXCEPTION: main java.lang.SecurityException: com.demo.myapplication: One of RECEIVER_EXPORTED or RECEIVER_NOT_EXPORT…...

深入解读Docker核心原理:Namespace资源隔离机制详解
在容器技术中,资源隔离 是容器化能够实现轻量级虚拟化的关键技术之一。通过资源隔离,容器可以拥有自己的独立环境,确保容器之间互不干扰,从而实现应用的安全和稳定。Docker作为主流的容器平台,其核心的资源隔离机制依赖…...

学习通、智慧职教刷课脚本
🐐个人主页 可惜已不在 🐋可以分享给身边有需要的人🐶 🐉有用的话就留下一个三连吧😼 目录 一.安装 脚本运行器 篡改猴 - Microsoft Edge Addons 二.安装脚本 三.扩展 一.安装 脚本运行器 安装浏览器 Microsoft E…...

SEO写作:从实战到精进的全方位指南
在数字化浪潮中,SEO不再是简单的关键词堆砌,而是成为企业品牌建设与市场拓展的核心策略。作为一名深耕SEO领域的实践者,我深知其中的门道与奥秘。今天,我将结合过往实战经验,以独特视角,带你一窥SEO写作的精…...

解决 git 不是内部或外部命令,也不是可运行的程序
目录 报错提示: 一、解决办法 1、从git官网下载windows版本的git 2、安装 3、注意事项 二、报错 1、解决 fatal: Not a git repository (or any of the parent directories): .git 问题 报错提示: 一、解决办法 Windows下配置Git: 1…...

【卷起来】VUE3.0教程-07-异步请求处理(springboot后端)
🌲 服务端接口准备 pom文件,引入mybatis/mybatis-plus相关依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>&…...

初一信息科技2024指南辅助教学软件(抓包软件)
专门针对信息科技20204指南写的程序,互联网和直播等知识中包含tcp/ip和udp,三次握手等原理,需要简单明了的实验来说明,在机房中需要用抓包软件,可能需要安装windump npcap等软件非常繁琐,还需要接触保护卡&…...

上汽大众:存储成本节约85%,查询性能提升5倍|OceanBase案例
近日,上汽大众汽车有限公司(简称“上汽大众”)的积分卡券等关键业务系统,已成功升级至 OB Cloud 云数据库。借助 OceanBase 原生分布式数据库的卓越性能与先进技术,实现了存储成本的大幅降低,高达85%&#…...

如何快准稳 实现MySQL大表历史数据迁移?
历史迁移解决方案以微服务架构为基础,使用多种设计模式,如:单例、桥接、工厂、模板、策略等。其中涉及的核心技术有多线程、过滤器等,致力于解决MySQL大表迁移的问题,提供多种迁移模式,如:库到库…...

C和指针:函数
函数定义 函数体就是一个代码块,它在函数被调用时执行。 类型 函数名(形式参数) 代码块 与函数定义相反,函数声明出现在函数被调用的地方。 函数声明 编译器是如何知道该函数期望接受的是什么类型和多少数量的参数。 原型 int *find_int( int key…...

Linux——分离部署,分化压力
PQS/TPS 每秒请求数/ 每秒事务数 // 流量衡量参数 可以根据预估QPS 和 服务器的支持的最高QPS 对照计算 就可以得出 需要上架的服务器的最小数量 PV 页面浏览数 UV 独立用户访问量 // 对于网站的总体访问量 response time 响应时间 // 每个请求的响应时间…...

javaaaa
1 飞机票 代码实现: import java.util.Scanner; public class F1 {public static void main(String[] args) {Scanner input new Scanner(System.in);System.out.print("请输入票价: ");double jia input.nextDouble();System.out.print(&…...

游戏开发引擎___unity位置信息和unlit shader(无光照着色器)的使用,以桌子的渲染为例
unity是左手坐标系 1.位置信息 1.1 代码 using System.Collections; using System.Collections.Generic; using UnityEngine;public class positionTest : MonoBehaviour {public Camera Camera;private void OnGUI(){//世界坐标系,GUI里的标签GUI.Label(new Rec…...

反向沙箱的功能特点
在这个信息化飞速发展的时代,企业的数据安全面临着前所未有的挑战。员工的无意操作、恶意软件的潜伏、甚至是敌对势力的网络攻击,都可能成为企业数据安全的致命威胁。深信达SPN反向沙箱为您筑起了一道坚不可摧的数据安全防线! 来百度APP畅享高…...

可测试,可维护,可移植:上位机软件分层设计的重要性
互联网中,软件工程师岗位会分前端工程师,后端工程师。这是由于互联网软件规模庞大,从业人员众多。前后端分别根据各自需求发展不一样的技术栈。那么上位机软件呢?它规模小,通常一个人就能开发一个项目。它还有必要分前…...

构造函数与析构函数的执行顺序
对象作为成员变量的构造函数与析构函数 当一个类包含另一个类的对象作为成员时,这些成员对象的构造函数会在包含它们的对象的构造函数之前被调用,而它们的析构函数则会在包含它们的对象的析构函数之后被调用。成员对象的构造函数和析构函数的调用顺序与…...

Vue框架;Vue中的选择和循环结构;Vue数据类型;Vue中的事件和动态属性;Vue子组件通过导入在主组件显示在网页;Vue中主组件向子组件传递数据
一,Vue简介 前端现在比较火的三大框架就是:vue ,React,Angular。在国内使用最多的还是: vue >React >Angular Vue (发音为 /vjuː/,类似 view) 是一款用于构建用户界面的 JavaScript 框架。它基于标准…...

懒人笔记-opencv4.8.0篇
懒人笔记-opencv4.8.0篇 前言1、卸载 opencv3.4.31.1 cmake1.2 编译过程1.3 卸载1.4 检查代码是否卸载干净 2、安装 opencv4.8.02.1 安装依赖2.2 创建编译目录2.3 设置编译选项2.4 执行编译命令2.5 环境配置2.5.1、环境配置添加库路径2.5.2 更新系统2.5.3 配置bash2.5.4 保存退…...

解决uniapp视频video组件进入全屏再退出全屏后,cover-view失效的问题
给cover-view一个变量如isCloseBtnShow,通过v-if(不要用v-show)来控制显示隐藏。监听video全屏事件,全屏时,设置变量为false,退出全屏时再设为true,这样每次退出全屏,cover-view会重新加载。被覆盖的问题就…...

ip属地河北切换北京
我们知道,每当电脑或手机连接网络时,都会分配到一个网络IP地址,这个IP地址通常与设备所在的地区网络相关联。然而,出于业务或个人需求,有时我们需要将本机的IP地址切换到其他城市。例如要将IP属地河北切换北京…...

fpga入门名词(1)
这是第一代FPGA ,在 FPGA(现场可编程门阵列)设计中,LCA(逻辑单元阵列)通常由几个关键组件构成,包括 IOB、CLB 和 Interconnect。以下是这些组件的简要说明: 1. IOB(Input/Output B…...