RxJava/RxAndroid的操作符使用(二)
文章目录
- 一、创建操作
- 1、基本创建
- 2、快速创建
- 2.1 empty
- 2.2 never
- 2.3 error
- 2.4 from
- 2.5 just
- 3、定时与延时创建操作
- 3.1 defer
- 3.2 timer
- 3.3 interval
- 3.4 intervalRange
- 3.5 range
- 3.6 repeat
- 二、过滤操作
- 1、skip/skipLast
- 2、debounce
- 3、distinct——去重
- 4、elementAt——获取指定位置元素
- 5、filter——过滤
- 6、first——取第一个数据
- 7、last——取最后一个
- 8、ignoreElements & ignoreElement(忽略元素)
- 9、ofType(过滤类型)
- 10、sample
- 11 、take & takeLast
- 三、组合可观察对象操作符
- 1、CombineLatest
- 2、merge
- 3、zip
- 4、startWith
- 5、join
- 四、变化操作符
- 1、map
- 2、flatMap / concatMap
- 3、scan
- 4、buffer
- 5、window
- 关于RxJava/RxAndroid的全部文章
一、创建操作
1、基本创建
create创建一个基本的被观察者
在使用create()
操作符时,最好在被观察者的回调函数subscribe()
中加上isDisposed()
,以便在观察者断开连接的时候不在执行subscribe()函数中的相关逻辑,避免意想不到的错误出现。
Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Throwable {try {if(!emitter.isDisposed()){emitter.onNext("a");emitter.onNext("b");}} catch (Exception e) {emitter.onError(e);}}}).subscribe(value -> Log.e(TAG, "onNext: " + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
2、快速创建
完整&快速创建被观察者、数组、集合遍历
操作符 | 作用 |
---|---|
empty | 创建一个只发送 onComplete 事件的 Observable。 |
never | 创建一个不发送任何事件的 Observable。 |
error | 创建一个只发送 onError 事件的 Observable。 |
from | 操作符用于将其他对象或数据结构转换为 Observable,可发送不同类型的数据流 |
just | 操作符将对象或一组对象转换为 Observable,并立即发送这些对象,没有延迟。 |
2.1 empty
创建一个不发射任何items
但正常终止的 Observable——create an Observable that emits no items but terminates normally
Observable.empty().subscribe(value -> Log.e(TAG, "onNext: "+value ),error -> Log.e(TAG, "Error: "+error),()->Log.e(TAG,"onComplete"));
2.2 never
创建一个不发射任何items
且不会终止的 Observable——create an Observable that emits no items and does not terminate
Observable.never().subscribe(value -> Log.e(TAG, "onNext: " + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
不发送任何事件
2.3 error
创建一个不发射任何items并以错误终止的 Observable——create an Observable that emits no items and terminates with an error
Observable.error(new Exception("ERROR")).subscribe(value -> Log.e(TAG, "onNext: " + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
2.4 from
以fromAray举例:
Observable.fromArray(1,2,3,4,5).subscribe(value -> Log.e(TAG, "onNext: " + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
2.5 just
Observable.just(1,2,3,4,5).subscribe(value -> Log.e(TAG, "onNext: " + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
通过just()
创建传入Integer类型的参数构建Observable被观察者,相当于执行了onNext(1)~onNext(5),通过链式编程订阅观察者。注意just的数据一般不能超过10个。
注意,如果将 null
传递给 Just,它将返回一个将 null
作为项目发出的 Observable。不要错误地认为这将返回一个空的 Observable(根本不发出任何项目)
3、定时与延时创建操作
定时操作、周期性操作
操作符 | 作用 |
---|---|
defer | 直到有Observer观察者订阅时,才会通过Observeable的工厂方法动态创建Observeable,并且发送事件 |
timer | 用于延时发送,在给定的延迟后发出单个项目 |
interval | 它按照指定时间间隔发出整数序列,通常用于定时操作。 |
intervalRange | 类似于interval(),快速创建一个被观察者对象,指定时间间隔就发送事件,可以执行发送事件的数量 |
range | 它发出一个连续的整数序列,可以指定发送的次数 |
repeat | 重复发送指定次数的某个事件流 |
3.1 defer
直到有Observer观察者订阅时,才会通过Observeable的工厂方法动态创建Observeable,并且发送事件
defer
不会立即创建 Observable,而是等到观察者订阅时才动态创建,每个观察者都会得到一个新的 Observable 实例。
defer确保了Observable
代码在被订阅后才执行(而不是创建后立即执行)
Observable<Integer> integerObservable = Observable.defer(new Supplier<ObservableSource<? extends Integer>>() {@Overridepublic ObservableSource<? extends Integer> get() throws Throwable {int randomNumber = (int) (Math.random() * 100);return Observable.just(randomNumber);}});integerObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {Log.e(TAG, "第一次" + integer.toString());}});integerObservable.subscribe(integer -> Log.e(TAG, "第二次" + integer.toString()));
3.2 timer
构造方法如下:
timer(long delay, TimeUnit unit)timer(long delay, TimeUnit unit, Scheduler scheduler)
- delay:延时的时间,类型为Long;
- unit:表示时间单位,有TimeUnit.SECONDS等多种类型;
- scheduler:表示调度器,用于指定线程。
用于延时发送
final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Log.e(TAG, "timer:当前时间 ==" + dateFormat.format(System.currentTimeMillis()));Observable.timer(5, TimeUnit.SECONDS).subscribe(value -> Log.e(TAG, "timer:onNext ==" + dateFormat.format(System.currentTimeMillis())),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
表示延迟5s后发送数据
3.3 interval
用于定时发送数据,快速创建Observable被观察者对象,每隔指定的时间就发送相应的事件,事件序列从0开始,无限递增1;
//在指定延迟时间后,每个多少时间发送一次事件
interval(long initialDelay, long period, TimeUnit unit)//在指定的延迟时间后,每隔多少时间发送一次事件,可以指定调度器
interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)//每间隔多少时间发送一次事件,使用默认的线程
Observable<Long> interval(long period, TimeUnit unit)//每间隔多少时间发送一次事件,可以指定调度器
interval(long period, TimeUnit unit, Scheduler scheduler)
- initialDelay: 表示延迟开始的时间,类型为
Long
- period:距离下一次发送事件的时间间隔,类型
Long
- unit:时间单位,有
TimeUnit.SECONDS
等多种类型; - scheduler:表示调度器,用于指定线程。
它会从0开始,然后每隔 1 秒发射一个递增的整数值
Observable.interval(1,3,TimeUnit.SECONDS).subscribe(value -> Log.e(TAG, "timer:onNext ==" + dateFormat.format(System.currentTimeMillis())),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
定时发射指定的结果
// 创建一个每秒发射一个递增整数的 ObservableObservable<Long> intervalObservable = Observable.interval(1, TimeUnit.SECONDS);// 使用 map 操作符将递增的整数值映射为您想要的数据类型Observable<String> customObservable = intervalObservable.map(index -> "Data_" + index); // 映射为字符串 "Data_" + index// 订阅并输出结果customObservable.subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error),() -> System.out.println("Completed"));
3.4 intervalRange
类似于interval(),快速创建一个被观察者对象,指定时间间隔就发送事件,可以执行发送事件的数量,数据依次递增1。
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
- start:表示事件开始的数值大小,类型为Long
- count:表示事件执行的次数,类型为long,不能为负数;
- initialDelay:表示延迟开始的时间,类型为Long;
- period:距离下一次发送事件的时间间隔,类型Long;
- unit:时间单位,有TimeUnit.SECONDS等多种类型;
- scheduler:表示调度器,用于指定线程。
Observable.intervalRange(10, 3, 2, 1,TimeUnit.SECONDS,Schedulers.io()).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
3.5 range
Range 运算符按顺序发出一系列连续整数,可以在其中选择范围的起点及其长度。
它发出一个连续的整数序列,通常不涉及延迟。类似于intervalRange。
public static Observable<Integer> range(int start, int count)public static Observable<Long> rangeLong(long start, long count)
- start:事件开始的大小
- count:发送的事件次数
Observable.range(10,5).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
3.6 repeat
repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行,repeat默认重复次数为Long.MAX_VALUE,可使用重载方法指定次数以及使用repeatUntil指定条件。
//一直重复Observable.fromArray(1, 2, 3, 4).repeat();//重复发送5次Observable.fromArray(1, 2, 3, 4).repeat(5);//重复发送直到符合条件时停止重复Observable.fromArray(1, 2, 3, 4).repeatUntil(new BooleanSupplier() {@Overridepublic boolean getAsBoolean() throws Exception {//自定判断条件,为true即可停止,默认为falsereturn false;}});
二、过滤操作
1、skip/skipLast
可以在Flowable,Observable中使用,表示源发射数据前,跳过多少个。
-
skip:
skip
操作符用于跳过 Observable 开头的一定数量的事件,然后开始发射后续的事件。它忽略序列的头部事件。例如,
observable.skip(3)
会跳过前面的 3 个事件,然后发射后续的事件。 -
skipLast:
skipLast
操作符用于跳过 Observable 末尾的一定数量的事件,然后发射前面的事件。它忽略序列的末尾事件。例如,
observable.skipLast(3)
会发射从序列开头到倒数第 3 个事件之前的事件,忽略了最后 3 个事件。
Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8);integerObservable.skipLast(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {Log.e(TAG, "accept: " + integer);}});
换成skip
后结果如下:
2、debounce
仅当特定时间跨度过去而没有发出另一个项目时,才从 Observable 发出一个项目
Observable.create(emitter -> {emitter.onNext(1);Thread.sleep(1_500);emitter.onNext(2);Thread.sleep(500);emitter.onNext(3);Thread.sleep(2000);emitter.onNext(4);emitter.onComplete();
}).subscribeOn(Schedulers.io()).debounce(1,TimeUnit.SECONDS).blockingSubscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete")
);
debounce(1, TimeUnit.SECONDS)
表示将事件流中的事件按照时间窗口的方式进行过滤。具体含义是,如果在连续的 1 秒内没有新的事件发射,那么才会将最后一个事件传递给观察者,否则会丢弃之前的事件。
结合图像理解,红色线条为debounce
监听的发射节点,也就是每隔一秒发送一次数据。
在0s时发送了1。
在1s时由于没有数据,就没有发送数据。
在1s—2s期间产生了两次数据,分别是2和3。但是debounce
只会将距离2s最近一次的数据发送。因此2被不会发送出来。
3、distinct——去重
可作用于Flowable,Observable,去掉数据源重复的数据。
Observable.just(1,2,3,1,2,3,4).distinct().subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
distinctUntilChanged()去掉相邻重复数据。
Observable.just(1,3,3,2,2,3,4).distinctUntilChanged().subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));//还可以指定重复条件Observable.just(1,3,3,2,2,3,4).distinctUntilChanged(new Function<Integer, Boolean>() {@Overridepublic Boolean apply(Integer integer) throws Throwable {return integer>3;}}).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
4、elementAt——获取指定位置元素
//获取索引为1的元素,如果不存在返回Error
Observable.just("a","b","c","d","e").elementAt(1,"Error").subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),
);
5、filter——过滤
用于过滤指定的发射元素。
Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Throwable {return (integer % 2) != 0;}}).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
6、first——取第一个数据
//不存在则返回100
Observable.just(1, 2, 3, 4, 5, 6).first(100).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error));
7、last——取最后一个
last、lastElement、lastOrError与fist、firstElement、firstOrError相对应。
Observable.just(1, 2, 3, 4, 5, 6).last(100).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error));
8、ignoreElements & ignoreElement(忽略元素)
ignoreElements 作用于Flowable
、Observable
。ignoreElement作用于Maybe
、Single
。两者都是忽略掉数据,不发射任何数据,返回完成或者错误时间。
9、ofType(过滤类型)
作用于Flowable、Observable、Maybe,过滤选择类型。
Observable.just(1, 2, 3, 4.4, 5.5, 6.6).ofType(Integer.class).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error));
10、sample
debounce
:它等待一段时间,如果在这段时间内没有新事件到达,它会发射最后一个事件。它用于处理高频率事件流,例如用户输入,以确保只处理用户停止输入后的事件。debounce
等待事件流静止,然后发射最后一个事件。sample
:它按照固定的时间间隔从事件流中抽样一个事件,并发射该事件。它用于定期采样事件流,例如从传感器数据中每隔一段时间获取一次数据。sample
定期获取事件,无论事件流是否活跃。
Observable<Integer> observable = Observable.create(emitter -> {emitter.onNext(1);Thread.sleep(1_500);emitter.onNext(2);Thread.sleep(500);emitter.onNext(3);Thread.sleep(2000);emitter.onNext(4);emitter.onComplete();});observable.sample(1, TimeUnit.SECONDS).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error));
产生的数据在红线处发送
1在第1s时被发送,2在第2s时被发送,3在第3s时被发送,由于4还未在第5s时就已经onComplete
所以4无法被发送
11 、take & takeLast
作用于Flowable、Observable。take发射前n个元素。takeLast发射后n个元素。
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);source.take(4).subscribe(value-> Log.e(TAG, "timer:onNext ==" + value));//打印:1 2 3 4source.takeLast(4).subscribe(value-> Log.e(TAG, "timer:onNext ==" + value));//打印:7 8 9 10
三、组合可观察对象操作符
操作符 | 作用 |
---|---|
combineLatest | 用于将多个 Observable 中最新的事件进行组合,并生成一个新的事件。 |
merge | 用于将多个 Observable 合并成一个单一的 Observable,按照它们发射事件的顺序合并。 |
zip | 用于一一配对多个 Observable 发射的事件,只有当所有 Observable 都有事件时才生成新事件。 |
startWith | 用于在一个 Observable 发射的事件前插入一个或多个初始事件。 |
join | 用于将两个 Observable 的事件按照时间窗口的方式进行组合。 |
1、CombineLatest
通过指定的函数将每个 Observable 发出的最新项目组合在一起,并根据该函数的结果发出项目
combineLatest
用于将多个 Observable 中最新的事件进行组合,并生成一个新的事件。- 当任何一个 Observable 发射新数据时,都会生成新的组合事件。
- 适用于需要及时反应多个数据源最新值变化的情况。
Observable<Integer> source1 = Observable.just(1, 2, 3);
Observable<String> source2 = Observable.just("A", "B", "C");
Observable<Boolean> source3 = Observable.just(true, false, true);Observable<String> combined = Observable.combineLatest(source1,source2,source3,(integer, string, aBoolean) -> integer + " " + string + " " + aBoolean
);combined.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete")
);
2、merge
merge
用于将多个 Observable 合并成一个单一的 Observable,按照它们发射事件的顺序合并。merge
不会进行事件的组合,只是合并多个 Observable 的事件。- 适用于需要将多个 Observable 的事件合并成一个流的情况。
注意:merge只能合并相同类型的Observable
Observable<Integer> source1 = Observable.just(1, 2, 3);Observable<Integer> source2 = Observable.just(4,5,6);Observable<Integer> source3 = Observable.just(7,8,9);Observable<Integer> combined = Observable.merge(source1,source2,source3);combined.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
3、zip
zip
用于一一配对多个 Observable 发射的事件,只有当所有 Observable 都有事件时才生成新事件。zip
会等待所有 Observable 都有事件后,才会执行组合函数生成新事件。- 适用于需要将多个数据源的事件一一配对的情况。
Observable<Integer> source1 = Observable.just(1, 2, 3);Observable<String> source2 = Observable.just("A", "B", "C");Observable<Boolean> source3 = Observable.just(true, false, true);Observable<String> combined = Observable.zip(source1,source2,source3,(integer, string, aBoolean) -> integer + " " + string + " " + aBoolean);combined.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
4、startWith
startWith
用于在一个 Observable 发射的事件前插入一个或多个初始事件。- 这些初始事件会作为 Observable 的开头。
- 适用于需要在 Observable 发射事件前添加一些初始数据的情况。
Observable<Integer> source = Observable.just(1, 2, 3);Observable<Integer> withStart = source.startWithArray(100,200);withStart.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
5、join
join
用于将两个 Observable 的事件按照时间窗口的方式进行组合。- 可以为每个 Observable 设置时间窗口,然后在这些窗口内组合事件。
- 适用于需要在时间窗口内组合两个 Observable 的事件的情况。
时间窗口:
- 固定时间窗口:定义一个固定的时间段,将在该时间段内的事件分为一个时间窗口。
- 延时时间窗口:定义一个时间段,但在事件发生后延迟一段时间后才分为时间窗口。
- 动态时间窗口:根据事件的特定条件动态地定义时间窗口。
Observable<Integer> left = Observable.just(1, 2, 3);Observable<Integer> right = Observable.just(10, 20, 30);left.join(right,leftDuration -> Observable.timer(1, TimeUnit.SECONDS),rightDuration -> Observable.timer(1, TimeUnit.SECONDS),(leftValue, rightValue) -> "Left: " + leftValue + ", Right: " + rightValue).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value));
在这个示例中,我们定义了以下时间窗口规则:
- 左边的时间窗口规则:
leftDuration -> Observable.timer(1, TimeUnit.SECONDS)
表示在左边的事件后等待 1 秒后生成一个时间窗口。 - 右边的时间窗口规则:
rightDuration -> Observable.timer(2, TimeUnit.SECONDS)
表示在右边的事件后等待 2 秒后生成一个时间窗口。
现在让我们看看时间窗口如何影响事件的组合:
- 当左边的事件
1
发生时,它会进入左边的时间窗口,并等待 1 秒。在此期间,右边的事件没有机会进入左边的时间窗口。 - 当右边的事件
10
发生时,它会进入右边的时间窗口,并等待 2 秒。在此期间,左边的事件也没有机会进入右边的时间窗口。
只有在左边和右边的事件都在各自的时间窗口内时,它们才会被组合。在这个示例中,左边的事件会在右边的时间窗口内被组合。所以,在 1
秒后,左边的事件 1
和右边的事件 10
被组合成 “Left: 1, Right: 10”。
四、变化操作符
| 操作符 | 说明 |
map() | 对数据流的类型进行转换 |
---|---|
flatMap() | 对数据流的类型进行包装成另一个数据流 |
scan() | scan操作符会对发射的数据 和上一轮发射的数据 进行函数处理,并返回的数据供下一轮使用。 |
buffer() | 缓存指定大小数据 |
window() | 缓存指定大小数据,返回新的integerObservable |
对上一轮处理过后的数据流进行函数处理
对所有的数据流进行分组
缓存发射的数据流到一定数量,随后发射出数据流集合
缓存发射的数据流到一定数量,随后发射出新的事件流
1、map
Observable.just(1,2,3).map(new Function<Integer, Object>() {@Overridepublic Object apply(Integer integer) throws Throwable {return integer * 100;}}).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
2、flatMap / concatMap
Observable observable = Observable.just(isLogin("12346")).flatMap(new Function<Boolean, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Boolean aBoolean) throws Throwable {String Login = "登陆失败,帐号秘密错误";if (aBoolean) Login = "登陆成功";return Observable.just(Login).delay(2, TimeUnit.SECONDS);}});observable.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));private boolean isLogin(String passWord) {if (passWord.equals("123456")) {return true;}return false;}
Observable.just(isLogin("12346"))
创建一个 Observable,它会发射一个布尔值,表示登录是否成功。.flatMap(new Function<Boolean, ObservableSource<?>>() { ... }
:使用flatMap
操作符将上一步的布尔值结果转换成一个新的 Observable,其中包含登录的结果消息。flatMap
中的apply
方法根据登录结果aBoolean
决定返回不同的消息。如果登录成功,返回 “登陆成功” 消息,否则返回 “登陆失败,帐号秘密错误” 消息,并使用delay
延迟 2 秒发送消息。observable.subscribe(...)
:最后,订阅observable
,并设置了三个回调函数,分别处理 onNext、onError、onComplete 事件。
concatMap与flatMap的区别: concatMap是有序的,flatMap是无序的。
flatMap()
:
- 不保证内部 Observable 的发射顺序,它会尽可能并行地处理内部 Observable,并将它们的发射结果合并到一个单一的 Observable 中。
- 内部 Observable 可以乱序发射数据,最终结果也可能是乱序的。
concatMap()
:
- 保证内部 Observable 的发射顺序,它会按照原始数据的顺序依次处理每个内部 Observable,等待一个内部 Observable 完成后再处理下一个。
- 内部 Observable 的发射顺序和最终结果的顺序都与原始数据的顺序一致。
Observable<Integer> source = Observable.just(1, 2, 3);// 使用 flatMapsource.flatMap(num -> Observable.just(num * 2).delay(num, TimeUnit.MILLISECONDS)).subscribe(value -> Log.e(TAG, "timer:flatMapOnNext ==" + value));// 使用 concatMapsource.concatMap(num -> Observable.just(num * 2).delay(num, TimeUnit.MILLISECONDS)).subscribe(value -> Log.e(TAG, "timer:concatMapOnNext ==" + value));
3、scan
scan操作符会对发射的数据
和上一轮发射的数据
进行函数处理,并返回的数据供下一轮使用。
Observable<Integer> observable = Observable.just(1,2,3,4).scan(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Throwable {Log.e(TAG, "integer = " + integer +" integer2 = "+integer2);return integer2-integer;}});observable.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));
- 初始情况下,前一个累积结果为空(因为没有前一个值),所以第一个数据项 1 直接发射出来,产生的结果是 1。
- 接下来,前一个累积结果是 1,当前数据项是 2,所以执行操作 2 - 1,产生的结果是 1。
- 再次执行,前一个累积结果是 1,当前数据项是 3,所以执行操作 3 - 1,产生的结果是 2。
- 最后,前一个累积结果是 2,当前数据项是 4,执行操作 4 - 2,产生的结果是 2。
4、buffer
buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。
Observable.just(1,2,3,4,5,6,7,8).buffer(3).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"))
5、window
window
操作符和buffer
操作符在功能上实现的效果是一样的,但window
操作符最大区别在于同样是缓存一定数量的数据项,window
操作符最终发射出来的是新的事件流integerObservable,而buffer操作符发射出来的是新的数据流。
也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理
window
操作符用于将一个 Observable 拆分为多个子 Observable,每个子 Observable 包含一定数量的连续数据项。window
操作符的两个参数的含义如下:
- 第一个参数(count):指定每个子 Observable 中包含的数据项的数量。
- 第二个参数(skip):指定何时启动新的窗口。它定义了窗口之间的重叠或间隔。如果 skip 等于 count,则窗口之间不重叠。如果 skip 小于 count,则窗口之间有重叠数据。
举个例子来说明:
假设有一个 Observable 发出的数据序列如下:1, 2, 3, 4, 5, 6, 7, 8, 9。
- 如果你使用
window(3, 2)
,它的含义是每个窗口包含 3 个数据项,且窗口之间间隔 2 个数据项。那么分割后的子 Observable 将是:
- 窗口1:1, 2, 3
- 窗口2:3, 4, 5
- 窗口3:5, 6, 7
- 窗口4:7, 8, 9
- 如果你使用
window(3, 3)
,窗口之间不重叠,每个窗口包含 3 个数据项。那么分割后的子 Observable 将是:
- 窗口1:1, 2, 3
- 窗口2:4, 5, 6
- 窗口3:7, 8, 9
这里只使用了一个参数,用于指定窗口的大小。然后更具window
发射新的事件流integerObservable
的特性,对这个事件流进行了去重操作。
Observable.just(1,1,3,4,6,6,7,8).window(3).subscribe(new Consumer<Observable<Integer>>() {@Overridepublic void accept(Observable<Integer> integerObservable) throws Throwable {integerObservable.distinct().subscribe(value -> Log.e(TAG, "timer:onNext ==" + value.toString()));}});
关于RxJava/RxAndroid的全部文章
RxJava/RxAndroid的基本使用方法(一)
RxJava的操作符使用(二)
参考文档:
官方文档:reactivex
RxJava3 Wiki:Home · ReactiveX/RxJava Wiki · GitHub
RxJava3官方github:What’s different in 3.0 · ReactiveX/RxJava Wiki · GitHub
RxJava2 只看这一篇文章就够了–玉刚说
RxJava2最全面、最详细的讲解–苏火火丶
关于背压(Backpressure)的介绍:关于RxJava最友好的文章——背压(Backpressure)
RXJava3+OKHTTP3+Retrofit2(观察者设计模式)讲解+实战
相关文章:

