RxJava_整体流程分析

一、RxJava2 整体功能分析

下面这段代码很简单,就是事件源会在当前线程通过 e.onNext() 的方式发送 "1","2","3" 三个事件,最后发送 e.onComplete() 第四个事件,那么在订阅者 Observer 中就可以收到这个几个由事件源发送的事件。接下来通过源码的角度分析下面这段代码的整体逻辑

RxJava2_执行流程分析图.png

在分析代码之前需要明白一个原则,那就是了解一个类首先先了解这个的顶层接口,通过顶层接口就可以明白这个类的框架体系的大体功能了,子类只是对这个体系的功能扩展而已。这就好比学习集合框架一样,我们首先会去了解 Collection 接口内部的所有的方法,知道了这些方法之后,我们心里就大概知道这个 Collection 体系大概的功能了,然后再慢慢的去了解它的实现类对这些功能的具体实现。

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("1");
        e.onNext("2");
        e.onNext("3");
        e.onComplete();
    }
}).subscribe(new Observer<String>() {
    private Disposable mD = null;
    @Override
    public void onSubscribe(Disposable d) {
        mD = d;
    }
    @Override
    public void onNext(String s) {
        if ("2".equals(s)) {
            mD.dispose();
        }
        System.out.println("s = " + s);
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.toString());
    }
    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
});

二、Observable 的继承关系

Observable 是一个抽象类,是 ObservableSource 的实现类,而 ObservableSource 类是一个接口,它表示事件源。内部只有一个方法 subscribe 该方法表示通过 Observer 订阅当前的事件源。那么事件发布的事件,在 Observer 订阅者中就会被收到。了解了 Observable 的顶层接口之后,我们就知道该体系最重要的一个功能那就是 subscribe 方法了,因此我们就重点关注子类的 subscribe 方法。

public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}

跟踪 Observable 中 subscribe 的调用关系,最后可以知道最终会调用到一个方法第 31 行代码 subscribeActual(observer); 期间做了多次转换操作,这些我们不用管。我说过现在分析是整体流程,所以没有必要去分析细枝末节的东西,不然会迷失方向的。所以大胆的得出一个结论,只要是 ObservableSource 的子类,那么我们只要关心 subscribeActual(observer); 这个方法就好的。

public abstract class Observable<T> implements ObservableSo
urce<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe() {
        return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);
    return ls;
}

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        //这段代码是核心代码。
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS")
        npe.initCause(e);
        throw npe;
    }
}

三、Observable#create(ObservableOnSubscribe)

我们在 create 方法中传入一个 ObsevableOnSubscribe 对象,而这个对象就是一个 Observable 的父类。而 create 方法顾名思义就适用于创建 Observable 对象的。

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    //非空校验
    ObjectHelper.requireNonNull(source, "source is null");
    //内部就是创建一个 ObservableCreate 对象
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

四、ObservableCreate

上面的 create 方法中内部实际返回的是一个 ObservableCreate 对象,而这个类实际上就是 Observable 的子类。通过构造方法方法可以知道当前创建的 ObservableCreate 内部维护了上一级创建的 ObsevableOnSubscribe 对象,这个对象就是用户在 create 方法传入的对象。这里很重要,因为下面每一级都会创建一个新的 Observable 对象,内部都会保存上一级的 ObservableOnSubscribe 对象。如果不太理解的话,先放下,等下面分析了应该就会明白了。到这里我们就知道 Observable.create() 方法会返回一个 Observable 类型的对象。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> 
    source) {
        //内部保存了上一级创建的 ObservableOnSubscribe 对象的引用。
        this.source = source;
    }
}

五、触发 subscribe 方法

这个方法大家都知道,就是用来发生订阅关系的。在 RxJava 中事件源 Observable 只有发生了订阅才会发送事件。我们知道刚才通过 create 方法的分析可以知道,内部是创建了 ObservableCreate 这个 Observable 子类的,那么就分析 ObservableCreate 的 subscribe 的内部实现即可。

  • ObservableCreate#subscribeActual

在上面已经分析过了,只要是 Observable 类型的对象,在调用 subscribe(observer) 最终都会调用调 subscribeActual(observer) 方法。

