原创 2017-06-13
RxSwift 这个框架RP框架相信你应该不陌生了,在Objective-C中我们使用比较多的是ReactiveCocoa,从网上找到的入门知识比较零散,我现在就将从官方文档学习的笔记作为一个记录,也许对刚开始学习的你会有所帮助。如下就是我通过思维导图绘制的框架大致内容:
从上图可以看出,整个框架无外乎围绕着:
- Observable: 可观察序列
- Observer:观察者
- Subjects:观察和观察序列桥梁
- Disposable:你可以把它看做一个ARC
- Scheduler:任务执行线程
下面我们就围绕着这几大模块介绍,首先介绍一下Observable:
Observable
在ReactiveX中,Observable<Element>代表的是一个可观察序列,从字面意思可以看出这是在观察者模式中的被观察者,它会向观察对象发送事件序列:
- .Next(Element):新事件
- .Error(error):带有异常的事件完成序列
- .Complete():正常事件完结序列
Observable大致需要了解一下知识,我们将围绕到以下内容进行学习:
Observable 创建
- Create:通过编程方式来创建可观察对象,创建后的Observeble是可以接收到.onNext、.onError、.onComplete
---------- 示例代码 ----------
let observable = Observable<String>.create { (observer) -> Disposable in
observer.onNext("hello RxSwift")
observer.onCompleted()
return Disposables.create {
print("disposabled")
}
}
observable.subscribe { (event) in
print("Operator: create \(event)")
}.dispose()
---------- 运行结果 ----------
Operator: create next(hello RxSwift)
Operator: create completed
disposabled
- Defer:延时创建Observable对象,当subscribe的时候才去创建,它为每一个Observer创建一个新的Observable,也就是说每个订阅者订阅的对象都是内容相同但是完全独立的序列;deferr采用一个Factory函数型作为参数,Factory函数返回的是Observable类型。这也是其延时创建Observable的主要实现
---------- 示例代码 ----------
let defObservable = Observable<String>.deferred { () -> Observable<String> in
return Observable.create({ (observer) -> Disposable in
observer.onNext("create")
observer.onCompleted()
return Disposables.create {
print("disposed")
}
})
}
defObservable.subscribe { (event) in
print("one -- \(event)")
}
defObservable.subscribe { (event) in
print("two -- \(event)")
}
---------- 运行结果 ----------
one -- next(create)
one -- completed
disposed
two -- next(create)
two -- completed
disposed
通过上述例子,相信你已经看出来了在前面说到的:defer创建会为每个Observer创建一个新的独立的Observable,并且他们具有相同内容,但是好像并没有体现出defer延迟创建,那么请你注意下面这两个例子的对比
---------- 示例非defer,这里可能会用到后面讲的内容 ----------
var value: String?
let observable = Observable<String>.from(optional: value)
value = "hello RxSwift"
observable.subscribe { (event) in
print(event)
}.dispose()
---------- 运行结果 ----------
completed
上述结果并没有像我们想象中的那样也会打印出 onNext事件,这个是为什么呢? 因为在我们订阅的时候,数据未必已经初始化完成,现在我们把这个例子使用defer重新测试一下:
---------- 示例,通过defer改写上面 ----------
var value: String?
let observable = Observable<String>.deferred { () -> Observable<String> in
return Observable<String>.from(optional: value)
}
value = "hello RxSwift"
observable.subscribe { (event) in
print(event)
}.dispose()
---------- 运行结果 ---------
next(hello RxSwift)
completed
看到了如期打印出来的onNext结果,这可能才会是你想达到的效果,具体defer还是需要看使用场景,只有在场景中慢慢体会。
- Of / From :这两个方法都是把一个对象或者数据转换为可观察序列,这在你使用Swift中的SequenceType时很有用
---------- of 示例 ----------
Observable<String>.of("hello", "RxSwift").subscribe { (event) in
print("operator: of \(event)")
}.dispose()
---------- 运行结果 ----------
operator: of next(hello)
operator: of next(RxSwift)
operator: of completed
---------- from 示例 ----------
Observable<String>.from(["hello", "RxSwift"]).subscribe { (event) in
print("operator: from \(event)")
}.dispose()
----------- 运行结果 ----------
operator: from next(hello)
operator: from next(RxSwift)
operator: from completed
- Just:将一个对象或者一个Sequence转换为 一个 可观察序列,请注意这里与From是完全不相同的:From是转换为一个或者多个可观察序列(这取决于你是要将一个还是一个序列进行转换)。也就是说Just只能包含一个观察序列,请注意与上面例子结果进行对比
---------- Just 示例对比 ----------
Observable<String>.from(["hello", "RxSwift"]).subscribe { (event) in
print("operator: just \(event)")
}
Observable<Array<String>>.just(["hello", "RxSwift"]).subscribe { (event) in
print("operator: just \(event)")
}
--------- 运行结果 ----------
operator: from next(hello)
operator: from next(RxSwift)
operator: from completed
operator: just next(["hello", "RxSwift"])
operator: just completed
- Interval:创建一个可观察序列,以特定的时间间隔释放一系列整数(E -> Int/NSInteger)
----------- 示例 ---------
Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.take(3) // 发送3次Next,后面再讲到此方法
.subscribe { (event) in
print("operator: interval \(event)")
}
----------- 运行结果 ---------
operator: interval next(0)
operator: interval next(1)
operator: interval next(2)
operator: interval completed
- Range:创建一个可观察到的,发射特定序列整数的范围(E -> Int/NSInteger)
---------- 示例 ----------
Observable<Int>.range(start: 1, count: 5).subscribe { (event) in
print("operator: range \(event)")
}.dispose()
---------- 运行结果 ---------
operator: range next(1)
operator: range next(2)
operator: range next(3)
operator: range next(4)
operator: range next(5)
operator: range completed
- Repeat: 创建一个可多次发送特定item的Observable
---------- 示例 ----------
Observable<String>.repeatElement("hello RxSwift")
.take(3)
.subscribe { (event) in
print("operator: repeat \(event)")
}.dispose()
---------- 运行结果 --------
operator: repeat next(hello RxSwift)
operator: repeat next(hello RxSwift)
operator: repeat next(hello RxSwift)
operator: repeat completed
- Start:
- Timer:在指定的时间后,发送一个特定的Item (E -> Int/NSInteger),请注意这里与Interval的区别(Interval是发送一系列特定Item,而Timer只会发送一个)
---------- 示例 ---------
Observable<Int>.timer(1, scheduler: MainScheduler.instance)
.subscribe { (event) in
print("operator: timer \(event)")
}
---------- 运行结果 ---------
operator: timer next(0)
operator: timer completed
- Empty: 只会发送一个Complete事件
---------- 示例 ----------
Observable<Int>.empty().subscribe { (event) in
print("operator: empty \(event)")
}.dispose()
---------- 运行结果 ----------
operator: empty completed
- Never:你将不会收到任何事件,并且它将永远不会终止
- Generate:其实是实现了一个迭代器的效果,他有三个参数,第一个是初始值,第二个是条件(满足条件之后就会继续执行.Next事件,第三个是迭代器,每次都会返回一个E类型,知道不满足条件为止
---------- 示例 ----------
Observable<Int>.generate(initialState: 0, condition: { (element: Int) -> Bool in
return element < 10
}, iterate: { element -> Int in
return element + 3
}).subscribe { (event) in
print("operator: generate \(event)")
}.dispose()
----------- 运行结果 ----------
operator: generate next(3)
operator: generate next(6)
operator: generate next(9)
operator: generate completed
上述例子表示的是:我有一个初始变量0, 此时满足条件 < 10 ,那么就会执行迭代器,每次+3,直到不满足条件为止
Observable 变换
- Buffer:定期的将需要发射的Items手机到一个buffer的包中,分批次的发射这些包,而不是一次发射一个Item:例如你有[1, 2, 3, 4] ,你可以一次发射一个,也可以一次发射两个Item或者三个...,你需要仔细的观察下面的输出结果,如果需要更好的理解还请你敲一遍代码
---------- 示例1 一次发射1个Item事件 --------
Observable<Int>.of(1, 2, 3, 4)
.buffer(timeSpan: 1, count: 1, scheduler: MainScheduler.instance)
.subscribe { (event) in
print("operator: buffer \(event)")
}.dispose()
---------- 运行结果 ---------
operator: buffer next([1])
operator: buffer next([2])
operator: buffer next([3])
operator: buffer next([4])
operator: buffer next([])
operator: buffer completed
---------- 示例2 一次发射3个Item事件 --------
Observable<Int>.of(1, 2, 3, 4)
.buffer(timeSpan: 1, count: 3, scheduler: MainScheduler.instance)
.subscribe { (event) in
print("operator: buffer \(event)")
}.dispose()
---------- 运行结果 ---------
operator: buffer next([1, 2, 3])
operator: buffer next([4])
operator: buffer completed
还可以有其他多个不同发射Item事件,请仔细体会上述结果
- Window:与Buffer类似,但是每次发射的不是Item,而是Observables序列(请注意与Buffer的结果比较):
------------ 示例 -----------
Observable<Int>.of(1, 2, 3, 4)
.window(timeSpan: 1, count: 3, scheduler: MainScheduler.instance)
.subscribe { (event) in
print("operator: window \(event)")
// 这里event的element 是一个Observable
event.element?.subscribe({ (event) in
print("operator: window - subObservables \(event)")
}).dispose()
}.dispose()
------------ 运行结果 -----------
operator: window next(RxSwift.AddRef<Swift.Int>)
operator: window - subObservables next(1)
operator: window - subObservables next(2)
operator: window - subObservables next(3)
operator: window - subObservables completed
operator: window next(RxSwift.AddRef<Swift.Int>)
operator: window - subObservables next(4)
operator: window - subObservables completed
operator: window completed
- FlatMap:将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射
这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合
--------- 示例1 ---------
// 我需要在每一个Item后跟一个新的Item叫做RxSwift
Observable<Int>.of(0, 1, 2)
.flatMap { (element: Int) -> Observable<String> in
return Observable<String>.of("\(element)", "RxSwift")
}
.subscribe { (event) in
print("operator: flatMap \(event)")
}.dispose()
--------- 运行结果 -------
operator: flatMap next(0)
operator: flatMap next(RxSwift)
operator: flatMap next(1)
operator: flatMap next(RxSwift)
operator: flatMap next(2)
operator: flatMap next(RxSwift)
operator: flatMap completed
- GroupBy:将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列
----------- 示例 --------
// 我需要将奇数偶数分成两组
Observable<Int>.of(0, 1, 2, 3, 4, 5)
.groupBy(keySelector: { (element) -> String in
return element % 2 == 0 ? "偶数" : "基数"
})
.subscribe { (event) in
switch event {
case .next(let group):
group.asObservable().subscribe({ (event) in
print("key: \(group.key) \(event)")
})
default:
print("")
}
}
.dispose()
--------- 运行结果 ---------
key: 基数 next(1)
key: 偶数 next(2)
key: 基数 next(3)
key: 偶数 next(4)
key: 基数 next(5)
key: 偶数 completed
key: 基数 completed
- Map:通过一个闭包将原来的序列转换为一个新序列的操作
---------- 示例 ----------
Observable<String>.of("John", "Tony", "Tom")
.map { return "hello " + $0}
.subscribe { (event) in
print("operator: map \(event)")
}.dispose()
---------- 运行结果 ----------
operator: map next(hello John)
operator: map next(hello Tony)
operator: map next(hello Tom)
operator: map completed
- Scan:从字面意思可以看出是扫描,也就是说该方法会给出一个初始值(seed),每次通过一个函数将上一次的结果与序列中的Item进行处理,每处理完成都会发射.Next事件
----------- 示例 ----------
Observable<String>.of("Rx", "Swift")
.scan("hello ") { (acum, element) in
return acum + element
}.subscribe { (event) in
print("operator: scan \(event)")
}.dispose()
---------- 运行结果 ----------
operator: scan next(hello Rx)
operator: scan next(hello RxSwift)
operator: scan completed
- Reduce:与上述Scan类似,都是初始一个Seed,每次通过函数将上一次的结果与序列中的Item进行处理,但是唯一不同的一点是,只会在最后发射一次.Next事件,将其拿来作数学计算很有用,这个我们将会在后面讲到 (** 请注意与上述Scan的结果比较**)
---------- 示例 ----------
Observable<String>.of("Rx", "Swift")
.reduce("hello ") { (accum, element) -> String in
return accum + element
}.subscribe { (event) in
print("operator: reduce \(event)")
}.dispose()
---------- 运行结果 ----------
operator: reduce next(hello RxSwift)
operator: reduce completed
Observable 过滤器
- Debounce:在规定的时间内过滤Item,如下图:如果debounce开启的时候此时2、3、4 Item正好到来,那么将无法收到它们的任何时间
----------- 示例 ----------
Observable<Int>.of(1, 2, 3, 4)
.debounce(1, scheduler: MainScheduler.instance)
.subscribe { (event) in
print("operator: debounce \(event)")
}.dispose()
----------- 运行结果 ----------
operator: debounce next(4)
operator: debounce completed
- Distinct:过滤掉可观察到的重复Item(相信你在使用数据库的时候回经常使用到distinct),在Swift中使用的是distinctUntilChanged,表示如果发射的事件与上一次不相同那么才会发射此次事件
---------- 示例 ----------
Observable<Int>.of(1, 2, 2, 2, 3)
.distinctUntilChanged()
.subscribe { (event) in
print("operator: distict \(event)")
}.dispose()
---------- 运行结果 ----------
operator: distict next(1)
operator: distict next(2)
operator: distict next(3)
operator: distict completed
- ElementAt:发射第 N 个Item
----------- 示例 ----------
Observable<Int>.of(1, 2, 2, 3, 4)
.elementAt(5)
.subscribe { (event) in
print("operator: elementAt \(event)")
}.dispose()
----------- 运行结果 -----------
operator: elementAt next(3)
operator: elementAt completed
- Filter:仅发射谓词测试通过的Items
----------- 示例 -----------
Observable<Int>.of(9, 10, 11, 12)
.filter { (element) -> Bool in
element > 10
}.subscribe { (event) in
print("operator: filter \(event)")
}.dispose()
----------- 运行结果 -----------
operator: filter next(11)
operator: filter next(12)
operator: filter completed
- Skip:发射第N(包含N)之后的Items
----------- 示例 -----------
Observable<Int>.of(9, 10, 11, 12)
.skip(2)
.subscribe { (event) in
print("operator: skip \(event)")
}.dispose()
----------- 运行结果 ---------
operator: skip next(11)
operator: skip next(12)
operator: skip completed
- Take: 发射第N(不包含N)之前的Items,与Skip相反效果
----------- 示例 -----------
Observable<Int>.of(9, 10, 11, 12)
.take(2)
.subscribe { (event) in
print("operator: take \(event)")
}.dispose()
---------- 运行结果 -----------
operator: take next(9)
operator: take next(10)
operator: take completed
- TakeLast: 发射第N(包含N)之后的Items,与Skip相同效果
----------- 示例 -----------
Observable<Int>.of(9, 10, 11, 12)
.takeLast(2)
.subscribe { (event) in
print("operator: takeLast \(event)")
}.dispose()
---------- 运行结果 -----------
operator: takeLast next(11)
operator: takeLast next(12)
operator: takeLast completed
结合多个Observables
- Merge:将多个序列的Items合并为一个序列的Items
----------- 示例 ----------
let observable1 = Observable<Int>.of(1, 3, 5)
let observable2 = Observable<Int>.of(2, 4)
Observable.merge(observable1, observable2)
.subscribe { (event) in
print("operator: merge \(event)")
}.dispose()
----------- 运行结果 ----------
operator: merge next(1)
operator: merge next(2)
operator: merge next(3)
operator: merge next(4)
operator: merge next(5)
operator: merge completed
- StartWith:在发射序列Items前新增一个Item
----------- 示例 ----------
Observable<String>.of(" ", "RxSwift", "!")
.startWith("hello")
.reduce("") { (accum, element) -> String in
return accum + element
}.subscribe { (event) in
print("operator: startWith \(event)")
}.dispose()
----------- 运行结果 ----------
operator: startWith next(hello RxSwift!)
operator: startWith completed
- Switch:当你的序列是一个事件序列的序列 (Observable<Observable<T>>) 的时候(也即是二维序列),可以使用 switch 将序列的序列转换成一维,并且在出现新的序列的时候,自动切换到最新的那个序列上。也就是说会将前一个序列未发射的Item自动取消掉
----------- 示例 ----------
let var1 = Variable(0)
let var2 = Variable(100)
// 序列的序列
let variable = Variable(var1.asObservable())
variable.asObservable()
.switchLatest()
.subscribe { (event) in
print("operator: switchLatest \(event)")
}
var1.value = 1
variable.value = var2.asObservable()
var2.value = 200
----------- 运行结果 ----------
operator: switchLatest next(0)
operator: switchLatest next(1)
operator: switchLatest next(100)
operator: switchLatest next(200)
operator: switchLatest completed
- Zip:将多个序列的Items进行一一合并,但是需要注意的是,它会等到Item对其后合并,未对齐的会舍弃
---------- 示例 -----------
let observable1 = Observable<Int>.of(1, 2, 3, 4, 5)
let observable2 = Observable<String>.of("A", "B", "C", "D")
Observable<String>.zip(observable1, observable2) { (e1: Int, e2: String) -> String in
"\(e1)\(e2)"
}.subscribe { (event) in
print("operator: zip \(event)")
}.dispose()
---------- 运行结果 ----------
operator: zip next(1A)
operator: zip next(2B)
operator: zip next(3C)
operator: zip next(4D)
operator: zip completed
- CombineLatest:如果存在两条事件队列,需要同时监听,那么每当有新的事件发生的时候,combineLatest 会将每个队列的最新的一个元素进行合并。类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。CombineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值
---------- 示例 ----------
let observable1 = Observable<Int>.of(1, 2, 3, 4, 5)
let observable2 = Observable<String>.of("A", "B", "C", "D")
Observable<String>.combineLatest(observable1, observable2, resultSelector: { (e1: Int, e2: String) -> String in
"\(e1)\(e2)"
}).subscribe { (event) in
print("operator: combine \(event)")
}.dispose()
----------- 运行结果 ----------
operator: combine next(1A)
operator: combine next(2A)
operator: combine next(2B)
operator: combine next(3B)
operator: combine next(3C)
operator: combine next(4C)
operator: combine next(4D)
operator: combine next(5D)
operator: combine completed
错误处理
- Catch:在收到序列的异常事件时,通过返回另一个序列来持续发送非error事件
----------- 示例 -----------
Observable<UInt8>.create { (observer) -> Disposable in
observer.onNext(0)
observer.onError(CustomError()) // 这里自定义了一个Error
return Disposables.create()
}.catchError { (error) -> Observable<UInt8> in
print(error)
return Observable<UInt8>.of(1, 2)
}.subscribe { (event) in
print("operator: catchError \(event)")
}.dispose()
----------- 运行结果 ----------
operator: catchError next(0)
RxDemo.(CustomError in _53A9E5DF100B211646D927F0B28DE79B)
operator: catchError next(1)
operator: catchError next(2)
operator: catchError completed
- Retry:出现错误事件后,重新发送所有事件信息
---------- 示例 -----------
Observable<UInt8>.create { (observer) -> Disposable in
observer.onNext(0)
observer.onError(CustomError())
return Disposables.create()
}.retry(3) // 重复三次
.subscribe { (event) in
print("operator: retry \(event)")
}.dispose()
----------- 运行结果 -----------
operator: retry next(0)
operator: retry next(0)
operator: retry next(0)
operator: retry error(RxDemo.(CustomError in _53A9E5DF100B211646D927F0B28DE79B))
Observable实用操作
- Delay:延迟发射事件
------------ 示例 ------------
print("start time: \(Date())")
Observable<Int>.of(1, 2, 1)
.delay(1, scheduler: MainScheduler.instance)
.subscribe { (event) in
if event.isCompleted {
print("end time: \(Date())")
}
print("operator: delay \(event)")
}
------------ 运行结果 -----------
start time: 2017-06-12 06:10:32 +0000
operator: delay next(1)
operator: delay next(2)
operator: delay next(1)
end time: 2017-06-12 06:10:33 +0000
operator: delay completed
- Do: 在一个序列的每个事件执行之前添加一个执行动作
---------- 示例 ----------
Observable<Int>.of(1, 2, 1)
.do(onNext: { (_) in
print("operator: do previous next")
}, onError: { (_) in
print("operator: do previous error")
}, onCompleted: {
print("operator: do previous complete")
}, onSubscribe: nil, onSubscribed: nil, onDispose: nil)
.subscribe { (event) in
print("operator: delay \(event)")
}.dispose()
---------- 运行结果 ---------
operator: do previous next
operator: delay next(1)
operator: do previous next
operator: delay next(2)
operator: do previous next
operator: delay next(1)
operator: do previous complete
operator: delay completed
- ObserveOn:Observer在指定Scheduler中观察序列事件
----------- 示例 --------
Observable<Int>.of(1)
.observeOn(ConcurrentDispatchQueueScheduler.init(queue: DispatchQueue(label: "test")))
.subscribe { (event) in
print("operator: observeOn \(Thread.current.isMainThread) \(event)")
}
---------- 运行结果 ----------
operator: observeOn false next(1)
operator: observeOn false completed
- Subscribe:订阅事件.onNext 、.onError、.onCompleted
----------- 示例 ---------
Observable<String>.of("hello RxSwift").subscribe(onNext: { (element) in
print("operator: onNext \(element)")
}, onError: { (error) in
print("operator: onError \(error)")
}, onCompleted: {
print("operator: onCOmpleted")
}, onDisposed: nil)
----------- 运行结果 ----------
operator: onNext hello RxSwift
operator: onCOmpleted
- SubscribeOn:在指定的Scheduler中操作,参考ObserveOn
----------- 示例 ---------
Observable<Int>.of(1)
.subscribeOn(MainScheduler.instance)
.subscribe { (event) in
print("operator: observeOn \(Thread.current.isMainThread) \(event)")
}
----------- 运行结果 ----------
operator: subscribeOn true next(1)
operator: subscribeOn true completed
- TimeOut:一个序列在指定时间内未发射完成所有事件,那么将会进入.onError
---------- 示例 --------
Observable<Int>.of(1)
.delay(2, scheduler: MainScheduler.instance)
.timeout(1, scheduler: MainScheduler.instance)
.subscribe { (event) in
print("operator: timeout \(event)")
}
---------- 运行结果 ----------
operator: timeout error(Sequence timeout.)
条件运算符和布尔类型运算符
- DefaultIfEmpty:如果是序列中没有任何Item,那么给定一个default
---------- 示例 -----------
Observable<Int>.empty()
.ifEmpty(default: 0)
.subscribe({ (event) in
print("operator: ifEmpty \(event)")
})
----------- 运行结果 ---------
operator: ifEmpty next(0)
operator: ifEmpty completed
- SkipUntil:丢弃掉第一个序列的所有Items,直到第二个序列的Item出现
--------- 示例 ----------
Observable<String>.of("A", "B", "C")
.skipUntil(Observable<String>.of("D"))
.subscribe { (event) in
print("operator: skipUntil \(event)")
}
--------- 运行结果 --------
operator: skipUntil next(B)
operator: skipUntil next(C)
operator: skipUntil completed
- SkipWhile:丢弃掉所有的Items,直到满足某个不满足条件的Item出现
---------- 示例 ---------
Observable<String>.of("AD", "BD", "CD")
.skipWhile({ (element) -> Bool in
element.contains("A")
})
.subscribe { (event) in
print("operator: skipWhile \(event)")
}.dispose()
---------- 运行结果 ---------
operator: skipWhile next(BD)
operator: skipWhile next(CD)
operator: skipWhile completed
- TakeUntil:取得第一个序列所有Items,直到第二个序列发射Item或者终止
---------- 示例 --------
Observable<Int>.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.takeUntil(Observable<Int>.empty())
.subscribe { (event) in
print("operator: takeUntil \(event)")
}.dispose()
- TakeWhile:取得第一个序列的所有Items,直到出现不满足条件的Item (请仔细体会与SkipWhile的不同之处)
----------- 示例 ---------
Observable<Int>.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.takeWhile({ (element) -> Bool in
element < 5
})
.subscribe { (event) in
print("operator: takeUntil \(event)")
}.dispose()
--------- 运行结果 --------
operator: takeWhile next(1)
operator: takeWhile next(2)
operator: takeWhile next(3)
operator: takeWhile next(4)
operator: takeWhile completed
连接操作
- Publish:将一个普通的序列转换为可连接序列
- RefCount:使一个可连接序列表现为一个普通序列
- Replay:保证所有的Observers所观察到的事件Items都是相同的,即使它们已经在序列事件已经发射之后才订阅的 (具体使用将在下一章节的Subjects中讲到)
Observer
- Observer作为观察者,用于来订阅Observable,它可以用来通知.onNext、.onError、.onCompleted
--------- 示例 --------
Observable<String>.create { (observer) -> Disposable in
observer.on(.next("hello RxSwift"))
observer.onError(CustomError())
return Disposables.create {
print("disposable")
}
}.subscribe { (event) in
print(event)
}.dispose()
--------- 运行结果 ---------
next(hello RxSwift)
error(RxDemo.(CustomError in _53A9E5DF100B211646D927F0B28DE79B))
- AnyObserver使用:
--------- 示例 ----------
let observer1 = AnyObserver<Int> { (event) in
print(event)
}
let observer2 = AnyObserver(observer1)
Observable<Int>.of(1, 2, 3)
.subscribe(observer2)
.dispose()
---------- 运行结果 --------
next(1)
next(2)
next(3)
completed
Subject
Subject在ReactiveX的一些实现中扮演了一种桥梁或者代理的角色,它既可以作为Observer也可以作为Observable来使用。作为观察者来说它可以订阅一个或者多个可观察序列,作为可观察者来说它可以通过Items的reemitting来观察,并且还可以发射新的Items事件,我们将从如下四个Subject进行学习:
- AnySubject:仅仅只发送订阅之后的最后一个Item以及.onCompleted,如果出现错误,那么仅仅将只发送.onError
- ReplaySubject:如果一个Observer订阅了ReplaySubject,那么它将收到订阅前(在bufferSize大小内)以及订阅后的所有Items,不管Observer何时订阅的
---------- 示例 -----------
let replay = ReplaySubject<Int>.create(bufferSize: 3)
replay.onNext(0)
replay.onNext(1)
replay.onNext(2)
replay.onNext(3)
replay.subscribe { (event) in
print("Replay first \(event)")
}
replay.onNext(4)
replay.subscribe { (event) in
print("Replay second \(event)")
}
replay.onNext(5)
replay.onCompleted()
---------- 运行结果 ----------
Replay first next(1)
Replay first next(2)
Replay first next(3)
Replay first next(4)
Replay second next(2)
Replay second next(3)
Replay second next(4)
Replay first next(5)
Replay second next(5)
Replay first completed
Replay second completed
请仔细查看上面的运行结果你可以发现:上述replay被两次订阅了之后都会收到订阅的Items,但是收到订阅之前的Items是有size限制的,这就与你设置的bufferSize大小有关了。订阅之后的Items是可以完全收到的
- PublishSubject:与BehaviorSubject有略微不同,从字面意思可以看出Publish是发布,那么就意味着如果一个Observer订阅了PublishSubject,它就将收到订阅之后的所有事件,但是不包括订阅之前的事件(你也可以把他看成一个bufferSize=0的ReplaySubject)
----------- 示例 ---------
let publish = PublishSubject<Int>()
publish.onNext(1)
publish.onNext(2)
publish.subscribe { (event) in
print("Publish \(event)")
}
publish.onNext(3)
publish.onNext(4)
publish.onCompleted()
---------- 运行结果 ---------
// 从结果可以看出,订阅之前的所有Item是收不到的,请体会与BehaviorSubject之间差异
Publish next(3)
Publish next(4)
Publish completed
-
BehaviorSubject:如果一个Observer订阅了BehaviorSubject之后,那么它就将收到最近的事件,并且也能够收到在订阅之后发射的事件Item (你也可以把它看成一个bufferSize=1 的ReplaySubject)
----------- 示例 ----------
let behavior = BehaviorSubject<Int>(value: 0)
behavior.onNext(1)
behavior.onNext(2)
behavior.onNext(3)
let disposable = behavior.subscribe { (event) in
print("Behavior \(event)")
}
behavior.onNext(4)
behavior.onCompleted()
---------- 运行结果 -----------
Behavior next(3) // 收到最近Item事件
Behavior next(4) // 收到订阅之后所有事件
Behavior completed
- Variable:与BehaviorSubject类似,但是比较特殊的一点就是:在.onError之后它不会终止,只会等待到.onCompleted之后才会被deallocated
注意:至此Subject基本就是这些特殊内容,对于这些特殊内容我建议你敲代码,看结果来仔细体会它们之间的不同与相同之处
Scheduler
对于Scheduler来说,我们需要了解Concurrent(并行)、Serial(串行)Scheduler就可以了,如下为所有Scheduler脑图:
- Scheduler:当你在多线程的时候你会用到这个Scheduler
在上面的章节相信你已经熟悉了observeOn/subscribeOn,这里我们就直接上来个例子介绍:
----------- 示例 -----------
Observable<Int>.of(1, 2, 3, 4)
.observeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "test"))
.map { (element) -> Int in
print("scheduler:map --> Main Thread: \(Thread.current.isMainThread)")
return element * 2
}
.subscribeOn(MainScheduler.instance)
.observeOn(MainScheduler.instance)
.subscribe { (event) in
print("scheduler:subscribe --> Main Thread: \(Thread.current.isMainThread)")
print("scheduler \(event)")
}
---------- 运行结果 -----------
scheduler:map --> Main Thread: false
scheduler:map --> Main Thread: false
scheduler:map --> Main Thread: false
scheduler:map --> Main Thread: false
scheduler:subscribe --> Main Thread: true
scheduler next(2)
scheduler:subscribe --> Main Thread: true
scheduler next(4)
scheduler:subscribe --> Main Thread: true
scheduler next(6)
scheduler:subscribe --> Main Thread: true
scheduler next(8)
scheduler:subscribe --> Main Thread: true
scheduler completed
Disposable
什么时候使用Disposable呢?我个人理解就是你订阅了一个可观察序列,如果有特殊需求你需要提前取消订阅时使用。也就是说Disposable是用来取消订阅的一个工具
- 创建:通过Disposables工具创建
let dis1 = Disposables.create()
let dis2 = Disposables.create {
print("在Dispose之前所做一些工作")
}
let dis3 = Disposables.create([dis1, dis2])
- dispose:通过.dispose()取消或者添加到DisposeBag(你可以将它看成一个非ARC机制下的AutoReleasePool
let disposable = Observable<Int>.of(0, 1, 2)
.subscribe { (event) in
print(event)
}
disposable.dispose()
// 或者
disposable.addDisposableTo(DisposeBag())
至此,RxSwift的入门笔记到此结束,接下来我会继续介绍RxSwift使用详解篇相关学习笔记,敬请关注