当前位置: 首页 > article >正文

Java 响应式编程:Reactor 框架深度解析

Java 响应式编程Reactor 框架深度解析核心概念响应式编程是一种编程范式关注数据的异步流和变化传播。在 Java 中Reactor 框架提供了强大的响应式编程支持基于 Reactive Streams 规范实现。Reactor 核心组件Mono表示 0 或 1 个元素的异步序列Flux表示 0 到 N 个元素的异步序列Scheduler调度器控制任务执行线程Operator操作符用于转换和处理数据流创建流// 创建 Flux FluxString flux Flux.just(Hello, World, Reactor); // 创建 Mono MonoString mono Mono.just(Hello); // 从集合创建 ListString list Arrays.asList(A, B, C); FluxString fromList Flux.fromIterable(list); // 从数组创建 String[] array {X, Y, Z}; FluxString fromArray Flux.fromArray(array); // 创建范围 FluxInteger range Flux.range(1, 10); // 创建空流 FluxString empty Flux.empty(); MonoString monoEmpty Mono.empty(); // 创建错误流 FluxString error Flux.error(new RuntimeException(Something went wrong));订阅流// 订阅并消费数据 Flux.just(A, B, C) .subscribe( item - System.out.println(Received: item), error - System.err.println(Error: error.getMessage()), () - System.out.println(Completed) ); // 简化订阅 Flux.just(1, 2, 3) .subscribe(System.out::println); // 使用 Disposable 控制订阅 Disposable disposable Flux.interval(Duration.ofSeconds(1)) .subscribe(tick - System.out.println(Tick: tick)); // 5 秒后取消订阅 Thread.sleep(5000); disposable.dispose();操作符// 转换操作符 Flux.just(hello, world) .map(String::toUpperCase) .subscribe(System.out::println); // HELLO, WORLD // 过滤操作符 Flux.range(1, 10) .filter(n - n % 2 0) .subscribe(System.out::println); // 2, 4, 6, 8, 10 // 映射操作符 Flux.just(a, b, c) .flatMap(s - Flux.just(s.toUpperCase(), s.toLowerCase())) .subscribe(System.out::println); // A, a, B, b, C, c // 组合操作符 FluxString flux1 Flux.just(A, B); FluxString flux2 Flux.just(X, Y); Flux.concat(flux1, flux2) .subscribe(System.out::println); // A, B, X, Y Flux.zip(flux1, flux2, (a, b) - a - b) .subscribe(System.out::println); // A-X, B-Y // 聚合操作符 Flux.range(1, 10) .reduce(0, Integer::sum) .subscribe(sum - System.out.println(Sum: sum)); // Sum: 55 Flux.range(1, 10) .collectList() .subscribe(list - System.out.println(List: list)); // List: [1, 2, ..., 10]错误处理// 错误处理操作符 Flux.error(new RuntimeException(Original error)) .onErrorReturn(Fallback value) .subscribe(System.out::println); // Fallback value Flux.error(new RuntimeException(Original error)) .onErrorResume(e - Flux.just(Recovered from: e.getMessage())) .subscribe(System.out::println); // Recovered from: Original error Flux.just(1, 2, 0, 3) .map(n - 10 / n) .onErrorContinue((e, value) - System.out.println(Skipping value)) .subscribe(System.out::println); // 10, 5, 3 Flux.just(1, 2, 3) .doOnError(e - System.err.println(Error: e.getMessage())) .subscribe(); // 使用 retry Flux.error(new RuntimeException(Retry error)) .retry(3) .subscribe( System.out::println, e - System.err.println(Failed after retries: e.getMessage()) );调度器// 使用调度器 Flux.range(1, 10) .subscribeOn(Schedulers.parallel()) // 在并行线程池执行 .subscribe(System.out::println); Flux.range(1, 10) .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 .observeOn(Schedulers.single()) // 在单线程池观察结果 .subscribe(System.out::println); // 创建自定义调度器 Scheduler customScheduler Schedulers.newParallel(custom, 4); Flux.range(1, 10) .subscribeOn(customScheduler) .subscribe(System.out::println); // 使用虚拟线程Java 21 Flux.range(1, 10) .subscribeOn(Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor())) .subscribe(System.out::println);背压处理// 背压策略 Flux.range(1, 1000) .onBackpressureBuffer(100) // 缓冲最多 100 个元素 .subscribe(); Flux.range(1, 1000) .onBackpressureDrop(dropped - System.out.println(Dropped: dropped)) .subscribe(); Flux.range(1, 1000) .onBackpressureLatest() // 只保留最新元素 .subscribe(); // 使用 request 控制流速 Flux.range(1, 10) .subscribe(new BaseSubscriberInteger() { Override protected void hookOnSubscribe(Subscription subscription) { request(2); // 初始请求 2 个元素 } Override protected void hookOnNext(Integer value) { System.out.println(Received: value); request(1); // 处理完一个再请求一个 } });实际应用示例// 响应式 REST 客户端 Service public class UserService { private final WebClient webClient; public UserService(WebClient.Builder webClientBuilder) { this.webClient webClientBuilder.baseUrl(https://api.example.com).build(); } public MonoUser getUserById(Long id) { return webClient.get() .uri(/users/{id}, id) .retrieve() .onStatus(HttpStatus::isError, response - response.bodyToMono(String.class) .flatMap(body - Mono.error(new UserNotFoundException(body))) ) .bodyToMono(User.class); } public FluxUser getUsersByDepartment(String department) { return webClient.get() .uri(/users?department{department}, department) .retrieve() .bodyToFlux(User.class); } public MonoUser createUser(UserCreateRequest request) { return webClient.post() .uri(/users) .bodyValue(request) .retrieve() .onStatus(HttpStatus::isError, response - response.bodyToMono(String.class) .flatMap(body - Mono.error(new UserCreationException(body))) ) .bodyToMono(User.class); } } // 响应式数据访问 Repository public class ReactiveUserRepository { private final MongoClient mongoClient; public ReactiveUserRepository(MongoClient mongoClient) { this.mongoClient mongoClient; } public MonoUser findById(String id) { return getCollection().findById(id); } public FluxUser findByDepartment(String department) { return getCollection().find(query(where(department).is(department))); } public MonoUser save(User user) { return getCollection().save(user); } public MonoVoid deleteById(String id) { return getCollection().deleteById(id); } private MongoCollectionUser getCollection() { return mongoClient.getDatabase(example).getCollection(users, User.class); } } // 组合多个异步操作 Service public class OrderService { private final UserService userService; private final ProductService productService; private final OrderRepository orderRepository; public OrderService(UserService userService, ProductService productService, OrderRepository orderRepository) { this.userService userService; this.productService productService; this.orderRepository orderRepository; } public MonoOrder createOrder(Long userId, Long productId, int quantity) { return Mono.zip( userService.getUserById(userId), productService.getProductById(productId) ) .flatMap(tuple - { User user tuple.getT1(); Product product tuple.getT2(); if (product.getStock() quantity) { return Mono.error(new InsufficientStockException()); } Order order new Order(); order.setUserId(userId); order.setProductId(productId); order.setQuantity(quantity); order.setTotalPrice(product.getPrice() * quantity); order.setStatus(PENDING); return orderRepository.save(order); }) .doOnSuccess(order - { // 更新库存 productService.updateStock(productId, -quantity).subscribe(); }) .doOnError(e - { // 记录错误日志 System.err.println(Failed to create order: e.getMessage()); }); } }测试响应式代码// 测试 Mono Test void monoTest() { MonoString mono Mono.just(Hello); StepVerifier.create(mono) .expectNext(Hello) .expectComplete() .verify(); } // 测试 Flux Test void fluxTest() { FluxInteger flux Flux.range(1, 5); StepVerifier.create(flux) .expectNext(1, 2, 3, 4, 5) .expectComplete() .verify(); } // 测试错误处理 Test void errorTest() { FluxString errorFlux Flux.error(new RuntimeException(Test error)); StepVerifier.create(errorFlux) .expectError(RuntimeException.class) .verify(); } // 测试转换操作 Test void transformTest() { FluxString flux Flux.just(a, b, c) .map(String::toUpperCase); StepVerifier.create(flux) .expectNext(A, B, C) .expectComplete() .verify(); }最佳实践避免阻塞不要在响应式链中调用阻塞方法正确处理背压根据消费者能力控制流速使用合适的调度器根据任务类型选择调度器组合操作符使用 compose 和 transform 组合操作符错误处理在适当的位置处理错误资源管理使用 using 管理资源生命周期避免嵌套订阅使用 flatMap 代替嵌套 subscribe测试验证使用 StepVerifier 测试响应式代码实际应用场景高并发 API处理大量并发请求实时数据流如 WebSocket 通信数据处理管道批量数据处理异步组合组合多个异步操作总结Reactor 框架为 Java 提供了强大的响应式编程能力。通过 Mono 和 Flux 两个核心类型可以优雅地处理异步数据流。合理使用操作符和调度器可以构建高性能、高并发的应用程序。别叫我大神叫我 Alex 就好。这其实可以更优雅一点响应式编程让异步操作变得更加简洁和高效。

