RxSwift之Subject

RxSwift之Subject

在 RxsWift 中还有一种非常特殊的序列 Subject - 即公也为受。Subject是一个代理,它既是Observer,也是Observable。

PublishSubject

PublishSubject 只对观察者发送订阅后产生的元素,而在订阅前发出的元素将不会发送给观察者。如果你希望观察者接收到所有的元素,你可以通过使用 Observable 的 create 方法来创建 Observable,或者使用 ReplaySubject。


PublishSubject.jpg

如果源 Observable 因为产生了一个 error 事件而中止, PublishSubject 就不会发出任何元素,而是将这个 error 事件发送出来。


PublishSubject.jpg

demo

//1:初始化一个PublishSubject 装着Int类型的序列
let publishSub = PublishSubject<Int>() 
// 2:发送响应序列
publishSub.onNext(1)
// 3:订阅序列
publishSub.subscribe { print("订阅到了:",$0)}
    .disposed(by: disposbag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)

输出:
订阅到了: next(2)
订阅到了: next(3)

解析

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {

    typealias Observers = AnyObserver<Element>.s
    private var _observers = Observers()
    
    public func on(_ event: Event<Element>) {
        ......
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<E>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:
            if self._stoppedEvent == nil {
                self._stoppedEvent = event
                self._stopped = true
                let observers = self._observers
                self._observers.removeAll()
                return observers
            }

            return Observers()
        }
    }
    
    public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        ......
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }
}

源序列订阅时会把observer.on通过insert函数装进袋子里,_observers就是Bag

extension AnyObserver {
    typealias s = Bag<(Event<Element>) -> Void>
}

发出信号时,遍历袋子中的.on去响应

func dispatch<E>(_ bag: Bag<(Event<E>) -> Void>, _ event: Event<E>) {
    bag._value0?(event)

    if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}

订阅之前发出的信号,没有装进袋子,也无法作出响应。

BehaviorSubject

当观察者对 BehaviorSubject 进行订阅时,它会将源 Observable 中最新的元素发送出来(如果不存在最新的元素,就发出默认元素)。然后将随后产生的元素发送出来。

BehaviorSubject.jpg

如果源 Observable 因为产生了一个 error 事件而中止, BehaviorSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

BehaviorSubject.jpg

demo

// 1:创建序列
let behaviorSub = BehaviorSubject.init(value: 100)
// 2:发送信号
behaviorSub.onNext(2)
behaviorSub.onNext(3)
// 3:订阅序列
behaviorSub.subscribe{ print("订阅到了:",$0)}
    .disposed(by: disposbag)
// 再次发送
behaviorSub.onNext(4)
behaviorSub.onNext(5)
// 再次订阅
behaviorSub.subscribe{ print("订阅到了:",$0)}
    .disposed(by: disposbag)
    
输出:
订阅到了: next(3)
订阅到了: next(4)
订阅到了: next(5)
订阅到了: next(5)

解析

public final class BehaviorSubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , SynchronizedUnsubscribeType
    , Cancelable {

    public init(value: Element) {
        self._element = value
    }
    
    public func on(_ event: Event<E>) {
        。。。。。。
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<E>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        if self._stoppedEvent != nil || self._isDisposed {
            return Observers()
        }
        
        switch event {
        case .next(let element):
            self._element = element
        case .error, .completed:
            self._stoppedEvent = event
        }
        
        return self._observers
    }
    
    public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        。。。。。。
        let key = self._observers.insert(observer.on)
        observer.on(.next(self._element))
    
        return SubscriptionDisposable(owner: self, key: key)
    }
}

订阅时除了把observer.on装进袋子外,还直接用.next发送了默认元素_element。发出信号时,先更新默认元素self._element = element,然后再同步发出信号。

ReplaySubject

ReplaySubject 将对观察者发送全部的元素,无论观察者是何时进行订阅的。

这里存在多个版本的 ReplaySubject,有的只会将最新的 n 个元素发送给观察者,有的只会将限制时间段内最新的元素发送给观察者。

如果把 ReplaySubject 当作观察者来使用,注意不要在多个线程调用 onNext, onError 或 onCompleted。这样会导致无序调用,将造成意想不到的结果。

ReplaySubject.jpg

demo

ReplaySubject 有两种构建可以指定bufferSize,也可以不做限制createUnbounded

// 1:创建序列
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
// let replaySub = ReplaySubject<Int>.createUnbounded()

// 2:发送信号
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3)
replaySub.onNext(4)

// 3:订阅序列
replaySub.subscribe{ print("订阅到了:",$0)}
    .disposed(by: disposbag)
// 再次发送
replaySub.onNext(7)
replaySub.onNext(8)
replaySub.onNext(9)

输出:
订阅到了: next(3)
订阅到了: next(4)
订阅到了: next(7)
订阅到了: next(8)
订阅到了: next(9)

解析

public class ReplaySubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , Disposable {

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType
    fileprivate var _observers = Observers()

    public static func create(bufferSize: Int) -> ReplaySubject<Element> {
        if bufferSize == 1 {
            return ReplayOne()
        } else {
            return ReplayMany(bufferSize: bufferSize)
        }
    }
    public static func createUnbounded() -> ReplaySubject<Element> {
        return ReplayAll()
    }
}

ReplaySubject只是个外部类,会根据不通的构建方式去分流给基类(子类)ReplayBufferBaseReplayOneReplayManyReplayAll)。

ReplayBufferBase

