当前位置: 首页 > news >正文

一看就懂的RxJava源码分析

一看就懂的RxJava源码分析

    • 前言
    • 零、观察模式简介
    • 一、RxJava使用示例一
    • 二、示例一源码分析
      • 0. 示例一代码分解
      • 1. RxJava中的观察者是谁?
      • 2. RxJava中的被观察者又是谁?
      • 3. 观察者又是如何安插到被观察者中的?
      • 4. 示例一RxJava源码整体关系类图
      • 4. RxJava的Hook机制
    • 三、RxJava使用示例二
    • 四、示例二源码分析
      • 1. 同样从subscribe方法看起,看看观察者如何被安插到被观察者中
      • 2. subscribe方法源码如下
      • 3. subscribeActual方法是被map方法返回的对象实现,我们来看看map方法的源码
      • 4. 因此接着第2步去ObservableMap类中查看subscribeActual方法的源码
      • 5. 第4步提到的ObservableMap类中的source和function是什么?我们看下源码
      • 6. 第5步提到的map方法属于哪个对象呢?
      • 7. 我们接着第4步继续分析
      • 8. 我们看看ObservableEmitter发送的数据如何被map处理,然后又被观察者接收
    • 五、后记

前言

RxJava是一种基于观察者模式的异步编程库,且支持响应式编程,适用于处理复杂的事件流。RxJava是观察者模式的扩展应用,其的核心概念是Observable和Observer。Observable表示一个异步事件流,Observer表示对这个事件流的观察者。当Observable发出一个事件时,Observer会收到这个事件并进行相应的处理。RxJava还提供了一些操作符,可以对事件流进行过滤、转换、组合等操作,从而更方便地处理事件流。
本文对于RxJava的源码进行分析,以更加彻底的了解RxJava的实现思路。

零、观察模式简介

前言中我们提到RxJava是观察者模式的扩展应用,那么学习RxJava的源码,肯定要懂观察者模式的,如果不懂这种设计模式,直接来看源码是比较吃力的。本文也是以观察者模式为切入点来讲解RxJava源码

大家可以看下我之前写的全网最全面最精华的设计模式讲解,从程序员转变为工程师的第一步,这篇文章中对于观察者模式的介绍。再啰嗦一句,设计模式非常重要,也非常难以掌握,因为它不是简单的技术,而是一种思想,这世界上最难学习的就是思想,希望各位博友能够真正将这种思想融入自己的思维中。
在这里插入图片描述
简单来说,观察者模式有两个主要角色,一个是观察者,一个是被观察者,而实现观察者模式的核心是将观察者安插(聚合)到被观察者之中,这样被观察者的一举一动都能被观察者所捕捉,而这也是分析RxJava源码的核心,如果不能理解该核心,请移步我上面提到的文章,里面有比较详细的介绍。

一、RxJava使用示例一

使用RxJava需要在build.gradle中引用RxJava库

implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

最简单的RxJava使用示例

public class MainActivity extends AppCompatActivity {private final static String TAG = MainActivity.class.getSimpleName();@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {emitter.onNext("sun");emitter.onNext("hao");}}).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(@NonNull String  s) {Log.d(TAG, "onNext -》 "+s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});}
}

运行结果

2023-11-17 04:12:57.125 15872-15872/com.xiaomi.myaxjava D/MainActivity: onSubscribe
2023-11-17 04:12:57.125 15872-15872/com.xiaomi.myaxjava D/MainActivity: onNext -》 sun
2023-11-17 04:12:57.126 15872-15872/com.xiaomi.myaxjava D/MainActivity: onNext -》 hao

二、示例一源码分析

示例一的代码用于实现观察者监控被观察者发送的字符串,并进行打印。

0. 示例一代码分解

  • 为了便于我们更好的分析源码,我们将上面示例一中的匿名类实现全部去掉。匿名类是简化了代码,但有时候也提高了代码阅读的难度。去掉匿名类后的代码如下,看上去是不是容易理解多了。
