RxJava2工作原理及线程切换


序言

RxJava是现在最流行的响应式函数编程框架,之前的项目中一直使用RxJava,结合Retrofit+OkHttp搭建网络请求框架,很是好用。
后来RxJava2出来了,官网表明一段时间之后不再维护RxJava,所以在新项目中,决定使用RxJava2。
对于新手来说,即使没用过RxJava,也可以直接学习RxJava2。而对于从RxJava过渡到RxJava2的同学,自然更容易上手。

响应式编程(Reactive Programming)这个词很多人都知道,但具体是什么含义可能没多少人能解释清楚。我简单说一下自己的理解:响应式编程可基于任何事物(数据、用户行为、时间、对象)创建事件流,并且框架提供一个强大的函数库来操作事件流,包括合并、过滤、转换、切换线程、监听...,流是响应式的核心

RxJava就是这样一个响应式编程框架,今天我们主要来介绍RxJava2的事件处理流程和线程切换原理。本文并不是一篇新手指引教程,而是一篇进阶教程。如果想入门RxJava2,可以看一下这篇文章

用户登录场景

下面以用户登录场景为例,介绍怎样通过RxJava2进行事件流处理,登录代码如下:

/**
 * 用户登录操作
 */
public void login(final String userName, final String password) {
    Observable.create(new ObservableOnSubscribe<CommonApiBean<UserInfo>>() {
                @Override
                public void subscribe(ObservableEmitter<CommonApiBean<UserInfo>> e) throws Exception {
                    e.onNext(loginApi(userName, password));
                }
            })
            .map(new Function<CommonApiBean<UserInfo>, UserInfo>() {
                @Override
                public UserInfo apply(CommonApiBean<UserInfo> bean) throws Exception {
                    if (bean != null && bean.body != null) {
                        return bean.body;
                    }
                    return new UserInfo();
                }
            })
            .doOnNext(new Consumer<UserInfo>() {
                @Override
                public void accept(UserInfo userInfo) throws Exception {
                    saveUserInfo(userInfo);
                }
            })
            .subscribeOn(Schedulers.io()) 
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<UserInfo>() {
                @Override
                public void accept(UserInfo userInfo) throws Exception {
                    //登录成功,跳转页面
                    loginSuccess(userInfo);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    //登录失败提示用户
                    loginFailed();
                }
            });
}

上面一段代码是RxJava2常规的使用方式,能够满足多数网络请求场景,下面我们就针对这段代码进行分析。

分析事件流

RxJava2处理事件流分为3个步骤:

  1. 构建操作符对应的Observable
  2. 逐级生成Observer,逆向订阅Observable
  3. 逐级调用Observer的onNext方法

下面我们就分别来介绍这三个流程。

1. 构建操作符对应的Observable

上面一段代码中,使用的操作符包括create、map、doOnNext、subscribeOn、observeOn,我们依次看这些操作符做了什么事情。

Observable
要了解RxJava2原理,必须先了解Observable,Observable是RxJava2事件流的入口类,也可以叫做事件源。

public abstract class Observalbe<T> implements ObservableSource<T> {
    //交由子类实现的抽象方法
    @Override  
    protected abstract void subscribeActual(Observer observer) ;

    //实现了ObservableSource的方法
    @Override  
    public final void subscribe(Observer<? super T> observer) {
        //省略一堆判空等处理 
        subscribeActual(observer);
    }

    //省略一堆静态操作符方法
}

从Observable的定义可知,它实现了ObservableSource接口,并定义了一个subscribeActual抽象方法,调用Observable的subscribe方法实际上是做了一些基础判断后,调用subscribeActual方法。Observable的每个子类需要需要实现自己的subscribeActual方法

create

跟踪到Observable的create方法:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

第一句代码对source进行判空,如果为空,会抛出异常。接着生成一个ObservableCreate对象,把这个对象传入RxJavaPlugin进行组装。
RxJavaPlugin提供了一系列的Hook Function,通过这种函数对RxJava的标准进行加工,如果我们不配置这些方法,默认直接返回原对象,即ObservableCreate
注:下面介绍其他操作符,就不再解释判空操作和RxJavaPlugin。

接着,我们看一下ObservableCretae的定义:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    //省略其他方法
}