private class ReplayBufferBase<Element>
    : ReplaySubject<Element>
    , SynchronizedUnsubscribeType {
    
    override func on(_ event: Event<Element>) {
        。。。。。。
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<E>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        if self._isDisposed {
            return Observers()
        }
        
        if self._isStopped {
            return Observers()
        }
        
        switch event {
        case .next(let element):
            self.addValueToBuffer(element)
            self.trim()
            return self._observers
        case .error, .completed:
            self._stoppedEvent = event
            self.trim()
            let observers = self._observers
            self._observers.removeAll()
            return observers
        }
    }
    
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
     
        let anyObserver = observer.asObserver()
        self.replayBuffer(anyObserver)
        
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        else {
            let key = self._observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
        }
    }
}

基类ReplayBufferBase的实现中在订阅装袋前会让各子类处理自己的replayBuffer函数,其实就是遍历发送Queue里面的元素(Queue 是个自定义的 Sequence,FIFO队列)。在on中就是把所有需要.next类型的元素通过各子类的addValueToBuffer进行入队操作,同时用trim做删减更新。

订阅者仍然是由Bag管理,元素则由Queue管理。元素的数量是bufferSize指定的,如果是 1 的话,用个属性赋值就够了,多个才需要Queue

ReplayOne

访问_value来操作元素。

fileprivate final class ReplayOne<Element> : ReplayBufferBase<Element> {
    private var _value: Element?
    
    override func addValueToBuffer(_ value: Element) {
        self._value = value
    }

    override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
        if let value = self._value {
            observer.on(.next(value))
        }
    }
}

ReplayMany

ReplayManyReplayAll都是ReplayManyBase的子类。

private class ReplayManyBase<Element>: ReplayBufferBase<Element> {
    fileprivate var _queue: Queue<Element>
    
    init(queueSize: Int) {
        self._queue = Queue(capacity: queueSize + 1)
    }
    
    override func addValueToBuffer(_ value: Element) {
        self._queue.enqueue(value)
    }

    override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
        for item in self._queue {
            observer.on(.next(item))
        }
    }
}

fileprivate final class ReplayMany<Element> : ReplayManyBase<Element> {
    private let _bufferSize: Int
    
    init(bufferSize: Int) {
        self._bufferSize = bufferSize
        super.init(queueSize: bufferSize)
    }
    
    override func trim() {
        while self._queue.count > self._bufferSize {
            _ = self._queue.dequeue()
        }
    }
}

ReplayAll

fileprivate final class ReplayAll<Element> : ReplayManyBase<Element> {
    init() {
        super.init(queueSize: 0)
    }
    
    override func trim() {
        
    }
}

AsyncSubject

AsyncSubject 将在源 Observable 产生完成事件后,发出最后一个元素(仅仅只有最后一个元素),如果源 Observable 没有发出任何元素,只有一个完成事件。那 AsyncSubject 也只有一个完成事件。


AsyncSubject.jpg

它会对随后的观察者发出最终元素。如果源 Observable 因为产生了一个 error 事件而中止, AsyncSubject 就不会发出任何元素,而是将这个 error 事件发送出来。


AsyncSubject.jpg

demo

// 1:创建序列
let asynSub = AsyncSubject<Int>.init()
// 2:发送信号
asynSub.onNext(1)
asynSub.onNext(2)
// 3:订阅序列
asynSub.subscribe{ print("订阅到了:",$0)}
    .disposed(by: disposbag)
// 再次发送
asynSub.onNext(3)
asynSub.onNext(4)
asynSub.onError(NSError.init(domain: "❌", code: 120, userInfo: "急救中心"))
asynSub.onCompleted()

输出:
订阅到了: error(Error Domain=❌ Code=120 "急救中心")

解析

public final class AsyncSubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , SynchronizedUnsubscribeType {
    
    typealias Observers = AnyObserver<Element>.s
    private var _lastElement: Element?

    public func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        let (observers, event) = self._synchronized_on(event)
        switch event {
        case .next:
            dispatch(observers, event)
            dispatch(observers, .completed)
        case .completed:
            dispatch(observers, event)
        case .error:
            dispatch(observers, event)
        }
    }

    func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
        self._lock.lock(); defer { self._lock.unlock() }
        if self._isStopped {
            return (Observers(), .completed)
        }

        switch event {
        case .next(let element):
            self._lastElement = element
            return (Observers(), .completed)
        case .error:
            self._stoppedEvent = event

            let observers = self._observers
            self._observers.removeAll()

            return (observers, event)
        case .completed:
            let observers = self._observers
            self._observers.removeAll()

            if let lastElement = self._lastElement {
                self._stoppedEvent = .next(lastElement)
                return (observers, .next(lastElement))
            } else {
                self._stoppedEvent = event
                return (observers, .completed)
            }
        }
    }

    public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        self._lock.lock(); defer { self._lock.unlock() }
        return self._synchronized_subscribe(observer)
    }

    func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        if let stoppedEvent = self._stoppedEvent {
            ......
        }

        let key = self._observers.insert(observer.on)

        return SubscriptionDisposable(owner: self, key: key)
    }
}

AsyncSubject订阅时仍然是装袋管理,但响应的时候,_synchronized_on函数中,如果是.next就只更新最后一个元素:self._lastElement = element,并返回一个空袋子:return (Observers(), .completed)。只对.completed.error才返回订阅时的袋子容器self._observers。所以,onnext 的时候没有任何反应,error 和 completed 才会有响应。并且是最后一个元素。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335