Rxjava源码解析

先上代码:

ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
         final int max = 100;
         for (int i = 1; i <= max; i++) {
             e.onNext(max);
         }
         e.onComplete();
     }
 };
 Observer<Integer> observer = new Observer<Integer>() {
     @Override
     public void onSubscribe(Disposable d) {
     }
     @Override
     public void onNext(Integer integer) {
     }
     @Override
     public void onError(Throwable e) {
     }
     @Override
     public void onComplete() {
     }
 };

 Observable.create(oos)
           .observeOn(AndroidSchedulers.mainThread())
           .subscribeOn(Schedulers.computation())
           .subscribe(observer);

上面是Rxjava最简单的实现模型。
从链式调用的返回值来看:

  Observable.create()------》ObservableCreate extends Observable
  ObservableCreate.observerOn()------->ObservableObserveOn extends  AbstractObservableWithUpstream  extends  Observable
  ObservableObserveOn.subscribeOn()------->ObservableSubscribeOn extends  AbstractObservableWithUpstream  extends  Observable

所以最后的调用对象是

  ObservableSubscribeOn.subscribe(observer)

从上面的返回值可以看出中间任一一个的返回值返回的都是observable的子对象。

为什么要强调中间几个的返回值都是observable的返回值,这里要先明确一下,待会会大量用到subscribe()方法,在Observable(子类)中的subscribe()方法:

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //钩子,如果未设置的话,返回值还是observer
        observer = RxJavaPlugins.onSubscribe(this, observer);
        //空检验
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        //核心
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        ...... //handle exception 
    }
}

因此下面的分析代码中,如果是调用上面4个对象的subscribe()方法的时候,直接看subscribeActual()方法即可。

那就从最后一层 ObservableSubscribeOn 的 subscribeActual() 方法开始分析。

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

这里的这个s就是我们缩写的observer
第一行:先把我们的observer封装成了SubscribeOnObserver
第二行:调用了observer.onSubscribe()方法
              也就是observer订阅Observable时候的方法,一般这个时候可以做一些操作
第三行:
parent.setDisposable() 以及scheduler调度器先不论,待会再分析,这里先看SubscribeTask这个类:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

看到runnable,熟悉线程的同学已经可以猜到source.subscribe(parent) 这句代码很可能在子线程中执行,这里先mark一下,待会回到这个地方再具体看。

这里要先插入一下source和Observer的问题:

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);
    this.scheduler = scheduler;
}

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

//后面两个类继承抽象类,调用super(source)方法。
AbstractObservableWithUpstream(ObservableSource<T> source) {
    this.source = source;
}

通过上面的代码可以看到所有的这三个关键类,source都是通过构造传入进来的,
而后两个类都还有schedule参数,这个涉及线程调度,待会也会说,也mark一下。
通过以上代码可以分析:

ObservableCreate 的 source 是 oos
ObservableObserveOn 的 source 是 ObservableCreate
ObservableSubscribeOn 的 source 是 ObservableObserveOn

至于Observer,通过代码可以分析:

 (ObservableObserveOn)source
    .subscribe(parent(SubscribeOnObserver));

 (ObservableCreate)source
    .subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));


 CreateEmitter<T> parent = new CreateEmitter<T>(observer);
 (oos)source.subscribe(parent);

也即是:

ObservableObserveOn 的 observer 是 SubscribeOnObserver
ObservableCreate 的 observer 是 ObserveOnObserver
oos 的 observer 是 CreateEmitter

这里有点一级一级调用的意味了,而这个意味就是Rxjava的一个很重要的点。

插入结束,继续回到刚才的 SubscribeTask
结合上面的分析:

 source.subscribe(parent)

也就意味着

ObservableObserveOn.subscribeActual()

这里转了两个弯,各位可以稍微思考一下
而在ObservableObserveOn中:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        //这个暂时忽略,未设置的时候不走这里
        source.subscribe(observer);
    } else {
        //线程调度,待会再分析
        Scheduler.Worker w = scheduler.createWorker();
        //最终会调用这个,又是很熟悉的source subscribe()方法
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

继续往上走,走到 ObservableCreate 中,这里省略了重复流程。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

走到最上层,这个最上层的source就是我们前面写的oos。
这个observer是在 ObservableObserverOn 中的 ObserveOnObserver。这个名字有点像,汗

第一行:先把 ObserveOnObserver 封装成 CreateEmitter
第二行:调用 ObserveOnObserver.onSubscribe()方法。

@Override
public void onSubscribe(Disposable s) {
    if (DisposableHelper.validate(this.s, s)) {
        this.s = s;
        if (s instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<T> qd = (QueueDisposable<T>) s;

            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                actual.onSubscribe(this);
                schedule();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                actual.onSubscribe(this);
                return;
            }
        }

        queue = new SpscLinkedArrayQueue<T>(bufferSize);

        actual.onSubscribe(this);
    }
}

