RxJava操作符变换过程
要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展
implementation 'io.reactivex:rxandroid:1.2.1'implementation 'io.reactivex:rxjava:1.2.0'
我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢,以下面的代码进行分析
String host = "https://baidu.com/";Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("message");subscriber.onCompleted();}}).map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;}}).subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug(s);}});
上面的代码是链式调用,为了方便理解,我把上面的代码拆分成了下面样式
String host = "https://baidu.com/";Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("message");subscriber.onCompleted();}});Observable<String> obs2 = obs1.map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;}});obs2.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriberLast onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriberLast onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug(s);}});
上面代码会打印,我们将一步一步分析,打印是怎么来的
https://www.baidu.com/message
subscriberLast onCompleted is come in
obs1是我们的原始Observable, obs2是我们变换过的Observable
首先从obs1的创建开始,就是Observable的创建过程,有不理解的可以先看RXJava的创建订阅过程
在osb1的创建过程create函数需要一个OnSubscribe对象,为了方便理解,就暂不拆开来写,代码虽然用的是匿名对象,我们暂且叫它onSubscribe1
Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() { //onSubscribe1@Overridepublic void call(Subscriber<? super String> subscriber) { subscriber.onNext("observable call onNext0");subscriber.onStart();subscriber.onNext("observable call onNext");subscriber.onCompleted();subscriber.onNext("observable call onNext1");}});
obs1创建成功后调用了map方法,map方法又返回了一个Observable,就是我们的obs2,同理,map方法的参数我们叫它func1
Observable<String> obs2 = obs1.map(new Func1<String, String>() { //func1@Overridepublic String call(String s) {return host+s;}});
随后我们又使用了osb2进行了订阅,同理subscribe方法的参数我们叫它subscriberLast
obs2.subscribe(new Subscriber<String>() { //subscriberLast@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug("subscriber onNext is come in s = "+s);}});
于是有了下面的关系,到这里还都很好理解
然后我们来分析obs1.map调用了map方法,rxjava做了什么
public class Observable<T> {
....public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {return create(new OnSubscribeMap<T, R>(this, func));}public static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(RxJavaHooks.onCreate(f));}
....
}
map方法又调用了create方法,create方法的调用我们已经在RxJava的调用过程中讲过,传入的参数是一个OnSubscribe对象,OnSubscribeMap实现了OnSubscribe接口,我们把new出来的OnSubscribeMap对象暂时叫做onSubscribeMap1构造方法传入的参数是obs1和func1, 那我们再来看OnSubscribeMap类
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {final Observable<T> source;final Func1<? super T, ? extends R> transformer;public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);source.unsafeSubscribe(parent);}static final class MapSubscriber<T, R> extends Subscriber<T> {final Subscriber<? super R> actual;final Func1<? super T, ? extends R> mapper;boolean done;public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}@Overridepublic void onError(Throwable e) {if (done) {RxJavaHooks.onError(e);return;}done = true;actual.onError(e);}@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}@Overridepublic void setProducer(Producer p) {actual.setProducer(p);}}}
我们知道在Observable创建好后,调用了subscribe方法就可以进行订阅了,最后调用的也是Observable创建时传入的OnSubscribe对象的call方法,以obs1的创建举例,也就是我们这里的onSubscribe1,不懂得去看RxJava的调用过程
因为我们最后执行subscribe订阅方法的是obs2那么也就会调用obs2的OnSubscribe对象,那么obs2的OnSubscribe对象是谁呢,就是onSubscribeMap1那么执行完订阅就会调用onSubscribeMap1的call方法
@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);source.unsafeSubscribe(parent);}
这里的source是OnSubscribeMap构造方法调用时初始化的也就是obs1,transformer 是我们的func1
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}
现在又有了下面的关系
在onSubscribeMap1的call方法中,一共有三行代码,这三行代码很重要,我们一行一行分析
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
实例化一个MapSubscriber对象parent ,MapSubscriber继承了Subscriber,所以MapSubscriber也是一个观察者;parent 持有了o,o是也就是subscriberLast,还持有了transformer,也就是func1。这里一会要详细分析
o.add(parent);
o是一个Subscriber,也是subscriberLast,上面的代码直接把parent添加到了subscriberLast的SubscriptionList列表。这里一会还要说一下。在看下一条代码
source.unsafeSubscribe(parent);
我们知道source也就是我们的obs1,obs1的订阅操作就是在这里发生的。至于为什么用unsafeSubscribe方法, 我们一会在分析
我们重新梳理一下,
1、obs2的订阅方法subscribe执行导致了obs2的onSubscribeMap1实例的call方法被执行;
2、onSubscribeMap1的call方法中又执行了obs1的订阅;obs1的观察者就是parent;
3、obs1的订阅必然会导致obs1 的onSubscribe实例onSubscribe1的call方法被执行。
4、在onSubscribe1的call方法中我们又调用了
subscriber.onNext("message");subscriber.onCompleted();
这里的subscriber也就是parent,必然会调用parent的next方法并传入message,和onCompleted。
5、继续分析parent
static final class MapSubscriber<T, R> extends Subscriber<T> {final Subscriber<? super R> actual;final Func1<? super T, ? extends R> mapper;boolean done;public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}@Overridepublic void onError(Throwable e) {if (done) {RxJavaHooks.onError(e);return;}done = true;actual.onError(e);}@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}@Overridepublic void setProducer(Producer p) {actual.setProducer(p);}}
parent是MapSubscriber,MapSubscriber继承了Subscriber,MapSubscriber的构造方法需要两个参数Subscriber和Func1,通过之前的分析,知道actual就是subscriberLast,mapper就是fun1
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}
用一张图来说明下

