一个Hot Flow可以以广播的形式为所有的订阅者共享已发射的值,其特性如下
共享的shared flow永远不会结束,shared flow的collector即收集者可以被称作
shared flowd的订阅者是可以取消的。在订阅者所在协程结束时,订阅者会自动取消订阅
构建shared flow 可以通过MutableSharedFlow(...)构造函数来实现,也可以通过将cold flow通过shareIn操作符进行转换
-
重播缓存和缓冲
一个shared flow可以指定重播缓存的最近值的数量。每一个订阅者首先会从重播缓存获取发射值然后再获取shared flow发射的值。重播缓存可以为shared flow的发射值提供缓冲,使慢订阅者可以从缓冲获取发射值,避免挂起发射者。
-
缓冲溢出策略BufferOverflow
- SUSPEND:发射者挂起
- DROP_OLDEST:丢弃缓冲中最久的值,不会挂起
- DROP_LATEST:丢弃缓冲中最近的值,不会挂起
MutableSharedFlow()默认构造函数是没有重播缓存和缓冲的
对于默认构造函数构建的MutableSharedFlow()的[MutableSharedFlow.emit]方法调用会导致sharedflow挂起,直到所有的订阅者都接受到发射值,或者如果没有任何订阅者的情况下会立马返回
对于[MutableSharedFlow.tryEmit]函数的调用成功的话会立马返回true,假若没有订阅者,也会立马返回
对于shared flow 方法的调用都是线程安全的
简单示例如下
internal object EventBus {
/**
* private mutable shared flow
*/
private val mutableSharedFlow = MutableSharedFlow<Event>()
/**
* publicly exposed as read-only shared flow
*/
private val asSharedFlow = mutableSharedFlow.asSharedFlow()
val eventBus: SharedFlow<Event>
get() = asSharedFlow
init {
GlobalScope.launch {
//日志打印当前订阅的订阅者数量
mutableSharedFlow.subscriptionCount.collect {
Log.d("flow", "subscriptionCount $it")
}
}
}
/**
* 发布事件
* Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a [Job].
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
*/
fun <T : Event> LifecycleOwner.produceEvent(event: T): Job {
// suspends until all subscribers receive it
return lifecycleScope.launch {
mutableSharedFlow.emit(event)
}
}
/**
* 在GlobalScope中发布
*/
fun <T : Event> produceEventGlobal(event: T) {
// suspends until all subscribers receive it
GlobalScope.launch {
mutableSharedFlow.emit(event)
}
}
/**
* Launches and runs the given block when the [Lifecycle] controlling this
* [LifecycleCoroutineScope] is at least in [Lifecycle.State.CREATED] state.
*
* The returned [Job] will be cancelled when the [Lifecycle] is destroyed.
*/
fun <T : Event> LifecycleOwner.produceEventWhenCreated(event: T): Job {
// suspends until all subscribers receive it
return lifecycleScope.launchWhenCreated {
mutableSharedFlow.emit(event)
}
}
/**
* Launches and runs the given block when the [Lifecycle] controlling this
* [LifecycleCoroutineScope] is at least in [Lifecycle.State.STARTED] state.
*
* The returned [Job] will be cancelled when the [Lifecycle] is destroyed.
*/
fun <T : Event> LifecycleOwner.produceEventWhenStared(event: T): Job {
// suspends until all subscribers receive it
return lifecycleScope.launchWhenStarted {
mutableSharedFlow.emit(event)
}
}
/**
* Launches and runs the given block when the [Lifecycle] controlling this
* [LifecycleCoroutineScope] is at least in [Lifecycle.State.RESUMED] state.
*
* The returned [Job] will be cancelled when the [Lifecycle] is destroyed.
*/
fun <T : Event> LifecycleOwner.produceEventWhenResumed(event: T): Job {
// suspends until all subscribers receive it
return lifecycleScope.launchWhenResumed {
mutableSharedFlow.emit(event)
}
}
/**
* subscribe event
* The returned [Job] can be cancelled
*/
inline fun LifecycleOwner.subscribeEvent(
crossinline predicate: suspend (e: Event) -> Boolean,
crossinline action: suspend (e: Event) -> Unit,
): Job {
return eventBus
.filter { predicate.invoke(it) }
.onEach {
action.invoke(it)
}.cancellable()
.launchIn(lifecycleScope)
}
}
open class Event(open val key: String)