当前位置: 首页 > news >正文

reactor框架使用时,数据流请求流程

1. 我们在Flux打开时,可以看到

      public abstract class Flux<T> implements CorePublisher<T> {

2. 

public interface CorePublisher<T> extends Publisher<T> {void subscribe(CoreSubscriber<? super T> subscriber);
}

Publisher的关键时有个subscribe方法。这个方法就是在reactor的subscribe的时候会调用到这里。

3. 这里subscribe的参数有个CoreSubscriber。其实这个方法基本上最终其实会调用到CoreSubscriber的onSubscribe方法。

随便看对void subscribe(CoreSubscriber<? super T> subscriber);的方法的实现

选择Flux举例。看到最终调用结果:

4. 那么onSubscribe怎么调用下游呢?

注意,void onSubscribe(Subscription s);里面又传入了Subscription。

public interface Subscription {public void request(long n);public void cancel();
}

以Flux.just(1),为例,其实所有的onSubscribe方法会调用到Subscription的request方法。

5. request方法,最后调用到了onnext方法

所以数据流程如下:

subscribe()->subscribe(CoreSubscriber<? super T> subscriber)->CoreSubscriber.onSubscribe->Subscription.request(n)->CoreSubscriber.onNext()

6. 因此,onNext的调用前,数据的准备可以在Subscription.request的方法逻辑中内部进行准备,当准备好了再调用onNext方法。如果是分批request的,也就是说CoreSubscriber.onSubscribe逻辑中是分批次调用Subscription.request(n),则每个批次的Subscription.request(n)中都可以等待数据好了再调用onNext方法。

7. 也就是说,数据调用onNext之前,都可以准备好再调用。但是一旦onNext调用以后,就尽量不能阻塞住后续流程了。如果后续流程中有阻塞的情况,就要用publishon和subscribeon了,让阻塞的内容在单独的线程池中执行。

8. 对zipWith方法的理解。摘自chatgpt。不清楚是否正确

如果某个 Flux 中的数据项尚未准备好,zipWith 会挂起合并操作,直到另一个 Flux 中的数据也准备好为止。只要 zipWith 中的两个流的每一对数据项都准备好了,它才会触发 onNext()

因此这里其实Reactor框架其实netty线程还是在做其他的事情,当都准备好了,才会利用netty线程,进行onNext的处理

相关文章:

reactor框架使用时,数据流请求流程

1. 我们在Flux打开时&#xff0c;可以看到 public abstract class Flux<T> implements CorePublisher<T> { 2. public interface CorePublisher<T> extends Publisher<T> {void subscribe(CoreSubscriber<? super T> subscriber); } Publish…...

读西瓜书的数学准备

1&#xff0c;高等数学&#xff1a;会求偏导数就行 2&#xff0c;线性代数&#xff1a;会矩阵运算就行 参考&#xff1a;线性代数--矩阵基本计算&#xff08;加减乘法&#xff09;_矩阵运算-CSDN博客 3&#xff0c;概率论与数理统计&#xff1a;知道啥是随机变量就行...

摄像头模块如何应用在宠物产品领域

一、宠物监控类产品 家庭宠物远程监控摄像头 1.基本功能与原理&#xff1a;这类摄像头可以通过 Wi - Fi 连接到家庭网络&#xff0c;主人可以使用手机应用程序在任何有网络连接的地方查看宠物的实时画面。摄像头模块内置有图像传感器&#xff0c;能够捕捉光线并将其转换为数字…...

c++学习第七天

创作过程中难免有不足&#xff0c;若您发现本文内容有误&#xff0c;恳请不吝赐教。 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考。 一、const成员函数 //Date.h#pragma once#include<iostream> using namespace std;class Date { public:Date…...

Ubuntu 24.04 LTS 通过 docker 安装 nextcloud 搭建个人网盘

准备 Ubuntu 24.04 LTSUbuntu 空闲硬盘挂载Ubuntu 安装 Docker DesktopUbuntu 24.04 LTS 安装 tailscale [我的Ubuntu服务器折腾集](https://blog.csdn.net/jh1513/article/details/145222679。 安装 nextcloud 参考 Ubuntu24.04系统Docker安装NextcloudOnlyoffice _。 更…...

RabbitMQ1-消息队列

目录 MQ的相关概念 什么是MQ 为什么要用MQ MQ的分类 MQ的选择 RabbitMQ RabbitMQ的概念 四大核心概念 RabbitMQ的核心部分 各个名词介绍 MQ的相关概念 什么是MQ MQ(message queue)&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO 先入先出&am…...

Open3D计算点云粗糙度(方法一)【2025最新版】

目录 一、Roughness二、代码实现三、结果展示博客长期更新,本文最近更新时间为:2025年1月18日。 一、Roughness 通过菜单栏的Tools > Other > Roughness找到该功能。 这个工具可以估计点云的“粗糙度”。 选择一个或几个点云,然后启动这个工具。 CloudCompare只会询问…...

算法6(力扣148)-排序链表

1、问题 给你链表的头结点 head &#xff0c;请将其按 升序 排列并返回 排序后的链表 。 2、采用例子 输入&#xff1a;head [4,2,1,3] 输出&#xff1a;[1,2,3,4] 3、实现思路 将链表拆分成节点&#xff0c;存入数组使用sort排序&#xff0c;再用reduce重建链接 4、具…...

一文大白话讲清楚webpack基本使用——9——预加载之prefetch和preload以及webpackChunkName的使用

文章目录 一文大白话讲清楚webpack基本使用——9——预加载之prefetch和preload1. 建议按文章顺序从头看&#xff0c;一看到底&#xff0c;豁然开朗2. preload和prefetch的区别2. prefetch的使用3. preload的使用4. webpackChunkName 一文大白话讲清楚webpack基本使用——9——…...

【大数据2025】MapReduce

MapReduce 基础介绍 起源与发展&#xff1a;是 2004 年 10 月谷歌发表的 MAPREDUCE 论文的开源实现&#xff0c;最初用于大规模网页数据并行处理&#xff0c;现成为 Hadoop 核心子项目之一&#xff0c;是面向批处理的分布式计算框架。基本原理&#xff1a;分为 map 和 reduce …...

Windows图形界面(GUI)-QT-C/C++ - Qt List Widget详解与应用

公开视频 -> 链接点击跳转公开课程博客首页 -> ​​​链接点击跳转博客主页 目录 QListWidget概述 使用场景 常见样式 QListWidget属性设置 显示方式 (Display) 交互行为 (Interaction) 高级功能 (Advanced) QListWidget常见操作 内容处理 增加项目 删除项目…...

深度学习python基础(第二节) 分支语句和循环语句

本节主要介绍分支语句和循环语句的基本语法。 注意&#xff1a;在python中的作用域以缩进为准。有语言基础的很好理解&#xff0c;了解语法格式就可以。 布尔类型和比较运算符 # True真,False假 a True print(f"布尔变量a的内容是:{a},类型是:{type(a)}") 比较运算…...

Gin 源码概览 - 路由

本文基于gin 1.1 源码解读 https://github.com/gin-gonic/gin/archive/refs/tags/v1.1.zip 1. 注册路由 我们先来看一段gin代码&#xff0c;来看看最终得到的一颗路由树长啥样 func TestGinDocExp(t *testing.T) {engine : gin.Default()engine.GET("/api/user", f…...

第6章 ThreadGroup详细讲解(Java高并发编程详解:多线程与系统设计)

1.ThreadGroup 与 Thread 在Java程序中&#xff0c; 默认情况下&#xff0c; 新的线程都会被加入到main线程所在的group中&#xff0c; main线程的group名字同线程名。如同线程存在父子关系一样&#xff0c; Thread Group同样也存在父子关系。图6-1就很好地说明了父子thread、父…...

CentOS 7乱码问题如何解决?

1.使用超级用户操作: sudo su2.修改i18n配置文件&#xff1a; vi /etc/sysconfig/i18n将文件修改或添加为以下内容&#xff1a; LANG"zh_CN.UTF8" LC_ALL"zh_CN.UTF8"保存并退出&#xff08;按Esc键&#xff0c;输入:wq&#xff0c;然后回车&#xff09…...

JavaScript语言的多线程编程

JavaScript语言的多线程编程 JavaScript是一种广泛使用的编程语言&#xff0c;主要用于网页开发。由于其单线程的特性&#xff0c;JavaScript 一直以来都有“无法进行多线程编程”的印象。尽管如此&#xff0c;随着技术的发展&#xff0c;JavaScript也逐渐引入了多线程的概念&…...

OpenSeaOtter使用手册-变更通知和持续部署

我们在OpenSeaOtter Server 0.1.1版本增加的镜像变更通知功能。通过镜像变更通知和OpenSeaOtter Agent就可以轻松获得持续部署能力。 镜像变更通知是通过push的方式下发到Agent的&#xff0c;Agent所在机器不需要外网地址。在Agent收到镜像变更通知后&#xff0c;就会调用对应的…...

(2)STM32 USB设备开发-USB虚拟串口

例程&#xff1a;STM32USBdevice: 基于STM32的USB设备例子程序 - Gitee.com 本篇为USB虚拟串口教程&#xff0c;没有知识&#xff0c;全是实操&#xff0c;按照步骤就能获得一个STM32的USB虚拟串口。本例子是在野火F103MINI开发板上验证的&#xff0c;如果代码中出现一些外设的…...

他把智能科技引入现代农业领域

江苏田倍丰农业科技有限公司&#xff08;以下简称“田倍丰”&#xff09;是一家专注于粮油种植的农业科技公司&#xff0c;为拥有300亩以上田地的大户提供全面的解决方案。田倍丰通过与当地政府合作&#xff0c;将土地承包给大户&#xff0c;并提供农资和技术&#xff0c;实现利…...

ingress-nginx代理tcp使其能外部访问mysql

一、helm部署mysql主从复制 helm repo add bitnami https://charts.bitnami.com/bitnami helm repo updatehelm pull bitnami/mysql 解压后编辑values.yaml文件&#xff0c;修改如下&#xff08;storageclass已设置默认类&#xff09; 117 ## param architecture MySQL archit…...

大数据学习栈记——Neo4j的安装与使用

本文介绍图数据库Neofj的安装与使用&#xff0c;操作系统&#xff1a;Ubuntu24.04&#xff0c;Neofj版本&#xff1a;2025.04.0。 Apt安装 Neofj可以进行官网安装&#xff1a;Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

MFC内存泄露

1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...

测试markdown--肇兴

day1&#xff1a; 1、去程&#xff1a;7:04 --11:32高铁 高铁右转上售票大厅2楼&#xff0c;穿过候车厅下一楼&#xff0c;上大巴车 &#xffe5;10/人 **2、到达&#xff1a;**12点多到达寨子&#xff0c;买门票&#xff0c;美团/抖音&#xff1a;&#xffe5;78人 3、中饭&a…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

佰力博科技与您探讨热释电测量的几种方法

热释电的测量主要涉及热释电系数的测定&#xff0c;这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中&#xff0c;积分电荷法最为常用&#xff0c;其原理是通过测量在电容器上积累的热释电电荷&#xff0c;从而确定热释电系数…...

【Linux系统】Linux环境变量:系统配置的隐形指挥官

。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量&#xff1a;setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...

Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践

在 Kubernetes 集群中&#xff0c;如何在保障应用高可用的同时有效地管理资源&#xff0c;一直是运维人员和开发者关注的重点。随着微服务架构的普及&#xff0c;集群内各个服务的负载波动日趋明显&#xff0c;传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现指南针功能

指南针功能是许多位置服务应用的基础功能之一。下面我将详细介绍如何在HarmonyOS 5中使用DevEco Studio实现指南针功能。 1. 开发环境准备 确保已安装DevEco Studio 3.1或更高版本确保项目使用的是HarmonyOS 5.0 SDK在项目的module.json5中配置必要的权限 2. 权限配置 在mo…...