官方文档链接:http://reactivex.io/documentation/operators.html
1.前言
接着讲解剩下的那一半操作符。这些操作符相对而言比较高级,大部分都不太常用,但在某些情况下可以明显减少代码逻辑,提高工作效率,还是有必要了解的。
2.实用辅助操作符
辅助被观察者的操作符的集合。
-
Subscribe — 对被观察者的事件和通知进行操作。
subscribe()
操作符像胶水一样连接着观察者和被观察者。为了使观察者捕获被观察者产生的事件或onError()
、onCompleted()
通知,必须用这个操作符先订阅那个被观察者。它的典型实现可能要接受一到三个方法(然后构成观察者),或者接受一个实现包含那三个方法的接口的对象(有时称为观察者或订阅者):onNext()
每当被观察者产生一个事件,它将调用这个方法,并使用它产生的事件作为参数。onError()
被观察者调用这个方法,表明它未能生成预期的数据,或者遇到一些其它的错误,将不会接着调用onNext()
或onCompleted()
方法。onError()
方法使用导致错误的指示(取决于自己的实现,有时是个对象,像Exception或Throwable;有时是简单字符串),作为它的参数。onCompleted()
被观察者调用onNext()
方法之后,且没有遇到任何问题,当结束事件的产生时,它调用这个方法。若被观察者是冷类型,它将不会开始产生事件直到一个观察者订阅了它;若被观察者是热类型,它可能在任何时间开始产生事件,而订阅者可能从某时刻开始捕获事件队列,并丢失之前的所有事件。
参考的文章:
-
Do — 注册一个动作结合被观察者生命周期内各种事件。
当被观察者发生某些事件时,ReactiveX将会调用注册的回调方法。并且这些回调方法,将会因为被观察者相关的一般通知事件独立地调用。不同的ReactiveX实现中,已经设计了相关的各种操作符。
参考的文章:
-
Materialize / Dematerialize — 将一般事件和通知事件当成一般事件传递,或反转此过程。
一个格式良好且有限的被观察者将调用自己观察者的
onNext()
方法零到多次,接着将调用onCompleted()
或onError()
方法至少一次。materialize()
操作符会将这一系列调用(不管一般的onNext()
通知还是终止的onCompleted()
或onError()
通知)都转换成一般事件。dematerialize()
操作符反转上面的过程。作用于一个之前被materialize()
操作符转换过的被观察者,使它返回原来的样子。参考的文章:
-
ObserveOn — 指定观察者所在的线程来订阅被观察者。
ReactiveX的许多实现使用
Schedulers
来管理被观察者在多线程环境下线程的切换。通过observeOn()
操作符,告诉被观察者发送自己的通知给指定线程下的观察者。注意,
observeOn()
操作符会直接传递接收到的一个onError()
终止通知,而不会等待观察者缓慢地接收任何尚未传递到的已知事件。这可能意味着,如上图所示,onError()
通知跳到部分原始事件之前。subscribeOn()
操作符与之类似,是告诉被观察者,它的操作所在的指定线程,以及在此线程上通知自己的观察者。默认情况下,被观察者、使用的操作链和将通知的观察者都执行在subscribe()
方法调用的线程上。subscribeOn()
操作符通过调度被观察者应该执行的指定线程,来改变这个行为。observeOn()
操作符则指定不同的线程,让被观察者发送通知给自己的观察者。如上图所示,
subscribeOn()
操作符指定被观察者将开始执行于哪个线程,且可以被调用于操作链的任意位置。observeOn()
操作符则相反,影响被观察者接下来使用的操作符出现的线程。为此,在被观察者操作链期间的不同位置,可能多次调用observeOn()
操作符来改变后面操作符执行的线程。参考的文章:
-
SubscribeOn — 指定被观察者执行时所在的线程。
ReactiveX的许多实现使用
Schedulers
来管理被观察者在多线程环境下线程的切换。通过调用被观察者的subscribeOn()
操作符,告诉它自己在指定线程下执行。observeOn()
操作符与之类似,但有更多限制,它告诉被观察者发送自己的通知给指定线程下的观察者。在一些实现中,也有unsubscribeOn()
操作符。参考的文章:
Scheduler
- ObserveOn
- Rx Workshop: Schedulers
- RxJava Threading Examples by Graham Lea
- Introduction to Rx: SubscribeOn and ObserveOn
- Async Abstractions using rx-java by Biju Kunjummen, DZone
- RxJava: Understanding observeOn() and subscribeOn() by Thomas Nield
- Advanced Reactive Java: SubscribeOn and ObserveOn by Dávid Karnok
-
Serialize — 迫使被观察者进行同步连续的调用,使行为良好。
当来自不同线程时,同一个被观察者异步调用自己观察者的方法也是有可能。这样可能导致被观察者违反之前的约定,因为它可能在
onNext()
通知之前,试着发送一个onCompleted()
或onError()
通知;或者可能从两个不同的线程同时发送onNext()
通知。可以通过使用serialize()
操作符,迫使这个被观察者有良好行为和同步。 -
Delay — 将被观察者的事件向后推迟指定的时间。
delay()
操作符通过在产生原始事件之前,停顿一个特定时间(人为指定),来修改自己原始被观察者。将导致被观察者产生的整个事件队列,推迟一个指定的时间。参考的文章:
-
TimeInterval — 将一个产生事件的被观察者转换成以原始事件的时间间隔的标识为事件的被观察者。
timeInterval()
操作符拦截原始事件,且将相邻事件之间的时间间隔的标识作为事件来替换。参考的文章:
-
Timestamp — 给被观察者产生的每个事件附加一个时间戳来表明它什么时候产生的。
timestamp()
操作符在传递事件队列前,给每个原始事件附加一个时间戳,表明事件产生的时间。参考的文章:
-
Timeout — 保持原始被观察者,当特定时间段内没有产生任何事件,发出一个
onError()
通知。当被观察者在特定时间段内没有产生任一事件,
timeout()
操作符允许通过onError()
通知终止它。参考的文章:
-
Using — 创建与被观察者有相同生命周期的一次性资源。
using
操作符是通知被观察者创建一个只存在自己生命周期内的资源的一种方式,且随着被观察者而终止。参考的文章:
3.条件判断操作符
判断一到多个被观察者及它们产生的事件的操作符。
-
All — 判断一个被观察者产生的所有事件是否满足一定的标准。
给
all()
操作符传递一个判定函数,基于对原始事件的判断返回一个布尔值。该操作符返回一个产生单一布尔值的被观察者:true,仅当原始被观察者正常终止,且每个原始事件通过函数判定为true;false,任一原始事件通过函数判定为false。参考的文章:
-
Amb — 当有两个以上原始被观察者时,从中选取最早产生事件或通知的被观察者产生的事件。
当传递许多原始被观察者给
Amb
操作符时,它只会传递其中一个的事件和通知:第一个发送通知给Amb
操作符,不管产生的是事件还是onError()
或onCompleted()
通知。该操作符将忽略并丢弃所有其它的原始被观察者的事件和通知。参考的文章:
-
Contains — 判断一个被观察者是否产生了某个特定的事件。
给
contains()
操作符传递一个特定事件,若此事件由原始被观察者产生,返回的被观察者将产生true事件;若原始被观察者终止前没产生该事件,则false。有一个相似的操作符isEmpty()
,它返回一个被观察者:产生true事件,仅当原始被观察者没产生任何事件就结束了;否则产生false事件。参考的文章:
-
DefaultIfEmpty — 传递原始事件,或者当原始被观察者没产生事件时提供默认事件。
defaultIfEmpty()
操作符在原始被观察者产生任何事件时,直接传递。如果原始被观察者正常终止(通过onCompleted()
通知)时没产生任何事件,该操作符返回的被观察者将代替产生一个之前指定的默认事件。参考的文章:
-
SequenceEqual — 判断两个被观察者是否产生相同的事件队列。
给
sequenceEqual()
操作符传递两个被观察者,它将比较每个被观察者产生的事件。仅当两个队列一样(相同的事件、相同的顺序和相同的终止状态),该操作符返回的被观察者将产生true事件。参考的文章:
-
SkipUntil — 丢弃被观察者产生的事件直到第二个被观察者产生一个事件。
skipUntil()
操作符订阅原始被观察者,但忽略它的事件直到第二个被观察者产生一个事件,之后开始正常传递。参考的文章:
-
SkipWhile — 丢弃被观察者产生的事件直到指定的条件变成false。
skipWhile()
操作符订阅原始被观察者,但忽略它的事件直到指定的某些条件变成false,之后开始正常传递。参考的文章:
-
TakeUntil — 传递被观察者产生的事件直到第二个被观察者产生一个事件或终止。
takeUntil()
操作符订阅并开始传递原始被观察者事件,同时监听提供的第二个被观察者。当第二个被观察者产生一个事件或发送onCompleted()
通知,操作符返回的被观察者停止产生事件并终止。参考的文章:
-
TakeWhile — 传递被观察者产生的事件直到指定的条件变成false。
takeWhile()
操作符传递原始被观察者事件直到指定的某些条件变成false,才停止传递事件并终止自己的被观察者。参考的文章:
4.数学计算操作符
作用于被观察者产生的整个事件队列的操作符。
-
Average — 计算被观察者产生数字的平均值并传递。
average()
操作符作用于被观察者产生的数字(或等价于数字的事件),并传递单一的值:原始被观察者产生的所有数字的平均值。参考的文章:
-
Sum — 计算被观察者产生数字的和并传递。
sum()
操作符作用于被观察者产生的数字(或等价于数字的事件),并传递单一的值:原始被观察者产生的所有数字的和。参考的文章:
-
Max — 判断并传递拥有最大值的原始事件。
max()
操作符作用于产生数字(或等价于数字的事件)的被观察者,并产生单一事件:最大数字的事件。参考的文章:
-
Min — 判断并传递拥有最小值的原始事件。
min()
操作符作用于产生数字(或等价于数字的事件)的被观察者,并产生单一事件:最小数字的事件。参考的文章:
-
Concat — 传递两个及以上被观察者的事件而不交错它们(由于是顺序连接算不上合并)。
concat()
操作符连接多个被观察者的输出,这样看起来像单一的被观察者。当第一个被观察者产生的所有事件传递完了,才开始第二个被观察者的(当然可以多于两个),即该操作符等到前一个被观察者完成,才订阅传给它的下个被观察者。注意,该操作符连接热类型被观察者时,那个被观察者在被此操作符订阅之前产生的所有事件将丢失。在一些ReactiveX实现中,也有
concatMap()
操作符(a.k.a.concat_all
,concat_map
,concatMapObserver
,for
,forIn
/for_in
,mapcat
,selectConcat
, orselectConcatObserver
)把原始事件转换成相应的被观察者,再按顺序连接每个被观察者产生的事件。startWith()
操作符与concat()
操作符类似,但事件或事件级被观察者产生的事件是从前部加而不是从后部加。merge()
操作符也相似,合并两个及以上被观察者的事件,但可能交错。参考的文章:
- Catch
- Merge
- StartWith
- Introduction to Rx: Concat
- RxMarbles:
concat
- 101 Rx Samples: Concat — cold observable
- 101 Rx Samples: Concat — hot observable
- Loading data from multiple sources with RxJava by Dan Lew (example using Concat and First)
-
Count — 只传递计算后原始事件的数量。
count()
操作符将产生事件的被观察者转换成产生代表原始事件数量的单一值的被观察者。如果原始被观察者因为onError()
通知终止,该操作符将优先传递此通知。如果原始被观察者一直不终止,该操作符既不传递事件也不终止。参考的文章:
-
Reduce — 对被观察者产生的每个事件依次使用函数,并产生最终值。
reduce()
操作符对第一个原始事件使用函数,接着将结果反馈到函数中作为参数与第二个原始事件计算,持续这个过程直到原始被观察者产生最后的事件而终止。因此,该操作符返回的被观察者产生函数返回的最终值作为事件。这种操作符在其它上下文中,有时被称为“累加”、“聚合”、“压缩”、“折叠”或“注入”。参考的文章:
5.背压处理操作符
应对被观察者产生事件的速率大于观察者消耗的速率的操作符。
在ReactiveX中不难遇到这种场景,一个被观察者产生事件的速率大于操作符或观察者消耗的速率。下面给出处理不断增长积压的未消耗事件的办法。
例如,假设使用zip()
操作符来压缩两个无穷事件的被观察者,其中一个产生事件的速率是另一个的两倍。按照这种简单的实现,不得不为快的被观察者产生的、最终需与慢的被观察者合并的事件,维持一个不断增长的缓存,可能导致ReactiveX占用大量的系统资源。有各种策略来控制数据流和背压,以期缓解产生事件快的被观察者遇上消耗事件慢的观察者而导致的问题。某些ReactiveX实现中,模拟背压操作和特定背压操作符。
冷类型的被观察者在观察者方便时,按照观察者期望的速率开始产生指定的事件队列,不会扰乱事件队列的完整性。例如,被观察者内有静态迭代器,不管它多晚被订阅或事件多频繁被捕获,都将产生相同的事件队列。冷类型的被观察者产生事件的例子,包括数据库查询的结果、文件检索或网络请求。
热类型的被观察者创建后立即开始产生事件,订阅者通常从事件队列中间的某个位置开始捕获它的事件,即被观察者订阅之后产生的第一个事件开始。这样的被观察者按自己的步调产生事件,任由自己的观察者订阅。热类型的被观察者产生事件的例子,包括鼠标和键盘事件、系统事件或股票价格。
当冷类型被观察者多播时(它被转换成可连接的被观察者且connect()
方法被调用),它将变成动态的,且为了背压和控制数据流应该被当作热类型被观察者对待。在某些ReactiveX实现中,冷类型被观察者是实现背压理想的响应模拟模型(具体在后面描述),热类型被观察者通常不能很好地实现响应模拟模型,且有更好的控制数据流策略,例如buffer()
、sample()
、debounce()
或window()
。
参考的文章:
6.连接类型操作符
特定被观察者能更精确地控制订阅的动态。
-
Connect — 通知某个可连接的被观察者开始给它的订阅者产生事件。
可连接的被观察者类似普通的被观察者,除了,当它被订阅时并不产生事件,只有对它使用
connect()
操作符。通过这种方式,可以等待所有预期的观察者订阅此被观察者后开始产生事件。参考的文章:
-
Publish — 将普通的被观察者转换成可连接的被观察者。
可连接的被观察者类似普通的被观察者,除了,当它被订阅时并不产生事件,只有对它使用
connect()
操作符。通过这种方式,可以让被观察者在指定的时间立刻开始产生事件。参考的文章:
-
RefCount — 使可连接的被观察者行为像普通被观察者。
refCount()
操作符对可连接的被观察者自动执行连接和断连操作。它作用于可连接的被观察者,且返回普通的被观察者。当第一个观察者订阅这个被观察者时,该操作符连接之前的可连接的被观察者,并保持监听有多少其它观察者订阅自己,直到最后一个观察者不订阅自己才断开连接。参考的文章:
-
Replay — 确保所有观察者捕获相同的事件队列,即使在被观察者已经开始产生事件之后订阅。
如果在将被观察者转换成可连接的被观察者之前,对它使用
replay()
操作符,那么该可连接的被观察者将总是产生相同完整的事件队列给任何订阅的观察者,即使那些观察者在它已经开始产生事件给那些订阅的观察者之后订阅。参考的文章:
7.转换类型操作符
To — 将一个被观察者转换成另一个对象或数据结构。
ReactiveX的各种特定语言实现有不同的操作符,能用来将被观察者或被观察者产生的事件队列转换成另一种对象或数据结构。其中一些操作符阻塞进行直到被观察者终止,再产生一个等价的对象或数据结构;其它则返回产生该对象或数据结构的被观察者。
在某些ReactiveX的实现中,也有一个操作符可以将被观察者转换成“阻塞的”被观察者。该被观察者提供一系列方法来扩展普通被观察者,并对产生的事件进行阻塞操作。一些to()
操作符实现,属于该被观察者系列的扩展操作。
参考的文章:
8.总结
ReactiveX的所有操作符到此就介绍完了,是不是感觉很多,有点记不住,而且不知道该如何使用。没关系,下一篇将介绍操作符的决策树,帮助你判断什么情况下使用什么操作符。给大家推荐几篇帮助理解的专栏: