Kotlin学习:5.2、异步数据流 Flow
Flow
- 一、Flow
- 1、Flow是什么东西?
- 2、实现功能
- 3、特点
- 4、冷流和热流
- 5、流的连续性
- 6、流的构建器
- 7、流的上下文
- 8、指定流所在协程
- 9、流的取消
- 9.1、超时取消
- 9.2、主动取消
- 9.3、密集型任务的取消
- 10、背压和优化
- 10.1、buffer 操作符
- 10.2、 flowOn
- 10.3、conflate 操作符
- 10.4、collectLatest 操作符
- 二、操作符
- 1、变换操作符
- 1.1、buffer (缓存)
- 1.2、map (变换)
- 1.2.1、map
- 1.2.2、mapNotNull (不空的下发)
- 1.2.3、mapLatest
- 1.3、transform (一转多)
- 1.4、reduce (累*加减乘除)
- 1.5、fold(累*加减乘除 and 拼接)
- 1.6、flatMapConcat (有序变换)
- 1.7、flatMapMerge (无序变换)
- 1.8、flatMapLatest (截留)
- 2、过滤型操作符
- 2.1、take (截留)
- 2.1.2、takeWhile
- 2.2、filter(满足条件下发)
- 2.2.2、filterNotNull (不空的下发)
- 2.2.3、filterNot(符合条件的值将被丢弃)
- 2.2.4、filterInstance (筛选符合类型的值)
- 2.3、skip 和 drop(跳过)
- 2.3.2、dropWhile
- 2.4、distinctUntilChanged (过滤重复)
- 2.4.2、distinctUntilChangedBy
- 2.5、single (判断是否一个事件)
- 2.6、first (截留第一个事件)
- 2.7、debounce (防抖动)
- 2.8、conflate
- 2.9、sample (周期采样)
- 3、组合型操作符
- 3.1、count (计数)
- 3.2、zip (合并元素)
- 3.3、combine(合并元素)
- 3.4、merge (合并成流)
- 3.5、flattenConcat (展平流)
- 3.6、flattenMerge(展平流)
- 4、异常操作符
- 4.1、catch (拦截异常)
- 4.2、retry (重试)
- 4.2.2、retryWhen
- 4.3、withTimeout (超时)
- 5、辅助操作符
- 5.1、onXXX
- 5.2、delay (延时)
- 5.3、measureTimeMillis (计时)
- 参考地址
一、Flow
1、Flow是什么东西?
Flow 是有点类似 RxJava 的 Observable
都有冷流和热流之分;
都有流式构建结构;
都包含 map、filter 等操作符。
区别于Observable,Flow可以配合挂起函数使用
2、实现功能
异步返回多个值
可以实现下载功能等,Observable 下发数组时可以实现什么功能,他就能实现什么功能
当文件下载时,对应的后台下载进度,就可以通过Flow里面的emit发送数据,通过collect接收对应的数据。
转:https://blog.csdn.net/qq_30382601/article/details/121825461
3、特点
- flow{…}块中的代码可以挂起
- 使用flow,suspend修饰符可以省略
- 流使用emit函数发射值
- 流使用collect的函数收集值
- flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。
- 流的连续性:流收集都是按顺序收集的
- flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行
- 与之相对的是热流,我们即将介绍的 StateFlow 和 SharedFlow 是热流,在垃圾回收之前,都是存在内存之中,并且处于活跃状态的。
转:https://blog.csdn.net/zx_android/article/details/122744370
4、冷流和热流
-
冷流
冷流类似冷启动,代码在被用到才会执行,如你需要使用的数据在网络,需要先请求网络才能得到数据
Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。 -
热流
热流类似热启动,代码在用到之前已经准备好,如你请求过网络,数据已经缓存在本地,你只需直接使用即可
5、流的连续性
流的连续性:流收集都是按顺序收集的
6、流的构建器
如下三种为冷流构建器
- flow{emit} .collect{}
- flowOf(***).collect{}
- (***).asFlow().collect{}
@Testfun `test flow builder`() = runBlocking<Unit> {flowOf("one", "two", "three").onEach { delay(1000) }.collect { value ->println(value)}(1..3).asFlow().collect { value ->println(value)}flow<Int> {for (i in 11..13) {delay(1000) //假装在一些重要的事情emit(i) //发射,产生一个元素}}.collect { value ->println(value)}}
7、流的上下文
flowOn (多用于切线程)
流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。
fun simpleFlow3() = flow<Int> {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}@Testfun `test flow context`() = runBlocking<Unit> {simpleFlow3().collect { value -> println("Collected $value ${Thread.currentThread().name}") }}
如下:流的发射和接收在一个协程内
Flow started Test worker @coroutine#1
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
flow{…}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发生(emit)
如下这种写法不被允许
fun simpleFlow4() = flow<Int> {withContext(Dispatchers.Default) {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}}
那么如何切换协程上下文呢?
flowOn操作符,该函数用于更改流发射的上下文
fun simpleFlow5() = flow<Int> {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}.flowOn(Dispatchers.Default)@Testfun `test flow context`() = runBlocking<Unit> {simpleFlow5().collect { value -> println("Collected $value ${Thread.currentThread().name}") }}
如下:切换上下文成功
Flow started DefaultDispatcher-worker-2 @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
8、指定流所在协程
launchIn 用于指定协程作用域通知flow执行
使用 launchIn 替换 collect 在单独的协程中启动收集流
- 指定协程
//事件源private fun events() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)@Testfun `test flow launch`() = runBlocking<Unit> {val job = events().onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
// .collect {}.launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow
// .launchIn(this)//这里使用当前上下文传入Flowjob.join()}
打印:
Event: 1 DefaultDispatcher-worker-2 @coroutine#2
Event: 2 DefaultDispatcher-worker-1 @coroutine#2
Event: 3 DefaultDispatcher-worker-3 @coroutine#2
- 也可以指定当前协程中执行
@Testfun `test flow launch`() = runBlocking<Unit> {val job = events().onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
// .collect {}
// .launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow.launchIn(this)//这里使用当前上下文传入Flow// job.join()}
Event: 1 Test worker @coroutine#2
Event: 2 Test worker @coroutine#2
Event: 3 Test worker @coroutine#2
9、流的取消
流采用和协程同样的协作取消。流可以在挂起函数的挂起的时候取消。
9.1、超时取消
withTimeoutOrNull 不能取消密集型任务
fun simpleFlow6() = flow<Int> {for (i in 1..300) {delay(1000)emit(i)println("Emitting $i")}}@Testfun `test cancel flow`() = runBlocking<Unit> {withTimeoutOrNull(2500) {simpleFlow6().collect { value -> println(value) }}println("Done")}
9.2、主动取消
cancel
@Testfun `test cancel flow `() = runBlocking<Unit> {simpleFlow6().collect { value ->if (value == 3) {cancel()}println(value)}println("Done")}
9.3、密集型任务的取消
密集型任务需要流的取消检测
cancel + cancellable
@Testfun `test cancel flow check`() = runBlocking<Unit> {(1..5).asFlow().cancellable().collect { value ->println(value)if (value == 3) cancel()println("cancel check ${coroutineContext[Job]?.isActive}")}}
10、背压和优化
- 什么是背压?
生产者生产的效率大于消费者消费的效率,元素积压
例,演示背压
fun simpleFlow8() = flow<Int> {for (i in 1..10) {// emit 上面这段代码在collect之前执行delay(100)emit(i) // 调用collect// emit下面这段代码在 collect 之后执行println("Emitting $i ${Thread.currentThread().name}")}}@Testfun `test flow back pressure`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collect { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Collected 1 Test worker @coroutine#1
Emitting 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Emitting 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#1
Collected 4 Test worker @coroutine#1
Emitting 4 Test worker @coroutine#1
Collected 5 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Emitting 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Emitting 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Emitting 10 Test worker @coroutine#1
Collected in 3169 ms
- 如何解决背压?
通过缓存进行性能优化
10.1、buffer 操作符
并发运行流中发射元素的代码
注意:for (i in 1…10) 这里用的是 1到 10,原因是 for循环 有耗时问题,通过打印时间戳在 for (i in 1…x) 上下,发现 for (i in 1…x) 这行代码有时耗时超过200毫秒,目前不知是何问题,特此记录,为方便对比优化时长,使用1到10.
@Testfun `test flow back pressure buffer`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().buffer(10) //缓存发射事件.collect { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Collected 3 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 2398 ms
10.2、 flowOn
flowOn(),修改流上下文,达到异步处理的效果,从而优化背压
@Testfun `test flow back pressure flowOn`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().flowOn(Dispatchers.IO).collect { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 DefaultDispatcher-worker-1 @coroutine#2
Emitting 2 DefaultDispatcher-worker-1 @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 DefaultDispatcher-worker-1 @coroutine#2
Emitting 4 DefaultDispatcher-worker-1 @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 DefaultDispatcher-worker-1 @coroutine#2
Emitting 6 DefaultDispatcher-worker-1 @coroutine#2
Collected 3 Test worker @coroutine#1
Emitting 7 DefaultDispatcher-worker-1 @coroutine#2
Emitting 8 DefaultDispatcher-worker-1 @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 9 DefaultDispatcher-worker-1 @coroutine#2
Emitting 10 DefaultDispatcher-worker-1 @coroutine#2
Collected 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 2385 ms
10.3、conflate 操作符
conflate(),合并发射项,处理最新的值,不对每个值进行处理;
@Testfun `test flow back pressure conflate`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().conflate().collect { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Collected 6 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 8 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 1554 ms
10.4、collectLatest 操作符
collectLatest(),取消并重新发射最后一个值
@Testfun `test flow back pressure collectLatest`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collectLatest { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 10 Test worker @coroutine#12
Collected in 1648 ms
二、操作符
1、变换操作符
1.1、buffer (缓存)
上面背压有栗子
1.2、map (变换)
1.2.1、map
map 是变换元素
data class Student(var name: String, var age: Int)
private suspend fun performRequest(age: Int): Student {delay(500)return Student("这是name", age)}@Testfun `test map flow operator`() = runBlocking<Unit> {(1..3).asFlow().map { request -> performRequest(request) }.collect { value -> println(value) }}
Student(name=这是name, age=1)
Student(name=这是name, age=2)
Student(name=这是name, age=3)
1.2.2、mapNotNull (不空的下发)
@Testfun `test mapNotNull flow operator`() = runBlocking<Unit> {flow {emit(1)emit(3)emit(2)}.mapNotNull { request ->if (1 == request) {null} else {Student("这是name", request)}}.collect { value -> println(value) }}
Student(name=这是name, age=3)
Student(name=这是name, age=2)
1.2.3、mapLatest
当有新值发送时,如果上个转换还没结束,会取消掉,用法同map
@Testfun `test mapLatest flow operator`() = runBlocking<Unit> {flow {emit(1)emit(2)emit(3)}.mapLatest {if (2 == it) delay(100L)"it is $it"}.collect {println(it)}}
1.3、transform (一转多)
@Testfun `test transform flow operator`() = runBlocking<Unit> {(1..3).asFlow().transform { request ->emit("Making request $request")emit(performRequest(request))}.collect { value -> println(value) }}
Making request 1
Student(name=这是name, age=1)
Making request 2
Student(name=这是name, age=2)
Making request 3
Student(name=这是name, age=3)
1.4、reduce (累*加减乘除)
@Testfun `test reduce operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.reduce { accumulator, value -> accumulator + value })}
1.5、fold(累*加减乘除 and 拼接)
- 加
@Testfun `test fold + operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.fold(3) { accumulator, value -> accumulator + value })}
17
- 减
@Testfun `test fold - operator`() = runBlocking<Unit> {println(flow<Int> {emit(2)emit(3)}.fold(18) { accumulator, value -> accumulator - value })}
13
- 乘
@Testfun `test fold multiply by operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)}.fold(3) { accumulator, value -> accumulator * value })}
18
- 除
@Testfun `test fold devide operator`() = runBlocking<Unit> {println(flow<Int> {emit(2)emit(3)}.fold(18) { accumulator, value -> accumulator / value })}
3
- 拼接
@Testfun `test fold joint operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(2)emit(3)}.fold("拼接") { accumulator, value -> return@fold "$accumulator =+= $value" })}
拼接 =+= 1 =+= 2 =+= 3
1.6、flatMapConcat (有序变换)
元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素会等待。
@Testfun `test flatMapConcat operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapConcat { num ->flow {if (3==num){delay(200)}emit("num: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num: 2
value -> num: 3
value -> num: 4
value -> num: 5
1.7、flatMapMerge (无序变换)
元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素不会等待。
@Testfun `test flatMapMerge operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapMerge() { num ->flow {if (3==num){delay(200)}emit("num: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num: 2
value -> num: 4
value -> num: 3
value -> num: 5
1.8、flatMapLatest (截留)
快速执行的事件都正常下发,
当有新值发送时,如果上个转换还没结束,会上取消掉上一个,直接下发新值。
@Testfun `test flatMapLatest operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapLatest() { num ->flow {if (3 == num) {delay(200)}emit("num: $num")emit("num2: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num2: 1
value -> num: 2
value -> num2: 2
value -> num: 4
value -> num2: 4
value -> num: 5
value -> num2: 5
2、过滤型操作符
2.1、take (截留)
跟Rxjava一样
fun numbers() = flow<Int> {try {emit(1)emit(2)println("This line will not execute")emit(3)} finally {println("Finally in numbers")}}@Testfun `test limit length operator`() = runBlocking<Unit> {//take(2),表示 当计数元素被消耗时,原始流被取消numbers().take(2).collect { value -> println(value) }}
1
2
Finally in numbers
2.1.2、takeWhile
找到第一个不满足条件的值,发送它之前的值,和dropWhile相反
@Testfun `test takeWhile operator`() = runBlocking<Unit> {flow<Int> {emit(2)emit(1)emit(3)emit(4)emit(1)}.takeWhile { it < 2 }.collect { value -> println(value) }}
如上什么也不会输出;
@Testfun `test takeWhile operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(2)emit(3)emit(4)emit(1)}.takeWhile { it < 2 }.collect { value -> println(value) }}
会输出 1
2.2、filter(满足条件下发)
跟Rxjava一样
@Testfun `test filter operator`() = runBlocking<Unit> {numbers().filter {it == 2}.collect { value -> println(value) }}
2.2.2、filterNotNull (不空的下发)
@Testfun `test filterNotNull flow operator`() = runBlocking<Unit> {flow {emit(1)emit(3)emit(null)emit(2)}.filterNotNull ().collect { value -> println(value) }}
1
3
2
2.2.3、filterNot(符合条件的值将被丢弃)
筛选不符合条件的值,相当于filter取反
@Testfun `test filterNot operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(2)emit(3)}.filterNot {it > 2}.collect { value -> println(value) }}
1
2
2.2.4、filterInstance (筛选符合类型的值)
对标rxjava中的ofType
筛选符合类型的值(不符合类型的值将被丢弃)
@Testfun `test filterInstance operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit("2")emit(3)emit("str")}.filterIsInstance<String>().collect { value -> println(value) }}
2
str
2.3、skip 和 drop(跳过)
@Testfun `test skip operator`() = runBlocking<Unit> {numbers().drop(2).collect { value -> println(value) }}
输出
3
2.3.2、dropWhile
找到第一个不满足条件的值,继续发送它和它之后的值
@Testfun `test dropWhile operator`() = runBlocking<Unit> {numbers().dropWhile { it <= 2 }.collect { value -> println(value) }}
This line will not execute
3
Finally in numbers
2.4、distinctUntilChanged (过滤重复)
@Testfun `test distinctUntilChanged operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.distinctUntilChanged().collect { value -> println(value) }}
2.4.2、distinctUntilChangedBy
判断两个连续值是否重复,可以设置是否丢弃重复值。
去重规则有点复杂,没完全懂
@Testfun `test distinctUntilChangedBy operator`() = runBlocking<Unit> {flowOf(Student(name = "Jack", age = 11),Student(name = "Tom", age = 10),Student(name = "Jack", age = 12),Student(name = "Jack", age = 13),Student(name = "Tom", age = 11)).distinctUntilChangedBy { it.name == "Jack" }.collect { //第三个Stu将被丢弃println(it.toString())}}
Student(name=Jack, age=11)
Student(name=Tom, age=10)
Student(name=Jack, age=12)
Student(name=Tom, age=11)
2.5、single (判断是否一个事件)
用于确保 flow 输出值唯一。若只有一个值,则可以正常执行,若输出的值不止只有一个的时候,就会抛出异常:
@Testfun `test single operator`() = runBlocking<Unit> {try {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.single())} catch (e: Exception) {println("e =$e")}}
如果一个事件,就正常执行;否则异常。
e =java.lang.IllegalArgumentException: Flow has more than one element
2.6、first (截留第一个事件)
@Testfun `test first operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.first())}
1
2.7、debounce (防抖动)
@Testfun `test debounce operator`() = runBlocking<Unit> {flowOf(Student(name = "Jack", age = 11),Student(name = "Tom", age = 10),Student(name = "Jack", age = 12),Student(name = "Jack", age = 13),Student(name = "Tom", age = 11)).onEach {if (it.name == "Jack" && it.age == 13)delay(500)}.debounce(500).collect { //第三个Stu将被丢弃println(it.toString())}}
Student(name=Jack, age=12)
Student(name=Tom, age=11)
2.8、conflate
见 10.3、conflate
仅保留最新值, 内部就是 buffer(CONFLATED``)
2.9、sample (周期采样)
固定周期采样 ,给定一个时间周期,保留周期内最后发出的值,其他的值将被丢弃
sample操作符与debounce操作符有点像,但是却限制了一个周期性时间,sample操作符获取的是一个周期内的最新的数据,可以理解为debounce操作符增加了周期的限制。
@Testfun `test sample operator`() = runBlocking<Unit> {flow {repeat(10) {delay(50)emit(it)}}.sample(100).collect {println(it)}}
0
2
4
6
8
3、组合型操作符
3.1、count (计数)
@Testfun `test count operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.count())}
3.2、zip (合并元素)
跟Rxjava一样
@Testfun `test zip operator`() = runBlocking<Unit> {val nameFlow = mutableListOf("小红", "小黑").asFlow()val numFlow = (1..3).asFlow()nameFlow.zip(numFlow) { string, num ->"$string:$num"}.collect {println("value -> $it")}}
3.3、combine(合并元素)
@Testfun `test combine operator`() = runBlocking<Unit> {val nameFlow = mutableListOf("小红", "小黑").asFlow()val numFlow = (1..3).asFlow()nameFlow.combine(numFlow) { string, num ->"$string:$num"}.collect {println("value -> $it")}}
value -> 小红:1
value -> 小黑:2
value -> 小黑:3
3.4、merge (合并成流)
merge 是将两个flow合并起来,将每个值依次发出来
@Testfun `test merge operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()merge(flow1, flow2).collect { value -> println(value) }}
1
2
one
two
three
3.5、flattenConcat (展平流)
展平操作符 flattenConcat 以顺序方式将给定的流展开为单个流,通俗点讲,减少层级 ,感觉和merge这么像呢,这个不太理解啥用
@Testfun `test flattenConcat operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()val flow3 = listOf("x", "xx", "xxx").asFlow()flowOf(flow1, flow2, flow3).flattenConcat().collect { value -> println(value) }}
1
2
one
two
three
x
xx
xxx
3.6、flattenMerge(展平流)
flattenMerge 作用和 flattenConcat 一样,但是可以设置并发收集流的数量
@Testfun `test flattenMerge operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()val flow3 = listOf("x", "xx", "xxx").asFlow()flowOf(flow1, flow2, flow3).flattenMerge(2).collect { value -> println(value) }}
1
2
one
two
three
x
xx
xxx
4、异常操作符
4.1、catch (拦截异常)
对标rxjava 中的 onErrorResumeNext
Exception、Throwable、Error 都会拦截
@Testfun `test catch operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.onEach { if (2 == it) throw NullPointerException() }.catch {emit(110)println("e == $it")}.collect {println("value -> $it")}}
@Testfun `test catch operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.onEach { if (2 == it)
// throw Exception("测试 异常")
// throw Throwable("测试 异常")throw Error("测试 错误")}.catch {emit(110)println("e == $it")}.collect {println("value -> $it")}}
value -> 1
value -> 110
e == java.lang.Error: 测试 错误
4.2、retry (重试)
所有异常错误都拦截
- 拦截次数
@Testfun `test retry operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit(2)throw Exception("异常")emit(3)}.retry(2).catch { emit(110) }.collect { value -> println(value) }}
- 拦截条件
@Testfun `test retry 2 operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit(2)throw Error("异常")emit(3)}.retry { it.message == "异常" }.catch { emit(110) }.collect { value -> println(value) }}
如上,满足拦截条件,所以会一直打印日志
1
2
1
2
1
2
1
2
1
... 不杀死程序一直打印
4.2.2、retryWhen
4.3、withTimeout (超时)
@Testfun `test retry 2 operator`() = runBlocking<Unit> {withTimeout(2500) {flow<Any> {emit(1)throw Error("异常")}.retry { it.message == "异常" }.catch { emit(110) }.collect { value -> println(value) }}}
输出:
1
1
... 好多个
1
1
1Timed out waiting for 2500 ms
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 ms(Coroutine boundary)at com.yoshin.kt.kotlindemo20220713.ExampleUnitTest$test retry 2 operator$1.invokeSuspend(ExampleUnitTest.kt:928)
Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 msat app//kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:184)at app//kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:154)at app//kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:508)at app//kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284)at app//kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:108)at java.base@11.0.13/java.lang.Thread.run(Thread.java:834)
5、辅助操作符
5.1、onXXX
onXXX 的方法包含
onCompletion 流完成时调用
onStart 流开始时调用
onEach 元素下发时调用,每次下发都调用
对比rxjava 中:
onCompletion == doOnComplete
onStart == doOnSubscribe 或者 doOnLifecycle
onEach == doNext
@Testfun `test do operator`() = runBlocking<Unit> {(1..5).asFlow().onCompletion { println(" onCompletion == $it ") }.onStart { println(" onStart ") }.onEach { println(" onEach == $it ") }.collect {println("value -> $it")}}
5.2、delay (延时)
延时
private fun events() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)
5.3、measureTimeMillis (计时)
测量代码用时
@Testfun `test flow back pressure`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collect { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
参考地址
笔记大部分内容来自动脑学院的文章和视频
动脑学院
:https://blog.csdn.net/qq_30382601/article/details/121825461
Kotlin 之 协程(三)Flow异步流
:https://blog.csdn.net/zx_android/article/details/122744370
Android Kotlin之Flow数据流:https://blog.csdn.net/u013700502/article/details/120526170
相关文章:

Kotlin学习:5.2、异步数据流 Flow
Flow一、Flow1、Flow是什么东西?2、实现功能3、特点4、冷流和热流5、流的连续性6、流的构建器7、流的上下文8、指定流所在协程9、流的取消9.1、超时取消9.2、主动取消9.3、密集型任务的取消10、背压和优化10.1、buffer 操作符10.2、 flowOn10.3、conflate 操作符10.…...

EPICS synApps介绍
一、synApps是什么? 1) 一个用于同步束线用户的EPICS模块集合。 2) EPICS模块 alive, autosave, busy, calc, camac, caputRecorder, dac128V, delaygen, dxp, ip, ip330, ipUnidig, love, mca, measComp, modbus, motor, optics, quadEM,…...

Pycharm和跳板机 连接内网服务器
Pycharm和跳板机 连接内网服务器 建立配置文件 本地配置 .ssh 文件夹下配置 config 文件 Host jumpHostName xxxPort 22User xxxServerAliveInterval 30IdentityFile C:\Users\15284\.ssh\id_rsa # 通过密钥连接Host server # 同样,任意名字,随…...

mysql去重查询的三种方法
文章目录前言一、插入测试数据二、剔除重复数据方法1.方法一:使用distinct2.方法二:使用group by3.方法三:使用开窗函数总结前言 数据库生成环境中经常会遇到表中有重复的数据,或者进行关联过程中产生重复数据,下面介…...

PHP反序列化
文章目录简介POP链构造和Phar://题目[CISCN2019 华北赛区 Day1 Web1]Dropbox字符串逃逸简介 php序列化的过程就是把数据转化成一种可逆的数据结构,逆向的过程就叫做反序列化。 php将数据序列化和反序列化会用到两个函数: serialize 将对象格式化成有序的…...

什么蓝牙耳机打电话效果最好?通话效果好的无线蓝牙耳机
2023年了,TWS耳机虽说近乎人手一只了,但用户换新的需求和呼声依然热火朝天,因为我们想要听音乐、刷视频的时候都得准备,下面整理一些通话效果不错的耳机品牌。 第一款:南卡小音舱蓝牙耳机 动圈单元:13.3m…...
Tesseract centos环境安装,基于springboot图片提取文字
下载tesseract-orc https://github.com/tesseract-ocr/tesseract/tags下载leptonica wget http://www.leptonica.org/source/leptonica-1.78.0.tar.gz解压leptonica tar -xvf leptonica-1.78.0.tar.gz 配置编译安装leptonica 进文件夹 ./configure make make install安装aut…...
Elasticsearch7.8.0版本优化——写入速度优化
目录一、 写入速度优化的概述二、如何写入速度优化2.1、 批量数据提交2.2、 优化存储设备2.31、 合理使用合并2.4、 减少 Refresh2.5、 加大 Flush2.6、 减少副本的数量一、 写入速度优化的概述 ES 的默认配置,是综合了数据可靠性、写入速度、搜索实时性等因素。实使…...

【Redis】Redis主从同步中数据同步原理
【Redis】Redis主从同步中数据同步原理 文章目录【Redis】Redis主从同步中数据同步原理1. 全量同步1.1 判断是否第一次数据同步2. 增量同步3. 优化Redis主从集群4. 总结1. 全量同步 主从第一次同步是全量同步。 数据同步包括以下三个阶段: 在从节点执行slaveof命令…...

Python基础—while循环
(1)while循环: 语法格式: while 条件: 执行语句1…… 执行语句2…… 适用条件:无限循环 死循环 while True:print(条件是真的!)代码实例: i 0 # 创建一个计数的变量 while i < 5: # Truepr…...
linux基础(管道符,检索,vim和vi编辑使用)
♥️作者:小刘在C站 ♥️个人主页:小刘主页 ♥️每天分享云计算网络运维课堂笔记,努力不一定有收获,但一定会有收获加油!一起努力,共赴美好人生! ♥️夕阳下,是最美的绽放࿰…...

GAN | 代码简单实现生成对抗网络(GAN)(PyTorch)
2014年GAN发表,直到最近大火的AI生成全部有GAN的踪迹,快来简单实现它!!!GAN通过计算图和博弈论的创新组合,他们表明,如果有足够的建模能力,相互竞争的两个模型将能够通过普通的旧反向…...

