/**
Returns an observable sequence that shares a single subscription
to the underlying sequence, and immediately upon subscription
replays maximum number of elements in buffer
*/
public func shareReplay(bufferSize: Int)->Observable {
if bufferSize == 1 {
return ShareReplay1(source: self.asObservable())
else {
return self.replay(bufferSize).refCount()
}
}
// optimized version of share replay for most common case
final class ShareReplay1<Element>: Observable<Element>, ObserverType, SynchronizedUnsubscribeType {
typealias DisposeKey = Bag<AnyObserverElement>>.KeyType
private let _source: Observable<Element>
private var _lock = NSRecursiveLock()
private var _connection: SingleAssignmentDisposable?
private var _element: Element?
private var _stopped = false
private var _stopEvent = nil as Event<Element>?
private var _observers = Bag<AnyObserver<Element>>()
init(source: Observable<Element>) {
self._source = source
}
override func subscribe<O: ObserverType where O.E == E>(observer: O)->Disposable {
_lock.lock(); defer { _lock.unlock() }
return _synchronized_subscribe(observer)
}
func _synchronized_subscribe<O: ObserverType where O.E == E>(observer: O)->Disposable {
if let element = self._element {
observer.on(.Next(element))
}
if let stopEvent = self._stopEvent {
observer.on(stopEvent)
return NopDisposable.instance
}
let initialCount = self._observers.count
let disposeKey = self._observers.insert(AnyObserver(observer))
if initialCount == 0 {
let connection = SingleAssignmentDisposable()
_connection = connection
connection.disposable = self._source.subscribe(self)
}
return SubscriptionDisposable(owner: self, key: disposeKey)
}
func synchronizedUnsubscribe(disposeKey: DisposeKey) {
_lock.lock(); defer { _lock.unlock() }
_synchronized_unsubscribe(disposeKey)
}
func _synchronized_unsubscribe(disposeKey: DisposeKey) {
// if already unsubscribed, just return
if self._observers.removeKey(disposeKey) == nil {
return
}
if _observers.count == 0 {
_connection?.dispose()
_connection = nil
}
}
func on(event: Event<E>) {
_lock.lock(); defer { _lock.unlock() }
_synchronized_on(event)
}
func _synchronized_on(event: Event<E>) {
if _stopped {
return
}
switch event {
case .Next(let element):
_element = element
case .Error, .Completed:
_stopEvent = event
_stopped = true
_connection?.dispose()
connection = nil
}
observers.on(event)
}
}
shareReplay运算
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 1. 赋值运算符 "=" Swift赋值语句不可作为条件判断语句 2.基础运算符 "+ - * / %" 3.单目...
- 一元运算符 delete delete 运算符删除对以前定义的对象属性或方法的引用。例如: delete运算符不能...
- 关系运算符 关系运算符小于、大于、小于等于和大于等于执行的是两个数的比较运算,比较方式与算术比较运算相同。每个关系...