走进 RxSwift 之冷暖自知

瞎扯几句

前段时间身体跟心态都出了点问题,博客也很久没更新了。细心的朋友可能发现我的个人介绍换了,由原先高冷装逼的“谢绝转载”变为略显矫情的“人生谁能不迷茫”了。不知道大家有没有这样的经历,因为一些三言两语难以说清的理由,或者干脆就是无端地对代码产生了一些排斥情绪,下班后看不进书也不想碰代码。我经历了几天这样的日子,挺难受的,好在很快就走出来了。编程本身其实是件有趣的事,但任何事情,一旦将其作为职业,便也失了纯粹,总会有身不由己的时候。做个程序猿,也是如人饮水,冷暖自知。

关于标题

言归正传啊,今天还是想跟大家聊一聊 RxSwift ,之前我写过一篇 走进 RxSwift 之观察者模式,讲解了 RxSwift 的部分实现。今天这个标题还是以“走进 RxSwift”为开头,暗示着这将会是一个系列(感觉立了个 Flag 啊……)。至于冷暖自知呢,就有一点讲究了,可不仅仅是一句感慨。同为 FRP 框架的 RAC 中素有冷信号和热信号的概念,而且是两种不同的类型。其实 RxSwift 中的 Observable 也有冷热之分,但为什么提的人不多呢,官方文档是这么说的:

IMHO, I would suggest to more think of this as property of sequences and not separate types because they are represented by the same abstraction that fits them perfectly, Observable sequence.

RxSwift 认为不管是 Cold 还是 Hot,它们都是 Observable,它们同属于一个抽象,而不是两种独立的类型。这个观点很有意思,也就是说如果你自己声明了一个 Observable(遵守 ObservableType 协议),使用者并不知道它是 Cold 还是 Hot,只有你自己清楚,真正的“冷暖自知”。

使用场景

关于冷热 Observable 的异同,跟冷热信号的异同是类似的,网上已经有很多资料了,我就不详细展开了。简而言之,Cold Observable(以下简称 CO)只有在被订阅的时候才会发射事件,每次有新的订阅者都会把之前所有的事件都重新发射一遍; Hot Observable(以下简称 HO)则是实时的,一旦有新的事件它就发射,不管有没有被订阅,而新的订阅者并不会接收到订阅前已经发射过的事件。

一个姑娘,不管她冷若冰霜还是热情似火,总有人喜欢(毕竟只要漂亮就好0 0)。Observable 也是一样,冷热只是其特性,并没有优劣之分,它们都有各自的应用场景。HO 有点“推模型”的意思,它会 push 新的事件过来,一些实时性要求较高的场景(譬如各种响应事件、通知消息等),如果你要自己用 Rx 去封装的话可以用 HO(当然这些 RxCocoa 基本都帮你做了,而且严格来说也不算用了 HO,这个下文再叙)。而 CO 则有点“拉模型”的意思,只在需要的时候去 pull(subscribe),所以在封装网络请求和一些异步操作的时候,可以使用 CO。

实战小剧场

下面我随便举个例子,我们要完成的功能是发送网络请求,将返回的数据显示到一个 TableView 上。假设我们已经有了一个网络模块,它的调用接口大概长这样:

class Resource {
    typealias CompletionHandler = (Data?, Error?) -> Void

    func request(completion: CompletionHandler) {
        // ...
    }
}

这是一个非常典型的网络请求,在回调中处理返回数据。不过我们想用 Rx 对它进行一点小包装,让它直接返回一个 Observable。记得我前面说过吧,异步操作一般是用 CO:

extension Resource {
    func request() -> Observable<Data> {
        return Observable.create { observer in
            self.request() { (data, error) in
                if let error = error {
                    observer.onError(error)
                    return
                }
                if let data = data {
                    observer.onNext(data)
                }
                observer.onCompleted()
            }

            return Disposables.create()
        }
    }
}

顺便说一下,用Observable.createObservable.justObservable.empty等方法创建的一般都是 CO。好的,现在我们可以这样调用了:

let bag = DisposeBag()
let testResource = Resource()

testResource.request()
    .map(parse)
    .subscribe { print($0) }
    .addDisposableTo(bag)

为了便于测试,造点假数据打印一下:

func request(completion: CompletionHandler) {
    // test
    let data = Data()
    completion(data, nil)
}

