Swift框架学习之-RxSwift入门篇

原创 2017-06-13

RxSwift 这个框架RP框架相信你应该不陌生了,在Objective-C中我们使用比较多的是ReactiveCocoa,从网上找到的入门知识比较零散,我现在就将从官方文档学习的笔记作为一个记录,也许对刚开始学习的你会有所帮助。如下就是我通过思维导图绘制的框架大致内容:

RxSwift框架

从上图可以看出,整个框架无外乎围绕着:

  • Observable: 可观察序列
  • Observer:观察者
  • Subjects:观察和观察序列桥梁
  • Disposable:你可以把它看做一个ARC
  • Scheduler:任务执行线程

下面我们就围绕着这几大模块介绍,首先介绍一下Observable:

Observable

在ReactiveX中,Observable<Element>代表的是一个可观察序列,从字面意思可以看出这是在观察者模式中的被观察者,它会向观察对象发送事件序列:

  • .Next(Element):新事件
  • .Error(error):带有异常的事件完成序列
  • .Complete():正常事件完结序列

Observable大致需要了解一下知识,我们将围绕到以下内容进行学习:


Observable需要学习

Observable 创建

  • Create:通过编程方式来创建可观察对象,创建后的Observeble是可以接收到.onNext、.onError、.onComplete
create
----------  示例代码  ----------
let observable = Observable<String>.create { (observer) -> Disposable in
   observer.onNext("hello RxSwift")
   observer.onCompleted()
   return Disposables.create {
       print("disposabled")
   }
}
observable.subscribe { (event) in
   print("Operator: create \(event)")
}.dispose()  

----------  运行结果  ----------
Operator: create next(hello RxSwift)
Operator: create completed
disposabled
  • Defer:延时创建Observable对象,当subscribe的时候才去创建,它为每一个Observer创建一个新的Observable,也就是说每个订阅者订阅的对象都是内容相同但是完全独立的序列;deferr采用一个Factory函数型作为参数,Factory函数返回的是Observable类型。这也是其延时创建Observable的主要实现
defer
----------  示例代码  ----------
let defObservable = Observable<String>.deferred { () -> Observable<String> in
    return Observable.create({ (observer) -> Disposable in
        observer.onNext("create")
        observer.onCompleted()
        return Disposables.create {
            print("disposed")
        }
    })
}

defObservable.subscribe { (event) in
    print("one  --   \(event)")
}

defObservable.subscribe { (event) in
    print("two  --   \(event)")
}

----------  运行结果  ----------
one  --   next(create)
one  --   completed
disposed
two  --   next(create)
two  --   completed
disposed

通过上述例子,相信你已经看出来了在前面说到的:defer创建会为每个Observer创建一个新的独立的Observable,并且他们具有相同内容,但是好像并没有体现出defer延迟创建,那么请你注意下面这两个例子的对比

---------- 示例非defer,这里可能会用到后面讲的内容 ----------
var value: String?
let observable = Observable<String>.from(optional: value)
value = "hello RxSwift"
observable.subscribe { (event) in
    print(event)
}.dispose()

---------- 运行结果 ----------
completed

上述结果并没有像我们想象中的那样也会打印出 onNext事件,这个是为什么呢? 因为在我们订阅的时候,数据未必已经初始化完成,现在我们把这个例子使用defer重新测试一下:

---------- 示例,通过defer改写上面 ----------
var value: String?
let observable = Observable<String>.deferred { () -> Observable<String> in
    return Observable<String>.from(optional: value)
}
value = "hello RxSwift"
observable.subscribe { (event) in
    print(event)
}.dispose()

---------- 运行结果 ---------
next(hello RxSwift)
completed

看到了如期打印出来的onNext结果,这可能才会是你想达到的效果,具体defer还是需要看使用场景,只有在场景中慢慢体会。

  • Of / From :这两个方法都是把一个对象或者数据转换为可观察序列,这在你使用Swift中的SequenceType时很有用
of/from
---------- of 示例 ----------
Observable<String>.of("hello", "RxSwift").subscribe { (event) in
    print("operator: of     \(event)")
}.dispose() 

---------- 运行结果 ----------
operator: of     next(hello)
operator: of     next(RxSwift)
operator: of     completed
---------- from 示例 ----------
Observable<String>.from(["hello", "RxSwift"]).subscribe { (event) in
    print("operator: from    \(event)")
}.dispose()

