Kotlin flow实践总结

Flow是什么

按顺序发出多个值的数据流。 本质就是一个生产者消费者模型,生产者发送数据给消费者进行消费。

  • 冷流:当执行collect的时候(也就是有消费者的时候),生产者才开始发射数据流。 生产者与消费者是一对一的关系。当生产者发送数据的时候,对应的消费者才可以收到数据。
  • 热流:不管有没有执行collect(也就是不管有没有消费者),生产者都会发射数据流到内存中。 生产者与消费者是一对多的关系。当生产者发送数据的时候,多个消费者都可以收到数据

实践场景

场景一:简单列表数据的加载状态

简单的列表显示场景,可以使用onStart,onEmpty,catch,onCompletion等回调操作符,监听数据流的状态,显示相应的加载状态UI。

  • onStart:在数据发射之前触发,onStart所在的线程,是数据产生的线程
  • onCompletion:在数据流结束时触发,onCompletion所在的线程,是数据产生的线程
  • onEmpty:当数据流结束了,缺没有发出任何元素的时候触发。
  • catch:数据流发生错误的时候触发
  • flowOn:指定上游数据流的CoroutineContext,下游数据流不会受到影响
private fun coldFlowDemo() {
    //创建一个冷流,在3秒后发射一个数据
    val coldFlow = flow<Int> {
        delay(3000)
        emit(1)
    }
    lifecycleScope.launch(Dispatchers.IO) {
        coldFlow.onStart {
            Log.d(TAG, "coldFlow onStart, thread:${Thread.currentThread().name}")
            mBinding.progressBar.isVisible = true
            mBinding.tvLoadingStatus.text = "加载中"
        }.onEmpty {
            Log.d(TAG, "coldFlow onEmpty, thread:${Thread.currentThread().name}")
            mBinding.progressBar.isVisible = false
            mBinding.tvLoadingStatus.text = "数据加载为空"
        }.catch {
            Log.d(TAG, "coldFlow catch, thread:${Thread.currentThread().name}")
            mBinding.progressBar.isVisible = false
            mBinding.tvLoadingStatus.text = "数据加载错误:$it"
        }.onCompletion {
            Log.d(TAG, "coldFlow onCompletion, thread:${Thread.currentThread().name}")
            mBinding.progressBar.isVisible = false
            mBinding.tvLoadingStatus.text = "加载完成"
        }
            //指定上游数据流的CoroutineContext,下游数据流不会受到影响
            .flowOn(Dispatchers.Main)
            .collect {
                Log.d(TAG, "coldFlow collect:$it, thread:${Thread.currentThread().name}")
            }
    }
}

比如上面的例子。 使用flow构建起函数,创建一个冷流,3秒后发送一个值到数据流中。 使用onStart,onEmpty,catch,onCompletion操作符,监听数据流的状态。

日志输出:

coldFlow onStart, thread:main
coldFlow onCompletion, thread:main
coldFlow collect:1, thread:DefaultDispatcher-worker-1

场景二:同一种数据,需要加载本地数据和网络数据

在实际的开发场景中,经常会将一些网络数据保存到本地,下次加载数据的时候,优先使用本地数据,再使用网络数据。 但是本地数据和网络数据的加载完成时机不一样,所以可能会有下面几种场景。

  1. 本地数据比网络数据先加载完成:那先使用本地数据,再使用网络数据
  2. 网络数据比本地数据先加载完成:
  • 网络数据加载成功,那只使用网络数据即可,不需要再使用本地数据了。
  • 网络数据加载失败,可以继续尝试使用本地数据进行兜底。
  1. 本地数据和网络数据都加载失败:通知上层数据加载失败

实现CacheRepositity

将上面的逻辑进行简单封装成一个基类,CacheRepositity。 相应的子类,只需要实现两个方法即可。

  • CResult:代表加载结果,Success 或者 Error。
  • fetchDataFromLocal(),实现本地数据读取的逻辑
  • fetchDataFromNetWork(),实现网络数据获取的逻辑
abstract class CacheRepositity<T> {
    private val TAG = "CacheRepositity"