static final class MapSubscriber<T, R> extends Subscriber<T> {.....@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t); //这里调用,mapper就是fun1,的call方法,这里的t就是message,result就是转换后的字符串} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}.....
}
在parent的onnext方法中调用了func1的call方法,还记得我们在func1的call方法中写的什么,没错就是转换字符串,call方法的返回值就是转换后的字符串
final String host = "https://www.baidu.com/";Observable<String> obs2 = obs1.map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s; //func1的call方法 进行了字符串转换,这里的s就是message}});
继续调用了
actual.onNext(result);
我们知道actual就是subscriberLast,所以会调用subscriberLast的onNext方法,
obs2.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriberLast onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) { //subscriberLast的onNext方法ILog.LogDebug(s);//最后会打印https://www.baidu.com/message}@Overridepublic void onStart() {super.onStart();}});
在obs1中我们还调用了onCompleted方法,先调用了actual.onCompleted();
actual也就是subscriberLast实例。
@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}
在这添加一张图方便更好的理解
简单的说就是先调用obs1的subscriber的onNext()方法,在onNext()方法中调用func1的call方法,处理数据源数据,然后再把处理过的数据源发射给obs2的subscriber
还记不记得上面的第二条代码 o.add(parent);为什么要把parent添加到o中呢,o也就是subscriberLast。
@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);//第二条代码source.unsafeSubscribe(parent);//第三条代码}
subscriberLast的add方法最终会把parent添加到subscriberLast的SubscriptionList中,关于Subscriber请看RxJava中的Subscriber。再执行SubscriptionList的解绑方法unsubscribe会把subscriptions中的Subscription一并解绑,也就是会把parent和obs1的绑定关系解除。
那上面第三条代码为什么调用的是unsafeSubscribe方法呢,记得我们之前分析RxJava的订阅过程中看到的是最后包装了一个SafeSubscriber,再SafeSubscriber中会进行一些多线程的处理操作。
/** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}
observable中unsafeSubscribe方法也很简单,也没并由对传入的subscriber进行包装而是直接调用。
public class Observable<T> {
....
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {try {// new Subscriber so onStart itsubscriber.onStart();// allow the hook to intercept and/or decorateRxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(RxJavaHooks.onObservableError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.RxJavaHooks.onObservableError(r);// TODO why aren't we throwing the hook's return value.throw r; // NOPMD}return Subscriptions.unsubscribed();}}
....
我们回过来再看MapSubscriber中其实也已经进行了简单的处理工作,所以也就不需要使用SafeSubscriber了。
至此RxJava操作符变换过程就分析完了,欢迎大家补充和纠正。
相关文章:
RxJava操作符变换过程
要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展 implementation io.reactivex:rxandroid:1.2.1implementation io.reactivex:rxjava:1.2.0我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢࿰…...
开放平台订单接口
custom-自定义API操作 注册开通 taobao.custom 公共参数 名称 类型 必须 描述 key String 是 调用key(必须以GET方式拼接在URL中) secret String 是 调用密钥 api_name String 是 API接口名称(包括在请求地址中&a…...
CDN相关知识点
1、什么是CDN?CDN的作用是什么? CDN(Content Delivery Network,内容分发网络)是一种通过在多个节点上分布内容以提高网络性能、可靠性和可扩展性的网络解决方案。CDN通过在不同的地理位置部署服务器,使用户…...
【论文阅读】注意力机制与二维 TSP 问题
前置知识 注意力机制 见 这篇 二维 TSP 问题 给定二维平面上 nnn 个点的坐标 S{xi}i1nS\{x_i\}_{i1}^nS{xi}i1n,其中 xi∈[0,1]2x_i\in [0,1]^2xi∈[0,1]2,要找到一个 1∼n1\sim n1∼n 的排列 π\piπ ,使得目标函数 L(π∣s)∥xπ…...
[深入理解SSD系列 闪存实战2.1.7] NAND FLASH基本编程(写)操作及原理_NAND FLASH Program Operation 源码实现
前言 上面是我使用的NAND FLASH的硬件原理图,面对这些引脚,很难明白他们是什么含义, 下面先来个热身: 问1. 原理图上NAND FLASH只有数据线,怎么传输地址? 答1.在DATA0~DATA7上既传输数据,又传输地址 当ALE为高电平时传输的是地址, 问2. 从N...
PMP项目管理项目资源管理
目录1 项目资源管理概述2 规划资源管理3 估算活动资源4 获取资源5 建设团队6 管理团队7 控制资源1 项目资源管理概述 项目资源管理包括识别、获取和管理所需资源以成功完成项目的各个过程,这些过程有助于确保项目经理和项目团队在正确的时间和地点使用正确的资源。…...
程序的编译和链接
程序的编译和链接程序的编译和链接程序的两种环境翻译环境详解编译和链接预处理编译汇编链接运行环境程序的编译和链接 程序的两种环境 在ANSI C的任何一种实现中,存在两个不同的环境。 第1种是翻译环境,在这个环境中源代码被转换为可执行的机器指令。 …...
Win11的两个实用技巧系列之无法联网怎么办、耳机没声音的多种解决办法
Win11无法联网怎么办? win11安装后设备不能上网的解决办法Win11无法联网怎么办?电脑安装win11系统以后,发现不能上网,连接不上网络,该怎么办呢?下面我们就来看看win11安装后设备不能上网的解决办法Win11安装后&#x…...
【微信小程序】-- 自定义组件 - 数据监听器 - 案例 (三十五)
💌 所属专栏:【微信小程序开发教程】 😀 作 者:我是夜阑的狗🐶 🚀 个人简介:一个正在努力学技术的CV工程师,专注基础和实战分享 ,欢迎咨询! &…...
Linux - 第7节 - 进程间通信
1.进程间通信介绍 进程间通信目的: 数据传输:一个进程需要将它的数据发送给另一个进程 。 资源共享:多个进程之间共享同样的资源。 通知事件:一个进程需要向另一个或一组进程发送消息,通…...
# 数据完整性算法在shell及python中的实践
数据完整性算法在shell及python中的实践 文章目录数据完整性算法在shell及python中的实践1 预备知识1.1 摘要算法1.2 报文(数据)完整性校验1.3 python byte类型字符串与普通字符串区别2 传统方法(散列函数)2.1 在shell中实践2.2 在…...
QEMU启动x86-Linux内核
目录QEMU简介linux启动流程我的环境安装QEMU软件包安装源码安装编译linux内核编译busybox制作initramfs使用QEMU启动linux内核简化命令参考QEMU简介 QEMU(quick emulator)是一个通用的、开源的硬件模拟器,可以模拟不同硬件架构(如…...
C/C++每日一练(20230311)
目录 1. 计算阶乘的和 ★ 2. 基本计算器 ★★★ 3. N皇后 II ★★★ 🌟 每日一练刷题专栏 C/C 每日一练 专栏 Python 每日一练 专栏 1. 计算阶乘的和 计算:1!-2!3!-4!5!-6!7!-8!9!-10!,并输出计算结果。 注意:不全是…...
哪个牌子的洗地机耐用?耐用的洗地机推荐
作为当下非常热销的洗地机,它不仅解放了双手,使用也非常的便捷。是生活品质提高的最好代表,但是面对市面上让人眼花缭乱的洗地机,挑选几个来回都决定不了到底入手哪个好!为了能帮助大家选购到合适的洗地机,…...
搭建一个中心化的定时服务
1. 背景 在物联网络,很多设备之间都在进行交互,其中云端在远程交流中起到了很重要的作用。比如,一台设备想进行调温,但是需要知道此时房间的温度,那就需要定时去查询传感器测出来的房间温度,如果温度过高&a…...
【CSS】快速入门笔记
视频链接:https://www.bilibili.com/video/BV1mS4y1Z7Ga/?spm_id_from333.999.0.0&vd_source1ad00d913eae8281cbadad6ae66fb06c 文章目录一、CSS语法1.结构2.样式类型1)内联样式 Inline Style2)内部样式 Internal Style3)外部…...
第161篇 笔记-去中心化的含义
本文主要内容来自Vitalik Buterin的文章。“去中心化”这个词是在加密经济学领域用得最多的一个词,通常也作为辨别区块链的依据。然而,这个词也可能是被定义得最不恰当的一个词。数千小时的研究和价值数十亿美元哈希算力的投入都旨在实现去中心化&#x…...
「计算机组成原理」数据的表示和运算(二)
文章目录五、奇偶校验码六、算术逻辑单元ALU6.1 电路的基本原理6.2 加法器的设计6.2.1 一位全加器6.2.2 串行加法器6.2.3 串行进位的并行加法器6.2.4 并行进位的并行加法器七、补码加减运算器八、标志位的生成九、定点数的移位运算9.1 算数移位9.2 逻辑移位9.3 循环移位五、奇偶…...
建立自己的博客
环境安装: w10系统安装 第一步:安装git Git 官网: https://git-scm.com/ 第二步:安装Node.js Node.js官网:https://nodejs.org/zh-cn/ 使用cmd检测: node -v 第三步:安装Hexo Hexo官网:htt…...
Docker 安装mysql Mac 环境下
已安装桌面端 Docker (Mac安装Docker) 安装方式一 打开链接 https://www.docker.com/products/docker-desktop 选择平台下载 安装方式二 安装homebrew /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/m…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
Ubuntu系统下交叉编译openssl
一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机:Ubuntu 20.04.6 LTSHost:ARM32位交叉编译器:arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...
Zustand 状态管理库:极简而强大的解决方案
Zustand 是一个轻量级、快速和可扩展的状态管理库,特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
边缘计算医疗风险自查APP开发方案
核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...
MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...
