A simple introduction RxSwift essay but absolutely in detail, and practical
🇨🇳中文介绍
Requirements
- iOS 8.0+
- Xcode 8+
- Swift 3.0
Observable
Observable<Element>
是一个观察者模式中被观察的对象,相当于一个事件序列,会向订阅者发送新产生的事件信息。
Producer
Sequence
Sequence
把一系列元素转换为事件序列
let sequenceOfElements = Observable.of(1,2,3,4)
_ = sequenceOfElements.subscribe{
event in
print(event)
}
运行结果如下:
AnonymousObservable
AnonymousObservable
继承自Producer,Producer实现了线程调度功能,可以安排线程来执行run方法,因此AnonymousObservable是可以运行在指定线程中Observable。
let generated = Observable.generate(
initialState: 0, condition: {$0<20}, iterate: {$0+4}
)
_ = generated.subscribe{
print($0)
}
运行结果如下:
Error
Error
,顾名思义,是做错误处理的,创建一个不发送任何 item 的 Observable。
let error = NSError(domain:"Test",code:-1,userInfo:nil)
let erroredSequence = Observable<Any>.error(error)
_ = erroredSequence.subscribe{
print($0)
}
运行结果如下:
Deferred
deferred
会等到有订阅者的时候再通过工厂方法创建Observable
对象,每个订阅者订阅的对象都是内容相同而完全独立的序列。
let deferredSequence: Observable<Int> = Observable.deferred {
print("creating")
return Observable.create { observer in
print("emmiting")
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
return Disposables.create()
}
}
_ = deferredSequence
.subscribe { event in
print(event)
}
_ = deferredSequence
.subscribe { event in
print(event)
}
运行结果如下:
Empty
empty
创建一个空的序列。它仅仅发送.Completed
消息
let emptySequence = Observable<Int>.empty()
_ = emptySequence.subscribe{
event in
print(event)
}
运行结果如下:
Never
never
创建一个序列,该序列永远不会发送消息,.Complete
消息也不会发送
let neverSequence = Observable<Int>.never()
_ = neverSequence.subscribe{
_ in
print("这句话永远都不会被打印,那干嘛要写呢?")
}
什么都不会打印出来。
Just
just
代表只包含一个元素的序列。它将向订阅者发送两个消息,第一个消息是其中元素的值,另一个是.Completed
。
let singleElementSequence = Observable.just("iOS")
_ = singleElementSequence.subscribe{
print($0)
}
运行结果如下:
PublishSubject
PublishSubject
会发送订阅者从订阅之后的事件序列。
let subject = PublishSubject<Int>()
_ = subject.subscribe{
print($0)
}
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
运行结果如下:
ReplaySubject
ReplaySubject
在新的订阅对象订阅的时候会补发所有已经发送过的数据队列,bufferSize
是缓存区的大小,决定了补发队列的最大值。如果bufferSize
是1,那么新的订阅着出现的时候就会补发上一个事件,如果是2,则补两个,以此类推。
let subject1 = ReplaySubject<Int>.create(bufferSize: 2)
_ = subject1.subscribe{ event in
print("1->\(event)")
}
subject1.onNext(1)
subject1.onNext(2)
_ = subject1.subscribe{ event in
print("2->\(event)")
}
subject1.onNext(3)
subject1.onNext(4)
运行结果如下:
BehaviorSubject
BehaviorSubject
在新的订阅对象订阅的时候会发送最近发送的事件,如果没有则发送一个默认值。
let subject = BehaviorSubject(value: "z")
_ = subject.subscribe{
event in
print("1 -> \(event)")
}
subject.onNext("a")
subject.onNext("b")
_ = subject.subscribe{
event in
print("2 -> \(event)")
}
subject.onNext("c")
subject.onCompleted()
运行结果如下:
Variable
variable
是基于BehaviorSubject
的一层封装,它的优势是:不会被显式终结。即:不会收到.Complete
和.Error
这类的终结事件,它会主动在析构的时候发送.Complete
。
let variable = Variable("z")
_ = variable.asObservable().subscribe{
event in
print("1 -> \(event)")
}
variable.value = "a"
variable.value = "b"
_ = variable.asObservable().subscribe{
event in
print("2 -> \(event)")
}
variable.value = "c"
运行结果如下:
Transforming Observables
Transforming Observables
对序列做一些转换。
Map
map
就是对每个元素都用函数做一次转换,挨个映射一遍。
let originalSequence = Observable.of(1,2,3)
_ = originalSequence.map{
number in
number * 2
}.subscribe{
print($0)
}
运行结果如下:
FlatMap
flatMap
将每个Observable
发射的数据变换为Observable
的集合,然后将其降维排列成一个Observable
let sequenceInt = Observable.of(1,2,3)
let sequenceString = Observable.of("A","B","C","D","E","F","iOS")
_ = sequenceInt.flatMap{
(event:Int) -> Observable<String> in
print("From sequentInt \(event)")
return sequenceString
}.subscribe{
print($0)
}
运行结果如下:
Scan
scan
对Observable
发射的每一项数据应用一个函数,然后按顺序依次发射一个值。
let sequenceToSum = Observable.of(0,1,2,3,4,5)
_ = sequenceToSum.scan(0){
acum ,elem in
acum + elem
}.subscribe{
print($0)
}
运行结果如下:
Filtering Observables
Filetering Observables
对序列进行过滤
Filter
filter
只会让符合条件的元素通过
let subscription = Observable.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
.filter{
$0 % 2 == 0
}.subscribe{
print($0)
}
运行结果如下:
DistinctUntilChanged
distinctUntilChanged
会废弃掉重复的事件
_ = Observable.of(1,1,1,1,1,1,1,1,1,2,2,3,3,4,4,5,5,5,6,6,6,6,6)
.distinctUntilChanged()
.subscribe{
print($0)
}
运行结果如下:
Take
take
只获取序列中的前n个事件,在满足数量之后会自动.Completed
。
_ = Observable.of(1,5,6,7,2,4,6,8,41,2)
.take(4)
.subscribe{
print($0)
}
运行结果如下:
Combination operators
Combination operators
是关于序列的运算,可以将多个序列源进行组合,拼装成一个新的事件序列。
StartWith
startWith
会在队列开始之前插入一个事件元素
_ = Observable.of(1,2,3)
.startWith(0)
.subscribe{
print($0)
}
运行结果如下:
CombineLatest
CombineLatest
如果存在两件事件队列,需要同时监听,那么每当有新的事件发生的时候,combineLatest
会将每个队列的最新一个元素进行合并。
let observer1 = PublishSubject<String>()
let observer2 = PublishSubject<Int>()
_ = Observable.combineLatest(observer1, observer2) {
"\($0)\($1)"
}.subscribe{
print($0)
}
observer1.onNext("iOS")
observer2.onNext(6)
observer1.onNext("Swift")
observer2.onNext(66)
observer1.onNext("Rx")
observer2.onNext(666)
运行结果如下:
为了能够产生结果,两个序列中都必须保证至少有一个元素
let observer1 = Observable.just(2)
let observer2 = Observable.of(0,1,2,3,4)
_ = Observable.combineLatest(observer1, observer2){
$0 * $1
}.subscribe{
print($0)
}
运行结果如下:
CombineLatest
也有超过两个参数的版本
let observer1 = Observable.just(2)
let observer2 = Observable.of(0,1,2,3)
let observer3 = Observable.of(0,1,2,3,4)
_ = Observable.combineLatest(observer1, observer2, observer3){
($0 + $1) * $2
}.subscribe{
print($0)
}
运行结果如下:
CombineLatest
可以作用于不同的数据类型
let intObserver = Observable.just(2)
let stringObserver = Observable.just("ios")
_ = Observable.combineLatest(intObserver, stringObserver){
"\($0)" + $1
}.subscribe{
print($0)
}
运行结果如下:
Zip
zip
合并两条队列,不过它会等到两个队列的元素一一对应地凑齐之后再合并
let stringObserver = PublishSubject<String>()
let intObserver = PublishSubject<Int>()
_ = Observable.zip(stringObserver, intObserver){
"\($0) \($1)"
}.subscribe{
print($0)
}
stringObserver.onNext("iOS")
intObserver.onNext(6)
stringObserver.onNext("swift")
intObserver.onNext(66)
stringObserver.onNext("Rx")
intObserver.onNext(666)
stringObserver.onNext("不会打印")
运行结果如下:
Merge
merge
合并多个Observables
的组合成一个
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
_ = Observable.of(subject1,subject2)
.merge()
.subscribe{
event in
print(event)
}
subject1.onNext(20)
subject1.onNext(40)
subject1.onNext(60)
subject2.onNext(1)
subject1.onNext(80)
subject1.onNext(100)
subject2.onNext(2)
运行结果如下:
假如只开一条线程
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
_ = Observable.of(subject1,subject2)
.merge(maxConcurrent: 1)
.subscribe{
event in
print(event)
}
subject1.onNext(20)
subject1.onNext(40)
subject1.onNext(60)
subject2.onNext(1)
subject1.onNext(80)
subject1.onNext(100)
subject2.onNext(2)
运行结果如下:
SwitchLatest
switchLatest
将一个发射多个Observables
的Observable
转换成另一个单独的Observable
,后者发射那些Observables
最近发射的数据项
let var1 = Variable(0)
let var2 = Variable(200)
let var3 = Variable(var1.asObservable())
_ = var3
.asObservable()
.switchLatest()
.subscribe{
print($0)
}
var1.value = 1
var1.value = 2
var1.value = 3
var1.value = 4
var3.value = var2.asObservable()
var2.value = 201
var1.value = 5
var1.value = 6
var1.value = 7
运行结果如下:
Error Handing Operators
Error Handing Operators
对从Observable
发射的error
通知做出响应或者从错误中恢复,简称错误处理
CatchError
catchError
收到error
通知之后,转而发送一个没有错误的序列
let sequenceThatFails = PublishSubject<Int>()
let recoverySequence = Observable.of(100,200,300,400)
_ = sequenceThatFails
.catchError{
error in
return recoverySequence
}.subscribe{
print($0)
}
sequenceThatFails.onNext(1)
sequenceThatFails.onNext(2)
sequenceThatFails.onNext(3)
sequenceThatFails.onNext(4)
sequenceThatFails.onError(NSError(domain: "Test", code: 0, userInfo: nil))
运行结果如下:
另一种用法
let sequenceThatFails = PublishSubject<Int>()
_ = sequenceThatFails
.catchErrorJustReturn(100)
.subscribe{
print($0)
}
sequenceThatFails.onNext(1)
sequenceThatFails.onNext(2)
sequenceThatFails.onNext(3)
sequenceThatFails.onNext(4)
sequenceThatFails.onError(NSError(domain: "Test", code: 0, userInfo: nil))
运行结果如下:
Retry
retry
,如果原始的Observable
遇到错误,重新订阅。
var count = 1
let funnyLookingSequence = Observable<Int>.create{
observer in
let error = NSError(domain: "Test", code: 0, userInfo: nil)
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
if count < 2 {
observer.onError(error)
count += 1
}
observer.onNext(3)
observer.onNext(4)
observer.onNext(5)
observer.onCompleted()
return Disposables.create()
}
_ = funnyLookingSequence
.retry()
.subscribe{
print($0)
}
运行结果如下:
Observable Utility Operators
Observable Utility Operators
辅助工具
Subscribe
subscribe
前面已经接触到了,有新的事件就会触发
let sequenceOfInts = PublishSubject<Int>()
_ = sequenceOfInts
.subscribe{
print($0)
}
sequenceOfInts.onNext(1)
sequenceOfInts.onCompleted()
运行结果如下:
SubscribeNext
subscribeNext
let sequenceOfInts = PublishSubject<Int>()
_ = sequenceOfInts
.subscribe(onNext:{
print($0)
})
sequenceOfInts.onNext(1)
sequenceOfInts.onCompleted()
运行结果如下:
SubscribeCompleted
subscribeCompleted
let sequenceOfInts = PublishSubject<Int>()
_ = sequenceOfInts
.subscribe(onCompleted:{
print("已经完成了")
})
sequenceOfInts.onNext(1)
sequenceOfInts.onCompleted()
运行结果如下:
SubscribeError
subscribeError
let sequenceOfInts = PublishSubject<Int>()
_ = sequenceOfInts
.subscribe( onError:{
error in
print(error)
})
sequenceOfInts.onNext(1)
sequenceOfInts.onError(NSError(domain: "Test", code: -1, userInfo: nil))
运行结果如下:
DoOn
doOn
注册一个操作来监听事件的生命周期
let sequenceOfInts = PublishSubject<Int>()
_ = sequenceOfInts.do(onNext:
{
print("监听 event \($0)")
}, onCompleted:
{
print("监听 event \($0)")
}).subscribe{
print($0)
}
sequenceOfInts.onNext(1)
sequenceOfInts.onCompleted()
运行结果如下:
Conditional and Boolean Operators
Conditional and Boolean Operators
条件和布尔操作,可用操作符根据条件发射或变换Observables
,或者对他们做布尔运算
TakeUntil
takeUntil
当第二个Observable
发送数据之后,丢弃第一个Observable
在这之后的所有信息
let originalSequence = PublishSubject<Int>()
let whenThisSendsNextWordStops = PublishSubject<Int>()
_ = originalSequence.takeUntil(whenThisSendsNextWordStops).subscribe{
print($0)
}
originalSequence.onNext(1)
originalSequence.onNext(2)
originalSequence.onNext(3)
originalSequence.onNext(4)
originalSequence.onNext(5)
originalSequence.onNext(6)
whenThisSendsNextWordStops.onNext(1)
originalSequence.onNext(8)
运行结果如下:
TakeWhile
takeWhile
发送原始Observable
的数据,直到一个特定的条件false
let sequence = PublishSubject<Int>()
_ = sequence
.takeWhile{
event in
event < 4
}.subscribe{
print($0)
}
sequence.onNext(1)
sequence.onNext(2)
sequence.onNext(3)
sequence.onNext(4)
sequence.onNext(5)
运行结果如下:
Mathematical and Aggregate Operators
Mathematical and Aggregate Operators
算数和聚合
concat
合并两个或者以上的Observable
的消息,并且这些消息的发送时间不会交叉。
let var1 = BehaviorSubject(value: 0)
let var2 = BehaviorSubject(value: 200)
let var3 = BehaviorSubject(value: var1)
_ = var3
.concat()
.subscribe{
print($0)
}
var1.onNext(1)
var1.onNext(2)
var1.onNext(3)
var1.onNext(4)
var3.onNext(var2)
var2.onNext(201)
var1.onNext(5)
var1.onNext(6)
var1.onNext(7)
var1.onCompleted()
var2.onNext(202)
var2.onNext(203)
var2.onNext(204)
运行结果如下:
Reduce
reduce
按顺序对Observable
发射的每项数据应用一个函数并发射最终的值。
_ = Observable.of(0,1,2,3,4,5,6,7,8,9)
.reduce(0, accumulator: +)
.subscribe{
print($0)
}
运行结果如下:
Connectable Observable Operators
Connectable Observable Operators
连接操作
Delay
delay
延迟操作
let intObserver = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
_ = intObserver
.subscribe{
print("第一次走 \($0)")
}
DispatchQueue.main.asyncAfter(deadline: .now() + 5.0){
_ = intObserver.subscribe{
print("延迟5s走的 \($0)")
}
}
运行结果如下:
总结
欢迎各位同行入坑啊。。。哈哈哈哈。。。。