RxSwift特征序列之Driver
Driver 是个比较特殊的序列,它主要是对需要在 UI 上做出响应的序列进行了封装。这层封装做了三件事情:
- 在主线程监听
- 不会产生 error 事件
- 共享附加作用
没有对比就没有伤害,先看看一搬的序列在驱动 UI 时会怎么做,再回首就能体会到 Driver 的便捷之处了。
demo
准备一个模拟网络请求的函数,然后把输入框的编辑事件和网络请求的结果合并之后,订阅到的结果在 UI(lab 和 btn)上展示出来。
func dealWithData(inputText:String)-> Observable<Any>{
print("准备网络请求---\(Thread.current)") // data
return Observable<Any>.create({ (ob) -> Disposable in
if inputText == "1234" {
ob.onError(NSError.init(domain: "❌", code: 120, userInfo: nil))
}
DispatchQueue.global().async {
print("发送前的线程: \(Thread.current)")
ob.onNext("\(inputText)")
ob.onCompleted()
}
return Disposables.create()
})
}
然后开始序列的创建、订阅:
let result = self.tf.rx.text.skip(1)
.flatMap { [weak self](input) -> Observable<Any> in
return (self?.dealWithData(inputText:input ?? ""))!
}
result.map {
print("map映射lab---\($0)---\(Thread.current)")
return "长度: \(($0 as! String).count)"
}
.bind(to:self.lab.rx.text)
.disposed(by: disposeBag)
result.map {
print("map映射btn---\($0)---\(Thread.current)")
return "\($0 as! String)"
}
.bind(to:self.btn.rx.title())
.disposed(by: disposeBag)
当输入框输入 2 的时候会打印:
准备网络请求---<NSThread: 0x600003798dc0>{number = 1, name = main}
发送前的线程: <NSThread: 0x6000037fd900>{number = 4, name = (null)}
map映射lab---2---<NSThread: 0x6000037fd900>{number = 4, name = (null)}
准备网络请求---<NSThread: 0x600003798dc0>{number = 1, name = main}
发送前的线程: <NSThread: 0x6000037307c0>{number = 6, name = (null)}
map映射btn---2---<NSThread: 0x6000037307c0>{number = 6, name = (null)}
这样写会有些问题:
- 输入框每次的编辑事件都会触发两次请求,因为订阅(bind)了两次,并没有共享。
- 在子线程请求后,响应也是在子线程。
- 如果网络请求序列发出 error 事件,就会取消所有的绑定,无法发出新的请求,并抛出异常错误。
为了避免这三个问题,就得多调用几个方法:
let result = self.tf.rx.text.skip(1)
.flatMap { [weak self](input) -> Observable<Any> in
return (self?.dealWithData(inputText:input ?? ""))!
//保证了在主线程监听
.observeOn(MainScheduler())
//避免程序抛出错误异常
.catchErrorJustReturn("检测到了错误事件")
}
//共享附加作用
.share(replay: 1, scope: .whileConnected)
result.map {
print("map映射lab---\($0)---\(Thread.current)")
return "长度: \(($0 as! String).count)"
}
.bind(to:self.lab.rx.text)
.disposed(by: disposeBag)
result.map {
print("map映射btn---\($0)---\(Thread.current)")
return "\($0 as! String)"
}
.bind(to:self.btn.rx.title())
.disposed(by: disposeBag)
error 事件的打印:
准备网络请求---<NSThread: 0x600001f6e880>{number = 1, name = main}
map映射lab---检测到了错误事件---<NSThread: 0x600001f6e880>{number = 1, name = main}
map映射btn---检测到了错误事件---<NSThread: 0x600001f6e880>{number = 1, name = main}
发送前的线程: <NSThread: 0x600001f1d100>{number = 4, name = (null)}
使用 Driver
如果使用 Driver 的话,就是这个样子的:
let result = self.tf.rx.text.orEmpty
.asDriver()
.flatMap {
return self.dealWithData(inputText: $0)
.asDriver(onErrorJustReturn: "检测到了错误事件")
}
result.map {
return "长度: \(($0 as! String).count)"
}
.drive(self.lab.rx.text)
.disposed(by: disposeBag)
result.map {
return "\($0 as! String)"
}
.drive(self.btn.rx.title())
.disposed(by: disposeBag)
要使用 Driver,就要先用asDriver
把序列转换为 Driver,然后才能拥有 drive 的绑定能力。把输入框编辑事件和网络请求都转为 Driver 序列再合并后,用 drive 把订阅到的数据绑定到 UI 上。同样可以避免那三种情况,写起来更简洁。
解析
先点进去asDriver
(由于 demo 中 error 事件是在网络请求中发出的,这里主要看flatMap
中的asDriver
):
extension ObservableConvertibleType {
public func asDriver(onErrorJustReturn: Element) -> Driver<Element> {
let source = self
.asObservable()
.observeOn(DriverSharingStrategy.scheduler)
.catchErrorJustReturn(onErrorJustReturn)
return Driver(source)
}
}
到了ObservableConvertibleType
的Driver
分类文件中。里面以调用者作为源序列构建了Driver
序列,源序列在这里也是做了两个处理:1.observeOn
:在主线程监听;2.catchErrorJustReturn
:error 事件不会抛出异常。
1.主线程监听
public struct DriverSharingStrategy: SharingStrategyProtocol {
public static var scheduler: SchedulerType { return SharingScheduler.make() }
}
结构体DriverSharingStrategy
的scheduler
返回的是SharingScheduler
枚举值。默认值就是主线程。
public enum SharingScheduler {
public private(set) static var make: () -> SchedulerType = { MainScheduler() }
}
2.回避 error 事件
public func catchErrorJustReturn(_ element: Element) -> Observable<Element> {
return Catch(source: self.asObservable(), handler: { _ in Observable.just(element) })
}
传入 source 和 handler 闭包,构建了一个Catch
序列。
final private class Catch<Element>: Producer<Element> {
typealias Handler = (Swift.Error) throws -> Observable<Element>
init(source: Observable<Element>, handler: @escaping Handler) {
self._source = source
self._handler = handler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = CatchSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
老套路了,CatchSink
的run
中,self._parent._source.subscribe(self)
订阅后的响应在CatchSink
的on
里面。
final private class CatchSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
func run() -> Disposable {
let d1 = SingleAssignmentDisposable()
self._subscription.disposable = d1
d1.setDisposable(self._parent._source.subscribe(self))
return self._subscription
}
func on(_ event: Event<Element>) {
switch event {
case .next:
self.forwardOn(event)
case .completed:
self.forwardOn(event)
self.dispose()
case .error(let error):
do {
let catchSequence = try self._parent._handler(error)
let observer = CatchSinkProxy(parent: self)
self._subscription.disposable = catchSequence.subscribe(observer)
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
}
}
}
next 和 completed 事件都是常规操作,主要是 error 的处理,首先回调了_handler(error)
,这个_handler
就是初始化Catch
时的{ _ in Observable.just(element) }
。虽然回调时把error
传过去了,但是闭包中直接忽略了(error 被无视了),还是把外界传入的element
用来构造一个 just 序列作为返回值(just 只发出唯一的元素,就是这里的element
了)。所以,这里的catchSequence
就是个 just 序列。
然后构造了一个中间层CatchSinkProxy
,作为 just 序列的观察者,订阅后自然还是在中间层CatchSinkProxy
的on
中响应:
final private class CatchSinkProxy<Observer: ObserverType>: ObserverType {
func on(_ event: Event<Element>) {
self._parent.forwardOn(event)
switch event {
case .next:
break
case .error, .completed:
self._parent.dispose()
}
}
}
二话不说,直接让CatchSink
通过forwardOn
响应外界。回避 error 流程结束。
3. 共享附加作用
好像还少了一点:共享附加作用,继续跟着Driver
的构造函数走。
public typealias Driver<Element> = SharedSequence<DriverSharingStrategy, Element>
Driver
只是SharedSequence
的别名。看到这里也大致能猜到这个SharedSequence
就是处理共享附加作用的。
public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType {
let _source: Observable<Element>
init(_ source: Observable<Element>) {
self._source = SharingStrategy.share(source)
}
}
SharedSequence
构造函数中的SharingStrategy
像是突然蹦出来的,点击也跳不到定义的位置。从SharedSequence
的定义中看出它是个遵守SharingStrategyProtocol
协议的泛型。之前给Driver
起别名的时候,SharedSequence
中指定的是DriverSharingStrategy
。那我们点击share
是就可以选择DriverSharingStrategy.share
的位置。
public struct DriverSharingStrategy: SharingStrategyProtocol {
public static var scheduler: SchedulerType { return SharingScheduler.make() }
public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
return source.share(replay: 1, scope: .whileConnected)
}
}
结构体DriverSharingStrategy
的share
里,又让source
调用了一个share
。然后,Driver
(也就是SharedSequence
)的_source
还是调用asDriver
的那个序列么?
看到这里,感觉很绕,外面一句代码的调用,里面缺穿行了好多层,还看不到头。但是也会发现,每一层都只做很简单的事,它们之间灵活搭配,不同的组合可以完成各种不同的功能。
继续跳进share
:
public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected) -> Observable<Element> {
switch scope {
case .forever:
switch replay {
case 0: return self.multicast(PublishSubject()).refCount()
default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
}
case .whileConnected:
switch replay {
case 0: return ShareWhileConnected(source: self.asObservable())
case 1: return ShareReplay1WhileConnected(source: self.asObservable())
default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
}
}
}
我们只看满足条件(replay == 1,scope == whileConnected)的ShareReplay1WhileConnected(source: self.asObservable())
。其他分支跳进去又是一大坨。这里果然是构建了另一个序列ShareReplay1WhileConnected
,但source
还是那个调用asDriver
的源序列。
final private class ShareReplay1WhileConnected<Element>
: Observable<Element> {
fileprivate typealias Connection = ShareReplay1WhileConnectedConnection<Element>
fileprivate var _connection: Connection?
init(source: Observable<Element>) {
self._source = source
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self._lock.lock()
let connection = self._synchronized_subscribe(observer)
let count = connection._observers.count
let disposable = connection._synchronized_subscribe(observer)
self._lock.unlock()
if count == 0 {
connection.connect()
}
return disposable
}
@inline(__always)
private func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Connection where Observer.Element == Element {
let connection: Connection
if let existingConnection = self._connection {
connection = existingConnection
}
else {
connection = ShareReplay1WhileConnectedConnection<Element>(
parent: self,
lock: self._lock)
self._connection = connection
}
return connection
}
}
ShareReplay1WhileConnected
的订阅中,调用_synchronized_subscribe
,引用了一个ShareReplay1WhileConnectedConnection
。然后获取一下观察者总数,就把ShareReplay1WhileConnected
作为ShareReplay1WhileConnectedConnection
的观察者开始同步订阅了。
fileprivate final class ShareReplay1WhileConnectedConnection<Element>
: ObserverType
, SynchronizedUnsubscribeType {
final func on(_ event: Event<Element>) {
self._lock.lock()
let observers = self._synchronized_on(event)
self._lock.unlock()
dispatch(observers, event)
}
final private func _synchronized_on(_ event: Event<Element>) -> Observers {
if self._disposed {
return Observers()
}
switch event {
case .next(let element):
self._element = element
return self._observers
case .error, .completed:
let observers = self._observers
self._synchronized_dispose()
return observers
}
}
final func connect() {
self._subscription.setDisposable(self._parent._source.subscribe(self))
}
final func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self._lock.lock(); defer { self._lock.unlock() }
if let element = self._element {
observer.on(.next(element))
}
let disposeKey = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: disposeKey)
}
}
ShareReplay1WhileConnectedConnection
的_synchronized_subscribe
中,如果_element
有值,观察者就发出 next 事件出去,然后就是observer.on
装袋了,很熟悉的模式,跟 RxSwift之Subject 中的 ReplaySubject、PublishSubject里的处理非常类似。
到此为止还没有真正的去订阅,我们回到ShareReplay1WhileConnected
的subscribe
函数里,继续下一步。初次进来,袋子里的观察者 count 必定为 0 。也会调用connection.connect()
。ShareReplay1WhileConnectedConnection
的connect
里才走了源序列的订阅subscribe
。之后的响应也和PublishSubject
中的一样多点发送,只是 Replay 的只有一个元素罢了。这样也完成了共享附加作用。