我是最简单的操作符-----Create

严格来说,create应不算操作符,但是第一篇文章我还是希望能先以最简单的入门。这样再去学习接下来的操作符的话会更加简单,由浅入深。

来段小代码
class ObservaleActivity : AppCompatActivity() {
    var sub: Subscription? = null
    var observable: Observable<String>? = null
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_common)
        btSub.setOnClickListener({
            /**
             * 暂时都不考虑 onComplete onError 只做onNext的处理,由浅入深
             */
            observable?.subscribe({ msg ->
                tvContent.text = tvContent.text.toString() + "\n" + msg
            })
        })
        /**
         * 最简单的使用方式
         */
        observable = Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("Test1")
            }
        })
    }

  //这里需要去解除注册,防止内存泄漏
    override fun onDestroy() {
        super.onDestroy()
        sub?.unsubscribe()
    }
}

这里直接先贴出整个Activity的代码,给大家一个印象,在以后的话就不再直接贴全代码,而是贴关键代码。

注意:一般来说,是需要在onDestroy的时候去unsubscribe,防止内存泄漏


先了解重要的接口和类

在阅读源码的时候,我get到了几个重要的方法

1. 不需要每行都去搞懂
2. 抓住最重要的类和接口去搞懂

这里比较重要的接口和类分别是:

接口或类名
OnSubscribe 实现了Action接口,只有一个call方法,在subscribe时候才会去调用
Subscriber 调用subscribe所传的参数,里面是onNext,onComplete,onError等方法
Observable 所有的操作符都在这个类里面,我认为最重要的其实是OnSubscribe和Subscriber,而Observable其实是作为一个桥梁来链接这2者
Subscription 这个是调用subscribe方法的返回值,用来取消这次订阅

我阅读RxJavau 源码的时候,最让我头疼的一个地方其实就是它的命名,很多名字起的都起的差不多,所以搞得很乱。

这里我专门提出这4个接口或类,只要能把这几个理清楚,其实就差不多。下面我们深入源码去理解。

看看源代码

Observable.create方法

Observable

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

RxJavaHooks

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
        if (f != null) {
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }

这2段代码涉及到了刚才提到的Observable,OnSubscribe
Observable.create(OnSubscribe)其实只是new了一个Observable然后把OnSubscribe作为参数传入,这里的RxJavaHooks大家可以先忽略,这个主要是做全局处理使用的,如果没有全局设置的话,onObservableCreate = null,会直接在第二个方法直接返回你传入的OnSubscribe参数。

Observable.subscribe方法

Observable

 public final Subscription subscribe(final Action1<? super T> onNext) {
        if (onNext == null) {
            throw new IllegalArgumentException("onNext can not be null");
        }

        Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
        Action0 onCompleted = Actions.empty();
        return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
    }

 public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }


static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        ...
        subscriber.onStart();
        if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }
        try {
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            ...
            return Subscriptions.unsubscribed();
        }
    }

subscribe重载方法比较多,我把我们demo中涉及到的3个重载列出。
在demo中我们使用的是直接传入了一个Action对象处理了onNext的事件,在源码中,Observable会把我们的Action包装成一个Subscriber对象,添加上onError,onComplete等方法的默认实现。

最终调用的其实是subscribe的第三个重载方法。
首先判断下参数Subscriber对象是否是SafeSubscriber对象的子类,如果不是,那么就重新new一个SafeSubscriber替换掉当前Observable对象的subscriber变量。

注意:这里的SafeSubscriber大家可以先忽略,我们先看完完整的流程,在最后再为大家来看看SafeSubscriber

最重要的代码

...
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
...

RxJavaHooks

 public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }

这里又出现了RxJavaHooks,大家可以直接忽略看最后返回的参数是什么,直接返回的是传入的参数OnSubscribe对象。这里的RxJavaHooks也是用来做全局处理的作用,所以我们可以先忽略,简单的来看整个流程。

所以上面所提到的最终的代码,其实可以简化为

observable.onSubscribe.call(subscriber)

大家可以看到,其实这就是最关键的代码,也可以看出 Observable这个对象的作用,其实就是用来链接OnSubscribeSubscriber对象。

那么我们重新来看看Activity中的关键的代码

...
observable?.subscribe({ msg ->
                tvContent.text = tvContent.text.toString() + "\n" + msg
            })
...
observable = Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("Test1")
            }
        })
...

应该对整个流程有一个简单的认识了。


附加知识点

SafeSubscriber

 public void onCompleted() {
        if (!done) {
            done = true;
            try {
                actual.onCompleted();
            } catch (Throwable e) {
           ...
          }
        }
    }

    @Override
    public void onError(Throwable e) {
        if (!done) {
            done = true;
            _onError(e);
        }
    }

    @Override
    public void onNext(T t) {
        try {
            if (!done) {
                 actual.onNext(t);
              }
        ...
    }

主要作用: 对原有的Subscriber对象包装一下。前面刚学RxJava的时候,看到有些文章说,onErroronComplete只有一个会运行,其实从这里可以看出。


总结

RxJava中核心的设计模式我认为是装饰者模式,所以在阅读源码的时候也非常的累,因为RxJava的链式结构,大家可以想象下如果4,5个操作符链式书写,然后一步步跟入源码会很难受的,完全不知道当前是哪个OnSubscribeSubscriber

当然只要我们能搞清楚1层结构的流程,那么后面的3,4层结构也能理解,只是稍微麻烦点而已。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容