什么?RxCache是什么?先整一段代码
RequestApi.api(ApiClient.create(Api.class).getBanner())
.cacheKey("banner")
.cacheStrategy(CacheStrategy.CACHE_AND_REMOTE)
.cacheable(data -> data.hasData())
.buildCacheWithCacheResult(new CacheType<ApiResponse<List<BannerBean>>>() {})
.subscribe(new CacheSubscriber<ApiResponse<List<BannerBean>>>() {
@Override
public void onResponse(boolean isFromCache, ApiResponse<List<BannerBean>> result) {
((TextView) findViewById(R.id.textview)).setText(new Gson().toJson(result.data));
Toast.makeText(MainActivity.this, "来自" + (isFromCache ? "缓存" : "网络"), Toast.LENGTH_SHORT).show();
}
});
上面这段代码干了两件事情:
- 从缓存取数据
- 加载网络更新缓存
这就是RxCache,基于RxJava+DiskLruCache实现的磁盘缓存库,支持根据策略自动处理网络数据缓存。
代码传送门:
随着kotlin的崛起,协程、Flow的出现,我思考着使用Flow重写一下RxCache。这次重写的过程,对Flow有了更深的理解,和大家一起分享。
这里简单说下使用方式。
API
初始化
使用前必须先进行初始化操作。
RxCache.initialize(context)
也可以设置更多参数
/**
* 初始化
*
* @param cacheDir 缓存目录
* @param cacheVersion 缓存版本
* @param maxCacheSize 缓存最大size
* @param cacheConverter 缓存Converter
*/
fun initialize(
cacheDir: File,
cacheConverter: GsonCacheConverter = GsonCacheConverter(Gson()),
cacheVersion: Int = 1,
maxCacheSize: Long = MAX_CACHE_SIZE
)
写入数据
// 同步写入数据
RxCache.apply {
put("url", "111")
put("data", BannerBean().apply {
desc = "flutter"
title = "flutter 中文社区"
})
}
// 异步写入数据
lifecycleScope.launch {
RxCache.rxPut("ulr2","222").collect()
}
读取数据
// 同步读取数据
RxCache.get("url", String::class.java)
// 异步读取数据
lifecycleScope.launch {
RxCache.rxGet("data", BannerBean::class.java).collect {
ToastUtil.toast("rxGet data = ${it?.title}")
}
}
移除某缓存
RxCache.remove("url");
// 异步
lifecycleScope.launch {
RxCache.rxRemove("url").collect()
}
清除全部缓存
// 同步
RxCache.clear()
// 异步
lifecycleScope.launch {
RxCache.rxClear().collect()
RxCache().clearAsync()
}
缓存策略
定义了IStrategy接口,框架内部提供了6中缓存策略,支持自定义。
缓存策略 | 说明 |
---|---|
NO_CACHE | 不使用RxCache进行缓存 |
ONLY_REMOTE | 只请求网络,但数据依然会被缓存 |
ONLY_CACHE | 只加载缓存,如离线模式 |
FIRST_REMOTE | 优先请求网络,网络数据无效后,再加载缓存 (如果缓存也没有,则会响应网络的response or error) |
FIRST_CACHE | 优先加载缓存,缓存没有再去请求网络 |
CACHE_AND_REMOTE | 先加载缓存(成功才会回调缓存response),不管缓存什么结果都会再请求网络。 如果缓存成功,网络请求数据无效,则网络不回调。 如果缓存成功,网络也成功,且网络和缓存数据相同则只有缓存回调,网络不再二次回调,否则会二次回调 |
网络请求
- 生成请求的flow
- 设置缓存策略
- 设置cacheKey
- 设置cacheable,用于判断数据是否有效,有效才进行缓存
- buildCacheWithCacheResult构建
- flowOn(Dispatchers.IO)指定运行在线程中
- catch异常
- collect获取数据
lifecycleScope.launch {
RequestApi(
flow {
emit(ApiClient.create(Api::class.java).getBanner())
}
) // 创建flow
.cacheStrategy(CacheStrategy.CACHE_AND_REMOTE) // 配置缓存策略
.cacheKey("banner") // 设置缓存key
.cacheable(object : ICacheable<ApiResponse<MutableList<BannerBean>>> { // 判断数据是否有效,有效才缓存
override fun cacheable(data: ApiResponse<MutableList<BannerBean>>?): Boolean {
return data?.errorCode == 0 && data.data != null
}
})
// .buildCache(object : CacheType<ApiResponse<MutableList<BannerBean>>>() {})
.buildCacheWithCacheResult(object : CacheType<ApiResponse<MutableList<BannerBean>>>() {})//构建
.flowOn(Dispatchers.IO)
.catch { // 捕获异常
it.printStackTrace()
ToastUtil.toast(it.message)
binding.textview.text = null
}
.collect {
ToastUtil.toast("数据是否来自缓存:${it.isFromCache}")
binding.textview.text = Gson().toJson(it.data?.data)
}
}
Flow
下面我们通过RxJava与Flow对比来认识Flow的操作符
对比 | RxJava | Flow |
---|---|---|
数据源 | Observable<T> | Flow<T> |
发射数据 | onNext | emit |
改变数据发射的线程 | subscribeOn | flowOn |
改变消费数据的线程 | observeOn | 协程launch的时候指定context |
捕获异常 | onError | catch或者try-cathch块 |
完成 | onComplete | onCompletion |
map | map | map |
flatMap | flatMap | flatMapConcat |
compose | compose | let(transformer) |
转换 | transformer | transformer |
去重 | distinct | distinctUntilChanged |
合并 | concatWith | onCompletion { emitAll(other) } |
onErrorResumeNext | onErrorResumeNext | catch { emitAll(fallback) } |
onErrorReturn | onErrorReturn | catch { emit(fallback) } |
压缩 | zip | zip |
创建flow
创建flow有多种方式
- flowOf
flowOf(1)
flowOf(1, 2, 3)
- asFlow()
方法.asFlow()
(1..3).asFlow()
- flow{ emit(value) }
flow {
emit(1)
}
切换线程
RxJava中可以使用subscribeOn来切换发射线程,使用observeOn来指定消费线程。而Flow只能通过flowOn来切换发射线程,不能切换消费线程。collect执行的线程取决于协程launch时指定的上下文。
异常
RxJava中使用onError来捕获异常。Flow中使用catch{}或者try-cathch()语句来捕获异常。推荐使用catch{}。
lifecycleScope.launch {
flow<String> {
emit("111")
throw NullPointerException()
emit("222") // 这个不会发射
}.catch {
it.printStackTrace()
emit("333")
}.flowOn(Dispatchers.Main)
.collect {
println("collect >> $it")
}
}
// 输出结果:
collect >> 111
collect >> 333
catch可以调用多次,作用范围是调用catch之前的代码。catch内部可以调用emit(value)或者emitAll(flow)再次发射数据。
onCompletion
不管onCompletion之前是否发生了异常,都会回调该方法。只有onCompletion之前发生了异常且没有被catch,参数cause才不会空,参数是throwable。
lifecycleScope.launch {
flow<String> {
emit("111")
throw NullPointerException()
emit("222") // 这个不会发射
}.catch {
it.printStackTrace()
emit("333")
}.onCompletion { throwable ->
if (throwable != null) { // 发生了异常
emit("444")
} else {
emit("555")
}
}.flowOn(Dispatchers.Main)
.collect {
println("collect >> $it")
}
}
// 输出结果:
collect >> 111
collect >> 333
collect >> 555
虽然发生了异常,但由于onCompletion被catch住了,所以到onCompletion时是不存在异常的,所以throwable==null。如果前面没有catch,那么throwable就是上面的NullPointerException。
CacheAndRemoteStrategy
下面我们通过分析缓存策略CacheAndRemoteStrategy的实现过程,来简单分析下Flow的操作符。
策略:先加载缓存(成功才会回调缓存response),不管缓存什么结果都会再请求网络。
如果缓存成功,网络请求数据无效,则网络不回调
如果缓存成功,网络也成功,且网络和缓存数据相同则只有缓存回调,网络不再二次回调,否则会二次回调
- 首先我们是要发射两个数据源,一个是cache一个是net。在RxJava中,我们可以使用concatWith来组合两个Observable。
Flow也有这个操作符,不过我们看下实现。
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emit(value) }'",
replaceWith = ReplaceWith("onCompletion { emit(value) }")
)
public fun <T> Flow<T>.concatWith(value: T): Flow<T> = noImpl()
这个扩展方法是不能调用的,让我们使用onCompletion { emit(value) }实现。
还有subscribe、compose、onErrorXxx等RxJava的常用操作符。看到这里,我有点怀疑,官方是故意这么设计的。参考RxJava的常用操作符,让我们可以快速熟悉使用。
不得不说,Flow的操作符的实现更简单,没有各种复杂的操作符,而是通过简单的集中操作符组合来实现功能,更方便理解和使用。
- loadCache返回cache的Flow。onCompletion在cache执行完成之后,通过emitAll发射网络数据。
前面说过,如果loadCache发生了异常(无缓存时,内部会抛出NoCacheException),onCompletion中可以拿到这个异常。
我们看下网络Flow都干了什么。
- netSource通过flatMapConcat来判断网络数据是否有效,有效则直接发射netResult、如果无效,则判断当前缓存是否有效。
if (cacheEx != null) 意味着loadCache发生了异常,没有缓存,那么我们发射netResult。这么做的目的是能响应错误数据,如响应不同的错误码,或者toast 错误信息。
如果有缓存,则发射一个空的Flow,跳过netFlow。
再往下看,一个catch。
- 这一步的目的是,如果netSource发生了异常,如网络相关异常、数据解析异常等,那么上面的flatMapConcat里的代码就不会被执行了。
在catch中,我们判断是否有缓存,如果没有缓存,则抛出netSource发生的异常。
继续往下看,distinctUntilChanged的作用是去重
- 判断两个数据源response是否一致,一致则不再触发collect二次回调。
下面还有个catch
- 这一步的目的是捕获住无缓存的异常,如果不是NoCacheException则抛出。
至此,我们就实现了策略CacheAndRemoteStrategy。这算是一个比较复杂的Flow的场景了。
- 组合Flow
- Flow的数据转换flatMapConcat
- 发射数据flowOf
- 发射emptyFlow
- 异常处理,多次catch的使用
- 异常中发射数据emitAll
- 去重distinctUntilChanged
override fun <T> execute(
cache: RxCache,
cacheKey: String,
netSource: Flow<CacheResult<T?>>,
type: Type
): Flow<CacheResult<T?>> {
return loadCache<T>(cache, cacheKey, type)
.onCompletion { cacheEx ->
// 判断是否发生异常
emitAll(
netSource.flatMapConcat { netResult ->
// 如果网络数据有效则正常处理
if (netResult.cacheable) {
flowOf(netResult)
} else {
// 如果网络数据是无效的,缓存也是无效的,则抛出网络的结果。如果有缓存,则网络结果不再分发
if (cacheEx != null) { // 没有缓存
flowOf(netResult)
} else {
emptyFlow()
}
}
}.catch { netEx ->
// 网络请求发生了异常,根据是否有缓存判断如何分发
if (cacheEx != null) { // 没有缓存,则分发网络结果
throw netEx
} else { // 有缓存则不发射网络结果
emitAll(emptyFlow())
}
}
)
}
.distinctUntilChanged { old, new ->
// 如果网络数据和缓存数据一致,则只发射一次
if (old.data == null || !new.cacheable) { // 网络无数据或没有缓存
false
} else {
isDataSame(old.data, new.data)
}
}
.catch { // 捕获NoCacheException
if (it !is NoCacheException) {
throw it
}
}
}
通过这次重写,不得不说,Flow是真的爽。
现在Kotlin越来越流行了,协程也逐渐兴起,新知识更新很快,一不留神,就落后了。我个人觉得协程和Flow都很简单,只要对比这RxJava,然后多加练习,很快就能上手。