- RxJava 2.x与RxJava 1.x的差异
Nulls
这是一个很大的变化,熟悉 RxJava 1.x 的童鞋一定都知道,1.x 是允许我们在发射事件的时候传入 null 值的,但现在我们的 2.x 不支持了,不信你试试? 大大的 NullPointerException 教你做人。这意味着 Observable<Void> 不再发射任何值,而是正常结束或者抛出空指针。Flowable
在 RxJava 1.x 中关于介绍 backpressure 部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用 Observable 。而在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压。
或许对于背压,有些小伙伴们还不是特别理解,这里简单说一下。大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。感兴趣的小伙伴可以模拟这种情况,在差距太大的时候,我们的内存会猛增,直到OOM。而我们的 Flowable 一定意义上可以解决这样的问题,但其实并不能完全解决,这个后面可能会提到。Single/Completable/Maybe
其实这三者都差不多,Single 顾名思义,只能发送一个事件,和 Observable接受可变参数完全不同。而 Completable 侧重于观察结果,而 Maybe 是上面两种的结合体。也就是说,当你只想要某个事件的结果(true or false)的时候,你可以使用这种观察者模式。线程调度相关
这一块基本没什么改动,但细心的小伙伴一定会发现,RxJava 2.x 中已经没有了 Schedulers.immediate() 这个线程环境,还有 Schedulers.test()。Function相关
熟悉 1.x 的小伙伴一定都知道,我们在1.x 中是有 Func1,Func2.....FuncN的,但 2.x 中将它们移除,而采用 Function 替换了 Func1,采用 BiFunction 替换了 Func 2..N。并且,它们都增加了 throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要 try-catch 了。其他操作符相关
如 Func1...N 的变化,现在同样用 Consumer 和 BiConsumer 对 Action1 和 Action2 进行了替换。后面的 Action 都被替换了,只保留了 ActionN。
- RxJava 2.x 拥有了新的特性,其依赖于4个基础接口,它们分别是
- Publisher
- Subscriber
- Subscription
- Processor
其中最核心的莫过于 Publisher
和 Subscriber
。Publisher
可以发出一系列的事件,而 Subscriber
负责和处理这些事件。
其中用的比较多的自然是 Publisher
的 Flowable
,它支持背压。关于背压给个简洁的定义就是:
背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
简而言之,背压是流速控制的一种策略。有兴趣的可以看一下官方对于背压的讲解。
可以明显地发现,RxJava 2.x 最大的改动就是对于 backpressure
的处理,为此将原来的 Observable
拆分成了新的 Observable
和 Flowable
,同时其他相关部分也同时进行了拆分,但令人庆幸的是,是它,是它,还是它,还是我们最熟悉和最喜欢的 RxJava。
RxJava API文档
Flowable示例:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
while (true){
e.onNext(1);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
System.out.println(integer);
}
});
处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。
Backpressure的策略:
ERROR
这种方式会在产生Backpressure问题的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException。
我们先以代码示例介绍一下Flowable相比与Observable新的东西。
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一个参数
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 1
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 2
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 3
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit complete
01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 1
01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 2
01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 3
01-20 16:18:58.908 17829-17829/? D/MainActivity: onComplete
上述代码创建了一个Flowable(被观察者)和一个Subscriber(观察者),可以看到程序如我们预期的一样输出结果了。不同的是 onSubscribe(Subscription s)中传给我们的不再是Disposable了, 而是Subscription。然而Subscription也可以用于切断观察者与被观察者之间的联系,调用Subscription.cancel()方法便可。 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:
s.request(Long.MAX_VALUE);
这个方法就是用来向生产者申请可以消费的事件数量。这样我们便可以根据本身的消费能力进行消费事件。
当调用了request()方法后,生产者便发送对应数量的事件供消费者消费。
这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式,你要求多少,我便传给你多少。
注意:如果不显示调用request就表示消费能力为0。
虽然并不限制向request()方法中传入任意数字,但是如果消费者并没有这么多的消费能力,依旧会造成资源浪费,最后产生OOM。形象点就是不能打肿脸充胖子。
而ERROR策略就避免了这种情况的出现(讲了这么多终于出现了)。
在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。
在ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
}
});
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 125
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 126
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 127
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 128
01-20 16:58:49.003 32434-32434/? W/MainActivity: onError:
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
我们让Flowable发送129个事件,而Subscriber一个也不处理,就产生了异常。
因此,ERROR即保证在异步操作中,事件累积不能超过128,超过即出现异常。消费者不能再接收事件了,但生产者并不会停止。
BUFFER
所谓BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。
这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。
但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。
总之BUFFER要慎用。
DROP
看名字就可以了解其作用:当消费者处理不了事件,就丢弃。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
下面例子具体介绍:
点击“开始”按钮,建立连接。生产者开始生产事件,刚开始消费者通过request()只要了50个事件消费。然后每次点击“消费”按钮,再次消费50个事件。
mFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP);
mSubscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
s.request(50);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
}
};
}
public void start(View view){
mFlowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mSubscriber);
}
public void consume(View view){
mSubscription.request(50);
}
01-20 17:25:44.331 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 0
..........................................................
01-20 17:25:44.331 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 49
01-20 17:25:47.891 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 50
..........................................................
01-20 17:25:47.891 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 99
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 100
..........................................................
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 127
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 17749078
..........................................................
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 17749099
可以看出,生产者一次性传入128个事件进入缓存池。点击“开始”按钮,消费了50个。然后第一次点击“消费”按钮,又消费了50个,第二次点击“消费”按钮,再次消费50个。然而此时原来的128个缓存只剩下28个了,所以先消费掉28个,然后剩下22个是后来传入的(其实后来的是在消费了96个后传入,并一次性在缓存池中又传入了96个,具体可以看源码,这里不解释了)。
LATEST
LATEST与DROP功能基本一致。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。
还是以上述例子展示,唯一的区别就是Flowable不再无限发事件,只发送1000000个。
结果如下:
01-20 17:50:30.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 0
..........................................................
01-20 17:50:30.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 49
01-20 17:50:31.569 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 50
..........................................................
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 100
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 101
..........................................................
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 127
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 999999
唯一的区别就在最后一行。这就是LATEST与DROP的区别。
上述例子Flowable对象的获取都是通过create()获取的,自然可以通过BackpressureStrategy.LATEST之类的方式指定处理背压的策略。如果Flowable对象不是自己创建的,可以采用onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest()的方式指定。
Flowable.just(1).onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
以上就是关于Flowable及backpressure的内容。
作者:Ruheng
链接:https://www.jianshu.com/p/1f4867ce3c01
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。</pre>