一、Flow
1.Flow 是什么?
- Flow 库是在 Kotlin Coroutines 1.3.2 发布之后新增的库,也叫做异步流,类似 RxJava 的 Observable、Flowable 等等,所以很多人都用 Flow 与 RxJava 做对比。
- Flow 是对 Kotlin 协程的扩展,让我们可以像运行同步代码一样运行异步代码,使得代码更加简洁,提高了代码的可读性。
2.如何创建 Flow?
-
使用 flow 创建,生产者和消费者的通信是同步非阻塞的。
private fun testFlow() { CoroutineScope(Job()).launch { Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}") flow { for (i in 1..3) { delay(500) Log.d(TAG, "zwm, emit: $i thread: ${Thread.currentThread().name}") emit(i) } }.collect { Log.d(TAG, "zwm, collect: $it thread: ${Thread.currentThread().name}") } } } //日志打印 2022-07-16 20:46:39.545 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1 2022-07-16 20:46:40.050 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, emit: 1 thread: DefaultDispatcher-worker-1 2022-07-16 20:46:40.051 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, collect: 1 thread: DefaultDispatcher-worker-1 2022-07-16 20:46:40.553 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, emit: 2 thread: DefaultDispatcher-worker-1 2022-07-16 20:46:40.553 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, collect: 2 thread: DefaultDispatcher-worker-1 2022-07-16 20:46:41.055 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, emit: 3 thread: DefaultDispatcher-worker-1 2022-07-16 20:46:41.055 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, collect: 3 thread: DefaultDispatcher-worker-1
-
使用 channelFlow 创建,生产者和消费者的通信是异步非阻塞的。
@ExperimentalCoroutinesApi private fun testFlow() { CoroutineScope(Job()).launch { Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}") channelFlow { for (i in 1..3) { delay(500) Log.d(TAG, "zwm, send: $i thread: ${Thread.currentThread().name}") send(i) } }.collect { Log.d(TAG, "zwm, collect: $it thread: ${Thread.currentThread().name}") } } } //日志打印 2022-07-16 20:37:37.327 9531-9605/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-2 2022-07-16 20:37:37.839 9531-9604/com.tomorrow.target30 D/KtActivity: zwm, send: 1 thread: DefaultDispatcher-worker-1 2022-07-16 20:37:37.841 9531-9605/com.tomorrow.target30 D/KtActivity: zwm, collect: 1 thread: DefaultDispatcher-worker-2 2022-07-16 20:37:38.342 9531-9605/com.tomorrow.target30 D/KtActivity: zwm, send: 2 thread: DefaultDispatcher-worker-2 2022-07-16 20:37:38.343 9531-9604/com.tomorrow.target30 D/KtActivity: zwm, collect: 2 thread: DefaultDispatcher-worker-1 2022-07-16 20:37:38.844 9531-9604/com.tomorrow.target30 D/KtActivity: zwm, send: 3 thread: DefaultDispatcher-worker-1 2022-07-16 20:37:38.845 9531-9605/com.tomorrow.target30 D/KtActivity: zwm, collect: 3 thread: DefaultDispatcher-worker-2
3.如何切换线程?
private fun testFlow() {
CoroutineScope(Job()).launch {
Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
flow {
for (i in 1..3) {
delay(500)
Log.d(TAG, "zwm, emit: $i thread: ${Thread.currentThread().name}")
emit(i)
}
}.flowOn(Dispatchers.IO)
.collect {
Log.d(TAG, "zwm, collect: $it thread: ${Thread.currentThread().name}")
}
}
}
//日志打印
2022-07-16 20:54:55.376 13260-13337/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
2022-07-16 20:54:55.889 13260-13338/com.tomorrow.target30 D/KtActivity: zwm, emit: 1 thread: DefaultDispatcher-worker-2
2022-07-16 20:54:55.892 13260-13337/com.tomorrow.target30 D/KtActivity: zwm, collect: 1 thread: DefaultDispatcher-worker-1
2022-07-16 20:54:56.393 13260-13337/com.tomorrow.target30 D/KtActivity: zwm, emit: 2 thread: DefaultDispatcher-worker-1
2022-07-16 20:54:56.395 13260-13338/com.tomorrow.target30 D/KtActivity: zwm, collect: 2 thread: DefaultDispatcher-worker-2
2022-07-16 20:54:56.895 13260-13338/com.tomorrow.target30 D/KtActivity: zwm, emit: 3 thread: DefaultDispatcher-worker-2
2022-07-16 20:54:56.897 13260-13337/com.tomorrow.target30 D/KtActivity: zwm, collect: 3 thread: DefaultDispatcher-worker-1
4.操作符有哪些?
map 操作符
filter 操作符
末端操作符,如 collect、toList、reduce、fold、onEach 等
flowOn 操作符
retry 操作符
zip 操作符
combine 操作符
其它操作符,如 flattenMerge、take、drop、onEach、onStart 等
-
协程背压
Kotlin 协程支持背压。Kotlin 协程中所有函数都标有 suspend 修饰符,具有在不阻塞线程的情况下挂起调用程序执行的强大功能。因此,当流的收集器不堪重负时,它可以简单地挂起发射器,并在准备好接受更多元素时稍后将其恢复。
buffer 操作符:没有固定大小,可以无限制添加数据,不会抛出 MissingBackpressureException 异常,但可能会导致 OOM。
conflate 操作符:如果缓存池满了,新数据会覆盖老数据。
collectLatest 操作符:不会直接用新数据覆盖老数据,而是每一个都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消。
二、冷流与热流
1.冷流是什么?
在 Kotlin 中,Flow 是冷流。只有订阅者订阅时,才开始执行发射数据流的代码。并且冷流和订阅者只能是一对一的关系,当有多个不同的订阅者时,消息是重新完整发送的。也就是说对冷流而言,有多个订阅者的时候,它们各自的事件是独立的。
2.热流是什么?
在 Kotlin 中,StateFlow、SharedFlow 是热流。无论有没有订阅者订阅,事件始终都会发生。当热流有多个订阅者时,热流与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息。
三、StateFlow
1.StateFlow 是什么?
StateFlow 是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。可以通过其 value 属性读取当前状态值,如需更新状态并将其发送到数据流,那么就需要使用 MutableStateFlow。
2.StateFlow 如何使用?
private fun testFlow() {
val data = MutableStateFlow(100) //需要有初始值
CoroutineScope(Job()).launch {
Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
data.collect {
Log.d(TAG, "zwm, collect thread: ${Thread.currentThread().name}, data: ${data.value}")
}
}
Handler(Looper.getMainLooper()).postDelayed(Runnable {
Log.d(TAG, "zwm, postDelayed thread: ${Thread.currentThread().name}")
CoroutineScope(Job()).launch {
Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
Log.d(TAG, "zwm, current data: ${data.value}")
data.value = 200 //上一个值为 100,更新值为 200,正常接收
// data.value = 100 //上一个值为 100,更新值为 100,不接收
}
}, 3000)
}
//日志打印
2022-07-17 10:35:13.175 3097-3325/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
2022-07-17 10:35:13.176 3097-3325/com.tomorrow.target30 D/KtActivity: zwm, collect thread: DefaultDispatcher-worker-1, data: 100
2022-07-17 10:35:16.174 3097-3097/com.tomorrow.target30 D/KtActivity: zwm, postDelayed thread: main
2022-07-17 10:35:16.176 3097-3325/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
2022-07-17 10:35:16.176 3097-3325/com.tomorrow.target30 D/KtActivity: zwm, current data: 100
2022-07-17 10:35:16.178 3097-3326/com.tomorrow.target30 D/KtActivity: zwm, collect thread: DefaultDispatcher-worker-2, data: 200
四、SharedFlow
1.SharedFlow 是什么?
SharedFlow 提供了 SharedFlow 与 MutableSharedFlow 两个版本,平时使用较多的是 MutableSharedFlow。MutableSharedFlow 没有起始值,发送数据时需要调用 emit()、tryEmit() 方法。构造函数如下:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
replay:表示当新的订阅者 Collect 时,发送几个已经发送过的数据给它,默认为 0,即默认新订阅者不会获取以前的数据。
extraBufferCapacity:表示减去 replay,MutableSharedFlow 还缓存多少数据,默认为 0。
onBufferOverflow:表示缓存策略,即缓冲区满了之后 Flow 如何处理,默认为挂起。除此之外,还支持 DROP_OLDEST 和 DROP_LATEST。
2.SharedFlow 如何使用?
private fun testFlow() {
val data = MutableSharedFlow<Int>() //不需要初始值
CoroutineScope(Job()).launch {
Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
data.collect {
Log.d(TAG, "zwm, collect thread: ${Thread.currentThread().name}, data: $it")
}
}
Handler(Looper.getMainLooper()).postDelayed(Runnable {
Log.d(TAG, "zwm, postDelayed thread: ${Thread.currentThread().name}")
CoroutineScope(Job()).launch {
Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
data.emit(200)
data.emit(200) //上一个值为 200,更新值为 200,正常接收
}
}, 3000)
}
//日志打印
2022-07-17 10:49:00.782 5908-6008/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
2022-07-17 10:49:03.782 5908-5908/com.tomorrow.target30 D/KtActivity: zwm, postDelayed thread: main
2022-07-17 10:49:03.787 5908-6008/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
2022-07-17 10:49:03.793 5908-6010/com.tomorrow.target30 D/KtActivity: zwm, collect thread: DefaultDispatcher-worker-2, data: 200
2022-07-17 10:49:03.801 5908-6010/com.tomorrow.target30 D/KtActivity: zwm, collect thread: DefaultDispatcher-worker-2, data: 200
3.StateFlow 与 SharedFlow 的区别是什么?
- StateFlow 与 SharedFlow 都是热流,都是为了满足流的多个订阅者的使用场景。
- SharedFlow 配置更为灵活,支持配置重播个数、缓冲区大小等。StateFlow 是 SharedFlow 的特殊化版本,重播个数固定为 1,缓冲区大小默认为 0。
- StateFlow 当 value 重复时,不会回调 collect 给新的订阅者。SharedFlow 支持发射和接收重复值。
- StateFlow 只会重播当前最新值,SharedFlow 可配置重播元素个数,默认为 0,即不重播。
- StateFlow 与 LiveData 类似,支持通过 myFlow.value 获取当前状态,如果有这个需求,必须使用 StateFlow。