----------- 运行结果 ----------
operator: from    next(hello)
operator: from    next(RxSwift)
operator: from    completed
  • Just:将一个对象或者一个Sequence转换为 一个 可观察序列,请注意这里与From是完全不相同的:From是转换为一个或者多个可观察序列(这取决于你是要将一个还是一个序列进行转换)。也就是说Just只能包含一个观察序列,请注意与上面例子结果进行对比
just
---------- Just 示例对比 ----------
Observable<String>.from(["hello", "RxSwift"]).subscribe { (event) in
    print("operator: just    \(event)")
}
        
Observable<Array<String>>.just(["hello", "RxSwift"]).subscribe { (event) in
    print("operator: just    \(event)")
}

--------- 运行结果 ----------
operator: from    next(hello)
operator: from    next(RxSwift)
operator: from    completed
operator: just    next(["hello", "RxSwift"])
operator: just    completed
  • Interval:创建一个可观察序列,以特定的时间间隔释放一系列整数(E -> Int/NSInteger)
----------- 示例 ---------
Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .take(3)   // 发送3次Next,后面再讲到此方法
    .subscribe { (event) in
        print("operator: interval     \(event)")
 }

----------- 运行结果 ---------
operator: interval     next(0)
operator: interval     next(1)
operator: interval     next(2)
operator: interval     completed
  • Range:创建一个可观察到的,发射特定序列整数的范围(E -> Int/NSInteger)
range
---------- 示例 ----------
Observable<Int>.range(start: 1, count: 5).subscribe { (event) in
    print("operator: range    \(event)")
}.dispose()

---------- 运行结果 ---------
operator: range    next(1)
operator: range    next(2)
operator: range    next(3)
operator: range    next(4)
operator: range    next(5)
operator: range    completed
  • Repeat: 创建一个可多次发送特定item的Observable
repeat
---------- 示例 ----------
Observable<String>.repeatElement("hello RxSwift")
    .take(3)
    .subscribe { (event) in
         print("operator: repeat      \(event)")
}.dispose()

---------- 运行结果 --------
operator: repeat      next(hello RxSwift)
operator: repeat      next(hello RxSwift)
operator: repeat      next(hello RxSwift)
operator: repeat      completed
  • Start:
start
  • Timer:在指定的时间后,发送一个特定的Item (E -> Int/NSInteger),请注意这里与Interval的区别(Interval是发送一系列特定Item,而Timer只会发送一个)
timer
---------- 示例 ---------
Observable<Int>.timer(1, scheduler: MainScheduler.instance)
    .subscribe { (event) in
        print("operator: timer     \(event)")
}

---------- 运行结果 ---------
operator: timer     next(0)
operator: timer     completed
  • Empty: 只会发送一个Complete事件
empty创建
---------- 示例 ----------
Observable<Int>.empty().subscribe { (event) in
    print("operator: empty      \(event)")
}.dispose()

---------- 运行结果 ----------
operator: empty      completed
  • Never:你将不会收到任何事件,并且它将永远不会终止