public class MainActivity extends AppCompatActivity {private final static String TAG = MainActivity.class.getSimpleName();@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);MySource mySource = new MySource();MyObserver myObserver = new MyObserver();Observable.create(mySource).subscribe(myObserver);}private class MySource implements ObservableOnSubscribe<String>{@Overridepublic void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {emitter.onNext("sun");emitter.onNext("hao");}}private class MyObserver implements Observer<String>{@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(@NonNull String s) {Log.d(TAG, "onNext -》 "+s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}}
}
  • RxJava核心代码就一行:
    Observable.create(mySource).subscribe(myObserver);
  • 上面我们提到RxJava是观察者模式的扩展应用,而观察者模式的核心是将观察者Observer安插到被观察者Observable之中,监视被观察者的一举一动。下面我们需要找出观察者是谁,被观察者是谁,观察者又如何被插入到被观察者之中即可。

1. RxJava中的观察者是谁?

RxJava通过subscribe方法订阅观察者(subscribe方法其实就是实现将观察者安插到被观察者,后面我们会详细介绍如何实现安插的),即示例一代码分解后中的观察者是myObserver。

private class MyObserver implements Observer<String>{@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(@NonNull String s) {Log.d(TAG, "onNext -》 "+s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}}

2. RxJava中的被观察者又是谁?

其实如果理解观察者设计模式的话,我们从MyObservable的具体代码中,我们也能猜到,真正的被观察者应该是ObservableEmitter对象,因为被观察者是发数据方,观察者监察被观察者发送的数据,而负责发送数据的是ObservableEmitter对象的onNext方法,因此被观察者应该是ObservableEmitter对象,后面我们会从源码中证实这一点。

   private class MySource implements ObservableOnSubscribe<String>{@Overridepublic void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {emitter.onNext("sun");emitter.onNext("hao");}}

3. 观察者又是如何安插到被观察者中的?

上面我们提到RxJava通过subscribe方法订阅观察者,即通过subscribe方法将观察者安插到被观察者中。下面我们开始分析subscribe方法的源码,看看观察者(示例中的myObserver)如何被安插到被观察者中。

  1. 示例一的RxJava核心代码就是下面一句
Observable.create(mySource).subscribe(myObserver);
  1. subscribe方法是属于哪个对象呢?
    很明显,subscribe方法的对象是由Observable.create()方法创建的,下面我们看看Observable.create方法的源码:
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {Objects.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));}

create方法传入了我们自定义的MySource对象,返回时又被RxJavaPlugins.onAssembly方法处理了一下,我们来看看RxJavaPlugins.onAssembly方法:

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {Function<? super Observable, ? extends Observable> f = onObservableAssembly;if (f != null) {return apply(f, source);}return source;}

可以看到如果onObservableAssembly为null,那么onAssembly方法就什么都不做,如果不为null就对source进行处理一下再返回,其实这就是RxJava的Hook机制,也就是通过onAssembly方法拦截一下被观察者,先对被观察者处理一波,后面我们会详细介绍,这儿我们忽略这个hook即可。
那么Observable.create()方法创建的就是ObservableCreate对象,即该案例中subscribe方法是ObservableCreate对象的方法。ObservableCreate的类图如下,subscribe方法被抽象类Observable所实现。
在这里插入图片描述
3. subscribe方法的源码如下

