Rxjava源码解读笔记:线程、map数据操作

一点牢骚:
前段时间,接到需求,旧项目要增添许多功能;旧项目是这样的:功能以及代码量就非常庞大,加上各种代码不规范、可读性很差、代码耦合度有点小高;
听到这个消息真的让我脑袋大了一圈,
如果真的要在原有架构上做开发,肯定会导致小组成员开发冲突以及众多的冗余代码,浪费时间和精力在非必要的事情上,之前自身也知道旧项目有这个问题 但由于新项目开发呀嫌弃旧项目一直没有决心去改动,这下好了完全推不了 那就改架构吧,新的模式是 组件化+Rxjava.Retrofit+MVP模式,最近一直在忙着项目代码架构调整,相对应的代码模板编写等等,虽然说改架构是被逼的,但改着改着还是有成长以及很有成就感的一件事情; 再接再厉。


说实话,rxjava的源码太难了,一直没有去时间(懒癌)去学习; 包括现在项目比较紧张,每天下班后更是不太想去学习,那么现在我就和大家一起看一下rxjava的源码吧;

1、正常简易流程;
2、带线程切换流程;
3、map之后;
4、一些总结

1、正常简易流程

基于以下这段代码查看源码

   Observable.just("11")
            .subscribe(observer);

大家应该都知道或者听过,Rxjava采用的是 增强版的观察者模式,在订阅的那一瞬间开始执行整个流程,那么现在看一下订阅方法subscribe(Observer<? super T> observer)

Observable.class
    @Override
    public final void subscribe(Observer<? super T> observer) {
        //..
        // 实际订阅
            subscribeActual(observer);
        //...
    }
    
    
RxJavaPlugins.class
    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }
    
    static <T, U, R> R apply(@NonNull BiFunction<T, U, R> f, @NonNull T t, @NonNull U u) {
        try {
            return f.apply(t, u);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }

看到这里实际订阅是发生在 observable 的 subscribeActual 中 而 subscribeActual是个抽象方法; 那么我们又要去找它的实现;
这边通过Observable.just开始看

Observable.calss

  public static <T> Observable<T> just(T item) {
        //...
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

ObservableJust.class
    
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        //调用 observer的 onSubsribe方法
        s.onSubscribe(sd);
        //执行
        sd.run();
    }
    
ScalarDisposable.calss
   public void run() {
   // 判断什么的
    if (get() == START && compareAndSet(START, ON_NEXT)) {
    // 
        observer.onNext(value);
        if (get() == ON_NEXT) {
            lazySet(ON_COMPLETE);
            observer.onComplete();
        }
    }
}

可以看到 run是直接执行的;
整体的一个简单正常的流程就是: observable.subscribe(Observer) -> observable.subscribeActual -> Observer.onSubscribe( Disposable ) -> ScalarDisposable.run -> observer.onNext(value) -> observer.onComplete();

简易源码流程——01

其中正常完整流程都会执行标红部分的方法;其中其它部分先放着,只是判断有没有完成完成所有数据流的发射

2、线程切换流程

基于以下这段代码查看源码

Observable.just("11")
        .subscribeOn(Schedulers.io())//指定Observable 在哪个线程上创建执行操作
        .observeOn(AndroidSchedulers.mainThread()) //在指定下一事件发生的线程
        .subscribe(observer);

2.1、 流向 Observable.subscribe 都经历了什么

先看下 Observable.subscribeOn都做了些什么

Observable.class
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
ObservableSubscribeOn.class    本质上继承 Observable
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        //保存以及初始化
        super(source);
        this.scheduler = scheduler;
    }

可以看就就是转换变成了 ObservableSubscribeOn

再看下 Observable.observeOn(Scheduler scheduler) 做了些什么

Observable.class  这边应该是: ObservableSubscribeOn extends .... Observable
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

ObservableObserveOn.class 
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

本质是将Observable转换成ObservableObserveOn ,在这个流程中是将 ObservableSubscribeOn 转换成ObservableObserveOn;

我们的Observable变换是这样子的,ObservableJust->ObservableSubscribeOn->ObservableObserveOn
一层一层被包含

Obserable转换

2.2、流向 -> Observer.onSubscribe 都经历了什么

那么又到了我们的 订阅方法subscribe(Observer<? super T> observer)了,只不过我们中间多了几层转换; 我们再来看一下

Observable.class
    @Override
    public final void subscribe(Observer<? super T> observer) {
        //...
        // 实际订阅
            subscribeActual(observer);//...
    }
ObservableObserveOn.class
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
        //创建一个  Scheduler.Worker
            Scheduler.Worker w = scheduler.createWorker();
        //   new一个新的 ObserveOnObserver implements Observer 再次循环  Observable.subscribe
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
ObserveOnObserver.class .... implements Observer<T>, Runnable
    
Observable.class
    @Override
    public final void subscribe(Observer<? super T> observer) {
        //..
        // 实际订阅
            subscribeActual(observer);
        //...
    }

ObservableSubscribeOn.class 
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        //直接执行,what? Observer.onSubscribe 不能指定线程   
        // 记录一下   Observer.onSubscribe 的入口是
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

Observer的转变是这样的 Observer->ObserveOnObserver->SubscribeOnObserver

以上面为准,先看下 s.onSubscribe(parent)所经历的事情

