RxJava系列_02线程调度

  • 1、关于线程调度的例子, 就只用过几个操作符, 所以只针对这几个操作符进行源码阅读;
  • 2、关于线程调度, 有下面几个api需要分析:
Schedulers.newThread();
AndroidSchedulers.mainThread()
Schedulers.io();
  • 3、关于newThread与io, 是如何操作线程池?
  • 4、demo1讲Schedulers.newThread(), demo2讲Schedulers.io()
  • 5、切记一句话, 一旦看晕了, 赶紧翻到最后结合流程图尝试对当前片段的理解;

demo1 :

Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                LogUtils.log(Note01.class, "subscribe()->ThreadName:" + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                LogUtils.log(Note01.class, "onSubscribe()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(Integer value) {
                LogUtils.log(Note01.class, "onNext()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.log(Note01.class, "onError()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                LogUtils.log(Note01.class, "onComplete()->ThreadName:" + Thread.currentThread().getName());
            }
        });
  • 打印结果:
04-24 21:05:57.418 3141-3141/ Note01->onSubscribe()->ThreadName:main
04-24 21:05:57.418 3141-3241/ Note01->subscribe()->ThreadName:RxNewThreadScheduler-1
04-24 21:05:57.418 3141-3141/ Note01->onNext()->ThreadName:main
04-24 21:05:57.418 3141-3141/ Note01->onComplete()->ThreadName:main

一、Schedulers.newThread:

1.1 Schedulers.newThread
public final class Schedulers {
    static final Scheduler NEW_THREAD;

    static {
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return NewThreadHolder.DEFAULT;
            }
        });
    }

    public static Scheduler newThread() {
        return NEW_THREAD;
    }

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = NewThreadScheduler.instance();
    }
}
public final class NewThreadScheduler extends Scheduler {

    public static NewThreadScheduler instance() {
        return INSTANCE;
    }

    private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
}
  • 主要是构建Schedules的实例, 实际指向NewThreadScheduler, 给这里的Schedulers打算标签,Schedulers_1(NewThreadScheduler);
1.2 Observable.subscribeOn:
public abstract class Observable<T> implements ObservableSource<T> {
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        return new ObservableSubscribeOn<T>(this, scheduler);
    }
}

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    /** 
     * 1. ObservableSubscribeOn持有Observable_1(ObservableCreate)的引用, 
     *    ObservableSubscribeOn持有Scheduler_1(NewThreadScheduler)的引用, 继续模块<1.4>;
     * 2. 给此处返回的ObservableSubscribeOn打上标签Observable_2(ObservableSubscribeOn);
     */
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
}
1.3 AndroidSchedulers.mainThread:
public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
}
  • 创建一个持有主线程Handler的HandlerScheduler实例, 给此处的HandlerScheduler打上标签, Schedulers_2(HandlerScheduler);
1.4 Observable.observeOn:
public abstract class Observable<T> implements ObservableSource<T> {
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        /**
         * 1. 给此处构造的实例ObservableObserveOn打上标签为Observable_3(ObservableObserveOn),
         * 而这里的this指向的是Observable_2(ObservableSubscribeOn);
         * 2. 同时Observable_3(ObservableObserveOn)持有Schedulers_2(HandlerScheduler)的引用;
         */
        return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
    }
}

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
    }
}
  • 1.1 ~ 1.4仅仅是进行了初始化实例的操作, 很关键的一点是每次调用都会返回一个Observable对象, 该Observable对象会持有前一个Observable的引用, 这点也是RxJava链式调用的一个核心;
1.5 Observable.subscribe:
1.6 Observable3(ObservableObserveOn).subscribe:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Observer<? super T> actual;
    protected final ObservableSource<T> source;
    /**
     * Observable3(ObservableObserveOn).subscribe最终会触发subscribeActual的执行;
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        Scheduler.Worker w = scheduler.createWorker();
        /**
         * 1. 然后传入Observer的实例, 此处给Observer打上标签Observer_1(Observer);
         * 2. 然后将Observer_1(Observer)与w(HandlerScheduler)封装进ObserveOnObserver,
         *    给此处的ObserveOnObserver打上标签Observer_2(ObserveOnObserver);
         * 3. 然后通过subscribe将Observer_2(ObserveOnObserver)传给Observable2(ObservableObserveOn);
         */
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
}
1.7 Observable_2(ObservableSubscribeOn).subscribeActual:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        /**
         * 1. 给此处的parent打上标签Observer_3(SubscribeOnObserver);
         * 2. Observer_3(SubscribeOnObserver)持有Observer_2(ObserveOnObserver)的引用;
         * 3. Observer_2(ObserveOnObserver)通过onSubscribe持有
         *    Observer_3(SubscribeOnObserver)的引用, 又是一个相互持有的过程;
         */
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        
        /**
         * Observer_2(ObserveOnObserver)通过onSubscribe持有
         * Observer_3(SubscribeOnObserver)的引用, 又是一个相互持有的过程进入到模块<1.8>;
         */
        s.onSubscribe(parent);
        /**
         * 1. 此处的schedule实际为Scheduler_1(NewThreadScheduler), 进入模块<1.9>中;
         */
        scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                /**
                 * 1. 通过对模块<1.9>的分析可知, source.subscribe(parent)运行在子线程中;
                 * 2. 此处的source指向Observable_1(ObservableCreate), 通过subscribe, 
                 *    Observable_1(ObservableCreate)持有parent即Observer_3(SubscribeOnObserver)
                 *    的引用, 然后进入模块<1.10>;
                 */
                source.subscribe(parent);
            }
        });
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
}
1.8 Observer_2(ObserverOnObserver).onSubscribe:
static final class ObserverOnObserver<T> implements Observer<T>, Runnable {

