RxJava操作符变换过程
要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展
implementation 'io.reactivex:rxandroid:1.2.1'implementation 'io.reactivex:rxjava:1.2.0'
我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢,以下面的代码进行分析
String host = "https://baidu.com/";Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("message");subscriber.onCompleted();}}).map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;}}).subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug(s);}});
上面的代码是链式调用,为了方便理解,我把上面的代码拆分成了下面样式
String host = "https://baidu.com/";Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("message");subscriber.onCompleted();}});Observable<String> obs2 = obs1.map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;}});obs2.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriberLast onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriberLast onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug(s);}});
上面代码会打印,我们将一步一步分析,打印是怎么来的
https://www.baidu.com/message
subscriberLast onCompleted is come in
obs1是我们的原始Observable, obs2是我们变换过的Observable
首先从obs1的创建开始,就是Observable的创建过程,有不理解的可以先看RXJava的创建订阅过程
在osb1的创建过程create函数需要一个OnSubscribe对象,为了方便理解,就暂不拆开来写,代码虽然用的是匿名对象,我们暂且叫它onSubscribe1
Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() { //onSubscribe1@Overridepublic void call(Subscriber<? super String> subscriber) { subscriber.onNext("observable call onNext0");subscriber.onStart();subscriber.onNext("observable call onNext");subscriber.onCompleted();subscriber.onNext("observable call onNext1");}});
obs1创建成功后调用了map方法,map方法又返回了一个Observable,就是我们的obs2,同理,map方法的参数我们叫它func1
Observable<String> obs2 = obs1.map(new Func1<String, String>() { //func1@Overridepublic String call(String s) {return host+s;}});
随后我们又使用了osb2进行了订阅,同理subscribe方法的参数我们叫它subscriberLast
obs2.subscribe(new Subscriber<String>() { //subscriberLast@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug("subscriber onNext is come in s = "+s);}});
于是有了下面的关系,到这里还都很好理解
然后我们来分析obs1.map调用了map方法,rxjava做了什么
public class Observable<T> {
....public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {return create(new OnSubscribeMap<T, R>(this, func));}public static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(RxJavaHooks.onCreate(f));}
....
}
map方法又调用了create方法,create方法的调用我们已经在RxJava的调用过程中讲过,传入的参数是一个OnSubscribe对象,OnSubscribeMap实现了OnSubscribe接口,我们把new出来的OnSubscribeMap对象暂时叫做onSubscribeMap1构造方法传入的参数是obs1和func1, 那我们再来看OnSubscribeMap类
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {final Observable<T> source;final Func1<? super T, ? extends R> transformer;public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);source.unsafeSubscribe(parent);}static final class MapSubscriber<T, R> extends Subscriber<T> {final Subscriber<? super R> actual;final Func1<? super T, ? extends R> mapper;boolean done;public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}@Overridepublic void onError(Throwable e) {if (done) {RxJavaHooks.onError(e);return;}done = true;actual.onError(e);}@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}@Overridepublic void setProducer(Producer p) {actual.setProducer(p);}}}
我们知道在Observable创建好后,调用了subscribe方法就可以进行订阅了,最后调用的也是Observable创建时传入的OnSubscribe对象的call方法,以obs1的创建举例,也就是我们这里的onSubscribe1,不懂得去看RxJava的调用过程
因为我们最后执行subscribe订阅方法的是obs2那么也就会调用obs2的OnSubscribe对象,那么obs2的OnSubscribe对象是谁呢,就是onSubscribeMap1那么执行完订阅就会调用onSubscribeMap1的call方法
@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);source.unsafeSubscribe(parent);}
这里的source是OnSubscribeMap构造方法调用时初始化的也就是obs1,transformer 是我们的func1
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}
现在又有了下面的关系
在onSubscribeMap1的call方法中,一共有三行代码,这三行代码很重要,我们一行一行分析
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
实例化一个MapSubscriber对象parent ,MapSubscriber继承了Subscriber,所以MapSubscriber也是一个观察者;parent 持有了o,o是也就是subscriberLast,还持有了transformer,也就是func1。这里一会要详细分析
o.add(parent);
o是一个Subscriber,也是subscriberLast,上面的代码直接把parent添加到了subscriberLast的SubscriptionList列表。这里一会还要说一下。在看下一条代码
source.unsafeSubscribe(parent);
我们知道source也就是我们的obs1,obs1的订阅操作就是在这里发生的。至于为什么用unsafeSubscribe方法, 我们一会在分析
我们重新梳理一下,
1、obs2的订阅方法subscribe执行导致了obs2的onSubscribeMap1实例的call方法被执行;
2、onSubscribeMap1的call方法中又执行了obs1的订阅;obs1的观察者就是parent;
3、obs1的订阅必然会导致obs1 的onSubscribe实例onSubscribe1的call方法被执行。
4、在onSubscribe1的call方法中我们又调用了
subscriber.onNext("message");subscriber.onCompleted();
这里的subscriber也就是parent,必然会调用parent的next方法并传入message,和onCompleted。
5、继续分析parent
static final class MapSubscriber<T, R> extends Subscriber<T> {final Subscriber<? super R> actual;final Func1<? super T, ? extends R> mapper;boolean done;public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}@Overridepublic void onError(Throwable e) {if (done) {RxJavaHooks.onError(e);return;}done = true;actual.onError(e);}@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}@Overridepublic void setProducer(Producer p) {actual.setProducer(p);}}
parent是MapSubscriber,MapSubscriber继承了Subscriber,MapSubscriber的构造方法需要两个参数Subscriber和Func1,通过之前的分析,知道actual就是subscriberLast,mapper就是fun1
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}
用一张图来说明下