public final void subscribe(@NonNull Observer<? super T> observer) {Objects.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");subscribeActual(observer); //这个是核心方法} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}

看源码我们要先梳理主线,一定不要想着一开始每一句都看懂(因为这样大概率会看晕),一些支线的细节代码可以先不关注,重点是关注主线代码
大概一看subscribe方法中核心方法是subscribeActual(observer)方法,上面我们分析了subscribe方法属于ObservableCreate对象,那么subscribeActual方法自然也是属于ObservableCreate类,我们去ObservableCreate类中查看subscribeActual方法的源码如下:

 @Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<>(observer);//这里创建了CreateEmitter对象,并将我们的观察者传进去了observer.onSubscribe(parent);//观察者执行onSubscribe方法。这里就能明白为什么观察者都是先执行onSubscribe方法了try {source.subscribe(parent);//这里的source是谁呢?} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}

subscribeActual方法中首先创建了CreateEmitter对象,并将我们自定义的观察者聚合进去。其实,从这就证实了,CreateEmitter对象是被观察者,CreateEmitter通过构造函数将观察者注入,如此以来,观察者就被安插进了被观察者之中,具体如何监控被观察者,我们后面再说。
其次,subscribeActual方法不管三七二十一先执行观察者的onSubscribe方法,这里也能说明为什么RxJava都是先执行onSubscribe方法。
最后,subscribeActual方法执行source.subscribe(parent),将创建的被观察者即CreateEmitter对象parent传递了出去,那么这里的source又是谁呢?我们看看源码中source赋值的地方

public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;//通过ObservableCreate构造函数传入source的值}@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}
}

从源码中可以看到source是创建ObservableCreate对象的时候传入的,上面分析subscribe方法是属于哪个对象的时候,我们就看到ObservableCreate对象是由Observable.create创建的

    Observable.create(mySource).subscribe(myObserver);public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {Objects.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));//这里的source就是我们自定的mySource}private class MySource implements ObservableOnSubscribe<String>{@Overridepublic void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {emitter.onNext("sun");emitter.onNext("hao");}}

这时我们再看subscribeActual方法源码

 @Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);//source就是我们自定义MySource对象} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}

source.subscribe(parent)中的source就是我们自定义的MySource对象,通过subscribe方法将subscribeActual方法中创建的被观察者CreateEmitter对象parent传递到MySource中。此时,我们再看被观察者发送数据时,观察者是如何监察到的

