RxJava线程切换流程分析_subscribeOn

在上一小节中RxJava2_整体流程分析,有这么一个结论,那就是每一次调用 Observable 的操作符都会返回一个新的 Observable 对象,并且会通过构造的方式传入上一级创建的 Observable 对象,将其保存起来,下面是示例代码。那么接下来操作的 subscribeOn、observeOn 操作符都会分别创建新的 Observable 对象,并存储上一级创建的 observable。

//上一级创建的 observable 对象:ObservableOnSubscribe
Observable.create(new ObservableOnSubscribe<String>() {...}

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    //保存上一级创建的 Observable 对象 : ObservableOnSubscribe
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}

一、执行流程图

RxJava2_执行流程分析图.png

二、示例代码

下面这段代码的功能就是在 subscribe 方法内部通过调用 getBitampFormServer 去请求一个 Bitmap 对象,这个方法是耗时操作,当前的操作应该在子线程中执行,得到 bmp 之后,根据结果分别去调用 onNext() /onError() 方法。而在订阅者中若是 onNext 被回调则表示成功获取到 bmp,对应地将其设置给对应的 mImageView 对象上,如果 onError 被回调了,那么表示加载 Bitmap 是失败的,对应的再做一些其它操作,这些操作应该在主线程中进行。本次通过从源码的角度探究的是 RxJava2 内部是如何进行线程切换操作的。本小节先分析 subscribeOn 如何去实现事件源在子线程中发射事件。也就是 ObservableOnSubscribe#subscribe 在子线程中去执行。

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Bitmap> e) throws Exception {
     
        //该方法进行网络请求,是比较耗时的操作。
        Bitmap bmp = getBitampFormServer("uri");
        if(bmp!=null) {
            //获取 bmp 成功
            e.onNext(bmp);
            e.onComplete();
        }else{
            //如果从网络加载图片不成功,回调onError 来通知订阅者
            e.onError(new Exception("图片加载出错啦"));
        }
    }}) //事件源发射事件在子线程中运行
        .subscribeOn(Schedulers.io())
        //订阅者在主线程中接受事件
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Bitmap>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("zeal", "onSubscribe");
            }
            @Override
            public void onNext(@NonNull Bitmap bmp) {
                //设置显示在 ImageView 上
                mImageView.setImageBitmap(bmp);             
            }
            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("zeal","error:"+e.toString());
            }
            @Override
            public void onComplete() {
                Log.e("zeal", "onComplete");
            }
        });

2、.subscribeOn(Schedulers.io()) 源码分析

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ...
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

2.1、Scheduler

从下面的类注释可以知道,这个类是一个调度类,可以延时/周期性地去执行一个任务。可以从 Schedulers 这个类去获取 Scheduler 的实现子类对象,例如在频繁进行 io 操作就可以调用 Schedulers.io() ,如果是计算比较多的可以调用 Schedulers.computation()。

/**
 * A {@code Scheduler} is an object that specifies an API for scheduling
 * units of work with or without delays or periodically.
 * You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}.
 */
public abstract class Scheduler {}

2.2、Schedulers.io()

通过下面的 Schedulers.io() 源码跟踪,最终返回的是一个 IoScheduler 对象,这个对象实际上就是 Scheduler 的子类对象。那么就符合 subscribeOn(Scheduler) 参数的要求了。

@NonNull
public static Scheduler io() {
    //内部是 IO 
    return RxJavaPlugins.onIoScheduler(IO);
}
//-----------------------------------------------------
@NonNull
static final Scheduler IO;

static {
    ...
    // IO 是在静态代码块中实例化的
    IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
        @Override
        public Scheduler call() throws Exception {
            //这里返回一个 IoHolder 对象。
            return IoHolder.DEFAULT;
        }
    });
    ...
}
//-----------------------------------------------------
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

//-----------------------------------------------------
//IoHolder 类定义中可以知道,该类是继承至 Scheduler
public final class IoScheduler extends Scheduler {}

2.3、subscribeOn 内部实现

  • subscribeOn(Scheduler scheduler)

这个方法内部会通过创建一个 ObservableSubscribeOn 对象,根据之前的经验可知道,这个类肯定也是一个 Observable 的子类对象。因此对于 subscribe(observer) 方法而言,我们就只关心它真正调用的方法 subscribeActual(observer) 。

  • subscribeActual(observer)

在subscribeActual 内部首先是对 observer 进行包装成 SubscribeOnObserver 对象。这里的 SubscribeOnObserver 不仅是一个 Observer ,而且具备一个连接器的作用 Disposable 。

