当RxCache遇上Kotlin协程Flow,会擦出怎样的火花

什么?RxCache是什么?先整一段代码

RequestApi.api(ApiClient.create(Api.class).getBanner())
    .cacheKey("banner")
    .cacheStrategy(CacheStrategy.CACHE_AND_REMOTE)
    .cacheable(data -> data.hasData())
    .buildCacheWithCacheResult(new CacheType<ApiResponse<List<BannerBean>>>() {})
    .subscribe(new CacheSubscriber<ApiResponse<List<BannerBean>>>() {
        @Override
        public void onResponse(boolean isFromCache, ApiResponse<List<BannerBean>> result) {
            ((TextView) findViewById(R.id.textview)).setText(new Gson().toJson(result.data));
            Toast.makeText(MainActivity.this, "来自" + (isFromCache ? "缓存" : "网络"), Toast.LENGTH_SHORT).show();
        }
    });

上面这段代码干了两件事情:

  1. 从缓存取数据
  2. 加载网络更新缓存

这就是RxCache,基于RxJava+DiskLruCache实现的磁盘缓存库,支持根据策略自动处理网络数据缓存。

代码传送门:

随着kotlin的崛起,协程、Flow的出现,我思考着使用Flow重写一下RxCache。这次重写的过程,对Flow有了更深的理解,和大家一起分享。

这里简单说下使用方式。

API

初始化

使用前必须先进行初始化操作。

RxCache.initialize(context)

也可以设置更多参数

/**
 * 初始化
 *
 * @param cacheDir       缓存目录
 * @param cacheVersion   缓存版本
 * @param maxCacheSize   缓存最大size
 * @param cacheConverter 缓存Converter
 */
fun initialize(
    cacheDir: File,
    cacheConverter: GsonCacheConverter = GsonCacheConverter(Gson()),
    cacheVersion: Int = 1,
    maxCacheSize: Long = MAX_CACHE_SIZE

)

写入数据

// 同步写入数据
RxCache.apply {
    put("url", "111")
    put("data", BannerBean().apply {
        desc = "flutter"
        title = "flutter 中文社区"
    })
}

// 异步写入数据
lifecycleScope.launch {
    RxCache.rxPut("ulr2","222").collect()
}

读取数据

// 同步读取数据
RxCache.get("url", String::class.java)

// 异步读取数据
lifecycleScope.launch {
    RxCache.rxGet("data", BannerBean::class.java).collect {
        ToastUtil.toast("rxGet data = ${it?.title}")
    }
}

移除某缓存

RxCache.remove("url");

// 异步
lifecycleScope.launch {
    RxCache.rxRemove("url").collect()
}

清除全部缓存

// 同步
RxCache.clear()

// 异步
lifecycleScope.launch {
    RxCache.rxClear().collect()
    
    RxCache().clearAsync()
}

缓存策略

定义了IStrategy接口,框架内部提供了6中缓存策略,支持自定义。

缓存策略 说明
NO_CACHE 不使用RxCache进行缓存
ONLY_REMOTE 只请求网络,但数据依然会被缓存
ONLY_CACHE 只加载缓存,如离线模式
FIRST_REMOTE 优先请求网络,网络数据无效后,再加载缓存
(如果缓存也没有,则会响应网络的response or error)
FIRST_CACHE 优先加载缓存,缓存没有再去请求网络
CACHE_AND_REMOTE 先加载缓存(成功才会回调缓存response),不管缓存什么结果都会再请求网络。
如果缓存成功,网络请求数据无效,则网络不回调。
如果缓存成功,网络也成功,且网络和缓存数据相同则只有缓存回调,网络不再二次回调,否则会二次回调

网络请求

  • 生成请求的flow
  • 设置缓存策略
  • 设置cacheKey
  • 设置cacheable,用于判断数据是否有效,有效才进行缓存
  • buildCacheWithCacheResult构建
  • flowOn(Dispatchers.IO)指定运行在线程中
  • catch异常
  • collect获取数据