RxJava/RxAndroid的操作符使用(二)
文章目录 一、创建操作1、基本创建2、快速创建2.1 empty2.2 never2.3 error2.4 from2.5 just 3、定时与延时创建操作3.1 defer3.2 timer3.3 interval3.4 intervalRange3.5 range3.6 repeat 二、过滤操作1、skip/skipLast2、debounce3、distinct——去重4、elementAt——获取指定…...

【C语法学习】20 - 文件访问顺序
文章目录 0 前言1 文件位置指示符2 rewind()函数2.1 函数原型2.2 参数2.3 返回值2.4 使用说明 3 ftell()函数3.1 函数原型3.2 参数3.3 返回值 4 fseek()函数4.1 函数原型4.2 参数4.3 返回值 5 示例5.1 示例15.2 示例2 0 前言 C语言文件访问分为顺序文件访问和随机文件访问。 …...

Etcd 常用命令与备份恢复
1. etcd简介 官方网站:etcd.io 官方文档:etcd.io/docs/v3.5/op-guide/maintenance 官方硬件推荐:etcd.io/docs/v3.5/op-guide/hardware github地址:github.com/etcd-io/etcd etcd是CoreOS团队于2013年6月发起的开源项目…...
获取任意时间段内周、季度、半年的二级联动
#需求是获取两个时间内 年周 、年季度、年半年的二级联动# 找了半天也找不到什么有用的信息 就自己简单写了一个 思路是先获取年的列表再去嵌套查询 根据前端VUE提供的格式嵌套 public function getDate(){$leixing Request::param(leixing);$larr array(1,2,3,4);if(empty(…...