很简单,ObservableCreate继承Observable,并且在构造方法中保存了传入的ObservableOnSubscribe对象。
总结:create()构建了一个ObservableCreate对象,该对象继承Observable

map
同样,跟踪到Observable的map方法中:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

这里创建并返回一个ObservalbeMap对象:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    //省略其他方法
}

ObservableMap继承AbstractObservableWithUpstream类:

/**
 * Base class for operators with a source consumable.
 */
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    protected final ObservableSource<T> source;
    
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }
}

AbstractObservableWithUpstream是所有接收上一级输入操作符的基类。
总结:map()构建了一个ObservableMap对象

doOnNext
根据上面两个操作符的源码,我们猜测这里也会返回一个Observable子类对象,进入源码验证一下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
    return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
    //省略判空操作
    return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
}

果不其然,doOnNext最后返回了一个ObservableDoOnEach对象:

public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Action onAfterTerminate;

    public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                              Consumer<? super Throwable> onError,
                              Action onComplete,
                              Action onAfterTerminate) {
        super(source);
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onAfterTerminate = onAfterTerminate;
    }
    //省略其他代码
}

doOnNext也需要接收上流传来的Observable作为source,所以也继承了AbstractObservableWithUpstream。
总结:doOnNext()构建了一个ObservableDoOnEach对象

subscribeOn
用过RxJava的同学都知道,subscribeOn是用来切换线程的,用于指定被观察者执行的线程。
不着急,怎样切换线程我们后面会分析,先看一下它的源码:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

这里返回了一个ObservableSubscribeOn对象,继续跟踪:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    //省略其他代码
}

ObservableSubscribeOn中保存了事件源source和线程调度器scheduler,而这个scheduler是我们传入的Schedulers.io()
总结:subscribeOn()构建了一个ObservableSubscribeOn对象

observeOn
各位同学肯定也知道,observeOn用于指定观察者执行的线程,至于怎样实现线程切换等到后面分析。
老套路,我们跟踪到Observable.observeOn方法:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    //省略判空验证操作
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

observeOn最后返回了一个ObservableObserveOn对象:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    //省略其他代码
}

ObservableObserveOn对象也保存了事件源source和线程调度器scheduler,这里的scheduler是我们传入的AndroidScheduler.mainThread()
总结:observeOn构建了一个ObservableObserveOn对象

到这里,操作符对应的Observable构建完成,总结一下,按照操作符顺序,构建了ObservableCreate -> ObservableMap -> ObservableDoOnEach -> ObservableSubscribeOn -> ObservableObserveOn几个Observable对象。

2. 逐级生成Observer,逆向订阅Observable

LambdaObserver
在登录场景的最后,调用了subscribe方法,传入了两个Comsumer对象,我们看一下subscribe方法的实现:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
    return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    //省略判空操作
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;
}

从代码看出,subscribe方法拿我们传入的两个Consumer构建了一个LambdaObserver对象,而两个Consumer分别对应onNext和onError,并且用LambdaObserver来订阅ObservableObserveOn。
总结:用LambdaObserver订阅ObservableObserveOn对象

ObserveOnObserver
继续分析ObservableObserveOn.subscribe方法,上面提到过,Observable的subscribe方法实际上会调用具体子类的subscribeActual方法,所以我们跟踪ObservableObserveOn的subscribeActual方法:

//ObservableObserveOn
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

TrampolineScheduler表示是否当前线程,而我们传入的schedule是AndroidScheduler.mainThread(),并不是TrampolineScheduler对象。
所以会走else逻辑,创建一个Scheduler.Worker,并把它作为参数构建一个ObserveOnObserver对象。
用ObserveOnObserver对象订阅source(source是我们构建ObservableObserveOn对象传入的ObservableSubscribeOn对象)。
总结:用ObserveOnObserver订阅ObservableSubscribeOn对象

SubscribeOnObserver
接着,看一下ObservableSubscribeOn的订阅逻辑:

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

首先创建一个SubscribeOnObserver对象,scheduler.scheduleDirect从名字上看,大概用来切换线程的。在指定线程中执行SubscribeTask任务:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

这个任务很简单,就是用parent(SubscribeOnObserver对象)来订阅source(此处的source是ObservableDoOnEach)。
总结:SubscribeOnObserver订阅ObservableDoOnEach对象

DoOnEachObserver
老套路,我们继续看ObservableDoOnEach的subscribeActual方法:

