一、Observable
二、Observer
三、Subject
四、Operator
五、Disposable
六、Schedulers
七、Error Handling
凡事物皆序列
一、Observable
Observable
为可监听序列,用于描述一个对象所生产的序列,任何序列都可以用Observable
来标识。
万事万物都为序列,每个个体都是观察者,观察周围事物的变化。
页面开发中的序列
-
scrollview页面滑动
开发中,会遇到scrollview
滑动监听位置的需求,在某某位置,修改导航透明度。那么位置变化所产生的一系列的位置参数就构成了一个序列。如下图:
-
定时器序列
用户注册信息,为验证用户信息,需要触发获取验证码按钮,一般60
秒内只能获取一次短信,因此在发送请求后60
秒内,按钮应该处于失能状态,这时就需要一个60
秒倒计时定时器,60
秒后恢复按钮。如下图:
以上都是对某一个点的监听,除以上还有button点击,还有textField的编辑,手势的监听,switch的状态等等这些事件都可以抽象为对序列的监听。通常对于监听有着对应的触发方法,或代理。在RxSwift
中呢,则是对这些事件做了封装,通过observer
来监听事件的到来。
Observable
是对这些不同类型序列的统一管理,看一下具体是一个什么类:
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
……
}
是一个继承自ObservableType
的一个类,先不管ObservableType
这个类,重点是Observable
是一个泛型类,好像是明白了些什么,泛型就是不指定类型,传入什么类型,Observable
就去找对应类型,如button
,textview
等等对应的一些序列,方法公用,统一管理各种类型序列。
序列可表示的不同类型如下:
Observable<Int>
Observable<String>
Observable<JSON>
Observable<Void>
……
创建一个序列并监听序列消息发送:
//1、创建一个序列
let ob = Observable<Int>.create { observer in
//3、发送一个消息
observer.onNext(1)
observer.onNext(2)
return Disposables.create()
}
//2、监听序列消息
ob.subscribe(onNext: { (val) in
print(val)
}).disposed(by: disposeBag)
对于Observable
还存在一些特征序列就不一一介绍。RxSwift核心逻辑
二、Observer
Observer
观察者,观察一系列事件的产生,根据需要做出必要的操作,如同button
的点击事件,textfield
的代理方法等等,实现后便是一个观察者,而在Rx
中则被抽象出来为一个观察者Observer
。如下图:
下面看一下观察者是怎么抽象出来的,如下代码:
KVO:
self.person.rx.observeWeakly(String.self, "name").subscribe(onNext: { (value) in
print(value as Any)
}).disposed(by: disposeBag)
UIButton:
button.rx.tap.subscribe(onNext: { () in
print("被点击了")
//处理事件
}).disposed(by: disposeBag)
UITextField:
textfield.rx.text.orEmpty.changed.subscribe(onNext: { (text) in
print(text)
}).disposed(by: disposeBag)
通过以上代码观察到,KVO
的通知,按钮的点击及文本框的输入,均回调到subscribe
的闭包中(参数闭包)。点击进入这个事件(可以通过cmd+点击进入查看)可以找到如下:
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
类型 | 说明 |
---|---|
next | 是产生一个新的元素,参数为任意类型 |
error | 创建序列所产生的错误,如网络请求失败,终止序列 |
completed | 所有序列都已经发送完成 |
是一个能够接收任意类型的枚举,说明是所有事件回调的媒介。
这里的观察者和订阅写在一起,下面再看看其他的写法,AnyObserver、Binder
。
1、AnyObserver
表示任意观察者。如网络请求:
let url = URL.init(string: "https://www.baidu.com")
URLSession.shared.rx.response(request: URLRequest.init(url: url!))
.subscribe(onNext: { (data) in
print(data)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("请求完成")
}).disposed(by: disposeBag)
等价于:
let observer: AnyObserver<(response: HTTPURLResponse, data: Data)> = AnyObserver { (event) in
switch event {
case .next(let data):
print(data)
case .error(let error):
print("Data Task Error: \(error)")
default:
break
}
}
let url = URL.init(string: "https://www.baidu.com")
URLSession.shared.rx.response(request: URLRequest.init(url: url!))
.subscribe(observer).disposed(by: disposeBag)
将观察者和订阅方法分离。除此之外所有的观察者都可以被抽离。
2、Binder
有两个特征:
- 不会处理错误事件
- 确保绑定都是在给定Scheduler上执行(默认在MainScheduler)
主要在UI绑定上使用,如textfield
绑定到label
,由输入产生时间,根据设定条件设置显隐。
文本框验证:
let passwordValid = pwdTf.rx.text.orEmpty
.map{$0.count>6}
.share(replay: 1, scope: .whileConnected)
passwordValid.bind(to: pwdLabel.rx.isHidden).disposed(by: disposeBag)
内部设置如下:
extension Reactive where Base: UIView {
public var isHidden: Binder<Bool> {
return Binder(self.base) { view, hidden in
view.isHidden = hidden
}
}
}
所有继承自UIView
子控件的绑定显隐都通过该扩展实现的。
按钮点击绑定:
//联合绑定
Observable.combineLatest(nameValid,pwdValid){
$0 && $1
}.bind(to: button.rx.isEnabled)
.disposed(by: disposeBag)
内部设置如下:
extension Reactive where Base: UIControl {
public var isEnabled: Binder<Bool> {
return Binder(self.base) { control, value in
control.isEnabled = value
}
}
}
label文本绑定:
//监听输入变化
textfield.rx.text.orEmpty.changed.subscribe(onNext: { (text) in
print(text)
}).disposed(by: disposeBag)
//绑定,数据传递
textfield.rx.text.bind(to: label.rx.text)
内部设置如下:
extension Reactive where Base: UILabel {
public var text: Binder<String?> {
return Binder(self.base) { label, text in
label.text = text
}
}
}
通过以上方法,我们也可以扩展方法来实现一个自定义的观察者。
三、Subject
Observable & Observer
即是可监听序列也是观察者,如文本框输入监听,text
即作为序列输出,text绑定作为观察者。switch、slider、segment
等都具有此特性,观察值的变化并给被绑定对象的text
赋值。此外框架还定义了其他辅助类,可以帮助开发者准确的描述事务特征。
1、AsyncSubject
使用结合.onCompleted()
来使用,无论有多少序列(每个元素是一个事件),只发送由源产生的最后一个事件。事件由onCompleted和onError
的调用而终止,否则发送的方法不会触发,不会再产生新的事件。代码如下:
let sub = AsyncSubject<Any>()
sub.subscribe(onNext: { (val) in
print(val)
}, onCompleted: {
print("完成")
}).disposed(by: disposeBag)
sub.onNext("1")
sub.onCompleted()
sub.onNext("2")
sub.onCompleted()
打印:
value:1
完成
完成后订阅立即取消,后边的消息发送均无观察者接收。
2、PublishSubject
只有先订阅成为观察者,才能收到发送的消息,多次订阅,一个消息会发送给多个观察者,如发送onCompleted
和onError
将终止所有事件的产生和发送。代码如下:
let sub = PublishSubject<Any>()
sub.subscribe{
print("channal1:\($0)")
}.disposed(by: disposeBag)
sub.onNext("hibo")
sub.onNext("luck")
sub.subscribe{
print("channal2:\($0)")
}.disposed(by: disposeBag)
sub.onNext("coder")
sub.onNext("sony")
打印:
channal1:next(hibo)
channal1:next(luck)
channal1:next(coder)
channal2:next(coder)
channal1:next(sony)
channal2:next(sony)
3、ReplaySubject
无论何时订阅ReplaySubject
,存储最后n次onNext发送出的元素,由bufferSize决定存储新值的个数。当有新的订阅者订阅时直接发送存储的最新的元素。代码如下:
let sub = ReplaySubject<Any>.create(bufferSize: 1)
sub.subscribe{
print("channal1:\($0)")
}.disposed(by: disposeBag)
sub.onNext("hibo")
sub.onNext("luck")
sub.subscribe{
print("channal2:\($0)")
}.disposed(by: disposeBag)
sub.onNext("coder")
sub.onNext("sony")
打印:
channal1:next(hibo)
channal1:next(luck)
channal2:next(luck) 此处为订阅后立即收到的消息
channal1:next(coder)
channal2:next(coder)
channal1:next(sony)
channal2:next(sony)
4、BehaviorSubject
通过一个初始值来创建,每次订阅BehaviorSubject
后,会接收到subject
发出的Event
,传递值为初始值。代码如下:
let sub = BehaviorSubject(value: "init_value")
sub.subscribe{
print("channal1:\($0)")
}.disposed(by: disposeBag)
sub.onNext("hibo")
sub.onNext("luck")
sub.subscribe{
print("channal2:\($0)")
}.disposed(by: disposeBag)
sub.onNext(“coder")
sub.onNext("sony")
打印:
channal1:next(init_value) 订阅后即发送的消息
channal1:next(hibo) 调用onNext发送
channal1:next(luck)
channal2:next(luck) 订阅后将发送最后一次发送的消息
channal1:next(coder)
channal2:next(coder)
channal1:next(sony)
channal2:next(sony)
5、ControlProperty
用于描述UI控件属性。具有以下特征:
- 不会产生error事件
- 在主线程(MainScheduler)订阅
- 在主线程监听
- 共享状态变化
最典型的代码实列及,文本框输入绑定,textfield
触发修改label
的text
值。如下:
//监听输入变化
textfield.rx.text.orEmpty.changed.subscribe(onNext: { (text) in
print(text)
}).disposed(by: disposeBag)
//绑定,数据传递
textfield.rx.text.bind(to: label.rx.text)
四、Operator
操作符,可以产生新的序列,或改变组合原有序列生成新序列,也就是提供了额外处理序列的功能,使得功能性更强。如以上所提的.map
、.combineLatest
等操作符,完成对序列的筛选,和验证的联合绑定。如下代码:
let nameValid = textfield.rx.text.orEmpty
.map{$0.count>3}
.share(replay: 1, scope: .whileConnected)
nameValid.bind(to: nameLabel.rx.isHidden).disposed(by: disposeBag)
let pwdValid = pwdTf.rx.text.orEmpty
.map{$0.count>6}
.share(replay: 1, scope: .whileConnected)
pwdValid.bind(to: pwdLabel.rx.isHidden).disposed(by: disposeBag)
//联合绑定-需要序列成对出现
Observable.combineLatest(nameValid,pwdValid){
$0 && $1
}.bind(to: button.rx.isEnabled).disposed(by: disposeBag)
除此之外还很很多类似功能的操作符,该操作符是对ObservableType
的扩展,这里只做简单介绍,可以点击进入查看。
五、Disposable
可被清除的资源。用来取消订阅,并清除订阅过程中的内部资源。清除方法直接调用dispose()
方法订阅将被取消,再发送消息,该观察者已不存在。发送onCompleted
和onError
消息,订阅也会被取消,并清除内部资源。
let sub = PublishSubject<Any>()
let dispose = sub.subscribe(onNext: { (val) in
print("value:\(val)")
})
sub.onNext("11111")
dispose.dispose() //此处调用onCompleted、onError作用相同
sub.onNext("22222")
打印:11111
DisposeBag
清除包取消订阅,清除内部资源。与dispose
不同之处,dispose
是手动清除调用即清除,DisposeBag
对所有订阅者做打包处理,在当前视图被释放,DisposeBag
也被释放,其管理的订阅者同时都被释放掉。所以使用DisposeBag
即可,在每一个订阅者后调用该方法,就不用考虑什么时候需要取消订阅。当然如果能够准确控制订阅的取消,更好。如下代码:
let sub = PublishSubject<Any>()
sub.subscribe(onNext: { (val) in
print("value:\(val)")
}).disposed(by: disposeBag)
sub.onNext("11111")
sub.onNext("22222")
如果知道什么时候结束我们可以直接调用onCompleted
方法而disposed
就不需要了。
六、Schedulers
调度器是Rx
的多线程核心模块,决定任务在哪个线程的哪个队列中执行。常规对GCD
的使用如下:
DispatchQueue.global(qos: .userInitiated).async {
var total = 0
for index in 1...10{
total += index
}
DispatchQueue.main.async {
print(total)
}
}
在全局队列中对一组数据做异步运算,结束之后返回主队列打印。Rx
对GCD
的实现:
let obj = Observable<Int>.create { (oberver) -> Disposable in
var total = 0
for index in 1...10{
total += index
}
oberver.onNext(total)
return Disposables.create()
}
obj .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))//选择一个并行队列
.observeOn(MainScheduler.instance)//设置观察者在主线程上
.subscribe(onNext: { (val) in
print(val)
}).disposed(by: disposeBag)
这样编码,感觉代码量变的很大,但是Rx
让代码结构变的更清晰,并且统一了对事件的观察,统一思想快速开发。
线程在Rx
中的表示线程的类:
类型 | 说明 |
---|---|
MainScheduler | 主线程类 |
SerialDispatchQueueScheduler | 串行队列类 |
ConcurrentDispatchQueueScheduler | 并行队列类 |
OperationQueueScheduler | 具备NSOperationQueue的特点 |
七、Error Handling
错误处理。序列中也是数据处理,会有逻辑错误。RxSwift
提供了两种错误处理机制
- retry 重试
- catch 恢复
retry 示例如下:
let url = URL.init(string: "https://www.baidu.com")
URLSession.shared.rx.response(request: URLRequest.init(url: url!))
.retry(3)
.subscribe(onNext: { (data) in
print(data)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("请求完成")
}).disposed(by: disposeBag)
断开网络测试打印显示发送3条请求,信息太长这里就不粘贴输出结果了。如果再请求失败后需要等待几秒种在做请求可以使用retryWhen
,示例:
let url = URL.init(string: "https://www.baidu.com")
URLSession.shared.rx.response(request: URLRequest.init(url: url!))
.retryWhen{ (error: Observable<Error>) -> Observable<Int> in
return Observable.timer(3, scheduler: MainScheduler.instance)
}
.subscribe(onNext: { (data) in
print(data)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("请求完成")
}).disposed(by: disposeBag)
运行观察,失败后5秒重新发起一次请求。还可以设置请求次数,当然不是结合.retry
使用,示例如下:
let url = URL.init(string: "https://www.baidu.com")
URLSession.shared.rx.response(request: URLRequest.init(url: url!))
.retryWhen{ (error: Observable<Error>) -> Observable<Int> in
return error.enumerated().flatMap({ (index,error) -> Observable<Int> in
guard index<3 else{
return Observable.error(error)
}
return Observable<Int>.timer(3, scheduler: MainScheduler.instance)
})
}
.subscribe(onNext: { (data) in
print(data)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("请求完成")
}).disposed(by: disposeBag)
catch 示例如下:
let url = URL.init(string: "https://www.baidu.com")
//预处理信息
let data = Observable<(response: HTTPURLResponse, data: Data)>.create { (observer) -> Disposable in
observer.onNext((HTTPURLResponse.init(),Data.init()))
return Disposables.create()
}
URLSession.shared.rx.response(request: URLRequest.init(url: url!))
.catchError({ (error) -> Observable<(response: HTTPURLResponse, data: Data)> in
//返回预处理数据,防止执行onError 返回的数据走到 onNext下
return data
})
.subscribe(onNext: { (data) in
//成功失败都会走到此处
print("data:\(data)")
}, onError: { (error) in
print(error)
}, onCompleted: {
print("请求完成")
}).disposed(by: disposeBag)
可以使得错误信息发送至onNext
下。如果需要一个备用值可使用catchErrorJustReturn
,示例如下:
URLSession.shared.rx.response(request: URLRequest.init(url: url!))
.catchErrorJustReturn((HTTPURLResponse.init(), Data.init()))
.subscribe(onNext: { (data) in
//成功失败都会走到此处
print("data:\(data)")
}, onError: { (error) in
print(error)
}, onCompleted: {
print("请求完成”) //会执行onCompleted
}).disposed(by: disposeBag)
通过以上对Rx
的了解,总结如图:
参考:RxSwift中文文档