当程序员原来越浮躁了,项目做多了大都是雷同的, 对技术没啥帮助,读一些牛逼的第三方框架,有助于提升,
关于RxSwift,作为ReactiveX的成员框架之一,它有着血脉相承的语法,语言和哲学上的一致性使得即便之后转向其他的平台我们也能很快的上手Rx。其次,通过阅读源码,我们也可以看到,其实有的时候大神的代码也没有那么的完美,我们也可以在一些地方看到妥协和失误
本篇目标
本篇的目标就是了解下面这段代码(来自Rx.playgroud)的实现:
example("just") {
let disposeBag = DisposeBag()
Observable.just("🔴")
.subscribe { event in
print(event)
}
.disposed(by: disposeBag)
}
这寥寥数行代码,我想但凡RxSwift入门的同学都知道它的用处:创建一个单值的可观察序列,并且打印出它的所有序列。恩...没毛病,但是本篇想要知道的是:
序列是如何创建的?它的结构是怎么样的?
序列是如何被观察者订阅的?
so...let's go !
众所周知,自Swift诞生以来,苹果爸爸就一直在推崇*** 面向协议编程(POP) *** ,而RxSwift也是同样的,遵循了从一个协议开始,而不是从一个类开始。但是我并不想从协议讲起,因为虽然从协议讲起最具逻辑性,但是从文章的角度来说并不好理解和阅读。所以本文将以示例代码为切入点,自上而下的阅读,以求简单清晰易懂。
Observable
在开篇的示例代码中,首先映入我们眼帘的是Observable,Observable调用了just方法。其实Observable是一个遵守ObservableType的类,实现代码如下
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
init() {
#if TRACE_RESOURCES
let _ = Resources.incrementTotal()
#endif
}
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod()
}
public func asObservable() -> Observable<E> {
return self
}
deinit {
#if TRACE_RESOURCES
let _ = Resources.decrementTotal()
#endif
}
// this is kind of ugly I know :(
// Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯
/// Optimizations for map operator
internal func composeMap<R>(_ selector: @escaping (Element) throws -> R) -> Observable<R> {
return Map(source: self, transform: selector)
}
}
首先这是一个用Public修饰的抽象类,它是直接面向RxSwift使用者的。在这个类当中,使用了E的别名来充当序列值的泛型类型。在Init方法中我们可以看到一个Resource的结构体,顺便提一句,这是一个用来“追踪计数”RxSwift引用资源的,每当init一个资源计数就+1,deinit的时候就总数-1,以此来追踪全局的资源使用。
除此之外,我们还可以看到有两个方法:
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
public func asObservable() -> Observable<E>
这两个方法都在ObservableType这个协议中有所定义,前者定义了Observable能够被订阅的行为,后者则是定义了可以将自身“转换”成“Observable实体”的功能。这里可能会有一些Confuse,其实前面已经提到过,单纯的Observable类并不能作为序列被直接订阅使用,只有Observable的实体子类才可以被实例化使用。
所以,我们也可以看到,subscribe函数的实现也只是简单的fatalError,并没有实际的逻辑操作:
// Swift does not implement abstract methods. This method is used as a runtime check to ensure that methods which intended to be abstract (i.e., they should be implemented in subclasses) are not called directly on the superclass.
func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
rxFatalError("Abstract method", file: file, line: line)
}
-> Observable - > ObservableType
现在我们再来看一下ObservableType:
public protocol ObservableType : ObservableConvertibleType {
/// Type of elements in sequence.
associatedtype E
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}
extension ObservableType {
/// Default implementation of converting `ObservableType` to `Observable`.
public func asObservable() -> Observable<E> {
// temporary workaround
//return Observable.create(subscribe: self.subscribe)
return Observable.create { o in
return self.subscribe(o)
}
}
}
这两个方法的作用前面已经讲过,在这里我们可以看到通过Protocol ExtensionRxSwift为ObservableType提供了默认的asObservable的实现,那么ObservableConvertibleType是一个什么协议呢,是不是有它从根源上定义了asObservable方法呢?我们来看一下ObservableConvertibleType的定义:
/// Type that can be converted to observable sequence (`Observer<E>`).
public protocol ObservableConvertibleType {
/// Type of elements in sequence.
associatedtype E
/// Converts `self` to `Observable` sequence.
///
/// - returns: Observable sequence that represents `self`.
func asObservable() -> Observable<E>
}
果然如此,那么至此为止,Observable之前的协议继承体系我们已经明了,画成图大概是这样的:
很遗憾,至此为止我们并没有看到太多的实现逻辑,但是我们看到了一系列Observable的Protocol根基。那么具体的Observable实体应该是怎么样的呢?我们从just身上来找到答案。
在Observable+Creation.swift文件中,我们找到了关于just的定义:
Observable+Creation.swift是一个Observable的一个拓展(extension),文件中我们可以看到很多关于构建Observable实体的方法,诸如create, empty, never等等,本篇以just作为切入点,其实其他的公开的Creation函数也是类似的逻辑,所以不会一一介绍了。
public static func just(_ element: E) -> Observable<E> {
return Just(element: element)
}
我们可以看到,这是一个public修饰的暴露在外的Observable的静态方法,返回的也是Observable类型。那么这个Just是什么呢?
final class Just<Element> : Producer<Element> {
private let _element: Element
init(element: Element) {
_element = element
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
observer.on(.next(_element))
observer.on(.completed)
return Disposables.create()
}
}
我们可以看到,这是一个继承自Producer的一个类,OK,我们先不去管这个Just,先去看看Producer是一个什么东西:
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
let disposer = SinkDisposer()
let sinkAndSubscription = run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
看到这里我们清楚了,Producer是继承自Observable的一个抽象类,结合前面的Just,于是我们的图可以画成这样了:
在Observable的子类Producer我们可以看到该基类实现subscribe的基础方法,这里用到了RxSwift当中的另外一个概念--Scheduler,但是它不是本文的重点,我们将在接下来的文章里面去集中讨论它,这里只是做一些简单的解读(感觉给自己埋了坑)。
在Producer中,subscribe主要做了以下几件事情:
(1)创建一个SinkDisposer。
(2)判断是否需要Scheduler来进行切换线程的调用,如果需要那么就在指定的线程中操作。
(4)调用run方法,将observer和刚刚创建的SinkDisposer作为入参,得到一个Sink和Subscription的一个元组。这里的Sink和Subscription都是遵守Disposable的类。
(4).SinkDisposer对传入之前的Sink和Subscription执行setSinkAndSubscription方法。
(5).将执行完setSinkAndSubscription方法的disposer作为返回值返回。
这里的相关操作其实都容易理解,首先看看这个run:
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
好,既然这是一个抽象方法,那么我们暂且先不去管它,今晚不是它的轮次(最近🐺杀玩多了)。除去这个方法,最让人疑惑的就是这个setSinkAndSubscription方法,那么它的作用是什么呢?
我们先来谈一谈SinkDisposer类,但是再谈它之前我们需要先知道它所遵守的协议。SinkDisposer是一个遵守Cancelable协议的类,那么这个Cancelable是何方神圣呢?
这一切都要从Disposable说起。
Disposable
Disposable只是一个简单的协议,其中只有一个dispose方法,定义了释放资源的统一行为。
/// Respresents a disposable resource.
public protocol Disposable {
/// Dispose resource.
func dispose()
}
OK,这个很简单Cancelable呢?
/// Represents disposable resource with state tracking.
public protocol Cancelable : Disposable {
/// Was resource disposed.
var isDisposed: Bool { get }
}
没错,Cancelable只是一个继承自Disposable的一个协议,其中定义了一个Bool类型的isDisposed标识,用来标识是否该序列已经被释放。
SinkDisposer
OK,现在我们终于来到了SinkDisposer类,先上源码:
fileprivate final class SinkDisposer: Cancelable {
fileprivate enum DisposeState: UInt32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
// Jeej, swift API consistency rules
fileprivate enum DisposeStateInt32: Int32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
private var _state: AtomicInt = 0
private var _sink: Disposable? = nil
private var _subscription: Disposable? = nil
var isDisposed: Bool {
return AtomicFlagSet(DisposeState.disposed.rawValue, &_state)
}
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
_sink = sink
_subscription = subscription
let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
func dispose() {
let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = _sink else {
rxFatalError("Sink not set")
}
guard let subscription = _subscription else {
rxFatalError("Subscription not set")
}
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
}
在这里我们看到的一些以Atomic
开头的方法都是在OSAtomic.h
中所定义的自动读取和更新指定值方法,详细的使用方法可以点 这里的官方文档 。这里使用的Atomic
,方法是为了区分以下几种可能性:
case1 - 第一次执行
(1)将sink和subscription赋值给自身的私有变量。
(2)通过Atmoic方法(也就是OSAtomicOr32OrigBarrier)方法,将_state的值更新为2,并且返回值previousState为0。
(3)previousState和DisposeStateInt32. sinkAndSubscriptionSet.rawValue做逻辑与运算,得值为0所以不执行if里面的逻辑。
(4)previousState和DisposeStateInt32.disposed.rawValue 做逻辑与运算,的值为0,所以也不执行if里面的逻辑。
(5)结束。
case2 - 再次执行
1.由于没有执行过dispose方法,所以自从第一次执行setSinkAndSubscription之后,_state的值一直为2。当执行previousState和DisposeStateInt32.disposed.rawValue的时候,的值为2,所以执行xFatalError("Sink and subscription were already set")程序中止运行。
case3 - 先执行过dispose,然后第一次执行
(1)由于执行过dispose 方法,所以_state的值为1。
(2)通过Atmoic方法(也就是OSAtomicOr32OrigBarrier)方法,将_state的值更新为2,并且返回值previousState为1。
(3)previousState和DisposeStateInt32. sinkAndSubscriptionSet.rawValue做逻辑与运算,得值为0所以不执行if里面的逻辑。
(4)previousState和DisposeStateInt32.disposed.rawValue 做逻辑与运算,的值为1,所以执行if内的操作,将sink和subscription分别执行dispose操作,并且将两个私有变量置nil,打破引用环。
我们可以看到,通过一个_state和OSAtomic的方法,RxSwift非常优雅的解决了上述三种场景,非常值得借鉴。而本类中的dispose方法其实也是类似的处理方法,来保证只有一次有效的dispose操作,本文就不再赘述
Observer
接下来我们来讲讲RxSwift中的另外一个角色,Observer(观察者),这次我们从观察者的基类ObserverBase谈起:
ObserverBase是一个遵守了Disposable和ObserverType协议的一个抽象类,实现了on和dispose。值得注意的是,在ObserverBase中有一个私有变量:
接下来我们来讲讲RxSwift中的另外一个角色,Observer(观察者),这次我们从观察者的基类ObserverBase谈起:
ObserverBase是一个遵守了Disposable和ObserverType协议的一个抽象类,实现了on和dispose。值得注意的是,在ObserverBase中有一个私有变量:
private var _isStopped: AtomicInt = 0
_isStopped是一个哨兵,用来标记所观察的序列是否已经停止了,那么什么时候需要标记为Stop呢?我们来看这段代码:
func on(_ event: Event<E>) {
switch event {
case .next:
if _isStopped == 0 {
onCore(event)
}
case .error, .completed:
if !AtomicCompareAndSwap(0, 1, &_isStopped) {
return
}
onCore(event)
}
}
只要_isStopped不为0,那么就允许“发射”.next事件,也就是执行onCore方法。
当第一次“发射”.error或者.completed时,执行一次onCore,并且将_isStopped设为1。
因为所有的Observer类在事件发射的逻辑上面都相同,所以统一在ObserverBase中作了处理,这也是典型的OOP思想。老铁,没毛病~
值得一提的是,我们可以看到这里使用了一个AtomicCompareAndSwap的方法,这个方法是做什么的呢?在Platform.Darwin.swift中,我们可以看到关于这个方法的定义:
typealias AtomicInt = Int32
let AtomicCompareAndSwap = OSAtomicCompareAndSwap32Barrier
let AtomicIncrement = OSAtomicIncrement32Barrier
let AtomicDecrement = OSAtomicDecrement32Barrier
我们可以看到,AtomicCompareAndSwap其实就是OSAtomic库中所定义的一个全局方法:
/*! @abstract Compare and swap for 32-bit values with barrier.
@discussion
This function compares the value in <code>__oldValue</code> to the value
in the memory location referenced by <code>__theValue</code>. If the values
match, this function stores the value from <code>__newValue</code> into
that memory location atomically.
This function is equivalent to {@link OSAtomicCompareAndSwap32}
except that it also introduces a barrier.
@result Returns TRUE on a match, FALSE otherwise.
*/
@available(iOS 2.0, *)
@available(iOS, deprecated: 10.0, message: "Use atomic_compare_exchange_strong() from <stdatomic.h> instead")
public func OSAtomicCompareAndSwap32Barrier(_ __oldValue: Int32, _ __newValue: Int32, _ __theValue: UnsafeMutablePointer<Int32>!) -> Bool
简单的来说,该方法传入三个参数:__oldValue,__newValue和__theValue,前两个参数都是Int32类型的,后一个是UnsafeMutablePointer<Int32>的可变指针。当__oldValue的值和指针所指向的内存地址的变量的值相等时,返回true否则为false,于此同时,如果__newValue和当前的值不相等,那么就赋值,使得__theValue的值为新值。伪代码如下:
f (*pointer == oldvalue) {
*pointer = newvalue;
return 1;
} else {
return 0;
}
为了达到最佳性能,编译器通常会对汇编基本的指令进行重新排序来尽可能保持处理器的指令流水线。作为优化的一部分,编译器有可能对访问主内存的指令,如果它认为这有可能产生不正确的数据时,将会对指令进行重新排序。不幸的是,靠编译器检测到所有可能内存依赖的操作几乎总是不太可能的。如果看似独立的变量实际上是相互影响,那么编译器优化有可能把这些变量更新位错误的顺序,导致潜在不不正确结果。
内存屏障(memory barrier)是一个使用来确保内存操作按照正确的顺序工作的非阻塞的同步工具。内存屏障的作用就像一个栅栏,迫使处理器来完成位于障碍前面的任何加载和存储操作,才允许它执行位于屏障之后的加载和存储操作。内存屏障同样使用来确保一个线程(但对另外一个线程可见)的内存操作总是按照预定的顺序完成。如果在这些地方缺少内存屏障有可能让其他线程看到看似不可能的结果。为了使用一个内存屏障,你只要在你代码里面需要的地方简单的调用OSMemoryBarrier函数。
####匿名观察者
看完了ObserverBase现在我们来看一下AnonymousObserver:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
if TRACE_RESOURCES
let _ = Resources.incrementTotal()
endif
_eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return _eventHandler(event)
}
if TRACE_RESOURCES
deinit {
let _ = Resources.decrementTotal()
}
endif
}
我们可以看到,在这个匿名观察者中,它主要做的事情就是将基类ObserverBase所没有实现的onCore方法实现了,将观察者构造方法时传入的EventHandler在onCore方法中执行。这也就是观察者受到序列事件的动作。
####订阅过程
在我们对Observable、Observer和Disposeable有了一定的认知之后,我们可以来认识一下最为关键的一步,subscribe也就是订阅。
在ObservableType+Extensions.swift中我们可以看到相关的实现:
/**
Subscribes an event handler to an observable sequence.
- parameter on: Action to invoke for each event in the observable sequence.
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.subscribeSafe(observer)
}
所谓的subscribe其实只是做了两个事情。首先是构造了一个匿名观察者,将on也就是(Event<E>) -> Void类型的闭包作为参数,每次在匿名观察者有新的事件的时候调用,这里也用到了尾随闭包的语法糖,提高阅读性。其次,将刚刚构造的匿名观察者,通过subscribeSafe函数来完成订阅。那么subscribeSafe究竟做了一些什么事情呢?
extension ObservableType {
func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
return self.asObservable().subscribe(observer)
}
}
subscribeSafe是一个内部的方法,所有内部的订阅操作全部通过该方法来完成,一般最后都是通过subscribe方法的多态性来完成最终的订阅,那么回想一下之前Just的subscribe方法我们就可以知道,一旦调用subscribe方法,Just立刻给匿名观察者发送一个包裹了初始值的.next事件和一个.completed事件,最后返回一个NopDisposable类型的“存根”,NopDisposable是一个在执行dispose操作时不进行任何操作的存根。然后整个订阅过程就结束了。
DisposeBag
对于开头的代码,我们现在唯一还没有讲到的就是addDisposableTo这个方法,我们都知道,当一个序列执行subscribe之后我们会得到一个遵守Disposable的存根。那么根据方法名,我们也可以猜到这个一个将存根添加到一个地方的方法,那么它是要将存根添加到哪里呢?
没错!就是我们天天在写的DisposeBag。在DisposeBag.swift中我们可以找到该方法的定义:
extension Disposable {
/// Adds `self` to `bag`.
///
/// - parameter bag: `DisposeBag` to add `self` to.
public func addDisposableTo(_ bag: DisposeBag) {
bag.insert(self)
}
}
那么DisposeBag到底是个什么东西呢?talk is cheap,我们直接来看源码:
public final class DisposeBag: DisposeBase {
private var _lock = SpinLock()
// state
private var _disposables = [Disposable]()
private var _isDisposed = false
/// Constructs new empty dispose bag.
public override init() {
super.init()
}
/// Adds `disposable` to be disposed when dispose bag is being deinited.
///
/// - parameter disposable: Disposable to add.
public func insert(_ disposable: Disposable) {
_insert(disposable)?.dispose()
}
private func _insert(_ disposable: Disposable) -> Disposable? {
_lock.lock(); defer { _lock.unlock() }
if _isDisposed {
return disposable
}
_disposables.append(disposable)
return nil
}
/// This is internal on purpose, take a look at `CompositeDisposable` instead.
private func dispose() {
let oldDisposables = _dispose()
for disposable in oldDisposables {
disposable.dispose()
}
}
private func _dispose() -> [Disposable] {
_lock.lock(); defer { _lock.unlock() }
let disposables = _disposables
_disposables.removeAll(keepingCapacity: false)
_isDisposed = true
return disposables
}
deinit {
dispose()
}
}
其实DisposeBag这个类设计的还是非常的简单明了的,暴露给外部的只有一个insert方法,将需要被管理的Dispose交给这个Bag,当该Bag执行deinit方法的时候执行dispose,将所持有的所有Disposable遍历一遍,同时挨个dispose,值得注意的是,该类内部使用了一个锁:
private var _lock = SpinLock()
这个SpinLock其实就是一个NSRecursiveLock的递归锁,该🔐的作用就是为了保证_disposables的数组线程安全,之所以用递归锁是因为有可能会出现在相同的线程多次调用insert的而引发死锁。
正常情况下,执行insert方法,首先会执行加锁操作,然后Bag会将该Disposable加入到_disposables这个数组中,最后解锁。但是还有一种情况,那就是当执行insert操作的时候,该Bag已经被析构了,那么我们就不需要再将其加入数组,直接将该Disposable释放掉就可以了。
QA
当一个序列构造完毕的之后,调用subscribe
方法会进行SubscribeHandler
,也就是进行订阅的相关操作。具体来说,对于Just
这个序列,SubscribeHandler
指的是就是发送一个.next(element)
事件和一个.completed
事件;对于NeverProducer
这个序列,SubscribeHandler
指的是单单只发送一个.completed
事件;所以对于不同的SubscribeHandler
会有不同的订阅操作,总的来说是根据序列的特性来发送给观察者不同的事件流。
值得一提的是在RxSwift中还有一个很重要的概念Sink
,关于它的解释可以参考一下这个issue, Sink
相当与一个加工者,可以将源事件流转换成一个新的事件流,如果讲事件流比作水流,事件的传递过程比作水管,那么Sink
就相当于水管中的一个转换头。关于Sink
我们会在之后的文章中详细的讲述。
我们分析了在RxSwift中的整个订阅流程。在开讲变换操作之前,首先要弄清楚Sink的概念,不清楚的同学可以翻看上一篇的分析。简单的来说,在每一次订阅操作之前都会进行一次Sink对流的操作。如果把Rx中的流当做水,那么Sink就相当于每个水管水龙头的滤网,在出水之前进行最后的加工。
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
通过上面的源码我们可以发现,每当一个Observable被订阅,那么该Observable一定会执行run方法,而run方法中做的事情就是Sink的相关处理操作。
简单的来说Sink主要做两件事情:
对Next、Complete、Error事件的转发;
对流转发之前的预先变化。
而我们的变换操作基本上都是在各种各样的Sink中操作的,为什么说是基本上呢?因为在一些高阶变化(嵌套变换)的情况之下,Sink并不是发生变换的地方,具体的情况在下文会慢慢说到。
例子
Observable.of(1, 2, 3)
.map { $0 * $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
我们可以在map方法之上卡一个断点,程序运行之后我们可以看到停在了下面的方法定义
extension ObservableType {
public func map<R>(_ transform: @escaping (E) throws -> R)
-> Observable<R> {
return self.asObservable().composeMap(transform)
}
}
我们可以看到,这里做了两件事情,首先确保把调用者转化成Observable,因为符合ObservableType的对象有可能是ControlEvent,ControlProperty之类的东西。然后调用composeMap方法,将我们所期望的变换操作的闭包传入。
OK,我们再进一层,来看看composeMap做了什么:
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
return _map(source: self, transform: transform)
}
我们可以看到,在这里Observable调用了自身的_map私有方法:
internal func _map<Element, R>(source: Observable<Element>, transform: @escaping (Element) throws -> R) -> Observable<R> {
return Map(source: source, transform: transform)
}
final fileprivate class Map<SourceType, ResultType>: Producer<ResultType> {
typealias Transform = (SourceType) throws -> ResultType
private let _source: Observable<SourceType>
private let _transform: Transform
init(source: Observable<SourceType>, transform: @escaping Transform) {
_source = source
_transform = transform
}
override func composeMap<R>(_ selector: @escaping (ResultType) throws -> R) -> Observable<R> {
let originalSelector = _transform
return Map<SourceType, R>(source: _source, transform: { (s: SourceType) throws -> R in
let r: ResultType = try originalSelector(s)
return try selector(r)
})
}
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
let sink = MapSink(transform: _transform, observer: observer, cancel: cancel)
let subscription = _source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
我们可以看到,所谓的_map实际上又返回了一个基于Producer类(Producer继承自Observable,而Observable类中又是最开始定义composeMap的地方,这个集成链对于接下来的理解很重要)的Map对象。这里主要做了三件事情:
(1)首先把通过构造器把“可观察序列”和“变换操作”保存起来备用。
(2)重写父类的composeMap,从原来的直接使用传入的“变换操
作”(transform)构造Map对象变成了先使用Map对象自带的“变换操作”进行一次变换,再使用传入的“变换操作”进行一次变换。这样的递归处理方式就可以达到嵌套处理map操作的目的,就像这样:Observable<Int>.of(1, 3, 5).map(+).map(+)。
(3)重写父类的run方法,就像前文中说的那样,run方法会在订阅之前执行,并且使用各类的Sink在传递数据时对“数据源”进行各类的加工处理。而在这个例子中,这个Sink就是MapSink,这个MapSink在每次的Next事件的时候,使用传入的transform对数据源进行加工,然后再将加工后的数据源传出。
至此所有的map操作已经全部完成。我们可以看到,map的操作其实是“惰性”的,也就是说,当你使用了map操作除非你使用了嵌套map或者对观察序列进行了订阅,否则他们都不会立刻执行变换操作。
生产者-消费者模式
在RxSwift的设计实现过程中,其实也是对生产者-消费者模式(Producer–consumer pattern)实际应用。在RxSwift中,所有的可观察序列都充当着生产者的作用,所以我们可以变换操作最后返回的都是一个继承自Producer
类的一个子类(除了一些Subject,Subject比较特殊,之后会好好讨论一下)。
上面的脑图大概展示了Producer所派生的子类,我们可以看到,无论是我们常用的“初始化”方法:just、of、from,还是我们常用的变换方法:map,flatMap,merge,他们所对应的实现都是一种Producer。
我们可以看到,也正是得益于生产者-消费者模式的实现,使得RxSwift在可观察序列如同工厂里的流水线一样,可以在每一条流水线结束之前进行自定义的加工。
总结
接下来我们可以俯瞰一下RxSwift对于事件变换的操作,以下做一些逻辑上的梳理工作,希望大家可以看的更加清楚
1. 协议拓展
从一个协议开始。 ---- WWDC 2015
我们知道,RxSwift的可观察序列都是基于ObservableType,所以当我们需要给所有的可观察序列添加一个变换操作的时候,我们只需要通过extension来添加一个公开的方法,然后去实现它。
public func map<R>(_ transform: @escaping (E) throws -> R)
-> Observable<R> {
return self.asObservable().composeMap(transform)
}
public func flatMap<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O)
-> Observable<O.E> {
return FlatMap(source: asObservable(), selector: selector)
}
public func concat<O: ObservableConvertibleType>(_ second: O) -> Observable<E> where O.E == E {
return Observable.concat([self.asObservable(), second.asObservable()])
}
public static func combineLatest<O1: ObservableType, O2: ObservableType>
(_ source1: O1, _ source2: O2)
-> Observable<(O1.E, O2.E)> {
return CombineLatest2(
source1: source1.asObservable(), source2: source2.asObservable(),
resultSelector: { ($0, $1) }
)
}
// More and more ....
}
上面我所列出来的代码是我为了集中展示所以放在同一个extension中,在实际的源码中他们都是分散在不同的swift文件中的。所以我们知道,所有我们所使用的变换操作,都是通过extension拓展到ObservableType协议当中的。
通过翻看源码我们可以看到,上述的变换操作其实都做了一件事情,那就是返回一个Producer的具体子类。比如map返回的是Map类的实例对象,combineLatest返回的是CombineLatest2类的实例对象。
2. 具象化的Producer
那么通过拓展方法所返回的Producer的子类又是做了一些什么事情呢?
首先,具象化的Producer一定会重写override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Accumulate方法,在该方法中,RxSwift通过具象化的Sink来对数据源进行处理,然后让源可观察序列执行订阅。
其次,Producer在初始化的时候会至少接收两个参数:一个参数是所传递的可观察序列,另外一个参数是所进行变换操作的闭包。当然,有些变换操作可能由于操作的特性而需要三个的参数。比如Scan操作,不仅仅需要闭包accumulator,而且还需要一个seed,这也是由Scan操作的特性所决定了,在这里不多加赘述。当Producer保存了这些变换所必要的参数之后,在run方法中的sink就能够在订阅输出之前执行这些变换,然后输出给订阅者了。
值得注意的是,由于run方法和subscribe方法之间的递归调用,所以这样的实现模式也天然的支持嵌套的变换操作
3. "苦力"Sink
所以变换的闭包的执行都是在各类的Sink当中,比如MapSink:
func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
/// 进行变换操作
let mappedElement = try _selector(element, try incrementChecked(&_index))
/// 将变换操作之后的事件转发给原来的观察者
forwardOn(.next(mappedElement))
}
catch let e {
forwardOn(.error(e))
dispose()
}
case .error(let error):
forwardOn(.error(error))
dispose()
case .completed:
forwardOn(.completed)
dispose()
}
}
我们可以看到,在这里我们终于进行了变换操作,并且变换操作之后将结果转发给了观察者。
至此,整条变换链都转换完毕。
设计的遗憾
在composeMap的定义方法之上,我们可以看到如下的一段注释:
// this is kind of ugly I know :(
// Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯
/// Optimizations for map operator
在上一节的总结中我们知道,在RxSwift中的变换操作的嵌套是通过run方法和subscribe方法的递归调用来解决的。但是这里存在问题,比如,当你嵌套10个map方法的时候,每次发生onNext都会导致10次的变换操作的递归调用,然后再生成最后的值传递给订阅者。用简单的函数式的表达就像这样:
10(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1) = 20
那么,我们为什么不可以直接这样呢?
10(+10) = 20
基于这样的考虑,我们可以看到map的默认实现比较特殊,它并不是直接返回一个Map对象,而是通过composeMap返回一个Map对象,然后再在Map对象中重写composeMap以达到当发生嵌套调用的时候可以优化函数式调用
final fileprivate class Map<SourceType, ResultType>: Producer<ResultType> {
// ....
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
let sink = MapSink(transform: _transform, observer: observer, cancel: cancel)
let subscription = _source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
也正是为了这样的一个优化,导致似乎看起来很ugly,这也是设计上的遗憾吧。