    fun getData() = channelFlow<CResult<T>> {
        supervisorScope {
            val dataFromLocalDeffer = async {
                fetchDataFromLocal().also {
                    Log.d(TAG,"fetchDataFromLocal result:$it , thread:${Thread.currentThread().name}")
                    //本地数据加载成功  
                    if (it is CResult.Success) {
                        send(it)
                    }
                }
            }

            val dataFromNetDeffer = async {
                fetchDataFromNetWork().also {
                    Log.d(TAG,"fetchDataFromNetWork result:$it , thread:${Thread.currentThread().name}")
                    //网络数据加载成功  
                    if (it is CResult.Success) {
                        send(it)
                        //如果网络数据已加载,可以直接取消任务,就不需要处理本地数据了
                        dataFromLocalDeffer.cancel()
                    }
                }
            }

            //本地数据和网络数据,都加载失败的情况
            val localData = dataFromLocalDeffer.await()
            val networkData = dataFromNetDeffer.await()
            if (localData is CResult.Error && networkData is CResult.Error) {
                send(CResult.Error(Throwable("load data error")))
            }
        }
    }

    protected abstract suspend fun fetchDataFromLocal(): CResult<T>

    protected abstract suspend fun fetchDataFromNetWork(): CResult<T>

}

sealed class CResult<out R> {
    data class Success<out T>(val data: T) : CResult<T>()
    data class Error(val throwable: Throwable) : CResult<Nothing>()
}

测试验证

写个TestRepositity,实现CacheRepositity的抽象方法。 通过delay延迟耗时来模拟各种场景,观察日志的输出顺序。

private fun cacheRepositityDemo(){
    val repositity=TestRepositity()
    lifecycleScope.launch {
        repositity.getData().onStart {
            Log.d(TAG, "TestRepositity: onStart")
        }.onCompletion {
            Log.d(TAG, "TestRepositity: onCompletion")
        }.collect {
            Log.d(TAG, "collect: $it")
        }
    }
}

本地数据比网络数据加载快

class TestRepositity : CacheRepositity<String>() {
    override suspend fun fetchDataFromLocal(): CResult<String> {
        delay(1000)
        return CResult.Success("data from fetchDataFromLocal")
    }

    override suspend fun fetchDataFromNetWork(): CResult<String> {
        delay(2000)
        return CResult.Success("data from fetchDataFromNetWork")
    }
}

模拟数据:本地加载delay1秒,网络加载delay2秒 日志输出:collect 执行两次,先收到本地数据,再收到网络数据。

onStart
fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
collect: Success(data=data from fetchDataFromLocal)
fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
collect: Success(data=data from fetchDataFromNetWork)
onCompletion

网络数据比本地数据加载快

class TestRepositity : CacheRepositity<String>() {
    override suspend fun fetchDataFromLocal(): CResult<String> {
        delay(2000)
        return CResult.Success("data from fetchDataFromLocal")
    }

    override suspend fun fetchDataFromNetWork(): CResult<String> {
        delay(1000)
        return CResult.Success("data from fetchDataFromNetWork")
    }
}

模拟数据:本地加载delay 2秒,网络加载delay 1秒 日志输出:collect 只执行1次,只收到网络数据。

onStart
fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
collect: Success(data=data from fetchDataFromNetWork)
onCompletion

网络数据加载失败,使用本地数据

class TestRepositity : CacheRepositity<String>() {
    override suspend fun fetchDataFromLocal(): CResult<String> {
        delay(2000)
        return CResult.Success("data from fetchDataFromLocal")
    }

    override suspend fun fetchDataFromNetWork(): CResult<String> {
        delay(1000)
        return CResult.Error(Throwable("fetchDataFromNetWork Error"))
    }
}

模拟数据:本地加载delay 2秒,网络数据加载失败 日志输出:collect 只执行1次,只收到本地数据。

onStart
fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
collect: Success(data=data from fetchDataFromLocal)
onCompletion

网络数据和本地数据都加载失败

class TestRepositity : CacheRepositity<String>() {
    override suspend fun fetchDataFromLocal(): CResult<String> {
        delay(2000)
        return CResult.Error(Throwable("fetchDataFromLocal Error"))
    }

    override suspend fun fetchDataFromNetWork(): CResult<String> {
        delay(1000)
        return CResult.Error(Throwable("fetchDataFromNetWork Error"))
    }
}