华为面试题就这?00后卷王直接拿下30k华为offer......
先说一下我的情况,某211本计算机,之前在深圳那边做了大约半年多少儿编程老师,之后内部平调回长沙这边,回来之后发现有点难,这边可能是业绩难做,虚假承诺很厉害,要给那些家长虚假承诺去骗人家&am…...

html的常见标签使用
目录 1.vscode基础操作 2.html基础 语法 3.HTML文件的基本结构标签 4.注释标签 5.标题标签 6.段落标签:p 7.格式化标签 8.图片标签:img 绝对路径 相对路径 网络路径 alt属性 title属性 width/height属性 9.超链接标签:a 10.表格标签 11.列表标签 有序列表 无…...

STM32——毕设智能感应窗户
智能感应窗户 一、功能设计 以STM32F103芯片最小系统作为主控,实现自动监测、阈值设定功能和手动控制功能。 1、自动监测模式下: ① 采用温湿度传感器,实现采集当前环境的温度、湿度数值。 ② 采用光敏传感器,实现判断当前的环境…...
golang archive/tar库的学习
archive/tar 是 Golang 标准库中用于读取和写入 tar 归档文件的包。tar 是一种常见的文件压缩格式,它可以将多个文件和目录打包成单个文件,可以用于文件备份、传输等场景。 以下是一些学习 archive/tar 包的建议: 了解 tar 文件格式。在学习…...

