喜欢的话,评论区留言点赞哦。
Wechat: CoolOriLans 酷奇源语
本篇文章简单分析下 Rxjava 订阅和线程切换简单:
我们先来看一个订阅的流程,这个流程是在之前的基础上,加了些调味剂:
val observable = Observable.create<Int>(object : ObservableOnSubscribe<Int> { // 返回的是 ObservableCreate 实例;
override fun subscribe(emitter: ObservableEmitter<Int>) {
emitter.onNext(1)
emitter.onComplete()
}
})
val observer = object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
val d1 = d // 关键点 4: 这个 d 是 ObservableSubscribeOn.SubscribeOnObserver 对象。
Log.d("rxjava", "Disposable: " + d1.javaClass.canonicalName);
// rxjava : Disposable: io.reactivex.internal.operators.observable.ObservableSubscribeOn.SubscribeOnObserver
}
override fun onNext(t: Int) {}
override fun onError(e: Throwable) {}
override fun onComplete() {}
}
observable // 关键点 1: 是 ObservableCreate 实例;
.observeOn(Schedulers.io()) // 关键点 2: 切换线程1,返回的是 ObservableObserveOn 实例;
.subscribeOn(Schedulers.single()) // 关键点 3: 切换线程2,返回的是 ObservableSubscribeOn 实例;
.subscribe(observer)
本片文章也就是要弄懂上面的这个 4 个关键点:
注意:
1、我们是先调用 observeOn 再调用 subscribeOn,下面的分析也是按照这个顺序,实际上二者的顺序可以不一样。
这里先简单的总结下(其实网上的很多博客都有讲到,在这里我也总结了下)
subscribeOn 和observeOn 都是用来切换线程用的,但是区别如下:
- subscribeOn:决定 Observable 对象在哪个线程上执行,其能改变调用它之前代码的线程;
- observeOn:决定 Subscriber(Observer) 对象在哪个线程上执行,其能改变调用它之后代码的线程;
那么具体是怎么实现的,我们往下慢慢的跟踪代码了;
1 Schedulers - 简单分析
Schedulers 是 Rxjava 的调度器,用于为流提供线程环境,这里先简单的看下,后面会分析:
public final class Schedulers {
@NonNull
static final Scheduler SINGLE; // 单线程模式
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
... ... ... ...
}
Schedulers 默认提供了如上的线程实现。
1.1 Schedulers.io
这里先简单的看下 IO 的实现吧,这样方便我们对线程的切换进行分析:
public static Scheduler io() {
// RxJavaPlugins.onIoScheduler 是 hook 用的,这里不关注
return RxJavaPlugins.onIoScheduler(IO);
}
实际上默认返回的就是上面的 Schedulers.IO:
public final class Schedulers {
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
// 这里会创建 IO,RxJavaPlugins.initIoScheduler 依然是 Hook 专用的
// 默认返回 IOTask
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
}
我们去看看 IOTask:
1.2 IOTask
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
1.3 IoHolder
static final class IoHolder {
// 最终实现为 IoScheduler;;
static final Scheduler DEFAULT = new IoScheduler();
}
实际上:Schedulers 只是一个包装而已,真正的线程切换是基于平台提供的线程工具的:线程池;
这个我们后面再分析哦。
2 Observable
2.1 observeOn
observeOn 用于指定被观察者逻辑所在的 Scheduler:
Observable.create 返回的是一个 ObservableCreate 实例:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
return observeOn(scheduler, delayError, bufferSize());
}
// 核心方法 3
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
// RxJavaPlugins.onAssembly 用于 Hook 操作,这里不关注
//【-->3】默认返回 ObservableObserveOn 实例:
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
observeOn 有三个重载函数。
最终调用的都是核心方法 3
可以看到,默认其返回了一个 ObservableObserveOn 对象。将 ObservableCreate 包装到了内部,如下;
ObservableObserveOn [ // --> 3
ObservableCreate [
ObservableOnSubscribe [
emitter: ObservableEmitter<Int>
]
]
]
其实脉络已经很清楚了;
2.2 subscribeOn
observeOn 用于指定观察者逻辑所在的 Scheduler:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//【-->4.x】进入;
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
可以看到,默认其返回了一个 ObservableSubscribeOn 对象。将 ObservableCreate 包装到了内部!
如下:
ObservableSubscribeOn [ // --> 4
ObservableObserveOn [ // --> 3
ObservableCreate [
ObservableOnSubscribe [
emitter: ObservableEmitter<Int>
]
]
]
]
其实脉络已经很清楚了;
3 ObservableObserveOn
3.1 constructor
ObservableObserveOn 的父类 AbstractObservableWithUpstream 继承了 Observable,所以 ObservableObserveOn 具有 Observable 的能力:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { // 如下:
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
// 参数 Scheduler 是调度器;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source); // 是 ObservableCreate 实例;
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
按照例子的顺序,这里的 ObservableSource<T> source
是 ObservableCreate 实例!
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
... ... ...
}
public interface HasUpstreamObservableSource<T> {
ObservableSource<T> source();
}
3.2 subscribeActual
我们回忆下 subscribe 的流程,subscribe 实际上最后会调用 subscribeActual
但是此时执行完 observeOn 后,返回的是 ObservableObserveOn,其将 ObservableCreate 封装在了内部,所以会先执行 ObservableObserveOn.subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) { //【-->4.3】是 SubscribeOnObserver 实例
// 核心 1: 如果 scheduler 是 TrampolineScheduler 类型的,那么会直接执行订阅
// 对观察者 observer 不做任何的包装;
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 核心 2: 如果是其他类型的,那么这里会通过 scheduler 创建对应的 Worker
Scheduler.Worker w = scheduler.createWorker();
// 核心 3: 然后将观察者 observer 封装成一个 【-->3.3】ObserveOnObserver,然后在执行订阅;
// source 是【-->6.1】 ObservableCreate 实例;
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这样,相当于发射器通过 ObserveOnObserver 委托代理执行 Observer 的方法了;
source 是 ObservableCreate 实例,这里最终会调用 ObservableCreate.subcribeActual。
只不过参数由 Observer 变成了 ObserveOnObserver(Observer)。
ObservableObserveOn [ // --> 3
ObservableCreate [
ObservableOnSubscribe [
emitter: ObservableEmitter<Int>
]
]
].subscribe -> subscribeActual {
ObserveOnObserver [
SubscribeOnObserver [ // 下面返回的;
]
]
}
3.3 ObserveOnObserver - 切换核心
ObserveOnObserver 是 ObservableObserveOn 的内部类:
3.3.1 Constructor
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream; //【1】下游的观察者;
final Scheduler.Worker worker; //【2】线程调度;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream; // 表示上游的数据源,
Throwable error;
volatile boolean done;
volatile boolean disposed;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual; //【3】表示下游的数据接收方 Observer 实例,本例中是 SubscribeOnObserver 实例;
this.worker = worker; //【4】线程调度;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
... ... ...
}
由于 ObserveOnObserver 本质也是一个 Observer,他代理了 Observer 一系列方法,我们去看看
3.3.2 onSubscribe
在订阅发生之前会先执行 onSubscribe 方法:
参数 Disposable d 就是 CreateEmitter 发射器!
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
// 核心 1: CreateEmitter 作为发射器,用于发射数据;
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
// 核心 2: 调用下游的 Observer 的 onSubscribe 方法,将自身传递下去!
// downstream 是 SubscribeOnObserver
downstream.onSubscribe(this);
}
}
3.3.3 onNext
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
// 把执行的结果保存到内部队列里;
queue.offer(t);
}
schedule();
}
3.3.4 onError
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
// 保存错误到 error.
error = t;
done = true;
schedule();
}
3.3.5 onComplete
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
4 ObservableSubscribeOn
4.1 constructor
同样的,ObservableSubscribeOn 也继承了 AbstractObservableWithUpstream 类,也具有 Observable 的能力:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { // 如下:
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source); // 父类的变量;
this.scheduler = scheduler;
}
注意:按照例子的顺序,这里的 ObservableSource<T> source
是 ObservableObserveOn 实例!(这里不一定是这样,因为调用顺序可以切换)
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
... ... ...
}
4.2 subscribeActual - 切换核心
@Override
public void subscribeActual(final Observer<? super T> observer) {
//【-->4.3】将 observer 包装成 SubscribeOnObserver 实例;;
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
// 核心:这里立刻回调,这个回调会触发器上面的关键点 4:
observer.onSubscribe(parent);
//【-->】创建了 SubscribeTask 实例,将 SubscribeOnObserver 保存到内部;;
// SubscribeTask 是一个 Runnable 实例;;
//【-->4.3.5】scheduler.scheduleDirect 会返回一个 Disposable 对象。
// SubscribeOnObserver 会持有该对象的原子引用;;
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
4.2.1 SubscribeTask - 切换核心
可以看到 SubscribeTask 实际上就是一个 Runnable 实例;
这里其实已经能够看到线程的切换的实现:将 Observer 封装到 SubscribeTask ( Runnable ) 中,通过 handler / ThreadPool 执行:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent; //【-->4.3】SubscribeOnObserver 实例
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 这里的 source 来自 ObservableSubscribeOn
// 就是 ObservableObserveOn 实例了;
//【-->3.2】这样就进入了 ObservableObserveOn 的 subscribeActual 方法;
source.subscribe(parent);
}
}
4.3 SubscribeOnObserver
是对 Observer 的封装,继承了 AtomicReference<Disposable>
类,内部持有 Disposable 对象的原子引用:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream; // 下游的观察者;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream; // 这个就是我们的 obsever
this.upstream = new AtomicReference<Disposable>();
}
}
可以的得到如下的关系图了:
ObservableSubscribeOn [ // --> 4
ObservableObserveOn [ // --> 3
ObservableCreate [
ObservableOnSubscribe [
emitter: ObservableEmitter<Int>
]
]
]
].subscribe -> subscribeActual {
SubscribeOnObserver [
Observer [
...
]
]
}
4.3.1 onSubscribe
保存上游的 Disposable 实例:
@Override
public void onSubscribe(Disposable d) {
// 核心 1,这里的 d 是 ObserveOnObserver 实例【-->4.3.1】;
DisposableHelper.setOnce(this.upstream, d);
}
4.3.2 onNext
发送数据:
@Override
public void onNext(T t) {
downstream.onNext(t);
}
4.3.3 onError
发送异常:
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
4.3.4 onComplete
完成回调:
@Override
public void onComplete() {
downstream.onComplete();
}
4.3.5 setDisposable - 核心
设置 Disposable 对象的弱引用,这个 Disposable 对象是由 Scheduler 对 new SubscribeTask(SubscribeOnObserver)) 封装后返回的:
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
4.3.5.1 DisposableHelper.setOnce
DisposableHelper 枚举类封装了对 Disposable 的操作:
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
// 通过 CAS 设置内部的 field 变量为 d;
if (!field.compareAndSet(null, d)) {
// 如果失败的话,那么说明内部已经有 Disposable 对象了
// 那这里会执行 d 的取下操作;
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
不多说了。
4.3.6 dispose
取消订阅:
@Override
public void dispose() {
// 清空对上游的弱引用;
DisposableHelper.dispose(upstream);
// 清空自身对下游的弱引用:SubscribeTask【-->4.2.1】
DisposableHelper.dispose(this);
}
4.3.6 isDisposed
是否取消订阅:
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
5 subscribe 再分析
接下来,我们来看看,在执行了 observeOn 和 subscribeOn 后的执行逻辑:
回顾一下:
observable // 关键点 1: 是 ObservableCreate 实例
.observeOn(Schedulers.io()) // 关键点 2: 被观察者所在的线程,返回的是 ObservableObserveOn 实例;
.subscribeOn(Schedulers.single()) // 关键点 3: 观察者所在的线程,返回的是 ObservableSubscribeOn 实例;
.subscribe(observer)
实际上经过了 observeOn/subscribeOn 方法的执行,整个逻辑变的很复杂了;
5.1 subscribe 回顾
subscribe 方法会调用 subscribeActual 方法:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
try {
// --> 按照现在的逻辑,进入 4.2 了;
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
... ... ...
throw npe;
}
}
按照上面的包装顺序,会先调用 ObservableSubscribeOn 的 subscribeActual 方法了;
6 ObservableCreate - 回顾
6.1 subscribeActual - 回顾
回顾下 subscribeActual 方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 核心 1: 创建的 CreateEmitter 实例,包装下 Observer;
// 此时的 observer
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 核心 2: 这里的 observer 就是【-->3.3】ObserveOnObserver 实例;
observer.onSubscribe(parent);
try {
// 核心 1: 这里的 source 是 ObservableOnSubscribe 实例;
// 见【-->6.2】ObservableOnSubscribe
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
6.2 ObservableOnSubscribe
回顾下 ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
在这个方法中,我们使用:
emitter.onNext(1)
emitter.onComplete()
6.2.1 onNext
发射数据:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// 这里的 observer 实际上是【-->3.3】ObserveOnObserver 实例;
observer.onNext(t);
}
}
7 总结
总结一下哈
7.1 完整订阅的流程
我们回顾下整个流程,下面的这个流程包含了线程切换的流程,以及调度的走势:
(好累,后序补上哦)
7.2 线程切换的关键
被观察者的线程切换的 Mr.Key : ObservableObserveOn 和 ObserveOnObserver
观察者的线程切换的 Mr.Key : ObservableSubscribeOn 和 SubscribeOnObserver
7.2.1 ObservableObserveOn 的关键点
- ObservableObserveOn 内部会讲 Scheduler 保存下来
// 参数 Scheduler 是调度器;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source); // 是 ObservableCreate 实例;
this.scheduler = scheduler;
- subscribeActual 方法,这里会实现订阅,用 ObserveOnObserver 封装 observer,同时传入 Scheduler.Worker 作为调度器:
@Override
protected void subscribeActual(Observer<? super T> observer) { //【-->4.3】是 SubscribeOnObserver 实例
// 核心 1: 如果 scheduler 是 TrampolineScheduler 类型的,那么会直接执行订阅
// 对观察者 observer 不做任何的包装;
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 核心 2: 如果是其他类型的,那么这里会通过 scheduler 创建对应的 Worker
Scheduler.Worker w = scheduler.createWorker();
// 核心 3: 然后将观察者 observer 封装成一个 【-->3.3】ObserveOnObserver,然后在执行订阅;
// source 是【-->6.1】 ObservableCreate 实例;
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
- ObserveOnObserver 中的 onNext 通过 schedule() 通过线程池实现在指定线程中执行数据分发:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// 执行线程池调度;
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this); // 将自身加入到 worker 中;
}
}
刚刚我们说了 ObserveOn 能改变调用后的逻辑所在的线程,这里其实已经能看到原因了。
按照逻辑,发射器最终会调用 ObserveOnObserver 的 onNext/onComplete 方法,而这个方法会讲 ObserveOnObserver 自身(就是一个 Runnable)放入 worker 中执行调度,后续的操作就会在 worker 所在的线程了(这是一个上游 -> 下游的过程);
到这里,就能够推断,ObserveOn 是可以调多次的,每次都会将下游的回调逻辑通过装饰器的方式切换到对应的线程;
7.2.2 ObservableSubscribeOn 的关键点
- ObservableSubscribeOn 内部会讲 Scheduler 保存下来
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { // 如下:
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source); // 父类的变量;
this.scheduler = scheduler;
}
- subscribeActual 方法,这里会实现订阅,用 ObserveOnObserver 封装 observer,同时传入 Scheduler.Worker 作为调度器:
@Override
public void subscribeActual(final Observer<? super T> observer) {
//【-->4.3】将 observer 包装成 SubscribeOnObserver 实例;;
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
// 这里立刻回调;
observer.onSubscribe(parent);
//【-->】创建了 SubscribeTask 实例,将 SubscribeOnObserver 保存到内部;;
// SubscribeTask 是一个 Runnable 实例;;
//【-->4.3.5】scheduler.scheduleDirect 会返回一个 Disposable 对象。
// SubscribeOnObserver 会持有该对象的原子引用;;
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
- SubscribeTask 本质上就是一个 runnable:
final class SubscribeTask implements Runnable {
//【-->4.3】SubscribeOnObserver 实例
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 这里的 source 来自 ObservableSubscribeOn
// 就是 ObservableObserveOn 实例了;
//【-->3.2】这样就进入了 ObservableObserveOn 的 subscribeActual 方法;
source.subscribe(parent);
}
}
刚刚我们说了 subscribeOn 能改变调用前的逻辑所在的线程,这里其实已经能看到原因了。
按照逻辑,这里会将 source.subscribe(parent) 放到 scheduler 指定的线程中执行,而 subscribe 是一个从下游->上游的过程。
source.subscribe(parent) 会按照订阅流程,向上不断的执行 source[x].subscribe(parent) -> source[x].subscribeActual(parent)
一直到最终的 ObservableOnSubscribe
,而这个过程是在 scheduler 指定的线程中执行。
同时,当到 ObservableOnSubscribe
的时候,会通过发射器发送数据,这样又会变成上游 -> 下游,而发生器也是在 ObservableOnSubscribe.subscribe
方法中触发,所以此时依然是在 scheduler 指定的线程中执行。
最终,执行到最近的 ObserveOn 处,这里才会切换线程,这样下游的调度线程都会在 ObserveOn 方法进行切换了;
疑问
上面是先调用 observeOn 再调用 subscribeOn,实际上二者的顺序可以不一样。如果我们反过来呢?
如果反过来的话只是装饰的对象顺序发生了变化,对于 subscribeOn 和 observeOn 的效果依然是没有变化的。
这里不多说了,溜了溜了。