Rxjava-订阅和线程切换简单分析

喜欢的话,评论区留言点赞哦。

Wechat: CoolOriLans 酷奇源语

本篇文章简单分析下 Rxjava 订阅和线程切换简单:

我们先来看一个订阅的流程,这个流程是在之前的基础上,加了些调味剂:

val observable = Observable.create<Int>(object : ObservableOnSubscribe<Int> { // 返回的是 ObservableCreate 实例;
    override fun subscribe(emitter: ObservableEmitter<Int>) {
        emitter.onNext(1)
        emitter.onComplete()
    }
})
val observer = object : Observer<Int> {
    override fun onSubscribe(d: Disposable) {
        val d1 = d // 关键点 4: 这个 d 是 ObservableSubscribeOn.SubscribeOnObserver 对象。
        Log.d("rxjava", "Disposable: " +  d1.javaClass.canonicalName);
        // rxjava  : Disposable: io.reactivex.internal.operators.observable.ObservableSubscribeOn.SubscribeOnObserver
    }
    override fun onNext(t: Int) {}
    override fun onError(e: Throwable) {}
    override fun onComplete() {}
}
observable // 关键点 1: 是 ObservableCreate 实例;
.observeOn(Schedulers.io()) // 关键点 2: 切换线程1,返回的是 ObservableObserveOn 实例;
.subscribeOn(Schedulers.single()) // 关键点 3: 切换线程2,返回的是 ObservableSubscribeOn 实例;
.subscribe(observer)

本片文章也就是要弄懂上面的这个 4 个关键点:

注意:

1、我们是先调用 observeOn 再调用 subscribeOn,下面的分析也是按照这个顺序,实际上二者的顺序可以不一样。

这里先简单的总结下(其实网上的很多博客都有讲到,在这里我也总结了下)

subscribeOn 和observeOn 都是用来切换线程用的,但是区别如下:

  • subscribeOn:决定 Observable 对象在哪个线程上执行,其能改变调用它之前代码的线程;
  • observeOn:决定 Subscriber(Observer) 对象在哪个线程上执行,其能改变调用它之后代码的线程;

那么具体是怎么实现的,我们往下慢慢的跟踪代码了;

1 Schedulers - 简单分析

Schedulers 是 Rxjava 的调度器,用于为流提供线程环境,这里先简单的看下,后面会分析:

public final class Schedulers {
    @NonNull
    static final Scheduler SINGLE; // 单线程模式

    @NonNull
    static final Scheduler COMPUTATION;

    @NonNull
    static final Scheduler IO;

    @NonNull
    static final Scheduler TRAMPOLINE;

    @NonNull
    static final Scheduler NEW_THREAD;

    ... ... ... ...
}

Schedulers 默认提供了如上的线程实现。

1.1 Schedulers.io

这里先简单的看下 IO 的实现吧,这样方便我们对线程的切换进行分析:

    public static Scheduler io() {
        // RxJavaPlugins.onIoScheduler 是 hook 用的,这里不关注
        return RxJavaPlugins.onIoScheduler(IO);
    }

实际上默认返回的就是上面的 Schedulers.IO:

public final class Schedulers {

    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        // 这里会创建 IO,RxJavaPlugins.initIoScheduler 依然是 Hook 专用的
        // 默认返回 IOTask
        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
}

我们去看看 IOTask:

1.2 IOTask

    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }

1.3 IoHolder

    static final class IoHolder {
        // 最终实现为 IoScheduler;;
        static final Scheduler DEFAULT = new IoScheduler();
    }

实际上:Schedulers 只是一个包装而已,真正的线程切换是基于平台提供的线程工具的:线程池;

这个我们后面再分析哦。

2 Observable

2.1 observeOn

observeOn 用于指定被观察者逻辑所在的 Scheduler:

Observable.create 返回的是一个 ObservableCreate 实例:

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
        return observeOn(scheduler, delayError, bufferSize());
    }

    // 核心方法 3
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        // RxJavaPlugins.onAssembly 用于 Hook 操作,这里不关注
        //【-->3】默认返回 ObservableObserveOn 实例:
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

observeOn 有三个重载函数。

最终调用的都是核心方法 3

可以看到,默认其返回了一个 ObservableObserveOn 对象。将 ObservableCreate 包装到了内部,如下;

ObservableObserveOn [ // --> 3
    ObservableCreate [ 
        ObservableOnSubscribe [  
            emitter: ObservableEmitter<Int>
        ]  
    ]  
]

其实脉络已经很清楚了;

2.2 subscribeOn

observeOn 用于指定观察者逻辑所在的 Scheduler:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //【-->4.x】进入;
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

可以看到,默认其返回了一个 ObservableSubscribeOn 对象。将 ObservableCreate 包装到了内部!

如下:

ObservableSubscribeOn [ // --> 4
    ObservableObserveOn [ // --> 3
        ObservableCreate [ 
            ObservableOnSubscribe [  
                emitter: ObservableEmitter<Int>
            ]  
        ]  
    ]
]

其实脉络已经很清楚了;

3 ObservableObserveOn

3.1 constructor

ObservableObserveOn 的父类 AbstractObservableWithUpstream 继承了 Observable,所以 ObservableObserveOn 具有 Observable 的能力:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { // 如下:
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    
    // 参数 Scheduler 是调度器;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source); // 是 ObservableCreate 实例;
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    

按照例子的顺序,这里的 ObservableSource<T> source 是 ObservableCreate 实例!

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    protected final ObservableSource<T> source;
    ... ... ...
}

public interface HasUpstreamObservableSource<T> {
    ObservableSource<T> source();
}

3.2 subscribeActual

我们回忆下 subscribe 的流程,subscribe 实际上最后会调用 subscribeActual

但是此时执行完 observeOn 后,返回的是 ObservableObserveOn,其将 ObservableCreate 封装在了内部,所以会先执行 ObservableObserveOn.subscribeActual

    @Override
    protected void subscribeActual(Observer<? super T> observer) { //【-->4.3】是 SubscribeOnObserver 实例
        // 核心 1: 如果 scheduler 是 TrampolineScheduler 类型的,那么会直接执行订阅
        // 对观察者 observer 不做任何的包装;
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // 核心 2: 如果是其他类型的,那么这里会通过 scheduler 创建对应的 Worker
            Scheduler.Worker w = scheduler.createWorker();
            // 核心 3: 然后将观察者 observer 封装成一个 【-->3.3】ObserveOnObserver,然后在执行订阅;
            // source 是【-->6.1】 ObservableCreate 实例;
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

这样,相当于发射器通过 ObserveOnObserver 委托代理执行 Observer 的方法了;

source 是 ObservableCreate 实例,这里最终会调用 ObservableCreate.subcribeActual。

只不过参数由 Observer 变成了 ObserveOnObserver(Observer)。

    ObservableObserveOn [ // --> 3
        ObservableCreate [ 
            ObservableOnSubscribe [  
                emitter: ObservableEmitter<Int>
            ]  
        ]  
    ].subscribe -> subscribeActual {
        ObserveOnObserver [
            SubscribeOnObserver [ // 下面返回的;
                
            ]
        ]
    }

3.3 ObserveOnObserver - 切换核心

ObserveOnObserver 是 ObservableObserveOn 的内部类:

3.3.1 Constructor

   static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
   implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream; //【1】下游的观察者;
        final Scheduler.Worker worker; //【2】线程调度;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable upstream; // 表示上游的数据源,

        Throwable error;
        volatile boolean done;

        volatile boolean disposed;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual; //【3】表示下游的数据接收方 Observer 实例,本例中是 SubscribeOnObserver 实例;
            this.worker = worker; //【4】线程调度;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
       ... ... ...
   }

由于 ObserveOnObserver 本质也是一个 Observer,他代理了 Observer 一系列方法,我们去看看

3.3.2 onSubscribe

在订阅发生之前会先执行 onSubscribe 方法:

参数 Disposable d 就是 CreateEmitter 发射器!

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                // 核心 1: CreateEmitter 作为发射器,用于发射数据;
                this.upstream = d;
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                // 核心 2: 调用下游的 Observer 的 onSubscribe 方法,将自身传递下去!
                // downstream 是 SubscribeOnObserver
                downstream.onSubscribe(this);
            }
        }

3.3.3 onNext

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                // 把执行的结果保存到内部队列里;
                queue.offer(t);
            }
            schedule();
        }

3.3.4 onError

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            // 保存错误到 error.
            error = t;
            done = true;
            schedule();
        }

3.3.5 onComplete

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }

4 ObservableSubscribeOn

4.1 constructor

同样的,ObservableSubscribeOn 也继承了 AbstractObservableWithUpstream 类,也具有 Observable 的能力:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { // 如下:
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source); // 父类的变量;
        this.scheduler = scheduler;
    }

注意:按照例子的顺序,这里的 ObservableSource<T> source 是 ObservableObserveOn 实例!(这里不一定是这样,因为调用顺序可以切换)

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;
    
    ... ... ...
}

4.2 subscribeActual - 切换核心

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        //【-->4.3】将 observer 包装成 SubscribeOnObserver 实例;;
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        // 核心:这里立刻回调,这个回调会触发器上面的关键点 4:
        observer.onSubscribe(parent);
        //【-->】创建了 SubscribeTask 实例,将 SubscribeOnObserver 保存到内部;;
        // SubscribeTask 是一个 Runnable 实例;;
        //【-->4.3.5】scheduler.scheduleDirect 会返回一个 Disposable 对象。
        // SubscribeOnObserver 会持有该对象的原子引用;;
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