@Override
public void subscribeActual(Observer<? super T> t) {
    source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
}

这个方法很干脆,直接用DoOnEachObserver订阅source(此处的source是ObservableMap对象)
总结:用DoOnEachObserver订阅ObservableMap对象

MapObserver
到这里,相信大家也可以猜到,ObservableMap的subscribeActual中,肯定也是构建一个MapObserver来订阅source,本着实事求是的精神,源码还是要看的:

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

果不其然_!!
总结:用MapObserver订阅ObservableCreate

最后,我们要看一下ObservableCreate中的subscribeActual方法:

 @Override
 protected void subscribeActual(Observer<? super T> observer) {
     CreateEmitter<T> parent = new CreateEmitter<T>(observer);
     observer.onSubscribe(parent);

     try {
         source.subscribe(parent);
     } catch (Throwable ex) {
         Exceptions.throwIfFatal(ex);
         parent.onError(ex);
     }
 }

这里也出现了source.subscribe(parent),parent是CreateEmitter对象,那么source是什么呢?

有哪位同学能回答一下这个问题吗?

哈哈,不卖关子了,这里的source就是我们最开始创建Observable事件流传入的ObservableOnSubscribe对象,还有印象吗,没有也没关系:

public void login(final String userName, final String password) {
    Observable.create(new ObservableOnSubscribe<CommonApiBean<UserInfo>>() {
                @Override
                public void subscribe(ObservableEmitter<CommonApiBean<UserInfo>> e) throws Exception {
                    e.onNext(loginApi(userName, password));
                }
            }).//省略后续操作代码
}

没错,就是调用此处我们实现的subscribe方法。
到这里,逆向订阅Observable的过程分析完毕了。
总结:
LambdaObserver -> ObservableObserveOn
ObserveOnObserver -> ObservableSubscribeOn
SubscribeOnObserver -> ObservableDoOnEach
DoOnEachObserver -> ObservableMap
MapObserver -> ObservableCreate
最后调用ObservableOnSubscribe的subscribe方法

有木有感觉思绪逐渐明朗起来!不急,后面还有呢!

3. 逐级调用Observer的onNext方法

MapObserver.onNext
接着上面最后一步进行分析,调用ObservableOnSubscribe的subscribe方法,而我们在subscribe中调用了e.onNext,此处的e是CreateEmmiter对象,进入它的onNext方法:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

逻辑很简单,对发射的数据判空,如果数据为空则抛出异常。如果没有中断事件流,则调用observer.onNext,此处的observer是创建CreateEmitter时传入的MapObserver。
总结:调用MapObserver的onNext方法

DoOnEachObserver
跟踪到MapObserver的onNext方法:

@Override
public void onNext(T t) {
    //省略一些判断
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    actual.onNext(v);
}

先通过mapper.appply对数据t做变换,变换之后继续调用DoOnEachObserver的onNext方法。

总结:调用DoOnEachObserver的onNext方法

SubscribeOnObserver.onNext
我们跟到DoOnEachObserver.onNext方法:

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    try {
        onNext.accept(t);
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }

    actual.onNext(t);
}

跟我们预料一样,继续调用下一级observer(SubscribeOnObserver)的onNext方法。

总结:调用SubscribeOnObserver的onNext方法

ObserveOnObserver.onNext
长征的路一步一步走,接着看SubscribeOnObserver.onNext方法:

@Override
public void onNext(T t) {
    actual.onNext(t);
}

这里也很干脆直接,调用上一级observer(ObserveOnObserver)的onNext方法。
总结:调用ObserveOnObserver的onNext方法

LambdaObserver.onNext
进入ObserveOnObserver.onNext方法:

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

先把数据加入queue,然后代用schedule方法,这里涉及到线程调度,我们稍后分析,总之最后会调用到LambdaObserver.onNext方法。

到这里,逐级调用Observer的onNext方法也分析完毕了。
总结:
MapObserver.onNext -> DoOnEachObserver.onNext -> SubscribeOnObserver.onNext -> ObserveOnObserver.onNext -> LambdaObserver.onNext

到这里,RxJava整个事件流的原理分析完毕了。回顾一下,包括三个步骤:

  1. 构建操作符对应的Observable
  2. 逐级生成Observer,逆向订阅Observable
  3. 逐级调用Observer的onNext方法

