Kotlin flow学习笔记

学习:https://juejin.cn/post/7031726493906829319 感觉别人写的很好,转载了部分,版权是人家的

Flow默认是Cold的,生产者和消费者的通信是同步非阻塞的,也就是生产和消费会顺序交替进行

image.png
  lifecycleScope.launch {


            flow {
                Log.d("haha",Thread.currentThread().name)
                emit("123")
            }.flowOn(Dispatchers.IO) //切换线程
                .onStart {
                    println("onStart")
                }.catch {
                    println("catch:${it.message}")//有异常会进入此方法
                }.onCompletion {
                    println("oniComplete:${it?.message}")//无论是否有异常都会执行
                }.collect {//收集流
                    Log.d("haha",Thread.currentThread().name)
                    println("result = $it")
                }



        }

flowOn以上的代码运行在flowon的线程上,collect中的代码运行在调用者的线程上

末端操作符

collect

搜集上游发送的数据

  lifecycleScope.launch {

            flow {
                emit("haha")

                delay(5000)
                emit("ddddd")

            }.flowOn(Dispatchers.IO)
                .collect{

                println(it)

            }
            println("tttt")

        }


打印haha,5秒后打印dddd和tttt,collect 没搜集完毕时,不会往下执行。

Backpressure 背压

MISSING:创建的 Flowable 没有指定背压策略,不会对emit 发射的数据做缓存或丢弃处理。

  lifecycleScope.launch {

            flow {
                emit("haha")

                delay(2000)
                emit("ddddd")

            }
                .collect {

                    println("collect start$it")
                    delay(5000)
                    println("collect end$it")
                }

        }

打印:
2022-08-21 15:15:07 /System.out: collect starthaha
2022-08-21 15:15:12 /System.out: collect endhaha
2022-08-21 15:15:14 /System.out: collect startddddd
2022-08-21 15:15:19 /System.out: collect endddddd

1 先执行 emit("haha") 发现通道buffer满了,挂起
2 再执行 println("collect start$it")
3 delay5秒
4 collect执行完毕后 执行delay2秒,2秒后emit("ddddd")挂起
5 继续执行collect

collect后才会emit下一个,因为没有缓冲

buffer
 lifecycleScope.launch {

            flow {
                println("emit 1111")
                emit(1111)
                println("emit 2222")
                emit(2222)
                println("emit 3333")
                emit(3333)
            }

                .collect {

                    println("collect $it")

                }

        }

打印
2022-08-22 21:46:22 I/System.out: emit 1111
2022-08-22 21:46:22 I/System.out: collect 1111
2022-08-22 21:46:22 I/System.out: emit 2222
2022-08-22 21:46:22 I/System.out: collect 2222
2022-08-22 21:46:22 I/System.out: emit 3333
2022-08-22 21:46:22 I/System.out: collect 3333

由于没有buffer,发送一个,通道满了挂起,消费一个,就继续发送下一个



        lifecycleScope.launch {

            flow {
                println("emit 1111")
                emit(1111)

                println("emit 2222")
                emit(2222)
                println("emit 3333")
                emit(3333)
                println("emit 444")
                emit(444)
                println("emit 555")
                emit(555)

            }.buffer(1)

                .collect {

                    println("collect $it")
                }

        }

打印
2022-08-22 22:06:15 I/System.out: emit 1111
2022-08-22 22:06:15 I/System.out: emit 2222
2022-08-22 22:06:15 I/System.out: emit 3333
2022-08-22 22:06:15 I/System.out: collect 1111
2022-08-22 22:06:15 I/System.out: collect 2222
2022-08-22 22:06:15 I/System.out: collect 3333
2022-08-22 22:06:15 I/System.out: emit 444
2022-08-22 22:06:15 I/System.out: emit 555
2022-08-22 22:06:15 I/System.out: collect 444
2022-08-22 22:06:15 I/System.out: collect 555

buffer 容量设置1, 发送111,缓冲没满 继续发送222,发送222后,去发送333时,发现通道满了,挂起,搜集了111后,开始发送444

buffer(1,BufferOverflow.DROP_LATEST)

则只会搜集1111 2222,超出缓冲后,最新的都被丢了

buffer(1,BufferOverflow.DROP_OLDEST)

