Rxjava原理解析

先看RxJava的简单使用及解析:

        //产生事件并返回Single对象,Single和Observable是一样的作用,不一样的地方是Single只回调onSuccess(),不会回调onError()
        Single<Integer> just = Single.just(1);
                //订阅
        just.subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                
            }

            @Override
            public void onSuccess(Integer integer) {
                        
            }

            @Override
            public void onError(Throwable e) {

            }
        });

以上是Rxjava的一个简单示例,第一步通过Single.just()发送一个事件,第二部调用subscribe()订阅事件。

先看第一步Single.just():

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "value is null");
        //构建一个SingleJust返回
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }

        //钩子方法,如果设置了onSingleAssembly,那么可以通过apply()对数据进行再加工,默认没设置onSingleAssembly,不必关注此方法
    public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
        Function<? super Single, ? extends Single> f = onSingleAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

可以看到只是构造了一个SingleJust()对象,传入事件并返回。

那么第一步简单总结一下为:构造新的被观察者SingleJust,并传递事件

public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }



}

可以看到SingleJust在构造里记录了数据,并且它还有一个subscribeActual(),这个方法里回调了onSubscribe()及onSuccess();

那么简单总结一下,第一步Single.just(1)执行后,构造里一个SingleJust对象,并存储了数据。所以返回的Single其实已经是SingleJust对象了,那么被观察者对象已经切换为SingleJust

再看第二步just.subscribe():

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(SingleObserver<? super T> observer) {
                //钩子方法,和上面一样,不必关注。
        observer = RxJavaPlugins.onSubscribe(this, observer);
        try {
            //调用subscribeActual,传入observer
            subscribeActual(observer);
        } catch (Throwable ex) {
            throw npe;
        }
    }

第二部是订阅,它实际执行了subscribeActual(),并传入了下游的观察者,由于这里已经是SingleJust,那么执行的就是它的subscrieActual():

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        //此方法非核心,忽略
        observer.onSubscribe(Disposables.disposed());
        //执行观察者的onSuccess()
        observer.onSuccess(value);
    }

可以看到,最后实际执行了下游观察者的onSuccess()

总结:

以上就是RxJava的一个简单模型过程,当调用一个操作符后,被观察者对象就会改变,同时事件从上往下传递。当产生订阅关系时,下游观察者在上游被观察者的subscribeActual()中获取结果**

为了更好理解复杂的情况,这里再明确一个概念,下游观察者是当前被观察者调用subscribe()时传入的参数。

Rxjava中其他复杂的操作,其实就是操作符的改变及累加,只是在此模型上增加中间过程,如处理数据等。

稍复杂的RxJava使用及解析

        Single.just("1")
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return Integer.valueOf(s);
                    }
                }).subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Integer integer) {

            }

            @Override
            public void onError(Throwable e) {

            }
        });

Single.just("1")这一行在上面分析过了,主要是把Single转换为SingleJust,并传递数据。那么再看第二行map():

    public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //创建了SingleMap对象,并传入了this,和mapper参数,this就是SingleJust对象,mapper处理数据用,用来回调apply()
        return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
    }

可以看到,这一步又把SingleJust转换为了一个SingleMap,那么现在被观察者就转换为了SingleMap,那么现在可以把被观察者SingleJust称为上游被观察者,同时把上游被观察者和处理数据的mapper,存入了SingleMap中。

第三行开始订阅,在这里被观察者已经变成了SingleMap,那么会执行它的subscribeActual():

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        //这里的source其实是SingleJust,然后订阅了观察者MapSingleObserver,由于观察者是由下方的订阅产生的,这里称观察者t为下游观察者
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

如果这里回调观察者的onSuccess(),那么就和之前总结的没有区别,但是这里调用了source.subscribe(),由上游被观察者再次订阅。也就是说,产生订阅关系后,事件开始从下往上传递,对数据进行处理。

