Android RxJava框架源码解析(四)
目录
- 一、观察者Observer创建过程
- 二、被观察者Observable创建过程
- 三、subscribe订阅过程
- 四、map操作符
- 五、线程切换原理
简单示例1:
private Disposable mDisposable;
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("test");}}).subscribe(new Observer<String>() { @Overridepublic void onSubscribe(Disposable d) {mDisposable = d;}@Overridepublic void onNext(String s) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});@Overrideprotected void onDestroy() {super.onDestroy();if (mDisposable != null) {if (!mDisposable.isDisposed()) {mDisposable.dispose();}}}
特别注意:上面示例代码中的mDisposable最后必须要释放掉,不然会出现内存泄漏
一、观察者Observer创建过程
首先对观察者Observer源码开始进行简单分析下:
Observer.java
public interface Observer<T> {//表示一执行subscribe订阅就会执行该函数,这个函数一定执行在主线程中void onSubscribe(@NonNull Disposable d);// 表示拿到上一个流程的数据void onNext(@NonNull T t);// 表示拿到上一个流程的错误数据void onError(@NonNull Throwable e);// 表示事件流程结束void onComplete();
}
具体的对象创建是在上面示例代码1中的new Observer<String>()操作,这个称这个为自定义观察者。
二、被观察者Observable创建过程
分析完观察者Observer的创建,现在来分析下被观察者Observable的创建流程,
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("test");}})
将new ObservableOnSubscribe()过程可以理解为是自定义source的过程。
new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("test");}}
执行Observable.create()代码流程
Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null"); //校验是否为nullreturn RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));}
其中,RxJavaPlugins.onAssembly()采用了hook技术,如果没有重写RxJavaPlugins.setOnObservableAssembly()方法,这个可以不要考虑。
ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source; // 自定义sourcepublic ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}}@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (!isDisposed()) {try {observer.onError(t);} finally {dispose();}return true;}return false;}@Overridepublic void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}}@Overridepublic void setDisposable(Disposable d) {DisposableHelper.set(this, d);}@Overridepublic void setCancellable(Cancellable c) {setDisposable(new CancellableDisposable(c));}@Overridepublic ObservableEmitter<T> serialize() {return new SerializedEmitter<T>(this);}@Overridepublic void dispose() {DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}}/*** Serializes calls to onNext, onError and onComplete.** @param <T> the value type*/static final class SerializedEmitter<T>extends AtomicIntegerimplements ObservableEmitter<T> {private static final long serialVersionUID = 4883307006032401862L;final ObservableEmitter<T> emitter;final AtomicThrowable error;final SpscLinkedArrayQueue<T> queue;volatile boolean done;SerializedEmitter(ObservableEmitter<T> emitter) {this.emitter = emitter;this.error = new AtomicThrowable();this.queue = new SpscLinkedArrayQueue<T>(16);}@Overridepublic void onNext(T t) {if (emitter.isDisposed() || done) {return;}if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (get() == 0 && compareAndSet(0, 1)) {emitter.onNext(t);if (decrementAndGet() == 0) {return;}} else {SimpleQueue<T> q = queue;synchronized (q) {q.offer(t);}if (getAndIncrement() != 0) {return;}}drainLoop();}@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {if (emitter.isDisposed() || done) {return false;}if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (error.addThrowable(t)) {done = true;drain();return true;}return false;}@Overridepublic void onComplete() {if (emitter.isDisposed() || done) {return;}done = true;drain();}void drain() {if (getAndIncrement() == 0) {drainLoop();}}void drainLoop() {ObservableEmitter<T> e = emitter;SpscLinkedArrayQueue<T> q = queue;AtomicThrowable error = this.error;int missed = 1;for (;;) {for (;;) {if (e.isDisposed()) {q.clear();return;}if (error.get() != null) {q.clear();e.onError(error.terminate());return;}boolean d = done;T v = q.poll();boolean empty = v == null;if (d && empty) {e.onComplete();return;}if (empty) {break;}e.onNext(v);}missed = addAndGet(-missed);if (missed == 0) {break;}}}@Overridepublic void setDisposable(Disposable s) {emitter.setDisposable(s);}@Overridepublic void setCancellable(Cancellable c) {emitter.setCancellable(c);}@Overridepublic boolean isDisposed() {return emitter.isDisposed();}@Overridepublic ObservableEmitter<T> serialize() {return this;}}}
这里将ObservableCreate的源码全部放在这,作为一个埋点
其实,Observable.create()方法主要功能就是创建了一个ObservableCreate对象,并将自定义的source传给ObservableCreate。该方法最终返回的是ObserverableCreate对象。
三、subscribe订阅过程
分析执行subscribe()订阅流程,并将自定义观察者作为参数传入。
Observable.java
@Overridepublic final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null"); // 功能校验,判定observer是否为nulltry {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");subscribeActual(observer); } catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}
首先会执行一些功能校验,最后执行到subscribeActual()方法中。
Observable.java
protected abstract void subscribeActual(Observer<? super T> observer);
subscribeActual()是一个抽象类,从而最终调用的是ObservableCreate的subscribeActual()方法中。
ObservableCreate.java
@Overrideprotected void subscribeActual(Observer<? super T> observer) { // observer为自定义观察者// 自定义一个CreateEmitter发射器CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 执行该方法就会执行自定义观察者的onSubscribe()方法中observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}
subscribeActual()方法里面会执行如下三个操作:
1)CreateEmitter<T> parent = new CreateEmitter<T>(observer); --> 首先会创建一个CreateEmitter发射器,并将自定义观察者传入该发射器中
2)observer.onSubscribe(parent);–> 执行自定义观察者的onSubscribe()方法,所以该方法也是最先执行调用,并且一定在主线程中
3)source.subscribe(parent); -->执行自定义source的subscribe()订阅操作,从而跳转到示例代码1中ObservableOnSubscribe的subscribe()方法,并将CreateEmitter发射器作为参数传入进去
new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("test");}}
执行e.onNext("test")就会跳转到CreateEmitter发射器中的onNext()方法
ObservableCreate.java
static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t); //执行该流程,observer为自定义观察者}}...}
该observer为上面流程中自定义的CreateEmitter发射器CreateEmitter<T> parent = new CreateEmitter<T>(observer);传入进来的自定义观察者对象,执行observer.onNext(t)该语句就调到示例代码1中的
@Override
public void onNext(String s) {}
Observable与Observer订阅的过程时序图如下:

在标准的观察者设计模式中,是一个“被观察者”,多个“观察者”,并且需要“被观察者”发出改变通知后,所以的“观察者”才能观察到
在RxJava观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要 起点(被观察者) 和 终点(观察者) 在“订阅”一次后,才发出改变通知,终点(观察者)才能观察到
图1:RxJava简单订阅过程:

四、map操作符
加入map操作符之后的简单示例代码2:
private Disposable mDisposable;// 创建ObserverCreate
Observable.create(new ObservableOnSubscribe<String>() { //自定义source@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("test");}})// ObservableCreate.map.map(new Function<String, String>() {@Overridepublic String apply(String s) throws Exception {return s;}})// ObservableMap.subscribe.subscribe(new Observer<String>() { //自定义观察者@Overridepublic void onSubscribe(Disposable d) {mDisposable = d;}@Overridepublic void onNext(String s) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});@Overrideprotected void onDestroy() {super.onDestroy();if (mDisposable != null) {if (!mDisposable.isDisposed()) {mDisposable.dispose();}}}
这个示例代码2写法采用装饰模型
图2加入map操作符之后的流程:

从①~⑥流程简称为封包裹,⑦ ~⑨流程简称为拆包裹
其实图1与图2的区别不大,主要就是多了一个ObservableMap封包裹的流程,其他流程都类似。针对这个区别进行代码流程阐述下:
从示例代码2中执行map()操作进行分析:
Observable.java
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {ObjectHelper.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}
进行创建ObservableMap对象
ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {final Function<? super T, ? extends U> function;public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {super(source); //source指ObservableCreatethis.function = function; // 自定义的Function方法}@Overridepublic void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function)); //这里面的t为下一层包裹即图2中的自定义观察者,source指上一层ObservableCreate}
...
}
这里需要注意,在ObservableMap()构造函数中,参数source指从上一层传过来的ObservableCreate对象,参数function指示例代码2中的new Function()方法。
.map(new Function<String, String>()
执行示例代码2中的.subscribe()其实就是执行到了ObservableMap类的subscribeActual()方法,在这个方法中会对MapObserver进行封装一层包裹,并将下一层的包裹即自定义观察者也就是参数t传入。
MapObserver为ObservableMap的内部类。
ObservableMap.java
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {super(actual); // actual为自定义观察者this.mapper = mapper;}...
}
在执行图2的第⑧步流程时,就会调用执行包裹1的onNext()方法,即MapObserver类的onNext();
ObservableMap.java
@Override
public void onNext(T t) {if (done) {return;}if (sourceMode != NONE) {actual.onNext(null);return;}U v;try {// 代码1v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}// 代码2actual.onNext(v);
}
1:代码1
执行mapper.apply(t)流程的时候,其实就是调用了示例代码2中的apply()方法。
Function.java
public interface Function<T, R> {R apply(@NonNull T t) throws Exception;
}
@Override
public String apply(String s) throws Exception {return s;
}
2:代码2
actual.onNext(v);中的actual是在ObservableMap构造函数传过来的,actual对应图2中的自定义观察者对象,也就是对应图2中的第9步流程。
五、线程切换原理
subscribeOn:给上面代码分配线程
observeOn:给下面代码分配线程
Scheduler分类:
| 调度器类型 | 效果 |
|---|---|
| Schedulers.computation() | 用于计算任务,如事件循环或回调处理,不要用于IO操作(IO操作使用Schedulers.io());默认线程数等于处理器的数量 |
| Schedulers.from(executor) | 使用指定的Executor作为调度器 |
| Schedulers.immediate() | 在当前线程立即开始执行任务 |
| Schedulers.io() | 用于IO密集型任务 |
| Schedulers.newThread() | 为每个任务创建一个新任务 |
| Schedulers.trampoline() | 当其他排队的任务完成后,在当前线程排队开始执行 |
| AndroidSchedulers.mainThread() | 用于Android的UI更新操作 |
相关文章:
Android RxJava框架源码解析(四)
目录一、观察者Observer创建过程二、被观察者Observable创建过程三、subscribe订阅过程四、map操作符五、线程切换原理简单示例1: private Disposable mDisposable; Observable.create(new ObservableOnSubscribe<String>() {Overridepublic void subscribe(…...
Linux信号-进程退出状态码
当进程因收到信号被终止执行退出后,父进程可以通过wait或waitpid得到它的exit code。进程被各信号终止的退出状态码总结如下:信号编号信号名称信号描述默认处理方式Exit code1SIGHUP挂起终止12SIGINT终端中断终止23SIGQUIT终端退出终止、coredump1314SIG…...
springcloud+vue实现图书管理系统
一、前言: 今天我们来分享一下一个简单的图书管理系统 我们知道图书馆系统可以有两个系统,一个是管理员管理图书的系统,管理员可以(1)查找某一本图书情况、(2)增加新的图书、(3&…...
GEE学习笔记 六十:GEE中生成GIF动画
生成GIF动画这个是GEE新增加的功能之一,这一篇文章我会简单介绍一下如何使用GEE来制作GIF动画。 相关API如下: 参数含义: params:设置GIF动画显示参数,详细的参数可以参考ee.data.getMapId() callback:回调…...
react中的useEffect
是函数组件中执行的副作用,副作用就是指每次组件更新都会执行的函数,可以用来取代生命周期。 1. 基本用法 import { useEffect } from "react"; useEffect(()>{console.log(副作用); });2. 副作用分为需要清除的和不需要清除 假如设置…...
故障安全(Crash-Safe) 复制
二进制日志记录是故障安全的:MySQL 仅记录完成的事件或事务使用 sync-binlog 提高安全性默认值是1,最安全的,操作系统在每次事务后写入文件将svnc-binloq 设置为0,当操作系统根据其内部规则写入文件的同时服务器崩溃时性能最好但事务丢失的可…...
Spring aop之针对注解
前言 接触过Spring的都知道,aop是其中重要的特性之一。笔者在开发做项目中,aop更多地是要和注解搭配:在某些方法上加上自定义注解,然后要对这些方法进行增强(很少用execution指定,哪些包下的哪些方法要增强)。那这时就…...
【JavaScript速成之路】JavaScript数据类型转换
📃个人主页:「小杨」的csdn博客 🔥系列专栏:【JavaScript速成之路】 🐳希望大家多多支持🥰一起进步呀! 文章目录前言数据类型转换1,转换为字符串型1.1,利用“”拼接转换成…...
21-绑定自定义事件
绑定自定义事件 利用自定义事件获取子组件的值 父组件给子组件绑定一个自定义事件,实际上是绑定到了子组件的实例对象vc上: <!-- 自定义myEvent事件 --> <Student v-on:myEventgetStudentName/>在父组件中编写getStudentName的实现&#…...
【Mysql】触发器
【Mysql】触发器 文章目录【Mysql】触发器1. 触发器1.1 介绍1.2 语法1.2.1 创建触发器1.2.2 查看触发器1.2.3 删除触发器1.2.4 案例1. 触发器 1.1 介绍 触发器是与表有关的数据库对象,指在insert、update、delete之前(BEFORE)或之后(AFTER),触发并执行…...
CODESYS开发教程11-库管理器
今天继续我们的小白教程,老鸟就不要在这浪费时间了😊。 前面一期我们介绍了CODESYS的文件读写函数库SysFile。大家可能发现了,在CODESYS的开发中实际上是离不开各种库的使用,其中包括系统库、第三方库以及用户自己开发的库。实际…...
【UnityAR相关】Unity Vuforia扫图片成模型具体步骤
1 资产准备 导入要生成的fbx模型(带有材质), 你会发现导入fbx的材质丢失了: 选择Standard再Extract Materials导出材质到指定文件夹下(我放在Assets->Materials了 ok啦! 材质出现了, 模型…...
2023年全国最新保安员精选真题及答案2
百分百题库提供保安员考试试题、保安职业资格考试预测题、保安员考试真题、保安职业资格证考试题库等,提供在线做题刷题,在线模拟考试,助你考试轻松过关。 21.一般来说,最经济的巡逻方式是()。 A:步巡 B:…...
keil5安装了pack包但是还是不能选择device
一开始,我以为是keil5无法安装 STM32 芯片包,打开device倒是可以看到stm公司的芯片包,但是没有我想要的stm32f1。 我按照网上的一些说法,找到了这个STM32F1 的pack芯片包,但是我双击安装的时候,它的安装位…...
秒杀系统设计
1.秒杀系统的特点 瞬时高并发 2.预防措施 2.1.流量限制 对于一个相同的用户,限制请求的频次对于一个相同的IP,限制请求的频次验证码,减缓用户请求的次数活动开启之前,按钮先置灰,防止无效的请求流入系统࿰…...
全面认识数据指标体系
什么是数据指标体系? 看了下百度百科,竟然没有数据指标这个词条,看来这个词大家平时还用的不多啊。那只有间接偷懒一下,分别查下指标和数据这两个词条的含义,在组合起来看看。 数据:数据是指对客观事件进…...
热榜首推!阿里内部都在用的Java后端面试笔记,主流技术全在里面了!备战2023Java面试,拿理想offer
纵观今年的技术招聘市场, Java依旧是当仁不让的霸主 !即便遭受 Go等新兴语言不断冲击,依旧岿然不动。究其原因:Java有着极其成熟的生态,这个不用我多说;Java在 运维、可观测性、可监 控性方面都有着非常优秀…...
Android架构设计——【 APT技术实现butterknife框架 】
APT简介 APT英文全称:Android annotation process tool是一种处理注释的工具,它对源代码文件进行检测找出其中的Annotation,使用Annotation进行额外的处理。 Annotation处理器在处理Annotation时可以根据源文件中的Annotation生成额外的源文…...
线程的基本概念
文章目录基础概念线程与进程什么是进程?什么是线程?进程和线程的区别:多线程什么是多线程?多线程的局限性串行、并行、并发同步异步、阻塞非阻塞线程的创建1、继承Thread类,重写run方法2、实现Runnable接口,…...
java面试题中常见名词注解
一.常见名词注解 1.mysql索引,索引数据结构,hash,二叉树,B树,B树,红黑树, mysql索引:帮助mysql高效获取数据的数据结构,通俗来说,数据库索引就好比一本书的…...
Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...
04-初识css
一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...
C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
【7色560页】职场可视化逻辑图高级数据分析PPT模版
7种色调职场工作汇报PPT,橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版:职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...
Yolov8 目标检测蒸馏学习记录
yolov8系列模型蒸馏基本流程,代码下载:这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中,**知识蒸馏(Knowledge Distillation)**被广泛应用,作为提升模型…...