前端面试系列之工程化篇
如果对前端八股文感兴趣,可以留意公重号:码农补给站,总有你要的干货。 前端工程化 Webpack 概念 本质上,webpack 是一个用于现代 JavaScript 应用程序的静态模块打包工具。当 webpack 处理应用程序时,它会在内部从一个…...

京东按关键词搜索商品列表接口:竞品分析,商品管理,营销策略制定
京东搜索商品列表接口是京东开放平台提供的一种API接口,通过调用该接口,开发者可以获取京东平台上商品的列表数据,包括商品的标题、价格、库存、月销量、总销量、详情描述、图片等信息。 接口的主要作用包括: 市场调研ÿ…...

Microsoft Dynamics 365 CE 扩展定制 - 9. Dynamics 365扩展
在本章中,我们将介绍以下内容: Dynamics 365应用程序Dynamics 365通用数据服务构建Dynamics 365 PowerApp使用Flow在CDS和Dynamics 365之间移动数据从AppSource安装解决方案使用数据导出服务解决方案进行数据复制从CRM数据构建Power BI仪表板简介 多年来,Dynamics CRM已从一…...

多篇论文介绍-Wiou
论文地址 目录 https://arxiv.org/pdf/2301.10051.pdf 01 CIEFRNet:面向高速公路的抛洒物检测算法 02改进 YOLOv5 的 PDC 钻头复合片缺损识别 03 基于SimAM注意力机制的DCN-YOLOv5水下目标检测 04 基于改进YOLOv7-tiny 算法的输电线路螺栓缺销检测 05 基于改…...
Django介绍,安装,创建
文章目录 1. web应用程序1.1 什么是web?1.2 web应用程序的优点1.3 web应用程序的缺点1.4 什么是web框架? 2. 手撸web框架 1. web应用程序 1.1 什么是web? Web应用程序是一种可以通过Web访问的应用程序,用户只需要有浏览器即可,不需要再安装其他软件 案…...
Java通过javacv获取视频、音频、图片等元数据信息(分辨率、大小、帧等信息)
相信我们都会或多或少需要给前端返回视频或者音频的一些信息,那么今天这篇文章通过Java语言使用javacv来获取视频、音频、图片等元数据信息(分辨率、大小、帧等信息) 一、首先导入依赖 可以先导入javacv/javacv-platform依赖,由于依赖比较大,所以我们可以先去除部分不需…...
flask和fastapi的区别以及demo实现
flask和fastapi的区别以及demo实现 flask和fastapi的区别fastapi简单demoFastAPI包括全局异常捕捉和参数验证的demoflask和fastapi的区别 Flask:Flask是一个轻量级的Web框架,它提供了最基本的工具,可以自由选择其他库和组件来构建应用。灵活性:Flask允许用户自由选择数据库、…...