前面很详细讲解了每一个步骤,下面我们用一张图来概括整个过程:
image.png

相信各位同学很容易理解这个图片描述的流程。

线程切换原理

在理解了RxJava2操作符工作原理之后,我们需要分析subscribeOn和observeOn切换线程的原理。

1. subscribeOn

上面提到过,subscribeOn指定被观察者处理事件流所在线程,它作用在subscribe阶段(即图中逆向订阅过程),我们重新看一下ObservableSubscribeOn的订阅过程:

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

此处的scheduler,是我们出传入的Schedulers.io(),这是个什么东西呢?

public final class Schedulers {
    @NonNull
    static final Scheduler IO;
    
    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    
    static {
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
        //省略其他几个Scheduler初始化过程
    }
    
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }
    
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

//省略其他代码
}

看到这里,我们知道了,Scheduler.io()具体是指IoScheduler对象,IoScheduler继承Scheduler,Scheduler是所有线程调度器的父类,看一下Scheduler的实现:

public abstract class Scheduler {
    @NonNull
    public abstract Worker createWorker();
    
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
    
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        DisposeTask task = new DisposeTask(decoratedRun, w);
    
        w.schedule(task, delay, unit);
    
        return task;
    }
    //省略其代码
}

Scheduler是个抽象类,包含一个抽象方法createWorker,返回一个Worker对象。
而它的scheduleDirect方法实际上就是调用这个Worker的schedule方法

继续分析线程切换逻辑,代码中调用了IoScheduler.scheduleDirect方法,实际就是把SubscribeTask交给IoScheduler.createWorker构建的worker去执行。
跟到IoScheduler.createWorker方法:

@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

返回了一个EventLoopWorker对象,进入它的schedule方法:

@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }

    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

这里也不是真正执行任务的地方,那就继续跟进到ThreadWorker.scheduleActual方法:

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

到这里,总算看到了,通过线程池分配线程来执行任务。
总结:subscribeOn(Schedulers.io())会在逆向订阅步骤中,通过线程池分配一个子线程来执行任务。

图中通过粉色和红色的箭头区分了UI线程和子线程,走到订阅ObservableSubscribeOn时,从UI线程切换到子线程,箭头从粉色变为红色,之后的逆向订阅操作都在子线程中进行

2. observeOn

接下来分析observeOn,它是指定观察者(订阅者)处理事件所在线程。我们传入的是AndroidScheduler.mainThread(),这又是个什么东西呢?

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

看到这段代码,我们就知道了,AndroidScheduler.mainThread指的是HandlerThread,在它构造方法中会传入一个主线程Handler,相信不用解释,各位同学明白这个Handler的作用吧。对,,,就是用来把观察者执行逻辑切换到主线程。

那么,具体是在哪个过程切换的呢?执行ObserveOnObserver的onNext阶段

代码如下:

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

先把事件流中的数据t加入队列queue,然后执行schedule方法:

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

这个worker相信大家猜到了,就是HandlerScheduler的createWorker得到的对象,把this作为任务(ObserveOnObserver实现了Runnable),交给worker执行:

@Override
public Worker createWorker() {
    return new HandlerWorker(handler);
}

这里的worker是HandlerWorker对象,继续看它的schedule方法:

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    //省略一些判断

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

到这里,总算守得云开见日出。通过Handler发送消息把任务切换到主线程执行。

这个任务就是刚才提到的ObserveOnObserver,我们看一下它的run方法:

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

这里会根据outputFused走不通的逻辑,正常情况下都会走else逻辑,我们就只分析这条分支。

void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

这段代码比较长,但仔细看下来,逻辑还是比较简单,就是从queue中获取数据,然后把数据交给actual(LambdaObserver)的onNext方法。到这里,经过转换的数据交给我们传入的Comsumer,在主线程中处理,observeOn切换线程的逻辑分析完毕

还是可以看回那张图,当执行ObserveOnObserver.onNext时,就从子线程切换回UI线程,箭头变成粉色
总结:observeOn作用于onNext阶段

总结

这篇文章很长,相信看完的同学肯定会有所收获,对RxJava2有更好的认识。
当然,除了本文举例场景中的操作符,RxJava2还提供了很有强的而好用的操作符,各位同学可以学习学习。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容