根据之前的代码source.subscribe(),最终会执行source的subscribeActual(),再执行到观察者的onSuccess(),也就是MapSingleObserver的onSuccess():

static final class MapSingleObserver<T, R> implements SingleObserver<T> {
                //观察者
        final SingleObserver<? super R> t;
                //处理数据对象
        final Function<? super T, ? extends R> mapper;
        @Override
        public void onSuccess(T value) {
            try {
                //这里调用apply()处理数据。
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
              //如果出错调用onError()
                onError(e);
                return;
            }
                    //处理完成,下游观察者调用onSuccess()
            t.onSuccess(v);
        }
 
}

可以看到在onSuccess()中,先对数据进行处理,如果数据没出错,那么再把数据交给最终的观察者。

总结:根据上面的代码再完善一下简单模型,首先当发送一个事件后,事件开始从上游往下传递,传递过程中会由当前调用的操作符暂代被观察者功能,当传递完成后代表设置完成。然后开始订阅事件,发起订阅后那么事件开始从下往上传递,对之前的设置进行处理,最后处理完后成,事件再从上往下传递给最终的观察者。那么最终模型就是:

从上往下(开始传递事件,初始化操作符)—>从下往下(连接操作符,并进行部分设置)->从上往下(根据操作符对数据进行处理并返回)

根据模型,分析完整示例

        Single.just("1")
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return Integer.valueOf(s);
                    }
                })
                //切换到io线程
                .subscribeOn(Schedulers.io())
                    //切换到主线程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Integer integer) {

            }

            @Override
            public void onError(Throwable e) {

            }
        });

这里只为分析过程,忽律当前代码并不需要切线程的操作。

第一步从上往下,初始化操作符:

map()分析过了,那么直接看subscribeOn():

    public final Single<T> subscribeOn(final Scheduler scheduler) {
        //构建一个SingleSubscribeOn返回,scheduler就是要被设置的参数
        return RxJavaPlugins.onAssembly(new SingleSubscribeOn<T>(this, scheduler));
    }

根据之前的分析,那么就是subscribeOn暂时接管了被观察者的职能,subscribeOn完成了设置,继续看observeOn():

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Single<T> observeOn(final Scheduler scheduler) {
        //构建一个SingleObserveOn返回,scheduler就是要被设置的参数
        return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler));
    }

根据之前的分析,那么就是observeOn暂时接管了被观察者的职能,此时完成了第一步,事件从上到下传递的设置功能。

第二步从下往上,开始订阅,连接操作符

调用subscribe(),由于最后一个暂时接管被观察者功能的是observeOn,所以是由它进行的订阅,那么根据之前的代码可知,会执行SingleObserveOn的subscribeActual():

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        //调用上游订阅方法,这里的source是subscribeOn
        source.subscribe(new ObserveOnSingleObserver<T>(observer, scheduler));
    }

可以看到,调用了上游的subscribe(),那么会再执行上游的subscribeActual(),那么会来到SingleSubscribeOn的subscribeActual():

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        //构造一个SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer, source);
        observer.onSubscribe(parent);
                
        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }

这里先看一下这个SubscribeOnObserver是什么:

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable

可以看到它时一个Runnable对象,那么接上面的第二行:

//切换线程,这里具体实现不必深究,只需知道是完成了切换线程操作就行
Disposable f = scheduler.scheduleDirect(parent);

可以看到,这是把上面的Runnable对象parent,传了进去。这里面其实使用了Executors进行了线程切换操作

既然是Runnable,那么任务开始执行就会触发它的run():

        @Override
        public void run() {
          //这里的source是SingleMap
            source.subscribe(this);
        }

subscribeOn()的实际作用就是切线程, 那么它的设置就在这里生效。另外由于不管下游有什么设置,都会在这里进行切线程操作,然后再进行订阅,那么也就可以得出一个结论:无论subscribeOn()设置多少次,只有在第一次有效,因为从下往上传播最终都会回到第一次设置的地方进行切线程操作

