RxSwift核心流程
- 创建序列
- 订阅序列
- 发送信号
- 销毁序列
// 1: 创建序列
_ = Observable<String>.create { (observer) -> Disposable in
// 3:发送信号
obserber.onNext("发送信号了")
return Disposables.create() // 销毁
// 2: 订阅序列
}.subscribe(onNext: { (text) in
print("订阅到:\(text)")
})
序列创建 create
这里按着command
点进去
extension ObservableType {
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.E>
}
发现扩展了ObservableType
协议的一个方法,但是没有方法实现,这个时候我们可以全局搜索create(_
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
-
create
方式是利用协议拓展实现的, - 创建了一个
AnonymousObservable
匿名可观察序列,保存了外界闭包 - 继承了
Producer
,有一个subscribe
方法和run
方法
订阅序列subscribe
当订阅的时候回调用以下方法
extension ObservableType {
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
///此处省略一些代码
let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
AnonymousObserver
跟之前的AnonymousObservable
一样保存了外界的闭包,通过switch case event
来决定执行哪块代码
self.asObservable().subscribe(observer),
然后调用self.subscribe
,这里的subscribe
跟上边的不一样的哦,这里的subscribe
是AnonymousObservable
的父类Producer
的方法,然后调用run
方法,Producer
的run
方法只是一个空实现,具体的run
看子类AnonymousObservable
的实现
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
调用AnonymousObservableSink
的run
方法,这里有个疑问,为什么调用run
方法会执行AnonymousObserver
保存的闭包呢,
发送响应
这里忽略的一点AnyObserver(self)
,看下他的实现
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
这里初始化直接调用了observer
(AnonymousObservableSink)的on
方法,然后调用forwardon
方法
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
那不对啊,这里调用的是on
方法,AnonymousObserver
类里边只有调用了oncore方法才会调用subscribe
闭包啊,这里不要忘了继承关系,AnonymousObserver
继承与ObserverBase
,这里调用的on
是父类的on
,然后父类的on
方法里边在调用oncore
方法,从而执行闭包
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
func onCore(_ event: Event<E>) {
//空实现,具体由子类去实现
rxAbstractMethod()
}
总结
- create: 匿名序列
AnonymousObservable(继承Producer)
保存外界闭包, - 订阅:
AnonymousObserver (继承ObserverBase )
保存外界subscribe闭包, - 发送响应: 调用subscribe,然后调用Producer的subscribe,然后调用run方法,创建
AnonymousObservableSink
调用sink的run,调用create闭包,创建AnyObserver
,调用AnonymousObservable(父类ObserverBase)
的on方法,调用oncore方法,执行订阅闭包