技术不止,文章有料,加
JiuXinDev
入群,Android 搬砖路上不孤单
前言
在上一篇文章中,我和大家简单交流了一下关于 Paging 3 的用法,有同学就立即表示,Paging 3 有啥好学的,不就是一个分页库吗?
其实不然,Paging 3 看上去是一个分页的库,但实际你看到的只是表象,除去分页这个外衣,Paging 可以帮助我们管控数据的生命周期,从获取,到展示,提供了一套自动化啊和标准化的管控流程,加上Android Jetpack 中其他组件的加持:
- 如
LiveData
可以为我们解决组件生命周期异常带来的困扰 - 比如
Room
提供的 PagingSource 在数据库新增数据的时候会立刻在UI中刷新
别问,问就一个字,香~
众所周知,研究源码最好的方法,是带着问题去研究源码。
所以,我们从 Paging 3 入手,褪去分页这层外衣,看看它是如何处理了:
- 分页的整个流程是什么样的?
- PagingSource 和 RemoteMediator 是如何结合的?
- 如何实现它的状态管理?
目录
一、从使用角度看结构
关于使用的代码我们就不过多介绍了,感兴趣的可以阅读 《即学即用Android Jetpack - Paging 3》。
从之前的文章,我们了解到如下几个重要的类:
类 | 说明 |
---|---|
PagingSource |
Paing 的数据源。 |
RemoteMediator |
如果你既有本地的数据源,又有远程的数据源,这个时候你就可以使用 PagingSource 充当本地的数据源,RemoteMediator 充当远程的数据源。 |
Pager |
获取数据的入口,它可以向外提供一个 Flow ,它是一个响应式的数据源,数据的接收方可以获取到 PagingData 。 |
PagingData |
这个我们不会接触到,但是有必要了解。官方注释说Container for Paged data from a single generation of loads ,single generation 我理解就是数据源没有发生变化的时候就是一代,也就是没有发生增删改这种情况。 |
RecyclerView |
我们的老朋友,不用介绍 |
PagingDataAdapter |
Paging 3 中为 RecyclerView 量身定做的适配器 |
从这几个类中,先建立一个大概的结构图:
注意一下,这个 PagingAdapter
连接 Flow<PagingData>
这个过程,我前面说过,PagingData
提供的是一代数据,你可以理解它为一份数据快照,当数据源发生变化的时候,它才会生成一个新的 PagingSource
。
阅读此篇文章之前,你应该具有一定的协程知识,建议阅读我的文章:《即学即用Kotlin - 协程》。
二、分析准备
在讲下文之前,我觉得有必要和大家探讨一下 Paging 3
中的状态管理机制和事件消费机制。
1 状态管理和事件管理
1.1 状态管理
状态管理的场景:
data class LoadStates(
/** [LoadState] corresponding to [LoadType.REFRESH] loads. */
val refresh: LoadState,
/** [LoadState] corresponding to [LoadType.PREPEND] loads. */
val prepend: LoadState,
/** [LoadState] corresponding to [LoadType.APPEND] loads. */
val append: LoadState
)
可以看出状态管理场景分为三种:
-
refresh
:刷新状态,顾名思义,就是Pager
初始化数据时候的状态。 -
append
:向后加载的状态,常见于加载更多的场景。 -
prepend
:向前加载的状态,常见于数据从指定位置开始,但是这个位置之前的数据并没有加载,所以需要加载之前的数据。
有了这个三个场景,它会有哪些状态呢?状态对应的类为 LoadState
, 它为我们提供三种状态:
-
NotLoading
:未加载,并且未加载分为加载已完成和加载未完成,由成员变量endOfPaginationReached
控制。 -
Loading
:加载中 -
Error
:错误
有了这些状态,我们就可以做更多的事情,比如和UI交互,比如,管理请求事件。
1.2 事件管理
除了状态管理,我们还需要事件管理,比如,数据回来了,我需用通知一个 Insert
事件,并包含状态的变化,所以,事件管理,其实是包含了状态管理的。
事件管理也分为三种:
internal sealed class PageEvent<T : Any> {
// Intentional to prefer Refresh, Prepend, Append constructors from Companion.
@Suppress("DataClassPrivateConstructor")
data class Insert<T : Any> private constructor(
val loadType: LoadType,
val pages: List<TransformablePage<T>>,
val placeholdersBefore: Int,
val placeholdersAfter: Int,
val combinedLoadStates: CombinedLoadStates
) : PageEvent<T>() {
// ...
}
data class Drop<T : Any>(
val loadType: LoadType,
val minPageOffset: Int,
val maxPageOffset: Int,
val placeholdersRemaining: Int
) : PageEvent<T>() {
// ...
}
data class LoadStateUpdate<T : Any>(
val loadType: LoadType,
val fromMediator: Boolean,
val loadState: LoadState // TODO: consider using full state object here
) : PageEvent<T>() {
//...
}
}
具体是:
-
Insert
:插入事件,包括具体的数据pages
、加载类型(加载类型对应的是Refresh\Append\Prepend)loadType
和组合状态combinedLoadStates
(包含Refresh\Append\Prepend加载状态)。 -
Drop
:删除事件 -
LoadStateUpdate
:加载状态变化的事件。
三、数据的产生
有了上面的基础,我们开始讲解整个流程。
入口是 PagingAdapter
和 Pager
建立关系的时候:
lifecycleScope.launch {
// 注意:这里需要用 collectLatest,只用 collect 的话筛选会不生效
viewModel.shoes.collectLatest {
adapter.submitData(it)
}
}
我们先讨论一下数据如何产生。
上面的 viewModel.shoes
就是一个 Pager
提供的 Flow
:
// 使用的时候构建Flow的代码
Pager(config = PagingConfig(
pageSize = 20,
enablePlaceholders = false,
initialLoadSize = 20
), pagingSourceFactory = {
brand?.let { shoeRepository.getShoesByBrandPagingSource(it) }
?: shoeRepository.getAllShoesPagingSource()
}).flow
class Pager<Key : Any, Value : Any>
@JvmOverloads constructor(
config: PagingConfig,
initialKey: Key? = null,
@OptIn(ExperimentalPagingApi::class)
remoteMediator: RemoteMediator<Key, Value>? = null,
pagingSourceFactory: () -> PagingSource<Key, Value>
) {
/**
* A cold [Flow] of [PagingData], which emits new instances of [PagingData] once they become
* invalidated by [PagingSource.invalidate] or calls to [AsyncPagingDataDiffer.refresh] or
* [PagingDataAdapter.refresh].
*/
val flow: Flow<PagingData<Value>> = PageFetcher(
pagingSourceFactory,
initialKey,
config,
remoteMediator
).flow
}
第一段是我们使用 Paging 3
的代码,不再多讲,列出来只是希望你知道,它有这个过程。
第二段是 Pager
的源码,它其实只是一个壳,在构造函数中,最后一个参数是一个闭包,它会返回一个 PagingSource
。除此以外,Pager
中还提供了一个 类型是 Flow<PagingData<Value>>
的 flow
,它来自 PageFetcher
中的 flow
。
1. PageFetcher
Pager
的 Flow
部分如下:
internal class PageFetcher<Key : Any, Value : Any>(
// ... 构造参数省略
) {
// 用来控制刷新的 Channel
private val refreshChannel = ConflatedBroadcastChannel<Boolean>()
// 失败重试的Channel
private val retryChannel = ConflatedBroadcastChannel<Unit>()
// The object built by paging builder can maintain the scope so that on rotation we don't stop
// the paging.
val flow: Flow<PagingData<Value>> = channelFlow {
// 1. 构建 RemoteMediatorAccessor
val remoteMediatorAccessor = remoteMediator?.let {
RemoteMediatorAccessor(this, it)
}
// 2. 将refreshChannel转成为Flow
refreshChannel.asFlow()
.onStart {
// 3. collect之前触发的操作
@OptIn(ExperimentalPagingApi::class)
emit(remoteMediatorAccessor?.initialize() == LAUNCH_INITIAL_REFRESH)
}
.scan(null) {
// 4. 计算新结果前,可以处理一下老的结果
// ...
}
.filterNotNull()
.mapLatest { generation ->
// 5. 只处理最新的值,构建PagingData
// ...
}
.collect { send(it) }
}
// ...
}
鉴于比较长,省略了很多代码,具体方法处再放上代码。
1.1 两个属性
先看两个属性 refreshChannel
和 retryChannel
,他们两个其实就是用来发送信号的,refreshChannel
比较重要,发送刷新信号,retryChannel
用来发送重新请求的信号。
1.2 外层
返回的 Flow
用了一层 channelFlow
来包裹,使用 channelFlow
要么是为了在多个协程中传送数据,要么是数据数量具有不确定性,我们看看后面是不是这样的。
1.3 构建远程数据源相关
构建一个 remoteMediatorAccessor
,它包裹了远程数据源的 RemoteMediator
。
后面我们会以 Flow
的每个扩展方法为一部分,涉及到具体的扩展方法我们不会讲它的原理,只会讲它的作用,感兴趣的同学可以自己看一下它的实现。
1.4 创建Flow
将 refreshChannel
转化为 Flow
,然后调用了 Flow#onStart
方法,这个 onStart
方法会在 Flow
进行 collect
操作之前调用。这个方法做了什么呢?只有一行代码:
remoteMediatorAccessor?.initialize() == LAUNCH_INITIAL_REFRESH
验证 remoteMediatorAccessor
的初始化行为,之前我们提过,remoteMediatorAccessor
是 RemoteMediator
的壳,这次我们进代码看看:
// 第一处的调用点
val remoteMediatorAccessor = remoteMediator?.let {
RemoteMediatorAccessor(this, it)
}
// RemoteMediatorAccessor 方法
internal fun <Key : Any, Value : Any> RemoteMediatorAccessor(
scope: CoroutineScope,
delegate: RemoteMediator<Key, Value>
): RemoteMediatorAccessor<Key, Value> = RemoteMediatorAccessImpl(scope, delegate)
private class RemoteMediatorAccessImpl<Key : Any, Value : Any>(
private val scope: CoroutineScope,
private val remoteMediator: RemoteMediator<Key, Value>
) : RemoteMediatorAccessor<Key, Value> {
// ...
override suspend fun initialize(): RemoteMediator.InitializeAction {
return remoteMediator.initialize().also { action ->
if (action == RemoteMediator.InitializeAction.LAUNCH_INITIAL_REFRESH) {
// 如果当前在collect之前,RemoteMediator有默认的初始化行为,设置状态
accessorState.use {
it.setBlockState(LoadType.APPEND, REQUIRES_REFRESH)
it.setBlockState(LoadType.PREPEND, REQUIRES_REFRESH)
}
}
}
}
// ...
}
从我列出的代码来看,RemoteMediatorAccessor
包裹了 RemoteMediator
,并且 RemoteMediatorAccessImpl#initialize
也调用了 RemoteMediator#initialize
方法,该方法会返回一个枚举 InitializeAction
,这个枚举有两种:
-
LAUNCH_INITIAL_REFRESH
:在初始化的时候,会发射一个刷新的信号 -
SKIP_INITIAL_REFRESH
:初始化的时候不发射刷新信号,等待UI请求的时候发送
再回到 PageFetcher
中的 flow
,可以看到,在回到onStart
方法中,它会有两种情况:
- 如果你有
RemoteMediator
,默认情况下它会发射true
。 - 没有
RemoteMediator
或者初始化默认不请求远程的数据源,发射false
。
我们其实可以理解为它要不要在初始化的时候刷新远程的数据源。
1.5 Scan
进行 Flow#scan
方法,这个方法的作用就是每一次上流发射新的信号的时候,你可以获取新的信号计算新的结果,在此之前,你还可拿到老的结果,方便处理老的结果。
从这个方法的参数你就可以看出来:
-
previousGeneration
:上一次计算得出来的结果 -
triggerRemoteRefresh
:上面提到的onStart
方法发射出来的值,或者是别处调用refreshChannel
发射的信号,是否触发刷新远程的数据源。
internal class PageFetcher<Key : Any, Value : Any>(
private val pagingSourceFactory: () -> PagingSource<Key, Value>,
private val initialKey: Key?,
private val config: PagingConfig,
@OptIn(ExperimentalPagingApi::class)
private val remoteMediator: RemoteMediator<Key, Value>? = null
) {
// ...
// The object built by paging builder can maintain the scope so that on rotation we don't stop
// the paging.
val flow: Flow<PagingData<Value>> = channelFlow {
val remoteMediatorAccessor = remoteMediator?.let {
RemoteMediatorAccessor(this, it)
}
refreshChannel.asFlow()
.onStart {
@OptIn(ExperimentalPagingApi::class)
emit(remoteMediatorAccessor?.initialize() == LAUNCH_INITIAL_REFRESH)
}
.scan(null) {
previousGeneration: PageFetcherSnapshot<Key, Value>?, triggerRemoteRefresh ->
// 1. 产生新的数据源
var pagingSource = generateNewPagingSource(previousGeneration?.pagingSource)
while (pagingSource.invalid) {
pagingSource = generateNewPagingSource(previousGeneration?.pagingSource)
}
@OptIn(ExperimentalPagingApi::class)
val initialKey: Key? = previousGeneration?.refreshKeyInfo()
?.let { pagingSource.getRefreshKey(it) }
?: initialKey
// 2. 释放旧的数据源
previousGeneration?.close()
// 3. 生成新的 PageFetcherSnapshot
PageFetcherSnapshot<Key, Value>(
initialKey = initialKey,
pagingSource = pagingSource,
config = config,
retryFlow = retryChannel.asFlow(),
// Only trigger remote refresh on refresh signals that do not originate from
// initialization or PagingSource invalidation.
triggerRemoteRefresh = triggerRemoteRefresh,
remoteMediatorConnection = remoteMediatorAccessor,
invalidate = this@PageFetcher::refresh
)
}
.filterNotNull()
.mapLatest { generation ->
// ...
}
.collect { send(it) }
}
//...
}
这个方法干了三件事:
- 生成一个新的数据源
PageSource
:PageFetcher#generateNewPagingSource
这个方法调用了PageFetcher
构造函数中的pagingSourceFactory
创建了一个新的数据源,并做了一些监听处理。 - 释放旧的数据源。
- 返回一个新的
PageFetcherSnapshot
对象,它是数据快照的持有类。
1.6 过滤空值
Flow#filterNotNull
方法过滤发射过来空的值。
1.7 处理最新值
Flow#mapLatest
只处理最新的值,当这个方法正在工作的时候,上游发了一个新的值过来,这时,它会停止手上的工作,处理新的值。
internal class PageFetcher<Key : Any, Value : Any>(
private val pagingSourceFactory: () -> PagingSource<Key, Value>,
private val initialKey: Key?,
private val config: PagingConfig,
@OptIn(ExperimentalPagingApi::class)
private val remoteMediator: RemoteMediator<Key, Value>? = null
) {
// ...
// The object built by paging builder can maintain the scope so that on rotation we don't stop
// the paging.
val flow: Flow<PagingData<Value>> = channelFlow {
val remoteMediatorAccessor = remoteMediator?.let {
RemoteMediatorAccessor(this, it)
}
refreshChannel.asFlow()
.onStart {
@OptIn(ExperimentalPagingApi::class)
emit(remoteMediatorAccessor?.initialize() == LAUNCH_INITIAL_REFRESH)
}
.scan(null) {
// ...
}
.filterNotNull()
.mapLatest { generation ->
val downstreamFlow = if (remoteMediatorAccessor == null) {
generation.pageEventFlow
} else {
generation.injectRemoteEvents(remoteMediatorAccessor)
}
PagingData(
flow = downstreamFlow,
receiver = PagerUiReceiver(generation, retryChannel)
)
}
.collect { send(it) }
}
//...
}
在 Flow#mapLatest
方法中,它做了两件事:
- 得到一个事件流
pageEventFlow
。 - 讲这个事件流封装成
PagingData
。
1.8 发送PagingData
将上面得到的 PagingData
发送出去,最终被 PagingDataAdapter
消费,回到我们一开始写的代码:
// viewModel.shoes 就是 Flow<PagingData<T>>
viewModel.shoes.collectLatest {
adapter.submitData(it)
}
总结一下,虽然上面的过程很多,其实目的就是:
- 得到
PagingData
,而PagingData
中最重要的就是事件流Flow<PageEvent<T>>
,它来自PageFetcherSnapshot
。 - 根据代码是否启用
RemoteMediator
。
2 PagingData
从 1 中,我们了解到事件流 Flow<PageEvent<T>>
,它来自 PageFetcherSnapshot
,这是跟数据相关的核心代码。
好家伙,又是一大段代码,最重要的是 pageEventFlow
:
internal class PageFetcherSnapshot<Key : Any, Value : Any>(
internal val initialKey: Key?,
internal val pagingSource: PagingSource<Key, Value>,
private val config: PagingConfig,
private val retryFlow: Flow<Unit>,
private val triggerRemoteRefresh: Boolean = false,
val remoteMediatorConnection: RemoteMediatorConnection<Key, Value>? = null,
private val invalidate: () -> Unit = {}
) {
// ...
@OptIn(ExperimentalCoroutinesApi::class)
private val pageEventChCollected = AtomicBoolean(false)
private val pageEventCh = Channel<PageEvent<Value>>(Channel.BUFFERED)
private val stateLock = Mutex()
private val state = PageFetcherSnapshotState<Key, Value>(
config = config
)
private val pageEventChannelFlowJob = Job()
@OptIn(ExperimentalCoroutinesApi::class)
val pageEventFlow: Flow<PageEvent<Value>> = cancelableChannelFlow(pageEventChannelFlowJob) {
// 1. 建立一个协程pageEventCh收到的事件发送出去
launch {
pageEventCh.consumeAsFlow().collect {
// Protect against races where a subsequent call to submitData invoked close(),
// but a pageEvent arrives after closing causing ClosedSendChannelException.
try {
send(it)
} catch (e: ClosedSendChannelException) {
// Safe to drop PageEvent here, since collection has been cancelled.
}
}
}
// 2. 接受重试的信息,是不是为了缓存
val retryChannel = Channel<Unit>(Channel.RENDEZVOUS)
launch { retryFlow.collect { retryChannel.offer(it) } }
// 3. 重试的动作
launch {
retryChannel.consumeAsFlow()
.collect {
// 重试后处理对应的状态
// ...
}
}
// 4. 如果刷新的时候需要远程更新,就让remoteMediator加载数据
if (triggerRemoteRefresh) {
remoteMediatorConnection?.let {
val pagingState = stateLock.withLock { state.currentPagingState(null) }
it.requestLoad(LoadType.REFRESH, pagingState)
}
}
// 5. PageSource初始化数据
doInitialLoad(state)
// 6. 消费hint
if (stateLock.withLock { state.sourceLoadStates.get(LoadType.REFRESH) } !is LoadState.Error) {
startConsumingHints()
}
}
// ...
}
pageEventFlow
又被分为了6个部分,我们着重去了解1、4、5和6。
2.1 发射PageEvent<Value>
创建了一个协程,用来转发 pageEventCh
接收到的 PageEvent<Value>
。
2.2 请求远程数据源
如果创建了远程的数据源,并且需要在初始化的时候加载远程的数据,开始请求远程的数据,
2.3 PagingSource初始化
这里面发生了 PagingSource
的第一次数据初始化,来看看发生了什么?
internal class PageFetcherSnapshot<Key : Any, Value : Any>(
// ...
) {
// ...
private suspend fun doInitialLoad(
state: PageFetcherSnapshotState<Key, Value>
) {
// 1. 设置当前加载的状态 - 刷新
stateLock.withLock { state.setLoading(LoadType.REFRESH) }
// 构建参数
val params = loadParams(LoadType.REFRESH, initialKey)
// 2. 数据加载,得到结果 result
when (val result = pagingSource.load(params)) {
is PagingSource.LoadResult.Page<Key, Value> -> {
// 3. 处理一下得到的结果 pages
val insertApplied = stateLock.withLock { state.insert(0, LoadType.REFRESH, result) }
// 4. 处理一下各种状态
stateLock.withLock {
state.setSourceLoadState(LoadType.REFRESH, LoadState.NotLoading.Incomplete)
if (result.prevKey == null) {
state.setSourceLoadState(
type = PREPEND,
newState = when (remoteMediatorConnection) {
null -> LoadState.NotLoading.Complete
else -> LoadState.NotLoading.Incomplete
}
)
}
if (result.nextKey == null) {
state.setSourceLoadState(
type = APPEND,
newState = when (remoteMediatorConnection) {
null -> LoadState.NotLoading.Complete
else -> LoadState.NotLoading.Incomplete
}
)
}
}
// 5. 发送PageEvent
if (insertApplied) {
stateLock.withLock {
with(state) {
pageEventCh.send(result.toPageEvent(LoadType.REFRESH))
}
}
}
// 6. 是否有必要发生远程数据的请求
if (remoteMediatorConnection != null) {
if (result.prevKey == null || result.nextKey == null) {
val pagingState =
stateLock.withLock { state.currentPagingState(lastHint) }
if (result.prevKey == null) {
remoteMediatorConnection.requestLoad(PREPEND, pagingState)
}
if (result.nextKey == null) {
remoteMediatorConnection.requestLoad(APPEND, pagingState)
}
}
}
}
is PagingSource.LoadResult.Error -> stateLock.withLock {
// 错误状态的请求
val loadState = LoadState.Error(result.throwable)
if (state.setSourceLoadState(LoadType.REFRESH, loadState)) {
pageEventCh.send(PageEvent.LoadStateUpdate(LoadType.REFRESH, false, loadState))
}
}
}
}
}
从数据第一次初始化的时候,可以看到很多东西:
- 数据加载状态的变化:Refresh 场景
Incomplete
--Loading
- 根据返回的结果设置Complete
和Incomplete
,并且一些状态都会通过第一部分的pageEventCh
发送状态更新事件。 - 在
Refresh
场景设置Loading
状态以后,会构建加载的参数,放到pageSource
进行数据请求,终于见到pagingSource
了。 - 因为
pagingSource.load(params)
可能得到两种结果,如果是错误就直接处理错误。 - 如果是正常的结果,会先处理一下结果。再变更一下状态,之后统一发射一个
Insert
事件。 - 因为有的时候
pageSource
没有获取到结果 ,又设置了remoteMediator
,这个时候就需要再使用remoteMediator
进行下一步的数据请求
这个时候可以回答一开始第二个问题:
当
pageSource
获取不到结果的时候,如果存在remoteMediator
,会使用remoteMediator
进行数据请求。
2.4 如何加载更多的数据
如果一开始刷新没有出现纰漏即最开始的刷新没有出现错误,这里会调用下一步 startConsumingHints
方法:
internal class PageFetcherSnapshot<Key : Any, Value : Any>(
// ...
) {
// ...
private val state = PageFetcherSnapshotState<Key, Value>(
config = config
)
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
private fun CoroutineScope.startConsumingHints() {
// ...
// 监听Prepend消息
launch {
state.consumePrependGenerationIdAsFlow()
.collectAsGenerationalViewportHints(PREPEND)
}
// 监听Append消息
launch {
state.consumeAppendGenerationIdAsFlow()
.collectAsGenerationalViewportHints(APPEND)
}
}
private suspend fun Flow<Int>.collectAsGenerationalViewportHints(
loadType: LoadType
) = flatMapLatest { generationId ->
// 处理状态
stateLock.withLock {
// ...
}
@OptIn(FlowPreview::class)
hintChannel.asFlow()
.drop(if (generationId == 0) 0 else 1)
.map { hint -> GenerationalViewportHint(generationId, hint) }
}
.runningReduce { previous, next ->
if (next.shouldPrioritizeOver(previous, loadType)) next else previous
}
.conflate()
.collect { generationalHint ->
// doLoad 方法跟 doInitialLoad 类似
// 1. 变更状态
// 2. 使用PageSource进行请求
// 3. 获取结果发送 Insert 事件
// 4. 根据需要是否使用RemoteMediator
doLoad(state, loadType, generationalHint)
}
}
最后一部分的 doLoad
跟第三部分的 doInitialLoad
类似,当然也有一些不一样,比如加载数据的时候,它会判断当前的数据位置距已加载数据的末尾或者头部是否小于一个阈值(创建PagerConfig
时设置),这个条件成立的时候才会加载更多。
看到这里,你可能有点晕,没关系,我们用一张图来总结我们之前讲解的部分:
看了这部分,似乎还剩下一些问题没搞清:
- 除了初始化的那部分数据加载外,Ui是如何驱动加载更多的数据的?
四、 数据的消费
1 具体的消费行为
从上游返回的数据中,我们得到了 PagingData<T>
,来看看适配器 PagingDataAdapter
是如何处理这些数据的:
abstract class PagingDataAdapter<T : Any, VH : RecyclerView.ViewHolder> @JvmOverloads constructor(
diffCallback: DiffUtil.ItemCallback<T>,
mainDispatcher: CoroutineDispatcher = Dispatchers.Main,
workerDispatcher: CoroutineDispatcher = Dispatchers.Default
) : RecyclerView.Adapter<VH>() {
private val differ = AsyncPagingDataDiffer(
diffCallback = diffCallback,
updateCallback = AdapterListUpdateCallback(this),
mainDispatcher = mainDispatcher,
workerDispatcher = workerDispatcher
)
// ...
suspend fun submitData(pagingData: PagingData<T>) {
differ.submitData(pagingData)
}
// ...
}
PagingDataAdapter
也没有亲自处理,而是把它交给了 AsyncPagingDataDiffer
,就像 PagingDataAdapter
注释中所说的,PagingDataAdapter
只是 AsyncPagingDataDiffer
的壳,同样,AsyncPagingDataDiffer
又把这个烫手的山芋交给了 PagingDataDiffer
。
abstract class PagingDataDiffer<T : Any>(
private val differCallback: DifferCallback,
private val mainDispatcher: CoroutineDispatcher = Dispatchers.Main
) {
// ...
suspend fun collectFrom(pagingData: PagingData<T>) = collectFromRunner.runInIsolation {
// 1. 给当前的receiver赋值
receiver = pagingData.receiver
pagingData.flow.collect { event ->
withContext<Unit>(mainDispatcher) {
// 切换到主线程
if (event is PageEvent.Insert && event.loadType == LoadType.REFRESH) {
// 1. 当前是插入事件并且当前是刷新的场景
lastAccessedIndexUnfulfilled = false
// 2. PagePresenter负责管理本地数据,生成一个新的PagePresenter
val newPresenter = PagePresenter(event)
// 3. 重新计算加载数据的位置
val transformedLastAccessedIndex = presentNewList(
previousList = presenter,
newList = newPresenter,
newCombinedLoadStates = event.combinedLoadStates,
lastAccessedIndex = lastAccessedIndex
)
presenter = newPresenter
// Dispatch LoadState + DataRefresh updates as soon as we are done diffing,
// but after setting presenter.
dataRefreshedListeners.forEach { listener ->
listener(event.pages.all { page -> page.data.isEmpty() })
}
// 4. 通知状态的变化
dispatchLoadStates(event.combinedLoadStates)
// 5. 如果数据加载位置发生变化了,则使用receiver发送通知
transformedLastAccessedIndex?.let { newIndex ->
lastAccessedIndex = newIndex
receiver?.accessHint(
newPresenter.viewportHintForPresenterIndex(newIndex)
)
}
} else {
// ...
}
}
}
}
}
我们以 Refresh
为例,简单讲解一下如何消费数据:
1.1 缓存数据,并通知UI更新
在 Refresh
的情况下,会先建一个数据管理器,这里对应的是 PagePresenter
。
接着就是通知数据刷新,这里的 presentNewList
方法是交给子类去实现的:
private val differBase = object : PagingDataDiffer<T>(differCallback, mainDispatcher) {
override suspend fun presentNewList(
previousList: NullPaddedList<T>,
newList: NullPaddedList<T>,
newCombinedLoadStates: CombinedLoadStates,
lastAccessedIndex: Int
) = when {
// fast path for no items -> some items
previousList.size == 0 -> {
// 第一次刷新的通知新的数据插入
differCallback.onInserted(0, newList.size)
null
}
// fast path for some items -> no items
newList.size == 0 -> {
differCallback.onRemoved(0, previousList.size)
null
}
else -> {
val diffResult = withContext(workerDispatcher) {
previousList.computeDiff(newList, diffCallback)
}
previousList.dispatchDiff(updateCallback, newList, diffResult)
previousList.transformAnchorIndex(
diffResult = diffResult,
newList = newList,
oldPosition = lastAccessedIndex
)
}
}
第一次刷新如果有数据回来就是第一种情况,直接使用 differCallback
去通知有数据新增了,当然,这些都会通知到我们的适配器 PagingAdapter
去调用对应的方法。
这里你可能会有一点疑问,适配器 PagingAdapter
并不持有任何数据,那它怎么获取到数据呢?
其实 PagingAdapter
复写了 getItem
方法,去除层层嵌套,最后也使用了 PagingDataDiffer
:
abstract class PagingDataDiffer<T : Any>(
private val differCallback: DifferCallback,
private val mainDispatcher: CoroutineDispatcher = Dispatchers.Main
) {
// ...
operator fun get(@IntRange(from = 0) index: Int): T? {
lastAccessedIndexUnfulfilled = true
lastAccessedIndex = index
receiver?.accessHint(presenter.viewportHintForPresenterIndex(index))
return presenter.get(index)
}
}
所以 getItem
方法还是通过数据管理者 PagePresenter
实现的,除此以外,每次获取数据的时候,都会调用 UiReceiver#accessHint
的方法,当你仍有数据需要加载并且当前展示位置距数据末尾小于一定的阈值的时候,这时会触发 doLoad
方法,这会让 Pager
加载更多的数据。
1.2 通知数据状态更新
回到 PagingDataDiffer#collect
方法中, 处理完上述的事以后,会更新状态:
abstract class PagingDataDiffer<T : Any>(
private val differCallback: DifferCallback,
private val mainDispatcher: CoroutineDispatcher = Dispatchers.Main
) {
// ...
private fun dispatchLoadStates(states: CombinedLoadStates) {
if (combinedLoadStates.snapshot() == states) return
combinedLoadStates.set(states)
loadStateListeners.forEach { it(states) }
}
}
这些状态可以用来干嘛呢?可以处理与用户的状态,比如,刷新错误可以切换错误的界面等等。
1.3 计算数据加载位置是否发生变化
如果不是第一次刷新,并且一些数据源发生变化的时候,比如删除或者新增数据,原来的一些位置信息就不准确了,则需要让调用 receiver.accessHint
方法发送通知。
非 Refresh
的情况最终所做的事情跟 Refresh
类似,就不再赘述了。
总结一下消费过程,用一张图描述:
两张图总结一下,我们开始的第一、三问题就清晰了:
整个
Paging 3
的流程都是围绕这个Flow<PageEvent<T>>
的,状态和数据变化了通过它发送,UI则通过它监听到数据,最后通知到数据和状态的观察者。
总结
看完 Paging 3
的源码,感受有两点:
- 第一点:原来协程可以使用的这么出神入化,第一次感觉到,我使用的协程和大佬使用协程不是一个东西,哭了~
- 第二点:整个协程中的消息传送都是通过
Channel
中实现的,结合Flow
来写,确实也比较简洁流畅。
下一篇文章中,我会和大家讨论我是如何在起点读书的业务中使用 Paging 3
的。
感谢阅读,如果你有其他的看法,欢迎下方留言讨论,如果觉得本文不错,三连是对作者最大的鼓励~