@Override
//subscribe 方法内部会调用 subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    //发射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //回调给 observer#onSubscribe
    observer.onSubscribe(parent);
    try {
        //告诉上一级的 observable 你可以发送事件了。
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
  • 分析事件源是如何发送事件的?

在文章开头,我们在 ObservableOnSubscribe#subscribe 方法内部发送的了 4 个事件。那么这个 ObservableOnSubscribe#subscribe(ObservableEmitter) 是在哪里调用的呢?还记得 ObservableCreate 类中的 subscribeActual 的实现吗?它的内部调用 source.subscribe(parent); 这个方法,目的就是将发射器 CreateEmitter 传递给上一级创建的 ObservableOnSubscribe 对象。

@Override
//subscribe 方法内部会调用 subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    //发射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //回调给 observer#onSubscribe
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

这样上面这 4 个事件就可以通过 ObservableEmitter 对象发送了,由于多态的原理,实际上是由 CreateEmitter 去发送这四个事件的。

**CreateEmitter 就是上面描述的 Emitter 的实现类。 **

//发射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//将发送器对象传入给上一级创建的 ObservableOnSubscribe 对象,其实也就类似于接口回调的方式去通知 Observable 您的订阅者 Observer 已准备好了,您可以发送事件了。
source.subscribe(parent);

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
       e.onNext("1");
       e.onNext("2");
       e.onNext("3");
       e.onComplete();
    }
});
  • ObservableCreate#CreateEmitter

这个类是一个发射器,它是 Emitter 的实现类,主要用于发射事件的。内部封装了 Observer 对象,这个 Observer 就是通过 subscribe(observer) 参数传入的 observer 对象,那么在 CreateEmitter 中调用 onNext,onError,onComplete 方法内部都去调用该 observber 对象对应的 onNext(t),onError(t),onComplete() 方法。这样就实现了事件源 Emitter 发送事件,在订阅者 Observer 收到事件了。

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
    final Observer<? super T> observer;
    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }
    @Override
    public void onNext(T t) {
        //onNext 的参数不能为 null
        if (t == null) {
            return;
        }
        if (!isDisposed()) {
            //回调 observer 对应的方法
            observer.onNext(t);
        }
    }
    @Override
    public void onError(Throwable t) {
        //onError 的参数不能为 null
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."
        }
        if (!isDisposed()) {
            try {
                //回调 observer 对应的方法
                observer.onError(t);
            } finally {
                dispose();
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }
    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                //回调 observer 对应的方法
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
    @Override
    public void setDisposable(Disposable d) {
        DisposableHelper.set(this, d);
    }
    @Override
    public void setCancellable(Cancellable c) {
        setDisposable(new CancellableDisposable(c));
    }
    @Override
    public ObservableEmitter<T> serialize() {
        return new SerializedEmitter<T>(this);
    }
    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }
    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}
  • Emitter

发射器顶层接口,定义 onNext,onError,onComplete 方法。

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

  • Disposable 的作用

Disposable 可以理解为一个事件源和订阅者的一个连接器,当调用 dispose() 方法之后,这个连接器就关闭了,那么事件源将不会往该订阅者 observer 发送事件了。isDisposed() 就是用于判断该连接器是否被中断了。

public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();
    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}
  • Disposable 的使用

还是回到 ObservableCreate 这个类的 subscribeActual 方法,这个方法中是发生订阅的时候调用的。在其内部有这段代码
observer.onSubscribe(parent); 这个 parent 就是先前创建的 CreateEmitter 对象,从上面的源码可以看到该类实现了 Emitter 接口外,还实现了 Disposable 接口。那么在外部的Observer 中的 onSubscribe 这个方法可以收到 Disposable 对象,那么用户就可以在适当的时候进行关闭连接器操作了。下面的代码示例中,在 onNext 方法中当收到的事件为 "2" 时,那么就调用 dispose() 关闭连接器。而关闭之后事件源在发送下一个事件的时候就会判断该连接器是否是关闭的,具体代码看 CreateEmitter#onNext 方法,它内部会判断 if (!isDisposed()) 判断。如果已经关,那么将不会再往该 Observer 发送事件了。

//CreateEmitter 类继承结构
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable 

//开始订阅
.subscribe(new Observer<String>() {
    private Disposable mD = null;
    //onSubscribe 方法用于接收一个 Dispoable 对象。
    @Override
    public void onSubscribe(Disposable d) {
        mD = d;
    }
    @Override
    public void onNext(String s) {
        //当接收到的事件为 "2" 时,那么就关闭连接器。
        if ("2".equals(s)) {
            mD.dispose();
        }
        System.out.println("s = " + s);
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.toString());
    }
    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
});

六、总结

1、在 RxJava 中最重要的就是每一次 Observable 的创建都会保存上一级的创建的 Observable 对象,这个有什么用呢?其实每一个 Observable 都要进行 subscribe 发生订阅关系的。在当前 Observable 调用了 subscribe 之后,还需要调用上一级创建的 Observable.subscribe() 进行订阅,这样一级级往上发生订阅关系。这个作用是可以在下一节分析线程切换时就用体现了,到时再分析咯。
2、分析整体流程不要在意细枝末节,先接触顶层接口,了解体系功能。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容