RxSwift的核心流程可以简化为三个步骤:
- 创建序列
- 订阅序列
- 发送信号
// 创建序列
Observable<Int>.create { (anyObserver) -> Disposable in
// 发送信号
anyObserver.onNext(2)
return Disposables.create()
}
// 订阅序列
.subscribe(onNext: { (element) in
print("订阅到: \(element)")
})
.disposed(by: DisposeBag())
在执行这行代码得到的结果是: 订阅到: 2
, 那么在RxSwift内部是在什么时候开始发送信号(其实就是create(_ subscribe:)
中的 subscribe闭包
什么时候执行),又是什么时候订阅到结果(就是subscribe(onNext:)
中的 onNext闭包
什么时候执行)。
创建序列
因为 Observable
继承于 ObservableType
, 所以点击 create
方法可以看到 ObservableType
的扩展方法
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
在 create
方法中,其实是初始化了 AnonymousObservable
类的对象,而在初始化的时候保存了 SubscribeHandler
闭包。
订阅序列
-
subscribe(onNext: onError: onCompleted: onDisposed:)
方法也是ObservableType
的扩展方法,实现如下(忽略不关心的代码):
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
-
在这个方法中我们关注两个方法:
- 初始化
AnonymousObserver(匿名观察者)
对象observer
,保存了EventHandler
闭包 -
self.asObservable()
调用了subscribe(observer)
- 初始化
在前面创建序列分析
create
方法时知道这里的self.asObservable()
其实就是AnonymousObservable()
对象,那么self.asObservable().subscribe(observer)
=>AnonymousObservable().subscribe(observer)
由于
AnonymousObservable
继承于Producer
, 在Producer
类中找到subscribe(observer)
的实现如下:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// 省略部分代码
...
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
// 省略部分代码
...
return disposer
}
}
}
- 在
subscribe(_ observer:)
方法中其实调用的就是self.run(observer, ...)
,run(observer, ...)
的具体实现是在AnonymousObservable
中:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
- 我们可以看到在这里初始化了
AnonymousObservableSink
类的对象,保存在外面创建的属于AnonymousObserver
类的observer
对象。 -
sink.run
中的实现为:
typealias Parent = AnonymousObservable<Element>
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
- 这里的
parent
就是AnonymousObservable()
对象 - 到此前面
AnonymousObservable(subscribe)
保存的那份闭包会开始执行,也就是开始发送信号。
-
捋一下整个流程为:
其中let ob = AnonymousObservable(subscribe)
let sink = AnonymousObservableSink(observer: observer, ...)
,
let observer = AnonymousObserver(event)
self.asObservable().subscribe(observer)
->ob.subscribe(observer)
->ob.run(observer, ...)
->sink.run(ob)
->ob._subscribeHandler(AnyObserver(sink))
发送信号
- 在前面执行
AnonymousObservable(subscribe)._subscribeHandler(AnyObserver(sink))
中传递AnyObserver()
对象出去。
public struct AnyObserver<Element> : ObserverType {
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
...
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
...
}
- 从初始化方法中可以看到
AnyObserver(sink)
保存的sink.on
的闭包。 - 当执行
anyObserver.onNext(2)
时,因为AnyObserver
是ObserverType
的类型,所以会走到ObserverType
的扩展方法
extension ObserverType {
public func onNext(_ element: Element) {
self.on(.next(element))
}
...
}
- 继续走到
AnyObserver
的on(_ event:)
方法,传递.next
事件, 从上面的on(_ event:)
方法中看到它会执行之前保存的闭包,因为我们之前保存的是sink.on
闭包,所以最终会走到sink.on
方法里:
func on(_ event: Event<Element>) {
...
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
- 来到这里会找到父类的
forwardOn(_ event:)
方法:
final func forwardOn(_ event: Event<Observer.Element>) {
...
self._observer.on(event)
}
- 还记得之前在创建
AnonymousObservableSink
对象时,保存了AnonymousObserver
对象吗?self._observer.on(event)
执行AnonymousObserver
的父类ObserverBase
的on(event)
方法:
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
-
onCore(_ event:)
方法:
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
执行了在创建
AnonymousObserver
对象时保存的闭包。到这里我们也继续捋一下整个流程:
let observer = AnonymousObserver(_ event:)
anyObserver.onNext(2)
-> anyObserver.on(.next(value))
-> sink.on(.next(value))
-> sink.forwardOn(.next(value))
-> sink._observer.on(.next(value))
-> observer.on(.next(value))
-> observer.onCore(.next(value))
-> observer._eventHandler(.next(value))
-> onNext?(value)