@TOC
Rxswift销毁者Dispose简介
-
先通过一张思维导图初步了解一下销毁者Dispose它拥有什么,做了一些什么事情:
- 本编文章主要是围绕上面这张图来展开,重点分析Dispose()是怎么销毁序列的。
- 从上图我们可以看出销毁者后的第一个根节点是dispose和disposeBag.那他们分别是什么呢?答案将在下面讲解。
Rxswift销毁者类和重要函数介绍
1. DisposeBag
1.1 DisposeBag是什么
RxSwift和RxCocoa还有一个额外的工具来辅助处理ARC和内存管理:即DisposeBag。这是Observer对象的一个虚拟”包”,当它们的父对象被释放时,这个虚拟包会被丢弃。
当带有DisposeBag属性的对象调用deinit()时,虚拟包将被清空,且每一个一次性(disposable)Observer会自动取消订阅它所观察的内容。这允许ARC像通常一样回收内存。
如果没有DisposeBag,会有两种结果:或者Observer会产生一个retain cycle,被无限期的绑定到被观察对象上;或者意外地被释放,导致程序崩溃。
所以要成为一个ARC的良民,记得设置Observable对象时,将它们添加到DisposeBag中。这样,它们才能被很好地清理掉。
当一个Observable(被观察者)被观察订阅后,就会产生一个Disposable实例,通过这个实例,我们就能进行资源的释放了。
对于RxSwift中资源的释放,也就是解除绑定、释放空间,有两种方法,分别是显式释放以及隐式释放:
- 显式释放
可以让我们在代码中直接调用释放方法进行资源的释放
如下面的实例:
let dispose = textField.rx_text
.bindTo(label.rx_sayHelloObserver)
dispose.dispose()
- 隐式释放
隐式释放则通过DisposeBag
来进行,它类似于Objective-C ARC中的自动释放池机制,当我们创建了某个实例后,会被添加到所在线程的自动释放池中,而自动释放池会在一个RunLoop周期后进行池子的释放与重建;DisposeBag对于RxSwift就像自动释放池一样,我们把资源添加到DisposeBag中,让资源随着DisposeBag一起释放。
如下实例:
let disposeBag = DisposeBag()
func binding() {
textField.rx_text
.bindTo(label.rx_sayHelloObserver)
.addDisposableTo(self.disposeBag)
}
上面代码中方法addDisposableTo会对DisposeBag进行弱引用,所以这个DisposeBag要被实例引用着,一般可作为实例的成员变量,当实例被销毁了,成员DisposeBag会跟着销毁,从而使得RxSwift在此实例上绑定的资源得到释放。
从上面的讲解我们大致明白了DisposeBag就像我们我们OC内存管理里的自动释放池。他充当了一个垃圾回收袋的角色,你只需把序列加入了disposeBag,disposeBag就会在合适的时候帮我们释放资源,那么它是怎么做到的呢?
1.2 DisposeBag的实现源码分析
1.2.1. 先看一下类图:
1.2.2. 具体分析源码流程
- 当我们调用disposed()方法的时候,会调用Dispose类的insert()方法,将销毁者dispose加入的_disposables数组中。
具体源码如下:
public final class DisposeBag: DisposeBase {
private var _lock = SpinLock()
// state
fileprivate var _disposables = [Disposable]()
fileprivate var _isDisposed = false
/// Constructs new empty dispose bag.
public override init() {
super.init()
}
/// Adds `disposable` to be disposed when dispose bag is being deinited.
///
/// - parameter disposable: Disposable to add.
public func insert(_ disposable: Disposable) {
self._insert(disposable)?.dispose()
}
private func _insert(_ disposable: Disposable) -> Disposable? {
//这里为了为了防止多线程下出现抢占资源问题,需要加锁控制同步访问
self._lock.lock(); defer { self._lock.unlock() }
if self._isDisposed {//判断如果调用过了_dispose()说明已经被释放过了,不需要再释放,保证对称性,则直接返回
return disposable
}
//保存到数组中
self._disposables.append(disposable)
return nil
}
/// This is internal on purpose, take a look at `CompositeDisposable` instead.
private func dispose() {
// 1.获取到所有保存的销毁者
let oldDisposables = self._dispose()
// 2.遍历每个销毁者,掉用每一个销毁者的dispose()释放资源
for disposable in oldDisposables {
disposable.dispose()
}
}
private func _dispose() -> [Disposable] {
self._lock.lock(); defer { self._lock.unlock() }
// 获取到所有保存的销毁者
let disposables = self._disposables
self._disposables.removeAll(keepingCapacity: false)
self._isDisposed = true //这个变量用来记录是否垃圾袋数组被清空过
return disposables
}
deinit {
//当DisposeBag自身对象被销毁时,调用自己的dispose(),遍历销毁数组中所有保存的销毁者,
self.dispose()
}
}
-
上面的源码流程通过一个图来标识
- 总结一下上面的DisposeBag处理流程:
- 当我们调用序列的
dispose()
方法是,DisposeBag
调用insert()
方法将我们的需要销毁的序列保存起来存放在_disposables
数组中。 - 当我们的DisposeBag销毁时,如定义的局部变量出了作用域后,就会被销毁,首先会调用我们的deinit()方法 如上图4,在deinit()里面会执行自己的dispose()方法,然后变量之前保存的所有需要释放的
_disposables
数组,依次调用他们自己的dispose()方法。
2. fetchOr()函数
- fetchOr 函数的作用类似于标记,先来看一下fetchOr()函数的源码:
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
this.lock()
let oldValue = this.value
this.value |= mask
this.unlock()
return oldValue
}
源码很简单,但是作用不小。代码中this
是传入的AtomicInt值,其内部仅有一个value值。 fetchOr 先将 this.value copy一份,作为结果返回。而将 this.value 和 mask 做或 (|) 运算。并且将或运算的结果赋值给 this.value。
- 我们通过一个表来理解这个函数的执行结果:
this.value | mask | oldValue | 或 运算后this.value | 返回值 |
---|---|---|---|---|
0 | 1 | 0 | 1 | 0 |
1 | 1 | 1 | 1 | 1 |
0 | 2 | 0 | 2 | 0 |
1 | 2 | 1 | 3 | 1 |
就是做了一次或运算,实际的10进制结果不变,只是改变了里面的二进制位,可以用来做标志位,只是C语言里面经常用的方法,即一个Int类型处理本身的值可以使用外,还可以通过按位与,或,来改变它的标志位,达到传递值的目的,这样每个位都可以取代一个bool类型,经常用作枚举。
运算符 | 二进制 | 十进制 | 说明 |
---|---|---|---|
0000 0001 | 1 | ||
0000 0010 | 2 | ||
或运算 | 0000 0011 | 3 |
- 通过上面的分析,我得知 fetchOr ()函数的作用就是,可以确保每段代码只被执行一次,就相当于一个标志位,如果初始值为0 ,如果传入参数1,假设这段代码重复执行5次,只有第一次会从0变为1,后面四次调用都是为1,不会发送变化。
Dispose核心逻辑
Dispose 实例代码分析
- 学过Rxswift的童鞋都知道dispose()调用后,会向我们oc里面的引用计数器一样,释放我们的资源。释放的时候我们还可以监听到被销毁的事件回调。那么有没有思考过Dispose是如何做到的呢?
要知道这个答案,我们只能通过源码来一步步分析:
- 首先,我们来看一段实例代码:
实例1:
func limitObservable(){
// 创建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
observer.onNext("kongyulu")
return Disposables.create { print("销毁释放了")} // dispose.dispose()
}
// 序列订阅
let dispose = ob.subscribe(onNext: { (anything) in
print("订阅到了:\(anything)")
}, onError: { (error) in
print("订阅到了:\(error)")
}, onCompleted: {
print("完成了")
}) {
print("销毁回调")
}
print("执行完毕")
//dispose.dispose()
}
-
上面的代码执行结果如下:
- 通过上面的结果我们知道,这个创建的序列没有被销毁,即没有打印“销毁释放了”,也没有打印“销毁回调”。这是为什么呢?这个问题我们后面再通过分析源码Rx源码就知道了。
- 现在我们把上面代码的那行注释放开
dispose.dispose()
这行代码去掉注释,然后重新运行,输出结果如下:
- 通过上面的代码我们看到了,创建的序列销毁了,销毁回调也执行了。那为什么加上了
dispose.dispose()
就可以了呢? - 此外我们再来修改一下我们的代码:
实例2:
func limitObservable(){
// 创建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
observer.onNext("kongyulu")
observer.onCompleted()
return Disposables.create { print("销毁释放了")} // dispose.dispose()
}
// 序列订阅
let dispose = ob.subscribe(onNext: { (anything) in
print("订阅到了:\(anything)")
}, onError: { (error) in
print("订阅到了:\(error)")
}, onCompleted: {
print("完成了")
}) {
print("销毁回调")
}
print("执行完毕")
//dispose.dispose()
}
上面的实例2 的代码相对与实例1 就多了一行代码:observer.onCompleted()
:
我们再来看一下输出结果:
这里我们可以看到我们多加了一行
observer.onCompleted()
代码后,就也打印了销毁回调,销毁释放了,这是什么逻辑呢? why?
- 下面就让我们带着三个问题去探索一下Rxswift底层是如何实现的
Dispose 流程源码解析
再分析Dispose源码前,我们必须先深入理解序列的创建,订阅流程这个是基础,只有理解了这个,才能真正理解Dispose的原理。
这个其实在之前的博客已经分析过了,详情可以参考我之前的博客:序列核心逻辑
为了便于更好的理解,我在这里还再一次理一下具体的流程:
1. 序列创建,订阅流程
- (1) 当我们执行代码
let ob = Observable<Any>.create { (observer) -> Disposable in 这里面是一个闭包我们称为闭包A }
时,实际会来到Create.swift文件的第20行:
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
- (2) create()函数返回一个
AnonymousObservable(subscribe)
对象,并将我们的闭包A传入到了AnonymousObservable的构造函数里面,AnonymousObservable将这个闭包A保存到了let _subscribeHandler: SubscribeHandler
这个变量中存起来了。_subscribeHandler
这个变量保存了序列ob创建时 传入的闭包A (其中闭包A要求传入AnyObserver
类型作为参数)
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
//这个变量保存了序列ob创建时 传入的闭包A (其中闭包A要求传入AnyObserver类型作为参数)
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler //这个变量保存了序列ob创建时 传入的闭包A
}
...下面代码先不看省略掉
}
- (3)我们调用
let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)" }
这行代码进行序列ob的订阅操作,这行代码,我们跟进源码可以查看到:在ObservableType+Extensions.swift文件的第39行:
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
... 此处代码先不分析省略
let observer = AnonymousObserver<Element> {
这个里面是一个尾随闭包的内容这里先不分析
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
(4)从上面
subscribe()
的源码可以看到,在函数中创建了一个AnonymousObserver的对象,然后直接就return Disposables.create()结束了。(5)这里我们并没有发现订阅和我们的闭包A有任何关系,那关键就在
self.asObservable().subscribe(observer)
这行代码里面了,我来分析一下这行代码到底做了些什么。-
(6)我们要理解(5)中的这行代码,就需要先理解一下类的集成关系:
AnonymousObservable
->Producer
->Observable
->ObservableType
->ObservableConvertibleType
详情如下图:
(7)通过继承关系,我们可以顺着继承链往上找父类,我们可以找到是在
Observable
类中定义了这个asObservable()
方法:
public class Observable<Element> : ObservableType {
...此处省略不关注的代码
public func asObservable() -> Observable<Element> {
return self
}
...此处省略不关注的代码
}
- (8)通过源码分析,我得知 asObservable()就是返回self ,而(3)的代码调用是的
self.asObservable().subscribe(observer)
这行代码的self就是我们创建的序列ob, 所以self.asObservable()
返回的就是ob我们最开始创建的可观察序列。self.asObservable().subscribe(observer)
中的observer就是我们在public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
方法实现中创建的局部变量:let observer = AnonymousObserver<Element> { 这个里面是一个尾随闭包的内容这里先不分析 }
我们将这个局部变量传入了Observable的subscribe()
方法。 - (9)接着我们就要分享Observable的
subscribe()
方法做了些什么了。 - (10)当我们调用实例2中的这行代码:
let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }
的时候,实际调用了ObservableType
协议的subscribe()方法,在这个方法里面我们创建了一个AnonymousObserver
对象,并通过self.asObservable().subscribe(observer)
传入了ob.susbscribe(observer) (注意:这里的ob就是我们create()创建的AnonymousObservable对象,而observer就是subscribe时创建临时局部AnonymousObserver对象,这些上面已经分析过了)。 - (11)然而通过上面的类图,我们可以看到在ob(AnonymousObservable)类中并没有一个subscribe()的方法,那么我们只能先找它的父类Producer.
- (12)通过前面的类图分析,可以知道Producer继承Observerable可观察序列,遵循了ObservableType协议(这个协议定义一个subscribe()接口),所以我们Producer中必定会实现这个接口。我来看一下源码:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
//下面这行代码是重点,调用了自己的run()方法,并传入了两个参数:
//参数1:observer:就是我们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
//参数2:disposer:就SinkDisposer()对象后将销毁会再分析。
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
- (13)通过上面源码分析,我们得知
Producer
实现的subscribe()
接口里面,调用了自己的run()
方法,并在run()
方法里面传入了observer:就是我们self.asObservable().subscribe(observer)
传入的AnonymousObserver
对象。那接下来我看一下run()做了一些什么事情:
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
- (14)从上面Producer中的run()方法,我们可以知道在这个方法并没有做任何事情,就一行
rxAbstractMethod()
,而这个rxAbstractMethod()只是一个抽象方法。那我们的子类AnonymousObservable中肯定覆写了run()方法。接下来我们再看一下AnonymousObservable
的run()
的源码:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
//创建了一个管子AnonymousObservableSink,并传给了管子两个参数:
//参数1:observer:就是我们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
//参数2:disposer:就SinkDisposer()对象后将销毁会再分析。
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
- (15)在上面的
run()
源码中我们可以看到:在AnonymousObservable
的run()
方法中。首先,创建了一个AnonymousObservableSink
对象sink
,并将observer
(也就是我们self.asObservable().subscribe(observer)
传入的AnonymousObserver
对象)传入;其次,调用了sink.run(self)
方法返回了subscription,然后直接饭后一个元组,也就是run()方法返回了一个元组:(sink: sink, subscription: subscription)
。 但是我们的重点是在sink管子上面。AnonymousObservableSink
是一个类似于manager角色,它保存了序列,订阅者,销毁者三个信息,还具备调度能力。我们序列和订阅者就是通这个管子来做桥梁,实现通讯。 - (16)接下来我们分析
AnonymousObservableSink
管子做了一些什么呢?我们来看一下AnonymousObservableSink
的源码:
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
//这里给了一个别名:Parent就是AnonymousObservable序列
typealias Parent = AnonymousObservable<Element>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: Observer, cancel: Cancelable) {
//传入了observer:就是我们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {//如果已经执行过.error, .completed,就不会继续执行self.forwardOn(event)代码,意思就是只有对象生命周期内执行过.complete,.error事件,就不会再执行forwardOn,除非重新激活改变条件值。
return
}
self.forwardOn(event)
case .error, .completed:
//fetchOr()这个方法上面已经讲解过,作用就是控制确保只会执行一次
if fetchOr(self._isStopped, 1) == 0 {//如果从没有执行过就执行一次,否则不执行。以确保下面代码在对象生命周期内,无论on()调用多少次,都只会执行一次。
self.forwardOn(event)
self.dispose()
}
}
}
//这是一个很重要的方法,
func run(_ parent: Parent) -> Disposable {
//这里传入parent就是AnonymousObservable序列,也就是我们最开始create()序列ob,_subscribeHandler就是我们创建序列时传入的闭包A(闭包A就相对一个函数,要求传入一个参数,这个参数就是AnyObserver(self))
return parent._subscribeHandler(AnyObserver(self))
}
}
- (17)通过上面AnonymousObservableSink的源码,我们得知有一下几点结论:
- AnonymousObservableSink.init初始化时传入了observer:就是我们
self.asObservable().subscribe(observer)
传入的AnonymousObserver
对象。 - AnonymousObservableSink有一个on()方法,这个方法根据传入的参数event做了不同的处理,但都会至少调用一次
self.forwardOn(event)
方法。每次如果onNext事件都会调用一次forwardOn()
。但是.error, .completed事件最多只会调用一次forwardOn()
。 - AnonymousObservableSink的run()方法是核心方法,是它会回调我们最开始create()创建时传递的闭包A,并将我们调用ob.subscribe()订阅时,函数内部创建的
AnonymousObserver
对象通过我们AnonymousObservableSink对象sink,也就是AnyObserver(self)
中的self
包装成一个AnyObserver结构体之后,作为参数传入闭包A,这样就将我们的序列和订阅者建立了联系。 -
特别注意:很多人认为传入我们闭包A 的就是
AnonymousObserver
实际上不是,传入闭包A的时一个AnyObserver结构体 - 通过AnonymousObservableSink的run()方法我们成功把我们最开始的ob.subscibe()订阅时创建的闭包通过
AnyObserver(self)
作为参数传给了闭包A,当我们在闭包A里面调用这行代码时:observer.onNext("kongyulu")
时,由于经过ob.subscribe()订阅之后,AnyObserver(self)
就是我们的observer.了,而此时的observer是一个结构体,它拥有了我们的管子AnonymousObservableSink
对象的on()方法。 - 在实例1中:当我们发送
observer.onNext("kongyulu")
序列消息时,实际上会通过我们的管子AnonymousObservableSink.on()
来调度,最终调度我们订阅时的闭包:onNext()闭包B:let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }
。 - 那么现在最大的疑问就是:
AnonymousObservableSink.on()
时如何从observer.onNext("kongyulu")
调度到我们闭包B?
- AnonymousObservableSink.init初始化时传入了observer:就是我们
- (18)要分析上面这个问题,我们需要先来分析一下结构体
AnyObserver(self)
做了什么:先看一下AnyObsevrer的源码
public struct AnyObserver<Element> : ObserverType {
/// Anonymous event handler type.
public typealias EventHandler = (Event<Element>) -> Void
//这里定义了别名EventHandler就是一个传入事件的闭包
private let observer: EventHandler
/// Construct an instance whose `on(event)` calls `eventHandler(event)`
///
/// - parameter eventHandler: Event handler that observes sequences events.
public init(eventHandler: @escaping EventHandler) {
//self.observer保存了AnonymousObservableSink对象
self.observer = eventHandler
}
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
//初始化时要求传入一个ObserverType,而这个是(17)点分析中AnyObserver(self)代码中的self,实际上就是AnonymousObservableSink对象,
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
//这段代码直接保存了AnonymousObservableSink.on()方法
//self.observer实际就是一个on()方法
self.observer = observer.on
}
/// Send `event` to this observer.
///
/// - parameter event: Event instance.
public func on(_ event: Event<Element>) {
//这里调用on方法实际就是调用AnonymousObservableSink.on(event)方法
return self.observer(event)
}
/// Erases type of observer and returns canonical observer.
///
/// - returns: type erased observer.
public func asObserver() -> AnyObserver<Element> {
return self
}
}
-
(19)通过上面AnyObserver源码分析,我们得知
AnyObserver
初始化时保存了我们管子AnonymousObservableSink
的on()
方法,并且自己有一个on方法,在他自己的on方法里面再去调用AnonymousObservableSink.on()方法。这样就只是包装了一层不让外界知道我们的AnonymousObservableSink
类,为啥这样设计呢?这样设计有几点好处:- 起到完全封装效果,外界完全不需要知道我们的管子
AnonymousObservableSink
类,他们不关心我们AnonymousObservableSink
类时如何实现的,使用者只需要用这个接口on()就行了,至于on()是如何实现的,通过谁实现并不需要关心。 - 起到解耦的效果,
AnyObserver
并没有拥有我们AnonymousObservableSink
对象,它只是拥有了AnonymousObservableSink
的on()接口,只需要AnonymousObservableSink
实现这个on()接口该做的事情就可以了。至于AnonymousObservableSink
内部怎么改(只要on()接口不改)的都不会影响到AnyObserver
。
- 起到完全封装效果,外界完全不需要知道我们的管子
-
(20)现在我们的重点就在on()方法上面:
- 当我们实例1中执行:
observer.onNext("kongyulu")
这行代码时,实际就会调用:AnyObserver.onNext()
方法。(由于我们AnyObserver继承了ObserverType协议,也就拥有了ObserverType
的onNext()
方法,此处如果不清楚可以往上回看类继承关系)
- 当我们实例1中执行:
(21)
AnyObserver.onNext()
调用的时候会调用自己的on()
方法:
ObserverType的接口定义
extension ObserverType {
public func onNext(_ element: Element) {
self.on(.next(element))//这里会调回到AnyObserver的on()方法,AnyObserver继承ObserverType,重写了on()接口
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
- (22)
AnyObserver.on()
方法会调用AnonymousObservableSink.on()
方法。 - (23)
AnonymousObservableSink.on(event)
会调用AnonymousObservableSink.forwardOn(event)
- (24)而在AnonymousObservableSink中没有定义
forwardOn()
方法,我们找到它的父类Sink里面实现了forwardOn()
源码如下:
class Sink<Observer: ObserverType> : Disposable {
fileprivate let _observer: Observer
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
init(observer: Observer, cancel: Cancelable) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
//初始化保存了self._observer实际就是:我们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<Observer.Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
// 这里实际调用了`AnonymousObserver.on()`方法。
self._observer.on(event)
}
... 此次代码省略,不需要关注
}
- (25)从上面的源码我们可以看到:
Sink.forwardOn()
实际调用了AnonymousObserver.on()
,说白了就是:我们最开始实例1的observer.onNext("kongyulu")
这行代码执行时,ob.onNext() 先调用AnyObserver.on()
,AnyObserver.on()
又会调用AnonymousObservableSink.on()
,AnonymousObservableSink.on()
又会调用AnonymousObservableSink.forwardOn()
,接着AnonymousObservableSink.forwardOn()
又会调用AnonymousObservableSink父类的Sink.forwardOn()
,最后由Sink.forwardOn()
调用了AnonymousObserver.on()
。 - (26)到这里我们思路基本清晰了,我们再回到
AnonymousObserver.on()
方法定义:
- 首先我们查看类定义如下,并没有找到由on()方法:
final class AnonymousObserver<Element>: ObserverBase<Element> {
//次处给尾随闭包取了一个别名
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
//这里保存了一个传入的尾随闭包:这个尾随闭包就是我们ob.subscribe()时创建let observer = AnonymousObserver<Element> { event in这里是个尾随闭包B} 这里传入_eventHandler保存的就是尾随闭包B
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
//这里回调了我们的尾随闭包B
return self._eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
- 于是我们来找它的父类ObserverBase:
class ObserverBase<Element> : Disposable, ObserverType {
private let _isStopped = AtomicInt(0)
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)//这里实际调用的是子类的onCore()
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
func onCore(_ event: Event<Element>) {
rxAbstractMethod()
}
func dispose() {
fetchOr(self._isStopped, 1)
}
}
- 我们通过分析父类源码得知
ObserverBase.on()
最终调用了AnonymousObserver.onCore()
,而在AnonymousObserver.onCore()
里回调了_eventHandler(event)闭包B,而闭包B就是我们最初ob.subscribe()序列订阅时创建AnonymousObserver的尾随闭包,这样这个尾随闭包最终调用了我们订阅的onNext()方法。这样就解释了:实例1中,执行observer.onNext("kongyulu")
这行代码就会回调let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }
从而打印了 “订阅到了:kongyulu”
具体AnonymousObserver<Element> {B}的尾随闭包B的代码如下:
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
...此次无关代码先省略
//特别注意:AnonymousObserver<Element> { B}括号里面的尾随闭包我们称为B,最终会通过AnonymousObserver.onCore()函数调用闭包B
let observer = AnonymousObserver<Element> { event in
...此次无关代码先省略
switch event {
case .next(let value):
onNext?(value) //这里调用onNext(value)实际就是
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
- (27)通过(26)点分析我们应该弄明白了整个订阅的流程了,
简单总结就是:
- 我们ob.create(
闭包A
)创建时将闭包A保存在AnonymousObservable
里变量_subscribeHandler
。 - 当我们调用ob.subscribe(
闭包B
)订阅序列时,会首先创建一个AnonymousObserver
对象,并且会带一个尾随闭包C
。然后通过self.asObservable().subscribe(AnonymousObserver)
经过一系列转化将AnyObserver
传递给了闭包A
。 - 其中2中说一系列转化可以简单解释为:
-
self.asObservable().subscribe(AnonymousObserver)
实际就是ob.subscribe(AnonymousObserver)
-
ob.subscribe(AnonymousObserver)
实际就是Producer.subscribe(AnonymousObserver)
-
Producer.subscribe(AnonymousObserver)
会调用self.run(AnonymousObserver)
-
self.run(AnonymousObserver)
会创建一个AnonymousObservableSink
管子对象sink,然后调用sink.run(AnonymousObservable)
调用了管子的run()方法,并将ob传入了管子sink. - 而我们管子的
sink.run(AnonymousObservable)
方法里面调用了parent._subscribeHandler(AnyObserver(self))
实际就是ob._subscribeHandler(AnyObserver(AnonymousObservableSink))
也就是调用了闭包A
- 而我们
闭包A
需要传入一个参数就是AnyObserver(AnonymousObservableSink)
,实际上AnyObserver
只是一个结构体,它保存了AnonymousObservableSink.on()
方法。 - 当我们在
闭包A
里面调用observer.onNext("kongyulu")
实际上就是AnyObserver.onNext("kongyulu")
,而AnyObserver.onNext("kongyulu")
会调用AnyObserver.on()
-
AnyObserver.on()
接着又调用AnonymousObservableSink.on(event)
这里event里面 - AnonymousObservableSink类中
AnonymousObservableSink.on(event)
接着又会去调用它自己的forwardOn(event)
也就是AnonymousObservableSink.forwardOn(event)
-
AnonymousObservableSink.forwardOn(event)
实际上是调用它父类Sink.forwardOn(event)
而在Sink父类初始化的时候已经保存了AnonymousObserver
对象_observer。 -
Sink.forwardOn(event)
会调用的是AnonymousObserver.on(event)
-
AnonymousObserver.on(event)
实际会调用自己父类的ObserverBase.on(event)
-
ObserverBase.on(event)
实际又会调用子类的AnonymousObserver.onCore(event)
-
AnonymousObserver.onCore(event)
会调用self._eventHandler(event)
而这里_eventHandler就是保存AnonymousObserver创建时传入的尾随闭包C
这样就回调了闭包C
-
闭包C
中又根据event的事件不同,回调了闭包B
,例如如event=.onNext事件,就会回调闭包B
onNext{},也就是let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }, onError: { (error) in print("订阅到了:\(error)") }, onCompleted: { print("完成了") }) { print("销毁回调") }
里面的这段代码:onNext: { (anything) in print("订阅到了:\(anything)")
从而就会打印:“订阅到了:kongyulu”
- (28)
- (29)
最后这里通过一个流程图来表达整个创建,订阅过程
2. 序列创建,订阅图解
3. 序列订阅流程
3.1 序列销毁方式
上面讲解了序列的创建,订阅流程,在分析创建序列,订阅序列的源码时,我们已经隐隐约约的看到了我们开篇分析的dispose(),貌似在整个源码中各处都有着dispose的代码,那么序列到底是怎么销毁的呢?
为了解决这个疑问,我们下面将通过分析源码,来探索一下序列的销毁流程。
这里先看一张序列生命周期时序图:
通过这张时序图,结合上面的序列创建,订阅的流程分析,我可以先得出序列会被销毁的3种方式:
方式一:
通过发送事件,让序列生命周期自动结束来释放序列资源。 一个序列只要发出了 error 或者 completed 事件,它的生命周期将结束,那么所有内部资源都会被释放,不需要我们手动释放。(这个结论在本篇博客讨论实例1和实例2的时候已经验证了,只要发送了completed和error事件,就会调用onComplete并打印“销毁了”信息)方式二:
通过主动调用dispose()来释放。例如你需要提前释放序列资源或取消订阅的话,那么你可以对返回的可被清除的资源(Disposable) 调用 dispose 方法。方式三:
通过垃圾袋DisposeBag来回收资源,达到自动释放,这是官方推荐的方式。官方推荐使用清除包(DisposeBag)来管理订阅的生命周期,一般是把资源加入到一个全局的DisposeBag里面,它跟随着页面的生命周期,当页面销毁时DisposeBag也会随之销毁,同时DisposeBag里面的资源也会被一一释放。(这个结论在上面的DisposeBag分析中也证实了)
3.2 序列销毁实例分析
我们先来回顾一下本篇博客开始分析的实例1的代码
实例1:
func limitObservable(){
// 创建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
observer.onNext("kongyulu")
return Disposables.create { print("销毁释放了")} // dispose.dispose()
}
// 序列订阅
let dispose = ob.subscribe(onNext: { (anything) in
print("订阅到了:\(anything)")
}, onError: { (error) in
print("订阅到了:\(error)")
}, onCompleted: {
print("完成了")
}) {
print("销毁回调")
}
print("执行完毕")
//dispose.dispose()
}
-
上面的代码执行结果如下:
- 通过上面的结果我们知道,这个创建的序列没有被销毁,即没有打印“销毁释放了”,也没有打印“销毁回调”。这是为什么呢?这个问题我们后面再通过分析源码Rx源码就知道了。
- 现在我们把上面代码的那行注释放开
dispose.dispose()
这行代码去掉注释,然后重新运行,输出结果如下:
3.3 序列销毁源码分析
- 通过上面实例1的代码,首先可以看到,在创建序列
Observable<Any>.create()
方法有一个尾随闭包,需要返回一个实现了Disposable
协议的实例。而就是通过return Disposables.create { print("销毁释放了")}
这行代码返回的。由此我们确认Disposables.create { print("销毁释放了")}
非常重要,我们先来发分析一下Disposables.create
源码。 - 进入到Disposables.create()源码:我们想直接点击进去发现Disposables就是一个空结构体
public struct Disposables {
private init() {}
}
看这个结构体连初始化方法都是私有的,说明它不能被继承,于是我们推测Disposables.create()
一定通过扩展的方式实现的。所以我们在项目中搜索extension Disposables {
关键字,可以找到如下:
这样我们找到第一个:AnonymousDisposable.swift文件进入找到第55行:
extension Disposables {
/// Constructs a new disposable with the given action used for disposal.
///
/// - parameter dispose: Disposal action which will be run upon calling `dispose`.
public static func create(with dispose: @escaping () -> Void) -> Cancelable {
return AnonymousDisposable(disposeAction: dispose)//这里dispose就是我们传入的尾随闭包
}
}
通过上面源码,我们看到直接一行return AnonymousDisposable(disposeAction: dispose)
就结束了,而dispose
就是我们实例1中 Disposables.create { print("销毁释放了")} // dispose.dispose() }
这行代码里面的尾随闭包: { print("销毁释放了")}
这里我们给他一个别名成为:闭包D
- 不用思考,接下来我们肯定要进入
AnonymousDisposable
类实现一探究竟:
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
public typealias DisposeAction = () -> Void
private let _isDisposed = AtomicInt(0)
private var _disposeAction: DisposeAction?
/// - returns: Was resource disposed.
public var isDisposed: Bool {
return isFlagSet(self._isDisposed, 1)
}
fileprivate init(_ disposeAction: @escaping DisposeAction) {
self._disposeAction = disposeAction
super.init()
}
// Non-deprecated version of the constructor, used by `Disposables.create(with:)`
fileprivate init(disposeAction: @escaping DisposeAction) {
self._disposeAction = disposeAction
super.init()
}
/// Calls the disposal action if and only if the current instance hasn't been disposed yet.
///
/// After invoking disposal action, disposal action will be dereferenced.
fileprivate func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
if let action = self._disposeAction {
self._disposeAction = nil
action()
}
}
}
}
- 分析上面AnonymousDisposable类定义源码,我们可以得出以下结论:
- 初始化的时候把外界传过来的闭包进行保存,传入进来的闭包我们就是我们第2点中分析的闭包D:
{ print("销毁释放了")}
- 有一个
dispose()
方法,通过fetchOr(self._isDisposed, 1) == 0
这行代码控制dispose()
里面的内容只会被执行一次。(无论dispose()
方法被执行多少次,if let action = self._disposeAction { self._disposeAction = nil action() }
这段代码最多会被执行一次) -
dispose()
方法中先把self._disposeAction
赋值给临时变量action
,然后置空self._disposeAction
,再执行action()
。这样操作的原因是如果_disposeAction
闭包是一个耗时操作,也能够保证_disposeAction
能够立即释放。
在
AnonymousDisposable
里面我们只看到了一些常规的保存等操作,结合我们最开始分析序列的创建流程经验(AnonymousDisposable
就类似于AnonymousObservable
),我们可以推断核心代码实现肯定在订阅这一块。接下来,我们进入到
observable.subscribe()
方法来探究一些subscribe()
的源码实现。
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
//1.这里定义disposable局部变量
let disposable: Disposable
//2.创建了Disposables对象
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
//3.创建了一个AnonymousObserver对象,有一个重要的尾随闭包
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose() //这里当收到error事件就会回收释放资源
case .completed:
onCompleted?()
disposable.dispose() //这里当收到completed事件就会回收释放资源
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable//这里将我们创建的局部变量传给了self.asObservable().subscribe,也就是我们的Producer.subscribe
)
}
分析上面subscribe()
源码,结合开始的分析,我们可以得出以下结论:
-
subscribe()
创建了一个Disposable
对象,并保存了销毁回调闭包,当执行销毁时,会把消息回调出去。 - 在收到错误或者完成事件时会执行
disposable.dispose()
释放资源。 -
return Disposables.create( self.asObservable().subscribe(observer), disposable )
,这里返回的Disposable
对象就是我们外面手动调用dispose.dispose()
方法的dispose
对象,或者说是加入到全局的DisposeBag
的销毁者。
- 由6的分析,我们清楚知道最后一行代码
return Disposables.create( self.asObservable().subscribe(observer), disposable )
时关键点,我们接下进入:Disposables.create()
源码:
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
return BinaryDisposable(disposable1, disposable2)//返回一个二元销毁者对象。
}
上面代码我们看到create()直接返回了一个BinaryDisposable
二元销毁者类对象,并将disposable1
,disposable2
传入给了BinaryDisposable
。
- 这里的
disposable1
就是self.asObservable().subscribe(observer)
也就是Producer..subscribe(observer)
返回的disposer -
disposable2
就是我们subscribe()中创建局部变量let disposable: Disposable
- 接着我们来分析
BinaryDisposable
类到底是什么:
private final class BinaryDisposable : DisposeBase, Cancelable {
private let _isDisposed = AtomicInt(0)
// state
private var _disposable1: Disposable?
private var _disposable2: Disposable?
/// - returns: Was resource disposed.
var isDisposed: Bool {
return isFlagSet(self._isDisposed, 1)
}
init(_ disposable1: Disposable, _ disposable2: Disposable) {
self._disposable1 = disposable1
self._disposable2 = disposable2
super.init()
}
func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
self._disposable1?.dispose()
self._disposable2?.dispose()
self._disposable1 = nil
self._disposable2 = nil
}
}
}