则只会搜集1111 5555,超出缓冲后,旧的的都被丢了

缓冲超出缓冲区策略:


A strategy for buffer overflow handling in channels and flows that controls what is going to be sacrificed on buffer overflow:
SUSPEND — the upstream that is sending or is emitting a value is suspended while the buffer is full.
DROP_OLDEST — drop the oldest value in the buffer on overflow, add the new value to the buffer, do not suspend.
DROP_LATEST — drop the latest value that is being added to the buffer right now on buffer overflow (so that buffer contents stay the same), do not suspend.

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,

    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,

    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}


map

转换上游发送的数据类型后到下游

     lifecycleScope.launch {

            flow {
                emit(1111)


            }.map {

                "map" + it
            }

                .collect {

                    println("collect $it")
                }

        }
transform

把上游的每一次发送,转换成任意次发送

       lifecycleScope.launch {

            flow {
                emit("aaaa")

            }
                .transform {

                    emit(it+"1111")
                    emit(it+"2222")
                    emit(it+"3333")

                }

                .collect {

                    println("collect $it")
                }

        }

打印
collect aaaa1111
collect aaaa2222
collect aaaa3333

onEach

遍历上游,再发往下游,所以必须有collect才会执行,因为flow是冷流

   lifecycleScope.launch {

            flow {
                emit("aaaa")
                emit("bbbb")

            } .onEach { println("onEach: $it") }

                .collect {

                    println("collect $it")
                }

        }

打印
onEach: aaaa
collect aaaa
onEach: bbbb
collect bbbb

take

取前面几个发送的

   lifecycleScope.launch {


            (1..6).asFlow()
                .take(2)
                .collect{

                    println(it)

                }

        }

打印 1 2

zip

把2个流 一一对应合并,数量不一致 取小的数量,时间耗费时长,取长的时间消耗

    lifecycleScope.launch {


            val flowa = (1..6).asFlow()


            val flowb = flowOf("aaa", "bbb")

            flowa.zip(flowb) { a, b ->
                b + a
            }.collect {

                println(it)

            }

        }

打印
aaa1
bbb2

flattenContact

把流依次展开并且发送

        lifecycleScope.launch {


            val flowa = (1..3).asFlow()
            val flowb = flowOf("aaa", "bbb")
           
            flowOf(flowa,flowb)
                .flattenConcat()
                .collect{

                    println(it)

                }
            
        }

打印
1
2
3
aaa
bbb

flattenMerge

把流展开,可以设置并发数量,默认16,如果设置1就与 flattenConcat 一致。

     lifecycleScope.launch {


            val flowa = (1..3).asFlow().onEach { delay(100) }
            val flowb = flowOf("aaa", "bbb").onEach { delay(200) }

            flowOf(flowa, flowb)
                .flattenMerge(2)
                .collect {

                    println(it)

                }

        }

打印
1
aaa
2
3
bbb

StateFlow 和 SharedFlow

用于上游发射数据,能同时被多个订阅者收集数据。

1 StateFlow

StateFlow 是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。
StateFlow 非常适合需要让可变状态保持可观察的类,任何对值的更新,都会被搜集到。


class MyViewModel : ViewModel() {

    private val _state = MutableStateFlow<String>("null")
    val state: StateFlow<String> get() = _state


    fun getData() {

        viewModelScope.launch {

            _state.value = "hahaha"

        }

    }


}


class MainActivity : AppCompatActivity() {


    val model: MyViewModel by viewModels()


    @OptIn(FlowPreview::class)
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)


        lifecycleScope.launch {

            model.state.collect {

                println(it)

            }
            println("end")
        }
        lifecycleScope.launch {

            model.state.collect {

                println(it)

            }
            println("end")
        }

        
        model.getData()
        
    }
}

打印
null
null
hahaha
hahaha

1 StateFlow 是发射的数据可以被在不同的协程中,被多个接受者同时收集的。
2 StateFlow 是热流,只要数据发生变化,就会发射数据
3 StateFlow 的收集者调用 collect 会挂起当前协程,而且永远不会结束。
4 StateFlow 不跟声明周期绑定,需要手动取消订阅者协程 repeatOnLifecycle(Lifecycle.State.STARTED)
5 StateFlow跟LiveData一样 只会发送最新的数据给订阅者,例如
6 StateFlow是粘性的,横竖屏切换后,会把最后一次的数据重新发送给搜集者
7 StateFlow防抖,重复的数据不会更新
8 StateFlow上游发送数据比下游搜集数据快,会把旧的数据丢弃,只搜集最新的

