RxJava2 源码总结

总感觉跟着源码走过程过段时间又会忘记,又得翻一遍源码,所以在第一次学习源码时,把领悟的关键点记录下来,以后回看只要稍微浏览下源码,就能迅速明白它的思想(也许以后还会有更深刻的理解😂)。

本学习源码基于 RxJava 2.1.7
源码地址:https://github.com/ReactiveX/RxJava

一,事件产生和消费--create()

首先,以最简单的使用方式来说:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(">>>>>>>>", "onSubscribe");
    }
    @Override
    public void onNext(Integer integer) {
        Log.e(">>>>>>>>", "onNext:" + integer);
    }
    @Override
    public void onError(Throwable e) {
         Log.e(">>>>>>>>", "onError:" + e.toString());
    }
    @Override
    public void onComplete() {
        Log.e(">>>>>>>>", "onComplete");
    }
});
// ObservableCreate#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent); 
    source.subscribe(parent);
    ...
}
  • Observable 是个抽象类,而 Observable.create() 方法返回的对象 ObservableCreate(继承于Observable),可以看作是事件统一管理者。
  • ObservableCreate 在创建的时候保存了用于事件实际生产者的 ObservableOnSubscribe ,在事件订阅 Observable#subscribe() 时,传入了事件消费者 Observer
  • 在事件订阅时,ObservableCreate#subscribeActual() 方法中创建中间者 CreateEmitterCreateEmitter 把事件消费者 Observer 保存起来,然后调用事件生产者 ObservableOnSubscribe#subscribe() 方法传入 CreateEmitterCreateEmitter 是方法 subscribe() 的第一个参数的 ObservableEmitter 子类)。
  • 然后 CreateEmitter 调用自己的 onNext() 方法生产事件,在 onNext()方法中把事件传给消费者 Observer#onNext()
  • 由于 CreateEmitter 又实现了 Disposable ,在 ObservableCreate#subscribeActual() 方法中调用了事件消费者 Observer#onSubscribe() 把自己传入消费者,所以在消费者的回调方法中,可以实现终止事件传给消费者 dispose()(因为 CreateEmitter#onNext() 等方法中会首先判断 isDisposed())。

二,操作符--map()

Observable.create(...) 
        .map(new Function<Integer, String>() {
             @Override
             public String apply(Integer integer) throws Exception {
                 return String.valueOf(integer);
             }
         })
         .subscribe(···);
// ObservableMap#subscribeActual
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}
  • map() 方法返回的是新的包装 ObservableObservableMap<T, U> (装饰着模式?),ObservableMap<T, U> 中保存了 create() 返回了上一节的 ObservableCreate
  • 然后调用 Observable#subscribe() 开始订阅事件,实际是首先调用 ObservableMap<T, U>#subscribeActual() 方法,该方法里再调用 create() 方法返回的 ObservableCreate#subscribeActual()
  • ObservableMap<T, U>#subscribeActual() 方法把传入的消费者 Observer 包装为 MapObserver ,再传给 ObservableCreate#subscribeActual() 开始生产事件。
  • 在新的 ObserverMapObserver 中,主要任务是把事件生产者生产的事件通过 Function 转化为对应的类型。

三,线程调度--subscribeOn()

Observable.create(...) 
        .subscribeOn(Schedulers.io())
        .subscribe(···);
// ObservableSubscribeOn#subscribeActual
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() { 
            source.subscribe(parent);
        }
    }));
}
  • subscribeOn() 返回的又是新的包装 ObservableObservableSubscribeOn(和上一节很像),ObservableSubscribeOn自然又是保存了 create() 返回了的 ObservableCreate
  • 开始订阅事件时,实际是首先调用 ObservableSubscribeOn#subscribeActual() 方法,方法里又包装新的 ObserverSubscribeOnObserver
  • 通过 scheduler.scheduleDirect(runnable)subscribeOn() 指定的线程直接调用 run() 方法,方法里再调用 ObservableSubscribeOn 保存的最初的 ObservableObservableCreate#subscribeActual() 开始生产事件,然后返回一个 worker 调度管理者(实现 Disposable 接口)。
  • 新的包装 ObserverSubscribeOnObserver 把返回的 worker (子线程)加入 dispose() 管理。
  • 可以发现,当 subscribeOn() 多次调用时,最终只有 ObservableCreate 调用的 subscribeOn()(即第一个)起作用,因为每次 subscribeOn() 时新创建的包装 ObservableObservableSubscribeOn 都会保存上一个 Observable ,然后订阅时(在 run() 方法调用),会调用上一个 Observable.subscribe() ,一直调用到最开始的 ObservableObservableCreate 才开始切换线程并生产事件。

三,线程调度--observeOn()

Observable.create(...) 
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(···);
// ObservableObserveOn#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    ...
    Scheduler.Worker w = scheduler.createWorker();
    source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); 
}
  • 也是和上两节类似,新的包装 ObservableObservableObserveOn
  • 创建一个 AndroidSchedulers.mainThread() 对应的 Worker 。
  • 创建新的包装 ObserverObserveOnObserver (又实现了Runnable)传给最开始的 ObservableCreate ,这样生产的事件会由 ObserveOnObserver
    对应的onXXX()处理。
  • onNext() 里,首先把生产的数据加入队列,然后切换会 observeOn() 指定的线程,最后才把数据取出来传给消费者。
  • 可以看到,每次切换线程后会立刻发送数据,所以调用 observeOn() 会生效多次,与 subscribeOn() 相反。
新年快乐,2018

本人水平有限,如有错误,欢迎批评指出😊

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容