RxJava 订阅流程源码解析

简单示例

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("XXW", "subscribe");
                emitter.onNext(1);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer s) {
                LogUtil.d("onNext " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

打印结果

D/XXW: onSubscribe  Thread Name : main
D/XXW: subscribe
D/XXW: subscribe Thread Name : RxCachedThreadScheduler-1
D/XXW: onNext 1
D/XXW: onNext 2
D/XXW: onNext 3
D/XXW: onNext 4

RxJava 之所以在我们Android开发者的圈子 如此火爆 主要是因为它的链式结构,切换线程之简便而闻名. 所以学习它的源码也是很有必要的事情

订阅流程

从基本的RxJava使用套路, 我们可以发现 是被观察者订阅观察者, 直白一点就是说, 被观察者有动作之后就会被发送给观察者, 所以我们从代码的角度的话 就先去看看subscribe(observer observer)这个方法


public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
             //抽象方法 进行订阅的关键方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }


//Observable类 的抽象方法
protected abstract void subscribeActual(Observer<? super T> observer)
  1. 可以看出subscribe方法 是一个被观察者(Observable) 的方法, 而且 这个方法只做了一件事 , 订阅观察者(observer) . 但是这个方法又是一个抽象方法, 从面向对象的角度出发, 那么必然是子类实现这个方法进行处理了. 所以我们先要找到这个子类. 所以回到RxJava基本代码来找这个子类
//通过构建者模式,返回一个Observable对象
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("XXW", "subscribe");
                Log.d("XXW", "subscribe Thread Name : " + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onComplete();
            }
        })

通常,我们会在create方法, 传入一个ObservableOnSubscribe 接口的参数. 因为ObservableOnSubscirbe是一个接口, 所以要看create方法

//create方法会返回Observable类
 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

//进行
 public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

create方法 直接返回RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)),所以我接着看这个方法内部如何实现,onAssembly 方法里 对onObservableAssembly 进行了判空,如果不为空 则返回apply(f,source), 但是我们查看源码, 发现这个对象默认是空对象, 所以一般就会直接返回我们的ObservableCreate 对象, 我们再看看ObservableCreate类的实现


public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        1.包装Observer类,进CreateEmitter类
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        2.调用observer的onSubscribe方法
        observer.onSubscribe(parent);

        try {
        3.调用subscribe方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

这个时候我们发现Observable是Observable的子类, 我们之前查看subscribe的源码发现, 它会调用子类的subscribeActual方法. 我们来看subscribeActual做了如上3点, 所以之前在示例中,我们先打印的会是OnSubscibe方法里的Log, 因为之前从create方法传进来的就是我们ObservableOnSubscribe接口, 所以在这里他会调用自己的回调方法subscibe. 我们在外面回调方法中就会通过CreateEmitter这个包装类,进行onNext,onComplete操作. 所以我们这时候要先看看CreateEmitter类.

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        //传进Observer对象
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }


        //ObservableOnSubscibe 回调方法中调用的oNext
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        //ObservableOnSubscibe 回调方法中调用的oNext
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

这时候我们发现, 平常我们调用的onNext等方法 其实都是走的这个包装类CreateEmitter类, 我们发现这个类实现了Disposable的接口并继承了AtomicReference类. 我们平时如果要切断 rxjava的消息, 一般都是调用Disposeable的dispose方法. 所以我们先看看dispose方法在CreateEmitter的实现,

DisposableHelper.dispose(this);

 public static boolean dispose(AtomicReference<Disposable> field) {
        //1.当前Disposeable
        Disposable current = field.get();
        //2. 断开的Disposeable
        Disposable d = DISPOSED;
        //如果当前Disposeable不是断开
        if (current != d) {
        //将当前的disposeable 设置为断开
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

 public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }

当我们调用dispose方法后, 会判断当前是否断开订阅, 如果订阅了, 如果订阅了则会调用断开连接, 其中的原子性设置 我没有太明白, 以后弄明白再补充, , isDispose方法就很明白了, 来判断我们是否已经断开. 所以当我们上游调用了onNext方法

 public void onNext(T t) {
             //先判空
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //消息没有切断才会调用onbserver的onNext方法把消息发送到下游
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

关于onError()和 onComplete() 为何只会执行一个, 源码如下

 @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

通过源码 我们可以发现这两个方法 调用方式都是一样的, onError只是多了一个异常的判断.
因为之前我们分析dispose方法后 我们可以知道, 调用过这个dispose方法后, 会切断消息, 所以!isDisposed方法肯定就会返回false了, 所以这就只有一个方法能执行的原因.

总结:

通过上面一系列分析,我们就能大概明白了 RxJava的订阅流程了, 会发现大神写的代码 封装性,扩展性都特别好, 而且设计模式也用了很多, (构建者, 装饰模式) 这些在我们平时写代码的时候也可以使用来提升我们技术!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335