static final class MapSubscriber<T, R> extends Subscriber<T> {.....@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t); //这里调用,mapper就是fun1,的call方法,这里的t就是message,result就是转换后的字符串} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}.....
}
在parent的onnext方法中调用了func1的call方法,还记得我们在func1的call方法中写的什么,没错就是转换字符串,call方法的返回值就是转换后的字符串
final String host = "https://www.baidu.com/";Observable<String> obs2 = obs1.map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s; //func1的call方法 进行了字符串转换,这里的s就是message}});
继续调用了
actual.onNext(result);
我们知道actual就是subscriberLast,所以会调用subscriberLast的onNext方法,
obs2.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriberLast onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) { //subscriberLast的onNext方法ILog.LogDebug(s);//最后会打印https://www.baidu.com/message}@Overridepublic void onStart() {super.onStart();}});
在obs1中我们还调用了onCompleted方法,先调用了actual.onCompleted();
actual也就是subscriberLast实例。
@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}
在这添加一张图方便更好的理解
简单的说就是先调用obs1的subscriber的onNext()方法,在onNext()方法中调用func1的call方法,处理数据源数据,然后再把处理过的数据源发射给obs2的subscriber
还记不记得上面的第二条代码 o.add(parent);为什么要把parent添加到o中呢,o也就是subscriberLast。
@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);//第二条代码source.unsafeSubscribe(parent);//第三条代码}
subscriberLast的add方法最终会把parent添加到subscriberLast的SubscriptionList中,关于Subscriber请看RxJava中的Subscriber。再执行SubscriptionList的解绑方法unsubscribe会把subscriptions中的Subscription一并解绑,也就是会把parent和obs1的绑定关系解除。
那上面第三条代码为什么调用的是unsafeSubscribe方法呢,记得我们之前分析RxJava的订阅过程中看到的是最后包装了一个SafeSubscriber,再SafeSubscriber中会进行一些多线程的处理操作。
/** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}
observable中unsafeSubscribe方法也很简单,也没并由对传入的subscriber进行包装而是直接调用。
public class Observable<T> {
....
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {try {// new Subscriber so onStart itsubscriber.onStart();// allow the hook to intercept and/or decorateRxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(RxJavaHooks.onObservableError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.RxJavaHooks.onObservableError(r);// TODO why aren't we throwing the hook's return value.throw r; // NOPMD}return Subscriptions.unsubscribed();}}
....
我们回过来再看MapSubscriber中其实也已经进行了简单的处理工作,所以也就不需要使用SafeSubscriber了。
至此RxJava操作符变换过程就分析完了,欢迎大家补充和纠正。
相关文章:
RxJava操作符变换过程
要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展 implementation io.reactivex:rxandroid:1.2.1implementation io.reactivex:rxjava:1.2.0我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢࿰…...
开放平台订单接口
custom-自定义API操作 注册开通 taobao.custom 公共参数 名称 类型 必须 描述 key String 是 调用key(必须以GET方式拼接在URL中) secret String 是 调用密钥 api_name String 是 API接口名称(包括在请求地址中&a…...
CDN相关知识点
1、什么是CDN?CDN的作用是什么? CDN(Content Delivery Network,内容分发网络)是一种通过在多个节点上分布内容以提高网络性能、可靠性和可扩展性的网络解决方案。CDN通过在不同的地理位置部署服务器,使用户…...
【论文阅读】注意力机制与二维 TSP 问题
前置知识 注意力机制 见 这篇 二维 TSP 问题 给定二维平面上 nnn 个点的坐标 S{xi}i1nS\{x_i\}_{i1}^nS{xi}i1n,其中 xi∈[0,1]2x_i\in [0,1]^2xi∈[0,1]2,要找到一个 1∼n1\sim n1∼n 的排列 π\piπ ,使得目标函数 L(π∣s)∥xπ…...
[深入理解SSD系列 闪存实战2.1.7] NAND FLASH基本编程(写)操作及原理_NAND FLASH Program Operation 源码实现
前言 上面是我使用的NAND FLASH的硬件原理图,面对这些引脚,很难明白他们是什么含义, 下面先来个热身: 问1. 原理图上NAND FLASH只有数据线,怎么传输地址? 答1.在DATA0~DATA7上既传输数据,又传输地址 当ALE为高电平时传输的是地址, 问2. 从N...
PMP项目管理项目资源管理
目录1 项目资源管理概述2 规划资源管理3 估算活动资源4 获取资源5 建设团队6 管理团队7 控制资源1 项目资源管理概述 项目资源管理包括识别、获取和管理所需资源以成功完成项目的各个过程,这些过程有助于确保项目经理和项目团队在正确的时间和地点使用正确的资源。…...
程序的编译和链接
程序的编译和链接程序的编译和链接程序的两种环境翻译环境详解编译和链接预处理编译汇编链接运行环境程序的编译和链接 程序的两种环境 在ANSI C的任何一种实现中,存在两个不同的环境。 第1种是翻译环境,在这个环境中源代码被转换为可执行的机器指令。 …...
Win11的两个实用技巧系列之无法联网怎么办、耳机没声音的多种解决办法
Win11无法联网怎么办? win11安装后设备不能上网的解决办法Win11无法联网怎么办?电脑安装win11系统以后,发现不能上网,连接不上网络,该怎么办呢?下面我们就来看看win11安装后设备不能上网的解决办法Win11安装后&#x…...
【微信小程序】-- 自定义组件 - 数据监听器 - 案例 (三十五)
💌 所属专栏:【微信小程序开发教程】 😀 作 者:我是夜阑的狗🐶 🚀 个人简介:一个正在努力学技术的CV工程师,专注基础和实战分享 ,欢迎咨询! &…...
Linux - 第7节 - 进程间通信
1.进程间通信介绍 进程间通信目的: 数据传输:一个进程需要将它的数据发送给另一个进程 。 资源共享:多个进程之间共享同样的资源。 通知事件:一个进程需要向另一个或一组进程发送消息,通…...
# 数据完整性算法在shell及python中的实践
数据完整性算法在shell及python中的实践 文章目录数据完整性算法在shell及python中的实践1 预备知识1.1 摘要算法1.2 报文(数据)完整性校验1.3 python byte类型字符串与普通字符串区别2 传统方法(散列函数)2.1 在shell中实践2.2 在…...
QEMU启动x86-Linux内核
目录QEMU简介linux启动流程我的环境安装QEMU软件包安装源码安装编译linux内核编译busybox制作initramfs使用QEMU启动linux内核简化命令参考QEMU简介 QEMU(quick emulator)是一个通用的、开源的硬件模拟器,可以模拟不同硬件架构(如…...
C/C++每日一练(20230311)
目录 1. 计算阶乘的和 ★ 2. 基本计算器 ★★★ 3. N皇后 II ★★★ 🌟 每日一练刷题专栏 C/C 每日一练 专栏 Python 每日一练 专栏 1. 计算阶乘的和 计算:1!-2!3!-4!5!-6!7!-8!9!-10!,并输出计算结果。 注意:不全是…...
哪个牌子的洗地机耐用?耐用的洗地机推荐
作为当下非常热销的洗地机,它不仅解放了双手,使用也非常的便捷。是生活品质提高的最好代表,但是面对市面上让人眼花缭乱的洗地机,挑选几个来回都决定不了到底入手哪个好!为了能帮助大家选购到合适的洗地机,…...
搭建一个中心化的定时服务
1. 背景 在物联网络,很多设备之间都在进行交互,其中云端在远程交流中起到了很重要的作用。比如,一台设备想进行调温,但是需要知道此时房间的温度,那就需要定时去查询传感器测出来的房间温度,如果温度过高&a…...
【CSS】快速入门笔记
视频链接:https://www.bilibili.com/video/BV1mS4y1Z7Ga/?spm_id_from333.999.0.0&vd_source1ad00d913eae8281cbadad6ae66fb06c 文章目录一、CSS语法1.结构2.样式类型1)内联样式 Inline Style2)内部样式 Internal Style3)外部…...
第161篇 笔记-去中心化的含义
本文主要内容来自Vitalik Buterin的文章。“去中心化”这个词是在加密经济学领域用得最多的一个词,通常也作为辨别区块链的依据。然而,这个词也可能是被定义得最不恰当的一个词。数千小时的研究和价值数十亿美元哈希算力的投入都旨在实现去中心化&#x…...
「计算机组成原理」数据的表示和运算(二)
文章目录五、奇偶校验码六、算术逻辑单元ALU6.1 电路的基本原理6.2 加法器的设计6.2.1 一位全加器6.2.2 串行加法器6.2.3 串行进位的并行加法器6.2.4 并行进位的并行加法器七、补码加减运算器八、标志位的生成九、定点数的移位运算9.1 算数移位9.2 逻辑移位9.3 循环移位五、奇偶…...
建立自己的博客
环境安装: w10系统安装 第一步:安装git Git 官网: https://git-scm.com/ 第二步:安装Node.js Node.js官网:https://nodejs.org/zh-cn/ 使用cmd检测: node -v 第三步:安装Hexo Hexo官网:htt…...
Docker 安装mysql Mac 环境下
已安装桌面端 Docker (Mac安装Docker) 安装方式一 打开链接 https://www.docker.com/products/docker-desktop 选择平台下载 安装方式二 安装homebrew /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/m…...
Greasy Fork:开源用户脚本平台的价值探索与实践指南
Greasy Fork:开源用户脚本平台的价值探索与实践指南 【免费下载链接】greasyfork An online repository of user scripts. 项目地址: https://gitcode.com/gh_mirrors/gr/greasyfork 一、价值定位:重新定义浏览器增强体验 1.1 开源平台的核心价值…...
GME-Qwen2-VL-2B效果实测:抽象文字如何匹配具体图片?
GME-Qwen2-VL-2B效果实测:抽象文字如何匹配具体图片? 1. 多模态搜索的突破性体验 想象一下,你脑海中浮现出一句富有哲理的句子:"人生不是裁决书",却想找一张能表达这种意境的图片。传统搜索引擎会怎么做&a…...
CMake+vcpkg环境配置避坑指南:从命令行到GUI的完整流程
CMakevcpkg环境配置避坑指南:从命令行到GUI的完整流程 刚接触C/C开发的工程师们,往往会在环境配置阶段经历"从入门到放弃"的心路历程。面对复杂的依赖库管理、跨平台编译问题,以及各种晦涩的错误提示,不少开发者甚至还没…...
OpenClaw任务编排:用Qwen3.5-4B-Claude实现爬虫+分析闭环
OpenClaw任务编排:用Qwen3.5-4B-Claude实现爬虫分析闭环 1. 为什么需要自动化任务编排 去年我接手了一个市场调研项目,需要每周从20多个网站抓取产品价格数据,清洗后生成趋势图表。最初用Python脚本手动Excel处理,每次要花3小时…...
Windows系统下Tesseract-OCR最全配置指南:从环境变量设置到多语言识别
Windows系统下Tesseract-OCR深度配置与实战指南 1. 环境准备与核心组件安装 在Windows平台上部署Tesseract-OCR需要特别注意64位系统的兼容性问题。首先需要从官方推荐的镜像站点下载最新稳定版本(目前推荐5.3.0以上版本),安装时务必勾选Addi…...
# Kafka 消息队列实战指南
大数据开发核心技能:Kafka 架构原理、生产者消费者配置、Spark/Flink 集成、消息积压处理、数据一致性保障、生产环境案例,从 0 到 1 掌握企业级消息队列📌 前言 真实生产问题 问题场景: 某电商公司数据平台遇到的问题:…...
优化实践:结合ResNet与CBAM注意力机制提升垃圾分类模型性能
1. ResNet与CBAM模块技术解析 1.1 ResNet的核心设计思想 ResNet(残差网络)之所以能成为深度学习领域的里程碑,关键在于它解决了传统深度神经网络的两大痛点:梯度消失问题和网络退化现象。想象一下教小朋友搭积木,当积木…...
解锁新可能:ArkData 在智能穿戴设备中的应用
解锁新可能:ArkData 在智能穿戴设备中的应用随着人们对健康生活的重视,智能穿戴设备愈发普及。这些设备能够实时收集心率、步数、睡眠等健康数据,为人们的健康管理提供重要参考。在这一背景下,如何高效管理和利用这些健康数据成为…...
OpenClaw日志分析:QwQ-32B任务执行效率监控
OpenClaw日志分析:QwQ-32B任务执行效率监控 1. 为什么需要监控OpenClaw任务执行效率 去年冬天,我部署了一个自动整理会议纪要的OpenClaw工作流。起初运行得很顺利,直到某天早上发现它漏掉了三场重要会议的记录。检查日志才发现,…...
别再混淆了!FFmpeg提取AAC/H264流时常见的3个容器格式误区
别再混淆了!FFmpeg提取AAC/H264流时常见的3个容器格式误区 第一次用FFmpeg提取音频时,我把.m4a文件直接重命名为.aac,结果播放器报错——这个看似简单的操作背后,隐藏着容器格式与编码格式的深层差异。本文将用真实踩坑案例&#…...
