前言
函数式响应式编程框架我们应该也用得比较多了,如ReactiveCocoa
、ReactiveX
系列(RxSwift、RxKotlin、RxJava)
,这些框架内部实现都是基于函数式编程的思想来构建的。还记得前不久面试的时候面试官有问道:“有阅读过ReactiveCocoa的源码吗?有没有看过其中的核心函数bind?你知道这个函数如何实现的吗?
”。在回答这个问题时,如果面试者只是单纯的看过RAC源码,虽能凭自己的印象说出这个方法的大概流程,不过对其中的思想可能也只是一知半解,但如果你充分了解过函数式编程,熟悉Monad
概念,就能知道bind
方法其实就是Monad
概念中的一部分,RAC正是利用Monad
来实现它的Signal
。此时你就可以向面试官开始你的表演了。
如标题所述,在这篇文章中我们将利用函数式编程的思想,去构建一个小型的响应式框架。它具有响应回调的能力,且能将一个个事件数据抽象成管道中流动的流体,我们可以对这些事件数据进行若干的转换,最后再订阅它们。
本文为《函数式编程》系列文章中的第三篇,若大家对函数式编程感兴趣,可以阅读系列的前两篇文章:
原理
函数式响应式的本质是什么
先附上一张流转换思想的概念图:
在日常项目逻辑的构建中,我们总会对一些数据进行转换运算,这里我们将数据的转换过程抽象成一条包裹着流动数据的管道,数据以流的形式在这条管道中流通,当经过转换器时,原始的数据流将会被转换成新的数据流,然后继续流动下去。针对数据的转换运算,我们会使用一些函数/方法,将运算的数据作为实参传入函数中/对运算的对象调用方法,得到转换后的结果。此时整个运算将会同步
运行,转换函数接收旧数据进行转换,成功后返回新的数据。除此之外,你还可以在这个管道中安置多个转换器,数据在通过若干的转换器后便转换成了最终我们所期望的结果值,并从管道中流出。
不过,事实上项目逻辑中也会涉及到许多非同步
进行的操作,如某些较为耗时的操作(数据库操作、网络请求)、基于事件循环(RunLoop)的事件监听处理(屏幕触摸监听、设备传感器监听),这些操作有的会在后台创建新的线程进行处理,当处理完成后将数据馈回到主线程中,有的则是会在整个运行循环中通过对每一次循环周期从事件队列中取得需要处理的事件,派发到相应的Handler中。对于这些操作,它们都具有共同点,那就是:数据返回的过程都是通过回调(Callback)
来实现的。
对于如何将流转换
的思想用于Callback
上,就是函数式响应式所探讨解决的问题。
在前不久我有幸参与了中国2017年Swift大会,会议邀请了RxSwift的作者前来演讲,在演讲中他阐明了RxSwift的本质:
RxSwift just a callback! (RxSwift就是一个回调)
可能这里有人会有疑问:为什么回调不使用一个简单的代理模式或者一个闭包,反而构建起这么复杂且重量级的框架?因为,这些函数式响应式框架要做的事情就是让回调结合流转换的思想,让开发者只专注于数据的转换过程而不必多花精力在回调的设计上,轻松写出简洁优雅的回调过程。
核心思想
流转换的思想为将数据事件抽象成管道中流通的流体,用过转换器转换成新的数据事件
,若加上回调
的实现,我们可以说这条管道是建立在回调上的。这时候,我们就可以理清管道和数据的关系:建立在回调上的管道包裹着数据。换句话说,具有回调能力的管道作为一个Context(上下文)
,包裹着基本的数据值,并且它还拥有某种运算的能力,那就是触发事件、监听回调
,而这种运算不需要我们去花精力放在上面,我们只想专注于数据的转换。
看到上面对函数式响应式的描述,你或许也发现了这跟函数式编程里面一个十分重要的概念高度匹配,那就是Monad(单子)
。是的,函数式响应式的核心其实就是建立在Monad
之上,所以,要实现函数式响应式,我们须构建出一个Monad
,可以把它叫做响应式Monad。
看过ReactiveCocoa
源码的小伙伴可能知道,RACSignal
中具有方法bind
和派生类RACReturnSignal
,它们就是用来实现Monad
中的bind
和return
函数,所以,Signal
就是一个Monad
。不过我们这里需要知道的是,ReactiveCocoa
中的bind
方法并非完全标准的Monad bind
函数,它在参数类型上有所变化,在外表封装多了一层RACSignalBindBlock
,要说最接近Monad bind
的,应该就属RACSignal
中的flattenMap
方法了(RACSignal的flattenMap方法也是基于bind包装)。所以,实现了响应式Monad,你就能免费得到flattenMap
方法。
因为Monad
必定也是一个Functor
,所以当你实现一个响应式Monad后,相应的Functor
中的map
方法你就能很轻易地实现出来了。是的,map
方法并非RACSignal
所特有的,其也是来自于函数式编程中的Functor
。
实现
因为个人热衷于Swift,接下来我将基于Swift语言实现一个简单的函数式响应式框架。
Event
首先我们来实现Event(事件)
,像ReactiveCocoa
、RxSwift
中,事件具有三种类型,分别是:
- next 表示一个数据流元素
- completed 表示数据流已经完成
- error 表示数据流中产生了错误
这个我实现的事件就简单一点,它仅具有next
和error
类型:
enum Event<E> {
case next(E)
case error(Error)
}
Event
中的泛型E
代表其中数据元素的类型。这里需要注意的是,当事件类型为error
时,其关联的错误实例并没有类型限制,这里为了简单演示我没有添加约束错误实例的泛型,大家在后面如果尝试自己去实现的话可以稍作优化,如:
enum Event<E, R> where R: Error {
case next(E)
case error(R)
}
Observer
Observer
要做的事情有两个,分别是发送事件
以及监听事件
。
// MARK: - Protocol - Observer
protocol ObserverType {
associatedtype E
var action: (Event<E>) -> () { get }
init(_ action: @escaping (Event<E>) -> ())
func send(_ event: Event<E>)
}
extension ObserverType {
func send(_ event: Event<E>) {
action(event)
}
func sendNext(_ value: E) {
send(.next(value))
}
func sendError(_ error: Error) {
send(.error(error))
}
}
// MARK: - Class - Observer
final class Observer<Element>: ObserverType {
typealias E = Element
let action: (Event<E>) -> ()
init(_ action: @escaping (Event<E>) -> ()) {
self.action = action
}
}
通过send
方法,Observer
可以发送出事件,而通过实现一个闭包并将其传入到Observer
的构造器中,我们就可以监听到Observer
发出的事件。
Signal
接下来就是重头戏:Signal
(命名是我从ReactiveCocoa
中直接借鉴而来),它就是我们上面所提到的响应式Monad
,整个函数式响应式的核心。
我们先来看看SignalType
协议:
// MARK: - Protocol - Signal
protocol SignalType {
associatedtype E
func subscribe(_ observer: Observer<E>)
}
extension SignalType {
func subscribe(next: ((E) -> ())? = nil,
error: ((Error) -> ())? = nil) {
let observer = Observer<E> { event in
switch event {
case .error(let e):
error?(e)
case .next(let element):
next?(element)
}
}
subscribe(observer)
}
}
协议声明了用于订阅事件的方法subscribe(_:)
,这个方法接收了一个Observer
作为参数,基于此方法我们就可以扩展出专门针对特殊事件类型(next、error)的订阅方法:subscribe(next:error:)
。
接下来就是Signal
的实现:
// MARK: - Class - Signal
final class Signal<Element>: SignalType {
typealias E = Element
private var value: E?
private var observer: Observer<E>?
init(value: E) {
self.value = value
}
init(_ creater: (Observer<E>) -> ()) {
let observer = Observer(action)
creater(observer)
}
func action(_ event: Event<E>) {
observer?.action(event)
}
static func `return`(_ value: E) -> Signal<E> {
return Signal(value: value)
}
func subscribe(_ observer: Observer<E>) {
if let value = value { observer.sendNext(value) }
self.observer = observer
}
static func pipe() -> (Observer<E>, Signal<E>) {
var observer: Observer<E>!
let signal = Signal<E> {
observer = $0
}
return (observer, signal)
}
}
我们可以看到Signal
内部具有一个成员属性observer
,当我们调用subscribe(_:)
方法时就将传入的参数赋予给这个成员。对于另一个成员属性value
,它的作用是为了让Signal
实现Monad return
函数,我在《函数式编程》系列文章的前面已经介绍过,Monad return
函数就是将一个基本的数据包裹在一个Monad
上下文中。所以在Signal
中我定义了类方法return(_:)
,内部调用了针对于value
初始化的Signal
构造器init(value: E)
,将一个基本的数据赋予给了value
成员属性。在subscribe(_:)
方法的实现中,我们首先对value
做非空判断,若此时value
存在,传入的observer
参数将发送关联了value
的next
事件,这样做是为了保证整个Signal
符合Monad
特性。
接着到init(_ creater: (Observer<E>) -> ())
构造方法,这个方法接受一个闭包,闭包里面做的,就是进行某些运算处理逻辑或事件监听,如网络请求、事件监听等。闭包带有一个Observer
类型的参数,当闭包中的运算处理逻辑完成或者接收到事件回调时,就利用这个Observer
发送事件。在这个构造方法实现的内部,我首先将Signal
自己的action(_:)
方法作为参数传入Observer
的构造器从而创建了一个Observer
实例,其中,action(_:)
方法做的事情是:指使成员属性observer
将自己接收到的事件参数转发出去。这里的设计比较巧妙,我们在构造器闭包类型参数creater
中进行处理逻辑或事件监听,若得到结果,将使用闭包中的Observer
参数发送事件,事件将会传递到订阅了这个Signal
的订阅者中,从而触发相关回调。
这里可能有人会有疑惑:为什么需要用两个observer
来传递事件?可以在subscribe(_:)
方法调用的时候再顺便调用creater
闭包,把接收到的订阅者传入即可。其实,我这么做的目的是为了保证creater
的调用跟init(_ creater: (Observer<E>) -> ())
同步进行,因为在Signal
中我提供了pipe
方法。
pipe
方法返回一个二元组,第一项为Observer
,我们可以利用它来发送事件,第二项为Signal
,我们可以通过它来订阅事件,它就像RxSwift
中的Subject
,只不过这里我将事件发送者与订阅者区分开了。这里有一个需要注意的地方:
上面说到,对于我们使用
pipe
函数获取到的Observer
,其内部的action
成员属性来自于Signal
的action(_:)
方法,这个方法引用到了Signal
中的成员属性。由此,我们可以推出此时Observer
对Signal
具有引用的关系,Observer
不释放,Signal
也会一直保留。
接下来就是让Signal
实现Monad
的bind
方法了:
// MARK: - Monad - Signal
extension Signal {
func bind<O>(_ f: @escaping (E) -> Signal<O>) -> Signal<O> {
return Signal<O> { [weak self] observer in
self?.subscribe(next: { element in
f(element).subscribe(observer)
}, error: { error in
observer.sendError(error)
})
}
}
func flatMap<O>(_ f: @escaping (E) -> Signal<O>) -> Signal<O> {
return bind(f)
}
func map<O>(_ f: @escaping (E) -> O) -> Signal<O> {
return bind { element in
return Signal<O>.return(f(element))
}
}
}
bind
方法接受一个函数作为参数,这个函数的类型为(E) -> Signal<O>
,E
泛型为旧Signal
元素中的类型,O
则是新Signal
元素中的类型,这个bind
方法其实跟ReactiveCocoa
的flattenMap
或是RxSwift
中的flatMap
做的事情一样,所以在下面的flatMap
方法的实现中我只是直接地调用bind
方法。很多人俗称这个过程为降维。
在bind
方法的实现中,我们返回一个新的Signal
,为了构造这个Signal
,我们使用初始化方法init(_ creater: (Observer<E>) -> ())
,在creater
闭包中订阅旧的Signal
。倘若旧Signal
的Observer
发出error
事件,则直接把error
事件中关联的Error
实例提取出来,通过creater
闭包中作为参数传入的Observer
包裹起来再传递出去;而若是旧Signal
的Observer
发出next
事件,则先把next
关联的数据元素提取出来,通过调用bind
传进来的函数,获取一个中间层的Signal
,再通过对这个中间层Signal
进行订阅,将事件传递到新的Signal
中。
creater
闭包中我使用了[weak self]
捕获列表来对旧Signal
进行若引用以防止循环引用的发生,为什么这里可能会发生循环引用?上面提到过,Observer
会引用Signal
,而在creater
闭包中旧的Signal
将引用新Signal
的Observer
,从而可以推出旧的Signal
会对新Signal
持引用关系,这里如果不留意的话会造成循环引用。
Monad
中的bind
方法将自动处理上下文。在Signal
中,bind
则帮我们自己处理好事件的订阅、转移、传递,而我们只需要专注于纯数据的转换。
map
方法的实现十分简单,通过在内部调用bind
方法,并将最终数据通过return
包裹进Signal
上下文中,在这里我就不多说了。
以上,我们的响应式Monad
就实现完成了!
以上只是非常简单地实现函数式响应式,目的是为了简单介绍如何利用函数式编程思想去完成响应式的操作,其中并没有考虑有关跨线程调度的问题,大家如果有兴趣的可以自己尝试去进行相关优化。
下面我们来测试使用一下。
简单使用
通过creater闭包构建Signal
let mSignal: Signal<Int> = Signal { observer in
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
observer.sendNext(1)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
observer.sendNext(2)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
observer.sendNext(3)
}
}
mSignal.map { $0 + 1 }.map { $0 * 3 }.map { "The number is \($0)" }.subscribe(next: { numString in
print(numString)
})
输出:
The number is 6
The number is 9
The number is 12
通过pipe构建Signal
let (mObserver, mSignal) = Signal<Int>.pipe()
mSignal.map { $0 * 3 }.map { $0 + 1 }.map { "The value is \($0)" }.subscribe(next: { value in
print(value)
})
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
mObserver.sendNext(3)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
mObserver.sendNext(2)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
mObserver.sendNext(1)
}
输出:
The value is 10
The value is 7
The value is 4
扩展
接下来我们对刚刚实现的函数式响应式进行扩展,关联一些平时我们常用到的类。
UIControl
对UIControl
的触发事件进行监听,传统的做法是通过调用addTarget(_:, action:, for:)
方法,传入target以及一个回调函数Selector。很多人比较厌倦这种方法,觉得每次监听事件都需要定义一个事件处理函数,比较麻烦,希望能直接通过闭包回调事件触发。
这里只需简单地封装一下即可满足这种需求:
final class ControlTarget: NSObject {
private let _callback: (UIControl) -> ()
init(control: UIControl, events: UIControlEvents, callback: @escaping (UIControl) -> ()) {
_callback = callback
super.init()
control.addTarget(self, action: #selector(ControlTarget._handle(control:)), for: events)
}
@objc private func _handle(control: UIControl) {
_callback(control)
}
}
fileprivate var targetsKey: UInt8 = 23
extension UIControl {
func on(events: UIControlEvents, callback: @escaping (UIControl) -> ()) {
var targets = objc_getAssociatedObject(self, &targetsKey) as? [UInt: ControlTarget] ?? [:]
targets[events.rawValue] = ControlTarget(control: self, events: events, callback: callback)
objc_setAssociatedObject(self, &targetsKey, targets, .OBJC_ASSOCIATION_RETAIN)
}
}
在这里我间接利用ControlTarget
对象来将UIControl
事件触发传递到闭包中,并通过关联对象
来使得UIControl
保持对ControlTarget
的引用,以防止其被自动释放。经过上面简单的封装后,我们就能很方面地利用闭包监听UIControl
的事件回调:
button.on(events: .touchUpInside) { button in
print("\(button) - TouchUpInside")
}
button.on(events: .touchUpOutside) { button in
print("\(button) - TouchUpOutside")
}
由此,我们可以简单地基于上面的封装来扩展我们的函数式响应式:
extension UIControl {
func trigger(events: UIControlEvents) -> Signal<UIControl> {
return Signal { [weak self] observer in
self?.on(events: events, callback: { control in
observer.sendNext(control)
})
}
}
var tap: Signal<()> {
return trigger(events: .touchUpInside).map { _ in () }
}
}
trigger(events:)
方法传入一个需要进行监听的事件类型,返回一个Signal
,当对应的事件触发时,Signal
中则会发射出事件。而tap
返回的则是针对TouchUpInside
事件触发的Signal
。
使用起来跟RxSwift
或ReactiveCocoa
一样,十分简洁优雅:
button.tap.map { _ in "Tap~" }.subscribe(next: { message in
print(message)
})
上面整个过程的引用关系为: UIControl -> ControlTarget -> _callback -> Observer -> Signal,由此我们知道,只要保持对
UIControl
的引用,那么其所关联的事件监听Signal
则不会被自动释放,可以在整个RunLoop
中持续工作,
NotificationCenter
将函数式响应式适配控制中心,方法跟上面对UIControl
的扩展一样,通过一个中间层NotificationObserver
来做事件的传递转发:
final class NotificationObserver: NSObject {
private unowned let _center: NotificationCenter
private let _callback: (Notification) -> ()
init(center: NotificationCenter, name: Notification.Name, object: Any?, callback: @escaping (Notification) -> ()) {
_center = center
_callback = callback
super.init()
center.addObserver(self, selector: #selector(NotificationObserver._handle(notification:)), name: name, object: object)
}
@objc private func _handle(notification: Notification) {
_callback(notification)
}
deinit {
_center.removeObserver(self)
}
}
fileprivate var observersKey: UInt = 78
extension NotificationCenter {
func callback(_ name: Notification.Name, object: Any?, callback: @escaping (Notification) -> ()) {
var observers = objc_getAssociatedObject(self, &observersKey) as? [String: NotificationObserver] ?? [:]
observers[name.rawValue] = NotificationObserver(center: self, name: name, object: object, callback: callback)
objc_setAssociatedObject(self, &observersKey, observers, .OBJC_ASSOCIATION_RETAIN)
}
func listen(_ name: Notification.Name, object: Any?) -> Signal<Notification> {
// Warning: 注意object可能对返回的Signal进行引用,从而造成循环引用
return Signal { [weak self] observer in
self?.callback(name, object: object, callback: { notification in
observer.sendNext(notification)
})
}
}
}
由此,我们可以基于上面对NotificationCenter
的响应式扩展,来完成对UITextFiled
文字变化的监听:
extension UITextField {
var listen: Signal<String?> {
return NotificationCenter.default.listen(.UITextFieldTextDidChange, object: self).map { $0.object as? UITextField }.map { $0?.text }
}
}
// 使用
textField.listen.map { "Input: \($0 ?? "")" }.subscribe(next: {
print($0)
})
方法调用监听 / 代理调用监听
我们有时候想监听某个对象中指定方法的调用,来实现面向切面编程或者埋点,另外,当函数式响应式被引入后,我们希望它能充当代理的职责,监听代理方法的调用。为此我们可以通过对函数式响应式进行扩展来支持上面的需求。不过要做这件事情并不简单,这里面要涉及多种Runtime
特性,如方法交换、方法动态派发、isa交换等Runtime黑科技,要实践它可能需要投入较大精力,花费较长时间。因本人能力与时间有限,没有去编写相应的代码,若大家有兴趣可以尝试一下,而后期如果我做了相关的努力,也会公布出来。
为什么没有Disposable
若我们接触过RxSwift
、ReactiveSwift
,我们会发现每次我们订阅完一个Observable
或者Signal
后,会得到订阅方法返回的一个专门用于回收资源的实例,比如RxSwift
中的Disposable
,我们可以通过在某个时机调用它的dispose
方法,或者将其放入一个DisposeBag
中来使得资源在最后得到充分的回收。
再来看回我们在上面实现的响应式框架,因为这个框架的实现非常简单,并不会在订阅后返回一个专门提供给我们释放资源的实例,所以我们在使用它的时候要密切留意资源的存活与释放问题。这里举一个例子:
在上面,我们对函数式响应式进行针对UIControl
的适配时,是通过一个中间层ControlTarget
来完成的,为了保持这个ControlTarget
实例的存活,使得它不会被自动释放,我们先用一个集合来包裹住它,并将这个集合设置为目标UIControl
的关联对象。此时我们可以将这个中间层ControlTarget
看做是这个事件流管道中的一个资源,这个资源的销毁是由目标UIControl
来决定的。
对于RxSwift
来说,它实现对UIControl
的扩展原理跟我们写的差不多,也是通过一个中间层来完成,但是对于中间层资源的保活与销毁,它采用的是另一种方法,我们可以看下这段RxSwift
的源码(为了简单,删掉了一些无关的代码):
class RxTarget {
private var retainSelf: RxTarget?
init() {
self.retainSelf = self
}
func dispose() {
self.retainSelf = nil
}
}
这个类型的保活方式十分巧妙,它利用自己对自己的循环引用来使得维持生存,而当调用dispose
方法时,它将解开对自己的循环引用,从而将自己销毁。
通过上面两个例子的对比,我们可以知道,对于我们自己实现的响应式框架,我们需要把某些精力放在对资源的保活与释放上,而像RxSwift
,它则提供一个统一的资源管理方式,相比起来更加清晰优雅,大家有兴趣可以实现一下这种方式。
相关链接
Github - ReactiveObjc
Github - ReactiveCocoa
Github - RxSwift
本文纯属个人见解,若大家发现文章部分有误,欢迎在评论区提出。