相关文章:

Java 响应式编程:Reactor 框架深度解析

Java 响应式编程:Reactor 框架深度解析 核心概念 响应式编程是一种编程范式,关注数据的异步流和变化传播。在 Java 中,Reactor 框架提供了强大的响应式编程支持,基于 Reactive Streams 规范实现。 Reactor 核心组件 Mono&#xff…...

ComfyUI-Manager终极指南:轻松管理您的AI绘画工作流节点

ComfyUI-Manager终极指南:轻松管理您的AI绘画工作流节点 【免费下载链接】ComfyUI-Manager ComfyUI-Manager is an extension designed to enhance the usability of ComfyUI. It offers management functions to install, remove, disable, and enable various cus…...

基于LLM与向量数据库构建个人知识管理智能代理:从原理到实践

1. 项目概述:一个面向个人知识管理的智能代理 最近在折腾个人知识管理(PKM)系统,发现了一个挺有意思的开源项目: lessthanno/engram-agent 。简单来说,这是一个“记忆代理”,它旨在成为你数字…...

为什么选择QtScrcpy?3大突破性特性让Android投屏焕然一新

为什么选择QtScrcpy?3大突破性特性让Android投屏焕然一新 【免费下载链接】QtScrcpy Android real-time display control software 项目地址: https://gitcode.com/GitHub_Trending/qt/QtScrcpy QtScrcpy是一款基于Qt框架开发的Android设备实时投屏控制软件&…...

