RxSwift---Observable创建(三)

在上一个篇章中,我们一起探索了Rxswift的核心逻辑,对Rxswift有了更近一步的理解
正所谓光说不练,假把式。那么接下来我们来看看在我们平常开发中用到的一些序列创建方式

1:empty---空序列

使用empty函数创建一个空序列,事件是Int类型的,由于是空序列,也就是没有序列,所以只能complete

        let emptyOb = Observable<Int>.empty()
        emptyOb.subscribe(onNext: { number in
            print("订阅:", number)
        }, onError: { error in
            print("error:", error)
        }, onCompleted: {
            print("完成回调")
        }) {
            print("释放回调")
        }

//打印结果:完成回调
          释放回调

为什么直接完成回调了呢,我们点击源码进行分析

extension ObservableType {

    public static func empty() -> Observable<Element> {
        EmptyProducer<Element>()
    }
}

final private class EmptyProducer<Element>: Producer<Element> {
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        observer.on(.completed)
        return Disposables.create()
    }
}
  • 1.empty和上一章讲的create类似,创建并返回Observable对象,区别在于empty创建EmptyProducer,create创建AnonymousObservable,create传入了一个闭包并保存,empty没有参数;根据继承关系,都是继承自Observable
  • 2.observer.on(.completed)看到这里就明白为什么直接完成回调了,它直接发送了完成信号

2:just---单个信号序列

该方法通过传入一个默认值来初始化,构建一个只有一个元素的Observable队列,订阅完信息自动complete

        // 单个信号序列创建
        print("******** just ********")
        let justOb = Observable<String>.just("逸华爱Moto")
        justOb.subscribe(onNext: { number in
            print("订阅:", number)
        }, onError: { error in
            print("error:", error)
        }, onCompleted: {
            print("完成回调")
        }) {
            print("释放回调")
        }

//打印结果
******** just ********
订阅: 逸华爱Moto
完成回调
释放回调

简单来说,就是传入什么输出什么,看看源码

extension ObservableType {

    public static func just(_ element: Element) -> Observable<Element> {
        Just(element: element)
    }

    //这里省略了一些代码...
}

final private class Just<Element>: Producer<Element> {
    private let element: Element
    
    init(element: Element) {
        self.element = element
    }
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        observer.on(.next(self.element))
        observer.on(.completed)
        return Disposables.create()
    }
}
  • 1.justempty类似,只是保存了传入的参数
  • 2.observer.on(.next(self._element))常规订阅之后就会发送.next事件
    之后就会自动发送完成事件,跟我们效果完全吻合

3:of

  • 上面的just针对单元素,那么of则是多个元素 针对序列处理
  • 该方法可以接受可变数量的参数(必需要是同类型的)
        print("******** of ********")
        //多个元素
        Observable.of(1,2,3,4)
            .subscribe { element in
                print("订阅:", element)
            }
            .disposed(by: disposeBag)
        
        //数组
        Observable.of([1,2,3,4])
            .subscribe { element in
                print("订阅:", element)
            }
            .disposed(by: disposeBag)
        
        //字典
        Observable.of(["name":"逸华","hobby":"骑摩托"])
            .subscribe { element in
                print("订阅:", element)
            }
            .disposed(by: disposeBag)

//打印结果
******** of ********
订阅: next(1)
订阅: next(2)
订阅: next(3)
订阅: next(4)
订阅: completed
订阅: next([1, 2, 3, 4])
订阅: completed
订阅: next(["name": "逸华", "hobby": "骑摩托"])
订阅: completed

话不多说来看下具体实现

extension ObservableType {
    public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
        ObservableSequence(elements: elements, scheduler: scheduler)
    }
}

