RxJava操作符变换过程
要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展
implementation 'io.reactivex:rxandroid:1.2.1'implementation 'io.reactivex:rxjava:1.2.0'
我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢,以下面的代码进行分析
String host = "https://baidu.com/";Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("message");subscriber.onCompleted();}}).map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;}}).subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug(s);}});
上面的代码是链式调用,为了方便理解,我把上面的代码拆分成了下面样式
String host = "https://baidu.com/";Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("message");subscriber.onCompleted();}});Observable<String> obs2 = obs1.map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;}});obs2.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriberLast onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriberLast onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug(s);}});
上面代码会打印,我们将一步一步分析,打印是怎么来的
https://www.baidu.com/message
subscriberLast onCompleted is come in
obs1是我们的原始Observable, obs2是我们变换过的Observable
首先从obs1的创建开始,就是Observable的创建过程,有不理解的可以先看RXJava的创建订阅过程
在osb1的创建过程create函数需要一个OnSubscribe对象,为了方便理解,就暂不拆开来写,代码虽然用的是匿名对象,我们暂且叫它onSubscribe1
Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() { //onSubscribe1@Overridepublic void call(Subscriber<? super String> subscriber) { subscriber.onNext("observable call onNext0");subscriber.onStart();subscriber.onNext("observable call onNext");subscriber.onCompleted();subscriber.onNext("observable call onNext1");}});
obs1创建成功后调用了map方法,map方法又返回了一个Observable,就是我们的obs2,同理,map方法的参数我们叫它func1
Observable<String> obs2 = obs1.map(new Func1<String, String>() { //func1@Overridepublic String call(String s) {return host+s;}});
随后我们又使用了osb2进行了订阅,同理subscribe方法的参数我们叫它subscriberLast
obs2.subscribe(new Subscriber<String>() { //subscriberLast@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug("subscriber onNext is come in s = "+s);}});
于是有了下面的关系,到这里还都很好理解
然后我们来分析obs1.map调用了map方法,rxjava做了什么
public class Observable<T> {
....public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {return create(new OnSubscribeMap<T, R>(this, func));}public static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(RxJavaHooks.onCreate(f));}
....
}
map方法又调用了create方法,create方法的调用我们已经在RxJava的调用过程中讲过,传入的参数是一个OnSubscribe对象,OnSubscribeMap实现了OnSubscribe接口,我们把new出来的OnSubscribeMap对象暂时叫做onSubscribeMap1构造方法传入的参数是obs1和func1, 那我们再来看OnSubscribeMap类
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {final Observable<T> source;final Func1<? super T, ? extends R> transformer;public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);source.unsafeSubscribe(parent);}static final class MapSubscriber<T, R> extends Subscriber<T> {final Subscriber<? super R> actual;final Func1<? super T, ? extends R> mapper;boolean done;public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}@Overridepublic void onError(Throwable e) {if (done) {RxJavaHooks.onError(e);return;}done = true;actual.onError(e);}@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}@Overridepublic void setProducer(Producer p) {actual.setProducer(p);}}}
我们知道在Observable创建好后,调用了subscribe方法就可以进行订阅了,最后调用的也是Observable创建时传入的OnSubscribe对象的call方法,以obs1的创建举例,也就是我们这里的onSubscribe1,不懂得去看RxJava的调用过程
因为我们最后执行subscribe订阅方法的是obs2那么也就会调用obs2的OnSubscribe对象,那么obs2的OnSubscribe对象是谁呢,就是onSubscribeMap1那么执行完订阅就会调用onSubscribeMap1的call方法
@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);source.unsafeSubscribe(parent);}
这里的source是OnSubscribeMap构造方法调用时初始化的也就是obs1,transformer 是我们的func1
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}
现在又有了下面的关系
在onSubscribeMap1的call方法中,一共有三行代码,这三行代码很重要,我们一行一行分析
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
实例化一个MapSubscriber对象parent ,MapSubscriber继承了Subscriber,所以MapSubscriber也是一个观察者;parent 持有了o,o是也就是subscriberLast,还持有了transformer,也就是func1。这里一会要详细分析
o.add(parent);
o是一个Subscriber,也是subscriberLast,上面的代码直接把parent添加到了subscriberLast的SubscriptionList列表。这里一会还要说一下。在看下一条代码
source.unsafeSubscribe(parent);
我们知道source也就是我们的obs1,obs1的订阅操作就是在这里发生的。至于为什么用unsafeSubscribe方法, 我们一会在分析
我们重新梳理一下,
1、obs2的订阅方法subscribe执行导致了obs2的onSubscribeMap1实例的call方法被执行;
2、onSubscribeMap1的call方法中又执行了obs1的订阅;obs1的观察者就是parent;
3、obs1的订阅必然会导致obs1 的onSubscribe实例onSubscribe1的call方法被执行。
4、在onSubscribe1的call方法中我们又调用了
subscriber.onNext("message");subscriber.onCompleted();
这里的subscriber也就是parent,必然会调用parent的next方法并传入message,和onCompleted。
5、继续分析parent
static final class MapSubscriber<T, R> extends Subscriber<T> {final Subscriber<? super R> actual;final Func1<? super T, ? extends R> mapper;boolean done;public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}@Overridepublic void onError(Throwable e) {if (done) {RxJavaHooks.onError(e);return;}done = true;actual.onError(e);}@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}@Overridepublic void setProducer(Producer p) {actual.setProducer(p);}}
parent是MapSubscriber,MapSubscriber继承了Subscriber,MapSubscriber的构造方法需要两个参数Subscriber和Func1,通过之前的分析,知道actual就是subscriberLast,mapper就是fun1
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}
用一张图来说明下

