当前位置: 首页 > news >正文

Reactive 编程-Project Reactor

Reactive 编程与 Project Reactor

Reactive 编程是一种编程范式,主要用于处理异步数据流。它旨在通过声明式的编程方式处理事件驱动的非阻塞任务,特别适合于构建响应式、可扩展、高并发的应用。随着互联网应用规模的扩大和响应速度的提升需求,Reactive 编程变得越来越流行。

在 Java 生态中,Project Reactor 是支持 Reactive 编程的核心库之一,基于 Reactive Streams 规范,并被 Spring 5 中的 WebFlux 采用为核心反应式框架。它提供了强大的 API 用于处理异步数据流,同时保持良好的性能和响应性。

一、Reactive 编程的核心概念

在传统的编程模型中,异步处理通常会涉及回调函数、线程池和复杂的同步控制,这样的代码不仅难以维护,还容易出现阻塞和性能瓶颈。Reactive 编程的出现解决了这些问题,它通过响应式的数据流,允许程序按需、非阻塞地处理数据和事件。

Reactive 编程的几个重要特点:

  1. 异步与非阻塞:程序可以异步处理任务,而不会因为等待结果而阻塞线程。这种非阻塞机制非常适合处理 I/O 密集型任务。

  2. 事件驱动:通过数据流来处理一系列事件。当有新数据产生时,系统会自动响应和处理这些数据。

  3. 流式处理:处理数据流中的数据项,类似于 Java 8 中的 Stream API,但不同的是,Reactive 流可以处理动态、无限的数据流。

  4. 背压(Backpressure):Reactive Streams 规范提供了背压机制,允许消费者根据自身的能力,按需请求数据,避免生产者产生过多的数据而导致内存溢出。

二、Project Reactor 概述

Project Reactor 是一个支持 Reactive Streams 规范的响应式编程库。它是构建在 Reactive Streams 基础上的高性能框架,提供了几种关键的异步数据流处理类型:MonoFlux

  • Mono:表示 0 或 1 个元素的异步数据流。适用于返回单个结果的场景,例如 HTTP 请求的响应。
  • Flux:表示 0 到 N 个元素的异步数据流,适用于处理多个结果或无限流的场景。

Project Reactor 中的 MonoFlux 提供了强大的操作符(类似于 Java 的 Stream API 中的操作符),用于组合、转换、过滤和操作异步数据流。

三、Mono 和 Flux 的基本用法
1. Mono 的使用

Mono 代表一个包含最多一个元素的异步流。它可以用于表示单个异步任务的结果,如数据库查询、HTTP 请求等。

import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {// 创建一个包含数据的 MonoMono<String> mono = Mono.just("Hello, Reactive World!");// 订阅并消费 Monomono.subscribe(System.out::println);}
}

在上述示例中,Mono.just("Hello, Reactive World!") 创建了一个包含单个元素的 Mono。当调用 subscribe() 方法时,Mono 开始执行并将数据输出到控制台。

2. Flux 的使用

Flux 是表示 0 到 N 个元素的异步流,适合处理多个数据项或无限的数据流。