final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
    fileprivate let elements: Sequence
    fileprivate let scheduler: ImmediateSchedulerType

    init(elements: Sequence, scheduler: ImmediateSchedulerType) {
        self.elements = elements
        self.scheduler = scheduler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

final private class ObservableSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink<Observer> where Sequence.Element == Observer.Element {
    typealias Parent = ObservableSequence<Sequence>

    private let parent: Parent

    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    func run() -> Disposable {
        return self.parent.scheduler.scheduleRecursive(self.parent.elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}
  • 创建是一个中规中矩的流程,和上面几个几乎一样
  • 同样保存了传入的元素,并且保存了调度环境
  • 订阅流程也是利用sink,然后通过mutableIterator迭代器处理发送

4.from

  • 将可选序列转换为可观察序列。
  • 从集合中获取序列:数组,集合,set 获取序列 - 有可选项处理 - 更安全
        print("******** from ********")
        //MARK:  from
        // 从集合中获取序列:数组,集合,set 获取序列 - 有可选项处理 - 更安全
        Observable<[String]>.from(optional: nil)
            .subscribe { event in
                print(event)
            }.disposed(by: disposeBag)
        
        Observable<[String]>.from(optional: ["逸华", "爱摩托"])
            .subscribe { event in
                print(event)
            }.disposed(by: disposeBag)

//打印结果:
******** from ********
completed
next(["逸华", "爱摩托"])
completed

看下源码

extension ObservableType {
    public static func from(optional: Element?) -> Observable<Element> {
        ObservableOptional(optional: optional)
    }
}


final private class ObservableOptional<Element>: Producer<Element> {
    private let optional: Element?
    
    init(optional: Element?) {
        self.optional = optional
    }
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let element = self.optional {
            observer.on(.next(element))
        }
        observer.on(.completed)
        return Disposables.create()
    }
}
  • self.optional = optional保存可选项
  • 订阅流程判断是否匹配传入并保存的可选项
  • 发送observer.on(.next(element))序列
  • 最后自动observer.on(.completed)完成序列发送

5.defer

  • 延时初始化序列
        print("******** defer ********")
        //MARK:  defer
        // 这里有一个需求:动态序列 - 根据外界的标识 - 动态输出
        // 使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
        var isOdd = false
        _ = Observable<Int>.deferred { () -> Observable<Int> in
            // 这里设计我们的序列
            isOdd = !isOdd
            if isOdd {
                return Observable.of(1)
            }
            return Observable.of(0)
        }
        .subscribe { event in
            print(event)
        }
  • self.observableFactory = observableFactory初始化保存了外部传入的闭包

通过sink.run进入都下面的代码

    func run() -> Disposable {
        do {
            let result = try self.observableFactory()
            return result.subscribe(self)
        }
        catch let e {
            self.forwardOn(.error(e))
            self.dispose()
            return Disposables.create()
        }
    }
  • 在订阅流程,这段工厂闭包被执行

6:rang

  • 生成指定范围内的可观察整数序列。
print("******** range ********")
        Observable.range(start: 0, count: 5)
            .subscribe { number in
                print(number)
            }
            .disposed(by: disposeBag)

//打印结果:
******** range ********
next(0)
next(1)
next(2)
next(3)
next(4)
completed

源码

extension ObservableType where Element: RxAbstractInteger {

    public static func range(start: Element, count: Element, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
        RangeProducer<Element>(start: start, count: count, scheduler: scheduler)
    }
}

final private class RangeProducer<Element: RxAbstractInteger>: Producer<Element> {
    fileprivate let start: Element
    fileprivate let count: Element
    fileprivate let scheduler: ImmediateSchedulerType

    init(start: Element, count: Element, scheduler: ImmediateSchedulerType) {
        guard count >= 0 else {
            rxFatalError("count can't be negative")
        }

        guard start &+ (count - 1) >= start || count == 0 else {
            rxFatalError("overflow of count")
        }

        self.start = start
        self.count = count
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

final private class RangeSink<Observer: ObserverType>: Sink<Observer> where Observer.Element: RxAbstractInteger {
    typealias Parent = RangeProducer<Observer.Element>
    
    private let parent: Parent
    
    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        super.init(observer: observer, cancel: cancel)
    }
    
    func run() -> Disposable {
        return self.parent.scheduler.scheduleRecursive(0 as Observer.Element) { i, recurse in
            if i < self.parent.count {
                self.forwardOn(.next(self.parent.start + i))
                recurse(i + 1)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}
  • 从外部传入startcount已经调度环境,并保存起来
  • i < self.parent.count通过这个进行是否发送.completed,从而递归的发送信号

7: generate

  • 该方法创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列。
  • 初始值给定 然后判断条件1 再判断条件2 会一直递归下去,直到条件1或者条件2不满足
  • 类似数组便利
        print("******** generate ********")
        Observable.generate(initialState: 0,// 初始值
                            condition: { $0 < 10}, // 条件1
                            iterate: { $0 + 2 })  // 条件2 +2
            .subscribe { event in
                print(event)
            }.disposed(by: disposeBag)
                     
        // 数组遍历
        let arr = ["e_1","e_2","e_3","e_4","e_5"]
        Observable.generate(initialState: 0,// 初始值
                            condition: { $0 < arr.count}, // 条件1
                            iterate: { $0 + 1 })  // 条件2 +2
            .subscribe(onNext: {
                print("遍历arr:", arr[$0])
            }).disposed(by: disposeBag)
  • 参数一initialState: 初始状态。
  • 参数二 condition:终止生成的条件(返回“false”时)。
  • 参数三 iterate:迭代步骤函数。
  • 参数四 调度器:用来运行生成器循环的调度器,默认CurrentThreadScheduler.instance
  • 返回:生成的序列。

8:timer

  • 返回一个可观察序列,该序列使用指定的调度程序运行计时器,在指定的初始相对到期时间过后定期生成一个值
        print("******** timer ********")
        Observable<Int>.timer(.seconds(5), period: .seconds(2), scheduler: MainScheduler.instance)
            .subscribe { event in
                print(event)
            }
            .disposed(by: disposeBag)

        // 因为没有指定期限period,故认定为一次性
        Observable<Int>.timer(.seconds(1), scheduler: MainScheduler.instance)
            .subscribe { event in
                print("111111111 \(event)")
            }
            .disposed(by: disposeBag)
  • 参数1:第一次响应距离现在的时间
  • 参数2:时间间隔
  • 参数3:线程

9:repeatElement

  • 该方法创建一个可以无限发出给定元素的 Event的 Observable 序列(永不终止)
        print("******** repeatElement ********")
        //MARK:  repeatElement
        // 该方法创建一个可以无限发出给定元素的 Event的 Observable 序列(永不终止)
        Observable<Int>.repeatElement(5)
            .subscribe { event in
                 print("订阅:", event)
            }
            .disposed(by: disposeBag)

10:error

  • 对消费者发出一个错误信号
        print("******** error ********")
        Observable<String>.error(NSError.init(domain: "发送错误", code: 10086, userInfo: ["reason": "unknow"]))
            .subscribe { event in
                print("订阅:", event)
            }
            .disposed(by: disposeBag)

11:never

  • 该方法创建一个永远不会发出 Event(也不会终止)的 Observable序列。
  • 这种类型的响应源 在测试或者在组合操作符中禁用确切的源非常有用
        print("******** never ********")
        Observable<String>.never()
            .subscribe { event in
                print("走你", event)
            }
            .disposed(by: disposeBag)

序列的创建也是学习 RxSwift 的根基,当然RxSwift还有一些创建序列的方式,大家可以玩一玩

以上就是RxSwift 常用的序列创建方式,如果对你有帮助,请不要吝啬自己手里的点赞👍哦~~~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容