RxSwift 简单理解

RxSwift核心

1. Observable - 可被监听的序列

Observable 可以用于描述元素异步产生的序列。这样我们生活中许多事物都可以通过它来表示: Observable<Double> 温度
例如:你可以将温度看作是一个序列,然后监测这个温度值,最后对这个值做出响应。例如:当室温高于 33 度时,打开空调降温

特征序列
  • Single

Single 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能发出一个元素,要么产生一个 error 事件。
A.发出一个元素,或一个 error 事件
B.不会共享状态变化

  • Completable

Completable 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能产生一个 completed 事件,要么产生一个 error 事件。
A.发出零个元素
B.发出一个 completed 事件或者一个 error 事件
C.不会共享状态变化

  • Maybe

Maybe 是 Observable 的另外一个版本。它介于 Single 和 Completable 之间,它要么只能发出一个元素,要么产生一个 completed 事件,要么产生一个 error 事件。
A.发出一个元素或者一个 completed 事件或者一个 error 事件
B.不会共享状态变化”

  • Driver

Driver(司机?) 是一个精心准备的特征序列。它主要是为了简化 UI 层的代码。不过如果你遇到的序列具有以下特征,你也可以使用它:
A.不会产生 error 事件
B.一定在 MainScheduler 监听(主线程监听)
C.共享状态变化

*ControlEvent

ControlEvent 专门用于描述 UI 控件所产生的事件,它具有以下特征:
A.不会产生 error 事件
B.一定在 MainScheduler 订阅(主线程订阅)
C.一定在 MainScheduler 监听(主线程监听)
D.共享状态变化

2. Observer - 观察者

观察者 是用来监听事件,然后它需要这个事件做出响应。例如:弹出提示框就是观察者,它对点击按钮这个事件做出响应。

例如:当室温高于 33 度时,打开空调降温
打开空调降温就是观察者 Observer<Double>。

创建观察者:
创建观察者最直接的方法就是在 Observable 的 subscribe 方法后面描述,事件发生时,需要如何做出响应。而观察者就是由后面的 onNext,onError,onCompleted的这些闭包构建出来的。

特征观察者
  • AnyObserver

AnyObserver 可以用来描叙任意一种观察者

*Binder

Binder 主要有以下两个特征:
A.不会处理错误事件
B.确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler)
一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。

let observer: AnyObserver<Bool> = AnyObserver { [weak self] (event) in
    switch event {
    case .next(let isHidden):
        self?.usernameValidOutlet.isHidden = isHidden
    default:
        break
    }
}

//observer
usernameValid.bind(to: observer).disposed(by: disposeBag)
extension Reactive where Base: UIView {
  public var isHidden: Binder<Bool> {
      return Binder(self.base) { view, hidden in
          view.isHidden = hidden
      }
  }
}

usernameValid.bind(to: usernameValidOutlet.rx.isHidden).disposed(by: disposeBag)

3. Observable & Observer 既是可被监听的序列也是观察者

在我们所遇到的事物中,有一部分非常特别。它们既是可被监听的序列也是观察者。
例如:textField的当前文本。它可以看成是由用户输入,而产生的一个文本序列。也可以是由外部文本序列,来控制当前显示内容的观察者:

// 作为可被监听的序列
let observable = textField.rx.text
observable.subscribe(onNext: { text in show(text: text) })
// 作为观察者
let observer = textField.rx.text
let text: Observable<String?> = ...
text.bind(to: observer)
Subject

*AsyncSubject

AsyncSubject 将在源 Observable 产生完成事件后,发出最后一个元素(仅仅只有最后一个元素),如果源 Observable 没有发出任何元素,只有一个完成事件。那 AsyncSubject 也只有一个完成事件。
它会对随后的观察者发出最终元素。如果源 Observable 因为产生了一个 error 事件而中止, AsyncSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

AsyncSubject.png
AsyncSubject.png
let disposeBag = DisposeBag()
let subject = AsyncSubject<String>()