static final class MapSubscriber<T, R> extends Subscriber<T> {.....@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t); //这里调用,mapper就是fun1,的call方法,这里的t就是message,result就是转换后的字符串} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}.....
}
在parent的onnext方法中调用了func1的call方法,还记得我们在func1的call方法中写的什么,没错就是转换字符串,call方法的返回值就是转换后的字符串
final String host = "https://www.baidu.com/";Observable<String> obs2 = obs1.map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s; //func1的call方法 进行了字符串转换,这里的s就是message}});
继续调用了
actual.onNext(result);
我们知道actual就是subscriberLast,所以会调用subscriberLast的onNext方法,
obs2.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriberLast onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) { //subscriberLast的onNext方法ILog.LogDebug(s);//最后会打印https://www.baidu.com/message}@Overridepublic void onStart() {super.onStart();}});
在obs1中我们还调用了onCompleted方法,先调用了actual.onCompleted();
actual也就是subscriberLast实例。
@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}
在这添加一张图方便更好的理解
简单的说就是先调用obs1的subscriber的onNext()方法,在onNext()方法中调用func1的call方法,处理数据源数据,然后再把处理过的数据源发射给obs2的subscriber
还记不记得上面的第二条代码 o.add(parent);为什么要把parent添加到o中呢,o也就是subscriberLast。
@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);//第二条代码source.unsafeSubscribe(parent);//第三条代码}
subscriberLast的add方法最终会把parent添加到subscriberLast的SubscriptionList中,关于Subscriber请看RxJava中的Subscriber。再执行SubscriptionList的解绑方法unsubscribe会把subscriptions中的Subscription一并解绑,也就是会把parent和obs1的绑定关系解除。
那上面第三条代码为什么调用的是unsafeSubscribe方法呢,记得我们之前分析RxJava的订阅过程中看到的是最后包装了一个SafeSubscriber,再SafeSubscriber中会进行一些多线程的处理操作。
/** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}
observable中unsafeSubscribe方法也很简单,也没并由对传入的subscriber进行包装而是直接调用。
public class Observable<T> {
....
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {try {// new Subscriber so onStart itsubscriber.onStart();// allow the hook to intercept and/or decorateRxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(RxJavaHooks.onObservableError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.RxJavaHooks.onObservableError(r);// TODO why aren't we throwing the hook's return value.throw r; // NOPMD}return Subscriptions.unsubscribed();}}
....
我们回过来再看MapSubscriber中其实也已经进行了简单的处理工作,所以也就不需要使用SafeSubscriber了。
至此RxJava操作符变换过程就分析完了,欢迎大家补充和纠正。
相关文章:
RxJava操作符变换过程
要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展 implementation io.reactivex:rxandroid:1.2.1implementation io.reactivex:rxjava:1.2.0我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢࿰…...
开放平台订单接口
custom-自定义API操作 注册开通 taobao.custom 公共参数 名称 类型 必须 描述 key String 是 调用key(必须以GET方式拼接在URL中) secret String 是 调用密钥 api_name String 是 API接口名称(包括在请求地址中&a…...
CDN相关知识点
1、什么是CDN?CDN的作用是什么? CDN(Content Delivery Network,内容分发网络)是一种通过在多个节点上分布内容以提高网络性能、可靠性和可扩展性的网络解决方案。CDN通过在不同的地理位置部署服务器,使用户…...
【论文阅读】注意力机制与二维 TSP 问题
前置知识 注意力机制 见 这篇 二维 TSP 问题 给定二维平面上 nnn 个点的坐标 S{xi}i1nS\{x_i\}_{i1}^nS{xi}i1n,其中 xi∈[0,1]2x_i\in [0,1]^2xi∈[0,1]2,要找到一个 1∼n1\sim n1∼n 的排列 π\piπ ,使得目标函数 L(π∣s)∥xπ…...
[深入理解SSD系列 闪存实战2.1.7] NAND FLASH基本编程(写)操作及原理_NAND FLASH Program Operation 源码实现
前言 上面是我使用的NAND FLASH的硬件原理图,面对这些引脚,很难明白他们是什么含义, 下面先来个热身: 问1. 原理图上NAND FLASH只有数据线,怎么传输地址? 答1.在DATA0~DATA7上既传输数据,又传输地址 当ALE为高电平时传输的是地址, 问2. 从N...
PMP项目管理项目资源管理
目录1 项目资源管理概述2 规划资源管理3 估算活动资源4 获取资源5 建设团队6 管理团队7 控制资源1 项目资源管理概述 项目资源管理包括识别、获取和管理所需资源以成功完成项目的各个过程,这些过程有助于确保项目经理和项目团队在正确的时间和地点使用正确的资源。…...
程序的编译和链接
程序的编译和链接程序的编译和链接程序的两种环境翻译环境详解编译和链接预处理编译汇编链接运行环境程序的编译和链接 程序的两种环境 在ANSI C的任何一种实现中,存在两个不同的环境。 第1种是翻译环境,在这个环境中源代码被转换为可执行的机器指令。 …...
Win11的两个实用技巧系列之无法联网怎么办、耳机没声音的多种解决办法
Win11无法联网怎么办? win11安装后设备不能上网的解决办法Win11无法联网怎么办?电脑安装win11系统以后,发现不能上网,连接不上网络,该怎么办呢?下面我们就来看看win11安装后设备不能上网的解决办法Win11安装后&#x…...
【微信小程序】-- 自定义组件 - 数据监听器 - 案例 (三十五)
💌 所属专栏:【微信小程序开发教程】 😀 作 者:我是夜阑的狗🐶 🚀 个人简介:一个正在努力学技术的CV工程师,专注基础和实战分享 ,欢迎咨询! &…...
Linux - 第7节 - 进程间通信
1.进程间通信介绍 进程间通信目的: 数据传输:一个进程需要将它的数据发送给另一个进程 。 资源共享:多个进程之间共享同样的资源。 通知事件:一个进程需要向另一个或一组进程发送消息,通…...
# 数据完整性算法在shell及python中的实践
数据完整性算法在shell及python中的实践 文章目录数据完整性算法在shell及python中的实践1 预备知识1.1 摘要算法1.2 报文(数据)完整性校验1.3 python byte类型字符串与普通字符串区别2 传统方法(散列函数)2.1 在shell中实践2.2 在…...
QEMU启动x86-Linux内核
目录QEMU简介linux启动流程我的环境安装QEMU软件包安装源码安装编译linux内核编译busybox制作initramfs使用QEMU启动linux内核简化命令参考QEMU简介 QEMU(quick emulator)是一个通用的、开源的硬件模拟器,可以模拟不同硬件架构(如…...
C/C++每日一练(20230311)
目录 1. 计算阶乘的和 ★ 2. 基本计算器 ★★★ 3. N皇后 II ★★★ 🌟 每日一练刷题专栏 C/C 每日一练 专栏 Python 每日一练 专栏 1. 计算阶乘的和 计算:1!-2!3!-4!5!-6!7!-8!9!-10!,并输出计算结果。 注意:不全是…...
哪个牌子的洗地机耐用?耐用的洗地机推荐
作为当下非常热销的洗地机,它不仅解放了双手,使用也非常的便捷。是生活品质提高的最好代表,但是面对市面上让人眼花缭乱的洗地机,挑选几个来回都决定不了到底入手哪个好!为了能帮助大家选购到合适的洗地机,…...
搭建一个中心化的定时服务
1. 背景 在物联网络,很多设备之间都在进行交互,其中云端在远程交流中起到了很重要的作用。比如,一台设备想进行调温,但是需要知道此时房间的温度,那就需要定时去查询传感器测出来的房间温度,如果温度过高&a…...
【CSS】快速入门笔记
视频链接:https://www.bilibili.com/video/BV1mS4y1Z7Ga/?spm_id_from333.999.0.0&vd_source1ad00d913eae8281cbadad6ae66fb06c 文章目录一、CSS语法1.结构2.样式类型1)内联样式 Inline Style2)内部样式 Internal Style3)外部…...
第161篇 笔记-去中心化的含义
本文主要内容来自Vitalik Buterin的文章。“去中心化”这个词是在加密经济学领域用得最多的一个词,通常也作为辨别区块链的依据。然而,这个词也可能是被定义得最不恰当的一个词。数千小时的研究和价值数十亿美元哈希算力的投入都旨在实现去中心化&#x…...
「计算机组成原理」数据的表示和运算(二)
文章目录五、奇偶校验码六、算术逻辑单元ALU6.1 电路的基本原理6.2 加法器的设计6.2.1 一位全加器6.2.2 串行加法器6.2.3 串行进位的并行加法器6.2.4 并行进位的并行加法器七、补码加减运算器八、标志位的生成九、定点数的移位运算9.1 算数移位9.2 逻辑移位9.3 循环移位五、奇偶…...
建立自己的博客
环境安装: w10系统安装 第一步:安装git Git 官网: https://git-scm.com/ 第二步:安装Node.js Node.js官网:https://nodejs.org/zh-cn/ 使用cmd检测: node -v 第三步:安装Hexo Hexo官网:htt…...
Docker 安装mysql Mac 环境下
已安装桌面端 Docker (Mac安装Docker) 安装方式一 打开链接 https://www.docker.com/products/docker-desktop 选择平台下载 安装方式二 安装homebrew /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/m…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...
华为OD机考-机房布局
import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...
代码规范和架构【立芯理论一】(2025.06.08)
1、代码规范的目标 代码简洁精炼、美观,可持续性好高效率高复用,可移植性好高内聚,低耦合没有冗余规范性,代码有规可循,可以看出自己当时的思考过程特殊排版,特殊语法,特殊指令,必须…...
如何应对敏捷转型中的团队阻力
应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中,明确沟通敏捷转型目的尤为关键,团队成员只有清晰理解转型背后的原因和利益,才能降低对变化的…...
Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践
在 Kubernetes 集群中,如何在保障应用高可用的同时有效地管理资源,一直是运维人员和开发者关注的重点。随着微服务架构的普及,集群内各个服务的负载波动日趋明显,传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...
若依登录用户名和密码加密
/*** 获取公钥:前端用来密码加密* return*/GetMapping("/getPublicKey")public RSAUtil.RSAKeyPair getPublicKey() {return RSAUtil.rsaKeyPair();}新建RSAUti.Java package com.ruoyi.common.utils;import org.apache.commons.codec.binary.Base64; im…...
