Reactive 编程-Project Reactor
Reactive 编程与 Project Reactor
Reactive 编程是一种编程范式,主要用于处理异步数据流。它旨在通过声明式的编程方式处理事件驱动的非阻塞任务,特别适合于构建响应式、可扩展、高并发的应用。随着互联网应用规模的扩大和响应速度的提升需求,Reactive 编程变得越来越流行。
在 Java 生态中,Project Reactor 是支持 Reactive 编程的核心库之一,基于 Reactive Streams 规范,并被 Spring 5 中的 WebFlux 采用为核心反应式框架。它提供了强大的 API 用于处理异步数据流,同时保持良好的性能和响应性。
一、Reactive 编程的核心概念
在传统的编程模型中,异步处理通常会涉及回调函数、线程池和复杂的同步控制,这样的代码不仅难以维护,还容易出现阻塞和性能瓶颈。Reactive 编程的出现解决了这些问题,它通过响应式的数据流,允许程序按需、非阻塞地处理数据和事件。
Reactive 编程的几个重要特点:
-
异步与非阻塞:程序可以异步处理任务,而不会因为等待结果而阻塞线程。这种非阻塞机制非常适合处理 I/O 密集型任务。
-
事件驱动:通过数据流来处理一系列事件。当有新数据产生时,系统会自动响应和处理这些数据。
-
流式处理:处理数据流中的数据项,类似于 Java 8 中的
StreamAPI,但不同的是,Reactive 流可以处理动态、无限的数据流。 -
背压(Backpressure):Reactive Streams 规范提供了背压机制,允许消费者根据自身的能力,按需请求数据,避免生产者产生过多的数据而导致内存溢出。
二、Project Reactor 概述
Project Reactor 是一个支持 Reactive Streams 规范的响应式编程库。它是构建在 Reactive Streams 基础上的高性能框架,提供了几种关键的异步数据流处理类型:Mono 和 Flux。
- Mono:表示 0 或 1 个元素的异步数据流。适用于返回单个结果的场景,例如 HTTP 请求的响应。
- Flux:表示 0 到 N 个元素的异步数据流,适用于处理多个结果或无限流的场景。
Project Reactor 中的 Mono 和 Flux 提供了强大的操作符(类似于 Java 的 Stream API 中的操作符),用于组合、转换、过滤和操作异步数据流。
三、Mono 和 Flux 的基本用法
1. Mono 的使用
Mono 代表一个包含最多一个元素的异步流。它可以用于表示单个异步任务的结果,如数据库查询、HTTP 请求等。
import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {// 创建一个包含数据的 MonoMono<String> mono = Mono.just("Hello, Reactive World!");// 订阅并消费 Monomono.subscribe(System.out::println);}
}
在上述示例中,Mono.just("Hello, Reactive World!") 创建了一个包含单个元素的 Mono。当调用 subscribe() 方法时,Mono 开始执行并将数据输出到控制台。
2. Flux 的使用
Flux 是表示 0 到 N 个元素的异步流,适合处理多个数据项或无限的数据流。
import reactor.core.publisher.Flux;public class FluxExample {public static void main(String[] args) {// 创建一个包含多个元素的 FluxFlux<String> flux = Flux.just("Spring", "Reactor", "WebFlux");// 订阅并消费 Fluxflux.subscribe(System.out::println);}
}
在这个例子中,Flux.just() 创建了一个包含多个元素的 Flux,当 subscribe() 被调用时,Flux 会依次发射每个元素并输出它们。
四、背压(Backpressure)机制
背压是 Reactive Streams 中的一个关键概念,旨在解决生产者与消费者之间速率不匹配的问题。当生产者产生数据的速度快于消费者处理的速度时,背压允许消费者按自己的能力请求数据,避免数据积压导致内存问题。
在 Project Reactor 中,背压由订阅者(即消费者)通过 request(n) 方法来控制。例如,Flux 通过 onBackpressureBuffer() 或 onBackpressureDrop() 来处理背压。
import reactor.core.publisher.Flux;public class BackpressureExample {public static void main(String[] args) {Flux.range(1, 1000).onBackpressureBuffer(10) // 当消费者处理不过来时,使用缓冲区.subscribe(data -> {System.out.println("Processing " + data);try {Thread.sleep(100); // 模拟处理时间} catch (InterruptedException e) {e.printStackTrace();}},error -> System.err.println("Error: " + error),() -> System.out.println("Complete"));}
}
在该示例中,Flux 将发射 1000 个元素,但消费者处理的速度较慢,因此我们使用 onBackpressureBuffer(10) 将多余的数据存放到缓冲区,以便消费者可以按需处理数据。
五、常用的操作符
Project Reactor 提供了许多操作符,用于处理和转换数据流。以下是一些常用的操作符示例:
1. map()
map() 操作符用于将每个元素进行转换,类似于 Java 8 的 Stream.map()。
Flux<Integer> numbers = Flux.just(1, 2, 3, 4);
numbers.map(n -> n * 2).subscribe(System.out::println); // 输出 2, 4, 6, 8
2. flatMap()
flatMap() 可以用于异步转换,并返回新的 Mono 或 Flux。
Flux<String> names = Flux.just("Tom", "Jerry", "Spike");
names.flatMap(name -> Flux.just(name.toUpperCase())).subscribe(System.out::println); // 输出 TOM, JERRY, SPIKE
3. filter()
filter() 用于过滤掉不满足条件的元素。
Flux<Integer> numbers = Flux.range(1, 10);
numbers.filter(n -> n % 2 == 0).subscribe(System.out::println); // 输出 2, 4, 6, 8, 10
4. reduce()
reduce() 用于将多个值组合成一个值,类似于 Stream.reduce()。
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.reduce((a, b) -> a + b).subscribe(System.out::println); // 输出 15
5. zip()
zip() 操作符用于合并多个流,并将其元素“打包”成一个对象或元组。
Flux<String> names = Flux.just("Tom", "Jerry");
Flux<Integer> ages = Flux.just(5, 6);
Flux.zip(names, ages).subscribe(tuple -> System.out.println("Name: " + tuple.getT1() + ", Age: " + tuple.getT2()));
六、Project Reactor 的错误处理
Reactive 编程中的错误处理非常重要,因为异步流程中的异常不会像同步代码那样直接抛出。在 Project Reactor 中,有几种方式处理错误:
1. onErrorReturn()
在发生错误时返回默认值。
Flux<Integer> flux = Flux.just(1, 2, 0, 4).map(i -> 10 / i) // 会产生除以 0 的异常.onErrorReturn(-1);
flux.subscribe(System.out::println); // 输出 10, 5, -1
2. onErrorResume()
onErrorResume() 允许你在发生错误时切换到一个新的 Flux 或 Mono。
Flux<Integer> flux = Flux.just(1, 2, 0, 4).map(i -> 10 / i).onErrorResume(e -> Flux.just(-1, -2));
flux.subscribe(System.out::println); // 输出 10, 5, -1, -2
七、Reactive 编程的优势与适用场景
Reactive 编程在处理高并发、I/O 密集型任务时表现尤为出色,特别适用于以下场景:
-
微服务架构:在微服务中,服务之间的通信常常需要通过非阻塞 I/O,Reactive 编程能够显著提升系统的吞吐量。
-
**高并发的
Web 应用**:Reactive 编程可以处理大量同时进行的用户请求,而不会因为线程阻塞而限制系统性能。
- 事件驱动系统:如物联网(IoT)系统,Reactive 编程可以很好地处理流式数据和异步事件。
八、总结
Reactive 编程为构建高性能、响应式系统提供了一种全新的方式,而 Project Reactor 则是 Java 生态中实现 Reactive Streams 规范的强大工具。通过 Mono 和 Flux 的流式 API,开发者可以简洁高效地处理异步任务,并借助背压机制避免过度生产数据导致的资源问题。
Project Reactor 的丰富操作符、错误处理机制和与 Spring WebFlux 的无缝集成,使其成为开发现代高并发应用的得力助手。掌握 Reactive 编程及其在 Project Reactor 中的实现,能够显著提高应用的扩展性、性能和响应性。
相关文章:
Reactive 编程-Project Reactor
Reactive 编程与 Project Reactor Reactive 编程是一种编程范式,主要用于处理异步数据流。它旨在通过声明式的编程方式处理事件驱动的非阻塞任务,特别适合于构建响应式、可扩展、高并发的应用。随着互联网应用规模的扩大和响应速度的提升需求࿰…...
splice用法
天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…...
Redis - 缓存
文章目录 目录 文章目录 1. 什么是缓存? 2. 使用Redis作为缓存 2.1 关系型数据库的缺点 3. 缓存的更新策略 3.1 定期生成 3.2 实时生成 缓存淘汰策略 4. 缓存预热, 缓存穿透, 缓存雪崩 和 缓存击穿 缓存预热 缓存穿透 缓存雪崩 缓存击穿 总结 1. 什么…...
基于SpringBoot+Vue的养老院管理系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于JavaSpringBootVueMySQL的…...
多线程爬虫接入代理IP:高效数据抓取的秘诀
在现代网络环境中,爬虫已经成为获取信息的利器。然而,随着网站反爬措施的不断升级,单线程爬虫往往无法满足需求。多线程爬虫与代理IP的结合,不仅能提高效率,还能有效规避IP封禁问题。本文将详细探讨多线程爬虫接入代理…...
[网络][CISCO]Cisco-PIX配置详解
Cisco PIX防火墙配置指南 任何企业安全策略的一个主要部分都是实现和维护防火墙,因此防火墙在网络安全的实现当中扮演着重要的角色。防火墙通常位于企业网络的边缘,使内部网络与Internet之间或与其他外部网络互相隔离,并限制网络互访&#x…...
拒绝千篇一律,AI帮你定制独一无二的个人写真
每个女人都渴望展现最美的自己,你是否厌倦了拍出千篇一律的照片?今天,我要告诉你一个秘密,用简单三步,即可打造属于你的独一无二个人写真!文生图、蒙版换脸、图生图,三步化身超级模特࿰…...
在云服务器上安装 RabbitMQ:从零到一的最佳实践
🛠 1. RabbitMQ 简介 RabbitMQ 是一个开源的消息代理中间件,广泛应用于高并发、异步任务队列的场景中。在分布式系统架构中,RabbitMQ 可以充当消息的中转站,帮助不同服务之间进行高效的消息通信。 在这篇文章中,我们…...
【nginx】搭配okhttp 配置反向代理
nginx的默认是一个反向代理。 nginx会默认把输入的请求,转向其他的服务器执行。 这些转向的服务器与客户端发起的服务器不是同一个。 客户端只认识nginx,不知道ngiix转向何方。 正向代理修改okhttp的proxy,实际上很多代理都是正向的。 反向代理修改请求路径到nginx。 感觉还…...
Android V 广播注册和配置注意事项问题
现象 在Android V平台上,应用注册非Protected广播时,如果没有加导出flag会抛出异常导致进程crash。 E/AndroidRuntime: FATAL EXCEPTION: main java.lang.SecurityException: com.demo.myapplication: One of RECEIVER_EXPORTED or RECEIVER_NOT_EXPORT…...
深入解读Docker核心原理:Namespace资源隔离机制详解
在容器技术中,资源隔离 是容器化能够实现轻量级虚拟化的关键技术之一。通过资源隔离,容器可以拥有自己的独立环境,确保容器之间互不干扰,从而实现应用的安全和稳定。Docker作为主流的容器平台,其核心的资源隔离机制依赖…...
学习通、智慧职教刷课脚本
🐐个人主页 可惜已不在 🐋可以分享给身边有需要的人🐶 🐉有用的话就留下一个三连吧😼 目录 一.安装 脚本运行器 篡改猴 - Microsoft Edge Addons 二.安装脚本 三.扩展 一.安装 脚本运行器 安装浏览器 Microsoft E…...
SEO写作:从实战到精进的全方位指南
在数字化浪潮中,SEO不再是简单的关键词堆砌,而是成为企业品牌建设与市场拓展的核心策略。作为一名深耕SEO领域的实践者,我深知其中的门道与奥秘。今天,我将结合过往实战经验,以独特视角,带你一窥SEO写作的精…...
解决 git 不是内部或外部命令,也不是可运行的程序
目录 报错提示: 一、解决办法 1、从git官网下载windows版本的git 2、安装 3、注意事项 二、报错 1、解决 fatal: Not a git repository (or any of the parent directories): .git 问题 报错提示: 一、解决办法 Windows下配置Git: 1…...
【卷起来】VUE3.0教程-07-异步请求处理(springboot后端)
🌲 服务端接口准备 pom文件,引入mybatis/mybatis-plus相关依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>&…...
初一信息科技2024指南辅助教学软件(抓包软件)
专门针对信息科技20204指南写的程序,互联网和直播等知识中包含tcp/ip和udp,三次握手等原理,需要简单明了的实验来说明,在机房中需要用抓包软件,可能需要安装windump npcap等软件非常繁琐,还需要接触保护卡&…...
上汽大众:存储成本节约85%,查询性能提升5倍|OceanBase案例
近日,上汽大众汽车有限公司(简称“上汽大众”)的积分卡券等关键业务系统,已成功升级至 OB Cloud 云数据库。借助 OceanBase 原生分布式数据库的卓越性能与先进技术,实现了存储成本的大幅降低,高达85%&#…...
如何快准稳 实现MySQL大表历史数据迁移?
历史迁移解决方案以微服务架构为基础,使用多种设计模式,如:单例、桥接、工厂、模板、策略等。其中涉及的核心技术有多线程、过滤器等,致力于解决MySQL大表迁移的问题,提供多种迁移模式,如:库到库…...
C和指针:函数
函数定义 函数体就是一个代码块,它在函数被调用时执行。 类型 函数名(形式参数) 代码块 与函数定义相反,函数声明出现在函数被调用的地方。 函数声明 编译器是如何知道该函数期望接受的是什么类型和多少数量的参数。 原型 int *find_int( int key…...
Linux——分离部署,分化压力
PQS/TPS 每秒请求数/ 每秒事务数 // 流量衡量参数 可以根据预估QPS 和 服务器的支持的最高QPS 对照计算 就可以得出 需要上架的服务器的最小数量 PV 页面浏览数 UV 独立用户访问量 // 对于网站的总体访问量 response time 响应时间 // 每个请求的响应时间…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
k8s从入门到放弃之Ingress七层负载
k8s从入门到放弃之Ingress七层负载 在Kubernetes(简称K8s)中,Ingress是一个API对象,它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress,你可…...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
06 Deep learning神经网络编程基础 激活函数 --吴恩达
深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...
华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建
华为云FlexusDeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色,华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型,能助力我们轻松驾驭 DeepSeek-V3/R1,本文中将分享如何…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解
JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用,结合SQLite数据库实现联系人管理功能,并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能,同时可以最小化到系统…...