基于Tauri+React+TS构建跨平台开发者效率工具:集成AI编程与Git Worktree

1. 项目概述:一个为现代开发者打造的桌面效率工具 如果你和我一样,每天的工作流都离不开终端、代码编辑器和各种AI助手,那你一定也经历过这种场景:在多个项目间频繁切换,终端里塞满了十几个标签页,想找个昨…...

一文扫盲人工智能全体系,从入门到进阶,新手也能不迷路

文章目录前言一、先搞懂:AI到底是个啥?别再把大模型当AI全部了1.1 从“假智能”到“真智能”:神经网络的革命1.2 AI的三大发展阶段:从弱人工智能到超人工智能二、AI核心技术栈拆解:从基础到进阶,一层一层讲…...

CANN/atvoss Muls算子样例

Muls算子样例 【免费下载链接】atvoss ATVOSS(Ascend C Templates for Vector Operator Subroutines)是一套基于Ascend C开发的Vector算子库,致力于为昇腾硬件上的Vector类融合算子提供极简、高效、高性能、高拓展的编程方式。 项目地址: h…...

为OpenClaw智能体工作流配置Taotoken作为可靠模型供应商

🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 为OpenClaw智能体工作流配置Taotoken作为可靠模型供应商 在构建基于OpenClaw的智能体工作流时,一个稳定、可管理的模型…...