import reactor.core.publisher.Flux;public class FluxExample {public static void main(String[] args) {// 创建一个包含多个元素的 FluxFlux<String> flux = Flux.just("Spring", "Reactor", "WebFlux");// 订阅并消费 Fluxflux.subscribe(System.out::println);}
}

在这个例子中,Flux.just() 创建了一个包含多个元素的 Flux,当 subscribe() 被调用时,Flux 会依次发射每个元素并输出它们。

四、背压(Backpressure)机制

背压是 Reactive Streams 中的一个关键概念,旨在解决生产者与消费者之间速率不匹配的问题。当生产者产生数据的速度快于消费者处理的速度时,背压允许消费者按自己的能力请求数据,避免数据积压导致内存问题。

在 Project Reactor 中,背压由订阅者(即消费者)通过 request(n) 方法来控制。例如,Flux 通过 onBackpressureBuffer()onBackpressureDrop() 来处理背压。

import reactor.core.publisher.Flux;public class BackpressureExample {public static void main(String[] args) {Flux.range(1, 1000).onBackpressureBuffer(10)  // 当消费者处理不过来时,使用缓冲区.subscribe(data -> {System.out.println("Processing " + data);try {Thread.sleep(100);  // 模拟处理时间} catch (InterruptedException e) {e.printStackTrace();}},error -> System.err.println("Error: " + error),() -> System.out.println("Complete"));}
}

在该示例中,Flux 将发射 1000 个元素,但消费者处理的速度较慢,因此我们使用 onBackpressureBuffer(10) 将多余的数据存放到缓冲区,以便消费者可以按需处理数据。

五、常用的操作符

Project Reactor 提供了许多操作符,用于处理和转换数据流。以下是一些常用的操作符示例:

1. map()

map() 操作符用于将每个元素进行转换,类似于 Java 8 的 Stream.map()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4);
numbers.map(n -> n * 2).subscribe(System.out::println);  // 输出 2, 4, 6, 8
2. flatMap()

flatMap() 可以用于异步转换,并返回新的 MonoFlux

Flux<String> names = Flux.just("Tom", "Jerry", "Spike");
names.flatMap(name -> Flux.just(name.toUpperCase())).subscribe(System.out::println);  // 输出 TOM, JERRY, SPIKE
3. filter()

filter() 用于过滤掉不满足条件的元素。

Flux<Integer> numbers = Flux.range(1, 10);
numbers.filter(n -> n % 2 == 0).subscribe(System.out::println);  // 输出 2, 4, 6, 8, 10
4. reduce()

reduce() 用于将多个值组合成一个值,类似于 Stream.reduce()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.reduce((a, b) -> a + b).subscribe(System.out::println);  // 输出 15
5. zip()

zip() 操作符用于合并多个流,并将其元素“打包”成一个对象或元组。

Flux<String> names = Flux.just("Tom", "Jerry");
Flux<Integer> ages = Flux.just(5, 6);
Flux.zip(names, ages).subscribe(tuple -> System.out.println("Name: " + tuple.getT1() + ", Age: " + tuple.getT2()));
六、Project Reactor 的错误处理

Reactive 编程中的错误处理非常重要,因为异步流程中的异常不会像同步代码那样直接抛出。在 Project Reactor 中,有几种方式处理错误:

1. onErrorReturn()

在发生错误时返回默认值。

Flux<Integer> flux = Flux.just(1, 2, 0, 4).map(i -> 10 / i)  // 会产生除以 0 的异常.onErrorReturn(-1);
flux.subscribe(System.out::println);  // 输出 10, 5, -1
2. onErrorResume()

onErrorResume() 允许你在发生错误时切换到一个新的 FluxMono

Flux<Integer> flux = Flux.just(1, 2, 0, 4).map(i -> 10 / i).onErrorResume(e -> Flux.just(-1, -2));
flux.subscribe(System.out::println);  // 输出 10, 5, -1, -2
七、Reactive 编程的优势与适用场景

Reactive 编程在处理高并发、I/O 密集型任务时表现尤为出色,特别适用于以下场景:

  1. 微服务架构:在微服务中,服务之间的通信常常需要通过非阻塞 I/O,Reactive 编程能够显著提升系统的吞吐量。

  2. **高并发的

Web 应用**:Reactive 编程可以处理大量同时进行的用户请求,而不会因为线程阻塞而限制系统性能。

  1. 事件驱动系统:如物联网(IoT)系统,Reactive 编程可以很好地处理流式数据和异步事件。
八、总结

Reactive 编程为构建高性能、响应式系统提供了一种全新的方式,而 Project Reactor 则是 Java 生态中实现 Reactive Streams 规范的强大工具。通过 MonoFlux 的流式 API,开发者可以简洁高效地处理异步任务,并借助背压机制避免过度生产数据导致的资源问题。

Project Reactor 的丰富操作符、错误处理机制和与 Spring WebFlux 的无缝集成,使其成为开发现代高并发应用的得力助手。掌握 Reactive 编程及其在 Project Reactor 中的实现,能够显著提高应用的扩展性、性能和响应性。

相关文章:

Reactive 编程-Project Reactor

Reactive 编程与 Project Reactor Reactive 编程是一种编程范式&#xff0c;主要用于处理异步数据流。它旨在通过声明式的编程方式处理事件驱动的非阻塞任务&#xff0c;特别适合于构建响应式、可扩展、高并发的应用。随着互联网应用规模的扩大和响应速度的提升需求&#xff0…...

splice用法

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…...

Redis - 缓存

文章目录 目录 文章目录 1. 什么是缓存&#xff1f; 2. 使用Redis作为缓存 2.1 关系型数据库的缺点 3. 缓存的更新策略 3.1 定期生成 3.2 实时生成 缓存淘汰策略 4. 缓存预热, 缓存穿透, 缓存雪崩 和 缓存击穿 缓存预热 缓存穿透 缓存雪崩 缓存击穿 总结 1. 什么…...

基于SpringBoot+Vue的养老院管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于JavaSpringBootVueMySQL的…...

多线程爬虫接入代理IP:高效数据抓取的秘诀

在现代网络环境中&#xff0c;爬虫已经成为获取信息的利器。然而&#xff0c;随着网站反爬措施的不断升级&#xff0c;单线程爬虫往往无法满足需求。多线程爬虫与代理IP的结合&#xff0c;不仅能提高效率&#xff0c;还能有效规避IP封禁问题。本文将详细探讨多线程爬虫接入代理…...

[网络][CISCO]Cisco-PIX配置详解

Cisco PIX防火墙配置指南 任何企业安全策略的一个主要部分都是实现和维护防火墙&#xff0c;因此防火墙在网络安全的实现当中扮演着重要的角色。防火墙通常位于企业网络的边缘&#xff0c;使内部网络与Internet之间或与其他外部网络互相隔离&#xff0c;并限制网络互访&#x…...

拒绝千篇一律,AI帮你定制独一无二的个人写真

每个女人都渴望展现最美的自己&#xff0c;你是否厌倦了拍出千篇一律的照片&#xff1f;今天&#xff0c;我要告诉你一个秘密&#xff0c;用简单三步&#xff0c;即可打造属于你的独一无二个人写真&#xff01;文生图、蒙版换脸、图生图&#xff0c;三步化身超级模特&#xff0…...

在云服务器上安装 RabbitMQ:从零到一的最佳实践

&#x1f6e0; 1. RabbitMQ 简介 RabbitMQ 是一个开源的消息代理中间件&#xff0c;广泛应用于高并发、异步任务队列的场景中。在分布式系统架构中&#xff0c;RabbitMQ 可以充当消息的中转站&#xff0c;帮助不同服务之间进行高效的消息通信。 在这篇文章中&#xff0c;我们…...

【nginx】搭配okhttp 配置反向代理

nginx的默认是一个反向代理。 nginx会默认把输入的请求,转向其他的服务器执行。 这些转向的服务器与客户端发起的服务器不是同一个。 客户端只认识nginx,不知道ngiix转向何方。 正向代理修改okhttp的proxy,实际上很多代理都是正向的。 反向代理修改请求路径到nginx。 感觉还…...

Android V 广播注册和配置注意事项问题

现象 在Android V平台上&#xff0c;应用注册非Protected广播时&#xff0c;如果没有加导出flag会抛出异常导致进程crash。 E/AndroidRuntime: FATAL EXCEPTION: main java.lang.SecurityException: com.demo.myapplication: One of RECEIVER_EXPORTED or RECEIVER_NOT_EXPORT…...

深入解读Docker核心原理:Namespace资源隔离机制详解

在容器技术中&#xff0c;资源隔离 是容器化能够实现轻量级虚拟化的关键技术之一。通过资源隔离&#xff0c;容器可以拥有自己的独立环境&#xff0c;确保容器之间互不干扰&#xff0c;从而实现应用的安全和稳定。Docker作为主流的容器平台&#xff0c;其核心的资源隔离机制依赖…...

学习通、智慧职教刷课脚本

&#x1f410;个人主页 可惜已不在 &#x1f40b;可以分享给身边有需要的人&#x1f436; &#x1f409;有用的话就留下一个三连吧&#x1f63c; 目录 一.安装 脚本运行器 篡改猴 - Microsoft Edge Addons 二.安装脚本 三.扩展 一.安装 脚本运行器 安装浏览器 Microsoft E…...

SEO写作:从实战到精进的全方位指南

在数字化浪潮中&#xff0c;SEO不再是简单的关键词堆砌&#xff0c;而是成为企业品牌建设与市场拓展的核心策略。作为一名深耕SEO领域的实践者&#xff0c;我深知其中的门道与奥秘。今天&#xff0c;我将结合过往实战经验&#xff0c;以独特视角&#xff0c;带你一窥SEO写作的精…...

解决 git 不是内部或外部命令,也不是可运行的程序

目录 报错提示&#xff1a; 一、解决办法 1、从git官网下载windows版本的git 2、安装 3、注意事项 二、报错 1、解决 fatal: Not a git repository (or any of the parent directories): .git 问题 报错提示&#xff1a; 一、解决办法 Windows下配置Git&#xff1a; 1…...

【卷起来】VUE3.0教程-07-异步请求处理(springboot后端)

&#x1f332; 服务端接口准备 pom文件&#xff0c;引入mybatis/mybatis-plus相关依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>&…...

初一信息科技2024指南辅助教学软件(抓包软件)

专门针对信息科技20204指南写的程序&#xff0c;互联网和直播等知识中包含tcp/ip和udp&#xff0c;三次握手等原理&#xff0c;需要简单明了的实验来说明&#xff0c;在机房中需要用抓包软件&#xff0c;可能需要安装windump npcap等软件非常繁琐&#xff0c;还需要接触保护卡&…...

上汽大众:存储成本节约85%,查询性能提升5倍|OceanBase案例

近日&#xff0c;上汽大众汽车有限公司&#xff08;简称“上汽大众”&#xff09;的积分卡券等关键业务系统&#xff0c;已成功升级至 OB Cloud 云数据库。借助 OceanBase 原生分布式数据库的卓越性能与先进技术&#xff0c;实现了存储成本的大幅降低&#xff0c;高达85%&#…...

如何快准稳 实现MySQL大表历史数据迁移?

历史迁移解决方案以微服务架构为基础&#xff0c;使用多种设计模式&#xff0c;如&#xff1a;单例、桥接、工厂、模板、策略等。其中涉及的核心技术有多线程、过滤器等&#xff0c;致力于解决MySQL大表迁移的问题&#xff0c;提供多种迁移模式&#xff0c;如&#xff1a;库到库…...

C和指针:函数

函数定义 函数体就是一个代码块&#xff0c;它在函数被调用时执行。 类型 函数名(形式参数) 代码块 与函数定义相反&#xff0c;函数声明出现在函数被调用的地方。 函数声明 编译器是如何知道该函数期望接受的是什么类型和多少数量的参数。 原型 int *find_int( int key…...

Linux——分离部署,分化压力

PQS/TPS 每秒请求数/ 每秒事务数 // 流量衡量参数 可以根据预估QPS 和 服务器的支持的最高QPS 对照计算 就可以得出 需要上架的服务器的最小数量 PV 页面浏览数 UV 独立用户访问量 // 对于网站的总体访问量 response time 响应时间 // 每个请求的响应时间…...

[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?

&#x1f9e0; 智能合约中的数据是如何在区块链中保持一致的&#xff1f; 为什么所有区块链节点都能得出相同结果&#xff1f;合约调用这么复杂&#xff0c;状态真能保持一致吗&#xff1f;本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里&#xf…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

Spring Boot 实现流式响应(兼容 2.7.x)

在实际开发中&#xff0c;我们可能会遇到一些流式数据处理的场景&#xff0c;比如接收来自上游接口的 Server-Sent Events&#xff08;SSE&#xff09; 或 流式 JSON 内容&#xff0c;并将其原样中转给前端页面或客户端。这种情况下&#xff0c;传统的 RestTemplate 缓存机制会…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例

使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件&#xff0c;常用于在两个集合之间进行数据转移&#xff0c;如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model&#xff1a;绑定右侧列表的值&…...

连锁超市冷库节能解决方案:如何实现超市降本增效

在连锁超市冷库运营中&#xff0c;高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术&#xff0c;实现年省电费15%-60%&#xff0c;且不改动原有装备、安装快捷、…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX&#xff08;不访问内存&#xff09;XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

算法笔记2

1.字符串拼接最好用StringBuilder&#xff0c;不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...