func parse(data: Data) -> String { return "Test Data" }

这时顺利打印出了如下内容:

next(Test Data)
completed

到此为止网络请求已经没有问题了,现在只要把数据显示到 TableView 上就好了,我们稍加修改:

func parse(data: Data) -> [String] { return ["Test Data"] }

testResource.request()
    .map(parse)
    .bindTo(tableView.rx.items(cellIdentifier: "Cell", cellType: UITableViewCell.self)) { (_, element, cell) in
        cell.textLabel?.text = element
    }
    .addDisposableTo(bag)

好的,我们已经圆满完成了任务。然而生活永远不会这么一帆风顺,在你准备收拾收拾下班的时候,产品经理突然过来说,兄弟,我昨天喝多了脑子不灵清,忘了跟你说我们这个页面是要能刷新的……OK,不就是刷新么,好说,这就给你加上。于是你把请求数据并显示的这段代码放到一个函数里。但这时候你开始纠结了,函数名……该叫什么呢?fetchData?可这玩意儿不仅去查询了数据,还展示了数据啊,咦?说好的一个函数只做一件事呢……哎不管了,还要回家遛狗呢,先完成功能再说吧,于是你机智地写下了一个bindDataSource函数,在加载视图和用户下拉刷新的时候都调用这个函数:

func bindDataSource() {
    Resource().request()
        .map(parse)
        .bindTo(tableView.rx.items(cellIdentifier: "Cell", cellType: UITableViewCell.self)) { (_, element, cell) in
            cell.textLabel?.text = element
        }
        .addDisposableTo(bag)
}

你觉得万无一失,潇洒地按下 cmd + R,准备看一眼效果就走人。然而世事难料,有时在残酷的现实面前你不得不低下你高傲的头颅,你低头看了眼屏幕,发现在下拉的时候,触发了一个断言:

"Hint: Maybe delegate was already set in xib or storyboard and now it's being overwritten in code.\n")

这很好理解,应该是刷新的时候重复绑定 data source 了,显然只要在每次 bind 之前把 tableView 的 dataSource 置 nil 就行了。于是你在bindDataSource开头加了一句tableView.dataSource = nil,再次 cmd + R。Everything goes well!一切都在意料之中,你捋了捋一丝不乱的刘海,合上电脑回了家。

但这晚注定是个不平静的夜晚,你躺在床上辗转反侧,总觉得哪里不对劲。你的脑海中一直盘旋着tableView.dataSource = nil这句代码,为什么每次刷新都需要重新绑定 dataSource?这什么套路,怎么这么不按常理出牌?我白天的时候在想什么?就这样你陷入了无尽的自我拷问之中,你看了眼身旁熟睡的姑娘,起来蹑手蹑脚地打开了电脑,把代码改成了这样:

let response: Variable<[String]> = Variable([])

func fetchData() {
    Resource().request()
        .map(parse)
        .bindTo(response)
        .addDisposableTo(disposeBag)
}

func bindDataSource() {
    response.asDriver()
        .drive(tableView.rx.items(cellIdentifier: "Cell", cellType: UITableViewCell.self)) { (_, element, cell) in
            cell.textLabel?.text = element
        }
        .addDisposableTo(disposeBag)
}

在页面加载的时候调用fetchDatabindDataSource,而每次刷新页面的时候,调用fetchData就可以了。response既是 Observer 也是 Observable,它作为 Observer 订阅了网络数据的变化,每次一有新的数据,就发送新的事件,tableView 随之更新。很明显,Variable 是个 HO,它其实是 BehaviorSubject 的一个封装,顺便说一下,RxSwift 中的所有 Subject 都是 HO。

如何一眼区别 CO 和 HO

前面也说过了,不管是冷是热,它们都是 Observable,虽然我顺便提了几个 RxSwift 中典型的 CO 和 HO,但如果是一个自定义的 Observable 呢,要如何区分它是冷是热呢?嗯,看源码啊……毕竟源码面前没有秘密嘛。不过看源码也有点讲究,对于今天这个话题,其实只要看它实现的subscribe方法就可以了。之前我在 走进 RxSwift 之观察者模式中解释了Observable.empty的实现,虽然那是比较老的版本,不过核心原理是一样的。默认的 Observable 的实现是冷的,像之前例子中我用Observable.create创建了一个Observable实例,create方法的参数是一个闭包,这个闭包被赋值给一个属性,每当这个实例被订阅的时候,闭包就会被执行。我之前试着实现过一个简化版的 Rx 模型,可以用create方法创建一个 CO,用 Swift2.2 写的,大家可以稍微看下,领会精神:)