比较适用于页面状态的更新

class MyViewModel : ViewModel() {

    private val _state = MutableStateFlow<String>("null")
    val state: StateFlow<String> get() = _state


    fun getData() {

        viewModelScope.launch {

            _state.value = "111"
            _state.value = "222"
            _state.value = "333"
            _state.value = "444"

        }

    }


}

class MainActivity : AppCompatActivity() {


    val model: MyViewModel by viewModels()


    @OptIn(FlowPreview::class)
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)


        lifecycleScope.launch {

                model.state.collect {

                    println(it)

                }

        }

        model.getData()
    }
}

只会打印null和44444

SharedFlow

1 SharedFlow没有默认值
2 SharedFlow会挂起直到所有的订阅者处理完成。


public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> 

replay 是重放个数,默认为0,0的意思是,不会新的搜集者搜集,不会把最后一次数据提供给新的搜集者,如果设置为1,那就是有粘性了。
extraBufferCapacity 缓冲数量
onBufferOverflow 超过后的处理规则
这2个参数类似flow里面的buffer,不过这里的buffer缓存数量是replay+onBufferOverflow

3 SharedFlow适合作为事件总线 分发事件

class EventBus {
    private val _events = MutableSharedFlow<Event>() 
    val events = _events.asSharedFlow() 

    suspend fun produceEvent(event: Event) {
        _events.emit(event)
    }
}

事件:希望每一条事件都会被消费,消费结束后挂起。适合使用SharedFlow
状态:显示页面,期望显示最新的,并且切换横竖屏后再次搜集,希望得到最后一次数据(粘性),也允许丢弃中间的数据,只显示最新的数据。适合使用StateFlow

ChannelFlow

冷流,异步非阻塞
flow{ ... } 可以通过 flowOn切换线程 切换成子线程后和 channelFlow花的时间差不多

image.png
基于Flow/Channel的MVI如何实现

抽象出基类BaseViewModel
UiState是可以表征UI的Model,用StateFlow承载(也可以使用LiveData)
UiEvent是表示交互事件的Intent,用SharedFlow承载
UiEffect是事件带来除了改变UI以外的副作用,用channelFlow承载


BaseViewModel.kt

abstract class BaseViewModel<State : UiState, Event : UiEvent, Effect : UiEffect> : ViewModel() {

    /**
     * 初始状态
     * stateFlow区别于LiveData必须有初始值
     */
    private val initialState: State by lazy { createInitialState() }

    abstract fun createInitialState(): State

    /**
     * uiState聚合页面的全部UI 状态
     */
    private val _uiState: MutableStateFlow<State> = MutableStateFlow(initialState)
    
    val uiState = _uiState.asStateFlow()


    /**
     * event包含用户与ui的交互(如点击操作),也有来自后台的消息(如切换自习模式)
     */
     private val _event: MutableSharedFlow<Event> = MutableSharedFlow()
     
     val event = _event.asSharedFlow()
     
     
    /**
     * effect用作 事件带来的副作用,通常是 一次性事件 且 一对一的订阅关系
     * 例如:弹Toast、导航Fragment等
     */
     private val _effect: Channel<Effect> = Channel()

     val effect = _effect.receiveAsFlow()


    init {
        subscribeEvents()
    }

    private fun subscribeEvents() {
        viewModelScope.launch {
            event.collect {
                handleEvent(it)
            }
        }
    }

    protected abstract fun handleEvent(event: Event)

    fun sendEvent(event: Event) {
        viewModelScope.launch {
            _event.emit(event)
        }
     }

    protected fun setState(reduce: State.() -> State) {
        val newState = currentState.reduce()
        _uiState.value = newState
    }

    protected fun setEffect(builder: () -> Effect) {
        val newEffect = builder()
        viewModelScope.launch {
            _effect.send(newEffect)
        }
     }
}


interface UiState

