通过前几篇的探索,大家都
RxSwift
原理有了一个比较全面的了解,那么接下来的时间我们就来一起探索下RxSwift
在实际项目中的运用,正所谓学以致用才是王道。
今天我们一起来探索下在实际开发中经常会用到的一个--Subject
Subject
在RxSwift
中是一个非常特殊的序列,因为它既是序列,又是观察者
/// Represents an object that is both an observable sequence as well as an observer.
public protocol SubjectType : ObservableType {
/// The type of the observer that represents this subject.
///
/// Usually this type is type of subject itself, but it doesn't have to be.
associatedtype Observer: ObserverType
/// Returns observer interface for subject.
///
/// - returns: Observer interface for subject.
func asObserver() -> Observer
}
- 1.
SubjectType
继承自ObservableType
是一个序列 - 2.
SubjectType
中又包含associatedtype Observer: ObserverType
是一个观察者
通过以上2点,证明我们上面的说法是完全正确的
一.PublishSubject
PublishSubject
可以不需要初始值来进行初始化(也就是可以为空),并且它只会向订阅者发送在订阅之后才接收到的元素
首先我们来看下面这个例子
// PublishSubject
// 1:初始化序列
let publishSub = PublishSubject<Int>() //初始化一个PublishSubject 装着Int类型的序列
// 2:发送响应序列
publishSub.onNext(1)
// 3:订阅序列
publishSub.subscribe { print("订阅到了:", $0) }
.disposed(by: disposbag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)
//打印结果: 2,3
通过打印结果,我们看到只是打印了2和3,1却被忽略了,这是为什么? 我们来一起分析分析
- 1.初始化
/// Creates a subject.
public override init() {
super.init()
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
在初始化中感觉上面都没做,只是初始化了subject
的一个类型并且记录了一个引用计数,这里就没有其他入口了,那么我们再来看下下一步
publishSub.onNext(1)
由于不接收,这里也直接跳过,直接来到subscribe
- 2.订阅序列
/**
Subscribes an observer to the subject.
- parameter observer: Observer to subscribe to the subject.
- returns: Disposable object that can be used to unsubscribe the observer from the subject.
*/
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.performLocked { self.synchronized_subscribe(observer) }
}
func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self.stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self.isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self.observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
2.1.
self.lock.performLocked
锁,为了在多线程环境下保证程序的正常执行2.2.
synchronized_subscribe
做了一个收集的工作- 发送信号
/// Notifies all subscribed observers about next event.
///
/// - parameter event: Event to send to the observers.
public func on(_ event: Event<Element>) {
#if DEBUG
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
dispatch(self.synchronized_on(event), event)
}
看完整个流程,好像我们的问题还没有得到解决,为什么在订阅之前发送的信号,我们接收不到
信号1接收不到,原因在什么地方呢
func synchronized_on(_ event: Event<Element>) -> Observers {
self.lock.lock(); defer { self.lock.unlock() }
switch event {
case .next:
if self.isDisposed || self.stopped {
return Observers()
}
return self.observers
case .completed, .error:
if self.stoppedEvent == nil {
self.stoppedEvent = event
self.stopped = true
let observers = self.observers
self.observers.removeAll()
return observers
}
return Observers()
}
}
看到这段代码之后,我想大家都应该明白了
在订阅之前,我们的Observers
是nil
,观察者为nil肯定是观察不了任何信号的,只有观察者初始化之后才能接收信号
二.BehaviorSubject
-
BehaviorSubject
通过一个默认初始值来创建,当订阅者订阅BehaviorSubject
时,会收到订阅后Subject
上一个发出的Event
,如果还没有收到任何数据,会发出一个默认值,之后就和PublishSubject
一样,正常接收新的事件 -
BehaviorSubject
和publish
稍微不同就是behavior
这个家伙有个存储功能,存储上一次的信号
// BehaviorSubject
// 1:创建序列
let behaviorSub = BehaviorSubject.init(value: 100)
// 2:发送信号
behaviorSub.onNext(2)
behaviorSub.onNext(3)
// 3:订阅序列
behaviorSub.subscribe{ print("订阅到了1:", $0) }
.disposed(by: disposbag)
// 再次发送
behaviorSub.onNext(4)
behaviorSub.onNext(5)
// 再次订阅
behaviorSub.subscribe{ print("订阅到了:", $0) }
.disposed(by: disposbag)
//打印结果:订阅到了1:3 订阅到了1:4 订阅到了1:5 订阅到了:5
三.ReplaySubject
ReplaySubject
发送源Observable的所有事件,无论observer什么时候开始订阅
// ReplaySubject
// 1:创建序列
let replaySub = ReplaySubject<Int>.createUnbounded()
//也可以通过create(bufferSize: 2)函数来控制存几个信号
// let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
// 2:发送信号
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3)
replaySub.onNext(4)
// 3:订阅序列
replaySub.subscribe{ print("订阅到了:", $0) }
.disposed(by: disposbag)
// 再次发送
replaySub.onNext(7)
replaySub.onNext(8)
replaySub.onNext(9)
//打印结果:订阅到了:1 订阅到了:2 订阅到了:3 订阅到了:4 订阅到了:5 订阅到了:6 订阅到了:7 订阅到了:8 订阅到了:9
四. AsyncSubject
AsyncSubject
只发送由源Observable
发送的最后一个事件,并且只在源Observable
完成之后(如果源Observable
没有发生任何值,AsyncSubject
也不会发送任何值)
// AsyncSubject
// 1:创建序列
let asynSub = AsyncSubject<Int>.init()
// 2:发送信号
asynSub.onNext(1)
asynSub.onNext(2)
// 3:订阅序列
asynSub.subscribe{ print("订阅到了:", $0) }
.disposed(by: disposbag)
// 再次发送
asynSub.onNext(3)
asynSub.onNext(4)
// asynSub.onError(NSError.init(domain: "error", code: 10085, userInfo: nil))
asynSub.onCompleted()
//打印结果:订阅到了:4 Completed
//如果是发送了错误信号: 打印错误信号
五.BehaviorRelay
- 1.
BehaviorRelay
替换原来的Variable
- 2.
BehaviorRelay
可以存储一个信号 - 3.
BehaviorRelay
随时订阅响应
// 1:创建序列
let variableSub = BehaviorRelay.init(value: 1)
// 2:发送信号
variableSub.accept(100)
variableSub.accept(10)
// 3:订阅信号
variableSub.asObservable().subscribe{ print("订阅到了:", $0) }
.disposed(by: disposbag)
// 再次发送
variableSub.accept(1000)
//打印结果:订阅到了:10 订阅到了:1000
这篇文章以
PublishSubject
为引子介绍了RxSwift
中的五种Subject
,其他的四种原理其实和PublishSubject
差不多,有兴趣的朋友可以自行去看一看具体的代码实现