never创建
  • Generate:其实是实现了一个迭代器的效果,他有三个参数,第一个是初始值,第二个是条件(满足条件之后就会继续执行.Next事件,第三个是迭代器,每次都会返回一个E类型,知道不满足条件为止
---------- 示例 ----------
Observable<Int>.generate(initialState: 0, condition: { (element: Int) -> Bool in
    return element < 10
}, iterate: { element -> Int in
    return element + 3
}).subscribe { (event) in
    print("operator: generate   \(event)")
}.dispose()

----------- 运行结果 ----------
operator: generate   next(3)
operator: generate   next(6)
operator: generate   next(9)
operator: generate   completed

上述例子表示的是:我有一个初始变量0, 此时满足条件 < 10 ,那么就会执行迭代器,每次+3,直到不满足条件为止

Observable 变换

  • Buffer:定期的将需要发射的Items手机到一个buffer的包中,分批次的发射这些包,而不是一次发射一个Item:例如你有[1, 2, 3, 4] ,你可以一次发射一个,也可以一次发射两个Item或者三个...,你需要仔细的观察下面的输出结果,如果需要更好的理解还请你敲一遍代码
buffer
---------- 示例1 一次发射1个Item事件 --------
Observable<Int>.of(1, 2, 3, 4)
    .buffer(timeSpan: 1, count: 1, scheduler: MainScheduler.instance)
    .subscribe { (event) in
        print("operator: buffer    \(event)")
    }.dispose()

---------- 运行结果 ---------
operator: buffer    next([1])
operator: buffer    next([2])
operator: buffer    next([3])
operator: buffer    next([4])
operator: buffer    next([])
operator: buffer    completed
---------- 示例2 一次发射3个Item事件 --------
Observable<Int>.of(1, 2, 3, 4)
    .buffer(timeSpan: 1, count: 3, scheduler: MainScheduler.instance)
    .subscribe { (event) in
        print("operator: buffer    \(event)")
    }.dispose()

---------- 运行结果 ---------
operator: buffer    next([1, 2, 3])
operator: buffer    next([4])
operator: buffer    completed

还可以有其他多个不同发射Item事件,请仔细体会上述结果

  • Window:与Buffer类似,但是每次发射的不是Item,而是Observables序列(请注意与Buffer的结果比较):
window
------------ 示例 -----------
Observable<Int>.of(1, 2, 3, 4)
    .window(timeSpan: 1, count: 3, scheduler: MainScheduler.instance)
    .subscribe { (event) in
        print("operator: window     \(event)")
        // 这里event的element 是一个Observable
        event.element?.subscribe({ (event) in
            print("operator: window - subObservables    \(event)")
        }).dispose()
    }.dispose()

------------ 运行结果 -----------
operator: window     next(RxSwift.AddRef<Swift.Int>)
operator: window - subObservables    next(1)
operator: window - subObservables    next(2)
operator: window - subObservables    next(3)
operator: window - subObservables    completed
operator: window     next(RxSwift.AddRef<Swift.Int>)
operator: window - subObservables    next(4)
operator: window - subObservables    completed
operator: window     completed
  • FlatMap:将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
flatMap

FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射

这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合

--------- 示例1 ---------
// 我需要在每一个Item后跟一个新的Item叫做RxSwift
Observable<Int>.of(0, 1, 2)
    .flatMap { (element: Int) -> Observable<String>  in
        return Observable<String>.of("\(element)", "RxSwift")
    }
    .subscribe { (event) in
        print("operator: flatMap     \(event)")
    }.dispose()

--------- 运行结果 -------
operator: flatMap     next(0)
operator: flatMap     next(RxSwift)
operator: flatMap     next(1)
operator: flatMap     next(RxSwift)
operator: flatMap     next(2)
operator: flatMap     next(RxSwift)
operator: flatMap     completed
  • GroupBy:将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列
groupBy
----------- 示例 --------
// 我需要将奇数偶数分成两组
Observable<Int>.of(0, 1, 2, 3, 4, 5)
    .groupBy(keySelector: { (element) -> String in
        return element % 2 == 0 ? "偶数" : "基数"
    })
    .subscribe { (event) in
        switch event {
        case .next(let group):
            group.asObservable().subscribe({ (event) in
                print("key: \(group.key)     \(event)")
            })
        default:
            print("")
        }
    }
    .dispose()

--------- 运行结果 ---------
key: 基数     next(1)
key: 偶数     next(2)
key: 基数     next(3)
key: 偶数     next(4)
key: 基数     next(5)
key: 偶数     completed
key: 基数     completed
  • Map:通过一个闭包将原来的序列转换为一个新序列的操作
map
---------- 示例 ----------
Observable<String>.of("John", "Tony", "Tom")
    .map { return "hello " + $0}
    .subscribe { (event) in
        print("operator: map     \(event)")
    }.dispose()

---------- 运行结果 ----------
operator: map     next(hello John)
operator: map     next(hello Tony)
operator: map     next(hello Tom)
operator: map     completed
  • Scan:从字面意思可以看出是扫描,也就是说该方法会给出一个初始值(seed),每次通过一个函数将上一次的结果与序列中的Item进行处理,每处理完成都会发射.Next事件
scan
----------- 示例 ----------
Observable<String>.of("Rx", "Swift")
    .scan("hello ") { (acum, element) in
        return acum + element
    }.subscribe { (event) in
        print("operator: scan      \(event)")
    }.dispose()

---------- 运行结果 ----------
operator: scan      next(hello Rx)
operator: scan      next(hello RxSwift)
operator: scan      completed
  • Reduce:与上述Scan类似,都是初始一个Seed,每次通过函数将上一次的结果与序列中的Item进行处理,但是唯一不同的一点是,只会在最后发射一次.Next事件,将其拿来作数学计算很有用,这个我们将会在后面讲到 (** 请注意与上述Scan的结果比较**)
reduce
---------- 示例 ----------
Observable<String>.of("Rx", "Swift")
    .reduce("hello ") { (accum, element) -> String in
        return accum + element
    }.subscribe { (event) in
        print("operator: reduce    \(event)")
    }.dispose()

---------- 运行结果 ----------
operator: reduce    next(hello RxSwift)
operator: reduce    completed

Observable 过滤器

  • Debounce:在规定的时间内过滤Item,如下图:如果debounce开启的时候此时2、3、4 Item正好到来,那么将无法收到它们的任何时间
debounce
----------- 示例 ----------
Observable<Int>.of(1, 2, 3, 4)
    .debounce(1, scheduler: MainScheduler.instance)
    .subscribe { (event) in
        print("operator: debounce     \(event)")
    }.dispose()

----------- 运行结果 ----------
operator: debounce     next(4)
operator: debounce     completed
  • Distinct:过滤掉可观察到的重复Item(相信你在使用数据库的时候回经常使用到distinct),在Swift中使用的是distinctUntilChanged,表示如果发射的事件与上一次不相同那么才会发射此次事件
distinctUntilChanged
---------- 示例 ----------
Observable<Int>.of(1, 2, 2, 2, 3)
    .distinctUntilChanged()
    .subscribe { (event) in
        print("operator: distict     \(event)")
    }.dispose()

---------- 运行结果 ----------
operator: distict     next(1)
operator: distict     next(2)
operator: distict     next(3)
operator: distict     completed
  • ElementAt:发射第 N 个Item
elementAt
----------- 示例 ----------
Observable<Int>.of(1, 2, 2, 3, 4)
    .elementAt(5)
    .subscribe { (event) in
        print("operator: elementAt    \(event)")
    }.dispose()

----------- 运行结果 -----------
operator: elementAt    next(3)
operator: elementAt    completed
  • Filter:仅发射谓词测试通过的Items
filter
----------- 示例 -----------
Observable<Int>.of(9, 10, 11, 12)
    .filter { (element) -> Bool in
        element > 10
    }.subscribe { (event) in
        print("operator: filter    \(event)")
    }.dispose()

----------- 运行结果 -----------
operator: filter    next(11)
operator: filter    next(12)
operator: filter    completed
  • Skip:发射第N(包含N)之后的Items
skip
----------- 示例 -----------
Observable<Int>.of(9, 10, 11, 12)
    .skip(2)
    .subscribe { (event) in
        print("operator: skip    \(event)")
    }.dispose()

----------- 运行结果 ---------
operator: skip    next(11)
operator: skip    next(12)
operator: skip    completed
  • Take: 发射第N(不包含N)之前的Items,与Skip相反效果
take
----------- 示例 -----------
Observable<Int>.of(9, 10, 11, 12)
    .take(2)
    .subscribe { (event) in
        print("operator: take    \(event)")
    }.dispose()

---------- 运行结果 -----------
operator: take    next(9)
operator: take    next(10)
operator: take    completed
  • TakeLast: 发射第N(包含N)之后的Items,与Skip相同效果
take
----------- 示例 -----------
Observable<Int>.of(9, 10, 11, 12)
    .takeLast(2)
    .subscribe { (event) in
        print("operator: takeLast    \(event)")
    }.dispose()

---------- 运行结果 -----------
operator: takeLast    next(11)
operator: takeLast    next(12)
operator: takeLast    completed

结合多个Observables

  • Merge:将多个序列的Items合并为一个序列的Items
merge
----------- 示例 ----------
let observable1 = Observable<Int>.of(1, 3, 5)
let observable2 = Observable<Int>.of(2, 4)
Observable.merge(observable1, observable2)
    .subscribe { (event) in
        print("operator: merge      \(event)")
    }.dispose()

----------- 运行结果 ----------
operator: merge      next(1)
operator: merge      next(2)
operator: merge      next(3)
operator: merge      next(4)
operator: merge      next(5)
operator: merge      completed
  • StartWith:在发射序列Items前新增一个Item
startWith
----------- 示例 ----------
Observable<String>.of("  ", "RxSwift", "!")
    .startWith("hello")
    .reduce("") { (accum, element) -> String in
        return accum + element
    }.subscribe { (event) in
        print("operator: startWith    \(event)")
    }.dispose()

----------- 运行结果 ----------
operator: startWith    next(hello  RxSwift!)
operator: startWith    completed
  • Switch:当你的序列是一个事件序列的序列 (Observable<Observable<T>>) 的时候(也即是二维序列),可以使用 switch 将序列的序列转换成一维,并且在出现新的序列的时候,自动切换到最新的那个序列上。也就是说会将前一个序列未发射的Item自动取消掉
switch
----------- 示例 ----------
let var1 = Variable(0)
let var2 = Variable(100)
        
// 序列的序列
let variable = Variable(var1.asObservable())
        
variable.asObservable()
    .switchLatest()
    .subscribe { (event) in
        print("operator: switchLatest   \(event)")
    }
        
var1.value = 1
        
variable.value = var2.asObservable()
var2.value = 200

----------- 运行结果 ----------
operator: switchLatest   next(0)
operator: switchLatest   next(1)
operator: switchLatest   next(100)
operator: switchLatest   next(200)
operator: switchLatest   completed
  • Zip:将多个序列的Items进行一一合并,但是需要注意的是,它会等到Item对其后合并,未对齐的会舍弃
zip
---------- 示例 -----------
let observable1 = Observable<Int>.of(1, 2, 3, 4, 5)
let observable2 = Observable<String>.of("A", "B", "C", "D")
        
Observable<String>.zip(observable1, observable2) { (e1: Int, e2: String) -> String in
    "\(e1)\(e2)"
}.subscribe { (event) in
    print("operator: zip     \(event)")
}.dispose()

---------- 运行结果 ----------
operator: zip     next(1A)
operator: zip     next(2B)
operator: zip     next(3C)
operator: zip     next(4D)
operator: zip     completed
  • CombineLatest:如果存在两条事件队列,需要同时监听,那么每当有新的事件发生的时候,combineLatest 会将每个队列的最新的一个元素进行合并。类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。CombineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值
combineLatest
---------- 示例 ----------
let observable1 = Observable<Int>.of(1, 2, 3, 4, 5)
let observable2 = Observable<String>.of("A", "B", "C", "D")
Observable<String>.combineLatest(observable1, observable2, resultSelector: { (e1: Int, e2: String) -> String in
        "\(e1)\(e2)"
    }).subscribe { (event) in
        print("operator: combine    \(event)")
    }.dispose()

----------- 运行结果 ----------
operator: combine    next(1A)
operator: combine    next(2A)
operator: combine    next(2B)
operator: combine    next(3B)
operator: combine    next(3C)
operator: combine    next(4C)
operator: combine    next(4D)
operator: combine    next(5D)
operator: combine    completed

错误处理

  • Catch:在收到序列的异常事件时,通过返回另一个序列来持续发送非error事件
catchError
----------- 示例 -----------
Observable<UInt8>.create { (observer) -> Disposable in
        observer.onNext(0)
        observer.onError(CustomError())   // 这里自定义了一个Error
        return Disposables.create()
    }.catchError { (error) -> Observable<UInt8> in
        print(error)
        return Observable<UInt8>.of(1, 2)
    }.subscribe { (event) in
        print("operator: catchError    \(event)")
    }.dispose()

----------- 运行结果 ----------
operator: catchError    next(0)
RxDemo.(CustomError in _53A9E5DF100B211646D927F0B28DE79B)
operator: catchError    next(1)
operator: catchError    next(2)
operator: catchError    completed
  • Retry:出现错误事件后,重新发送所有事件信息
retry
---------- 示例 -----------
Observable<UInt8>.create { (observer) -> Disposable in
        observer.onNext(0)
        observer.onError(CustomError())
        return Disposables.create()
    }.retry(3)  // 重复三次
    .subscribe { (event) in
        print("operator: retry    \(event)")
    }.dispose()

----------- 运行结果 -----------
operator: retry    next(0)
operator: retry    next(0)
operator: retry    next(0)
operator: retry    error(RxDemo.(CustomError in _53A9E5DF100B211646D927F0B28DE79B))

Observable实用操作

  • Delay:延迟发射事件
delay
------------ 示例 ------------
print("start time: \(Date())")
Observable<Int>.of(1, 2, 1)
    .delay(1, scheduler: MainScheduler.instance)
    .subscribe { (event) in
        if event.isCompleted {
            print("end time: \(Date())")
        }
        print("operator: delay    \(event)")
    }

------------ 运行结果 -----------
start time: 2017-06-12 06:10:32 +0000
operator: delay    next(1)
operator: delay    next(2)
operator: delay    next(1)
end time: 2017-06-12 06:10:33 +0000
operator: delay    completed
  • Do: 在一个序列的每个事件执行之前添加一个执行动作
do
---------- 示例 ----------
Observable<Int>.of(1, 2, 1)
    .do(onNext: { (_) in
        print("operator: do   previous next")
    }, onError: { (_) in
        print("operator: do   previous error")
    }, onCompleted: {
        print("operator: do   previous complete")
    }, onSubscribe: nil, onSubscribed: nil, onDispose: nil)
    .subscribe { (event) in
        print("operator: delay    \(event)")
    }.dispose()

---------- 运行结果 ---------
operator: do   previous next
operator: delay    next(1)
operator: do   previous next
operator: delay    next(2)
operator: do   previous next
operator: delay    next(1)
operator: do   previous complete
operator: delay    completed
  • ObserveOn:Observer在指定Scheduler中观察序列事件
observeOn
----------- 示例 --------
Observable<Int>.of(1)
    .observeOn(ConcurrentDispatchQueueScheduler.init(queue: DispatchQueue(label: "test")))
    .subscribe { (event) in
        print("operator: observeOn  \(Thread.current.isMainThread)    \(event)")
    }

---------- 运行结果 ----------
operator: observeOn  false    next(1)
operator: observeOn  false    completed
  • Subscribe:订阅事件.onNext 、.onError、.onCompleted
----------- 示例 ---------
Observable<String>.of("hello RxSwift").subscribe(onNext: { (element) in
        print("operator: onNext    \(element)")
    }, onError: { (error) in
        print("operator: onError   \(error)")
    }, onCompleted: {
        print("operator: onCOmpleted")
    }, onDisposed: nil)

----------- 运行结果 ----------
operator: onNext    hello RxSwift
operator: onCOmpleted
  • SubscribeOn:在指定的Scheduler中操作,参考ObserveOn
subscribeOn
----------- 示例 ---------
Observable<Int>.of(1)
    .subscribeOn(MainScheduler.instance)
    .subscribe { (event) in
        print("operator: observeOn  \(Thread.current.isMainThread)    \(event)")
    }

----------- 运行结果 ----------
operator: subscribeOn  true    next(1)
operator: subscribeOn  true    completed
  • TimeOut:一个序列在指定时间内未发射完成所有事件,那么将会进入.onError
timeOut
---------- 示例 --------
Observable<Int>.of(1)
    .delay(2, scheduler: MainScheduler.instance)
    .timeout(1, scheduler: MainScheduler.instance)
    .subscribe { (event) in
        print("operator: timeout    \(event)")
    }

---------- 运行结果 ----------
operator: timeout    error(Sequence timeout.)

条件运算符和布尔类型运算符

  • DefaultIfEmpty:如果是序列中没有任何Item,那么给定一个default
---------- 示例 -----------
Observable<Int>.empty()
    .ifEmpty(default: 0)
    .subscribe({ (event) in
        print("operator: ifEmpty    \(event)")
    })

----------- 运行结果 ---------
operator: ifEmpty    next(0)
operator: ifEmpty    completed
  • SkipUntil:丢弃掉第一个序列的所有Items,直到第二个序列的Item出现
skipUntil
--------- 示例 ----------
Observable<String>.of("A", "B", "C")
    .skipUntil(Observable<String>.of("D"))
    .subscribe { (event) in
        print("operator: skipUntil    \(event)")
    }

--------- 运行结果 --------
operator: skipUntil    next(B)
operator: skipUntil    next(C)
operator: skipUntil    completed
  • SkipWhile:丢弃掉所有的Items,直到满足某个不满足条件的Item出现
skipWhile
---------- 示例 ---------
Observable<String>.of("AD", "BD", "CD")
    .skipWhile({ (element) -> Bool in
        element.contains("A")
    })
    .subscribe { (event) in
        print("operator: skipWhile    \(event)")
    }.dispose()

---------- 运行结果 ---------
operator: skipWhile    next(BD)
operator: skipWhile    next(CD)
operator: skipWhile    completed
  • TakeUntil:取得第一个序列所有Items,直到第二个序列发射Item或者终止
takeUntil
---------- 示例 --------
Observable<Int>.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .takeUntil(Observable<Int>.empty())
    .subscribe { (event) in
        print("operator: takeUntil    \(event)")
    }.dispose()
  • TakeWhile:取得第一个序列的所有Items,直到出现不满足条件的Item (请仔细体会与SkipWhile的不同之处)
takeWhile
----------- 示例 ---------
Observable<Int>.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .takeWhile({ (element) -> Bool in
        element < 5
    })
    .subscribe { (event) in
        print("operator: takeUntil    \(event)")
    }.dispose()

--------- 运行结果 -------- 
operator: takeWhile    next(1)
operator: takeWhile    next(2)
operator: takeWhile    next(3)
operator: takeWhile    next(4)
operator: takeWhile    completed

连接操作

  • Publish:将一个普通的序列转换为可连接序列
publish
  • RefCount:使一个可连接序列表现为一个普通序列
refCount
  • Replay:保证所有的Observers所观察到的事件Items都是相同的,即使它们已经在序列事件已经发射之后才订阅的 (具体使用将在下一章节的Subjects中讲到)
replay

Observer

  • Observer作为观察者,用于来订阅Observable,它可以用来通知.onNext、.onError、.onCompleted
--------- 示例 --------
Observable<String>.create { (observer) -> Disposable in
    observer.on(.next("hello RxSwift"))
    observer.onError(CustomError())
        return Disposables.create {
            print("disposable")
        }
    }.subscribe { (event) in
        print(event)
    }.dispose()

--------- 运行结果 ---------
next(hello RxSwift)
error(RxDemo.(CustomError in _53A9E5DF100B211646D927F0B28DE79B))
  • AnyObserver使用:
--------- 示例 ----------
let observer1 = AnyObserver<Int> { (event) in
    print(event)
}
        
let observer2 = AnyObserver(observer1)
        
Observable<Int>.of(1, 2, 3)
    .subscribe(observer2)
    .dispose()

---------- 运行结果 --------
next(1)
next(2)
next(3)
completed

Subject

Subject在ReactiveX的一些实现中扮演了一种桥梁或者代理的角色,它既可以作为Observer也可以作为Observable来使用。作为观察者来说它可以订阅一个或者多个可观察序列,作为可观察者来说它可以通过Items的reemitting来观察,并且还可以发射新的Items事件,我们将从如下四个Subject进行学习:

image
  • AnySubject:仅仅只发送订阅之后的最后一个Item以及.onCompleted,如果出现错误,那么仅仅将只发送.onError
image
错误
  • ReplaySubject:如果一个Observer订阅了ReplaySubject,那么它将收到订阅前(在bufferSize大小内)以及订阅后的所有Items,不管Observer何时订阅的
image
---------- 示例 -----------
let replay = ReplaySubject<Int>.create(bufferSize: 3)

replay.onNext(0)
replay.onNext(1)
replay.onNext(2)
replay.onNext(3)
replay.subscribe { (event) in
    print("Replay  first       \(event)")
}
replay.onNext(4)

replay.subscribe { (event) in
    print("Replay  second       \(event)")
}
replay.onNext(5)
replay.onCompleted()

---------- 运行结果 ----------
Replay  first       next(1)
Replay  first       next(2)
Replay  first       next(3)
Replay  first       next(4)
Replay  second       next(2)
Replay  second       next(3)
Replay  second       next(4)
Replay  first       next(5)
Replay  second       next(5)
Replay  first       completed
Replay  second       completed

请仔细查看上面的运行结果你可以发现:上述replay被两次订阅了之后都会收到订阅的Items,但是收到订阅之前的Items是有size限制的,这就与你设置的bufferSize大小有关了。订阅之后的Items是可以完全收到的

  • PublishSubject:与BehaviorSubject有略微不同,从字面意思可以看出Publish是发布,那么就意味着如果一个Observer订阅了PublishSubject,它就将收到订阅之后的所有事件,但是不包括订阅之前的事件(你也可以把他看成一个bufferSize=0的ReplaySubject)
image
----------- 示例 ---------
let publish = PublishSubject<Int>()
        
publish.onNext(1)
publish.onNext(2)
publish.subscribe { (event) in
    print("Publish       \(event)")
}
publish.onNext(3)
publish.onNext(4)
publish.onCompleted()

---------- 运行结果 ---------
// 从结果可以看出,订阅之前的所有Item是收不到的,请体会与BehaviorSubject之间差异
Publish       next(3)   
Publish       next(4)
Publish       completed
  • BehaviorSubject:如果一个Observer订阅了BehaviorSubject之后,那么它就将收到最近的事件,并且也能够收到在订阅之后发射的事件Item (你也可以把它看成一个bufferSize=1 的ReplaySubject)


    image
----------- 示例 ----------
let behavior = BehaviorSubject<Int>(value: 0)
        
behavior.onNext(1)
behavior.onNext(2)
behavior.onNext(3)
let disposable = behavior.subscribe { (event) in
    print("Behavior       \(event)")
}
behavior.onNext(4)
behavior.onCompleted()

---------- 运行结果 -----------
Behavior       next(3)      // 收到最近Item事件
Behavior       next(4)      // 收到订阅之后所有事件
Behavior       completed
  • Variable:与BehaviorSubject类似,但是比较特殊的一点就是:在.onError之后它不会终止,只会等待到.onCompleted之后才会被deallocated

注意:至此Subject基本就是这些特殊内容,对于这些特殊内容我建议你敲代码,看结果来仔细体会它们之间的不同与相同之处

Scheduler

对于Scheduler来说,我们需要了解Concurrent(并行)、Serial(串行)Scheduler就可以了,如下为所有Scheduler脑图:

image
  • Scheduler:当你在多线程的时候你会用到这个Scheduler

在上面的章节相信你已经熟悉了observeOn/subscribeOn,这里我们就直接上来个例子介绍:

----------- 示例 -----------
Observable<Int>.of(1, 2, 3, 4)
            .observeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "test"))
    .map { (element) -> Int in
        print("scheduler:map  -->  Main Thread: \(Thread.current.isMainThread)")
                return element * 2
    }
    .subscribeOn(MainScheduler.instance)
    .observeOn(MainScheduler.instance)
    .subscribe { (event) in
        print("scheduler:subscribe  -->  Main Thread: \(Thread.current.isMainThread)")
        print("scheduler     \(event)")
    }

