基本流程
- 创建序列
- 订阅序列
- 发送信号
// 1:创建序列
let observable = Observable<Any>.create { (obserber) -> Disposable in
// 3:发送信号
obserber.onNext("发送信号")
obserber.onCompleted()
return Disposables.create()
}
// 2:订阅信号
_ = observable.subscribe { (event) in
print(event)
}
带着问题去思考底层的实现
涉及到的几个主要的类的继承关系
流程分析
源码分析
啰说一句
- RxSwift中经常会用父类声明方法,子类extension重写,传入当前子类的数据
OC里没这么玩的
- 看到的subscribe不一定是当前类中的方法,也可能调的父类,父父类,父父父类
已经习惯了的当我没说
当前流程所在文件Create.swift
- create 方法的时候创建了一个内部对象 AnonymousObservable
- AnonymousObservable
(匿名序列)
保存了外界的闭包
extension ObservableType {
public func subscribe(onNext: ((E) -> Void)? = nil, ...) -> Disposable {
.
.
.
let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
中间的流程是最后的外界闭包调用还没到
- 创建了一个 AnonymousObserver
(匿名内部观察者)
保存了外界的 onNext, onError , onCompleted , onDisposed
Disposables.create()是RxSwift的自己的销毁机制,不用管,先看内部
- self.asObservable().subscribe(observer),就是前文啰说的地方,调用了父类的subscribe,调用了父类Producer的subscribe()
当前流程所在文件Producer.swift
override func subscribe(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
·
·
·
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
·
·
·
return disposer
}
}
}
CurrentThreadScheduler.instance.schedule(())
是RxSwift中的线程管理,不议关键代码
self.run(observer, cancel: disposer)
由于子类AnonymousObservable重写了run()又会回到子类重写的方法中极度恶心的结构
当前流程所在文件Create.swift
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
·
·
·
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
·
·
·
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
parent 是传过来的AnonymousObservable对象,AnonymousObservable._subscribeHandler()完成了对外部生成序列时代码块的调用 create和subscribe怎么关联的问题
然后去执行 发送响应,回到最外部 3:送onNext()信号
let observable = Observable<Any>.create { (obserber) -> Disposable in
// 3:发送信号
obserber.onNext("发送信号")
obserber.onCompleted()
return Disposables.create()
}
当前流程所在文件Create.swift
再次回来
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
·
·
·
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
- AnyObserver(self)把AnonymousObservableSink传入
当前流程所在文件AnyObserver
public struct AnyObserver<Element> : ObserverType {
·
·
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
·
·
}
- self.observer 保存了observer.on翻译一下
- self.observer = AnonymousObservableSink.on
当obserber.onNext("发送信号")就会找到AnyObserver父类
let observable = Observable<Any>.create { (obserber) -> Disposable in
// 3:发送信号
obserber.onNext("发送信号")
obserber.onCompleted()
return Disposables.create()
}
当前流程所在文件ObserverType.swift
public protocol ObserverType {
associatedtype E
func on(_ event: Event<E>)
}
extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
- 前边提到的,self.observer 初始化就是AnonymousObservableSink .on()
- 最终变成了self.observer(event) -> AnonymousObservableSink .on(event)
- 再次回到AnonymousObservableSink中
当前流程所在文件Create.swift
class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
}
- self.forwardOn(event)做事件调用
- 然而forwardOn又是AnonymousObservableSink父类Sink中的方法,
又要找父类
当前流程所在文件Sink.swift
class Sink<O : ObserverType> : Disposable {
·
·
final func forwardOn(_ event: Event<O.E>) {
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
}
- self._observer
又双叒叕
保存的是AnonymousObserver
-又双叒叕
回到AnonymousObserver的on()
当前流程所在文件Create.swift
let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
- 最后,判断 event 调用 onNext?(value) ,消息发送结束,收到订阅的消息
// 2:订阅信号
_ = observable.subscribe { (event) in
print(event)
}