subject
  .subscribe { print("Subscription: 1 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")
subject.onNext("🐹")
subject.onCompleted()

输出结果:
Subscription: 1 Event: next(🐹)
Subscription: 1 Event: completed

*PublishSubject

PublishSubject 将对观察者发送订阅后产生的元素,而在订阅前发出的元素将不会发送给观察者。如果你希望观察者接收到所有的元素,你可以通过使用Observable 的 create 方法来创建 Observable,或者使用 ReplaySubject。
如果源 Observable 因为产生了一个 error 事件而中止, PublishSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

PublishSubject.png
PublishSubject.png
let disposeBag = DisposeBag()
let subject = PublishSubject<String>()

subject
  .subscribe { print("Subscription: 1 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")

subject
  .subscribe { print("Subscription: 2 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🅰️")
subject.onNext("🅱️")

输出结果:
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

*ReplaySubject

ReplaySubject 将对观察者发送全部的元素,无论观察者是何时进行订阅的。
这里存在多个版本的 ReplaySubject,有的只会将最新的 n 个元素发送给观察者,有的只会将限制时间段内最新的元素发送给观察者。
如果把 ReplaySubject 当作观察者来使用,注意不要在多个线程调用 onNext, onError 或 onCompleted。这样会导致无序调用,将造成意想不到的结果。

ReplaySubject.png
let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1)

subject
  .subscribe { print("Subscription: 1 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")

subject
  .subscribe { print("Subscription: 2 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🅰️")
subject.onNext("🅱️")

输出结果:
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

*BehaviorSubject

当观察者对 BehaviorSubject 进行订阅时,它会将源 Observable 中最新的元素发送出来(如果不存在最新的元素,就发出默认元素)。然后将随后产生的元素发送出来。如果源 Observable 因为产生了一个 error 事件而中止, BehaviorSubject 就不会发出任何元素,而是将这个 error 事件发送出来

98C97A0E-D2A0-4C84-964D-90A2BF88B2EC.png
413FA601-5878-433B-BDC1-FA57A4196385.png
let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "🔴")

subject
  .subscribe { print("Subscription: 1 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")

subject
  .subscribe { print("Subscription: 2 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🅰️")
subject.onNext("🅱️")

subject
  .subscribe { print("Subscription: 3 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🍐")
subject.onNext("🍊")

输出结果:
Subscription: 1 Event: next(🔴)
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
Subscription: 3 Event: next(🅱️)
Subscription: 1 Event: next(🍐)
Subscription: 2 Event: next(🍐)
Subscription: 3 Event: next(🍐)
Subscription: 1 Event: next(🍊)
Subscription: 2 Event: next(🍊)
Subscription: 3 Event: next(🍊)
Variable

在 Swift 中我们经常会用 var 关键字来声明变量。RxSwift 提供的 Variable 实际上是 var 的 Rx 版本,你可以将它看作是 RxVar。
我们来对比一下 var 以及 Variable 的用法:

使用 var:

// 在 ViewController 中
var model: Model? = nil {
    didSet { updateUI(with: model) }
func updateUI(with model: Model?) { ... }
let model: Variable<Model?> = Variable(nil)

override func viewDidLoad() {
    super.viewDidLoad()

    model.asObservable()
        .subscribe(onNext: { [weak self] model in
            self?.updateUI(with: model)
        })
        .disposed(by: disposeBag)

func updateUI(with model: Model?) { ... }
func getModel() -> Model { ... }

第一种使用 var 的方式十分常见,在 ViewController 中监听 Model 的变化,然后刷新页面。
第二种使用 Variable 则是 RxSwift 独有的。Variable 几乎提供了 var 的所有功能。另外,加上一条非常重要的特性,就是可以通过调用 asObservable() 方法转换成序列。然后你可以对这个序列应用操作符,来合成其他的序列。所以,如果我们声明的变量需要提供 Rx 支持,那就选用 Variable 这个类型。

说明
Variable 封装了一个 BehaviorSubject,所以它会持有当前值,并且 Variable 会对新的观察者发送当前值。它不会产生 error 事件。Variable 在 deinit 时,会发出一个 completed 事件

Operator - 操作符
6800A280-ADA2-4F79-A87B-11891FCF0158.png

操作符可以帮助大家创建新的序列,或者变化组合原有的序列,从而生成一个新的序列。
我们之前在输入验证例子中就多次运用到操作符。例如,通过 map 方法将输入的用户名,转换为用户名是否有效。然后用这个转化后来的序列来控制红色提示语是否隐藏。我们还通过 combineLatest 方法,将用户名是否有效和密码是否有效合并成两者是否同时有效。然后用这个合成后来的序列来控制按钮是否可点击。
这里 map 和 combineLatest 都是操作符,它们可以帮助我们构建所需要的序列。现在,我们再来看几个例子:

Filter:


99EE92D6-D016-4605-BB95-BA1FF5BAD869.png

Map:


D382E434-EC82-438B-B50A-A4297FFA708E.png

5. Disposable - 可被清除的资源

Disposable - 可被清除的资源

通常来说,一个序列如果发出了 error 或者 completed 事件,那么所有内部资源都会被释放。如果你需要提前释放这些资源或取消订阅的话,那么你可以对返回的 可被清除的资源(Disposable) 调用 dispose 方法:

F5A2FE7F-5526-4347-A7AB-80A37D135B12.png
var disposable: Disposable?
override func viewWillAppear(_ animated: Bool) {
    super.viewWillAppear(animated)
    self.disposable = textField.rx.text.orEmpty
        .subscribe(onNext: { text in print(text) })
}

override func viewWillDisappear(_ animated: Bool) {
    super.viewWillDisappear(animated)
    self.disposable?.dispose()
}

调用 dispose 方法后,订阅将被取消,并且内部资源都会被释放。通常情况下,你是不需要手动调用 dispose 方法的。我们推荐使用 清除包(DisposeBag) 或者 takeUntil 操作符 来管理订阅的生命周期。

DisposeBag - 清除包

因为我们用的是 Swift ,所以我们更习惯于使用 ARC 来管理内存。那么我们能不能用 ARC 来管理订阅的生命周期了。答案是肯定了,你可以用 清除包(DisposeBag) 来实现这种订阅管理机制

404F9871-3ED0-4063-993F-3545B78FD9FD.png
var disposeBag = DisposeBag()

override func viewWillAppear(_ animated: Bool) {
    super.viewWillAppear(animated)

    textField.rx.text.orEmpty
        .subscribe(onNext: { text in print(text) })
        .disposed(by: self.disposeBag)
}

override func viewWillDisappear(_ animated: Bool) {
    super.viewWillDisappear(animated)

    self.disposeBag = DisposeBag()
}

当 清除包 被释放的时候,清除包 内部所有 可被清除的资源(Disposable) 都将被清除。

takeUntil
8E12A1D5-9ED6-4018-86A0-DC6D5FB1CF2F.png

另外一种实现自动取消订阅的方法就是使用 takeUntil 操作符,上面那个输入验证的演示代码也可以通过使用 takeUntil 来实现:

override func viewDidLoad() {
“super.viewDidLoad()

    ...

    _ = usernameValid
        .takeUntil(self.rx.deallocated)
        .bind(to: passwordOutlet.rx.isEnabled)

    _ = usernameValid
        .takeUntil(self.rx.deallocated)
        .bind(to: usernameValidOutlet.rx.isHidden)

    _ = passwordValid
        .takeUntil(self.rx.deallocated)
        .bind(to: passwordValidOutlet.rx.isHidden)

    _ = everythingValid
        .takeUntil(self.rx.deallocated)
        .bind(to: doSomethingOutlet.rx.isEnabled)
}

这将使得订阅一直持续到控制器的 dealloc 事件产生为止。

6. Schedulers - 调度器

13FA831F-BA09-4DF5-9D93-2385BF472352.png

Schedulers 是 Rx 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。

let rxData: Observable<Data> = ...

rxData.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { [weak self] data in
        self?.data = data
    })
    .disposed(by: disposeBag)

使用 subscribeOn

我们用 subscribeOn 来决定数据序列的构建函数在哪个 Scheduler 上运行。以上例子中,由于获取 Data 需要花很长的时间,所以用 subscribeOn 切换到 后台 Scheduler 来获取 Data。这样可以避免主线程被阻塞。

使用 observeOn

我们用 observeOn 来决定在哪个 Scheduler 监听这个数据序列。以上例子中,通过使用 observeOn 方法切换到主线程来监听并且处理结果。
一个比较典型的例子就是,在后台发起网络请求,然后解析数据,最后在主线程刷新页面。你就可以先用 subscribeOn 切到后台去发送请求并解析数据,最后用 observeOn 切换到主线程更新页面。

MainScheduler

MainScheduler 代表主线程。如果你需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler 运行。

SerialDispatchQueueScheduler

SerialDispatchQueueScheduler 抽象了窜行 DispatchQueue。如果你需要执行一些窜行任务,可以切换到这个 Scheduler 运行。

ConcurrentDispatchQueueScheduler

ConcurrentDispatchQueueScheduler 抽象了并行 DispatchQueue。如果你需要执行一些并发任务,可以切换到这个 Scheduler 运行。

OperationQueueScheduler

OperationQueueScheduler 抽象了 NSOperationQueue。
它具备 NSOperationQueue 的一些特点,例如,你可以通过设置 maxConcurrentOperationCount,来控制同时执行并发任务的最大数量。

7. Error Handling - 错误处理

一旦序列里面产出了一个 error 事件,整个序列将被终止。RxSwift 主要有两种错误处理机制:

  • retry - 重试
  • catch - 恢复
retry - 重试

retry 可以让序列在发生错误后重试:

// 请求 JSON 失败时,立即重试,
// 重试 3 次后仍然失败,就将错误抛出

let rxJson: Observable<JSON> = ...

rxJson
    .retry(3)
    .subscribe(onNext: { json in
        print("取得 JSON 成功: \(json)")
    }, onError: { error in
        print("取得 JSON 失败: \(error)")
})
    .disposed(by: disposeBag)

以上的代码非常直接 retry(3) 就是当发生错误时,就进行重试操作,并且最多重试 3 次

retryWhen

如果我们需要在发生错误时,经过一段延时后重试,那可以这样实现:

let retryDelay: Double = 5  // 重试延时 5 秒

rxJson
    .retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
        return Observable.timer(retryDelay, scheduler: MainScheduler.instance)
    }
    .subscribe(...)
    .disposed(by: disposeBag)

这里我们需要用到 retryWhen 操作符,这个操作符主要描述应该在何时重试,并且通过闭包里面返回的 Observable 来控制重试的时机:

闭包里面的参数是 Observable<Error> 也就是所产生错误的序列,然后返回值是一个 Observable。当这个返回的Observable 发出一个元素时,就进行重试操作。当它发出一个 error 或者 completed 事件时,就不会重试,并且将这个事件传递给到后面的观察者。

如果需要加上一个最大重试次数的限制:

// 请求 JSON 失败时,等待 5 秒后重试,
// 重试 4 次后仍然失败,就将错误抛出

let maxRetryCount = 4       // 最多重试 4 次
let retryDelay: Double = 5  // 重试延时 5 秒

rxJson
    .retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
        return rxError.flatMapWithIndex { (error, index) -> Observable<Int> in
            guard index < maxRetryCount else {
                return Observable.error(error)
            }
        return Observable<Int>.timer(retryDelay, scheduler: MainScheduler.instance)
        }
    }
    .subscribe(...)
    .disposed(by: disposeBag)

我们用 flatMapWithIndex 这个操作符,因为它可以给我们提供错误的索引数 index。然后用这个索引数判断是否超过最大重试数,如果超过了,就将错误抛出。如果没有超过,就等待 5 秒后重试。

catchError - 恢复

catchError 可以在错误产生时,用一个备用元素或者一组备用元素将错误替换掉:

searchBar.rx.text.orEmpty
    ...
    .flatMapLatest { query -> Observable<[Repository]> in
        ...
        return searchGitHub(query)
            .catchErrorJustReturn([])
    }
    ...
    .bind(to: ...)
    .disposed(by: disposeBag)

我们开头的 Github 搜索就用到了catchErrorJustReturn。当错误产生时,就返回一个空数组,于是就会显示一个空列表页。

你也可以使用 catchError,当错误产生时,将错误事件替换成一个备选序列:

// 先从网络获取数据,如果获取失败了,就从本地缓存获取数据

let rxData: Observable<Data> = ...      // 网络请求的数据
let cahcedData: Observable<Data> = ...  // 之前本地缓存的数据

rxData
    .catchError { _ in cahcedData }
    .subscribe(onNext: { date in
        print("获取数据成功: \(date.count)")
    })
    .disposed(by: disposeBag)

常用操作符

  • catchError
    从一个错误事件中恢复,将错误事件替换成一个备选序列


    catchError

    catchError 操作符将会拦截一个 error 事件,将它替换成其他的元素或者一组元素,然后传递给观察者。这样可以使得 Observable 正常结束,或者根本都不需要结束。

let disposeBag = DisposeBag()

let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()

sequenceThatFails
    .catchError {
        print("Error:", $0)
        return recoverySequence
    }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

recoverySequence.onNext("😊")

输出结果:
next(😬)
next(😨)
next(😡)
next(🔴)
Error: test
next(😊)

  • catchErrorJustReturn
    catchErrorJustReturn 操作符会将error 事件替换成其他的一个元素,然后结束该序列。
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()

sequenceThatFails
    .catchErrorJustReturn("😊")
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

输出结果:
next(😬)
next(😨)
next(😡)
next(🔴)
next(😊)
completed
  • combineLatest
    当多个 Observables 中任何一个发出一个元素,就发出一个元素。这个元素是由这些 Observables 中最新的元素,通过一个函数组合起来的
combineLatest

combineLatest 操作符将多个 Observables 中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。这些源 Observables 中任何一个发出一个元素,他都会发出一个元素(前提是,这些 Observables 曾经发出过元素)。

let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()

Observable.combineLatest(first, second) { $0 + $1 }
          .subscribe(onNext: { print($0) })
          .disposed(by: disposeBag)

first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")

输出结果:
1A
2A
2B
2C
2D
3D
4D
  • concat
    让两个或多个 Observables 按顺序串连起来
concat

concat 操作符将多个 Observables 按顺序串联起来,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。

concat 将等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。如果后一个是“热” Observable ,在它前一个 Observable 产生完成事件前,所产生的元素将不会被发送出来。

startWith 和它十分相似。但是startWith不是在后面添加元素,而是在前面插入元素。

merge 和它也是十分相似。merge并不是将多个 Observables 按顺序串联起来,而是将他们合并到一起,不需要 Observables 按先后顺序发出元素。

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")

let variable = Variable(subject1)

variable.asObservable()
        .concat()
        .subscribe { print($0)
}.disposed(by: disposeBag)

subject1.onNext("🍐")
subject1.onNext("🍊")

variable.value = subject2

subject2.onNext("I would be ignored")
subject2.onNext("🐱")

subject1.onCompleted()

subject2.onNext("🐭")

输出结果:
next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)
  • concatMap
    将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 串连起来
concatMap

concatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。然后让这些 Observables 按顺序的发出元素,当concatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。然后让这些 Observables 按顺序的发出元素,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")

let variable = Variable(subject1)

variable.asObservable()
        .concatMap { $0 }
        .subscribe { print($0) }
        .disposed(by: disposeBag)

subject1.onNext("🍐")
subject1.onNext("🍊")

variable.value = subject2
subject2.onNext("I would be ignored")
subject2.onNext("🐱")

subject1.onCompleted()

subject2.onNext("🐭")

输出结果:
next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)

  • connect
    通知可被连接的 Observable 可以开始发出元素了
connect

可被连接的 Observable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以等所有观察者全部订阅完成后,才发出元素。

  • create
    通过一个构建函数完整的创建一个 Observable
create

create 操作符将创建一个 Observable,你需要提供一个构建函数,在构建函数里面描述事件(next,error,completed)的产生过程。

通常情况下一个有限的序列,只会调用一次观察者的 onCompleted 或者 onError 方法。并且在调用它们后,不会再去调用观察者的其他方法。

创建一个 [0, 1, ... 8, 9] 的序列:

let id = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onNext(4)
    observer.onNext(5)
    observer.onNext(6)
    observer.onNext(7)
    observer.onNext(8)
    observer.onNext(9)
    observer.onCompleted()
    return Disposables.create()
}

  • debug

打印所有的订阅,事件以及销毁信息

let disposeBag = DisposeBag()

let sequence = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onCompleted()
    return Disposables.create()
}

sequence
    .debug("Fruit")
    .subscribe()
    .disposed(by: disposeBag)

输出结果:
2017-11-06 20:49:43.187: Fruit -> subscribed
2017-11-06 20:49:43.188: Fruit -> Event next(🍎)
2017-11-06 20:49:43.188: Fruit -> Event next(🍐)
2017-11-06 20:49:43.188: Fruit -> Event completed
2017-11-06 20:49:43.189: Fruit -> isDisposed
  • delay

将 Observable 的每一个元素拖延一段时间后发出

delay

delay 操作符将修改一个 Observable,它会将 Observable 的所有元素都拖延一段设定好的时间, 然后才将它们发送出来。

  • delaySubscription

进行延时订阅

delaySubscription

delaySubscription 操作符将在经过所设定的时间后,才对 Observable 进行订阅操作。

  • distinctUntilChanged

阻止 Observable 发出相同的元素

distinctUntilChanged

distinctUntilChanged 操作符将阻止 Observable 发出相同的元素。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。如果后一个元素和前一个元素不相同,那么这个元素才会被发出来。

let disposeBag = DisposeBag()

Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
    .distinctUntilChanged()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:
🐱
🐷
🐱
🐵
🐱
  • elementAt

只发出 Observable 中的第 n 个元素

elementAt

elementAt 操作符将拉取 Observable 序列中指定索引数的元素,然后将它作为唯一的元素发出。

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .elementAt(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:
🐸
  • empty

创建一个空 Observable

empty

empty 操作符将创建一个 Observable,这个 Observable 只有一个完成事件。

//创建一个空 Observable:
let id = Observable<Int>.empty()

//它相当于:
let id = Observable<Int>.create { observer in
    observer.onCompleted()
    return Disposables.create()
}
  • error
    创建一个只有 error 事件的 Observable
error

error 操作符将创建一个 Observable,这个 Observable 只会产生一个 error 事件。

//创建一个只有 error 事件的 Observable:
let error: Error = ...
let id = Observable<Int>.error(error)

//它相当于:
let error: Error = ...
let id = Observable<Int>.create { observer in
    observer.onError(error)
    return Disposables.create()
}
  • filter

仅仅发出 Observable 中通过判定的元素

filter

filter 操作符将通过你提供的判定方法过滤一个 Observable。

let disposeBag = DisposeBag()
Observable.of(2, 30, 22, 5, 60, 1)
          .filter { $0 > 10 }
          .subscribe(onNext: { print($0) })
          .disposed(by: disposeBag)

输出结果:
30
22
60
  • flatMap

将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 合并

flatMap

flatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的元素合并之后再发送出来。

这个操作符是非常有用的,例如,当 Observable 的元素本生拥有其他的 Observable 时,你可以将所有子 Observables 的元素发送出来。

let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let variable = Variable(first)

variable.asObservable()
        .flatMap { $0 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

first.onNext("🐱")
variable.value = second

second.onNext("🅱️")
first.onNext("🐶")
输出结果:

👦🏻
🐱
🅰️
🅱️
🐶
  • flatMapLatest

将 Observable 的元素转换成其他的 Observable,然后取这些 Observables 中最新的一个

flatMapLatest

flatMapLatest 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。一旦转换出一个新的 Observable,就只发出它的元素,旧的 Observables 的元素将被忽略掉。


tips:与 flatMap 比较更容易理解

let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let variable = Variable(first)

variable.asObservable()
        .flatMapLatest { $0 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

first.onNext("🐱")
variable.value = second
second.onNext("🅱️")
first.onNext("🐶")

输出结果:
👦🏻
🐱
🅰️
🅱️
  • from

将其他类型或者数据结构转换为 Observable

from

当你在使用 Observable 时,如果能够直接将其他类型转换为 Observable,这将是非常省事的。from 操作符就提供了这种功能。

//将一个数组转换为 Observable:
let numbers = Observable.from([0, 1, 2])

//它相当于:
let numbers = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onNext(1)
    observer.onNext(2)
    observer.onCompleted()
    return Disposables.create()
}

//将一个可选值转换为 Observable
let optional: Int? = 1
let value = Observable.from(optional: optional)

//它相当于:
let optional: Int? = 1
let value = Observable<Int>.create { observer in
    if let element = optional {
        observer.onNext(element)
    }
    observer.onCompleted()
    return Disposables.create()
}
  • just

创建 Observable 发出唯一的一个元素

just

just 操作符将某一个元素转换为 Observable。

//一个序列只有唯一的元素 0:
let id = Observable.just(0)

//它相当于:
let id = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onCompleted()
    return Disposables.create()
}
  • map

通过一个转换函数,将 Observable 的每个元素转换一遍

map

map 操作符将源 Observable 的每个元素应用你提供的转换方法,然后返回含有转换结果的 Observable。

let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
    .map { $0 * 10 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:
10
20
30
  • merge

将多个 Observables 合并成一个

merge

通过使用 merge 操作符你可以将多个 Observables 合并成一个,当某一个 Observable 发出一个元素时,他就将这个元素发出。

如果,某一个 Observable 发出一个 onError 事件,那么被合并的 Observable 也会将它发出,并且立即终止序列。

let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.of(subject1, subject2)
    .merge()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("A")

subject1.onNext("B")

subject2.onNext("1")

subject2.onNext("2")

subject1.onNext("AB")

subject2.onNext("3")

输出结果:
A
B
1
2
AB
3
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容