python特殊循环队列_队中元素个数代替队尾指针
对于循环队列来说,如果知道队头指针和队中元素个数,则可以计算出队尾指针。也就是说,可以用队中元素个数代替队尾指针。设计出这种循环队列的判队空、进队、出队和取队头元素的算法。 本例的循环队列包含data 数组、队头指针 front和队中元素…...

什么是观察者模式?用 Python 如何实现 Observer(观察者或发布订阅)对象行为型模式?
什么是观察者模式? 观察者模式(Observer pattern)是一种行为型设计模式,它允许对象之间建立一种一对多的依赖关系,当一个对象的状态发生变化时,其相关依赖对象都会得到通知并自动更新。 在观察者模式中&am…...

pytorch直线拟合
目录 1、数据分析 2、pytorch直线拟合 1、数据分析 直线拟合的前提条件通常包括以下几点: 存在线性关系:这是进行直线拟合的基础,数据点之间应该存在一种线性关系,即数据的分布可以用直线来近似描述。这种线性关系可以是数据点…...
相机传感器
相机的传感器大小通常用英寸(1英寸2.54厘米)来表示。例如:全画幅相机的传感器大小为:36mm*24mm,称为 35mm全画幅。 几分之一英寸 所谓的 1/2.7,1/2.5等等,里面的分子1是一个标准,分…...
大语言模型的关键技术
大语言模型的关键技术: 经过漫长的发展,LLM 进化到了当前的状态——通用且有能力的学习者。在这个过程中,人们提出了许多重要的技术,大大提升了 LLM 的能力。在此,我们简要列举了几种重要的技术,这些技术&a…...