enum Event<T> {
    case Next(T)
    case Completed
    case Error(ErrorType)
}

typealias CompletedHandler = () -> Void
typealias ErrorHandler = ErrorType -> Void

class Observable<E> {
    typealias SubscribeHandler = Observer<E> -> Void

    private let subscribeHandler: SubscribeHandler
    private init(handler: SubscribeHandler) {
        subscribeHandler = handler
    }

    static func create(subscribe: Observer<E> -> Void) -> Observable<E> {
        return Observable(handler: subscribe)
    }

    func subscribe(observer: Observer<E>) {
        subscribeHandler(observer)
    }

    func subscribe(on: Event<E> -> Void) {
        let anonymousObserver = Observer<E>(handler: on)
        subscribe(anonymousObserver)
    }

    // todo
    // func subscribe(onNext: E -> Void) {}
    // func subscribe(onError: ErrorHandler) {}
    // func subscribe(onCompleted: CompletedHandler) {}
}

class Observer<E> {
    typealias EventHandler = Event<E> -> Void
    let eventHandler: EventHandler
    // todo
    // let nextHandler: E -> Void
    // let errorHandler: ErrorHandler
    // let completedHandler: CompletedHandler

    init(handler: EventHandler) {
        eventHandler = handler
    }

    func on(event: Event<E>) {
        eventHandler(event)
    }

    // todo
    func onNext(element: E) {}
    func onCompleted() {}
    func onError(error: ErrorType) {}
}


let observable = Observable.create { (observer: Observer<String>) in
    observer.on(.Next("Value"))
    observer.on(.Completed)
}

// Mark: Cold
observable.subscribe { event in
    switch event {
    case .Next(let value):
        print(value)
    case .Error(let error):
        print(error)
    case .Completed:
        print("Completed")
    }
}

observable.subscribe { event in
    print(event)
}

这段代码在 Xcode7 是能正常跑起来的,调用起来跟 RxSwift 也没什么不同,输出也没问题。大家想必也发现了,CO 一般是无状态的,它不会去维护一堆 Observers 或者一堆 Events 什么的,它就是一堆函数(或者说闭包),在被订阅的时候被调用,所以 CO 是比较符合 FP 的思想的。而 HO 呢,恰恰相反,我们看一下一个典型的 HO——PublishSubjectsubscribe方法:

public override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    _lock.lock(); defer { _lock.unlock() }
    return _synchronized_subscribe(observer)
}

func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
    if let stoppedEvent = _stoppedEvent {
        observer.on(stoppedEvent)
        return Disposables.create()
    }

    if _isDisposed {
        observer.on(.error(RxError.disposed(object: self)))
        return Disposables.create()
    }

    let key = _observers.insert(observer.asObserver())
    return SubscriptionDisposable(owner: self, key: key)
}

这段代码看着复杂,但它的核心其实就一句:

let key = _observers.insert(observer.asObserver())

也就是把当前的订阅者加到一个订阅者集合中,而当有新的事件时,就发送给集合中所有的订阅者:

public func on(_ event: Event<Element>) {
    _lock.lock(); defer { _lock.unlock() }
    _synchronized_on(event)
}

func _synchronized_on(_ event: Event<E>) {
    switch event {
    case .next(_):
        if _isDisposed || _stopped {
            return
        }

        _observers.on(event)
    case .completed, .error:
        if _stoppedEvent == nil {
            _stoppedEvent = event
            _stopped = true
            _observers.on(event)
            _observers.removeAll()
        }
    }
}

同样,这段代码的核心是:

_observers.on(event)

_observers的类型并不是 Swift 原生的某种集合类型,可能是出于性能考虑,RxSwift 定义了一个叫Bag的数据结构,但原理上是一样的。这个on方法就是给每个订阅者发送事件。

由上可得,HO 其实是比较典型的观察者模式,跟 target-action 啊 NSNotificationCenter 啊等等的实现原理是差不多的,都需要维护一个观察者集合。RxCocoa 为我们封装了各种事件响应,按理说应该是用 HO,但我看了代码发现并非如此:

public func controlEvent(_ controlEvents: UIControlEvents) -> ControlEvent<Void> {
    let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
        MainScheduler.ensureExecutingOnScheduler()

        guard let control = control else {
            observer.on(.completed)
            return Disposables.create()
        }

        let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) {
            control in
            observer.on(.next())
        }

        return Disposables.create(with: controlTarget.dispose)
        }.takeUntil(deallocated)

    return ControlEvent(events: source)
}

这个controlEvent方法是非常关键的,像 UIButton 的 rx.tap 其实就是调用了这个方法,我们发现这里是用create方法创建了一个 Observable,ControlEvent 其实是个壳而已,真正工作的还是这个source

public struct ControlEvent<PropertyType> : ControlEventType {
    public typealias E = PropertyType

    let _events: Observable<PropertyType>

    public init<Ev: ObservableType>(events: Ev) where Ev.E == E {
        _events = events.subscribeOn(ConcurrentMainScheduler.instance)
    }
   
    public func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
        return _events.subscribe(observer)
    }
    // ...
}

看这个subscribe,就是调用了之前create出来的sourcesubscribe而已,所以它也显然是个 CO。这就有点尴尬了……这明显是个用 HO 的场景啊……让我们回头看,玄机其实在这一句:

let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) {
    control in
    observer.on(.next())
}

让我们看下ControlTarget是哪条道上的,看一下初始化函数:

init(control: Control, controlEvents: UIControlEvents, callback: @escaping Callback) {
    MainScheduler.ensureExecutingOnScheduler()

    self.control = control
    self.controlEvents = controlEvents
    self.callback = callback

    super.init()

    control.addTarget(self, action: selector, for: controlEvents)

    let method = self.method(for: selector)
    if method == nil {
        rxFatalError("Can't find method")
    }
}

我们看重点这一句:

control.addTarget(self, action: selector, for: controlEvents)

是不是很眼熟?没错,这就是我们平常用的那个addTarget。所以 UIKit 已经有在维护一个观察者集合了,本身已经是“热”的了,Rx 就没必要再去加把火了。

冷热转换

虽然我个人比较喜欢使用 CO,但有些场景确实是 HO 比较合适,那可以把一个 CO 转化为 HO 么?Observable 提供了一些操作用以返回 HO,最常用的是shareReplay。由于现实中大部分的场景都是调用shareReplay(1),所以 RxSwift 对 bufferSize 为 1 的情况作了特别处理,调用shareReplay(1)会返回一个ShareReplay1类型的实例,它是个典型的 HO,跟BehaviorSubject作为 Observable 时的行为类似。当然也可以使用shareReplayLatestWhileConnected,它返回一个ShareReplay1WhileConnected实例,与ShareReplay1不同的是,当订阅者从 1 变为 0 的时候,它会清空缓存区,包括 completed 和 error 事件。其他还有些不常有的操作也能返回 HO,如 replayreplayAllpublish等,返回的都是ConnectableObservableAdapter的实例。这个看名字就知道是个适配器,它会在subscribe方法中调用属性_subjectsubscribe方法。所以它之所以“热”,怎么个“热”法,都是由它内部对应的Subject(ReplaySubjectPublishSubject)决定的,篇幅所限,就不做展开了。

最后

好了,本期大致就是如此了,一些细枝末节不再赘述,有兴趣的朋友大可以自己去看看源码。最后,祝您身体健康,生活愉快,再见!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342

推荐阅读更多精彩内容

  • 本文章内部分图片资源来自RayWenderlich.com 本文结合自己的理解来总结介绍一下RxSwift最基本的...
    FKSky阅读 2,853评论 4 14
  • 发现 关注 消息 RxSwift入坑解读-你所需要知道的各种概念 沸沸腾关注 2016.11.27 19:11*字...
    枫叶1234阅读 2,778评论 0 2
  • 最近在学习RxSwift相关的内容,在这里记录一些基本的知识点,以便今后查阅。 Observable 在RxSwi...
    L_Zephyr阅读 1,739评论 1 4
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,451评论 7 62
  • 作业题目 为自己的目标/问题,列1个由5本书组成的小书单 问题 提高工作效率 小书单 《超效率手册》 《搞定》 《...
    瘦子居酒屋阅读 251评论 0 2