可以看到,继续往上传递,那么会来到SingleMap的subscribeActual():

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        //这里的source是SingleJust
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

还是继续往上传递,来到SingleJust的subscribeActual():

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        //这里的observer是MapSingleObserver
        observer.onSuccess(value);
    }

最终调用了观察者的onSuccess(),那么这里完成了第二部,从下往上,对数据进行相应的操作处理

第三步,从上往下,处理数据

根据之前的总结:下游观察者是当前被观察者调用subscribe()时传入的数(之后的推论同理,不再强调),那么最后传入的参数的map()操作符中的MapSingleObserver,那么就会执行它的onSuccess():

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }
                //这里的t是MapSingleObserver
            t.onSuccess(v);
        }

可以看到,这里首先调用了apply(),让用户对数据进行处理,那么操作符map,已经完全发挥了作用。继续往下,根据之前的总结可得知t是subscribeOn()操作符中的SubscribeOnObserver,再看它的onSuccess:

        @Override
        public void onSuccess(T value) {
          //这里的downstream是ObserveOnSingleObserver
            downstream.onSuccess(value);
        }

这里没有操作,直接向下传递是因为之前已经分析过,它的作用是切线程,并且已经切过了。上面分析过。那么继续往下,downstream是observeOn()操作符的ObserveOnSingleObserver(),再看它的onSuccess::

        public void onSuccess(T value) {
            this.value = value;
            //切换线程
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

observeOn的作用也是切换线程,所以这里也进行了切线程操作,并把this作为参数,那么就是看它自身的run():

        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                downstream.onError(ex);
            } else {
                 //这里的downstream是最终的观察者
                downstream.onSuccess(value);
            }
        }

这里可以看到,切换线程后将结果交给了最终的观察者。完成了整个流程。observeOn()的线程是多次有效的,从上面的代码可以看出,切完线程后继续将结果传递给下游观察者,假如继续调用observeOn(),那么就会继续切线程,结果也是在下游的run()中执行,所以切线程有效。

总结:

RxJava总体流程可以概括为以下三步,复杂的操作只是增加中间环节,以及中间环节的各种细化处理

  • 第一步流程-从上往下,初始化操作符,对所有操作符进行初始化
  • 第二步流程-从下往上,开始订阅,并连接操作符,对连接的操作符进行设置,此例中主要是异步请求时切换线程,
    • subscribeOn()的设置只有第一次生效,因为在subscribeOn中会切换线程,然后进行订阅。这时的流程时从下往上,最终都会回到第一次的设置。所以之前不管切到哪个线程,最终又会由第一次的设置切回它所设置的线程。
  • 第三步流程-从上往下,处理数据,获得数据后由各个操作符对数据进行加工处理,最终传递给最终的观察者。
    • observeOn()每次设置都会起效果,因为observeOn()发挥作用是最后一步从上往下的过程中,所以它每一次切完下次,然后再传递到下一层,下一次同样可以切线程操作。所以如果要对结果再次进行切线程操作,可多次使用observeOn()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容

  • 基本使用 添加依赖 定义Api请求接口仓库 发起网络请求 框架结构 RxJava的整体结构是一条链 链的最上游:生...
    Hsicen阅读 1,408评论 0 1
  • 初学RxJava,对其许多的API颇感神奇,所以RxJava的原理充满了兴趣。正好最近教父大头鬼也出了一篇RxJa...
    alighters阅读 28,120评论 2 28
  • 前言 Rxjava是NetFlix出品的Java框架, 官方描述为 a library for composing...
    kamisamer阅读 915评论 0 1
  • 创建操作符 操作符使用 基本创建create() 完整创建1个被观察者对象(Observable) 快速创建,发送...
    帝王鲨kingcp阅读 1,503评论 0 1
  • 今天下午在学校把儿子老师发的试题整理一下,没找到部编一,就先弄了二三四和数学老师发的一份,一共17页,感觉有点多,...
    冰园哲月阅读 130评论 0 3