interface UiEvent

interface UiEffect

StateFlow基本等同于LiveData,区别在于StateFlow必须有初值,这也更符合页面必须有初始状态的逻辑。一般使用data class实现UiState,页面所有元素的状态用成员变量表示。
用户交互事件用SharedFlow,具有时效性且支持一对多订阅,使用它可以解决上文提到的痛点二问题。
消费事件带来的副作用影响用ChannelFlow承载,不会丢失且一对一订阅,只执行一次。使用它可以解决上文提到的痛点一问题。
协议类,定义具体业务需要的State、Event、Effect类


class NoteContract {

    /**
    * pageTitle: 页面标题
    * loadStatus: 上拉加载的状态
    * refreshStatus: 下拉刷新的状态
    * noteList : 备忘录列表
    */
    data class State(
        val pageTitle: String,
        val loadStatus: LoadStatus,
        val refreshStatus: RefreshStatus,
        val noteList: MutableList<NoteItem>
    ) : UiState

    sealed class Event : UiEvent {
        // 下拉刷新事件
        object RefreshNoteListEvent : Event()
        
        // 上拉加载事件
        object LoadMoreNoteListEvent: Event()

        // 添加按键点击事件
        object AddingButtonClickEvent : Event()

        // 列表item点击事件
        data class ListItemClickEvent(val item: NoteItem) : Event()

        // 添加项弹窗消失事件
        object AddingNoteDialogDismiss : Event()

        // 添加项弹窗添加确认点击事件
        data class AddingNoteDialogConfirm(val title: String, val desc: String) : Event()

        // 添加项弹窗取消确认点击事件
        object AddingNoteDialogCanceled : Event()
    }



    sealed class Effect : UiEffect {
    
        // 弹出数据加载错误Toast
        data class ShowErrorToastEffect(val text: String) : Effect()

        // 弹出添加项弹窗
        object ShowAddNoteDialog : Effect()
    }



    sealed class LoadStatus {

        object LoadMoreInit : LoadStatus()

        object LoadMoreLoading : LoadStatus()

        data class LoadMoreSuccess(val hasMore: Boolean) : LoadStatus()

        data class LoadMoreError(val exception: Throwable) : LoadStatus()

        data class LoadMoreFailed(val errCode: Int) : LoadStatus()

    }



    sealed class RefreshStatus {

        object RefreshInit : RefreshStatus()

        object RefreshLoading : RefreshStatus()

        data class RefreshSuccess(val hasMore: Boolean) : RefreshStatus()

        data class RefreshError(val exception: Throwable) : RefreshStatus()

        data class RefreshFailed(val errCode: Int) : RefreshStatus()

    }

}

在生命周期组件中收集状态变化流和一次性事件流,发送用户交互事件

class NotePadActivity : BaseActivity() {

      ...

    override fun initObserver() {
        super.initObserver()
        lifecycleScope.launchWhenStarted {
            viewModel.uiState.collect {
                when (it.loadStatus) {
                    is NoteContract.LoadStatus.LoadMoreLoading -> {
                        adapter.loadMoreModule.loadMoreToLoading()
                    }
                    ...
                }
                
                when (it.refreshStatus) {
                    is NoteContract.RefreshStatus.RefreshSuccess -> {
                        adapter.setDiffNewData(it.noteList)
                        refresh_layout.finishRefresh()
                        if (it.refreshStatus.hasMore) {
                            adapter.loadMoreModule.loadMoreComplete()
                        } else {
                            adapter.loadMoreModule.loadMoreEnd(false)
                        }
                    }
                    ...
                }
                
                txv_title.text = it.pageTitle
                txv_desc.text = "${it.noteList.size}条记录"
            }
        }

        lifecycleScope.launchWhenStarted {
            viewModel.effect.collect {
                when (it) {
                
                    is NoteContract.Effect.ShowErrorToastEffect -> {
                        showToast(it.text)
                    }
                    
                    is NoteContract.Effect.ShowAddNoteDialog -> {
                        showAddNoteDialog()
                    }
                }
            }
        }
    }



    private fun initListener() {
        btn_floating.setOnClickListener {
            viewModel.sendEvent(NoteContract.Event.AddingButtonClickEvent)
        }
    }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335