作者: @怪盗kidou
如需转载需在明显位置保留作者信息及原文链接
如果博客中有不恰当之处欢迎留言交流
http://www.jianshu.com/p/59c3d6bb6a6b
前言
距离上次写博客又是一个多月了,这次来聊一聊 RxJava 的线程变换,即 observeOn(Scheduler) 和 subscribeOn(Scheduler) 的作用范围。
为什么要做线程变换
最最简单的例子就是 Anroid 中数据获取、处理需要在后台进行,而展示的过程则需要在UI线程中执行,如果你没有搞懂 observeOn 和 subscribeOn 的作用范围,那你以后写程序的时候 Crash 就会对你不弃不离了,当然了你读了这篇博客就可以很轻松搞定了!
observeOn
先说结论:observeOn作用于该操作符之后操作符直到出现新的observeOn操作符
举个例子:
Observable.just("RxJava")
.observeOn(getNamedScheduler("map之前的observeOn"))
.map(s -> {
threadInfo(".map()-1");
return s + "-map1";
})
.map( s -> {
threadInfo(".map()-2");
return s + "-map2";
})
.observeOn(getNamedScheduler("subscribe之前的observeOn"))
.subscribe(s -> {
threadInfo(".onNext()");
System.out.println(s + "-onNext");
});
结果如下:
.map()-1 => map之前的observeOn
.map()-2 => map之前的observeOn
.onNext() => subscribe之前的observeOn
RxJava-map1-map2-onNext
observeOn作用域图示:
subscribeOn
同样先说结论:subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符 ,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后最近的 subscribeOn 控制 。没看懂不要紧,看下面代码和图你就懂了。
举个例子:
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
threadInfo("OnSubscribe.call()");
subscriber.onNext("RxJava");
}
})
.subscribeOn(getNamedScheduler("create之后的subscribeOn"))
.doOnSubscribe(() -> threadInfo(".doOnSubscribe()-1"))
.subscribeOn(getNamedScheduler("doOnSubscribe1之后的subscribeOn"))
.doOnSubscribe(() -> threadInfo(".doOnSubscribe()-2"))
.subscribe(s -> {
threadInfo(".onNext()");
System.out.println(s + "-onNext");
});
结果如下:
.doOnSubscribe()-2 => main
.doOnSubscribe()-1 => doOnSubscribe1之后的subscribeOn
OnSubscribe.call() => create之后的subscribeOn
.onNext() => create之后的subscribeOn
RxJava-onNext
subscribeOn作用域图示:
3号框中的.doOnSubscribe(() -> threadInfo(".doOnSubscribe()-2"))
的之后由于没有subscribeOn
操作符所以回调在该段代码被调用的线程(即主线程)
由于 subscribe 之前 没有 使用observeOn 指定Scheduler,所以.onNext()
的线程是和OnSubscribe.call()
使用相同的Scheduler 。
其它说明
博文中所谓的 之后、之前 不代表中间没有其它操作符。
doOnSubscribe
与onStart
类似,均在代码调用时就会回调,但doOnSubscribe
可以通过subscribeOn
操作符改变运行的线程且越在后面运行越早。
本文中用到的其它方法
public static Scheduler getNamedScheduler(String name) {
return Schedulers.from(Executors.newCachedThreadPool(r -> new Thread(r, name)));
}
public static void threadInfo(String caller) {
System.out.println(caller + " => " + Thread.currentThread().getName());
}
我最近刚刚开通了微信公众号(怪盗kidou),欢迎关注