在前面的RxSwift中的publish和connect函数这篇文章中,我们认识了一个类PublishSubject
,既可以作为观察者,也可以作为订阅者。
PublishSubject
其实就是Subject
的一种。
那么
Subject
为什么既可以作为观察者,又可以作为订阅者呢?
我们来看看Subject
都继承的关键协议SubjectType
:
/// Represents an object that is both an observable sequence as well as an observer.
public protocol SubjectType : ObservableType {
/// The type of the observer that represents this subject.
///
/// Usually this type is type of subject itself, but it doesn't have to be.
associatedtype SubjectObserverType : ObserverType
/// Returns observer interface for subject.
///
/// - returns: Observer interface for subject.
func asObserver() -> SubjectObserverType
}
- 官方注释:这个协议代表遵守这个协议的类,既可以是一个可观察序列,也可以是一个观察者
- 观察者类型通常就是
subject
本身的类型,但也不一定必须是 - 继承了
ObservableType
,所以具有序列的特性,即有subscribe
方法,可以订阅观察者 - 关联了一个观察者类型,也就具备了观察者的特性,即有
on
函数,可以响应事件
常见的Subject
PublishSubject
可以不需要初始来进行初始化(也就是可以为空),并且它只会向订阅者发送在订阅之后才接收到的元素。
// 1:初始化序列
let publishSub = PublishSubject<Int>() //初始化一个PublishSubject 装着Int类型的序列
// 2:发送响应序列
publishSub.onNext(1)
// 3:订阅序列
publishSub.subscribe { print("订阅到了:",$0)}
.disposed(by: disposbag)
publishSub.subscribe { print("订阅到了2:",$0)}
.disposed(by: disposbag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)
/*
订阅到了: next(2)
订阅到了2: next(2)
订阅到了: next(3)
订阅到了2: next(3)
*/
-
1
不会被订阅,只会订阅到subscribe
方法之后发送的事件 - 发送的事件,可以被前面多个订阅依次响应
BehaviorSubject
通过一个默认初始值来创建。订阅前面有发送元素,就订阅最后一个元素,如果没有就订阅到默认值。
始终保存最后一个元素。
// 1:创建序列
let behaviorSub = BehaviorSubject.init(value: 100)
// 2:发送信号
behaviorSub.onNext(2)
behaviorSub.onNext(3)
// 3:订阅序列
behaviorSub.subscribe{ print("front 订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送
behaviorSub.onNext(4)
behaviorSub.onNext(5)
// 再次订阅
behaviorSub.subscribe{ print("back 订阅到了:",$0)}
.disposed(by: disposbag)
/*
front 订阅到了: next(3)
front 订阅到了: next(4)
front 订阅到了: next(5)
back 订阅到了: next(5)
*/
- 订阅前的元素,始终响应该订阅前的最后一个元素。订阅之后的就都正常响应
- 如果订阅之前没有发送元素,就响应默认值
value
- 可以订阅原因,是因为初始化的时候,有个属性,始终用来保存初始值,或者最后一个元素
public init(value: Element) {
self._element = value // 用来保存元素
}
ReplaySubject
升级版的BehaviorSubject
,BehaviorSubject
只能保存发送的最后一个元素,但是ReplaySubject
可以保存倒数的多个发送元素,因为它里面的存储属性变成了集合。
// 1:创建序列
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
// 2:发送信号
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3)
replaySub.onNext(4)
// 3:订阅序列
replaySub.subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送
replaySub.onNext(7)
replaySub.onNext(8)
replaySub.onNext(9)
/*
订阅到了: next(3)
订阅到了: next(4)
订阅到了: next(7)
订阅到了: next(8)
订阅到了: next(9)
*/
-
bufferSize
能够存储的元素个数 - 响应订阅之前发送的最后两个元素
- 订阅之后的发送元素,都正常响应,不受限制
AsyncSubject
只响应发送的最后一个元素,而且只会在发送completed
信号的时候,才会响应发送的最后一个元素。
如果在发送completed
前,有error
信号,那么将不会响应任何元素。
// 1:创建序列
let asynSub = AsyncSubject<Int>.init()
// 2:发送信号
asynSub.onNext(1)
asynSub.onNext(2)
// 3:订阅序列
asynSub.subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送
asynSub.onNext(3)
asynSub.onNext(4)
// asynSub.onError(NSError.init(domain: "lgcooci", code: 10086, userInfo: nil))
asynSub.onCompleted()
/*
没有注释onError的打印:
订阅到了: error(Error Domain=lgcooci Code=10086 "(null)")
注释onError的打印:
订阅到了: next(4)
订阅到了: completed
*/
- 如果中间有错误事件,那么只会响应错误,不会响应别的任何元素
- 没有错误事件,只会响应最后一个元素
- 关键代码如下:
switch event {
case .next(let element):
self._lastElement = element
return (Observers(), .completed)
case .error:
self._stoppedEvent = event
let observers = self._observers
self._observers.removeAll() // 遇到错误,就移除所有元素
return (observers, event)
case .completed:
let observers = self._observers
self._observers.removeAll()
if let lastElement = self._lastElement {
self._stoppedEvent = .next(lastElement)
return (observers, .next(lastElement)) // 完成的时候,响应最后一个元素
}
else {
self._stoppedEvent = event
return (observers, .completed)
}
}
-
next
事件的时候,只是保存当前的元素作为最后一个元素_lastElement
-
error
事件的时候,会清空所以的元素 -
completed
事件的时候,有最后一个元素_lastElement
就响应它,不然就只是发送一个completed
信号
Variable(已经被废弃)
有初始值value
,通过给value
赋值来发送元素。现在使用BehaviorRelay
来代替。
// 1:创建序列
let variableSub = Variable.init(1)
// 2:发送信号
variableSub.value = 100
variableSub.value = 10
// 3:订阅信号
variableSub.asObservable().subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送
variableSub.value = 1000
/*
ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
订阅到了: next(10)
订阅到了: next(1000)
*/
-
variableSub.value = 1000
赋值的方法来发送元素 - 里面是用
BehaviorSubject
实现的,所以又默认值,并且始终保存最后一个元素
public init(_ value: Element) {
self._value = value
self._subject = BehaviorSubject(value: value)
}
BehaviorRelay
替换原来的Variable
,是对BehaviorSubject
的一层封装。
不能发送错误和完成信号。
let behaviorRelay = BehaviorRelay(value:100)
// behaviorRelay.subscribe(onNext: { (num) in
// print(num)
// })
// .disposed(by: disposbag)
print("front value:\(behaviorRelay.value)")
behaviorRelay.accept(1000)
print("back value:\(behaviorRelay.value)") // 不需要写subscribe,就可以拿到最新值1000
/*
100 (放开注释过后,订阅里面打印的100)
front value:100
1000
back value:1000
订阅到了: completed
*/
- 有一个默认值
value
-
behaviorRelay.accept(1000)
用accept
来更新value
- 可以
behaviorRelay.value
这样来随时拿到最新的value
- 存储最后一个元素,
behaviorRelay.value
一般取最后一个元素value
,没有就取默认值value
public final class BehaviorRelay<Element>: ObservableType {
public typealias E = Element
private let _subject: BehaviorSubject<Element>
/// Accepts `event` and emits it to subscribers
public func accept(_ event: Element) {
self._subject.onNext(event)
}
/// Current value of behavior subject
public var value: Element {
// this try! is ok because subject can't error out or be disposed
return try! self._subject.value()
}
/// Initializes behavior relay with initial value.
public init(value: Element) {
self._subject = BehaviorSubject(value: value)
}
}
- 就是对
BehaviorSubject
的一次封装 -
accept
其实就是BehaviorSubject
的onNext
方法 - 没有主动发送
error
和completed
的方法