ReactiveSwift框架分析3 — SignalProducer用法及源码分析

前面我们学过如何创建信号并观察它,以及通过分析源码了解了底层现实原理,那这篇文章总结下SignalProducer的用法及原理。

SignalProducer封装了延迟的和可重复的任务,这些任务会在启动时生成信号。

那它怎么用呢?

还是看个例子吧:

每隔5秒打印一条时间信息。

第一步,创建SignalProducer

//Creating a SignalProducer
let signalProducer: SignalProducer<Int, NoError> =
 SignalProducer { (observer, lifetime) in
    for i in 0..<10 {
        DispatchQueue.main.asyncAfter(deadline: .now() + 5.0 *  Double(i)) 
        {          
               observer.send(value: i)
               if i == 9 { //Mark completion on 9th iteration
                     observer.sendCompleted()
               }
        }
    }
}

在这里,使用一个闭包初始化SignalProducer,该闭包会在SignalProducer调用start方法时执行。此闭包有两个接收值:

  1. observer用来发送值。
  2. lifetime为我们提供了一个机会,如果停止观察,我们可以取消正在进行的工作。

第二步,创建观察者

//Creating an observer
let signalObserver = Signal<Int, NoError>.Observer (
value: { value in
     print("Time elapsed = \(value)")
}, completed: {
     print("completed")
}, interrupted: { 
     print("interrupted")
})

第三步,启动观察,执行block

//Start a SignalProducer
let disposable = signalProducer.start(signalObserver)

第四步,假设我们想在10秒后中断SignalProducer:

//Dispose after 10 seconds
DispatchQueue.main.asyncAfter(deadline: .now() + 10.0) {
     disposable.dispose()
}

但是,根据我们当前的实现,即使观察者在10秒后被释放,SignalProducer仍然会在50秒内继续发出整数值,此时lifetime就可以利用起来了。

let signalProducer: SignalProducer<Int, NoError> = 
SignalProducer { (observer, lifetime) in
    for i in 0..<10 {
       DispatchQueue.main.asyncAfter(deadline: .now() + 5.0 *   Double(i)) 
       {
            guard !lifetime.hasEnded else { 
                observer.sendInterrupted()
                return
            }
            observer.send(value: i)
            if i == 9 { 
                observer.sendCompleted()
            }
        }
    }
}

通过判断lifetime的hasEnded属性值,如果为true,就发送一个中断interruption事件,SignalProducer就会终止。

Signal vs SignalProducer

为了理解Signal和SignalProducer之间的区别,可以以直播跟录播为例来类比。

Signal就像直播,它是一个连续的视频和音频流。在给定的时间点,每个观察者都看到相同的帧序列。观察者不能影响整个播放过程。

而SignalProducer就像录播。不同的观察者,在给定的时间点,可以看到不相同的帧序列。

因此,Signal通常用于表示已经“在进行中”的事件流,例如通知、用户输入等。
SignalProducer用于表示需要启动的操作或任务。例如网络请求。

SignalProducer是ReactiveSwift中冷信号的实现,冷信号需要一个唤醒操作,然后才能发送事件,而这个唤醒操作就是订阅它,因为订阅后才发送事件。

热信号Signal相当于现场直播,晚来了前面的节目就没法看,冷信号相当于录播,早晚都能看。

SignalProducer是如何保存Observer对象的呢?

跟Signal 一样其内部有个SignalProducerCore,我们主要来分析SignalProducerCore源码:

//SignalProducerCore
internal class SignalProducerCore {
    struct Instance {
        let signal: Signal
        let observerDidSetup: () -> Void
        let interruptHandle: Disposable
    }
     
    func makeInstance() -> Instance {
        fatalError()
    }
     
    func start(_ generator: (_ upstreamInterruptHandle: Disposable) ->    Signal.Observer) -> Disposable {
        fatalError()
    }
    ......
}

SignalProducerCore 内部有个Instance,它的作用是:

  1. 持有一个热信号Signal,用于保存订阅者添加的Observer对象
  2. 持有一个() -> Void闭包,用于执行回调(对子类SignalCore来说 这个闭包的作用则是向上面的Signal.core.state.Observes数组发送Event)

还有两个抽象方法,供其子类去具体实现。SignalProducerCore有三个子类SignalCoreGeneratorCoreTransformerCore,其中SignalCoreGeneratorCore用于普通操作,而TransformerCore则是在map, take, filterMap...等操作才会用上。

来看看SignalCore的源码:

//SignalCore
private final class SignalCore: SignalProducerCore {
    private let _make: () -> Instance
     
    //这个action会由SignalProducer传入
    init(_ action: @escaping () -> Instance) {
        self._make = action //初始化就是给闭包make赋值
    }
     
