一、执行流程图
在上一节RxJava2线程切换流程分析_subscribeOn的示例代码中,我们是在 ObservableOnSubscribe#subscribe 中去执行 getBitampFormServer 方法去加载一个 Bitmap 对象,并且也分析了发射器在子线程中发射事件的原理。下面分析的是当成功获取到这个 bitmap 之后如何让 observer 在主线程去接收然后设置给 mImageView 对象。
二、observeOn(AndroidScheduler.mainThread())
- mainThread()
根据 mainThread() 源码的调用关系来看,最终返回的是 HandlerScheduler 对象,HandlerScheduler 是一个 Scheduler 的子类,其内部封装了一个可以在主线程发送消息的 handler 对象。看到这里就大概明白了,将 observer 切换到主线程去接收事件,内部就是通过一个可以在主线程发送消息的 Handler 去实现的。
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
private static final class MainHolder {
//HandlerScheduler 内装了一个可以在主线程发送消息的 handler 对象
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
//HandlerScheduler
final class HandlerScheduler extends Scheduler {}
- observerOn()
在 observeOn 内部源码的调用关系可以看到,最终是返回一个 ObservableObserveOn 对象,它是 Observable 的子类对象。从上一节的源码分析中,我们知道每次新创建的 Observable 对象都是需要去订阅对应的 observer 之后才能发送事件的。因此在发生订阅关系时,会回调 subscribeActual(observer) 方法。下面我们分析 ObservableObserveOn#subscribeActual 的内部实现。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//返回一个 ObservableObserveOn 对象
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
- ObservableObserveOn#subscribeActual(observer)
该方法内部通过 HandlerScheduler 创建一个 worker 用于去执行一个任务,因为内部维护了具备 MainLooper 的 Hadnler, 因此它具备在主线程执行任务的功能。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//这里的 scheduler 就是 HandlerScheduler 对象
Scheduler.Worker w = scheduler.createWorker();
//source 就是上一级 subscribeOn 中创建的 ObservableSubscribeOn 对象
//内部创建一个 ObserveOnObserver 包装 传入的 observer 对象。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
ObserveOnObserver 内部将事件切换到主线程运行呢?
- onNext
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//核心代码
schedule();
}
- schedule()
该方法是负责去执行将上一级发送过来的任务交给下一级 observer 去处理。因为 ObserveOnObserver 是实现了 Runnable 接口,因此 this 就是表示 ObserveOnObserver 对象。所以任务被执行的话,那么当前 ObserveOnObserver 的 run 方法就会被执行。
void schedule() {
if (getAndIncrement() == 0) {
//通过 worker 去执行这个任务
worker.schedule(this);
}
}
- worker.schedule
内部通过 handler 发送 Message ,注意该 Message 的 Callback 是被赋值的了,对应的值就是 ScheduledRunnable 对象。
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//这里 scheduled 是做为第二个参数,内部会给 Message 的 callback 赋值,这个会在接受消息那里使用。
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//切换线程核心代码:通过 handler 将其切换到主线程执行
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
- 接受发送的事件
我们知道通过 Handler#send 的方式发送的消息最终都会在回调 Handler 的 dispatchMessage(Message) 方法进行分发操作。在上面 Message.obtain() 方法已经为 msg.callback 赋值了,因此在这里会调用 handleCallback 方法。
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
- handleCallback
这里可以知道,原始消息的 callback 的 run 方法会被执行。该消息是在 HandlerScheduler#HandlerWorker.schedule 中调用。也即是 ScheduledRunnable 会被调用,而 ScheduledRunnable 内部包装了 ObserveOnObserver 这个 Runnble 对象,因此 ObserveOnObserver 内部的 run 方法会被执行。
private static void handleCallback(Message message) {
message.callback.run();
}
- ObserveOnObserver#run()
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
- drainNormal()
在这里 actual.onNext(v) 往下传递事件。至此,事件通过 observeOn 方法就可以让 observer 在主线程中去接收事件。
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
//这个 actual 就是下一级的 Observer 对象。
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//内部就是通过 a 再往传递的。
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}