相关文章链接:
在ReactiveX中一个观察者(observer)订阅一个被观察的对象(Observable)。那个观察者对Observable发出的无论是单个对象或者是一串对象作出反馈。这种模式有利于并发操作因为它不需要阻塞线程,当等待Observable发送对象的过程中,它建立一种xiang哨兵一样的机制,我们称之为观察者,它会一直关注那个Observable然后做出适当的反应。
这篇文章解释了reactive pattern是什么,Observable(被观察的对象)和observer(观察者)是什么,以及observer如何订阅Observable。其他的几篇文章会解释如何使用Observable的各种操作符去链接不同的Observables并且改变他们的行为。
这篇文章会结合一些“弹子图”进行解释。这张图简单介绍了弹子图如何代表Observables和Observables的变形。第一行的一个个图形代表Observable发送的一个个事件,每个事件都经过翻转的操作,得到新的Observable事件,按照顺序发送。
背景介绍
在很多软件编程任务中,你或多或少希望你写的指令会逐步地执行和完成,一次一条,按照你写的顺序。但在ReactiveX中,很多指令同时执行,执行结果会在稍后以任意的顺序被observer捕捉。与以往调用函数不同,你以Observable的形式定义一种新的得到数据并转化数据的机制,并且可以创建一个observer订阅它,当这种机制被触发,observer就会捕捉到并且做出反应。
这种方式的优点在于当你有一些互不关联的任务时,你可以同时启动它们而不是等这些任务一个个完成才开始另外一个,这样的话,完成所有任务的总时长只是完成那个最长任务的时长。
有很多用来描述这种异步编程和设计的术语,这里我们用以下的术语:一个观察者订阅一个被观察对象(An observer subscribes to an Observable)。一个被观察对象(Observable)通过调用观察者(observer)的方法来给观察者(observer)发送条目或者通知。
在其他文章中,可能会把observer称之为订阅者(subscriber),观察者(watcher)或者反映者(reactor)。这个模式经常被引用为“反应器模式”
创建观察者
这篇文章中将会用一些伪代码来展示一些例子,ReactiveX的实现可以用很多种语言.
在一个普通的方法调用中,流程是这样的:
1. 调用函数
2. 把函数的返回值存在一个变量里
3. 用这个变量和新的值去做一些有用的事
// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal
在异步模型里,流程应该是这样的:
1. 定义一个方法,这个方法可以使用异步调用的返回值做一些有用的事,这个方法是observer的一部分
2. 定义一个异步调用,称它为Observable
3. 通过订阅的方式,把这个observer附加到Observable上去(这也同时初始化了Observable的行为actions)
4. 无论那个异步调用返回什么,observer的方法都会开始对异步调用的返回值进行操作,这些返回值其实也就是Observable发送的对象,条目。
// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business
OnNext, onCompleted, and onError
订阅(Subscribe)的方法是指你如何把一个observer与一个Observable联系起来。你的observer会实现下面的其中一些方面。
-
onNext
当Observable发送一个对象的时候,Observable就开始调用 onNext
方法,这个方法把Observable发送的对象作为参数
-
onError
当Observable不能产生希望的数据或者遇到一些错误的时候,就会调用这个onError
方法。调用这个方法之后,将不会再调用onNext
或者onCompleted
这些函数。onError
方法把error作为参数。
-
onCompleted
如果没有遇到任何错误,Observable在最后一次调用onNext
之后调用这个方法。
根据Observable规定(Observable Contract),Observable调用onNext
大于等于零次,然后会接着调用onCompleted
或者onError
方法,但不会两个都调用,一旦调用这两个方法中的任何一个,这个方法将会是最后一次调用。我们约定俗成地把onNext
的调用称为事项的发送(emissions of items),而把onCompleted
或者onError
叫做通知(notifications).
一个更加完整的subscribe调用例子大概是这样的:
def myOnNext = { item -> /* do something useful with item */ };
def myError = { throwable -> /* react sensibly to a failed call */ };
def myComplete = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business
取消订阅
在一些ReactiveX的实现中,有一个专门的observer接口,订阅器(Subscriber),它实现了取消订阅这个方法。如果Subscriber再也不对现在订阅的Observable感兴趣了,你可以调用这个方法去取消订阅。那些Observable将不再会发送新的事件(如果Observable没有其他observer的话)。
取消订阅的操作会顺着运用在Observable上的操作符一级一级地向前推,让这个操作符链上的每个节点都停止发送事件。这一过程可能需要一定的事件,并不保证会立刻发生,对一个Observable来说很有可能在没有observer订阅它的情况下仍然会发一段时间的事件。
一些关于命名规范的提示
ReactiveX中的不同语言每一种都有它们自己的命名规范。尽管有一点共同点,但是没有一个各种语言通用的标准。
更重要的是这些名称在其他的上下文中可能有会推断出来不同的意义,或者在某一种特别的语言环境下看起来特别奇怪。
举个例子来说,onEvent
命名模式中(例如onNext
,onCompleted
,onError
),在一些语言的上下文中,这些名称会显示注册了哪些事件处理器(event handler),而在ReactiveX中,这些名称就是event handler本身。
Hot Observable 和 Cold Observable
什么时候一个Observable会开始发送一串事件呢?取决于这个Observable。一个“hot” Observable一旦被创建就会发送事件,所以任何之后订阅这个Observable的observer会从中间的某个点开始观察。一个“cold” Observable直到有一个observer订阅它,它才开始发送事件,所以observer保证能够观察到整个事件串。
在一些ReactiveX的实现中,有一个被称为可连接的(Connectable)Observable的东西,这个Observable直到调用Connect方法才会开始发送事件,无论有没有observer订阅它。
运用Observable操作符进行组合
Observable和observer仅仅是ReactiveX的开始,它们本身仅仅是一种标准observer模式的拓展延伸而已,能更好地处理事件串。
真正强大的功能来自于“reactive extensions”(这也是为什么ReactiveX的名字由来),reactive extensions指的是操作符,这些操作符可以让你变化,组合,处理那些Observable发出来的一串串事件。
这些操作符让你能够把异步序列组合在一起,以一种描述性的方式组合在一起,同时运用了回调函数(callbacks)的高效性,摒弃了异步系统中常常出现的冗长的callback嵌套问题。
链式操作符
大多数操作符处理一个Observable并且返回一个Observable。这个特点能够让你一个接一个地运用这些操作符,像一个链条一样。每一个在链条中的操作符都处理从上一个操作符传下来的Observable然后传给下一个操作符。
也有一些其他的模式,像Builder Pattern,在这种模式中,用一个类的各种不同的方法对这个类的事件进行操作,通过这些方法的操作改变那些对象。这些模式也可以用链式的方式,但在Builder Pattern里,与Observable操作符不同的是,方法在链中的顺序并不重要。
Observable操作符不是对最初的那个Observable进行操作,而是根据链条中上一个操作符产生的Observable进行操作。