前面我们学过如何创建信号并观察它,以及通过分析源码了解了底层现实原理,那这篇文章总结下SignalProducer的用法及原理。
SignalProducer封装了延迟的和可重复的任务,这些任务会在启动时生成信号。
那它怎么用呢?
还是看个例子吧:
每隔5秒打印一条时间信息。
第一步,创建SignalProducer
//Creating a SignalProducer
let signalProducer: SignalProducer<Int, NoError> =
SignalProducer { (observer, lifetime) in
for i in 0..<10 {
DispatchQueue.main.asyncAfter(deadline: .now() + 5.0 * Double(i))
{
observer.send(value: i)
if i == 9 { //Mark completion on 9th iteration
observer.sendCompleted()
}
}
}
}
在这里,使用一个闭包初始化SignalProducer,该闭包会在SignalProducer调用start方法时执行。此闭包有两个接收值:
- observer用来发送值。
- lifetime为我们提供了一个机会,如果停止观察,我们可以取消正在进行的工作。
第二步,创建观察者
//Creating an observer
let signalObserver = Signal<Int, NoError>.Observer (
value: { value in
print("Time elapsed = \(value)")
}, completed: {
print("completed")
}, interrupted: {
print("interrupted")
})
第三步,启动观察,执行block
//Start a SignalProducer
let disposable = signalProducer.start(signalObserver)
第四步,假设我们想在10秒后中断SignalProducer:
//Dispose after 10 seconds
DispatchQueue.main.asyncAfter(deadline: .now() + 10.0) {
disposable.dispose()
}
但是,根据我们当前的实现,即使观察者在10秒后被释放,SignalProducer仍然会在50秒内继续发出整数值,此时lifetime就可以利用起来了。
let signalProducer: SignalProducer<Int, NoError> =
SignalProducer { (observer, lifetime) in
for i in 0..<10 {
DispatchQueue.main.asyncAfter(deadline: .now() + 5.0 * Double(i))
{
guard !lifetime.hasEnded else {
observer.sendInterrupted()
return
}
observer.send(value: i)
if i == 9 {
observer.sendCompleted()
}
}
}
}
通过判断lifetime的hasEnded属性值,如果为true,就发送一个中断interruption事件,SignalProducer就会终止。
Signal vs SignalProducer
为了理解Signal和SignalProducer之间的区别,可以以直播跟录播为例来类比。
Signal就像直播,它是一个连续的视频和音频流。在给定的时间点,每个观察者都看到相同的帧序列。观察者不能影响整个播放过程。
而SignalProducer就像录播。不同的观察者,在给定的时间点,可以看到不相同的帧序列。
因此,Signal通常用于表示已经“在进行中”的事件流,例如通知、用户输入等。
SignalProducer用于表示需要启动的操作或任务。例如网络请求。
SignalProducer是ReactiveSwift中冷信号的实现,冷信号需要一个唤醒操作,然后才能发送事件,而这个唤醒操作就是订阅它,因为订阅后才发送事件。
热信号Signal相当于现场直播,晚来了前面的节目就没法看,冷信号相当于录播,早晚都能看。
SignalProducer是如何保存Observer对象的呢?
跟Signal 一样其内部有个SignalProducerCore,我们主要来分析SignalProducerCore源码:
//SignalProducerCore
internal class SignalProducerCore {
struct Instance {
let signal: Signal
let observerDidSetup: () -> Void
let interruptHandle: Disposable
}
func makeInstance() -> Instance {
fatalError()
}
func start(_ generator: (_ upstreamInterruptHandle: Disposable) -> Signal.Observer) -> Disposable {
fatalError()
}
......
}
SignalProducerCore 内部有个Instance,它的作用是:
- 持有一个热信号Signal,用于保存订阅者添加的Observer对象
- 持有一个() -> Void闭包,用于执行回调(对子类SignalCore来说 这个闭包的作用则是向上面的Signal.core.state.Observes数组发送Event)
还有两个抽象方法,供其子类去具体实现。SignalProducerCore有三个子类SignalCore
,GeneratorCore
和TransformerCore
,其中SignalCore
和GeneratorCore
用于普通操作,而TransformerCore
则是在map, take, filterMap...等操作才会用上。
来看看SignalCore的源码:
//SignalCore
private final class SignalCore: SignalProducerCore {
private let _make: () -> Instance
//这个action会由SignalProducer传入
init(_ action: @escaping () -> Instance) {
self._make = action //初始化就是给闭包make赋值
}
//外部执行SignalProducer的start函数内部实现
override func start(_ generator: (Disposable) -> Signal.Observer) -> Disposable {
let instance = makeInstance()// 1. 创建一个热信号signal
instance.signal.observe(generator(instance.interruptHandle)) 2. 通过参数generator创建一个观察者并订阅上面创建的signal
instance.observerDidSetup()3. 订阅signal完成,执行回调
return instance.interruptHandle
}
override func makeInstance() -> Instance {
return _make()
}
}
可以看出通过start函数,内部创建了一个Signal,并且创建了一个观察者订阅了Signal。
Observer中封装的逻辑是如何被执行的:
//SignalProducer.swift
public init(_ startHandler: @escaping (Signal.Observer, Lifetime) -> Void) {
self.init(SignalCore { //action闭包
let disposable = CompositeDisposable()
let (signal, innerObserver) = Signal.pipe(disposable: disposable)
let observerDidSetup = {
startHandler(innerObserver, Lifetime(disposable))
}
let interruptHandle = AnyDisposable(observer.sendInterrupted)
return SignalProducerCore .Instance(signal: signal,
observerDidSetup: observerDidSetup,
interruptHandle: interruptHandle)
})
}
可以看出,创建Producer时,内部初始化一个SignalProducerCore,创建的signal、observerDidSetup、interruptHandle也给了SignalProducerCore的instance。
observerDidSetup内部封装了startHandler的执行,创建的innerObserver传入到startHandler。
而在上面SignalCore的start函数内部实现中,instance调用了observerDidSetup,也就执行了startHandler闭包,所以在startHandler中如果我们用Observer发送值实际上是用传入到startHandler中的innerObserver来发送值。
另外,在外界调用start函数时,会创建一个观察者outObserver并订阅instance的signal。
所以每调用一次start,startHandler就会执行一次。
举个栗子,熟悉下用法
- 创建SignalProducer
let producer = SignalProducer<Int, NoError> { (innerObserver, lifetime) in
lifetime.observeEnded({
print("free sth")
})
innerObserver.send(value: 1)
innerObserver.send(value: 2)
innerObserver.sendCompleted()
}
- 创建一个观察者封装事件处理逻辑
let outerObserver = Signal<Int, NoError>.Observer(value: { (value) in
print("did received value: \(value)")
})
- 添加观察者到SignalProducer
producer.start(outerObserver)
最终输出:
did received value: 1
did received value: 2
free sth
当然使用SignalProducer.startXXX可以省去第二步,还可以如下写法:
typealias Producer<T> = SignalProducer<T, NoError>
let producer = Producer<Int> { (innerObserver, _) in
innerObserver.send(value: 1)
innerObserver.send(value: 2)
innerObserver.sendCompleted()
}
producer.startWithValues { (value) in
print("did received value: \(value)")
}
producer.startWithFailed(action: )
producer.startWithResult(action: )
producer.startWithXXX...
使用SignalProducer发起网络请求的坑
第一步,网络请求方法:
func fetchData(completionHandler: (Int, Error?) -> ()) {
print("发起网络请求")
completionHandler(1, nil)
}
第二步,将网络请求封装到SignalProducer中
let producer = Producer {[unowned self] (innerObserver, _) in
self.fetchData(completionHandler: { (data, error) in
innerObserver.send(value: data)
innerObserver.sendCompleted()
})
}
第三步,触发网络请求
producer.startWithValues { (value) in
print("did received value: (value)")
}
producer.startWithValues { (value) in
print("did received value: (value)")
}
咻...请看最终输出:
发起网络请求
did received value: 1
发起网络请求
did received value: 1
看到了吗,发生两次网络请求,我们当然只是想发送一次请求,在订阅时多次操作请求到的数据而已,而SignalProducer会在每次被订阅就会执行一次初始化时保存的闭包,所以会发生多次网络请求。
为了解决这个问题,可以使用Signal:
typealias NSignal<T> = Signal<T, NoError>
let signalTuple = NSignal.pipe()
signalTuple.output.observeValues { (value) in
print("did received value: (value)")
}
signalTuple.output.observeValues { (value) in
print("did received value: (value)")
}
self.fetchData { (data, error) in
signalTuple.input.send(value: data)
signalTuple.input.sendCompleted()
}
输出: 发起网络请求
did received value: 1
did received value: 1
最后,自定义Error时需要注意:
struct APIError: Swift.Error {
let code: Int
var reason = ""
}
由于默认的SignalProducer是没有startWithValues函数的,ReactiveSwift会在Extension里给它加上startWithValues函数,但是这只对NoError有效,所以在自定义Error时,要加上以下代码才有效:
extension SignalProducer where Error == APIError {
@discardableResult
func startWithValues(_ action: @escaping (Value) -> Void) -> Disposable {
return start(Signal.Observer(value: action))
}
}