uniapp使用vur-cli新建项目并打包
新建项目 npm install -g vue/cli vue create -p dcloudio/uni-preset-vue my-project选择默认模板npm run dev:h5 运行 安装sass和uview (npm安装失败) bug:使用uni.scss中的变量或样式,<style lang"scss"> 必…...

后台管理系统解决方案-中大型-Vben Admin
后台管理系统解决方案-中大型-Vben Admin 官网 Vben Admin 在线演示 Vben Admin 为什么选择它 github现有20K星,并且它有个可视化生成表单,我很喜欢 快速开始 # 拉取代码 git clone https://github.com/vbenjs/vue-vben-admin-doc# 安装依赖 yarn#…...

通俗理解repartition和coalesce区别
官方的解释 reparation 返回一个具有恰好numPartitions分区的新RDD。 可以增加或减少此RDD中的并行级别。在内部,reparation会使用shuffle来重新分发的数据。 如果要减少此RDD中的分区数量,请考虑使用coalesce,这样可以避免执行shuffle。 coalesce 返回一个新的RDD,该RDD被…...

优雅设计之美:实现Vue应用程序的时尚布局
本文为翻译文章,原文链接: ** https://fadamakis.com/clean-layout-architecture-for-vue-applications-a738201a2a1e 前言 页面布局是减少代码重复和创建可维护且具有专业外观的应用程序的基本模式。如果使用的是Nuxt,则可以提供开箱即用…...
接IT方案编写(PPT/WORD)、业务架构设计、投标任务
1、IT 方案编写(PPT/WORD) 定制化方案:根据客户需求,提供涵盖云计算、大数据、人工智能等前沿技术领域的 PPT/WORD 方案编写服务,精准提炼核心价值,呈现专业技术内容。 逻辑清晰架构:采用…...
Build a Large Language Model (From Scratch) 序章
关于本书 《从零构建大型语言模型》旨在帮助读者全面理解并从头创建类似GPT的大型语言模型(LLMs)。 全书首先聚焦于文本数据处理的基础知识和注意力机制的编码,随后指导读者逐步实现一个完整的GPT模型。书中还涵盖了预训练机制以及针对文本…...