MongoDB 详细教程,这一篇就够啦
文章目录1. 简介2. 特点3. 应用场景4. 安装(docker)5. 核心概念5.1 库5.2 集合5.3 文档6. 基本操作6.1 库6.1.1 增6.1.2 删6.1.3 改6.1.4 查6.2 集合6.2.1 增6.2.2 删6.2.3 改6.2.4 查6.3. 文档6.3.1 增6.3.2 删6.3.3 改6.3.4 查1. 语法2. 对比语法3. AN…...
python为什么慢
解释性 python是动态类型解释性语言,不管使用哪种解释器 因为“解释性语言”这个概念更多地是指代码的执行方式,而不是编译方式。在解释性语言中,代码在执行时会一行一行地解释并执行,而不是预先编译为机器语言。而即使使用了PyP…...

Android kotlin 组件间通讯 - LiveEventBus 及测试(更新中)
<<返回总目录 文章目录 一、LiveEventBus是什么二、测试一、LiveEventBus是什么 LiveEventBus是Android中组件间传递消息,支持AndroidX,Event:事件,Bus:总线 范围全覆盖的消息总线解决方案 进程内消息发送App内,跨进程消息发送App之间的消息发送更多特性支持 免配…...

linux服务器时间同步
Linux服务器时间同步 需求:两台以上服务器之间的时间同步,以其中一台服务器为时间源,其余服务器同步这台时间源服务器的时间 其中,时间源服务器需要有访问外网权限,不然时间源服务器无法同互联网同步最新的时间&#…...

51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)
目录 1.TCP的连接管理机制(1)三次握手①握手过程②对握手过程的理解 (2)四次挥手(3)握手和挥手的触发(4)状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...

MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
Go 语言并发编程基础:无缓冲与有缓冲通道
在上一章节中,我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道,它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好࿰…...

【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

STM32HAL库USART源代码解析及应用
STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...

给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...

阿里云Ubuntu 22.04 64位搭建Flask流程(亲测)
cd /home 进入home盘 安装虚拟环境: 1、安装virtualenv pip install virtualenv 2.创建新的虚拟环境: virtualenv myenv 3、激活虚拟环境(激活环境可以在当前环境下安装包) source myenv/bin/activate 此时,终端…...