这个方法比较长,但是对我们的流程分析关键的代码其实就一句

actual.onSubscribe(this);

根据前面的observer的分析,这个observer其实就是 ObservableSubscribeOn 的 SubscribeOnObserver
最后找到源码,调用了 SubscribeOnObserver 的 onSubscribe()方法。

    @Override
    public void onSubscribe(Disposable s) {
        DisposableHelper.setOnce(this.s, s);
    }

    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        ObjectHelper.requireNonNull(d, "d is null");
        if (!field.compareAndSet(null, d)) {
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;
    }

涉及到CAS的操作,感兴趣的同学可以研究一下,这里对我们的流程没有太大影响。

第三行:至此,整个的流程终于回到了我们的oos。

从ObservableSubscribeOn的subscribe()方法历尽千辛万苦终于调用了oos的subscribe()方法。

    ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            final int max = 100;
            for (int i = 1; i <= max; i++) {
                e.onNext(max);
            }
            e.onComplete();
        }
    };

首先创建了 ObservableEmitter ,然后调用emmiter.onNext()方法。

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

异常先不考虑,最终是调用 observer.onNext()方法。
根据上面的分析,这个Observer是ObserveOnObserver
第一行:先把 ObserveOnObserver 封装成 CreateEmitter,而CreateEmmiter的构造:

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

所以,可以知道这个observer就是最前面的 ObserveOnObserver

也就是e.Next(n)------>最终会调用ObserveOnObserver.onNext(n)

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }

最终调用了 schedule() 方法。

    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }

将当前对象添加到worker中,这个是线程调度的问题了,待会分析。

再看一下ObserveOnObserver 类的声明:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable{}

实现了Runnable接口,所以关键代码就在run()方法之中。

   @Override
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }

    //我们看drainNormal()的方法
    void drainNormal() {
        int missed = 1;

        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;

        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }

            for (;;) {
                boolean d = done;
                T v;

                try {
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;

                if (checkTerminated(d, empty, a)) {
                    return;
                }

                if (empty) {
                    break;
                }

                a.onNext(v);
            }

            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

终于看到了a.onNext()方法,也就是actual.onNext()方法。
通过 ObserveOnObserver 的构造:

//构造方法
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
    this.actual = actual;
    this.worker = worker;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

//创建对象的时候
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        //这里new了ObserveOnObserver对象。
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

可以看出,这个actual对象,其实就是传入进来的observer。
而这个observer结合SubscribeTask代码,可以知道:
这个observer其实就是讲我们的observer封装起来的SubscribeOnObserver对象。

而SubscribeOnObserver的onNext()方法:

    @Override
    public void onNext(T t) {
        actual.onNext(t);
    }

其实就是我们的o.next()方法。

七转八弯,经历这个这么多,也是本文最核心的:

subscribe()方法,先一层一层往上回调,调用了我们的oos的onNext()方法,
而onNext()里面又一层一层往下回调,调用了我们的obsrever的onNext()方法,实现了数据的传递。

然后是线程切换问题:

还记得我们之前说ObservableSubscribeOn, ObservableObserveOn这两个对象的构造都会传入一个 schedule 的调度器吗?

先看 ObservableSubscribeOn

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

再结合前面的代码,我们知道这个 scheduler 是Schedulers.computation()

最后走到了:

public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
    ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
    try {
        Future<?> f;
        if (delayTime <= 0L) {
            f = executor.submit(task);
        } else {
            f = executor.schedule(task, delayTime, unit);
        }
        task.setFuture(f);
        return task;
    } catch (RejectedExecutionException ex) {
        RxJavaPlugins.onError(ex);
        return EmptyDisposable.INSTANCE;
    }
}

executor,我们非常熟悉的线程池。看到这,也就大概明白了我们的 source.subscribe(parent)
以及其对应的一层层往上回调都是在subscribeOn(线程) 所调用的线程之中

然后线程什么时候会再度切换呢?
是在ObservableObserveOn中的 schedule() 方法中:

    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }

这个worker一层层追踪溯源,找到了其初始化的地方,是在ObservableObserveOn的subscribeActual()方法之中:

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

这个schedule就是observerOn所对应的线程。
AndroidSchedulers.mainThread() 的实现是 HandlerScheduler

    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        if (async) {
            message.setAsynchronous(true);
        }

        handler.sendMessageDelayed(message, unit.toMillis(delay));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }

最终通过hanler进行了线程的切换。
也就是最后我们的observer.onNext()方法执行的线程是由observeOn()所对应的线程

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容