模拟数据:本地数据加载失败,网络数据加载失败 日志输出: collect 只执行1次,结果是CResult.Error,代表加载数据失败。

onStart
fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
fetchDataFromLocal result:Error(throwable=java.lang.Throwable: fetchDataFromLocal Error) , thread:main
collect: Error(throwable=java.lang.Throwable: load data error)
onCompletion

场景三:多种数据源,按照顺序合并进行展示

在实际的开发场景中,经常一个页面的数据,是需要发起多个网络请求之后,组合数据之后再进行显示。 比如类似这种页面,3种数据,需要由3个网络请求获取得到,然后再进行相应的显示。

实现目标:

  1. 接口间不需要互相等待,哪些数据先回来,就先展示哪部分
  2. 控制数据的显示顺序

flow combine操作符

可以合并多个不同的 Flow 数据流,生成一个新的流。 只要其中某个子 Flow 数据流有产生新数据的时候,就会触发 combine 操作,进行重新计算,生成一个新的数据。

例子

class HomeViewModel : ViewModel() {

    //暴露给View层的列表数据
    val list = MutableLiveData<List<String?>>()

    //多个子Flow,这里简单都返回String,实际场景根据需要,返回相应的数据类型即可
    private val bannerFlow = MutableStateFlow<String?>(null)
    private val channelFlow = MutableStateFlow<String?>(null)
    private val listFlow = MutableStateFlow<String?>(null)


    init {
        //使用combine操作符
        viewModelScope.launch {
            combine(bannerFlow, channelFlow, listFlow) { bannerData, channelData, listData ->
                Log.d("HomeViewModel", "combine  bannerData:$bannerData,channelData:$channelData,listData:$listData")
                //只要子flow里面的数据不为空,就放到resultList里面
                val resultList = mutableListOf<String?>()
                if (bannerData != null) {
                    resultList.add(bannerData)
                }
                if (channelData != null) {
                    resultList.add(channelData)
                }
                if (listData != null) {
                    resultList.add(listData)
                }
                resultList
            }.collect {
                //收集combine之后的数据,修改liveData的值,通知UI层刷新列表
                Log.d("HomeViewModel", "collect: ${it.size}")
                list.postValue(it)
            }
        }
    }

    fun loadData() {
        viewModelScope.launch(Dispatchers.IO) {
            //模拟耗时操作
            async {
                delay(1000)
                Log.d("HomeViewModel", "getBannerData success")
                bannerFlow.emit("Banner")
            }
            async {
                delay(2000)
                Log.d("HomeViewModel", "getChannelData success")
                channelFlow.emit("Channel")
            }
            async {
                delay(3000)
                Log.d("HomeViewModel", "getListData success")
                listFlow.emit("List")
            }
        }
    }
}

HomeViewModel

  1. 提供一个 LiveData 的列表数据给View层使用
  2. 内部有3个子 flow ,分别负责相应数据的生产。(这里简单都返回String,实际场景根据需要,返回相应的数据类型即可)。
  3. 通过 combine 操作符,组合这3个子flow的数据。
  4. collect 接收生成的新数据,并修改liveData的数据,通知刷新UI

View层使用

private fun flowCombineDemo() {
    val homeViewModel by viewModels<HomeViewModel>()
    homeViewModel.list.observe(this) {
        Log.d("HomeViewModel", "observe size:${it.size}")
    }
    homeViewModel.loadData()
}

简单的创建一个 ViewModel ,observe 列表数据对应的 LiveData。 通过输出的日志发现,触发数据加载之后,每次子 Flow 流生产数据的时候,都会触发一次 combine 操作,生成新的数据。

日志输出:
combine  bannerData:null,channelData:null,listData:null
collect: 0
observe size:0

getBannerData success
combine  bannerData:Banner,channelData:null,listData:null
collect: 1
observe size:1

getChannelData success
combine  bannerData:Banner,channelData:Channel,listData:null
collect: 2
observe size:2

getListData success
combine  bannerData:Banner,channelData:Channel,listData:List
collect: 3
observe size:3

总结

具体场景,具体分析。刚好这几个场景,配合Flow进行使用,整体实现也相对简单了一些。

作者:入魔的冬瓜
链接:https://juejin.cn/post/7065327938064875534
来源于掘金

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容