4.2.1 SubscribeTask - 切换核心

可以看到 SubscribeTask 实际上就是一个 Runnable 实例;

这里其实已经能够看到线程的切换的实现:将 Observer 封装到 SubscribeTask ( Runnable ) 中,通过 handler / ThreadPool 执行:

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent; //【-->4.3】SubscribeOnObserver 实例

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // 这里的 source 来自 ObservableSubscribeOn
            // 就是 ObservableObserveOn 实例了;
            //【-->3.2】这样就进入了 ObservableObserveOn 的 subscribeActual 方法; 
            source.subscribe(parent);
        }
    }

4.3 SubscribeOnObserver

是对 Observer 的封装,继承了 AtomicReference<Disposable> 类,内部持有 Disposable 对象的原子引用:

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream; // 下游的观察者;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream; // 这个就是我们的 obsever
            this.upstream = new AtomicReference<Disposable>();
        }
    }

可以的得到如下的关系图了:

ObservableSubscribeOn [ // --> 4
    ObservableObserveOn [ // --> 3
        ObservableCreate [ 
            ObservableOnSubscribe [  
                emitter: ObservableEmitter<Int>
            ]  
        ]  
    ]
].subscribe -> subscribeActual {
    SubscribeOnObserver [
        Observer [
            ...
        ]
    ]
}

4.3.1 onSubscribe

保存上游的 Disposable 实例:

        @Override
        public void onSubscribe(Disposable d) {
            // 核心 1,这里的 d 是 ObserveOnObserver 实例【-->4.3.1】;
            DisposableHelper.setOnce(this.upstream, d);
        }

4.3.2 onNext

发送数据:

        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

4.3.3 onError

发送异常:

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

4.3.4 onComplete

完成回调:

        @Override
        public void onComplete() {
            downstream.onComplete();
        }

4.3.5 setDisposable - 核心

设置 Disposable 对象的弱引用,这个 Disposable 对象是由 Scheduler 对 new SubscribeTask(SubscribeOnObserver)) 封装后返回的:

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }

4.3.5.1 DisposableHelper.setOnce

DisposableHelper 枚举类封装了对 Disposable 的操作:

    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        ObjectHelper.requireNonNull(d, "d is null");
        // 通过 CAS 设置内部的 field 变量为 d;
        if (!field.compareAndSet(null, d)) {
            // 如果失败的话,那么说明内部已经有 Disposable 对象了
            // 那这里会执行 d 的取下操作;
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;
    }

不多说了。

4.3.6 dispose

取消订阅:

@Override
public void dispose() {
    // 清空对上游的弱引用;
    DisposableHelper.dispose(upstream);
    // 清空自身对下游的弱引用:SubscribeTask【-->4.2.1】
    DisposableHelper.dispose(this);
}

4.3.6 isDisposed

是否取消订阅:

@Override
public boolean isDisposed() {
    return DisposableHelper.isDisposed(get());
}

5 subscribe 再分析

接下来,我们来看看,在执行了 observeOn 和 subscribeOn 后的执行逻辑:

回顾一下:

observable // 关键点 1: 是 ObservableCreate 实例
.observeOn(Schedulers.io()) // 关键点 2: 被观察者所在的线程,返回的是 ObservableObserveOn 实例;
.subscribeOn(Schedulers.single()) // 关键点 3: 观察者所在的线程,返回的是 ObservableSubscribeOn 实例;
.subscribe(observer)

实际上经过了 observeOn/subscribeOn 方法的执行,整个逻辑变的很复杂了;

5.1 subscribe 回顾

subscribe 方法会调用 subscribeActual 方法:

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        try {
            // --> 按照现在的逻辑,进入 4.2 了;
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ... ... ...
            throw npe;
        }
    }

按照上面的包装顺序,会先调用 ObservableSubscribeOn 的 subscribeActual 方法了;

6 ObservableCreate - 回顾

6.1 subscribeActual - 回顾

回顾下 subscribeActual 方法:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 核心 1: 创建的 CreateEmitter 实例,包装下 Observer; 
        // 此时的 observer
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 核心 2: 这里的 observer 就是【-->3.3】ObserveOnObserver 实例;
        observer.onSubscribe(parent);

        try {
            // 核心 1: 这里的 source 是 ObservableOnSubscribe 实例;
            // 见【-->6.2】ObservableOnSubscribe
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

6.2 ObservableOnSubscribe

回顾下 ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

在这个方法中,我们使用:

emitter.onNext(1)
emitter.onComplete()

6.2.1 onNext

发射数据:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // 这里的 observer 实际上是【-->3.3】ObserveOnObserver 实例;
        observer.onNext(t);
    }
}

7 总结

