前面的内容中已经学习了如何创建, 过滤, 变换 observable 的事件序列, 尤其是需要留意 flatMap 的强大处理能力.
本章则是看里面的一些组合操作符, 它们用于将异步序列中的数据进行组合.
1 概述
本章没有新的工程学习, 打开一个空项目来实现即可.
RxSwift 的主旋律就是处理和管理异步序列. 不过经常都会在这些序列的洪流中迷失方向. 故需要掌握组合操作符来化解这样的困境.
2 拼接
在现实中的一个典型需求是保证一个 observer 开始观察的时候都能收到一个初始值. 而经常都需要 "当前状态" 这样的初始值, 而初始值之前又想要增加一些前驱状态, 比如下面的序列:
原始序列:
2---3---4---comp
希望得到的序列:
1---2---3---4---comp
这样的操作可以使用如下的代码实现, 即利用 startWith
操作符:
exampleOf("startWith") {
let numbers = Observable.of(2, 3, 4)
let obv = numbers.startWith(1)
obv.subscribe(onNext: { (elem) in
print(elem)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("complete")
}, onDisposed: {
print("disposed")
}).addDisposableTo(disBag)
}
startWith
操作符的作用是在 Observable 事件序列最前面添加指定的前驱事件, 当然添加的事件数据都必须是和之前序列中事件元素类型一致.
但是不要被这个操作符所处的位置迷惑了! 虽然它是在处理链的下一个节点, 但是将元素添加到的是之前序列的最前面!
startWith
操作符的用途也十分广泛. 比如它可以保证任何序列都可以人为添加初始化时候的元素.
实际上 startWith
操作符是 concat
操作符家族中的一员.
Observable 的类方法 concat(_:)
可以将两个 observable 序列组合为一个:
exampleOf("concat") {
let nums1 = Observable.of(1, 2, 3)
let nums2 = Observable.of(4, 5, 6)
let concatedSeq = Observable.concat([nums1, nums2])
concatedSeq.subscribe(onNext: { (num) in
print(num)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
}).addDisposableTo(disBag)
}
这个类方法的参数是一个有序序列(比如数组, 如上所示). 如果传入的参数中的任何一个点上遇到 error, 则会直接将 error 及其之前的输出出来.
另外还有一个对象方法 concat
用于两个序列之间的组合:
exampleOf("concat(obj method)") {
let nums1 = Observable.of(1, 2, 3)
let nums2 = Observable.of(4, 5, 6)
let obv = nums1.concat(nums2)
obv.subscribe(onNext: { (num) in
print(num)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
}).addDisposableTo(disBag)
}
这个方法的输出实际上也和之前的类方法输出一致, 只是这个是链接两个 observable. 且 concat
对象方法会等待作为主调对象的序列 complete, 然后去观察作为参数的 observable. 最后的输出就是两个序列的组合.
而上面的 startWith
操作符可以替换为 concat
来实现同样的功能:
exampleOf("replace startWith with concat") {
let nums = Observable.of(2, 3, 4)
Observable
.just(1)
.concat(nums)
.subscribe(onNext: { (num) in
print(num)
}).addDisposableTo(disBag)
}
实际效果和之前的 startWith 一致. 需要注意的是, 这个方法的隐含条件是需要源序列产生了 complete 后才会拼接.
当然, 需要注意的是拼接的序列中元素类型必须一致, 如果不一致则会编译错误. 不过学过了变换操作符, 可以按需进行变换后再拼接.
3 合并
RxSwift 中提供了多种合并序列的方式. 最简单的就是 merge. 比如下面的两个序列, 如果要按它们的元素的发射时间的不同来合并:
序列1: 1------2------3--comp
序列2: --4--5----6------comp
源序列: 序列1---序列2---comp
则合并后的序列为:
1---4---5---2---6---3---comp
这样的结果可以通过 merge
操作符来实现, 不过过程需要注意:
-
首先构造一个源序列, 该序列为两个序列的组合:
let obv1 = PublishSubject<String>() let obv2 = PublishSubject<String>() let source = Observable.of(obv1.asObservable(), obv2.asObservable())
-
在源序列上使用
merge
操作符, 得到目标序列:let target = source.merge()
-
然后开始观察目标序列即可.
target.subscribe(onNext: { (str) in print(str) }).addDisposableTo(disBag)
下面就按照类似的方式来在两个序列上添加next事件, 看输出的情况是否和之前的猜测相同:
obv1.onNext("1")
obv2.onNext("4")
obv2.onNext("5")
obv1.onNext("2")
obv2.onNext("6")
obv1.onNext("3")
打印结果是 "1 4 5 2 6 3" 证明是合并成功了的.
merge
操作符相当于是去观察源序列中的所有子序列, 每当观察到 next , 就会把它放到新的目标序列上. 而子序列的顺序是不重要的, 因为是看子序列中的元素的发射时间来决定最终的元素排列顺序.
下面是 RxSwift 中定义的 merge 后的序列的结束条件:
- merge 等待源序列中的所有子序列都 complete 后才会 complete, 或者是 源序列 complete 时才会 complete.
- 子序列在源序列中的出现顺序没有要求, 因为是看每个子序列的元素发射时间顺序.
- 但假如任何一个子序列发射 error, 则目标序列会发射 error 并立即结束.
由于 merge
方法不会限制子序列的个数, 如果想要进行限制, 则使用 merge(maxConcurrent:)
操作符, 这个方法会在源序列中只选择前面若干个序列来观察. 超过数量的子序列会被放到一个队列中, 当之前的子序列有 complete 的之后, 再开始观察队列中放置的子序列.
在实际编程中经常用到这个做了数量限制的 merge 操作符, 尤其是在资源受限的情况下, 比如同时进行若干个网络请求, 但实际同时发送的请求是有限制的情况下.
4 组合元素
在 RxSwift 中还有一个非常重要的操作符类型: combineLatest
. 它们的作用也是将若干个序列进行合并.
每次子序列发射一个值, 都会调用一次自定义 closure, 这个 closure 里就可以对每个子序列的最新元素进行处理, 从而每次获取到的都是所有子序列的最新的事件数据的组合.
而这类操作符的应用十分广泛, 比如想同时观察若干个 textField 中的用户输入并进行处理.
下面就来看如何使用:
-
先创建若干个子序列, 这里以两个为例:
let obv1 = PublishSubject<String>() let obv2 = PublishSubject<String>()
-
创建一个源序列, 用于包含这两个子序列:
let target = Observable.combineLatest(obv1, obv2, resultSelector: { elem1, elem2 in return "\(elem1) \(elem2)" })
-
然后观察该目标序列:
target.subscribe(onNext: { res in print(res) }).addDisposableTo(disBag)
-
下面就可以来验证输出是否是最新的结果的组合:
// 经过下面两条后, 打印 hello world obv1.onNext("hello") obv2.onNext("world") // 经过下面这条后, 打印 say world obv1.onNext("say")
可以发现输出和预期一致.
而在实际使用的时候, 就可以在自定义块中对数据进行各种想要的组合, 然后再在观察中去对组合数据进行各种不同的操作.
上面例子中需要注意的内容:
-
在组合数据的时候可以组合成任何类型的数据, 子序列的数据类型不是必须一致, 且返回的类型也可以是自定义的, 比如下面的代码:
let target = Observable.combineLatest(obv1, obv2, resultSelector: { elem1, elem2 in return "\(elem1) \(elem2)".characters.count })
则组合后的数据是每个单词的字符个数.
目标序列发射数据的前提条件是每个子序列都发射过一次数据, 即至少每个子序列都拥有当前 "最新数据". 这里需要额外注意, 因为如果某个子序列没有任何数据的话, 则目标序列是不会发射任何数据的, 此时也不会调用自定义的 closure. 而每次满足条件后, 任何一个子序列有新的值, 则都会调用自定义 closure, 并将组合数据发射出去.
正是因为在自定义 closure 中, 可以返回任意类型的组合数据, 故这个方法有很多作用. 实际实现中经常在自定义 closure 中返回一个 tuple. 然后将它传到下一级去进行过滤操作:
let target = Observable
.combineLatest(obv1, obv2, resultSelector: { ($0, $1) })
.filter({ !$0.0.isEmpty && !$0.1.isEmpty})
代码中将两个序列的最新的值在自定义 closure 中组合为一个 tuple 并传给下一级的 filter, 在 filter 中过滤掉有任意一个是空的情况.
上面的代码还可以用尾随闭包的方式书写, 这样就更加好看了:
// 下面将两个不同类型的子序列元素组合成 tuple, 然后传到下一级过滤
let target = Observable
.combineLatest(obv1, obv2) { ( $1) }
.filter { !$0.0.isEmpty && $0> 0 }
下面就来看看实际使用的时候的一个例子, 这个例子组合了用户选择的日期呈现方式和当前的日期数据:
exampleOf("组合用户选择和数据") {
let choice Observable<DateFormatter.Style>.(.short, .long)
let date = Observable.of(Date())
Observable
.combineLatest(choice, date) (format, when) -> String in
let formatter = DateFormatt()
formatter.dateStyle = format
return formatter.stri(from: when)
}.subscribe(onNext: (dateString) in
print(dateString)
})
.addDisposableTo(disBag)
}
通过上面的例子可以看出, 每次用户选择更新, 或者是时间更新的时候, 就会对最新数据进行组合, 而后就可以在下一级进行操作, 比如对组合后获取到的对应日期字符串进行显示或进行处理.(这里只是演示, 实际的选择需要通过用户操作获取, 而日期需要更新.)
最后需要注意的是: combineLatest 只有在所有的子序列都 complete 后, 目标序列才会 complete. 而如果有的已经结束, 而又有的没有结束, 则还是会将结束的子序列的最后一个值传入进行组合.
组合操作符中还有一类 zip
, 它里面也有若干变体.
先来看一个例子.
example(of: "zip") {
enum Weather {
case cloudy
case sunny }
let left: Observable<Weather> = Observable.of(.sunny, .cloudy, .cloudy,
.sunny)
let right = Observable.of("Lisbon", "Copenhagen", "London", "Madrid",
"Vienna")
let observable = Observable.zip(left, right) { weather, city in
return "It's \(weather) in \(city)"
}
observable.subscribe(onNext: { value in
print(value)
})
}
输出为:
--- Example of: zip ---
It's sunny in Lisbon
It's cloudy in Copenhagen
It's cloudy in London
It's sunny in Madrid
可以看到上面的 zip(_:_:resultSelector:)
作用是:
- 观察每一个子序列
- 等待每一个发射新值
- 遇到新值后调用自定义的 closure
只是上面的 5 个城市对应 4 个天气, 输出只有四个. 因为只要一个子序列发送了 complete, 则目标序列就结束了.
5 触发器
在实际 app 中, 通常都会有若干个输入. 在这样的场景中, 通常都会同时从多个输入源(即 observable)上观察到输入, 而这些输入可以分为两类, 一类用于触发代码内的操作, 一类用于提供数据. 在 RxSwift 中提供了处理这些输入的操作符.
首先来看 withLatestFrom(_:)
操作符, 这个操作符在处理 UI 操作的时候十分有用.
下面来看看例子.
如下代码演示如何使用 withLatestFrom(_:)
操作符:
example(of: "withLatestFrom") {
// 1
let button = PublishSubject<Void>()
let textField = PublishSubject<String>()
// 2
let observable = button.withLatestFrom(textField)
let disposable = observable.subscribe(onNext: { value in
print(value)
})
// 3
textField.onNext("Par")
textField.onNext("Pari")
textField.onNext("Paris")
button.onNext()
button.onNext()
}
上面的代码就在演示一个按钮和一个输入框, 末尾按钮点击两次是有意为之.
输入如下:
Paris
Paris
上面的代码的工作流程是:
- 创建两个 Subject 来模拟按钮和输入框.
- 当按钮发射一个值, 忽略按钮的值, 而去发射输入框的最新的值.
- 通过模拟相邻的两次按钮点击来模拟两次输入框输入.
可以看出, 使用这个操作符的目的是在发生某个事件的时, 引起发射另外的某个特定值.
和 withLatestFrom(_:)
操作符类似的还有一个 sample
操作符.
要理解它的作用, 首先来看代码:
exampleOf("sample") {
let btn = PublishSubject<Bool>()
let textFieldText PublishSubject<String>()
// 这个操作符的作用是: 忽略按钮发射的值,次按钮发射值以后, 就去获取输入框的最新的值
let target = textFieldText.samp(btn)
target.subscribe(onNext: { (resultin
print(result)
}, onError: nil, onCompleted: nilonDisposed: nil)
.addDisposableTo(disBag)
textFieldText.onNext("pa")
textFieldText.onNext("par")
textFieldText.onNext("pari")
btn.onNext(true)
btn.onNext(true)
}
sample 的使用方式需要注意看. 上面代码的输出只有一个 "pari". 因为尽管模拟点击了两次按钮, 但由于点击两次中间 textField 的 text 没有改变, 故只会发射一次, 另外一次点击引起的 sample 会被忽略.
这些等待触发操作符在 UI 编程的时候十分有用.
不过有时触发器的触发条件是一系列的 observable, 或者你想等有了两个 observable 后只取其中一个? 这个是什么意思? 需要继续往下看才知道!
6 切换
在 RxSwift 中有两个广义的 "开关" 操作符: amb
和 switchLatest
. 它们都允许通过在源序列和组合序列之间切换来生成新的序列. 这样的话, 就允许你在运行时决定哪个序列上的事件能够被观察到.
下面的代码演示 amb 的作用:
example(of: "amb") {
let left = PublishSubject<String>()
let right = PublishSubject<String>()
// 1
let observable = left.amb(right)
let disposable = observable.subscribe(onNext: { value in
print(value)
})
// 2
left.onNext("Lisbon")
right.onNext("Copenhagen")
left.onNext("London")
left.onNext("Madrid")
right.onNext("Vienna")
disposable.dispose()
}
上面的代码的输出只有 left 的事件数据, 为什么呢? 下面再说, 先来看上面代码的功能:
- 创建一个 observable 来解决左和右的不匹配情况
- 两个序列都开始发射数据
amb 操作符观察 left 和 right 序列, 它会等待其中任何一个开始发射数据, 谁先发射数据, 就放弃观察另外一个. 所以结果序列中始终都是首先观察到发射数据的那个序列所发射的数据.
同时, 结果和方法调用时谁作为参数是无关的!
amb 操作符的价值经常会被忽略, 尤其是在一些特殊的使用场景中. 比如同时尝试连接多个服务器, 并且选择其中最先响应的一个.
另外一个使用更加普遍的操作符是 switchLatest 操作符:
要想明白它的功能, 首先也来看代码:
exampleOf("switch Latest") {
let pub1 = PublishSubject<String>()
let pub2 = PublishSubject<String>()
let pub3 = PublishSubject<String>()
let source PublishSubject<Observable<String>>()
let target = source.switchLatest()
target.subscribe(onNext: { (elem) in
print(elem)
}).addDisposableTo(disBag
source.onNext(pub1)
pub1.onNext("1-1")
pub2.onNext("1-2")
pub3.onNext("1-3"
source.onNext(pub2)
pub1.onNext("2-1")
pub2.onNext("2-2")
pub3.onNext("2-3"
source.onNext(pub3)
pub1.onNext("3-1")
pub2.onNext("3-2")
pub3.onNext("3-3")
}
而输出是由 source 中当前输出来决定的.
7 组合同一个序列中的元素
首先来看 reduce(_:accumulator:)
操作符, 它和 swift 中的 reduce
操作符类似.
还是先来看一个例子:
exampleOf("reduce") {
let source = Observable.of(1, 3, 5, 7, 9)
// let obv = source.reduce(0, accumulator: +)
let obv = source.reduce(0, accumulator: { (base, otherNum) -> Int in
return base + otherNum
})
obv.subscribe(onNext: { (num) in
print(num)
}, onError: { (err) in
print(err)
}, onCompleted: {
print("complete")
}, onDisposed: {
print("disposed")
}).addDisposableTo(disBag)
}
上面代码中:
- 首先创建一个 observable.
- 在创建的 observable 上使用 reduce 操作符, 其中第一个参数是累积器的初始值(注意是累积, 不一定是累加), 第二个 closure 用于指定之后的值应该如何计算.
- 当源 observable 每发射一个值, 就会被送入累积器, 当源发射 complete 时, reduce 操作符也会将结果发射出来.然后再发送一个 complete.
- 最终的结果是将序列中所有 next 值组合为一个值.
(在上面代码中可以使用特殊语法允许直接使用一个 "+" 号, 这个是什么语法? 需要去找找... 用这个语法太方便了!)
需要注意, reduce 只有在源 observable 发射 complete 的时候才会发射最终结果. 如果将它用在永远不会发射 complete 的序列上, 则什么也不会发射.
和 reduce(_:accumulator:)
操作符类似的有一个 scan(_:accumulator:)
操作符, 这个操作符的作用是对序列中的每一个 next 事件数据进行自定义处理.
来看如下代码:
exampleOf("scan") {
let source = Observable.of(1, 3, 5, 7, 9)
// 也可以使用语法: let obv = source.scan(0, accumulator: +)
// 只是下面这样完整的语法更方便理解.
let obv = source.scan(0, accumulator: { (base, nextValue) -> Int in
return base + nextValue
})
obv.subscribe(onNext: { (num) in
print(num)
}, onError: { (err) in
print(err)
}, onCompleted: {
print("complete")
}, onDisposed: {
print("disposed")
}).addDisposableTo(disBag)
}
上面代码中只是把 reduce 换成了 scan:
- 首先创建一个 observable.
- 在创建的 observable 上使用 scan 操作符, 其中第一个参数是累积器的初始值(注意是累积, 不一定是累加), 第二个 closure 用于指定之后的值应该如何计算.
- 当源 observable 每发射一个值, 就会被送入累积器, 这里和 reduce 的区别是, reduce 只有当源发射 complete 的时候才会发射最终结果, 而 scan 是源每发射一个 next, 它就会把该 next 数据送去累积, 并在新的 observable 序列上发射出去. 当源 observable 发射 complete 时, 它同样发射一个 complete.
- 最终的结果是一个新的 observable, 序列中的每个值就是每次累积后的值.
相比而言, scan 的应用范围十分广泛, 可以用它来实时计算总和, 统计结果, 计算状态等. scan 中特别适合进行状态转变操作, 并且不用再去定义一个外部变量记录状态. 这样的用法在第 20 章有很多.
九章完. 十章略先. 讲的是实操.