rxjava 的东西是很多的,难免有理解错误的地方,这两天面试碰到有人问 subscribeOn/observeOn 线程切换的问题,我回答完,面试官明显不满意,回来找了找资料,还真是自己理解错了,有必要专门写一篇文章出来。
例子1 : subscribeOn/observeOn 最简单使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("AA", "数据源" + Thread.currentThread().getName());
emitter.onNext("");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("AA", "监听者" + Thread.currentThread().getName());
}
});
按照之前的理解:
- subscribeOn 是决定 observable 中产生数据的方法执行在哪个线程
- observeOn 是决定 observer 消费数据的方法执行在哪个线程
我们看这个最简单的例子,的确是这样,那么更复杂的情况呢。
例子2:subscribeOn/observeOn 连着重复写,哪个为准
还是以上面那个最简单的例子来
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("AA", "数据源" + Thread.currentThread().getName());
emitter.onNext("");
}
})
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("AA", "监听者" + Thread.currentThread().getName());
}
});
从结果来看:
- 多个 subscribeOn 连着写,以第一个为准
- 多个 observeOn 连着写,以最后一个为准
例子3 :添加多个操作符呢
rxjava 中的操作符基本都会生成一个新的 observable 出来,上下游的关系就复杂了,情况会不会有变化呢,这个例子就复杂了
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("AA", "数据源" + Thread.currentThread().getName());
emitter.onNext("");
}
})
.subscribeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第1次变化" + Thread.currentThread().getName());
return "";
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第2次变化" + Thread.currentThread().getName());
return "";
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第3次变化" + Thread.currentThread().getName());
return "";
}
})
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第4次变化" + Thread.currentThread().getName());
return "";
}
})
.observeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("AA", "监听者" + Thread.currentThread().getName());
}
});
从结果看:
- subscribeOn 决定 Observable.create 的执行线程,之后再写 subscribeOn ,无论是挨着写,还是隔着操作符写都没有意思
- subscribeOn 决定数据源的执行线程后,也会当前线程置为这个线程,若无其他设置,之后操作符的操作也是在当前线程执行,也就是 subscribeOn 指定的线程
- observeOn 不仅仅可以决定 .subscribe 执行的线程,更能够更改 observeOn 之后书写的操作符的执行线程,也就是可以切换当前线程。
例子4:用 observeOn 给多个操作符切换线程
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("AA", "数据源" + Thread.currentThread().getName());
emitter.onNext("");
}
})
.subscribeOn(Schedulers.computation())
.observeOn( Schedulers.io() )
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第1次变化" + Thread.currentThread().getName());
return "";
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第2次变化" + Thread.currentThread().getName());
return "";
}
})
.observeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第3次变化" + Thread.currentThread().getName());
return "";
}
})
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第4次变化" + Thread.currentThread().getName());
return "";
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("AA", "监听者" + Thread.currentThread().getName());
}
});
从结果看:
- observeOn 的的确确是可以 rxjava 所在线程
好了来说说原理
因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。 ——扔物线
摘自:拥抱RxJava(番外篇):关于RxJava的Tips & Tricks ,推荐大家去看看原文
我们翻翻源码呢,看看能不能简单的走一下逻辑
- 我们可以看到 map 是生成了一个新的 observable 出来,这个 observable 还有我们的变化数据的接口类
- 在这个 新的 observable 里面,有上面的 observable 对象引用,然后给这个上面的 observable 对象注册了一个新的观察者进来
- 这个新的观察者即是一个 observer,但同时还是一个 observable ,这个新的观察者在数据生成方法中接受上一级 observable 发送过来的数据,然后根据我们传入的数据变换接口对象计算出新的数据,最后发送给消费者或是下一级
不是很好理解,但是大概应该是这个意思
换个更容易理解的描述:
- subscribeOn 决定的是上游线程,上游切换到哪个线程,下游要是不改的话,rxjava 就在这个线程一直跑
- observeOn 决定的是下游线程
- 整个 rxjava 中严格说来真正的上游只有一个,那就是产生数据的位置,比如 .just / ,create,其他任何变换和操作符,注册都是下游。
- 所以 subscribeOn 只有第一次切换有效,作用范围也是最小的,就是 .just / ,create
- 基本上操作符都会生成一个新的 observable 出来,和之前的 observable 关联(其实也是注册到之前的 observable)。所以在一个操作的范围来看,前一个 observable 发送数据给我,算是上游,我这个操作符消费数据,产生新的 observable ,算是下游
- 所以 observeOn 可以多次切换他之后的操作符的线程