    final Observer<? super T> actual;
    final Scheduler.Worker worker;
    Disposable s;
    /**
     * 1. 此处的ObserverOnObserver为Observer_2(ObserverOnObserver), 在模块<1.6>中被创建;
     * 2. 有模块<1.6>可知, 此处的actual指向Observer_1(Observer);
     * 3. 而worker指向了Worker(HandlerScheduler);
     */
    ObserverOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { 
        this.actual = actual;
        this.worker = worker;
    }

    @Override
    public void onSubscribe(Disposable s) {
        /**
         * 类似于递归的方式, 依次往上调用, 直到调用到Observer1.onSubscribe为止;
         * 到目前为止还没有发现有子线程的迹象, 所以此时Observer.onSubscribe(Dispose)运行在主线程;
         */
        actual.onSubscribe(this);
    }
}
1.9 NewThreadScheduler.scheduleDirect:
public abstract class NewThreadScheduler {
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }

    @Override
    public Worker createWorker() {
        return new NewThreadWorker(THREAD_FACTORY);
    }
}

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
}
  • 创建线程池, 使run运行在子线程中;
1.10 Observable_1(ObservableCreate).subscribe:
public final class ObservableCreate<T> extends Observable<T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        /**
         * 1. 此处的observer实际为Observer_3(SubscribeOnObserver);
         * 2. source实际为ObservableOnSubscribe, 通过subscribe持有CreateEmitter的引用;
         */ 
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
}
  • 目前算是完成了观察者观察事件的注册, 接下来被观察者通过CreateEmitter发送几个事件看看会发生些什么;
1.11 CreateEmitter.onNext:
static final class CreateEmitter<T> extends AtomicReference<Disposable>  implements ObservableEmitter<T>, Disposable {

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            /**
             * 为Observer_3(SubscribeOnObserver)触发onNext的执行, 进入到模块<1.12>;
             */
            observer.onNext(t);
        }
    }
}
1.12 Observer_3(SubscribeOnObserver).onNext:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

    final Observer<? super T> actual;

    final AtomicReference<Disposable> s;

    @Override
     public void onNext(T t) {
        /**
         * Observer_3(SubscribeOnObserver)在模块<1.7>进行初始化, 然后传入Observer_2(ObserveOnObserver)
         * 使actual指向Observer_2(ObserveOnObserver);
         */
        actual.onNext(t);
    }
}
1.13 Observer_2(ObserveOnObserver).onNext:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        schedule();
    }

    void schedule() {
        if (getAndIncrement() == 0) {
           /**
            * Observer_2(ObserveOnObserver)在模块<1.6>处被初始化, 并且传入HandlerWorker使worker
            * 指向worker, 然后触发Observer_2(ObserveOnObserver)的run方法的执行;
            */
            worker.schedule(this);
        }
    }
    /**
     * 通过阅读HandlerWorker源码可知, HandlerWorker通过schedule将run运行在主线程中;
     */
    @Override
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    void drainNormal() {
        /**
         * actual指向的是Observer_1(Observer);
         */
        actual.onNext(v);
    }
}

关于demo1的流程图:

关于demo1的流程图
  • 1、关于上面的流程大致可以看清楚RxJava的套路了, 首先是Observable<N>持有Observable<N-1>的引用, 然后在Observable<N>的subscribe中传入Observer1, 并创建Observer2, Observer2持有Observer1的引用, 然后将Observer2传给Observable<N-1>, 采用递归的方式, 直到Observable<1>持有Observer<N>的引用为止, 此时将Observer<N>传给CreateEmitter, 然后当执行CreateEmitter.onNext事件时, Observer.onNext的方式为Observer<N> ---> Observer<1>;
  • 2、在1的总结中也可以看出来, Observable与Observer的索引为逆序持有;

demo2:

Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                LogUtils.log(Note01.class, "subscribe()->ThreadName:" + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                LogUtils.log(Note01.class, "onSubscribe()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(Object value) {
                LogUtils.log(Note01.class, "onNext()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.log(Note01.class, "onError()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                LogUtils.log(Note01.class, "onComplete()->ThreadName:" + Thread.currentThread().getName());
            }
       });

关于demo2, 只分析Scheduler.io, 其他的在demo1里面已经进行了分析, demo1Schedulers.newThread每次都会创建一个新的线程池;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容