RxJava线程切换流程分析_observeOn

一、执行流程图

流程图

在上一节RxJava2线程切换流程分析_subscribeOn的示例代码中,我们是在 ObservableOnSubscribe#subscribe 中去执行 getBitampFormServer 方法去加载一个 Bitmap 对象,并且也分析了发射器在子线程中发射事件的原理。下面分析的是当成功获取到这个 bitmap 之后如何让 observer 在主线程去接收然后设置给 mImageView 对象。

二、observeOn(AndroidScheduler.mainThread())

  • mainThread()

根据 mainThread() 源码的调用关系来看,最终返回的是 HandlerScheduler 对象,HandlerScheduler 是一个 Scheduler 的子类,其内部封装了一个可以在主线程发送消息的 handler 对象。看到这里就大概明白了,将 observer 切换到主线程去接收事件,内部就是通过一个可以在主线程发送消息的 Handler 去实现的。

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
        });


private static final class MainHolder {
    //HandlerScheduler 内装了一个可以在主线程发送消息的 handler 对象
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}

//HandlerScheduler 
final class HandlerScheduler extends Scheduler {}
  • observerOn()

在 observeOn 内部源码的调用关系可以看到,最终是返回一个 ObservableObserveOn 对象,它是 Observable 的子类对象。从上一节的源码分析中,我们知道每次新创建的 Observable 对象都是需要去订阅对应的 observer 之后才能发送事件的。因此在发生订阅关系时,会回调 subscribeActual(observer) 方法。下面我们分析 ObservableObserveOn#subscribeActual 的内部实现。

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    //返回一个 ObservableObserveOn 对象
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
  • ObservableObserveOn#subscribeActual(observer)

该方法内部通过 HandlerScheduler 创建一个 worker 用于去执行一个任务,因为内部维护了具备 MainLooper 的 Hadnler, 因此它具备在主线程执行任务的功能。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //这里的 scheduler 就是 HandlerScheduler 对象
        Scheduler.Worker w = scheduler.createWorker();
        //source 就是上一级 subscribeOn 中创建的 ObservableSubscribeOn 对象
        //内部创建一个 ObserveOnObserver 包装 传入的 observer 对象。
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

ObserveOnObserver 内部将事件切换到主线程运行呢?

  • onNext
@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    //核心代码
    schedule();
}
  • schedule()

该方法是负责去执行将上一级发送过来的任务交给下一级 observer 去处理。因为 ObserveOnObserver 是实现了 Runnable 接口,因此 this 就是表示 ObserveOnObserver 对象。所以任务被执行的话,那么当前 ObserveOnObserver 的 run 方法就会被执行。

void schedule() {
    if (getAndIncrement() == 0) {
        //通过 worker 去执行这个任务
        worker.schedule(this);
    }
}
  • worker.schedule

内部通过 handler 发送 Message ,注意该 Message 的 Callback 是被赋值的了,对应的值就是 ScheduledRunnable 对象。

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    ...
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    //这里 scheduled 是做为第二个参数,内部会给 Message 的 callback 赋值,这个会在接受消息那里使用。
    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.
    //切换线程核心代码:通过 handler 将其切换到主线程执行
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }
    return scheduled;
}
  • 接受发送的事件

我们知道通过 Handler#send 的方式发送的消息最终都会在回调 Handler 的 dispatchMessage(Message) 方法进行分发操作。在上面 Message.obtain() 方法已经为 msg.callback 赋值了,因此在这里会调用 handleCallback 方法。

public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}
  • handleCallback

这里可以知道,原始消息的 callback 的 run 方法会被执行。该消息是在 HandlerScheduler#HandlerWorker.schedule 中调用。也即是 ScheduledRunnable 会被调用,而 ScheduledRunnable 内部包装了 ObserveOnObserver 这个 Runnble 对象,因此 ObserveOnObserver 内部的 run 方法会被执行。

private static void handleCallback(Message message) {
    message.callback.run();
}
  • ObserveOnObserver#run()
@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
  • drainNormal()

在这里 actual.onNext(v) 往下传递事件。至此,事件通过 observeOn 方法就可以让 observer 在主线程中去接收事件。

void drainNormal() {
    int missed = 1;
    final SimpleQueue<T> q = queue;
    //这个 actual 就是下一级的 Observer 对象。
    final Observer<? super T> a = actual;
    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        for (;;) {
            boolean d = done;
            T v;
            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                q.clear();
                a.onError(ex);
                return;
            }
            boolean empty = v == null;
            if (checkTerminated(d, empty, a)) {
                return;
            }
            if (empty) {
                break;
            }
            //内部就是通过 a 再往传递的。
            a.onNext(v);
        }
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容