RxJava操作符变换过程
要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展
implementation 'io.reactivex:rxandroid:1.2.1'implementation 'io.reactivex:rxjava:1.2.0'
我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢,以下面的代码进行分析
String host = "https://baidu.com/";Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("message");subscriber.onCompleted();}}).map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;}}).subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug(s);}});
上面的代码是链式调用,为了方便理解,我把上面的代码拆分成了下面样式
String host = "https://baidu.com/";Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("message");subscriber.onCompleted();}});Observable<String> obs2 = obs1.map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;}});obs2.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriberLast onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriberLast onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug(s);}});
上面代码会打印,我们将一步一步分析,打印是怎么来的
https://www.baidu.com/message
subscriberLast onCompleted is come in
obs1是我们的原始Observable, obs2是我们变换过的Observable
首先从obs1的创建开始,就是Observable的创建过程,有不理解的可以先看RXJava的创建订阅过程
在osb1的创建过程create函数需要一个OnSubscribe对象,为了方便理解,就暂不拆开来写,代码虽然用的是匿名对象,我们暂且叫它onSubscribe1
Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() { //onSubscribe1@Overridepublic void call(Subscriber<? super String> subscriber) { subscriber.onNext("observable call onNext0");subscriber.onStart();subscriber.onNext("observable call onNext");subscriber.onCompleted();subscriber.onNext("observable call onNext1");}});
obs1创建成功后调用了map方法,map方法又返回了一个Observable,就是我们的obs2,同理,map方法的参数我们叫它func1
Observable<String> obs2 = obs1.map(new Func1<String, String>() { //func1@Overridepublic String call(String s) {return host+s;}});
随后我们又使用了osb2进行了订阅,同理subscribe方法的参数我们叫它subscriberLast
obs2.subscribe(new Subscriber<String>() { //subscriberLast@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug("subscriber onNext is come in s = "+s);}});
于是有了下面的关系,到这里还都很好理解
然后我们来分析obs1.map调用了map方法,rxjava做了什么
public class Observable<T> {
....public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {return create(new OnSubscribeMap<T, R>(this, func));}public static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(RxJavaHooks.onCreate(f));}
....
}
map方法又调用了create方法,create方法的调用我们已经在RxJava的调用过程中讲过,传入的参数是一个OnSubscribe对象,OnSubscribeMap实现了OnSubscribe接口,我们把new出来的OnSubscribeMap对象暂时叫做onSubscribeMap1构造方法传入的参数是obs1和func1, 那我们再来看OnSubscribeMap类
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {final Observable<T> source;final Func1<? super T, ? extends R> transformer;public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);source.unsafeSubscribe(parent);}static final class MapSubscriber<T, R> extends Subscriber<T> {final Subscriber<? super R> actual;final Func1<? super T, ? extends R> mapper;boolean done;public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}@Overridepublic void onError(Throwable e) {if (done) {RxJavaHooks.onError(e);return;}done = true;actual.onError(e);}@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}@Overridepublic void setProducer(Producer p) {actual.setProducer(p);}}}
我们知道在Observable创建好后,调用了subscribe方法就可以进行订阅了,最后调用的也是Observable创建时传入的OnSubscribe对象的call方法,以obs1的创建举例,也就是我们这里的onSubscribe1,不懂得去看RxJava的调用过程
因为我们最后执行subscribe订阅方法的是obs2那么也就会调用obs2的OnSubscribe对象,那么obs2的OnSubscribe对象是谁呢,就是onSubscribeMap1那么执行完订阅就会调用onSubscribeMap1的call方法
@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);source.unsafeSubscribe(parent);}
这里的source是OnSubscribeMap构造方法调用时初始化的也就是obs1,transformer 是我们的func1
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}
现在又有了下面的关系
在onSubscribeMap1的call方法中,一共有三行代码,这三行代码很重要,我们一行一行分析
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
实例化一个MapSubscriber对象parent ,MapSubscriber继承了Subscriber,所以MapSubscriber也是一个观察者;parent 持有了o,o是也就是subscriberLast,还持有了transformer,也就是func1。这里一会要详细分析
o.add(parent);
o是一个Subscriber,也是subscriberLast,上面的代码直接把parent添加到了subscriberLast的SubscriptionList列表。这里一会还要说一下。在看下一条代码
source.unsafeSubscribe(parent);
我们知道source也就是我们的obs1,obs1的订阅操作就是在这里发生的。至于为什么用unsafeSubscribe方法, 我们一会在分析
我们重新梳理一下,
1、obs2的订阅方法subscribe执行导致了obs2的onSubscribeMap1实例的call方法被执行;
2、onSubscribeMap1的call方法中又执行了obs1的订阅;obs1的观察者就是parent;
3、obs1的订阅必然会导致obs1 的onSubscribe实例onSubscribe1的call方法被执行。
4、在onSubscribe1的call方法中我们又调用了
subscriber.onNext("message");subscriber.onCompleted();
这里的subscriber也就是parent,必然会调用parent的next方法并传入message,和onCompleted。
5、继续分析parent
static final class MapSubscriber<T, R> extends Subscriber<T> {final Subscriber<? super R> actual;final Func1<? super T, ? extends R> mapper;boolean done;public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}@Overridepublic void onError(Throwable e) {if (done) {RxJavaHooks.onError(e);return;}done = true;actual.onError(e);}@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}@Overridepublic void setProducer(Producer p) {actual.setProducer(p);}}
parent是MapSubscriber,MapSubscriber继承了Subscriber,MapSubscriber的构造方法需要两个参数Subscriber和Func1,通过之前的分析,知道actual就是subscriberLast,mapper就是fun1
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}
用一张图来说明下
static final class MapSubscriber<T, R> extends Subscriber<T> {.....@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t); //这里调用,mapper就是fun1,的call方法,这里的t就是message,result就是转换后的字符串} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}.....
}
在parent的onnext方法中调用了func1的call方法,还记得我们在func1的call方法中写的什么,没错就是转换字符串,call方法的返回值就是转换后的字符串
final String host = "https://www.baidu.com/";Observable<String> obs2 = obs1.map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s; //func1的call方法 进行了字符串转换,这里的s就是message}});
继续调用了
actual.onNext(result);
我们知道actual就是subscriberLast,所以会调用subscriberLast的onNext方法,
obs2.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriberLast onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) { //subscriberLast的onNext方法ILog.LogDebug(s);//最后会打印https://www.baidu.com/message}@Overridepublic void onStart() {super.onStart();}});
在obs1中我们还调用了onCompleted方法,先调用了actual.onCompleted();
actual也就是subscriberLast实例。
@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}
在这添加一张图方便更好的理解
简单的说就是先调用obs1的subscriber的onNext()方法,在onNext()方法中调用func1的call方法,处理数据源数据,然后再把处理过的数据源发射给obs2的subscriber
还记不记得上面的第二条代码 o.add(parent);为什么要把parent添加到o中呢,o也就是subscriberLast。
@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);//第二条代码source.unsafeSubscribe(parent);//第三条代码}
subscriberLast的add方法最终会把parent添加到subscriberLast的SubscriptionList中,关于Subscriber请看RxJava中的Subscriber。再执行SubscriptionList的解绑方法unsubscribe会把subscriptions中的Subscription一并解绑,也就是会把parent和obs1的绑定关系解除。
那上面第三条代码为什么调用的是unsafeSubscribe方法呢,记得我们之前分析RxJava的订阅过程中看到的是最后包装了一个SafeSubscriber,再SafeSubscriber中会进行一些多线程的处理操作。
/** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}
observable中unsafeSubscribe方法也很简单,也没并由对传入的subscriber进行包装而是直接调用。
public class Observable<T> {
....
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {try {// new Subscriber so onStart itsubscriber.onStart();// allow the hook to intercept and/or decorateRxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(RxJavaHooks.onObservableError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.RxJavaHooks.onObservableError(r);// TODO why aren't we throwing the hook's return value.throw r; // NOPMD}return Subscriptions.unsubscribed();}}
....
我们回过来再看MapSubscriber中其实也已经进行了简单的处理工作,所以也就不需要使用SafeSubscriber了。
至此RxJava操作符变换过程就分析完了,欢迎大家补充和纠正。
相关文章:

RxJava操作符变换过程
要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展 implementation io.reactivex:rxandroid:1.2.1implementation io.reactivex:rxjava:1.2.0我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢࿰…...

开放平台订单接口
custom-自定义API操作 注册开通 taobao.custom 公共参数 名称 类型 必须 描述 key String 是 调用key(必须以GET方式拼接在URL中) secret String 是 调用密钥 api_name String 是 API接口名称(包括在请求地址中&a…...
CDN相关知识点
1、什么是CDN?CDN的作用是什么? CDN(Content Delivery Network,内容分发网络)是一种通过在多个节点上分布内容以提高网络性能、可靠性和可扩展性的网络解决方案。CDN通过在不同的地理位置部署服务器,使用户…...

【论文阅读】注意力机制与二维 TSP 问题
前置知识 注意力机制 见 这篇 二维 TSP 问题 给定二维平面上 nnn 个点的坐标 S{xi}i1nS\{x_i\}_{i1}^nS{xi}i1n,其中 xi∈[0,1]2x_i\in [0,1]^2xi∈[0,1]2,要找到一个 1∼n1\sim n1∼n 的排列 π\piπ ,使得目标函数 L(π∣s)∥xπ…...

[深入理解SSD系列 闪存实战2.1.7] NAND FLASH基本编程(写)操作及原理_NAND FLASH Program Operation 源码实现
前言 上面是我使用的NAND FLASH的硬件原理图,面对这些引脚,很难明白他们是什么含义, 下面先来个热身: 问1. 原理图上NAND FLASH只有数据线,怎么传输地址? 答1.在DATA0~DATA7上既传输数据,又传输地址 当ALE为高电平时传输的是地址, 问2. 从N...

PMP项目管理项目资源管理
目录1 项目资源管理概述2 规划资源管理3 估算活动资源4 获取资源5 建设团队6 管理团队7 控制资源1 项目资源管理概述 项目资源管理包括识别、获取和管理所需资源以成功完成项目的各个过程,这些过程有助于确保项目经理和项目团队在正确的时间和地点使用正确的资源。…...

程序的编译和链接
程序的编译和链接程序的编译和链接程序的两种环境翻译环境详解编译和链接预处理编译汇编链接运行环境程序的编译和链接 程序的两种环境 在ANSI C的任何一种实现中,存在两个不同的环境。 第1种是翻译环境,在这个环境中源代码被转换为可执行的机器指令。 …...

Win11的两个实用技巧系列之无法联网怎么办、耳机没声音的多种解决办法
Win11无法联网怎么办? win11安装后设备不能上网的解决办法Win11无法联网怎么办?电脑安装win11系统以后,发现不能上网,连接不上网络,该怎么办呢?下面我们就来看看win11安装后设备不能上网的解决办法Win11安装后&#x…...

【微信小程序】-- 自定义组件 - 数据监听器 - 案例 (三十五)
💌 所属专栏:【微信小程序开发教程】 😀 作 者:我是夜阑的狗🐶 🚀 个人简介:一个正在努力学技术的CV工程师,专注基础和实战分享 ,欢迎咨询! &…...
Linux - 第7节 - 进程间通信
1.进程间通信介绍 进程间通信目的: 数据传输:一个进程需要将它的数据发送给另一个进程 。 资源共享:多个进程之间共享同样的资源。 通知事件:一个进程需要向另一个或一组进程发送消息,通…...

# 数据完整性算法在shell及python中的实践
数据完整性算法在shell及python中的实践 文章目录数据完整性算法在shell及python中的实践1 预备知识1.1 摘要算法1.2 报文(数据)完整性校验1.3 python byte类型字符串与普通字符串区别2 传统方法(散列函数)2.1 在shell中实践2.2 在…...

QEMU启动x86-Linux内核
目录QEMU简介linux启动流程我的环境安装QEMU软件包安装源码安装编译linux内核编译busybox制作initramfs使用QEMU启动linux内核简化命令参考QEMU简介 QEMU(quick emulator)是一个通用的、开源的硬件模拟器,可以模拟不同硬件架构(如…...

C/C++每日一练(20230311)
目录 1. 计算阶乘的和 ★ 2. 基本计算器 ★★★ 3. N皇后 II ★★★ 🌟 每日一练刷题专栏 C/C 每日一练 专栏 Python 每日一练 专栏 1. 计算阶乘的和 计算:1!-2!3!-4!5!-6!7!-8!9!-10!,并输出计算结果。 注意:不全是…...

哪个牌子的洗地机耐用?耐用的洗地机推荐
作为当下非常热销的洗地机,它不仅解放了双手,使用也非常的便捷。是生活品质提高的最好代表,但是面对市面上让人眼花缭乱的洗地机,挑选几个来回都决定不了到底入手哪个好!为了能帮助大家选购到合适的洗地机,…...

搭建一个中心化的定时服务
1. 背景 在物联网络,很多设备之间都在进行交互,其中云端在远程交流中起到了很重要的作用。比如,一台设备想进行调温,但是需要知道此时房间的温度,那就需要定时去查询传感器测出来的房间温度,如果温度过高&a…...

【CSS】快速入门笔记
视频链接:https://www.bilibili.com/video/BV1mS4y1Z7Ga/?spm_id_from333.999.0.0&vd_source1ad00d913eae8281cbadad6ae66fb06c 文章目录一、CSS语法1.结构2.样式类型1)内联样式 Inline Style2)内部样式 Internal Style3)外部…...

第161篇 笔记-去中心化的含义
本文主要内容来自Vitalik Buterin的文章。“去中心化”这个词是在加密经济学领域用得最多的一个词,通常也作为辨别区块链的依据。然而,这个词也可能是被定义得最不恰当的一个词。数千小时的研究和价值数十亿美元哈希算力的投入都旨在实现去中心化&#x…...

「计算机组成原理」数据的表示和运算(二)
文章目录五、奇偶校验码六、算术逻辑单元ALU6.1 电路的基本原理6.2 加法器的设计6.2.1 一位全加器6.2.2 串行加法器6.2.3 串行进位的并行加法器6.2.4 并行进位的并行加法器七、补码加减运算器八、标志位的生成九、定点数的移位运算9.1 算数移位9.2 逻辑移位9.3 循环移位五、奇偶…...

建立自己的博客
环境安装: w10系统安装 第一步:安装git Git 官网: https://git-scm.com/ 第二步:安装Node.js Node.js官网:https://nodejs.org/zh-cn/ 使用cmd检测: node -v 第三步:安装Hexo Hexo官网:htt…...

Docker 安装mysql Mac 环境下
已安装桌面端 Docker (Mac安装Docker) 安装方式一 打开链接 https://www.docker.com/products/docker-desktop 选择平台下载 安装方式二 安装homebrew /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/m…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

【Linux系统】Linux环境变量:系统配置的隐形指挥官
。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量:setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...

elementUI点击浏览table所选行数据查看文档
项目场景: table按照要求特定的数据变成按钮可以点击 解决方案: <el-table-columnprop"mlname"label"名称"align"center"width"180"><template slot-scope"scope"><el-buttonv-if&qu…...

【LeetCode】算法详解#6 ---除自身以外数组的乘积
1.题目介绍 给定一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O…...
C++ 类基础:封装、继承、多态与多线程模板实现
前言 C 是一门强大的面向对象编程语言,而类(Class)作为其核心特性之一,是理解和使用 C 的关键。本文将深入探讨 C 类的基本特性,包括封装、继承和多态,同时讨论类中的权限控制,并展示如何使用类…...

智警杯备赛--excel模块
数据透视与图表制作 创建步骤 创建 1.在Excel的插入或者数据标签页下找到数据透视表的按钮 2.将数据放进“请选择单元格区域“中,点击确定 这是最终结果,但是由于环境启不了,这里用的是自己的excel,真实的环境中的excel根据实训…...

GC1808:高性能音频ADC的卓越之选
在音频处理领域,高质量的音频模数转换器(ADC)是实现精准音频数字化的关键。GC1808,一款96kHz、24bit立体声音频ADC,以其卓越的性能和高性价比脱颖而出,成为众多音频设备制造商的理想选择。 GC1808集成了64倍…...

可下载旧版app屏蔽更新的app市场
软件介绍 手机用久了,app越来越臃肿,老手机卡顿成常态。这里给大家推荐个改善老手机使用体验的方法,还能帮我们卸载不需要的app。 手机现状 如今的app不断更新,看似在优化,实则内存占用越来越大,对手机性…...

2025年上海市“星光计划”第十一届职业院校技能大赛 网络安全赛项技能操作模块样题
2025年上海市“星光计划”第十一届职业院校技能大赛 网络安全赛项技能操作模块样题 (二)模块 A:安全事件响应、网络安全数据取证、应用安全、系统安全任务一:漏洞扫描与利用:任务二:Windows 操作系统渗透测试 :任务三&…...