就这么件简单大的一句话, 调用了好多个类、好多个方法,根本和它娇小的身躯根本不成正比呀。
_ = Observable.just("1").subscribe(onNext: { (str) in
print(str)
})
Just 代码调用逻辑是什么?
一、从 Observable 说起
- Observable 是遵循了 ObservableType 协议的类,而 ObervableType 总结的来说实现的是 subscribe 协议。
- .just("1"),其实是一个 ObervableType 协议的扩展方法, 并不是惯性以为的 Just 类的方法。
/**
Returns an observable sequence that contains a single element.
- seealso: [just operator on reactivex.io](http://reactivex.io/documentation/operators/just.html)
- parameter element: Single element in the resulting observable sequence.
- returns: An observable sequence containing the single specified element.
*/
public static func just(_ element: Element) -> Observable<Element> {
return Just(element: element)
}
- 在 just 方法中调用的
Just(element: element)
, 才是进入 正经的 Just。 当然进去 Just 最先干的还是 init,Just -> Producer -> Observable 依次调用init 方法,但是 只有 Just 自己的init 干了正事, 把 element 存了起来。
// Just
init(element: Element) {
self._element = element
}
// Producer
override init() {
super.init()
}
// Observable
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
二、 接下来才是各种花里胡哨方法跳转:
问:首先来想想 subscribe 是谁实现的方法?
答:因为just("1") 毕竟返回的是 Just 类呀!所以他后面的 subscribe 是Just 的。如果你和我想的一样,恭喜你 你和我入坑的姿势是一样的,嘿嘿 真爽。
- subscribe 是 ObservableType 协议的 extension 实现!!!,但是为啥 Just 对象能调用呢? 因为 Just 是继承 Producer , 而 Producer 则是继承了 Observable ,最后就是 Observable 是遵守了 ObservableType 协议 所以 Just 的 对象能调用 subscribe ...... 方法。
2.1、敲重点, 这个 subscribe 是灵魂。
/**
Subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence.
- parameter onNext: Action to invoke for each element in the observable sequence.
- parameter onError: Action to invoke upon errored termination of the observable sequence.
- parameter onCompleted: Action to invoke upon graceful termination of the observable sequence.
- parameter onDisposed: Action to invoke upon any type of termination of sequence (if the sequence has
gracefully completed, errored, or if the generation is canceled by disposing subscription).
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
Subscriber 执行到
AnonymousObserver
类的时候 ,就会发现 她是一个 继承自 observerBase 的类, 而且也没什么方法,也很简单的类。其实这是错觉,它才是我们能回调的 起 承上启下的作用的类。首先在
AnonymousObserver
初始化时会把 闭包 保存成 属性(self._eventHandler(event)),并在onCore
方法 通过闭包(self._eventHandler(event))进行回调 。
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
2.2、继续向下执行到 return Disposables.create(self.asObservable().subscribe(observer),disposable)
-
self.asObservable()
这个方法进入的是 Observable (因为 Just 并没有实现 asObservable 所以进入的是 父类的方法),
public func asObservable() -> Observable<Element> {
return self
}
-
subscribe
这个方法进入的就是Just
自己的 方法了(因为 Just 重写了),.next(self._element)
和.completed
都是enum Event
的 case (RxSwift/RxSwift/Event.swift)。 入参observer
就是AnonymousObserver
对象。
如果不知道 observer 怎么来的, 就去 RxSwift/RxSwift/ObservableType+Extensions.swift 路径下看
extension ObservableType
的subscribe
的 56 和 80 行
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
observer.on(.next(self._element))
observer.on(.completed)
return Disposables.create()
}
-
observer.on(.next(self._element))
中 observer 是AnonymousObserver
对象,但是它并没有 重新on
方法,所以进入了它的父类ObserverBase
中,这里我们就看到on
方法的 入参event
。
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
-
on
方法调用了onCore
方法, 因为AnonymousObserver
实现了这个方法,现在我们要回到AnonymousObserver
看onCore
的实现逻辑 。
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
-
onCore
通过开始初始化保存的闭包 进行回调并赶回了 event 参数(就是在 Just 的.next(self._element)
和.completed
),因为self._eventHandler(event)
回调 我们又回到了extension ObservableType
的subscribe(onNext ...
方法的 57 行,进行回调操作,并通过 我们的熟悉onNext
onError
onCompleted
onDisposed
回调回 最初调用地点。
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
- 至此 小小的 just("1") 就能在闭包中收到 打印 1 了。现在我已经是老泪纵横了(太难了)。
简单的回顾一下我们都去了哪些类和协议:
-
extension ObservableType
的static func just(_ element: Element)
方法 -
Just
的init
方法 -
Producer
的init
方法 -
Observable
的init
方法 -
extension ObservableType
的public func subscribe(onNext: ((Element) -> Void)? = nil, onError ...
方法 -
AnonymousObserver
的init
方法 -
Observable
的asObservable
方法 -
Just
的subscribe<Observer: ObserverType>(_ ob
方法 -
ObserverBase
的on
方法 -
AnonymousObserver
的onCore
方法 -
extension ObservableType
的subscribe(onNext
方法的 - 订阅到信号