---------- 运行结果 -----------
scheduler:map  -->  Main Thread: false
scheduler:map  -->  Main Thread: false
scheduler:map  -->  Main Thread: false
scheduler:map  -->  Main Thread: false
scheduler:subscribe  -->  Main Thread: true
scheduler     next(2)
scheduler:subscribe  -->  Main Thread: true
scheduler     next(4)
scheduler:subscribe  -->  Main Thread: true
scheduler     next(6)
scheduler:subscribe  -->  Main Thread: true
scheduler     next(8)
scheduler:subscribe  -->  Main Thread: true
scheduler     completed

Disposable

什么时候使用Disposable呢?我个人理解就是你订阅了一个可观察序列,如果有特殊需求你需要提前取消订阅时使用。也就是说Disposable是用来取消订阅的一个工具

image
  • 创建:通过Disposables工具创建
let dis1 = Disposables.create()

let dis2 = Disposables.create {
    print("在Dispose之前所做一些工作")
}

let dis3 = Disposables.create([dis1, dis2])
  • dispose:通过.dispose()取消或者添加到DisposeBag(你可以将它看成一个非ARC机制下的AutoReleasePool
let disposable = Observable<Int>.of(0, 1, 2)
    .subscribe { (event) in
    print(event)
}
        
disposable.dispose()
        
// 或者
disposable.addDisposableTo(DisposeBag())

至此,RxSwift的入门笔记到此结束,接下来我会继续介绍RxSwift使用详解篇相关学习笔记,敬请关注

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容