Subject介绍
从之前的文章RxSwift(三)-- RxSwift使用介绍Observable的创建中,我们可以知道,当我们需要创建一个
Observable
的时候,要预先将要发出的数据都准备好,等到有人订阅它时再将数据通过Event
发出去。
不过,有时我们也希望Observable
在运行时能动态地“获得”或者说“产生”出一个新的数据,再通过Event
发送出去。比如:订阅一个输入框的输入内容,当用户每输入一个字后,这个输入框关联的Observable
就会发出一个带有输入内容的Event
,通知给所有订阅者。
这个时候,我们要介绍的Subject
就登场了。
Subject基本介绍
-
Subject
它既是订阅者,也是Observable
序列:
- 因为它能够动态地接收新的值,所以它是订阅者。
- 因为
Subject
有了新的值以后,会通过Event
事件将新的值发出来给到订阅者,所以它又是一个Observable
序列。
-
RxSwift
中Subject
的分类是:PublishSubject
、BehaviorSubject
、ReplaySubject
、AsyncSubject
、BehaviorRelay
,它们之间既有各自的特点,也有相同之处:
- 它们都是
Observable
序列,订阅者都能收到它们发出的新的Event
。- 等到
Subject
发出.complete
或者.error
的Event
,Subject
也就结束了,也就不会再发出.next
事件了。- 如果在
Subject
结束后再订阅的订阅者,也可以收到Subject
发出的一条.complete
或者.error
事件,用来告诉订阅者已经结束了。- 它们之间的最大区别是:当一个新的订阅者刚订阅它的时候,能不能收到
Subject
以前发出过的旧Event
,如果能的话又能收到多少个。
-
Subject
的常用方法:
onNext(:)
:是on(.next(:))
的简便写法。该方法相当于Subject
接收到一个.next
事件。onError(:)
:是on(.error(:))
的简便写法。该方法相当于Subject
接收到一个.error
事件。onCompleted()
:是on(.completed)
的简便写法。该方法相当于 subject 接收到一个.completed
事件。
PublishSubject
- 介绍
PublishSubject
不需要初始值就能创建(也就是可以为空)。PublishSubject
的订阅者从他们开始订阅的时间点起,可以收到订阅后Subject
发出的新Event
事件,而不会收到他们在订阅前已发出的Event
事件。
- 代码展示
// 创建一个PublishSubject
let subject = PublishSubject<String>()
// 由于当前没有任何订阅者,所以这条信息不会输出到控制台
subject.onNext("111")
// 第1次订阅subject
subject.subscribe(onNext: { string in
print("第1次订阅:", string)
}, onCompleted:{
print("第1次订阅:onCompleted")
}).disposed(by: disposeBag)
// 当前有1个订阅,则该信息会输出到控制台
subject.onNext("222")
// 第2次订阅subject
subject.subscribe(onNext: { string in
print("第2次订阅:", string)
}, onCompleted:{
print("第2次订阅:onCompleted")
}).disposed(by: disposeBag)
// 当前有2个订阅,则该信息会输出到控制台
subject.onNext("333")
// 让subject结束
subject.onCompleted()
// subject完成后会发出.next事件了。
subject.onNext("444")
// subject完成后它的所有订阅(包括结束后的订阅),都能收到subject的.completed事件,
subject.subscribe(onNext: { string in
print("第3次订阅:", string)
}, onCompleted:{
print("第3次订阅:onCompleted")
}).disposed(by: disposeBag)
运行结果:
第1次订阅: 222
第1次订阅: 333
第2次订阅: 333
第1次订阅:onCompleted
第2次订阅:onCompleted
第3次订阅:onCompleted
BehaviorSubject
- 介绍
BehaviorSubject
需要通过一个默认初始值来创建。- 当一个订阅者来订阅它的时候,这个订阅者会立即收到
BehaviorSubjects
上一个发出的event
事件,如果还没有收到任何数据,则会发出一个默认值。之后就跟PublishSubject
正常的情况一样,它也会接收到BehaviorSubject
之后发出的新的event
事件。
- 代码展示
// 创建一个BehaviorSubject
let subject = BehaviorSubject(value: "111")
// 第1次订阅subject
subject.subscribe { event in
print("第1次订阅:", event)
}.disposed(by: disposeBag)
// 发送next事件
subject.onNext("222")
// 发送error事件
subject.onError(NSError(domain: "local", code: 0, userInfo: nil))
// 第2次订阅subject
subject.subscribe { event in
print("第2次订阅:", event)
}.disposed(by: disposeBag)
运行结果:
第1次订阅: next(111)
第1次订阅: next(222)
第1次订阅: error(Error Domain=local Code=0 "(null)")
第2次订阅: error(Error Domain=local Code=0 "(null)")
ReplaySubject
- 介绍
ReplaySubject
在创建时候需要设置一个bufferSize
,表示它对于它发送过的event
的缓存个数。- 比如一个
ReplaySubject
的bufferSiz
e 设置为 2,它发出了 3 个.next
的event
,那么它会将后两个(最近的两个)event
给缓存起来。此时如果有一个subscriber
订阅了这个ReplaySubject
,那么这个subscriber
就会立即收到前面缓存的两个.next
的event
。- 如果一个
subscriber
订阅已经结束的ReplaySubject
,除了会收到缓存的.next
的event
外,还会收到那个终结的.error
或者.complete
的event
。
- 代码展示
//创建一个bufferSize为2的ReplaySubject
let subject = ReplaySubject<String>.create(bufferSize: 2)
//连续发送3个next事件
subject.onNext("111")
subject.onNext("222")
subject.onNext("333")
//第1次订阅subject
subject.subscribe { event in
print("第1次订阅:", event)
}.disposed(by: disposeBag)
//再发送1个next事件
subject.onNext("444")
//第2次订阅subject
subject.subscribe { event in
print("第2次订阅:", event)
}.disposed(by: disposeBag)
//让subject结束
subject.onCompleted()
//第3次订阅subject
subject.subscribe { event in
print("第3次订阅:", event)
}.disposed(by: disposeBag)
运行结果:
第1次订阅: next(222)
第1次订阅: next(333)
第1次订阅: next(444)
第2次订阅: next(333)
第2次订阅: next(444)
第1次订阅: completed
第2次订阅: completed
第3次订阅: next(333)
第3次订阅: next(444)
第3次订阅: completed
BehaviorRelay
- 介绍
BehaviorRelay
是作为Variable
的替代者出现的。它的本质其实也是对BehaviorSubject
的封装,所以它也必须要通过一个默认的初始值进行创建。BehaviorRelay
具有BehaviorSubject
的功能,能够向它的订阅者发出上一个event
以及之后新创建的 event。- 与
BehaviorSubject
不同的是,不需要也不能手动给BehaviorReply
发送completed
或者error
事件来结束它(BehaviorRelay
在销毁时也不会自动发送.complete
的event
)。BehaviorRelay
有一个value
属性,我们通过这个属性可以获取最新值。而通过它的accept()
方法可以对值进行修改。
- 代码展示
//创建一个初始值为111的BehaviorRelay
let subject = BehaviorRelay<String>(value: "111")
//修改value值
subject.accept("222")
//第1次订阅
subject.asObservable().subscribe {
print("第1次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
subject.accept("333")
//第2次订阅
subject.asObservable().subscribe {
print("第2次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
subject.accept("444")
运行结果:
第1次订阅: next(222)
第1次订阅: next(333)
第2次订阅: next(333)
第1次订阅: next(444)
第2次订阅: next(444)
- 另外,如果想将新值合并到原值上,可以通过
accept()
方法与value
属性配合来实现。(这个常用在表格上拉加载功能上,BehaviorRelay
用来保存所有加载到的数据)
//创建一个初始值为包含一个元素的数组的BehaviorRelay
let subject = BehaviorRelay<[String]>(value: ["1"])
//修改value值
subject.accept(subject.value + ["2", "3"])
//第1次订阅
subject.asObservable().subscribe {
print("第1次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
subject.accept(subject.value + ["4", "5"])
//第2次订阅
subject.asObservable().subscribe {
print("第2次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
subject.accept(subject.value + ["6", "7"])
运行结果:
第1次订阅: next(["1", "2", "3"])
第1次订阅: next(["1", "2", "3", "4", "5"])
第2次订阅: next(["1", "2", "3", "4", "5"])
第1次订阅: next(["1", "2", "3", "4", "5", "6", "7"])
第2次订阅: next(["1", "2", "3", "4", "5", "6", "7"])
AsyncSubject
- 介绍
- 一个
AsyncSubject
只在原始Observable
完成后,发射来自原始Observable
的最后一个值。- 如果原始
Observable
没有发射任何值,AsyncObject
也不发射任何值,它会把这最后一个值发射给任何后续的观察者。- 如果原始的
Observable
因为发生了错误而终止,AsyncSubject
将不会发射任何数据,只是简单的向前传递这个错误通知。
- 代码展示
let subject = AsyncSubject<Int>()
subject.onNext(1)
subject.subscribe(onNext: { int in
print("observerA: \(int)")
}, onCompleted: {
print("observerA: onCompleted")
}).disposed(by: disposeBag)
subject.onNext(2)
subject.subscribe(onNext: { int in
print("observerB: \(int)")
}, onCompleted: {
print("observerB: onCompleted")
}).disposed(by: disposeBag)
subject.onNext(3)
subject.subscribe(onNext: { int in
print("observerC: \(int)")
}, onCompleted: {
print("observerC: onCompleted")
}).disposed(by: disposeBag)
subject.onCompleted()
subject.onNext(4)
subject.subscribe(onNext: { int in
print("observerD: \(int)")
}, onCompleted: {
print("observerD: onCompleted")
}).disposed(by: disposeBag)
运行结果:
observerA: 3
observerB: 3
observerC: 3
observerA: onCompleted
observerB: onCompleted
observerC: onCompleted
observerD: 3
observerD: onCompleted