private class MySource implements ObservableOnSubscribe<String>{@Overridepublic void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {emitter.onNext("sun");//emitter就是subscribeActual方法中创建的CreateEmitter对象emitter.onNext("hao");}}

被观察者emitter通过onNext发送数据,我们来看看onNext方法的源码

static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));return;}if (!isDisposed()) {observer.onNext(t);//观察者接收到被观察者发送的数据}}}

可以看到当被观察者对象调用onNext方法时,安插在内部的观察者会接收到数据,而观察者的onNext方法就是我们自定义实现的

private class MyObserver implements Observer<String>{@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(@NonNull String s) {Log.d(TAG, "onNext -》 "+s);//监察到被观察者发送的数据}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}}

4. 示例一RxJava源码整体关系类图

在这里插入图片描述
ObservableCreate就是一个中转器,它把MySource和MyObserver中转出去,之后观察者MyObserver聚合(安插)到被观察者CreateEmitter对象,最后,被观察者对象又通过subscribe方法被传递给MySource对象。

4. RxJava的Hook机制

  1. 上面我们也提到了该机制,Observable的create方法源码如下
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {Objects.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));}

创建ObservableCreate对象之后,又被RxJavaPlugins.onAssembly方法处理一下才返回,这个就是RxJava的hook机制,其实就是将创建的被观察者先自己玩一把,再给观察者处理。

  1. RxJavaPlugins.onAssembly方法源码如下:
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {Function<? super Observable, ? extends Observable> f = onObservableAssembly;if (f != null) {//如果设置了onObservableAssembly,则意味着开启hookreturn apply(f, source);}return source;}static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {try {return f.apply(t);} catch (Throwable ex) {throw ExceptionHelper.wrapOrThrow(ex);}}

通过如下方法给onObservableAssembly赋值

   public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {if (lockdown) {throw new IllegalStateException("Plugins can't be changed anymore");}RxJavaPlugins.onObservableAssembly = onObservableAssembly;}

使用RxJava的hook机制示例

 RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {@Overridepublic Observable apply(Observable observable) throws Throwable {return null;//在这拦截住Observable,自己先处理一波}});

三、RxJava使用示例二

public class MainActivity extends AppCompatActivity {private final static String TAG = MainActivity.class.getSimpleName();@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {emitter.onNext("sun");emitter.onNext("hao");}}).map(new Function<String, Integer>() {@Overridepublic Integer apply(String s) throws Throwable {Integer ans = 0;if (s.equals("sun")){ans = 66;}else if (s.equals("hao")){ans = 99;}else{ans = 88;}return ans;}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(@NonNull Integer s) {Log.d(TAG, "onNext -》 "+s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});}
}

运行结果如下:

2023-11-17 04:58:08.660 16545-16545/com.xiaomi.myaxjava D/MainActivity: onSubscribe
2023-11-17 04:58:08.660 16545-16545/com.xiaomi.myaxjava D/MainActivity: onNext -66
2023-11-17 04:58:08.660 16545-16545/com.xiaomi.myaxjava D/MainActivity: onNext -99

四、示例二源码分析

示例二比示例一稍微复杂一点,加了map操作符,map操作符的作用是将被观察者发送的数据先做一波处理再给观察者。有了示例一源码的基础,分析示例二源码也比较简单了。在分析该部分源码前建议先去看一下我之前写的装饰模式,RxJava就是通过装饰模式将一个个的操作符装饰为各种类型的观察者(都是Observer的子类)。

1. 同样从subscribe方法看起,看看观察者如何被安插到被观察者中

Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {emitter.onNext("sun");emitter.onNext("hao");}}).map(new Function<String, Integer>() {@Overridepublic Integer apply(String s) throws Throwable {Integer ans = 0;if (s.equals("sun")){ans = 66;}else if (s.equals("hao")){ans = 99;}else{ans = 88;}return ans;}}).subscribe(new Observer<Integer>() {//subscribe方法是哪个对象的方法?很明显是map方法返回的对象@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(@NonNull Integer s) {Log.d(TAG, "onNext -》 "+s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

2. subscribe方法源码如下

public final void subscribe(@NonNull Observer<? super T> observer) {Objects.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");subscribeActual(observer);//该方法是被哪个类实现的?很明显subscribe方法别哪个类实现,该方法就是被哪个方法实现} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}

3. subscribeActual方法是被map方法返回的对象实现,我们来看看map方法的源码

    public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {Objects.requireNonNull(mapper, "mapper is null");//示例一中讲解过RxJavaPlugins.onAssembly是hook函数,在此我们不考虑,那么返回的就是ObservableMap对象return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));}

4. 因此接着第2步去ObservableMap类中查看subscribeActual方法的源码

  @Overridepublic void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function));//这里面的source和function是什么?我们后面分析}

可以看到subscribeActual方法中将我们自定义的观察者送入了MapObserver的构造函数,这儿就是采用装饰模型将我们自定义的观察者进行装饰,形成一个新的观察者。我们看下MapObserver的源码

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {super(actual); //把我们自定义的观察者传递给了MapObserver的父类this.mapper = mapper;}@Overridepublic void onNext(T t) {...}}
}

我们接着看MapObserver的父类BasicFuseableObserver是如何处理我们自定义的观察者的

public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {/** The downstream subscriber. */protected final Observer<? super R> downstream;/*** Construct a BasicFuseableObserver by wrapping the given subscriber.* @param downstream the subscriber, not null (not verified)*/public BasicFuseableObserver(Observer<? super R> downstream) {this.downstream = downstream;//将我们自定义的观察者赋值给了downstream}
}

到这儿我们知道自定义的观察者最终被传递给了downstream,下面我们分析ObservableMap类中的source和function是什么?

5. 第4步提到的ObservableMap类中的source和function是什么?我们看下源码

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {final Function<? super T, ? extends U> function;// source和function都是通过ObservableMap的构造函数传入的public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {super(source);this.function = function;}@Overridepublic void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function));}
}

source和function都是通过ObservableMap的构造函数传入的,第3步我们分析map源码的时候看到创建了ObservableMap对象

   public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {Objects.requireNonNull(mapper, "mapper is null");//这里的mapper就是我们自定义的Function//那这里的this是谁呢?很明显map方法属于哪个对象,这个this是那个对象return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));}

6. 第5步提到的map方法属于哪个对象呢?

Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {emitter.onNext("sun");emitter.onNext("hao");}}).map(new Function<String, Integer>() {//map方法是Observable.create方法返回的对象@Overridepublic Integer apply(String s) throws Throwable {Integer ans = 0;if (s.equals("sun")){ans = 66;}else if (s.equals("hao")){ans = 99;}else{ans = 88;}return ans;}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(@NonNull Integer s) {Log.d(TAG, "onNext -》 "+s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

我们接着看Observable.create方法的源码

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {Objects.requireNonNull(source, "source is null");//返回的是一个ObservableCreate的对象,这里的source是我们自定义的ObservableOnSubscribe对象return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));}

到这儿我们知道ObservableMap类中的function就是我们在map方法中传入的自定义Function匿名类,source就是我们通过Observable.create方法创建的ObservableCreate对象

7. 我们接着第4步继续分析

  @Overridepublic void subscribeActual(Observer<? super U> t) {//source就是我们通过Observable.create方法创建的ObservableCreate对象//function就是我们在map方法中传入的自定义Function匿名类source.subscribe(new MapObserver<T, U>(t, function));}

我们再来看ObservableCreate类中实现的subscribe方法,ObservableCreate继承自Observable,ObservableCreate实现的subscribe也是继承自Observable,因此我们去Observable中查看subscribe方法的源码

public final void subscribe(@NonNull Observer<? super T> observer) {Objects.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");subscribeActual(observer);//把装饰后的MapObserver对象通过subscribeActual方法传递出去} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}

我们接着看ObservableCreate类中的subscribeActual方法源码,这个时候就和之前示例一分析的源码接上了,就不再详细分析

    @Overrideprotected void subscribeActual(Observer<? super T> observer) {//创建CreateEmitter对象,并将MapObserver对象传递进去CreateEmitter<T> parent = new CreateEmitter<>(observer);observer.onSubscribe(parent);try {//这里的source就是我们传入Observable.create方法的自定义ObservableOnSubscribe匿名类//那么subscribe方法就是我们外面自己实现的方法source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}
			@Overridepublic void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {//emitter就是上面subscribeActual方法中实例化的CreateEmitter对象emitter.onNext("sun");emitter.onNext("hao");}

8. 我们看看ObservableEmitter发送的数据如何被map处理,然后又被观察者接收

ObservableEmitter通过onNext发送数据,以emitter.onNext(“sun”)为例,我们看下onNext的源码

 @Overridepublic void onNext(T t) {if (t == null) {onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));return;}if (!isDisposed()) {observer.onNext(t);//这里的observer就是ObservableMap对象,t是我们发送的数据“sun”}}

接着我们再看下ObservableMap中的onNext方法源码

		@Overridepublic void onNext(T t) {// t = "sun"if (done) {return;}if (sourceMode != NONE) {downstream.onNext(null);return;}U v;try {//这里的mapper就是我们外面在map方法中传入的自定义Function,在此map先对发送的数据“sun”做了处理v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}//downstream就是我们外面subscribe方法中传入的自定义Observer对象downstream.onNext(v);//这时候拿到数据是经过map处理后的数据}@Overridepublic Integer apply(String s) throws Throwable {Integer ans = 0;if (s.equals("sun")){ans = 66;}else if (s.equals("hao")){ans = 99;}else{ans = 88;}return ans;}

至此,整个示例二的源码我们就分析完了

五、后记

这也是我们第一次输出分析源码的文章,一是帮助自己理清楚RxJava源码流程,二是希望能够对其他博友有所帮忙,但第一次写源码分析类的文章,确实经验不足,感觉写的还是有点混乱,没有写出自己的预期,后面继续努力。

总之,就针对RxJava的源码,个人任务掌握一个切入点就对了,那就是从subscribe方法入手,看看我们自定义的观察者如何与发送数据的被观察者关联在一起的。

相关文章:

一看就懂的RxJava源码分析

一看就懂的RxJava源码分析 前言零、观察模式简介一、RxJava使用示例一二、示例一源码分析0. 示例一代码分解1. RxJava中的观察者是谁&#xff1f;2. RxJava中的被观察者又是谁&#xff1f;3. 观察者又是如何安插到被观察者中的&#xff1f;4. 示例一RxJava源码整体关系类图4. R…...

halcon中灰度图自动二值化

1、首先图片要先形成灰度图&#xff0c;如果下一句是二值化的那就删掉 dev_clear_window() read_image(Image, D:/desktop/tmpp/微信图片_20231201184731.png) * 转为灰度图 rgb1_to_gray(Image, GrayImage) 2、双击图像变量中的GrayImage 3、工具栏点击打开灰度直方图按钮&…...

Mybatis-Plus实体类注解怎么用

TableName 用在实体类上&#xff0c;指定实体类对应的表名称。 TableName(value "表名") TableId 用在属性上&#xff0c;指定主键字段的名称和类型。主键字段的名称一般是id&#xff0c;类型为自增。 TableId(value "id", type IdType.AUTO) TableFi…...

我是如何写作的?

以前是如何写作的 从小学三年级开始学写作文&#xff0c;看的作文书&#xff0c;老师布置作文题目&#xff0c;内容我都是自己写的。那时会积累一些好词&#xff0c;听到什么好词就记住了。并没有去观察什么&#xff0c;也没有好好花心思在写作上。总觉得我写的作文与真正好的…...

绩效考核实施之——如何做好部门间绩效的平衡?

绩效考核是企业人力资源管理的难点&#xff0c;而绩效考核的公正往往是绩效考核成败的关键&#xff0c; 如果绩效考核的不公平不合理&#xff0c;极易带来企业人员的负面情绪&#xff0c;甚至引起人才的流失。想要保证绩效考核的公平性&#xff0c;就要做好绩效的平衡&#xf…...

全新付费进群系统源码 完整版教程

首先准备域名和服务器 安装环境&#xff1a;Nginx1.18 MySQL 5.6 php7.2 安装扩展sg11 伪静态thikphp 后台域名/admin账号admin密码123456 代理域名/daili账号admin密码123456 一、环境配置 二、建站上传源代码解压 上传数据库配置数据库信息 三、登入管理后台 后台域名/ad…...

拉新地推任务管理分销助手公众号开发

拉新地推任务管理分销助手公众号开发 拉新地推任务管理分销助手公众号开发功能可以帮助企业进行地推任务的管理和分销助手的开发。以下是一些可能的功能介绍&#xff1a; 任务管理&#xff1a;这个功能可以让企业创建、分配和管理地推任务。管理员可以创建地推任务&#xff0c…...

MySQL三范式

欢迎大家到我的博客浏览。MySQL三范式 | YinKais Blog 简介 三大范式是 MySQL 数据库设计表结构所遵循的规范和指导方法&#xff0c;目的是为了减少冗余&#xff0c;建立结构合理的数据库&#xff0c;从而提高数据存储和使用的性能。 三大范式之间是有依赖关系的&#xff0c…...

玩转微服务-技术篇-JSDOC教程

一. 简介 JSDoc 3 是一个用于 JavaScript 的API文档生成器&#xff0c;类似于 Javadoc 或 phpDocumentor。可以将文档注释直接添加到源代码中。JSDoc 工具将扫描您的源代码并为您生成一个 HTML 文档网站。 JSDoc 是一种用于 JavaScript 代码文档注释的标记语言和工具。它不仅…...

Android12之logcat日志显示颜色和时间(一百六十七)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…...

【Windows】内网穿透实现hMailServer远程发送邮件

目录 前言1. 安装hMailServer2. 设置hMailServer3. 客户端安装添加账号4. 测试发送邮件5. 安装cpolar6. 创建公网地址7. 测试远程发送邮件8. 固定连接公网地址9. 测试固定远程地址发送邮件 前言 hMailServer 是一个邮件服务器,通过它我们可以搭建自己的邮件服务,通过cpolar内网…...

深信服技术认证“SCSA-S”划重点:SQL注入漏洞

为帮助大家更加系统化地学习网络安全知识&#xff0c;以及更高效地通过深信服安全服务认证工程师考核&#xff0c;深信服特别推出“SCSA-S认证备考秘笈”共十期内容&#xff0c;“考试重点”内容框架&#xff0c;帮助大家快速get重点知识~ 划重点来啦 深信服安全服务认证工程师…...

Leetcode-二叉树oj题

1.二叉树的前序遍历 144. 二叉树的前序遍历https://leetcode.cn/problems/binary-tree-preorder-traversal/这个题目在遍历的基础上还要求返回数组&#xff0c;数组里面按前序存放二叉树节点的值。 既然要返回数组&#xff0c;就必然要malloc一块空间&#xff0c;那么我们需…...

软件磁盘阵列(software RAID)

RAID-0 等量模式&#xff08;各个磁盘平均存放文件&#xff09; RAID-1 镜像模式&#xff08;一个文件存放两个磁盘&#xff09; RAID 01 RAID 10 组合模式 RAID 5 三块以上磁盘&#xff0c;记录文件和同位码&#xff08;存放不通磁盘&#xff0c;通过同…...

浏览器安全攻击与防御

前言 浏览器是我们访问互联网的主要工具&#xff0c;也是我们接触信息的主要渠道。但是&#xff0c;浏览器也可能成为攻击者利用的突破口&#xff0c;通过各种手段&#xff0c;窃取或篡改我们的数据&#xff0c;甚至控制我们的设备.本文将向大家介绍一些常见的浏览器安全的攻击…...

vue生命周期、工程化开发和脚手架

1、前言 持续学习记录总结中&#xff0c;vue生命周期、工程化开发和脚手架 2、Vue生命周期 Vue生命周期&#xff1a;就是一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个阶段&#xff1a;① 创建 ② 挂载 ③ 更新 ④ 销毁 1.创建阶段&#xff1a;创建响应式数据 2.挂…...

文件搜索工具HoudahSpot mac中文版特点

HoudahSpot mac是一款文件搜索工具&#xff0c;它可以帮助用户快速准确地找到文件和文件夹&#xff0c;支持高级搜索和过滤&#xff0c;同时提供了多种视图和操作选项&#xff0c;方便用户进行文件管理和整理。 HoudahSpot mac软件特点 高级搜索和过滤功能&#xff1a;软件支持…...

maven中scope和optional区别

文章目录 1. Scope&#xff08;作用范围&#xff09;&#xff1a;2. Optional&#xff08;可选项&#xff09;&#xff1a; 1. Scope&#xff08;作用范围&#xff09;&#xff1a; Maven的<scope>元素用于指定依赖项的作用范围&#xff0c;即依赖项在构建和运行时的可见…...

《ChatGPT实操应用大全》探索无限可能

&#x1f5e3;️探索ChatGPT&#xff0c;开启无限可能&#x1f680; 文末有免费送书福利&#xff01;&#xff01;&#xff01; ChatGPT是人类有史以来最伟大的发明。他能写作、绘画、翻译、看病、做菜、编程、数据分析、制作视频、解高等数学题…&#xff0c;他会的技能…...

基于helm的方式在k8s集群中部署gitlab - 部署(一)

文章目录 1. 背景说明2. 你可以学到什么&#xff1f;3. 前置条件4. 安装docker服务&#xff08;所有节点&#xff09;5. 部署k8s集群5.1 系统配置&#xff08;所有节点&#xff09;5.2 安装kubelet组件(所有节点)5.2.1 编写kubelet源5.2.2 安装kubelet5.2.3 启动kubelet 5.3 集…...

flask web开发学习之初识flask(二)

文章目录 一、创建程序实例并注册路由1. 为视图绑定绑定多个URL2. 动态URL 二、启动开发服务器1. 自动发现程序实例2. 管理环境变量3. 使用pycharm运行服务器4. 更多的启动选项5. 设置运行环境6. 调试器7. 重载器 一、创建程序实例并注册路由 app.py # 从flask包中导入flask类…...

利用异或、取反、自增bypass_webshell_waf

目录 引言 利用异或 介绍 eval与assert 蚁剑连接 进阶题目 利用取反 利用自增 引言 有这样一个waf用于防御我们上传的文件&#xff1a; function fun($var): bool{$blacklist ["\$_", "eval","copy" ,"assert","usort…...

K8s Docker实践三

单主机创建多个node 在Mac桌面上部署多个Kubernetes节点可以使用Minikube工具。Minikube是一个轻量级的Kubernetes工具&#xff0c;它可以在单个主机上创建一个虚拟集群。以下是在Mac桌面上使用Minikube部署多个Kubernetes节点的步骤&#xff1a; 安装Minikube&#xff0c;运…...

记录 | pip加速配置

以下方法不仅适用于linux&#xff0c;也适用于mac 临时加速配置&#xff1a; pip install -i https://pypi.douban.com/simple --trusted-host pypi.douban.com matplotlib3.4.0其中可选源有&#xff1a; https://pypi.douban.com/simple http://mirrors.aliyun.com/pypi/sim…...

HarmonyOS开发—Arkts循环渲染(ForEach)深入运用详解【鸿蒙专栏-16】

文章目录 ArkTS ForEach接口详解与应用示例ForEach接口概述介绍接口描述参数说明键值生成规则默认规则组件创建规则首次渲染非首次渲染使用场景高级用法条件渲染逻辑LazyForEach的性能优化渲染结果预期ForEach的错误使用案例与性能降低渲染结果非预期渲染性能降低结语ArkTS For…...

uniapp挽留提示2.0

项目需求&#xff1a;有时候挽留的ui是全屏的&#xff0c;用page-container也可以。后来产品提了个问题&#xff0c;手机侧滑的时候没那么顺畅&#xff08;就是一用侧滑&#xff0c;就显示出来&#xff0c;产品要的方案是如下图&#xff0c;emmm大概是这个意思&#xff09; 后面…...

电源控制系统架构(PCSA)之系统分区电压域

目录 4.1 电压域 4.1.1 系统逻辑 4.1.2 Always-On逻辑 4.1.3 处理器Clusters 4.1.4 图形处理器 4.1.5 其他功能 4.1.6 SoC分区示例 本章描述基于Arm组件的SoC划分为电压域和电源域。 所描述的选择并不详尽&#xff0c;只是可能性的一个子集。目的是描述基于Arm组件的SoC…...

[Linux] 正则表达式及grep和awk

一、正则表达式 1.1 什么是正则表达式 正则表达式是一种用于匹配和操作文本的强大工具&#xff0c;它是由一系列字符和特殊字符组成的模式&#xff0c;用于描述要匹配的文本模式。 正则表达式可以在文本中查找、替换、提取和验证特定的模式。 正则表达式和通配符的区别 正则…...

ssm+java车辆售后维护系统 springboot汽车保养养护管理系统+jsp

以前汽车维修人员只是在汽车运输行业中从事后勤保障工作,随着我国经济的发展,汽车维修行业已经从原来的从属部门发展成了如今的功能齐备的独立企业。这种结构的转变,给私营汽修企业和个体汽修企业的发展带来了契机,私营企业和个体维修企业的加入也带动了整个汽修行业的整体水平…...

HNU练习七 字符串编程题7. 机器人游戏

【问题描述】 有人建造了一些机器人&#xff0c;并且将他们放置在包含n个单元的一维网格上&#xff0c;一个长度为n的字符串s代表了他们的编排方式&#xff0c;字符串中的字符既可以是.&#xff0c;也可以是0~9之间的一个数字字符&#xff0c;字符.表示开始时在相应的单元上无机…...