1.PublishSubject
let publishSub = PublishSubject<Int>()
publishSub.onNext(1)
publishSub.subscribe { print("订阅到了:",$0)}
.disposed(by: disposbag)
publishSub.subscribe { print("订阅到2:",$0)}
.disposed(by: disposbag)
publishSub.onNext(2)
publishSub.onNext(3)
打印结果是
订阅到了: next(2)
订阅到2: next(2)
订阅到了: next(3)
订阅到2: next(3)
订阅后发送的信号才能被订阅者收到,每一次订阅都会有被保存起来,当有信号发出时,会想每个订阅者发送该信号。
1.1订阅
public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
Subject被订阅时,会保存到容器中self._observers。
1.2.保存订阅者的容器self._observers 本质为Bag
插入订阅者
mutating func insert(_ element: T) -> BagKey {
let key = _nextKey
_nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)
if _key0 == nil {
_key0 = key
_value0 = element
return key
}
_onlyFastPath = false
if _dictionary != nil {
_dictionary![key] = element
return key
}
if _pairs.count < arrayDictionaryMaxSize {
_pairs.append((key: key, value: element))
return key
}
_dictionary = [key: element]
return key
}
1.第1个订阅者,用属性保存_key0,_value0
2.第2个到第订31阅者保存到_pairs数组中
3.大于31个订阅者则保存到dictionary中
1.3.可观察者发送信号
public func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
dispatch(self._synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<E>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
func dispatch<E>(_ bag: Bag<(Event<E>) -> Void>, _ event: Event<E>) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
_synchronized_on返回订阅者序列,也就是保存订阅者的容器bag。
dispatch想各个订阅者发送信号。发送的顺序先value0,再_pairs数组,最后Dictionary
2.BehaviorSubject
let behaviorSub = BehaviorSubject.init(value: 100)
// 2:发送信号
behaviorSub.onNext(2)
behaviorSub.onNext(3)
// 3:订阅序列
behaviorSub.subscribe{ print("订阅到1:",$0)}
.disposed(by: disposbag)
// 再次发送
behaviorSub.onNext(4)
behaviorSub.onNext(5)
// 再次订阅
behaviorSub.subscribe{ print("订阅到2:",$0)}
.disposed(by: disposbag)
//打印结果
订阅到1: next(3)
订阅到1: next(4)
订阅到1: next(5)
订阅到2: next(5)
BehaviorSubject和PublishSubject差不多,唯一区别当BehaviorSubject被订阅者,订阅者会收到订阅者之前BehaviorSubject发送的最后的一个信号。
func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
let key = self._observers.insert(observer.on)
observer.on(.next(self._element))
return SubscriptionDisposable(owner: self, key: key)
}
在_synchronized_subscribe中,我们可以看到observer.on(.next(self._element))。
self._element是发送信号的最新的值。
3.ReplaySubject
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
// let replaySub = ReplaySubject<Int>.createUnbounded()
// 2:发送信号
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3)
replaySub.onNext(4)
// 3:订阅序列
replaySub.subscribe{ print("订阅者1:",$0)}
.disposed(by: disposbag)
// 再次发送
replaySub.onNext(7)
replaySub.onNext(8)
replaySub.onNext(9)
replaySub.subscribe{ print("订阅者2:",$0)}
.disposed(by: disposbag)
//打印结果
订阅者1: next(3)
订阅者1: next(4)
订阅者1: next(7)
订阅者1: next(8)
订阅者1: next(9)
订阅者2: next(8)
订阅者2: next(9)
有一个queue保存发送的信号。
4.AsyncSubject
let asynSub = AsyncSubject<Int>.init()
// 2:发送信号
asynSub.onNext(1)
asynSub.onNext(2)
// 3:订阅序列
asynSub.subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送
asynSub.onNext(3)
asynSub.onNext(4)
asynSub.onCompleted()
asynSub.subscribe{ print("订阅到2:",$0)}
.disposed(by: disposbag)
//打印日志
订阅到了: next(4)
订阅到了: completed
订阅到2: next(4)
订阅到2: completed
为什么呢?我们看源码分析一下
public func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
let (observers, event) = self._synchronized_on(event)
switch event {
case .next:
dispatch(observers, event)
dispatch(observers, .completed)
case .completed:
dispatch(observers, event)
case .error:
dispatch(observers, event)
}
}
func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
self._lock.lock(); defer { self._lock.unlock() }
if self._isStopped {
return (Observers(), .completed)
}
switch event {
case .next(let element):
self._lastElement = element
return (Observers(), .completed)
case .error:
self._stoppedEvent = event
let observers = self._observers
self._observers.removeAll()
return (observers, event)
case .completed:
let observers = self._observers
self._observers.removeAll()
if let lastElement = self._lastElement {
self._stoppedEvent = .next(lastElement)
return (observers, .next(lastElement))
}
else {
self._stoppedEvent = event
return (observers, .completed)
}
}
}
我们看一下_synchronized_on,我们看switch event
.next,用self._lastElement记录传来的数据,但是在return观察者是重新初始化的。所以next事件只记录数据,不像观察者发信号。
completed事件时,先用一临时变量记录观察者,把我们保存的观察者remove调,保存_stoppedEvent,并向观察者发送信号。如果重新订阅subject,则会收到最后一个最后一个信号和完成。
error事件,记录_stoppedEvent,并向观察者分发。