MCU_IO驱动LED
注意事项: 1、亮度要求较高的情况下,不能由IO直接驱动LED MCU_IO引脚输出的电压和电流较弱,如果对光的亮度有要求的话,需要使用三极管来驱动。 MCU_IO的电压一般为3.3V或者5V,输出电流一般10mA-25mA。 2、不同颜色…...
Q: dify前端使用哪些开发框架?
【回到目录】~~~~【回到问题集】 Q: dify前端使用哪些开发框架? A: 通过查看Readme.md,可以了解到使用以下框架 1. [Next.js] (https://nextjs.org/) React Framework 2. Node.js > v22.11.x 3. pnpm v10.x 4. Storybook UI component development 4. Je…...
模板方法模式:优雅封装不变,灵活扩展可变
引言:代码复用与扩展的艺术 在日常开发中,我们常遇到核心流程固定但某些步骤需差异化的场景。例如: 数据库操作的通用流程(连接→执行→关闭)HTTP请求的固定步骤(构建请求→发送→解析响应)报表生成的骨架(数据获取→格式转换→输出)模板方法模式正是为解决这类问题而…...
Ubuntu 系统通过防火墙管控 Docker 容器
Ubuntu 系统通过防火墙管控 Docker 容器指南 一、基础防火墙配置 # 启用防火墙 sudo ufw enable# 允许 SSH 连接(防止配置过程中断联) sudo ufw allow 22/tcp二、Docker 配置调整 # 编辑 Docker 配置文件 sudo vim /etc/docker/daemon.json配置文件内…...

强化学习入门:Gym实现CartPole随机智能体
前言 最近想开一个关于强化学习专栏,因为DeepSeek-R1很火,但本人对于LLM连门都没入。因此,只是记录一些类似的读书笔记,内容不深,大多数只是一些概念的东西,数学公式也不会太多,还望读者多多指教…...
基于springboot的校园社团信息系统的设计与实现
其他源码获取可以看首页:代码老y 个人简介:专注于毕业设计项目定制开发:springbootvue系统,Java微信小程序,javaSSM系统等技术开发,并提供远程调试部署、代码讲解、文档指导、ppt制作等技术指导。源码获取&…...

LLM Agent 如何颠覆股价预测的传统范式
写在前面 股价预测,金融领域的“圣杯”之一,吸引了无数研究者和投资者。传统方法从技术指标到复杂的计量经济模型,再到机器学习,不断演进,但市场的高度复杂性、非线性和充斥噪声的特性,使得精准预测依然是巨大的挑战。大型语言模型(LLM)的崛起,特别是LLM Agent这一新…...
JVM——如何打造一个类加载器?
引入 在Java应用程序的生命周期中,类加载器扮演着至关重要的角色。它是Java运行时环境的核心组件之一,负责在需要时动态加载类文件到JVM中。理解类加载器的工作原理以及如何自定义类加载器,不仅可以帮助我们更好地管理应用程序的类加载过程&…...