ObserveOnObserver.class 
    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            if (s instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<T> qd = (QueueDisposable<T>) s;

                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                if (m == QueueDisposable.SYNC) {
                    sourceMode = m;
                    queue = qd;
                    done = true;
                    actual.onSubscribe(this);
                    schedule();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    sourceMode = m;
                    queue = qd;
                    actual.onSubscribe(this);
                    return;
                }
            }

            queue = new SpscLinkedArrayQueue<T>(bufferSize);

//   看这里  actual 其实是  Observer ;
            actual.onSubscribe(this);
        }
    }
    
Observer.class 
    onSubscribe(sd){...}

这里究竟可以看到 执行到 最初observeronSubscribe的一条完整的线路;
ObserveOnObserver.subscribeActual -> ObservableSubscribeOn.subscribeActual -> ObserveOnObserver.onSubscribe -> Observer.onSubscribe ;
不知道有没有细心的同学发现了没有,'onSubscribe'的执行没有SubscribeOnObserver什么事情,虽然说上面有一层转换成功了SubscribeOnObserver
画成图应该就是下面这样:

onSubscribe执行链

我们发现了 从订阅开始一直到执行我们的 observer.onSubscribe() 中间没有任何切换线程的影子;
所以我们得出了一个

observer的 onSubscribe 运行与订阅动作发生在同一线程,不受线程指定方法(observeOn subscribeOn)影响

2.3、流向 -> observer.next、onComplete 都经历了什么

ObservableSubscribeOn.class 
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);

//      new 出一个  SubscribeTask
//      scheduler.scheduleDirect 切换线程执行  SubscribeTask
//      SubscribeOnObserver.setDisposable方法

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

可以看到上面最后一段代码做个这样事情,一件一件去看一下:

// new 出一个 SubscribeTask
// scheduler.scheduleDirect 切换线程执行 SubscribeTask
// SubscribeOnObserver.setDisposable方法

先看一下SubscribeTaskrun 里面是干嘛的

ObservableSubscribeOn.class
    class SubscribeOnObserver
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        
    class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //其中  source 是  ObservableJust  
            source.subscribe(parent);
        }
    }

由第一节的分析我们可以知道,这边最终会执行到 SubscribeOnObserver.onNext() -> ObserveOnObserver.onNext()->Observer.onNext() 这边一层一层调用出来;

SubscribeTask.run 最终执行我们的 最初observer.onNext() onComplete(); 这边还没有涉及到线程切换

再看我们的 scheduler.scheduleDirect(new SubscribeTask)
我们上面用的是 Scheduler.IO 实际上是 IoScheduler;

IoScheduler extends Scheduler.class
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();    

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        //指定工作线程
        w.schedule(task, delay, unit);

        return task;
    }
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    
    EventLoopWorker extends Scheduler.Worker 
       @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    

那么这边流程就比较清晰了,拿到subscribeOn 设置的Scheduler中创建一个Worker 设定了一个 IO 线程;
看到这里 我们就该逆向地执行我们 Observer 真正的方法了;
执行到 SubscribeOnObserver.onNext()

ObservableSubscribeOn : SubscribeOnObserver<T> 
        @Override
        public void onNext(T t) {
        // actual 为 ObserveOnObserver
            actual.onNext(t);
        }
//  scheduler  这边指定为  AndroidSchedulers.mainThread()    createWorker() 这边不深究,里面转成了 handler
Scheduler.Worker worker = scheduler.createWorker();
ObservableObserveOn : ObserveOnObserver
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        void schedule() {
            if (getAndIncrement() == 0) {
            // 这个最终 执行在handler
                worker.schedule(this);
            }
        }

最后的流程应该是这样的


线程切换

3、map 数据操作源码

Observable.just(1)
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(@NonNull Integer integer) throws Exception {
                return null;
            }
        }).subscribe(integer -> out("accept:" + integer));
Observable.class
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    
   MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

可以看到 它是在 执行完 function.apply在执行 onNext();
配合上一节 ,流程图就变成这样了


加了map以后的流程

4、一些总结

来个总结吧: 估计源码看得很混乱。

1、对Observable指定线程、数据变换等等,都采用了一种代理包装模式; 比如 ObservableJust-> ObservableSubscribeOn -> ObservableMap -> ObservableObserveOn ; 进行了一层包装;
2、在订阅完成的那一刻起,反向调用 subscribe():subscribeActual()方法;比如 :(ObservableObserveOn.subscribe->ObservableObserveOn.subscribeActual())->(ObservableMap.subscribe->ObservableMap.subscribeActual())->(ObservableSubscribeOn.subscribe->ObservableSubscribeOn.subscribeActual())->(ObservableJust.subscribe->ObservableJust.subscribeActual())
3、Observer ,同理包装 Observer -> ObservableMap... 添加了指定 Schedulers.createWorker() ;
4、 Observer 的执行顺序是 Observer.onSubscribe() -> ObservableXX.onNext() -> ObsevableXXX.onNext() ->...-> Obsever.onNext() -> ObservableXX.OnComplete() -> ObsevableXXX.OnComplete() ->...-> Obsever.OnComplete();
5、 中间有些操作放入到了线程当中.

其实有点坑的是:原本我就知道这个流程应该是这样的,类似于事件分发机制成 U 字型的流程...... 本篇只是在 众多代码 中验证我的思路.................、

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容