@Override
public void subscribeActual(final Observer<? super T> s) {
    //包装 observer 对象
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    //将连接器 parent 通过 onSubscribe 回调给 observer 对象
    s.onSubscribe(parent);
    //这里是通过 scheduler 去执行一个任务 SubscribeTask。
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
  • SubscribeOnObserver

这个类是对 observer 的包装,内部实现了 Observer 和 Disposable 接口。也就是说它既有订阅者的功能,也实现了连接器的功能。注意 actual 这个变量,它是下一级的 Observer 对象,为什么说是下一级呢?因为每次包装的 Observer 是一级级别往上被订阅的,当前的 Observer 都会包装下一级别的 Observer 对象。例如 SubscribeOnObserver 就封装了下一级的 Observer 对象,其实就是当前 Observer 接受到事件源发送过来的事件时,再调用包装的 Observer 回调给下一级,这样一级级传递下去知道最后一级 Observer。

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    ...
    final Observer<? super T> actual;
    final AtomicReference<Disposable> s;
    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }
    @Override
    public void onSubscribe(Disposable s) {
        DisposableHelper.setOnce(this.s, s);
    }
    //发送事件
    @Override
    public void onNext(T t) {
        //回调给下一级
        actual.onNext(t);
    }
    //发送事件
    @Override
    public void onError(Throwable t) {
        //回调给下一级
        actual.onError(t);
    }
    //发送事件
    @Override
    public void onComplete() {
        //回调给下一级
        actual.onComplete();
    }
    @Override
    public void dispose() {
        DisposableHelper.dispose(s);
        DisposableHelper.dispose(this);
    }
    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
  • SubscribeTask(parent)

SubscribeTask 它是一个 Runnbale ,因此我们把它理解为一个任务。首先关注是它的 run 方法,它内部实现很简单,就是**通知上一级的 Observable 通过 subscribe 这个方法进行订阅当前 observer **。下面会执行一大堆代码,其实都会为创建一个线程然后交给指定的线程池取执行这个任务,先记住这个任务的使命。那么既然是一个线程,那么肯定有一个地方需要执行这个线程的,接下来关注 scheduler.scheduleDirect 方法。

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;
    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }
    @Override
    public void run() {
        //【核心代码,这段代码决定上一级observable订阅在哪个线程执行。】
        //source:就是上一级创建的 observable
        //parent 就是包装后的 observer
        source.subscribe(parent);
    }
}

开始寻找 SubscribeTask 这个线程实在哪里被执行的。

  • scheduler.scheduleDirect(new SubscribeTask(parent))

刚才分析过 scheduler 就是 IoScheduler 对象了,跟踪源码发现,这个类并没有重写这个方法,因此直接进入 Scheduler 查看。

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

//这里的 delay = 0,也就是马上执行这个任务。
//【这个 run 就是我们的目标】
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    
    //核心代码 createWorker() 创建一个可以可以执行 run 的 worker 
    final Worker w = createWorker();
    //对 run 进行了包装,实际上还是 run 这个对象。【这个 run 就是我们的目标】
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
    //decoratedRun 交给了 worker 去执行
    w.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                【我们的目标在此处被执行】
                decoratedRun.run();
            } finally {
                //事件源发射事件完毕之后,就关闭连接器。
                w.dispose();
            }
        }
    }, delay, unit);
    return w;
}
  • IoScheduler#createWorker();

现在我们知道我们的任务是交给 worker.schedule() 去执行的。因为 Worker 是负责去执行调度的,因此不同的子类会有不同的 Worker 的实现,在 Scheduler 中通过 createWorker() 来获取子类实现的 Worker 对象。

@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
  • Scheduler#Worker

这个类具备延迟执行任务,周期性执行任务的功能。所有的执行都是基于 schedule() 方法,而这个方法是一个抽象方法,也就是它无法知道子类需要怎么执行这个任务,因为每一种调度器执行的方式 schedule 都不一样,因此交给子类去实现。

 @NonNull
 public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
  • EnentLooerWorker#schedule()

有了 Worker 之后就要开始执行【我们的任务 action 啦】

@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    //【任务 action 】交给 threadWorker 去执行
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
  • threadWorker.scheduleActual

threadWorker 是 ThreadWorker ,继承至 NewThreadWorker 。

static final class ThreadWorker extends NewThreadWorker 

//NewThreadWorker 内部维护一个线程池 executor。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    

//最终代码会走到这里
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    //对 run 进行包装
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }
    Future<?> f;
    try {
        //上面已经提到,delayTime = 0;所以这个任务会被立即执行,
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    return sr;
}

@Override
public void run() {
    try {
        try {
            //执行原始的 run 方法。
            actual.run();
        } catch (Throwable e) {
            // Exceptions.throwIfFatal(e); nowhere to go
            RxJavaPlugins.onError(e);
        }
    } finally {
        Object o = get(PARENT_INDEX);
        if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
            ((DisposableContainer)o).delete(this);
        }
        for (;;) {
            o = get(FUTURE_INDEX);
            if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
                break;
            }
        }
    }
}

2.4、 结果

f = executor.submit((Callable<Object>)sr); 这里执行了 SubscribeTask#run() 方法,也就是当前的订阅者 Observer 订阅了上一级的 Observable 。也就是上一级的 ObservableCreate.subscribe(observer) 被执行了。请注意它是在子线程中被执行的。如果想要了解接下来的事件源是怎么发送事件的可以参考RxJava2_整体流程分析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容