//PublishSubject -> 会发送订阅者从订阅之后的事件序列
PublishSubjectlet
sequenceOfInts = PublishSubject()
//在新的订阅对象订阅的时候会补发所有已经发送过的数据队列,bufferSize 是缓冲区的大小,决定了补发队列的最大值。如果 bufferSize 是1,那么新的订阅者出现的时候就会补发上一个事件,如果是2,则补两个.
ReplaySubject
let replaySubject = ReplaySubject.create(bufferSize: 2)
//在新的订阅对象订阅的时候会发送最近发送的事件,如果没有则发送一个默认值。
BehaviorSubject
let behaviorSubject = BehaviorSubject.init(value: "z")
Variable
Variable 是基于 BehaviorSubject 的一层封装,它的优势是:不会被显式终结。即:不会收到 .Completed 和 .Error 这类的终结事件,它会主动在析构的时候发送 .Complete 。
subscribe
//订阅各种不同类型的事件
let disposeBag = DisposeBag()
Observable.of("🐶", "🐱", "🐭", "🐹")
.subscribe(onNext: { (str) in
print(str,#line,#function)
}, onError: { (error) in
print(error,#line,#function)
}, onCompleted: {
},onDisposed:{
print("释放")
}).disposed(by: disposeBag)
Never
create an Observable that emits no items and does not terminate
Throw
create an Observable that emits no items and terminates with an error
Empty
create an Observable that emits no items but terminates normally
just
just 是只包含一个元素的序列,它会先发送 .Next(value) ,然后发送 .Completed 。
of
创建一个可观测序列与固定数量的元素。
let disposeBag = DisposeBag()
Observable.of("🐶", "🐱", "🐭", "🐹")
.subscribe(onNext: { element in
print(element)
})
.disposed(by: disposeBag)
from
//from 创建一个可观测序列的序列,如一个数组,字典,或一组。
Observable.from(["🐶", "🐱", "🐭", "🐹"])
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
create
//Creates a custom Observable sequence.
let disposeBag = DisposeBag()
let myJust = { (element: String) -> Observablein
return Observable.create { observer in
observer.on(.next(element))
observer.on(.completed)
return Disposables.create()
}
}
myJust("🔴")
.subscribe { print($0) }
.disposed(by: disposeBag)
repeatElement
//创建一个可观测序列无限期发出给定的元素。
Observable.repeatElement("🔴")
.take(3)//制定数量
.subscribe(onNext: { print($0,#line) })
.disposed(by: disposeBag)
range
//创建一个可观测序列发出一系列连续的整数,然后终止
let disposeBag = DisposeBag()
Observable.range(start: 1, count: 10)
.subscribe { print($0) }
.disposed(by: disposeBag)
generate
//创建一个可观测序列生成的值,只要所提供的条件的求值结果为true。
Observable.generate(
initialState: 0,
condition: { $0 < 3 },
scheduler: CurrentThreadScheduler.instance,
iterate: { $0 + 1 }
)
.subscribe(onNext: { print($0,#line) })
.disposed(by: disposeBag)
deferred
会等到有订阅者的时候再通过工厂方法创建 Observable 对象,每个订阅者订阅的对象都是内容相同而完全独立的序列。
let disposeBag = DisposeBag() var count = 1
let deferredSequence = Observable.deferred {
print("Creating \(count)")
count += 1
return Observable.create { observer in
print("Emitting...")
observer.onNext("🐶")
observer.onNext("🐱")
observer.onNext("🐵")
return Disposables.create()
}
}
deferredSequence
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
deferredSequence
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
map
就是对每个元素都用函数做一次转换,挨个映射一遍。
let aa = sequenceOfInts.map{ i -> Int in
print("MAP---\(i)")
return i * 2
}
flatMap
struct Player {
var score: Variable
}
let 👦🏻 = Player(score: Variable(80))
let 👧🏼 = Player(score: Variable(90))
let player = Variable(👦🏻)
player.asObservable()
.flatMap { $0.score.asObservable() } // Change flatMap to flatMapLatest and observe change in printed output
.subscribe(onNext: { print($0,#line) })
.disposed(by: disposeBag)
👦🏻.score.value = 85
player.value = 👧🏼
// Will be printed when using flatMap, but will not be printed when using flatMapLatest
👦🏻.score.value = 95
👧🏼.score.value = 100
Observable.from(["🐶", "🐱", "🐭", "🐹"])
.flatMap { (event) -> Observablein
return Observable.of("🐶", "🐱", "🐭", "🐹")
}.subscribe{
print($0,#line)
}.disposed(by: disposeBag)
scan
Observable.of(10, 100, 1000)
.scan(2) { aggregateValue, newValue in
print(aggregateValue,newValue,"->scan")
return aggregateValue + newValue
}
.subscribe(onNext: { print($0,"scan->",#line) })
.disposed(by: disposeBag)
打印结果
2 10 ->scan
12 scan-> 324
12 100 ->scan
112 scan-> 324
112 1000 ->scan
1112 scan-> 324
distinctUntilChanged
//去掉连续的重复元素
Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
.distinctUntilChanged()
.subscribe(onNext: { print($0,"->distinctUntilChanged") })
.disposed(by: disposeBag)
elementAt
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.elementAt(3)
.subscribe(onNext: { print($0,"->elementAt") })
.disposed(by: disposeBag)
takeUntil
//对另一个观测序列(referenceSequence)的依赖停止(completed) let sourceSequence = PublishSubject() let referenceSequence = PublishSubject()
sourceSequence
.takeUntil(referenceSequence)
.subscribe { print($0) }
.disposed(by: disposeBag)
sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")
referenceSequence.onNext("🔴")
sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")
skip
//过滤掉最初始的2个序列形成可观察序列(skipWhile,skipWhileWithIndex)
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.skip(2)
.subscribe(onNext: { print($0,"->skip") })
.disposed(by: disposeBag)
skipUntil
//对另一个观测序列的依赖,过滤let sourceSequence1 = PublishSubject() let referenceSequence1 = PublishSubject()
sourceSequence1
.skipUntil(referenceSequence1)
.subscribe(onNext: { print($0,"->skipUntil") })
.disposed(by: disposeBag)
sourceSequence1.onNext("🐱")
sourceSequence1.onNext("🐰")
sourceSequence1.onNext("🐶")
referenceSequence1.onNext("🔴")
sourceSequence1.onNext("🐸")
sourceSequence1.onNext("🐷")
sourceSequence1.onNext("🐵")
single
//将元素转化成观察序列,如果重复或者没有这个元素就抛出错误,默认是第一个元素
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.single()
.subscribe(onNext: { print($0,"->single") })
.disposed(by: disposeBag)
concat
等待每个序列终止之前,成功发射第二序列的元素。
let subject7 = BehaviorSubject(value: "🍎")
let subject8 = BehaviorSubject(value: "🐶")
let variable1 = Variable(subject7)
variable1.asObservable()
.concat()
.subscribe { print($0,"->concat") }
.disposed(by: disposeBag)
subject7.onNext("🍐")
subject7.onNext("🍊")
variable1.value = subject8
subject8.onNext("I would be ignored")
subject8.onNext("🐱")
subject7.onCompleted()//此时第二个序列可以成功发送数据
subject8.onNext("🐭")
toArray
Observable.range(start: 1, count: 10)
.toArray()
.subscribe { print($0) }
.disposed(by: disposeBag)
reduce
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
bindTo
将观察到的对象绑定到某对象
//这是一个加法计算,将相加的结果赋值给Label
Observable.combineLatest(txf1.rx.text.orEmpty,txf2.rx.text.orEmpty,txf3.rx.text.orEmpty) {
(Int($0) ?? 0)+(Int($1) ?? 0)+(Int($2) ?? 0)
}.map {
$0.description
}.bindTo(result.rx.text).disposed(by: disPoseBag)
融合信号
merge
let subject1 = PublishSubject()let subject2 = PublishSubject()
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("🅰️")
subject1.onNext("🅱️")
subject2.onNext("①")
subject2.onNext("②")
subject1.onNext("🆎")
subject2.onNext("③")
combineLatest
//释放元素从最近期内可观察到的序列。
当需要同时监听时,那么每当有新的事件发生的时候,combineLatest 会将每个队列的最新的一个元素进行合并。
把3个输入框的值相加再绑定到Label
Observable.combineLatest(txf1.rx.text.orEmpty,txf2.rx.text.orEmpty,txf3.rx.text.orEmpty) {
(Int($0) ?? 0)+(Int($1) ?? 0)+(Int($2) ?? 0)
}.map {
$0.description
}.bindTo(result.rx.text).disposed(by: disPoseBag)
switchLatest
let subject3 = BehaviorSubject(value: "⚽️")
let subject4 = BehaviorSubject(value: "🍎")
let variable = Variable(subject3)
variable.asObservable()
.switchLatest()
.subscribe(onNext: { print($0,"->switchLatest") })
.disposed(by: disposeBag)
subject3.onNext("🏈")
subject3.onNext("🏀")
variable.value = subject4
subject3.onNext("⚾️”)//发送失败,最近的是subject4
subject4.onNext("🍐")
shareReplay
//shareReplay它是以重播(保存通知记录)的方式通知自己的订阅者,防止map重复调用,即使订阅之前的序列也可以收到(收到可重播次数的事件序列)
let aa = sequenceOfInts.map{ i -> Int in
print("MAP---\(i)")
return i * 2
}.shareReplay(3)
Driver
Driver的drive方法与Observable的方法bindTo用法非常相似
它的特点
* 它不会发射出错误(Error)事件
* 对它的观察订阅是发生在主线程(UI线程)的
* 自带shareReplayLatestWhileConnected,防止重复执行事件
textField.rx_text
.asDriver()
.drive(label.rx_sayHelloObserver)
.addDisposableTo(disposeBag)
throttle
在Observable中假如你要进行限流,你要用到方法throttle(dueTime: RxSwift.RxTimeInterval, scheduler: SchedulerType),方法的第一个参数是两个事件之间的间隔时间,第二个参数是一个线程的有关类,如我要在主线程中,我可以传入MainScheduler.instance。而在Driver中我们要限流,调用的是throttle(dueTime: RxSwift.RxTimeInterval),只配置事件的间隔时间,而它默认会在主线程中进行。
driveField.rx.text
.asDriver().throttle(2)
.drive(driveLabel.rx.text)
.addDisposableTo(disPoseBag)
DisposeBag
//事件的释放,会在该对象释放的时候释放绑定监听事件
let disPoseBag = DisposeBag()
aa.subscribe {
print("--1--\($0)")
}.disposed(by: disPoseBag)——>不能在这里写DisposeBag(),会直接结束掉