RxJava浅析——事件如何从上游传递到下游

之前学过一阵子RxJava1.x,但没应用到项目中。最近在Android上使用一个Stomp协议的库,这里面用到了RxJava2,所以重新把RxJava给捡起来了。用了一阵子之后觉得光知其然而不知其所以然比较别扭,而且作为开发总是对源码充满着好奇。虽然源码可能晦涩难懂,逻辑千回百转,但如果能从中领悟一点架构设计的精妙之处或许就是值得的。所以就开始了这一段读RxJava2源码之路。

不在这里立任何的Flag!!!总之算是有一个开始。

怕事情说不清楚,所以语言会比较啰嗦,请大家见谅。

本文将会分析最简单最基础的RxJava2的源码——如何将事件从Observable(上游)传递到Observer(下游)。

先上代码(为了能看得清楚,这里不使用lambda写法),没错,本文就是解释一下这段代码会如何执行。

Observable
    .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("abc");
            }
        })
     .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            
            @Override
            public void onNext(@NonNull String s) {
                Log.i(TAG, "onNext " + s);
            }
            
            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError", e);
            }
            
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
            }
        });

先把几个类罗列一下:

  • Observable:上游。事件的源头。
  • Observer:下游。事件的处理者。
  • ObservableEmitter:事件发送者。
  • Disposable:事件切断者。
  • ObservableOnSubscribe:一个接口。只是为了将ObservableEmitter返回给上游,以便发送事件。

使用过RxJava2的童鞋应该对ObservableObserver,还有这一串链式操作非常熟悉。

先看一下create()方法干了什么:

//代码位置:io.reactivex.Observable.java
//不是源码,是我简化后的代码。源码还有空检查,Hook方法的回调等。这些不影响整个逻辑,所以可以先忽略。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<T>(source);
}

create()方法很简单,只给我们返回了一个ObservableCreate对象。然后把我们传入的ObservableOnSubscribe对象(source)传入了ObservableCreate的构造方法。

ObservableOnSubscribe是一个接口。我们在create()方法的入参时给了一个匿名内部类的实现。也就是说这个ObservableOnSubscribe完全是我们外部给的实现。注意下这个subscribe()方法的入参是ObservableEmitter,这个是事件发送器,我们用这个来发送onNextonErroronComplete事件。

//代码位置:io.reactivex.ObservableOnSubscribe.java
public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

接下来就要看ObservableCreate里面干了啥了。

//代码位置:io.reactivex.internal.operators.observable.ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}

what??? 仅仅是保存了我们传入的source。然后就没有然后了。

所以说创建一个Observable的时候其实啥也没有发生,此时我们只有一个上游,没有下游,也没有连接上下游的管道,所以水流不通。我们已经知道下游是Observer了,那么管道是什么呢?

对,就是这个subscribe()方法。

注意注意,创建上游的时候我们返回的类型其实是ObservableCreate,从签名上可以看到这个类是Observable的子类。因为ObservableCreate没有重写父类的subscribe()方法,所以我们来看Observable中的subscribe()方法的实现。

//代码位置:io.reactivex.Observable.java
//已经化简,去掉了一些空检查等代码.
public final void subscribe(Observer<? super T> observer) {
        try {
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
        }
    }

核心代码就一行:subscribeActual(observer);

这个实现,不用说,当然是ObservableCreate给的。

//代码位置:io.reactivex.internal.operators.observable.ObservableCreate.java 
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);//Point 1
    observer.onSubscribe(parent); //Point 2
    try {
        source.subscribe(parent);//Point 3
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

这段代码也很简洁。重点就3句话。

先来讲Point 1 :创建了一个CreateEmitter,并把我们的下游(Observer)传入了。

这个CreateEmitter是什么鬼?

上文我们说到ObservableEmitter是事件发送器,然而他只是个接口,那么CreateEmitter是他的某个实现啦。

上代码:

//ObservableCreate的静态内部类。
static final class CreateEmitter<T>  extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable{
     final Observer<? super T> observer;

     CreateEmitter(Observer<? super T> observer) {
      this.observer = observer;
     } 
}

创建CreateEmitter也只是把我们传入的Observer保存下来。

接着讲Point 2:回调Observer.subscribe()方法,并把Point 1中创建的CreateEmitter传递出去。纳尼???Observer.subscribe()方法入参可是Disposable啊。果然这个CreateEmitter也实现了Disposable接口。所以说这个CreateEmitter既是事件发送器也是事件阶段器。这很好理解,最方便快捷的操作就是在事件发送的地方截断事件。

最后Point 3: source.subscribe(parent). 啥??? source是啥,奥,还记得创建Observable的时候唯一做的事情就是保存了我们传入的ObservableOnSubscribe对象么?对,现在他给我们回调了。并且把一个事件发送器传递给了我们。这句话就回调到了我们用匿名内部类去实现的ObservableOnSubscribe接口。

Observable
    .create(new ObservableOnSubscribe<String>() {//对对对,这个就是source
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("abc");//对对对,这个e就是parent,就是内部创建的CreateEmitter
            }
        })

所以呐,拿着这个事件发送器我们就可以发送事件啦!

上文的代码里只发送了一个事件:e.onNext("abc")

最后我们就可以来看调用onNext,onComplete,onError事件分别会发生什么了。

//代码位置:当然还是在CreateEmitter啦。因为调用的就是e.onNext("abc")嘛
@Override
    public void onNext(T t) {
      if (t == null) { //Point 1
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
      }
      if (!isDisposed()) {//Point 2
            observer.onNext(t);//Point 3
      }
    }

Point 1: 因为RxJava2不允许空对象出现啦,包括之前的代码里也有很多空检查的操作。这个要注意了。

Point 2: 就是先检测一下事件有没有被切断,如果被切断了(isDisposed()返回true),当然不能往下传了。

Point 3: 还记得我们创建CreateEmitter时,把我们外部创建的Observer传入了吗?对,就是这个observer。所以发现没,就这么调用到了Observer.onNext()

没错,事件就这样从传递过来啦。

可以想象,onError和onComplete事件也是类似的。

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t); //point 1
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);// point 2
                } finally {
                    dispose();  //point 3
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();//point 4
                } finally {
                    dispose();//point 5
                }
            }
        }


onError比较特殊,就是如果事件流已经被切断,再次发送onError,会触发point 1处的代码(具体不用挂心,反正就是会抛一个异常,UndeliverableException)。当然onError()方法传递的Throwable也不能为空!!!

Point 2 和 Point 4分别是回调外部Observer的onErroronComplete方法。这个回调跟onNext是一毛一样的。

从Point 3和Point 5来看,一旦发生onError或者onComplete事件,RxJava内部就会把事件流切断。切断的机制我们暂时不讲。我们知道CreateEmitter也是一个事件切断器,所以他当然会有dispose()方法去切断事件,isDispose()方法判断事件有没有被切断。

OK! 本文要讲的差不多了。最后附上一张类图,帮助理解和回忆。

RxJava2.png

ps:之所以我们有时候搞不清楚内部实现的逻辑,是因为对外暴露的永远都是接口,接口,接口,而内部的实现可能有很多其他角色在表演。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容