    //外部执行SignalProducer的start函数内部实现
    override func start(_ generator: (Disposable) -> Signal.Observer) -> Disposable {
        let instance = makeInstance()// 1. 创建一个热信号signal
        instance.signal.observe(generator(instance.interruptHandle)) 2. 通过参数generator创建一个观察者并订阅上面创建的signal
        instance.observerDidSetup()3. 订阅signal完成,执行回调
        return instance.interruptHandle
    }
     
    override func makeInstance() -> Instance {
        return _make()
    }
}

可以看出通过start函数,内部创建了一个Signal,并且创建了一个观察者订阅了Signal。

Observer中封装的逻辑是如何被执行的:

//SignalProducer.swift
public init(_ startHandler: @escaping (Signal.Observer, Lifetime) -> Void) {
     self.init(SignalCore { //action闭包
          let disposable = CompositeDisposable()
          let (signal, innerObserver) = Signal.pipe(disposable: disposable)
             
          let observerDidSetup = { 
                startHandler(innerObserver, Lifetime(disposable)) 
          }
          let interruptHandle = AnyDisposable(observer.sendInterrupted)
  
          return SignalProducerCore .Instance(signal: signal,
                                 observerDidSetup: observerDidSetup,
                                         interruptHandle: interruptHandle)
     })
}

可以看出,创建Producer时,内部初始化一个SignalProducerCore,创建的signal、observerDidSetup、interruptHandle也给了SignalProducerCore的instance。

observerDidSetup内部封装了startHandler的执行,创建的innerObserver传入到startHandler。

而在上面SignalCore的start函数内部实现中,instance调用了observerDidSetup,也就执行了startHandler闭包,所以在startHandler中如果我们用Observer发送值实际上是用传入到startHandler中的innerObserver来发送值。

另外,在外界调用start函数时,会创建一个观察者outObserver并订阅instance的signal。

所以每调用一次start,startHandler就会执行一次。

举个栗子,熟悉下用法

  1. 创建SignalProducer
let producer = SignalProducer<Int, NoError> { (innerObserver, lifetime) in
    lifetime.observeEnded({
        print("free sth")
    })
   
    innerObserver.send(value: 1)
    innerObserver.send(value: 2)
    innerObserver.sendCompleted()
}
  1. 创建一个观察者封装事件处理逻辑
let outerObserver = Signal<Int, NoError>.Observer(value: { (value) in
    print("did received value: \(value)")
})
  1. 添加观察者到SignalProducer
producer.start(outerObserver)

最终输出:

did received value: 1
did received value: 2
free sth

当然使用SignalProducer.startXXX可以省去第二步,还可以如下写法:

typealias Producer<T> = SignalProducer<T, NoError>

let producer = Producer<Int> { (innerObserver, _) in
    innerObserver.send(value: 1)
    innerObserver.send(value: 2)
    innerObserver.sendCompleted()
}
producer.startWithValues { (value) in
    print("did received value: \(value)")
}
producer.startWithFailed(action: )
producer.startWithResult(action: )
producer.startWithXXX...

使用SignalProducer发起网络请求的坑

第一步,网络请求方法:

func fetchData(completionHandler: (Int, Error?) -> ()) {
    print("发起网络请求")
    completionHandler(1, nil)
}

第二步,将网络请求封装到SignalProducer中

let producer = Producer {[unowned self] (innerObserver, _) in
    self.fetchData(completionHandler: { (data, error) in
        innerObserver.send(value: data)
        innerObserver.sendCompleted()
    })
}

第三步,触发网络请求

producer.startWithValues { (value) in
    print("did received value: (value)")
}
 
producer.startWithValues { (value) in
    print("did received value: (value)")
}

咻...请看最终输出:

发起网络请求
did received value: 1
发起网络请求
did received value: 1

看到了吗,发生两次网络请求,我们当然只是想发送一次请求,在订阅时多次操作请求到的数据而已,而SignalProducer会在每次被订阅就会执行一次初始化时保存的闭包,所以会发生多次网络请求。

为了解决这个问题,可以使用Signal:

typealias NSignal<T> = Signal<T, NoError>
let signalTuple = NSignal.pipe()
signalTuple.output.observeValues { (value) in
    print("did received value: (value)")
}
 
signalTuple.output.observeValues { (value) in
    print("did received value: (value)")
}
 
self.fetchData { (data, error) in
    signalTuple.input.send(value: data)
    signalTuple.input.sendCompleted()
}
 
输出: 发起网络请求
     did received value: 1
     did received value: 1

最后,自定义Error时需要注意:

struct APIError: Swift.Error {
    let code: Int
    var reason = ""
}

由于默认的SignalProducer是没有startWithValues函数的,ReactiveSwift会在Extension里给它加上startWithValues函数,但是这只对NoError有效,所以在自定义Error时,要加上以下代码才有效:

extension SignalProducer where Error == APIError {
     
    @discardableResult
    func startWithValues(_ action: @escaping (Value) -> Void) -> Disposable {
        return start(Signal.Observer(value: action))
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容