RxSwift学习--可观察序列Observable

前言

Observable英文直译为:可观察的,可看见的。但是在RxSwift普遍的称它为“可观察序列”,它的作用主要用来形成一条数据流或者事件流,所有的操作产生的事件都会通过Observable进行传输。

Observable.png

Observable里有三种事件(Event: Event枚举类型,有三个成员,nexterrorcompleted ) Observable发送的所有事件都是一个Event:

  • next事件主要是当Observable里出现新的数据时会发出的事件,同时该事件会携带新的数据对象。

  • completed事件是当Observable不再有新的数据出现,Observable被标记完成,不再产生数据.

  • error事件是当数据流遇到了错误会发出的事件,该事件也会导致Observable结束。

Observable(可观察序列)

1. Observable的创建

RxSwift中,可以有多种创建Observable对象的方法,下面我们逐一介绍:

(1). create()方法

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }

create()方法的参数是一个函数(闭包),根据闭包来创建序列,在闭包里面可自定义事件。函数需要传入参数AnyObserver类型,返回的是Disposable,AnyObserver其实就是观察者,Disposable是一个协议接口,里面只有一个dispose方法,用来释放一些资源。整个create()方法返回的是一个AnonymousObservable(匿名Observable).

create.png

例子:

    let createOb = Observable<Int>.create{ (observer) -> Disposable in
        observer.onNext(1)
        observer.onCompleted()
        return Disposables.create()
    }
    
   let _ = createOb.subscribe(onNext: { (number) in
        print("订阅:",number)
    }, onError: { (error) in
        print("error:",error)
    }, onCompleted: {
        print("完成回调")
    }) {
        print("释放回调")
    }

(2). empty()方法

 public static func empty() -> Observable<E> {
        return EmptyProducer<E>()
    }
    
final private class EmptyProducer<Element>: Producer<Element> {
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.completed)
        return Disposables.create()
    }
}

empty()方法返回一个EmptyProducer类,在这个类的内部实现了subscribe()订阅方法,且只有一个.completed状态,所以,empty()方法是一个空方法,里面没有onNext事件处理,只会处理onComplete事件。

empty.png

例子:

let emtyOb = Observable<Int>.empty()
let  _ = emtyOb.subscribe(onNext: { (number) in
        print("订阅:",number)
        }, onError: { (error) in
            print("error:",error)
        }, onCompleted: {
            print("完成回调")
        }) {
            print("释放回调")
        }

(3). just()方法

 public static func just(_ element: E) -> Observable<E> {
        return Just(element: element)
    }

just()方法是单个信号序列创建,只能处理单个事件,简单来说,我们使用just()方法时不能将一组数据一起处理,只能一个一个数据进行处理。just() 根据传入的一个参数来创建序列,它会向订阅者发送两个事件,第一个发送带元素数据的.next事件,第二个发送 .completed事件。

just.png

例子:

    let array = ["You","Me"]
        Observable<[String]>.just(array)
            .subscribe { (event) in
                print(event)
            }.disposed(by: disposeBag)
    let _ = Observable<[String]>.just(array).subscribe(onNext: { (number) in
            print("订阅:",number)
        }, onError: { (error) in
            print("error:",error)
        }, onCompleted: {
            print("完成回调")
        }) {
            print("释放回调")
        }

(4). of()方法

public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return ObservableSequence(elements: elements, scheduler: scheduler)
    }

of()方法可以接受多个参数来创建实例,但这些参数必须是同类型,也就是说,of()方法是just()方法的升级版,它可以将一序列的事情组合起来一起处理。

of.png

例子:

        Observable<String>.of("Henry","Jeannie")
            .subscribe { (event) in
                print(event)
            }.disposed(by: disposeBag)
            
 // 字典
        Observable<[String: Any]>.of(["name":"HuGe","age":18])
            .subscribe { (event) in
                print(event)
            }.disposed(by: disposeBag)
        
 // 数组
        Observable<[String]>.of(["Peter","July"])
            .subscribe { (event) in
                print(event)
            }.disposed(by: disposeBag)

(5). from()方法

public static func from(optional: E?) -> Observable<E> {
    return ObservableOptional(optional: optional)
}

from()方法只接收数组(数组,集合)作为参数,并抽取出数组里的元素来作为数据流中的元素,也就是说,from() 根据传入的数组元素来创建序列。它会依次发出.next事件,最后发出.completed 事件,结果和of()方法一样。

from.png

例子:

Observable<[String]>.from(optional: ["Hu","Ge"])
            .subscribe { (event) in
                print(event)
            }.disposed(by: disposeBag)

(6). deferred()方法

public static func deferred(_ observableFactory: @escaping () throws -> Observable<E>)
        -> Observable<E> {
        return Deferred(observableFactory: observableFactory)
    }

