本文主要分析 Observable 核心逻辑
Observable 三部曲
- 序列三部曲:序列
产生、订阅、销毁
- 但是在实际代码中,如下所示,观察序列的整个流程是
- 创建观察序列
- 订阅信号(订阅观察)
- 发送信号(发送事件)
- 销毁观察序列
//1、创建序列
let ob = Observable<Any>.create { observer **in**
//3、发送信号
observer.onNext("发送信号")
//error 和 complete 二选一
//observer.onError(NSError.init(domain: "myError", code: 10010, userInfo: nil))
observer.onCompleted()
return Disposables.create()
}
//2、订阅信号
ob.subscribe { text **in**
print("订阅到了: \(text)")
} onError: { error **in**
print("error \(error)")
} onCompleted: {
print("完成")
} onDisposed: {
print("销毁")
}
.disposed(by: disposeBag)
//--------打印---------
订阅到了: 发送信号
完成
销毁
RxSwift的响应式核心逻辑类似于iOS中的UI事件响应
- UIControl类调用addTarget响应#selector定义的事件
- RxSwift 将响应事件留在 rx内部类进行处理
- 在内部类由订阅中心执行onNext发送消息
- 消息传送到subscribe接收
这里需要重点关注的是
- 1、序列的创建方法 create() 中的闭包是什么时候执行的?
- 2、订阅信号 subscribe() 中的闭包是什么时候执行的?
- 3、信号从发送到接收的流程是怎样的?
下面我们带着这些问题来一一探索
RxSwift 核心逻辑分析
主要分三步进行分析
- 创建序列 create()
- 订阅信号 subscribe()
- 发送信号 onNext()
创建序列 create()
-
进入create 函数源码,其中是创建一个
AnonymousObservable
对象,AnonymousObservable 是匿名可观察序列
-
进入 AnonymousObservable 类,这个类主要用来存储产生事件的闭包(
self.subscribeHandler = subscribeHandler
)和 激活事件闭包的入口函数run()
-
查看 AnonymousObservable的继承链:
AnonymousObservable ---> Producer 生产者 --> Observable --> 遵循 ObservableType 协议 --> 遵循 ObservableConvertibleType 协议
- ObservableConvertibleType 是一个协议,有一个关联类型(用于定义任意类型的序列,即创建序列时传入的
类型Any
),和函数 asObservable() (可将非序列对象转化为序列对象)
-
ObservableType 是一个协议,提供一个基本功能:订阅信号
-
Observable 是可观察序列,有两个函数,分别是 subscribe方法(无具体实现)、asObservable函数(返回一个 Observable 序列对象)
-
Producer 生产者类(工厂设计模式),有两个函数:subscribe(订阅的具体实现,后面会讲解) 和 run(无具体实现)
- ObservableConvertibleType 是一个协议,有一个关联类型(用于定义任意类型的序列,即创建序列时传入的
****create() 创建序列总结**
从create()的源码中,可以看出创建序列的过程中,主要是保存了一个产生事件的闭包
subscribeHandler
-
AnonymousObservable 继承链:
AnonymousObservable ---> Producer 生产者 --> Observable --> 遵循 ObservableType 协议 --> 遵循 ObservableConvertibleType 协议
- ObservableConvertibleType:关联类型 Element + 函数 asObservable()
- ObservableType:函数 subscribe()
- Observable(可观察序列):函数 subscribe() + 函数 asObservable()
- Producer(生产者):函数 subscribe() + 函数 run()
- AnonymousObservable(匿名可观察序列):产生事件闭包 subscribeHandler + 函数 run()
-
create() 创建序列流程
- 创建
AnonymousObservable
匿名可观察序列A
, -
A
保存发送信号
的闭包,即产生事件的闭包(subscribeHandler)
- 创建
订阅序列 subscribe()
-
进入 subscribe() 源码
- 创建 AnonymousObserver 匿名订阅者(观察者)对象
- 将订阅回调闭包保存在 AnonymousObserver 的属性
eventHandler
中 - 关键代码是
self.asObservable().subscribe(observer)
,其中 asObservable 是返回一个序列对象本身,然后调用subscribe
将 AnonymousObserver匿名观察者对象传递过去
-
为了确认 self.asObservable() 的类型,可以通过断点调试确认为 ,这里是一个 AnonymousObservable 匿名可观察序列对象
-
查找
AnonymousObservable
类的 subscribe 方法,但是这个类中并没有这个函数,这个类的继承链为:AnonymousObservable --> ObserverBase --> 遵循 Disposable、ObserverType 协议
-
Disposable 是一个销毁协议,其中只有一个方法 dispose() 销毁方法
-
ObserverType 是一个观察者协议,有关联类型、on方法
-
ObserverBase 观察者基类,具备发送响应和销毁的功能
-
AnonymousObserver 匿名观察者类,主要用来保存事件响应闭包,以及发送响应、销毁等功能
-
-
从
AnonymousObservable
的父类Producer
中查找,其源码如下- 从源码中发现,
subscribe
中调用了AnonymousObservable
的run
函数,将 observer 订阅者作为参数传了进来
- 从源码中发现,
-
进入
AnonymousObservable
类的run
实现- 首先创建了一个
AnonymousObservableSink
对象,即管道对象,并持有 observer(订阅者/观察者对象)、cancel(销毁者对象) - 然后调用 AnonymousObservableSink 的 run函数,将 self(AnonymousObservable 匿名观察序列对象)作为参数传递过去
-
AnonymousObservableSink
类的主要作用就是链接可观察者Observable
与观察者Observer
,实现事件的传递,起到一个桥梁/通道
的作用
- 首先创建了一个
-
进入
AnonymousObservableSink
类的run
实现- 这里的parent 就是传入的self,即 AnonymousObservable
- subscribeHandler 是create函数产生的闭包,到这里我们解决了第一个疑问:
create()的闭包什么时候执行
- run中的
self
是AnonymousObservableSink
对象,简单理解就是将 AnonymousObservableSink对象 转换为 AnyObserver对象
,那具体是如何转换的呢?
-
进入
AnyObserver
的源码,AnyObserver是一个结构体,查看其第二个init
方法,将AnonymousObservableSink 的on函数
赋值给了AnyObserver 的observer属性
,所以self.observer 是一个函数 即on函数
。所以AnonymousObservableSink
中 run 函数中的AnyObserver(self)
其实就是 create闭包中的参数observer
- 由于
AnonymousObservable
是匿名类,仅内部使用,不暴露给外部,所以需要通过 AnyOberver 来调用 onNext等 -
AnyOberver
类遵循ObserverType
协议,所以可以调用 onNext函数,此时 onNext函数 实现中的 on 就是 【AnyObserver - observer - on 函数块】中通过 observer保存的函数块
- 这里就解释了 为什么发送信号的代码 observer.onNext("发送信号") 最终会触发 AnonymousObservableSink.on事件了
- 由于
-
进入
AnonymousObservableSink
的on
函数,将event分解成不同的事件
-
查找
AnonymousObservableSink
类的forwardOn
函数实现,子类未找到,在其父类 Sink中找到了,源码如下- 这里的
observer
是 外部调用 subscribe() 函数,其内部生成的AnonymousObserver 匿名观察者对象
- 这里的
-
查找
AnonymousObserver
的 on 函数,未找到,在其父类ObserverBase
中查找,实现如下-
内部调用了 onCore 函数,父类中是抽象方法,没有具体实现
- 查找
AnonymousObserver
的onCore
函数实现,这里的 eventHandler 就是 调用subscribe 函数传入的闭包,到此就可以理解第三个问题 :subscribe 的闭包是什么时候调用的了。同时也将 创建的闭包和订阅的闭包进行了关联。
-
**subscribe() 订阅信号(订阅序列)总结
-
AnonymousObserver 继承链:
AnonymousObservable --> ObserverBase --> 遵循 Disposable、ObserverType 协议
- Disposable:函数 dispose()
- ObserverType:关联类型 Element + 函数 on()
- ObserverBase(观察者基类):函数 on() + 函数 onCore() + 函数 dispose()
- AnonymousObservable(匿名观察者):事件回调闭包 eventHandler + 函数 onCore()
-
subscribe() 订阅信号(订阅序列)流程
- 创建
AnonymousObserver
匿名观察者B
,保存订阅信号
的闭包,即事件回调闭包(eventHandler) -
A
对象调用函数asObservable()
转换为序列对象,序列调用函数 subscribe,将B
传递过去,即A.asObservable().subscribe(B)
等价于A.subscribe(B)
- 查找函数
subscribe()
实现,AnonymousObservable类中没有,从其父类 Producer 中找到,其中调用了AnonymousObservable
的run()
函数,将B
作为参数传递,即A.run(B)
- 进入函数
run()
的源码,创建了一个AnonymousObservableSink
通道对象,并持有 observer(观察者)、cancel(销毁者),同时调用了 AnonymousObservableSink 的函数 run(),即AnonymousObservableSink.run(A)
- 进入 AnonymousObservableSink类的函数 run() 源码,
A
中subscribeHandler
闭包回调,并传入一个 AnyObserver 对象,即A.subscribeHandler(sink)
- 这里是将
AnonymousObservableSink
转换为 AnyObserver 对象,并将 sink 的函数 on() 赋值给了AnyObserver
的属性observer(是一个 on 函数)
,AnyObserver(self)
等价于 create()闭包中的参数 ,即AnyObserver.observer = AnonymousObservableSink.on = create的闭包参数 observer
- 这里是将
- 进入
AnonymousObservableSink
的函数on()
的实现,这里将 event 分解成了不同的事件,并调用 AnonymousObservableSink 的函数forwardOn()
,即AnonymousObservableSink.on() --> AnonymousObservableSink.forwardOn()
- 进入
AnonymousObservableSink
的函数forwardOn()
实现,其具体实现在父类Sink
中,调用了AnonymousObserver
的函数on()
,即AnonymousObservableSink.forwardOn() --> AnonymousObserver.on()
- 进入
AnonymousObserver
的函数on()
,其具体实现在父类ObserverBase
中,调用了AnonymousObserver
的函数onCore()
,即AnonymousObserver.on() --> AnonymousObserver.onCore()
- 进入
AnonymousObserver
的函数onCore()
的实现,B
中eventHandler
闭包回调,即B.eventHandler(event)
- 创建
发送信号 onNext()
- 会走到创建
B
的回调闭包中发送信号
- 然后接着走函数
subscribe()
订阅信号的流程
发送信号 onNext()
- 会走到创建
B
的回调闭包中发送信号 - 然后接着走函数
subscribe()
订阅信号的流程
总结
创建 - 订阅 - 发送信号整体流程图
其核心逻辑思维导图
-
create() 创建序列
- 创建
AnonymousObservable
匿名可观察序列A
, - 保存
发送信号
的闭包,即A.subscribeHandler = subscribeHandler
- 创建
-
subscribe() 订阅信号(订阅序列)
- 创建
AnonymousObserver
匿名观察者B
, - 保存
订阅信号
的闭包,即B.eventHandler = eventHandler
-
A
对象调用函数asObservable()
转换为序列对象,序列调用函数 subscribe,将B
传递过去,即A.asObservable().subscribe(B)
等价于A.subscribe(B)
- 查找函数
subscribe()
实现,AnonymousObservable类中没有,从其父类 Producer 中找到,其中调用了AnonymousObservable
的run()
函数,将B
作为参数传递,即A.run(B)
- 进入函数
run()
的源码,创建了一个AnonymousObservableSink
通道对象,并持有 observer(观察者)、cancel(销毁者),同时调用了 AnonymousObservableSink 的函数 run(),即AnonymousObservableSink.run(A)
- 进入 AnonymousObservableSink类的函数 run() 源码,
A
中subscribeHandler
闭包回调,并传入一个 AnyObserver 对象,即A.subscribeHandler(sink)
- 进入
AnonymousObservableSink
的函数on()
的实现,这里将 event 分解成了不同的事件,并调用 AnonymousObservableSink 的函数forwardOn()
,即AnonymousObservableSink.on() --> AnonymousObservableSink.forwardOn()
- 进入
AnonymousObservableSink
的函数forwardOn()
实现,其具体实现在父类Sink
中,调用了AnonymousObserver
的函数on()
,即AnonymousObservableSink.forwardOn() --> AnonymousObserver.on()
- 进入
AnonymousObserver
的函数on()
,其具体实现在父类ObserverBase
中,调用了AnonymousObserver
的函数onCore()
,即AnonymousObserver.on() --> AnonymousObserver.onCore()
- 进入
AnonymousObserver
的函数onCore()
的实现,B
中eventHandler
闭包回调,即B.eventHandler(event)
- 创建
-
发送信号
onNext()
- 会走到创建
B
的回调闭包中发送信号 - 然后接着走函数
subscribe()
订阅信号的流程
- 会走到创建
继承链
-
AnonymousObservable 继承链:`AnonymousObservable ---> Producer 生产者 --> Observable --> 遵循 ObservableType 协议 --> 遵循 ObservableConvertibleType 协议
-
AnonymousObserver 继承链:
AnonymousObservable --> ObserverBase --> 遵循 Disposable、ObserverType 协议