Rxjava2.2.1(4) observeOn 线程切换-源码分析

rxjava代码

Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("有情况");
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(String s) {
            Log.e("qwer", s);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

然后create和subscribe也不讲了(可以看前面文章)
1、直接看observeOn

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

再进入重载的observeOn方法

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

再进入ObservableObserveOn

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

同样的套路,完成赋值
通过前面的三篇文章,我们已经知道接下来会进入ObservableObserveOn的subscribeActual方法

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

很显然,scheduler 不是 TrampolineScheduler类型,也就是进入else代码中,调用了scheduler.createWorker();
2、这个时候必须要先具体了解一下scheduler了
这个scheduler就是我们传的AndroidSchedulers.mainThread()
进入该方法

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

其实这里onMainThreadScheduler返回的就是本身
3、所以继续看MAIN_THREAD

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
    new Callable<Scheduler>() {
        @Override public Scheduler call() throws Exception {
            return MainHolder.DEFAULT;
        }
    });

进入RxAndroidPlugins.initMainThreadScheduler

public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
    if (scheduler == null) {
        throw new NullPointerException("scheduler == null");
    }
    Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
    if (f == null) {
        return callRequireNonNull(scheduler);
    }
    return applyRequireNonNull(f, scheduler);
}

因为f为空(为什么为空,就是我没有对onInitMainThreadHandler进行赋值),所以返回了callRequireNonNull(scheduler),进入该方法

static Scheduler callRequireNonNull(Callable<Scheduler> s) {
    try {
        Scheduler scheduler = s.call();
        if (scheduler == null) {
            throw new NullPointerException("Scheduler Callable returned null");
        }
        return scheduler;
    } catch (Throwable ex) {
        throw Exceptions.propagate(ex);
    }
}

其实就是返回了s.call(),而s.call()是什么,不错,就是步骤3最开始的
return MainHolder.DEFAULT;
4、继续查看 MainHolder.DEFAULT

private static final class MainHolder {
    static final Scheduler DEFAULT
        = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

!!!发现重点
这里返回了一个HandlerScheduler,构造函数参数还放了一个
new Handler(Looper.getMainLooper()),是不是感觉有点感觉
进入HandlerScheduler

HandlerScheduler(Handler handler, boolean async) {
    this.handler = handler;
    this.async = async;
}

只是赋值,一个主线程handler,然后async是false
而这个时候我们联系步骤1最后是不是应该看它的createWorker方法

public Worker createWorker() {
    return new HandlerWorker(handler, async);
}

好,所以此时回到步骤1的最后,Scheduler.Worker就是HandlerWorker
5、顺着步骤1最后的else代码继续看
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
source.subscribe就不用讲了,直接看ObserveOnObserver

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
    this.downstream = actual;
    this.worker = worker;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

一系列的赋值操作
嗯哼?是不是好像结束了??
不存在的!!!
注意它可是Observer,注意它的方法
6、首先看onSubscribe方法,因为它会被第一个调用(看过第一篇文章的都知道),它也是我们自己new的观察者的第一个回调方法

public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        if (d instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<T> qd = (QueueDisposable<T>) d;

            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                downstream.onSubscribe(this);
                schedule();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                downstream.onSubscribe(this);
                return;
            }
        }

        queue = new SpscLinkedArrayQueue<T>(bufferSize);

        downstream.onSubscribe(this);
    }
}

根据之前的代码我们知道这个d不是QueueDisposable
所以直接跳过if
对queue 进行了初始化
这里queue 说一下,虽然不知道具体细节,单可以肯定的是,他是一个队列
然后继续往下看downstream.onSubscribe(this)完成Disposable的继续传递
7、接下来再看什么?是不是就是我们的onNext()方法了

public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

因为我们知道这个t肯定要往下传,所以done为false,sourceMode 也不等于QueueDisposable.ASYNC
然后我们被观察者发送的数据t就被压入了队列queue里去了
8、然后执行schedule()

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

这里getAndIncrement()应该是线程任务数(我猜的,返回这里肯定要为0)
然后进入schedule方法

public Disposable schedule(@NonNull Runnable run) {
    return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
走了重载的方法,而且schedule是个抽象方法

9、那此时你还记得这个worker是谁了么?
不错,看步骤4的最后,就是HandlerWorker,我们看它的schedule方法

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    if (async) {
        message.setAsynchronous(true);
    }

    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

在代码里只需要注意重点
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run)
这个handler是主线程handler(步骤4)
这个run就是步骤5里new的那个ObserveOnObserver,它实现了Runnable接口
下面执行代码
handler.sendMessageDelayed(message, unit.toMillis(delay))
好,此时run线程被执行(如果不知道为什么可以好好看看handler基础),而且是在主线程执行!!!说明线程已经切换到主线程了
那么接下来呢?
不错!回到步骤5里new的那个ObserveOnObserver的run方法

public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

这里outputFused为false(赋值true的地方我没有执行,其实具体看drainFused和drainNormal两个方法的代码,也可以辨别出其为false)
10、然后进入drainNormal方法

void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

直接说重点
首先开启for循环,当队列不为空的时候
v = q.poll();
拿到队列里的值(就是步骤7里offer的那个)
然后a.onNext(v)完成事件传递(往上面看,a就是downstream)

自此就算结束了
总结---
关键步骤在9,通过主线程的hanlder完成线程切换

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容