RxJava源码分析之线程调度(二)

在上一篇文章当中我们把RxJava的上游线程切换的源码都大致梳理了一遍,如果还没有看的请猛戳这里,但是光有上游的线程切换是不足以让我们完成在实际项目中的应用的,绝大多数时候我们都需要在下游进行线程的切换来处理上游在其他线程中得到的结果。所以现在我们就来分析一下RxJava源码中是如何实现对下游线程的切换控制管理的。

这里我们一切换到Android主线程为例:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

现在就从observeOn(AndroidSchedulers.mainThread())入手,探探究竟。
首先我们来看一下RxJava是如何得到一个Android主线程的Scheduler的即HandlerScheduler。我们点进源码看一下:

/** Android-specific Schedulers. */
public final class AndroidSchedulers {

    private static final class MainHolder {
        //创建一个Handle拿到主线程的Looper 创建默认的 HandlerScheduler
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    //该Callable默认返回的就是上面的HandleScheduler
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        //这里就是入口 可以看到其实该方法是直接获取到了一个静态的Scheduler常量。
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

好了现在Scheduler有了,我们继续分析observeOn方法。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

看到了吧,RxJava所有的代码基本都是一致的,桥接模式,这里看到是创建了一个ObservableObserveOn对象,当然第二个参数默认是false,表明了如果执行了onError() 将会重新发送一遍上游的事件序列,第三个参数是缓存的大小默认是128。我们点进ObservableObserveOn的构造方法看看里面都做了什么,很关键。

//可以看到套路基本都是一样的, ObservableObserveOn<T> 同样是继承于AbstractObservableWithUpstream<T, T> ,用来保存上游的原事件流。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
//订阅的真正发生之处
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {//肯定不是这个Scheduler啊,我们这里是HandleScheduler
            source.subscribe(observer);
        } else {
            //创建HandlerScheduler的Worker,HandlerWorker.
            Scheduler.Worker w = scheduler.createWorker();
            //上游事件和下游事件产生订阅,这里又是一个包装类ObserveOnObserver包装了下游真正的Observer。
           //我们到ObserverOnObserver里面去看看,其是一个静态内部类
          //这里是把worker,delayError,bufferSizew也都传了进去
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
  //实现了Runnable接口,继承于BasicIntQueueDisposable
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }
              //事件的缓存队列 确定了缓存队列的大小
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
              //执行真正的onSubscribe方法
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
         //开始调度    
        schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
             //开始调度 
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;//已经完成
            //开始调度  
            schedule();
        }
      //取消订阅
        @Override
        public void dispose() {
            if (!cancelled) {
                cancelled = true;
                s.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }
      //判断是否被取消订阅
        @Override
        public boolean isDisposed() {
            return cancelled;
        }
      //执行调度的方法 
        void schedule() {
            if (getAndIncrement() == 0) {
              //传入当前ObserveOnObserver对象,其实现了Runnable接口
                worker.schedule(this);
            }
        }
      
        void drainNormal() {
            int missed = 1;
            //缓存数据的队列
            final SimpleQueue<T> q = queue;
            //实际下游的Observer
            final Observer<? super T> a = actual;

            for (;;) {
                //检测事件是否被终止,如果终止了直接跳出循环
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    //标记事件是否完成
                    boolean d = done;
                    T v;

                    try {
                        //拿到队列里的第一个事件
                        v = q.poll();
                    } catch (Throwable ex) {
                        //发生异常了 做一系列的后续动作
                        //取消订阅,队列的制空,发送异常事件,取消线程调度,最后跳出循环
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    //判断事件是否为空
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    //为空直接进入下一轮循环
                    //因为上游的事件处理也是需要时间的,上游的执行有可能是非常大量的数据所以可能会出现缓存队列里面暂时没有事件,所以这里需要一直进行循环去等待新的事件产生
                    if (empty) {
                        break;
                    }
                    //发送事件
                    a.onNext(v);
                }
              //下面这段代码我也不是很确定他的意思,这里我说一下我自己的理解不知道正不正确:
              //因为ObserveOnObserver是继承于BasicIntQueueDisposable ,而BasicIntQueueDisposable 又继承了AtomicInteger,一个原子操作类
            //用一个Integer整数来控制当前ObserveOnObserver对象的并发操作
            //如果当前ObserveOnObserver对象没有被其他线程独占,那么该对象就自己持有的话(代表已经执行完了当前的事件),就可以执行addAndGet(int i)方法了。
            //执行完改方法对自己的负数相加那么最终得出的是0,为0的话就可以开始下一个循环了,那么以后的每一个循环missed的值都为0都可以直接break!
            //最终要的是addAndGet()是一个阻塞式的方法,如果不成功的话,它会重新执行一遍
          //所以我分析得出这里其实是一个控制标记位“好了!现在轮到你了,开始吧”当第一次拿到权限后就可以一直执行下去了。
        
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        void drainFused() {
   ...........
        }
        //具体的run方法内部
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                //去处理队列里面缓存的数据
                drainNormal();
            }
        }
        //检查是否终止  代码都很简单 我就不做注释了
        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (cancelled) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (delayError) {
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
    }
}

同样是装饰模式,关键就是每当执行onNext(),onError(),onCompleted()方法的时候,都会开启线程的调度,上游的每一次事件,都会在指定线程中处理,这就是核心。然后就执行了具体的Worker实现类里面的schedule方法,我们一起看一下。

//HandlerWorker里面的schedule方法,其第二个参数为0L,第三个参数为TimeUnit.NANOSECONDS。
 @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                //判断是否取消订阅了
                return Disposables.disposed();
            }
            //满篇飞的Hook函数 +_+
            run = RxJavaPlugins.onSchedule(run);
            //封装当前持有主线程Looper的handler和ObserveOnObserver对象
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            //创建Message
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            //给主线程发送消息
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            //判断是否取消订阅了
            if (disposed) {
            //如果取消订阅了 就remove掉消息处理的回调接口
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

当然了最后主线程的执行的程序是ScheduledRunnable里面的run()方法,代码如下:

 @Override
        public void run() {
            try {
              //ObserveOnObserver对象的run方法
                delegate.run();
            } catch (Throwable t) {
                //捕获异常了进行一系列处理
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

这样RxJava就实现了把上游发送的每一个事件都巧妙地转换到了指定线程中处理,此处是Android主线程。
可以看到如果你在下游多次调用observeon()的话线程是会一直切换的,这也是网上一直说的结论。每一次切换线程,都会把对应的Observer对象的各项处理方法的处理执行在制定线程当中。
大概浏览完源码你会发现,RxJava的设计者真的是把面向对象的思想用到了极致,抽象接口与实体,设计模式地巧用都无处不在,感叹自己要学的真的还有太多,如果让我来写不知道还要多少年才能写出如此牛B的代码。
这也算是我第一次写源码分析的文章,还有很多地方有待提高,最开始听说别人源码分析很重要,不光要会用那些优秀的Library更要理解其中的精髓,与是我傻乎乎地闷着脑袋去看,结果真的看不懂,后来看了一本书叫做《Android源码设计模式》才恍然大悟,设计模式地巧用在各大优秀的开源Library中无处不在,只有真正地理解了设计模式,精通架构,才能写出如此优秀的代码。最后再安利一本书《设计模式之禅》这本书很有意思,作者语言幽默风趣,像看连环画一样很有意思。
哈哈 废话说了一大堆了,如果上面我的分析有误的话,欢迎指正批评,有什么不懂得地方也可以一起探讨。

最后

没有最后了,大家再见~~~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容