RxJava2源码解析系列——1.使用流程解析

概述

Rx系列现在火遍全球,网上也纷纷涌现各类教程、博客。作为一个Android开发人员,我认为掌握RxJava已经成为了一项必不可少的专业技能,然而一眛的去看网上已有的教程和博客,并不能让自己深入理解RxJava,于是有了本系列,也当作为自己的一个总结。本系列章节打算从最常见的使用开始,然后进入源码具体分析。

使用

最简单的使用RxJava的一个例子,我们需要三个元素

  1. Observable(被观察者)
  2. Observer(观察者)
  3. subscribe(订阅关系)

废话不多说,就是上代码

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                Log.d(TAG, "subscribe");
                e.onNext("This is String");
                e.onNext("This is String");
                e.onComplete();
            }
        })
                .subscribe(new Observer() {
                    @Override
                    public void onSubscribe(@NonNull final Disposable d) {
                        Log.d(TAG, "onSubScribe");
                    }

                    @Override
                    public void onNext(@NonNull Object o) {
                        Log.d(TAG, "onNext");

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

1.首先我们需要创建Observable,创建Observable的操作符有很多,这里不一一写出,本文中先使用create操作符创建Observable对象。我们来看一下create方法:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
  1. create方法中需要一个ObservableOnSubscribe<T>类型参数
  2. 方法中最先使用ObjectHelper做了一次判空处理。
  3. 使用RxJavaPlugins做了某些事情
mark

ObservableOnSubscribe<T>是一个接口,因此需要实现当中的subscribe方法,而subscribe方法中存在ObservableEmitter<T>类型的参数。

mark
mark

ObservableEmitter<T>继承Emitter<T>接口,Emitter<T>接口中定义了我们熟知的onNext、onError、onComplete方法。

ObjectHelper是个工具类,在这就不多说了。

mark

RxJavaPlugins顾名思义,应该叫做插件类,具体的作用后续再做详解,这里大概说明一下,该类的onAssembly方法跟hook相关。而这里hook不影响我们主流程,因此传进去什么参数就返回什么(我们这里传进去的是ObservableCreate<T>对象),在ObservableCreate<T>中定义了具体订阅时的逻辑以及发射器的逻辑。

从这个流程我们看出,当我们使用create操作符创建一个Observable时,我们需要传入一个实现了ObservableOnSubscribe<T>接口的对象,在这个实现中,存在发射器ObservableEmitter<T>,通过它可以让使用者自由定义数据流向。并且在create操作符过程中,ObservableOnSubscribe<T>对象与ObservableCreate<T>相关联。

2.接下来我们需要创建Observer,Observer是个接口,因此我们只需要实现该接口即可。

new Observer() {
                    @Override
                    public void onSubscribe(@NonNull final Disposable d) {
                        Log.d(TAG, "onSubScribe");
                    }

                    @Override
                    public void onNext(@NonNull Object o) {
                        Log.d(TAG, "onNext");

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                }

创建Observer很简单,实现接口当中定义的4个方法即可。

  1. onSubscribe(@NonNull final Disposable d),当订阅时会回调该方法,Disposable 用来取消订阅关系。
  2. onNext(@NonNull Object o),发射数据
  3. onComplete(),当onNext方法全部执行完毕,执行该方法。
  4. onError(@NonNull Throwable e),当数据流向出现问题或使用者自己调用时执行。

3.完成订阅过程,首先我们来看看如何完成订阅的。

observable.subscribe(observer) 

当我们使用创建操作符create创建Observable时,还记得create方法返回的是什么类型吗?ok,就是Observable类型,但具体的实现类是ObservableCreate<T>。

进入Observable的subscribe方法看看:

mark
  1. 判空处理
  2. hook相关
  3. 执行subscribeActual(observer)
  4. 抛出异常

在这4个流程当中,subscribeActual(observer)方法才是我们应该关注的。在Observable类中,subscribeActual(observer)是个抽象方法,因此我们需要寻找它的具体实现。上面已经提到使用create操作符,具体的实现类是ObservableCreate<T>。

mark

在ObservableCreate<T>类的subscribeActual(observer)中,所声明的方法参数便是我们外部传进来的实现Observer接口的对象。

  1. 将Observer(观察者)与发射器相关联
  2. 调用observer的onSubscribe方法,为了方便观察者可随时解除订阅关系。
  3. 执行使用者自定义的subscribe方法中的逻辑,同时也将发射器与Observable(被观察者)做关联
  4. 发生异常,回调onError

因此,当执行到subscribeActual(observer)时,才是真正的订阅。

而当执行到

 source.subscribe(parent);  

将ObservableOnSubscribe(源头)与CreateEmitter(Observer,终点)联系起来。

这里的souce就是

mark

这里的parent就是 CreateEmitter<T>。因此,会执行parent.onNext(), parent.onComplete(),parent.onError()

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
  
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;
  
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            ...
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            ...
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        ...
    }

到这,最简单的一个使用RxJava的流程就结束了。

最后总结

1.创建Observable过程

  1. 需要传入一个实现了ObservableOnSubscribe<T>接口的对象
  2. 在ObservableOnSubscribe<T>实现中,通过ObservableEmitter<T>可以让使用者自由定义数据流向
  3. create操作符过程中,采用适配器模式,将ObservableOnSubscribe<T>通过ObservableCreate<T>适配为Observable<T>对象,让ObservableOnSubscribe<T>与ObservableCreate<T>相关联

2.订阅过程

  1. 真正订阅的方法在subscribeActual(Observer<? super T> observer)
  2. source.subscribe(parent); 这行代码执行时,才开始发射数据,在ObservableOnSubscribe<T>中通过ObservableEmitter<T>发送数据给Observer
  3. 当Observable与Observer订阅关系被dispose时,不会执行onXXX方法。
  4. Observer 的 onComplete() 和 onError() 互斥只能执行一次,因为CreateEmitter 在回调他们两中任意一个后,都会自动 dispose()。
  5. 先 error 后 complete,complete 不显示。 反之会 crash
  6. 还有一点要注意的是 onSubscribe() 是在我们执行 subscribe() 这句代码的那个线程回调的,并不受线程调度影响
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,783评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,360评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,942评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,507评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,324评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,299评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,685评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,358评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,652评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,704评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,465评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,318评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,711评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,991评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,265评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,661评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,864评论 2 335

推荐阅读更多精彩内容