deferred()方法是延时创建Observable对象,当subscribe的时候才去创建,它为每一个bserver创建一个新的Observable

deferred.png

例子:

//用于标记是奇数、还是偶数
var isOdd = true
//使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
let factory : Observable<Int> = Observable.deferred {
    //让每次执行这个block时候都会让奇、偶数进行交替
    isOdd = !isOdd
    //根据isOdd参数,决定创建并返回的是奇数Observable、还是偶数Observable
    if isOdd {
        return Observable.of(1, 3, 5 ,7)
    }else {
        return Observable.of(2, 4, 6, 8)
    }
}
//第1次订阅测试
factory.subscribe { event in
    print("\(isOdd)", event)
}
//第2次订阅测试
factory.subscribe { event in
    print("\(isOdd)", event)
}

(7). range()方法

 public static func range(start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return RangeProducer<E>(start: start, count: count, scheduler: scheduler)
    }

range()方法通过指定起始和结束数值,创建一个以这个范围内所有值作为初始值的 Observable 序列。range()方法其实和of()方法很相似,其功能和of()差不多,我们只要输出startcount然后就能生成一组数据,让它们执行onNext。值得注意的是,range()方法只生成Observable类型。

range.png

例子:

Observable.range(start: 2, count: 5)
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)

(8). generate()方法

public static func generate(initialState: E, condition: @escaping (E) throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: @escaping (E) throws -> E) -> Observable<E> {
        return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
    }

generate()方法创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的Observable序列。generate()方法是一个迭代器,它一直循环onNext事件,直到condition不满足要求退出。generate()有四个参数,第一个是最开始的循环变量,第二个是条件,第三个是迭代器,这个迭代器每次运行都会返回一个E类型,作为下一次是否执行onNext事件源,而是否正的要执行则看是否满足condition条件。其实我们可以理解generate()就是一个循环体,其内部实现也正是一个循环,类似于数组的遍历,具体实现代码在GenerateSinkrun方法。

generate.png

例子:

Observable.generate(initialState: 0,// 初始值
                    condition: { $0 < 10}, // 条件1
                    iterate: { $0 + 2 })  // 条件2 +2
        .subscribe { (event) in
                print(event)
        }.disposed(by: disposeBag)

(9). timer()方法

public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType)
        -> Observable<E> {
        return Timer(
            dueTime: dueTime,
            period: period,
            scheduler: scheduler
        )
    }

timer()方法是创建的Observable 序列在经过设定的一段时间后,产生唯一的一个元素,或者每隔一段时间产生一个元素。这取决于period是否有值。

image

例子:

let observable = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}

(10). interval()方法

public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<E> {
        return Timer(
            dueTime: period,
            period: period,
            scheduler: scheduler
        )
    }

interval()方法创建的 Observable 序列每隔一段设定的时间,会发出一个索引数的元素。而且它会一直发送下去。差别在于timer()可以设置间隔时间和持续时间,而interval()的间隔时间和持续时间是一样的。

例子:

//下面方法让其每 1秒发送一次,并且是在主线程(MainScheduler)发送。
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}

(11). repeatElement()方法

 public static func repeatElement(_ element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return RepeatElement(element: element, scheduler: scheduler)
    }

repeatElement()方法创建一个可以无限发出给定元素的EventObservable序列,它是无限循环的,会一直循环onNext方法,不会终止。

repeatElement.png

例子:

Observable<Int>.repeatElement(5)
    .subscribe { (event) in  
     print("订阅:",event)
    }
    .disposed(by: disposeBag)

(12). error()方法

public static func error(_ error: Swift.Error) -> Observable<E> {
        return ErrorProducer(error: error)
}

final private class ErrorProducer<Element>: Producer<Element> {
    private let _error: Swift.Error
    
    init(error: Swift.Error) {
        self._error = error
    }
    
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.error(self._error))
        return Disposables.create()
    }
}

error()方法是返回一个只能调用onError方法的Observable序列。其中的onNextOnCompleted方法是不会执行的。

error.png

例子:

Observable<String>.error(NSError.init(domain: "ObservableError", code: 10010, userInfo: ["reason":"unknow"]))
            .subscribe { (event) in
                print("订阅:",event)
            }
            .disposed(by: disposeBag)

(13). never()方法

public static func never() -> Observable<E> {
        return NeverProducer()
    }
    
final private class NeverProducer<Element>: Producer<Element> {
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        return Disposables.create()
    }
}

never()方法创建一个永远不会发出 Event,也不会终止的 Observable序列。就是返回一个无终止的观察者事件序列,可以用来表示无限持续时间。

never.png

例子:

let neverSequence = Observable<Int>.never()
    _ = neverSequence.subscribe { event in
        print(event)
   }

2. Observable的订阅

创建完了 Observable,还要使用 subscribe() 方法来订阅它,并接收它发出的 Event

(1). Subscribes event handler

public func subscribe(_ on: @escaping (Event<E>) -> Void)
        -> Disposable {
        
        }

这种subscribe()方法订阅了一个 Observable 对象,该方法的 block的回调参数就是被发出的 event 事件.

例子:

let observable = Observable.of(1, 2, 3)
         
observable.subscribe { event in
    print(event)
    print(event.element)
}

这里可以看到初始化 Observable 序列时设置的默认值都按顺序通过 .next 事件发送出来,等到数据都发送完毕,它还会自动发一个 .completed 事件出来。

(2). Subscribes element handler

public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
        }

这种subscribe()预订方法,它可以把 event 进行分类:.next.completed.error以处理不同类型的event,同时会把 event 携带的数据直接解包出来作为参数.

例子:

let observable = Observable.of("A", "B", "C")
         
observable.subscribe(onNext: { element in
    print(element)
}, onError: { error in
    print(error)
}, onCompleted: {
    print("completed")
}, onDisposed: {
    print("disposed")
})

onNextonErroronCompletedonDisposed 这四个回调参数都是有默认值的,即它们都是可选的。所以也可以只处理 onNext而不管其他的情况。

3. Observable的事件生命周期

public func `do`(onNext: ((E) throws -> Void)? = nil, onError: ((Swift.Error) throws -> Void)? = nil, onCompleted: (() throws -> Void)? = nil, onSubscribe: (() -> Void)? = nil, onSubscribed: (() -> Void)? = nil, onDispose: (() -> Void)? = nil)
        -> Observable<E> {
            return Do(source: self.asObservable(), eventHandler: { e in
                switch e {
                case .next(let element):
                    try onNext?(element)
                case .error(let e):
                    try onError?(e)
                case .completed:
                    try onCompleted?()
                }
            }, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose)
    }

Observable 的某些事件产生时,你可以使用 do() 操作符来注册一些回调操作。这些回调会被单独调用,它们会和 Observable 原本的回调分离。

可以使用 do() 方法来监听事件的生命周期,它会在每一次事件发送前被调用,同时它和 subscribe() 一样,可以通过不同的回调处理不同类型的 event

例子:

let observable = Observable.of("A", "B", "C")
 
observable.do(onNext: { element in
        print("Next:", element)
    }, onError: { error in
        print("Error:", error)
    }, onCompleted: {
        print("Completed")
    }, onDispose: {
        print("Disposed")
    })
   observable.subscribe(onNext: { element in
        print(element)
    }, onError: { error in
        print(error)
    }, onCompleted: {
        print("completed")
    }, onDisposed: {
        print("disposed")
    })

4. Observable的销毁

通过上面的这些内容,我们大概已经知道了,当一个Observable 序列被正常创建出来后它不会马上就开始被激活从而发出 event,而是要等到它被订阅了才会激活它。而 Observable 序列激活之后要一直等到它发出了.error 或者 .completedevent 后,它才被终结。

(1). 显式销毁--dispose() 方法

当一个订阅结束了不再需要了,就可以调用 dispose() 方法把这个订阅给销毁掉,防止内存泄漏。订阅行为被 dispose 了,订阅将被取消,并且内部资源都会被释放,那么之后 Observable如果再发出 event,这个已经 dispose 的订阅就收不到消息了。通常情况下,你是不需要手动调用 dispose 方法的。

image

例子:

let observable = Observable.of("A", "B", "C")
let observableDispose = observable.subscribe { event in
    print(event)
}
observableDispose.dispose()

(2). 隐式销毁--DisposeBag 方法

DisposeBag创建一个对象来管理多个订阅行为的销毁,可以把一个 DisposeBag 对象看成一个回收垃圾袋,把用过的订阅行为都放进去。
而这个 DisposeBag就会在自己快要销毁 的时候,对它里面的所有订阅行为都调用 dispose() 方法。

image

例子:

let disposeBag = DisposeBag()

let observable = Observable.of(1, 2, 3)
observable.subscribe { event in
    print(event)
}.disposed(by: disposeBag)

总结

这篇内容简单的介绍了Observable序列的创建,订阅,和销毁。这里创建的序列都是最基本的序列,在RxSwift 里面Observable也存在一些特征序列,DriverSingleControlEvent...这些特征序列可以帮助我们更准确的描述序列,让我们能够用更加优雅的方式书写代码,关于这些特征序列后面我会再学习。关于Observable的销毁好像还有一种方法,后面研究到了会回来补充。如文中有错误还请各位指正!

最后感谢RxSwift中文文档.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345