lifecycleScope.launch {
    RequestApi(
        flow {
            emit(ApiClient.create(Api::class.java).getBanner())
        }
    )  // 创建flow
        .cacheStrategy(CacheStrategy.CACHE_AND_REMOTE) // 配置缓存策略
        .cacheKey("banner") // 设置缓存key
        .cacheable(object : ICacheable<ApiResponse<MutableList<BannerBean>>> { // 判断数据是否有效,有效才缓存
            override fun cacheable(data: ApiResponse<MutableList<BannerBean>>?): Boolean {
                return data?.errorCode == 0 && data.data != null
            }
        })
//                .buildCache(object : CacheType<ApiResponse<MutableList<BannerBean>>>() {})
        .buildCacheWithCacheResult(object : CacheType<ApiResponse<MutableList<BannerBean>>>() {})//构建
        .flowOn(Dispatchers.IO) 
        .catch { // 捕获异常
            it.printStackTrace()
            ToastUtil.toast(it.message)
            binding.textview.text = null
        }
        .collect {
            ToastUtil.toast("数据是否来自缓存:${it.isFromCache}")
            binding.textview.text = Gson().toJson(it.data?.data)
        }
}

Flow

下面我们通过RxJava与Flow对比来认识Flow的操作符

对比 RxJava Flow
数据源 Observable<T> Flow<T>
发射数据 onNext emit
改变数据发射的线程 subscribeOn flowOn
改变消费数据的线程 observeOn 协程launch的时候指定context
捕获异常 onError catch或者try-cathch块
完成 onComplete onCompletion
map map map
flatMap flatMap flatMapConcat
compose compose let(transformer)
转换 transformer transformer
去重 distinct distinctUntilChanged
合并 concatWith onCompletion { emitAll(other) }
onErrorResumeNext onErrorResumeNext catch { emitAll(fallback) }
onErrorReturn onErrorReturn catch { emit(fallback) }
压缩 zip zip

创建flow

创建flow有多种方式

  • flowOf
flowOf(1)
flowOf(1, 2, 3)
  • asFlow()
方法.asFlow()
(1..3).asFlow()
  • flow{ emit(value) }
flow { 
    emit(1)
}

切换线程

RxJava中可以使用subscribeOn来切换发射线程,使用observeOn来指定消费线程。而Flow只能通过flowOn来切换发射线程,不能切换消费线程。collect执行的线程取决于协程launch时指定的上下文。

异常

RxJava中使用onError来捕获异常。Flow中使用catch{}或者try-cathch()语句来捕获异常。推荐使用catch{}。

lifecycleScope.launch {
    flow<String> {
        emit("111")
        throw NullPointerException()
        emit("222") // 这个不会发射
    }.catch {
        it.printStackTrace()
        emit("333")
    }.flowOn(Dispatchers.Main)
    .collect {
        println("collect >> $it")
    }
}
// 输出结果:
collect >> 111
collect >> 333

catch可以调用多次,作用范围是调用catch之前的代码。catch内部可以调用emit(value)或者emitAll(flow)再次发射数据。

onCompletion

不管onCompletion之前是否发生了异常,都会回调该方法。只有onCompletion之前发生了异常且没有被catch,参数cause才不会空,参数是throwable。

lifecycleScope.launch {
    flow<String> {
        emit("111")
        throw NullPointerException()
        emit("222") // 这个不会发射
    }.catch {
        it.printStackTrace()
        emit("333")
    }.onCompletion { throwable ->
        if (throwable != null) { // 发生了异常
            emit("444")
        } else {
            emit("555")
        }
    }.flowOn(Dispatchers.Main)
        .collect {
            println("collect >> $it")
        }
}
// 输出结果:
collect >> 111
collect >> 333
collect >> 555

虽然发生了异常,但由于onCompletion被catch住了,所以到onCompletion时是不存在异常的,所以throwable==null。如果前面没有catch,那么throwable就是上面的NullPointerException。

CacheAndRemoteStrategy

下面我们通过分析缓存策略CacheAndRemoteStrategy的实现过程,来简单分析下Flow的操作符。

策略:先加载缓存(成功才会回调缓存response),不管缓存什么结果都会再请求网络。

如果缓存成功,网络请求数据无效,则网络不回调

如果缓存成功,网络也成功,且网络和缓存数据相同则只有缓存回调,网络不再二次回调,否则会二次回调

  • 首先我们是要发射两个数据源,一个是cache一个是net。在RxJava中,我们可以使用concatWith来组合两个Observable。

