目录
一、Is what 是什么
二、Concept 概念
三、Basic realization 基本实现
四、Scheduler 线程控制(上)
五、Scheduler 线程控制(下)
六、变换
因个人学习需要,故文章内容均为网上摘抄整理,感谢创作者的辛勤,源文章地址请看文末。
API(二)
通过subscribeOn()
和observeOn()
控制线程,让事件的产生和消费发生在不同的线程。同样,在使用变换的方法(map()
、flatMap()
)时,可以多次切换线程。
线程多次切换
observeOn()
指定的是Subscriber
的线程,而这个Subscriber
并不是(严格说应为『不一定是』)subscriber()
参数中的Subscriber
,而是observeOn()
执行时的当前Observable
所对应的Subscriber
,即它的直接下级Subscriber
。换句话说,observeOn()
指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个切换线程的位置调用一次observeOn()
即可,示例如下:
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
不同于
observeOn()
,subscribeOn()
的位置放在哪里都可以,只被调用一次。
含多个subscribeOn()
时,只有第一个会起作用。
Scheduler 的原理(二)
subscribeOn()
和observeOn()
的内部实现,是用lift()
。
subscribeOn()
示意图:
observeOn()
示意图:
相同点:subscribeOn()
和observeOn()
都做了线程切换的操作(图中"schedule..."部分)。
不同点:
-
subscribeOn()
的线程切换发生在OnSubscribe
中,即在它通知上一级OnSubscribe
时,此时事件还没有开始发送,因此subscribeOn()
的线程控制可以从事件发出的开端就造成影响; -
observaOn()
的线程切换发生在它内建的Subscriber
中,即发生在它即将给下级Subscriber
发送事件时,因此observeOn()
控制的是它后面的线程。
下面用一张图来解释当多个 subscribeOn()
和 observeOn()
混合使用时,线程调度是怎么发生的:
图中含 5 处对事件的操作:
①和②两处受第一个 subscribeOn()
影响,运行在红色线程;
③和④处受第一个 observeOn()
影响,运行在绿色线程;
⑤处受第二个 observeOn()
影响,运行在紫色线程;
第二个 subscribeOn()
,由于在通知过程中线程就被第一个 subscribeOn()
截断,因此对整个流程并没有任何影响。
doOnSubscribe()
虽然超过一个的subscribeOn()
对事件处理的流程没有影响,但在流程之前却可以利用。
因为Subscriber
的onStart()
可以用作流程开始前的初始化,而onStart()
在subscribe()
发生时就被调用了,因此不能指定线程,只能执行在subscribe()
被调用时的线程。试想,如果onStart()
中含有对线程有要求的代码(例:在界面显示ProgressBar,必须在主线程执行),将会有线程非法的风险,因为有时无法预测subscribe()
将会在什么线程执行。
与Subscriber.onStart()
相对应的方法Observable.doOnSubscribe()
。和Subscriber.onStart()
同样是在sbscribe()
调用后且事件发送前执行,区别在于可以指定线程。
- 默认情况:
doOnSubscribe()
执行在subscribe()
发生的线程; -
doOnSubscribe()
后有subscribeOn()
时,将执行在离他最近的subscribeOn()
所指定的线程。
示例:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);