总结一下哈

7.1 完整订阅的流程

我们回顾下整个流程,下面的这个流程包含了线程切换的流程,以及调度的走势:

(好累,后序补上哦)

7.2 线程切换的关键

被观察者的线程切换的 Mr.Key : ObservableObserveOn 和 ObserveOnObserver

观察者的线程切换的 Mr.Key : ObservableSubscribeOn 和 SubscribeOnObserver

7.2.1 ObservableObserveOn 的关键点

  • ObservableObserveOn 内部会讲 Scheduler 保存下来
    // 参数 Scheduler 是调度器;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source); // 是 ObservableCreate 实例;
        this.scheduler = scheduler;
  • subscribeActual 方法,这里会实现订阅,用 ObserveOnObserver 封装 observer,同时传入 Scheduler.Worker 作为调度器:
    @Override
    protected void subscribeActual(Observer<? super T> observer) { //【-->4.3】是 SubscribeOnObserver 实例
        // 核心 1: 如果 scheduler 是 TrampolineScheduler 类型的,那么会直接执行订阅
        // 对观察者 observer 不做任何的包装;
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // 核心 2: 如果是其他类型的,那么这里会通过 scheduler 创建对应的 Worker
            Scheduler.Worker w = scheduler.createWorker();
            // 核心 3: 然后将观察者 observer 封装成一个 【-->3.3】ObserveOnObserver,然后在执行订阅;
            // source 是【-->6.1】 ObservableCreate 实例;
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
  • ObserveOnObserver 中的 onNext 通过 schedule() 通过线程池实现在指定线程中执行数据分发:
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // 执行线程池调度;
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this); // 将自身加入到 worker 中;
            }
        }

刚刚我们说了 ObserveOn 能改变调用后的逻辑所在的线程,这里其实已经能看到原因了。

按照逻辑,发射器最终会调用 ObserveOnObserver 的 onNext/onComplete 方法,而这个方法会讲 ObserveOnObserver 自身(就是一个 Runnable)放入 worker 中执行调度,后续的操作就会在 worker 所在的线程了(这是一个上游 -> 下游的过程);

到这里,就能够推断,ObserveOn 是可以调多次的,每次都会将下游的回调逻辑通过装饰器的方式切换到对应的线程;

7.2.2 ObservableSubscribeOn 的关键点

  • ObservableSubscribeOn 内部会讲 Scheduler 保存下来
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { // 如下:
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source); // 父类的变量;
        this.scheduler = scheduler;
    }
  • subscribeActual 方法,这里会实现订阅,用 ObserveOnObserver 封装 observer,同时传入 Scheduler.Worker 作为调度器:
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        //【-->4.3】将 observer 包装成 SubscribeOnObserver 实例;;
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        // 这里立刻回调;
        observer.onSubscribe(parent);
        //【-->】创建了 SubscribeTask 实例,将 SubscribeOnObserver 保存到内部;;
        // SubscribeTask 是一个 Runnable 实例;;
        //【-->4.3.5】scheduler.scheduleDirect 会返回一个 Disposable 对象。
        // SubscribeOnObserver 会持有该对象的原子引用;;
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
  • SubscribeTask 本质上就是一个 runnable:
    final class SubscribeTask implements Runnable {
        //【-->4.3】SubscribeOnObserver 实例
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // 这里的 source 来自 ObservableSubscribeOn
            // 就是 ObservableObserveOn 实例了;
            //【-->3.2】这样就进入了 ObservableObserveOn 的 subscribeActual 方法; 
            source.subscribe(parent);
        }
    }

刚刚我们说了 subscribeOn 能改变调用前的逻辑所在的线程,这里其实已经能看到原因了。

按照逻辑,这里会将 source.subscribe(parent) 放到 scheduler 指定的线程中执行,而 subscribe 是一个从下游->上游的过程。

source.subscribe(parent) 会按照订阅流程,向上不断的执行 source[x].subscribe(parent) -> source[x].subscribeActual(parent)

一直到最终的 ObservableOnSubscribe ,而这个过程是在 scheduler 指定的线程中执行。

同时,当到 ObservableOnSubscribe 的时候,会通过发射器发送数据,这样又会变成上游 -> 下游,而发生器也是在 ObservableOnSubscribe.subscribe 方法中触发,所以此时依然是在 scheduler 指定的线程中执行。

最终,执行到最近的 ObserveOn 处,这里才会切换线程,这样下游的调度线程都会在 ObserveOn 方法进行切换了;

疑问

上面是先调用 observeOn 再调用 subscribeOn,实际上二者的顺序可以不一样。如果我们反过来呢?

如果反过来的话只是装饰的对象顺序发生了变化,对于 subscribeOn 和 observeOn 的效果依然是没有变化的。

这里不多说了,溜了溜了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容