Flow也有这个操作符,不过我们看下实现。

@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emit(value) }'",
    replaceWith = ReplaceWith("onCompletion { emit(value) }")
)
public fun <T> Flow<T>.concatWith(value: T): Flow<T> = noImpl()

这个扩展方法是不能调用的,让我们使用onCompletion { emit(value) }实现。

还有subscribe、compose、onErrorXxx等RxJava的常用操作符。看到这里,我有点怀疑,官方是故意这么设计的。参考RxJava的常用操作符,让我们可以快速熟悉使用。

不得不说,Flow的操作符的实现更简单,没有各种复杂的操作符,而是通过简单的集中操作符组合来实现功能,更方便理解和使用。

  • loadCache返回cache的Flow。onCompletion在cache执行完成之后,通过emitAll发射网络数据。

前面说过,如果loadCache发生了异常(无缓存时,内部会抛出NoCacheException),onCompletion中可以拿到这个异常。

我们看下网络Flow都干了什么。

  • netSource通过flatMapConcat来判断网络数据是否有效,有效则直接发射netResult、如果无效,则判断当前缓存是否有效。

    if (cacheEx != null) 意味着loadCache发生了异常,没有缓存,那么我们发射netResult。这么做的目的是能响应错误数据,如响应不同的错误码,或者toast 错误信息。

    如果有缓存,则发射一个空的Flow,跳过netFlow。

再往下看,一个catch。

  • 这一步的目的是,如果netSource发生了异常,如网络相关异常、数据解析异常等,那么上面的flatMapConcat里的代码就不会被执行了。

    在catch中,我们判断是否有缓存,如果没有缓存,则抛出netSource发生的异常。

继续往下看,distinctUntilChanged的作用是去重

  • 判断两个数据源response是否一致,一致则不再触发collect二次回调。

下面还有个catch

  • 这一步的目的是捕获住无缓存的异常,如果不是NoCacheException则抛出。

至此,我们就实现了策略CacheAndRemoteStrategy。这算是一个比较复杂的Flow的场景了。

  • 组合Flow
  • Flow的数据转换flatMapConcat
  • 发射数据flowOf
  • 发射emptyFlow
  • 异常处理,多次catch的使用
  • 异常中发射数据emitAll
  • 去重distinctUntilChanged
override fun <T> execute(
        cache: RxCache,
        cacheKey: String,
        netSource: Flow<CacheResult<T?>>,
        type: Type
    ): Flow<CacheResult<T?>> {
        return loadCache<T>(cache, cacheKey, type)
            .onCompletion { cacheEx ->
                // 判断是否发生异常
                emitAll(
                    netSource.flatMapConcat { netResult ->
                        // 如果网络数据有效则正常处理
                        if (netResult.cacheable) {
                            flowOf(netResult)
                        } else {
                            // 如果网络数据是无效的,缓存也是无效的,则抛出网络的结果。如果有缓存,则网络结果不再分发
                            if (cacheEx != null) { // 没有缓存
                                flowOf(netResult)
                            } else {
                                emptyFlow()
                            }
                        }
                    }.catch { netEx ->
                        // 网络请求发生了异常,根据是否有缓存判断如何分发
                        if (cacheEx != null) { // 没有缓存,则分发网络结果
                            throw netEx
                        } else {  // 有缓存则不发射网络结果
                            emitAll(emptyFlow())
                        }
                    }
                )
            }
            .distinctUntilChanged { old, new ->
                // 如果网络数据和缓存数据一致,则只发射一次
                if (old.data == null || !new.cacheable) { // 网络无数据或没有缓存
                    false
                } else {
                    isDataSame(old.data, new.data)
                }
            }
            .catch { // 捕获NoCacheException
                if (it !is NoCacheException) {
                    throw it
                }
            }
    }

通过这次重写,不得不说,Flow是真的爽。

现在Kotlin越来越流行了,协程也逐渐兴起,新知识更新很快,一不留神,就落后了。我个人觉得协程和Flow都很简单,只要对比这RxJava,然后多加练习,很快就能上手。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容