Rxjava2与1的差异,新东西介绍<2>

  1. RxJava 2.x与RxJava 1.x的差异
  • Nulls
    这是一个很大的变化,熟悉 RxJava 1.x 的童鞋一定都知道,1.x 是允许我们在发射事件的时候传入 null 值的,但现在我们的 2.x 不支持了,不信你试试? 大大的 NullPointerException 教你做人。这意味着 Observable<Void> 不再发射任何值,而是正常结束或者抛出空指针。

  • Flowable
    在 RxJava 1.x 中关于介绍 backpressure 部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用 Observable 。而在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压。
    或许对于背压,有些小伙伴们还不是特别理解,这里简单说一下。大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。感兴趣的小伙伴可以模拟这种情况,在差距太大的时候,我们的内存会猛增,直到OOM。而我们的 Flowable 一定意义上可以解决这样的问题,但其实并不能完全解决,这个后面可能会提到。

  • Single/Completable/Maybe
    其实这三者都差不多,Single 顾名思义,只能发送一个事件,和 Observable接受可变参数完全不同。而 Completable 侧重于观察结果,而 Maybe 是上面两种的结合体。也就是说,当你只想要某个事件的结果(true or false)的时候,你可以使用这种观察者模式。

  • 线程调度相关
    这一块基本没什么改动,但细心的小伙伴一定会发现,RxJava 2.x 中已经没有了 Schedulers.immediate() 这个线程环境,还有 Schedulers.test()。

  • Function相关
    熟悉 1.x 的小伙伴一定都知道,我们在1.x 中是有 Func1,Func2.....FuncN的,但 2.x 中将它们移除,而采用 Function 替换了 Func1,采用 BiFunction 替换了 Func 2..N。并且,它们都增加了 throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要 try-catch 了。

  • 其他操作符相关
    如 Func1...N 的变化,现在同样用 Consumer 和 BiConsumer 对 Action1 和 Action2 进行了替换。后面的 Action 都被替换了,只保留了 ActionN。

  1. RxJava 2.x 拥有了新的特性,其依赖于4个基础接口,它们分别是
  • Publisher
  • Subscriber
  • Subscription
  • Processor

其中最核心的莫过于 PublisherSubscriberPublisher 可以发出一系列的事件,而 Subscriber 负责和处理这些事件。

其中用的比较多的自然是 PublisherFlowable,它支持背压。关于背压给个简洁的定义就是:

背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。

简而言之,背压是流速控制的一种策略。有兴趣的可以看一下官方对于背压的讲解

可以明显地发现,RxJava 2.x 最大的改动就是对于 backpressure 的处理,为此将原来的 Observable 拆分成了新的 ObservableFlowable,同时其他相关部分也同时进行了拆分,但令人庆幸的是,是它,是它,还是它,还是我们最熟悉和最喜欢的 RxJava。
RxJava API文档
Flowable示例:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                while (true){
                    e.onNext(1);
                }
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(2000);
                System.out.println(integer);
            }
        });

处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。
Backpressure的策略:

ERROR

这种方式会在产生Backpressure问题的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException。

我们先以代码示例介绍一下Flowable相比与Observable新的东西。

Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR); //增加了一个参数

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);  
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }
            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
        flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);

01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 1

01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 2

01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 3

01-20 16:18:58.898 17829-17851/? D/MainActivity: emit complete

01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 1

01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 2

01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 3

01-20 16:18:58.908 17829-17829/? D/MainActivity: onComplete

上述代码创建了一个Flowable(被观察者)和一个Subscriber(观察者),可以看到程序如我们预期的一样输出结果了。不同的是 onSubscribe(Subscription s)中传给我们的不再是Disposable了, 而是Subscription。然而Subscription也可以用于切断观察者与被观察者之间的联系,调用Subscription.cancel()方法便可。 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:

  s.request(Long.MAX_VALUE);  

这个方法就是用来向生产者申请可以消费的事件数量。这样我们便可以根据本身的消费能力进行消费事件。

当调用了request()方法后,生产者便发送对应数量的事件供消费者消费。

这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式,你要求多少,我便传给你多少。

注意:如果不显示调用request就表示消费能力为0。

虽然并不限制向request()方法中传入任意数字,但是如果消费者并没有这么多的消费能力,依旧会造成资源浪费,最后产生OOM。形象点就是不能打肿脸充胖子。

而ERROR策略就避免了这种情况的出现(讲了这么多终于出现了)。

在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。

在ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 129; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }
                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });

01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 125

01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 126

01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 127

01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 128

01-20 16:58:49.003 32434-32434/? W/MainActivity: onError:

io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

我们让Flowable发送129个事件,而Subscriber一个也不处理,就产生了异常。

因此,ERROR即保证在异步操作中,事件累积不能超过128,超过即出现异常。消费者不能再接收事件了,但生产者并不会停止。

BUFFER

所谓BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。

这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。

但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。

总之BUFFER要慎用。

DROP

看名字就可以了解其作用:当消费者处理不了事件,就丢弃。

消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。

下面例子具体介绍:

image.png

点击“开始”按钮,建立连接。生产者开始生产事件,刚开始消费者通过request()只要了50个事件消费。然后每次点击“消费”按钮,再次消费50个事件。

mFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {

                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP);
        mSubscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                mSubscription = s;
                s.request(50);
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {

            }
        };
    }
    public void start(View view){
        mFlowable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(mSubscriber);

    }
    public void consume(View view){
        mSubscription.request(50);

    }

01-20 17:25:44.331 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 0

..........................................................

01-20 17:25:44.331 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 49

01-20 17:25:47.891 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 50

..........................................................

01-20 17:25:47.891 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 99

01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 100

..........................................................

01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 127

01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 17749078

..........................................................

01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 17749099

可以看出,生产者一次性传入128个事件进入缓存池。点击“开始”按钮,消费了50个。然后第一次点击“消费”按钮,又消费了50个,第二次点击“消费”按钮,再次消费50个。然而此时原来的128个缓存只剩下28个了,所以先消费掉28个,然后剩下22个是后来传入的(其实后来的是在消费了96个后传入,并一次性在缓存池中又传入了96个,具体可以看源码,这里不解释了)。

LATEST

LATEST与DROP功能基本一致。

消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。

唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。

还是以上述例子展示,唯一的区别就是Flowable不再无限发事件,只发送1000000个。

结果如下:

01-20 17:50:30.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 0

..........................................................

01-20 17:50:30.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 49

01-20 17:50:31.569 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 50

..........................................................

01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 100

01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 101

..........................................................

01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 127

01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 999999

唯一的区别就在最后一行。这就是LATEST与DROP的区别。

上述例子Flowable对象的获取都是通过create()获取的,自然可以通过BackpressureStrategy.LATEST之类的方式指定处理背压的策略。如果Flowable对象不是自己创建的,可以采用onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest()的方式指定。

 Flowable.just(1).onBackpressureBuffer()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                    }
                });

以上就是关于Flowable及backpressure的内容。

作者:Ruheng
链接:https://www.jianshu.com/p/1f4867ce3c01
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。</pre>

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345