前言
Observable
英文直译为:可观察的,可看见的。但是在RxSwift
普遍的称它为“可观察序列”,它的作用主要用来形成一条数据流或者事件流,所有的操作产生的事件都会通过Observable
进行传输。
在Observable
里有三种事件(Event
: Event
枚举类型,有三个成员,next
,error
,completed
) Observable
发送的所有事件都是一个Event
:
next
事件主要是当Observable
里出现新的数据时会发出的事件,同时该事件会携带新的数据对象。completed
事件是当Observable
不再有新的数据出现,Observable
被标记完成,不再产生数据.error
事件是当数据流遇到了错误会发出的事件,该事件也会导致Observable
结束。
Observable(可观察序列)
1. Observable的创建
在RxSwift
中,可以有多种创建Observable
对象的方法,下面我们逐一介绍:
(1). create()方法
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
create()
方法的参数是一个函数(闭包),根据闭包来创建序列,在闭包里面可自定义事件。函数需要传入参数AnyObserver
类型,返回的是Disposable
,AnyObserver
其实就是观察者,Disposable
是一个协议接口,里面只有一个dispose
方法,用来释放一些资源。整个create()
方法返回的是一个AnonymousObservable
(匿名Observable
).
例子:
let createOb = Observable<Int>.create{ (observer) -> Disposable in
observer.onNext(1)
observer.onCompleted()
return Disposables.create()
}
let _ = createOb.subscribe(onNext: { (number) in
print("订阅:",number)
}, onError: { (error) in
print("error:",error)
}, onCompleted: {
print("完成回调")
}) {
print("释放回调")
}
(2). empty()方法
public static func empty() -> Observable<E> {
return EmptyProducer<E>()
}
final private class EmptyProducer<Element>: Producer<Element> {
override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
observer.on(.completed)
return Disposables.create()
}
}
empty()
方法返回一个EmptyProducer
类,在这个类的内部实现了subscribe()
订阅方法,且只有一个.completed
状态,所以,empty()
方法是一个空方法,里面没有onNext
事件处理,只会处理onComplete
事件。
例子:
let emtyOb = Observable<Int>.empty()
let _ = emtyOb.subscribe(onNext: { (number) in
print("订阅:",number)
}, onError: { (error) in
print("error:",error)
}, onCompleted: {
print("完成回调")
}) {
print("释放回调")
}
(3). just()方法
public static func just(_ element: E) -> Observable<E> {
return Just(element: element)
}
just()
方法是单个信号序列创建,只能处理单个事件,简单来说,我们使用just()
方法时不能将一组数据一起处理,只能一个一个数据进行处理。just()
根据传入的一个参数来创建序列,它会向订阅者发送两个事件,第一个发送带元素数据的.next
事件,第二个发送 .completed
事件。
例子:
let array = ["You","Me"]
Observable<[String]>.just(array)
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
let _ = Observable<[String]>.just(array).subscribe(onNext: { (number) in
print("订阅:",number)
}, onError: { (error) in
print("error:",error)
}, onCompleted: {
print("完成回调")
}) {
print("释放回调")
}
(4). of()方法
public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return ObservableSequence(elements: elements, scheduler: scheduler)
}
of()
方法可以接受多个参数来创建实例,但这些参数必须是同类型,也就是说,of()
方法是just()
方法的升级版,它可以将一序列的事情组合起来一起处理。
例子:
Observable<String>.of("Henry","Jeannie")
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
// 字典
Observable<[String: Any]>.of(["name":"HuGe","age":18])
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
// 数组
Observable<[String]>.of(["Peter","July"])
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
(5). from()方法
public static func from(optional: E?) -> Observable<E> {
return ObservableOptional(optional: optional)
}
from()
方法只接收数组(数组,集合)作为参数,并抽取出数组里的元素来作为数据流中的元素,也就是说,from()
根据传入的数组元素来创建序列。它会依次发出.next
事件,最后发出.completed
事件,结果和of()
方法一样。
例子:
Observable<[String]>.from(optional: ["Hu","Ge"])
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
(6). deferred()方法
public static func deferred(_ observableFactory: @escaping () throws -> Observable<E>)
-> Observable<E> {
return Deferred(observableFactory: observableFactory)
}
deferred()
方法是延时创建Observable
对象,当subscribe
的时候才去创建,它为每一个bserver
创建一个新的Observable
;
例子:
//用于标记是奇数、还是偶数
var isOdd = true
//使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
let factory : Observable<Int> = Observable.deferred {
//让每次执行这个block时候都会让奇、偶数进行交替
isOdd = !isOdd
//根据isOdd参数,决定创建并返回的是奇数Observable、还是偶数Observable
if isOdd {
return Observable.of(1, 3, 5 ,7)
}else {
return Observable.of(2, 4, 6, 8)
}
}
//第1次订阅测试
factory.subscribe { event in
print("\(isOdd)", event)
}
//第2次订阅测试
factory.subscribe { event in
print("\(isOdd)", event)
}
(7). range()方法
public static func range(start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return RangeProducer<E>(start: start, count: count, scheduler: scheduler)
}
range()
方法通过指定起始和结束数值,创建一个以这个范围内所有值作为初始值的 Observable
序列。range()
方法其实和of()
方法很相似,其功能和of()
差不多,我们只要输出start
和count
然后就能生成一组数据,让它们执行onNext
。值得注意的是,range()
方法只生成Observable
类型。
例子:
Observable.range(start: 2, count: 5)
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
(8). generate()方法
public static func generate(initialState: E, condition: @escaping (E) throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: @escaping (E) throws -> E) -> Observable<E> {
return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
}
generate()
方法创建一个只有当提供的所有的判断条件都为 true
的时候,才会给出动作的Observable
序列。generate()
方法是一个迭代器,它一直循环onNext
事件,直到condition
不满足要求退出。generate()
有四个参数,第一个是最开始的循环变量,第二个是条件,第三个是迭代器,这个迭代器每次运行都会返回一个E
类型,作为下一次是否执行onNext
事件源,而是否正的要执行则看是否满足condition
条件。其实我们可以理解generate()
就是一个循环体,其内部实现也正是一个循环,类似于数组的遍历,具体实现代码在GenerateSink
的run
方法。
例子:
Observable.generate(initialState: 0,// 初始值
condition: { $0 < 10}, // 条件1
iterate: { $0 + 2 }) // 条件2 +2
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
(9). timer()方法
public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType)
-> Observable<E> {
return Timer(
dueTime: dueTime,
period: period,
scheduler: scheduler
)
}
timer()
方法是创建的Observable
序列在经过设定的一段时间后,产生唯一的一个元素,或者每隔一段时间产生一个元素。这取决于period
是否有值。
例子:
let observable = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
(10). interval()方法
public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
-> Observable<E> {
return Timer(
dueTime: period,
period: period,
scheduler: scheduler
)
}
interval()
方法创建的 Observable
序列每隔一段设定的时间,会发出一个索引数的元素。而且它会一直发送下去。差别在于timer()
可以设置间隔时间和持续时间,而interval()
的间隔时间和持续时间是一样的。
例子:
//下面方法让其每 1秒发送一次,并且是在主线程(MainScheduler)发送。
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
(11). repeatElement()方法
public static func repeatElement(_ element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return RepeatElement(element: element, scheduler: scheduler)
}
repeatElement()
方法创建一个可以无限发出给定元素的Event
的Observable
序列,它是无限循环的,会一直循环onNext方法,不会终止。
例子:
Observable<Int>.repeatElement(5)
.subscribe { (event) in
print("订阅:",event)
}
.disposed(by: disposeBag)
(12). error()方法
public static func error(_ error: Swift.Error) -> Observable<E> {
return ErrorProducer(error: error)
}
final private class ErrorProducer<Element>: Producer<Element> {
private let _error: Swift.Error
init(error: Swift.Error) {
self._error = error
}
override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
observer.on(.error(self._error))
return Disposables.create()
}
}
error()
方法是返回一个只能调用onError
方法的Observable
序列。其中的onNext
和OnCompleted
方法是不会执行的。
例子:
Observable<String>.error(NSError.init(domain: "ObservableError", code: 10010, userInfo: ["reason":"unknow"]))
.subscribe { (event) in
print("订阅:",event)
}
.disposed(by: disposeBag)
(13). never()方法
public static func never() -> Observable<E> {
return NeverProducer()
}
final private class NeverProducer<Element>: Producer<Element> {
override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
return Disposables.create()
}
}
never()
方法创建一个永远不会发出 Event
,也不会终止的 Observable
序列。就是返回一个无终止的观察者事件序列,可以用来表示无限持续时间。
例子:
let neverSequence = Observable<Int>.never()
_ = neverSequence.subscribe { event in
print(event)
}
2. Observable的订阅
创建完了 Observable
,还要使用 subscribe()
方法来订阅它,并接收它发出的 Event
。
(1). Subscribes event handler
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
}
这种subscribe()
方法订阅了一个 Observable
对象,该方法的 block
的回调参数就是被发出的 event
事件.
例子:
let observable = Observable.of(1, 2, 3)
observable.subscribe { event in
print(event)
print(event.element)
}
这里可以看到初始化 Observable
序列时设置的默认值都按顺序通过 .next
事件发送出来,等到数据都发送完毕,它还会自动发一个 .completed
事件出来。
(2). Subscribes element handler
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
}
这种subscribe()
预订方法,它可以把 event
进行分类:.next
,.completed
,.error
以处理不同类型的event
,同时会把 event
携带的数据直接解包出来作为参数.
例子:
let observable = Observable.of("A", "B", "C")
observable.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
})
onNext
、onError
、onCompleted
和 onDisposed
这四个回调参数都是有默认值的,即它们都是可选的。所以也可以只处理 onNext
而不管其他的情况。
3. Observable的事件生命周期
public func `do`(onNext: ((E) throws -> Void)? = nil, onError: ((Swift.Error) throws -> Void)? = nil, onCompleted: (() throws -> Void)? = nil, onSubscribe: (() -> Void)? = nil, onSubscribed: (() -> Void)? = nil, onDispose: (() -> Void)? = nil)
-> Observable<E> {
return Do(source: self.asObservable(), eventHandler: { e in
switch e {
case .next(let element):
try onNext?(element)
case .error(let e):
try onError?(e)
case .completed:
try onCompleted?()
}
}, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose)
}
当 Observable
的某些事件产生时,你可以使用 do()
操作符来注册一些回调操作。这些回调会被单独调用,它们会和 Observable
原本的回调分离。
可以使用 do()
方法来监听事件的生命周期,它会在每一次事件发送前被调用,同时它和 subscribe()
一样,可以通过不同的回调处理不同类型的 event
。
例子:
let observable = Observable.of("A", "B", "C")
observable.do(onNext: { element in
print("Next:", element)
}, onError: { error in
print("Error:", error)
}, onCompleted: {
print("Completed")
}, onDispose: {
print("Disposed")
})
observable.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
})
4. Observable的销毁
通过上面的这些内容,我们大概已经知道了,当一个Observable
序列被正常创建出来后它不会马上就开始被激活从而发出 event
,而是要等到它被订阅了才会激活它。而 Observable
序列激活之后要一直等到它发出了.error
或者 .completed
的 event
后,它才被终结。
(1). 显式销毁--dispose() 方法
当一个订阅结束了不再需要了,就可以调用 dispose()
方法把这个订阅给销毁掉,防止内存泄漏。订阅行为被 dispose
了,订阅将被取消,并且内部资源都会被释放,那么之后 Observable
如果再发出 event
,这个已经 dispose
的订阅就收不到消息了。通常情况下,你是不需要手动调用 dispose
方法的。
例子:
let observable = Observable.of("A", "B", "C")
let observableDispose = observable.subscribe { event in
print(event)
}
observableDispose.dispose()
(2). 隐式销毁--DisposeBag 方法
DisposeBag
创建一个对象来管理多个订阅行为的销毁,可以把一个 DisposeBag
对象看成一个回收垃圾袋,把用过的订阅行为都放进去。
而这个 DisposeBag
就会在自己快要销毁 的时候,对它里面的所有订阅行为都调用 dispose()
方法。
例子:
let disposeBag = DisposeBag()
let observable = Observable.of(1, 2, 3)
observable.subscribe { event in
print(event)
}.disposed(by: disposeBag)
总结
这篇内容简单的介绍了Observable
序列的创建,订阅,和销毁。这里创建的序列都是最基本的序列,在RxSwift
里面Observable
也存在一些特征序列,Driver
,Single
,ControlEvent
...这些特征序列可以帮助我们更准确的描述序列,让我们能够用更加优雅的方式书写代码,关于这些特征序列后面我会再学习。关于Observable
的销毁好像还有一种方法,后面研究到了会回来补充。如文中有错误还请各位指正!
最后感谢RxSwift中文文档.