大模型API聚合服务:一站式解决多模型接入难题

1. 项目概述:一站式大模型API聚合服务如果你正在开发一个需要集成多种大语言模型的应用,比如一个智能客服系统、一个内容创作工具,或者一个AI研究平台,那么你大概率会遇到一个非常现实的问题:对接不同厂商的API&#x…...

CANN/triton-ge-backend性能调优方法论

性能调优方法论 【免费下载链接】triton-inference-server-ge-backend ge-backend基于triton inference server框架实现对接NPU生态,快速实现传统CV\NLP等模型的服务化。 项目地址: https://gitcode.com/cann/triton-inference-server-ge-backend 模型优化有…...

AI算法在多市场环境下的合谋机制与市场分配策略研究

1. 项目概述:当AI成为市场中的“隐形玩家”最近几年,我身边不少做量化交易、电商定价或者供应链优化的朋友,都开始频繁地讨论一个话题:我们部署的AI优化算法,会不会在不知不觉中“串通”起来,形成一种新型的…...

开源项目赞助管理平台Sponsio:自托管部署与核心架构解析

1. 项目概述:SponsioLabs/Sponsio 是什么?如果你在开源社区里泡过一段时间,肯定对“用爱发电”这个词不陌生。开发者们投入大量时间、精力,甚至金钱,维护一个项目,却常常面临一个现实问题:如何获…...

Animal-AI环境:用AI复现动物认知实验的虚拟实验室

1. 项目概述:当AI遇见动物智能最近几年,AI领域最激动人心的进展之一,就是智能体(Agent)在复杂环境中的决策与学习能力。从AlphaGo到AlphaStar,再到各种游戏AI,我们见证了算法在特定规则下的卓越…...

数字孪生如何破解AI预测性维护的可解释性与泛化难题

1. 项目概述:当数字孪生遇见AI预测性维护在工业界摸爬滚打十几年,我亲眼见证了维护策略从“坏了再修”到“定期保养”,再到如今炙手可热的“预测性维护”的演进。预测性维护(Predictive Maintenance, PMx)的核心愿景很…...

10x-Agent-Loop:突破AI编程助手配额限制的智能缓存与调度方案

1. 项目概述与核心价值最近在开发者社区里,一个名为“10x-Agent-Loop”的工具讨论热度挺高。简单来说,它瞄准了一个非常具体的痛点:当你深度依赖像Cursor或Windsurf这类AI编程助手时,经常会遇到一个天花板——请求配额限制。无论是…...

ATB RingMLA C++示例

加速库RingMLA C Demo 【免费下载链接】ascend-transformer-boost 本项目是CANN提供的是一款高效、可靠的Transformer加速库,基于华为Ascend AI处理器,提供Transformer定制化场景的高性能融合算子。 项目地址: https://gitcode.com/cann/ascend-transf…...

Python 爬虫高级实战:网盘资源信息批量爬虫开发

前言 在互联网资源分发场景中,网盘已成为文档、教程、软件、影视、学习资料等资源的核心存储与分发载体。海量公开网盘资源分散在各类资源站点、论坛、分享页面中,依靠人工逐条检索、整理链接效率极低,且难以实现批量汇总、分类归档与失效链接筛查。依托 Python 开发网盘资…...

艺术史视角下的生成式AI审美:从风格谱系到技术认知的深度解析

1. 项目概述:当艺术史遇见生成式AI最近和几位做艺术策展的朋友聊天,他们都在抱怨同一个问题:现在用AI生成的“艺术品”越来越多,但很多作品看起来就是一堆流行元素的拼贴,缺乏真正的“灵魂”。这让我开始思考一个更深层…...

泊松分布实战指南:从原理到异常检测的工程落地

1. 什么是泊松分布?——一个数据从业者每天都在用、却未必真正吃透的概率工具你有没有算过,过去一小时里你的邮箱收到了几封新邮件?上个月车间里产线上出现了几个次品?过去24小时网站服务器收到了多少次API请求?这些数…...

Fortran学习笔记

这是我之前学习Fortran时做到笔记,分享出来当个备份!Fortran是一门非常古老的编程语言,但是至今依然有人在使用。建议利用闲暇时间学习!1、编译命令: g95 –c a.f90:将a.f90编译为名为a.o的目标文件。 g95 h.f90&#…...

AI跨学科扩散62年文献计量分析:从计算机科学到生物医学、社会科学的融合路径与未来趋势

1. 项目概述:从海量文献中洞察AI的融合之路最近几年,AI(人工智能)这个词几乎无处不在,从写代码到画图,从自动驾驶到药物研发,它像水银泻地一样渗透进各个角落。但你是否想过,这种“渗…...

网络异质性如何影响AI竞赛中的安全与创新平衡

1. 项目概述:一场关于“网络生态”的AI竞赛 最近和几个做AI安全的朋友聊天,大家不约而同地提到了一个现象:现在很多AI竞赛,无论是安全攻防赛还是创新应用赛,参赛团队的背景越来越“杂”。你可能会遇到一支队伍&#xf…...

Poetry依赖管理:用SAT求解器终结Python版本冲突

1. 为什么我三年前就停用 pip venv,转而把 Poetry 当成 Python 项目的“呼吸系统”你有没有经历过这样的深夜:凌晨两点,服务器上一个本该稳如老狗的 Flask API 突然报ImportError: cannot import name AsyncGenerator;你翻遍代码…...

深度学习超分辨率技术加速SEM材料表征:原理、实践与16倍效率提升

1. 项目概述:当深度学习遇见扫描电镜在材料科学的研究一线,尤其是金属微观结构分析领域,扫描电子显微镜(SEM)是我们观察材料“内在世界”的得力工具。然而,一个长期困扰我们的矛盾是:高分辨率与…...

OpenClaw安全审计:AI驱动的自动化配置检查与隐私保护实践

1. 项目概述与核心价值 最近在折腾我的 OpenClaw 机器人,这玩意儿功能是越来越强大了,能接各种消息渠道,还能调用五花八门的工具。但功能一多,配置就复杂,安全问题也跟着冒头。比如,你是不是也担心过 API 密…...

基于MCP协议构建AI与Telegram的智能连接桥梁

1. 项目概述:一个连接AI与即时通讯的桥梁 最近在折腾AI应用开发,特别是想让大语言模型(LLM)能直接操作外部工具,比如发个消息、查个天气。这让我接触到了 Model Context Protocol ,也就是MCP。简单来说&…...

Claude Code用户如何配置Taotoken解决密钥不稳定与额度不足问题

🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 Claude Code用户如何配置Taotoken解决密钥不稳定与额度不足问题 1. 理解Claude Code的API配置机制 Claude Code作为一款编程辅助工…...

基于角色的AI能力框架:重塑工程教育中的人机协作新范式

1. 项目概述:当AI遇见工程教育,我们需要怎样的“角色”?最近和几位在高校任教的朋友聊天,他们不约而同地提到了一个共同的困惑:ChatGPT、Copilot这些工具,学生们用得很溜,但老师们却有点“跟不上…...

CANN/hixl LLM集群信息文档

LLMClusterInfo 【免费下载链接】hixl HIXL(Huawei Xfer Library)是一个灵活、高效的昇腾单边通信库,面向集群场景提供简单、可靠、高效的点对点数据传输能力。 项目地址: https://gitcode.com/cann/hixl 产品支持情况 产品是否支持A…...

CANN/cannbot-skills FA调用完整代码示例

FA 调用完整代码示例 【免费下载链接】cannbot-skills CANNBot 是面向 CANN 开发的用于提升开发效率的系列智能体,本仓库为其提供可复用的 Skills 模块。 项目地址: https://gitcode.com/cann/cannbot-skills 基于仓库中已有模型的实际调用,按模式…...