public func stamp() -> String {
let date = Date()
let formatter = DateFormatter()
formatter.dateFormat = "HH:mm:ss"
let result = formatter.string(from: date)
return result
}
replay : 当有 新的订阅者 加入后,回访指定个数的 最新信号。replay(N)可以指定回放个数 ,也可用 replayAll回放所有的历史事件。
let interval = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).replay(2)
_ = interval.subscribe(onNext: { (num) in
print("1 - " + self.stamp())
print(" Subscriber1: \(num)")
})
_ = interval.connect()
let current = DispatchTime.now()
print("开始 - " + self.stamp())
DispatchQueue.global().asyncAfter(deadline: current + 2) {
_ = interval.subscribe(onNext: { (num) in
print("2 - " + self.stamp())
print(" Subscriber2: \(num)")
})
}
DispatchQueue.global().asyncAfter(deadline: current + 4) {
_ = interval.subscribe(onNext: { (num) in
print("3 - " + self.stamp())
print(" Subscriber3: \(num)")
})
}
执行过程
interval
-----------------0-------------1--------------2-------------3-------------4-----------------
Subscriber1
-----------------0-------------1--------------2-------------3-------------4-----------------
Subscriber2
-------------------------------1-------------2--------------3-------------4-----------------
Subscriber3
------------------------------------------------------------3--------------4-----------------
.....replay(2)
----------------------------(1,0,1)-----------------------(3,3,2,3)-------------------------
note
因为 13:52:06 开始 延迟两秒, 13:52:08 replay 一次。13:52:06 延迟 4秒,13:52:10 replay 一次
一、在 13:52:06 开始运行,并在 13:52:07 Subscriber 1 监听到了第一个信号 0。
二、进入第二秒(13:52:08) Subscriber 1 首先监听到了 信号 1, 紧接着 Subscriber 2加入监听, replay 为 Subscriber 回放了 , 信号0 和 信号1。
三、在 second 3 (13:52:09 )中 Subscriber 1 还是继续监听信号,此时最新的信号是 2 。 对于Subscriber 2 replay 已经重放过了信号,所以现在它也正常了, Subscriber 继续监听信号 ,监听的结果是 2.
四、 首先要知道到 13:52:10 秒 observable 已经发生的信号有: 0 、 1、 2、 3。\Subscriber 1 和 Subscriber 2 继续监听最新的信号 结果是 3。 Subscriber 3 是新加入的,replay要为它 回放两个 最新的信号 2 、3, 所以 Subscriber 3 监听的记过是 2 、3。
五、13:52:11 开始 Subscriber 1、Subscriber 2 、 Subscriber 3 都进入稳定监听期,不会有回放事件产生。
buffer:为事件回放提供缓冲区。
回顾知识点: 默认情况订阅者是不共享Observalbe的。我很迷茫不知道你在说啥
let interval = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).buffer(timeSpan: DispatchTimeInterval.seconds(4), count: 2, scheduler: MainScheduler.instance)
_ = interval.subscribe(onNext: { (num) in
print("1 - " + self.stamp())
print(" Subscriber1: \(num)")
})
let current = DispatchTime.now()
print("开始 - " + self.stamp())
DispatchQueue.global().asyncAfter(deadline: current + 2) {
_ = interval.subscribe(onNext: { (num) in
print("2 - " + self.stamp())
print(" Subscriber2: \(num)")
})
}
buffer 用法解析:(timeSpan 和 count 那个条件先满足都会发信号)
:缓冲区的时间跨度,尽管 interval 每隔 1秒钟发生一次事件,但是进过 buffer 处理后,就变成了 事件了,事件的值,就是所有事件值构成的数组,如果 timeSpan 过后没有任何事件发生,就向事件的订阅者发送一个空数组。
:缓冲区在timeSpan 时间里可以缓存的最大事件数量,
:表示Observable事件序列发生在主线程
执行过程
// count 先满足条件
interval
-----------------0-------------1--------------2-------------3-------------------------------
Subscriber1 发给订阅和,重置timeSpan
----------------------------(0,1)------------------------(2,3)-----------------------------
Subscriber2
-----------------------------------------------------------(0,1)----------------------------
......buffer(timeSpan: ......)
----------------------------(0,1)-----------------------(2,3)(0,1)-------------------------
note
代码展示的是 count 先满足条件的情况。
Second 1 (16:34:02) Subscriber 1 收到第一个信号 【0,1】。
Second 2 (16:34:04)Subscriber 2 加入监听,因为它是延迟 2 秒加入监听,\ 所以收到的 信号是 【0,1】,
Subscriber 1 监听的信号是 【2,3】。
window:由 Observable 填充缓冲区;除了用缓冲区的事件值作为数组发送给订阅者之外,我们还可以让某个时间段内的所有事件,组成一个新的 Observable,这个就是 window。参数使用和buffer 一样,不赘述。
我们的事件序列就会每隔4秒打开一个窗口,每个窗口周期最多处理4个事件,然后关闭当前窗口,打开新的窗口。
要注意每一个窗口周期中的事件,是随着interval中的事件实时发送给订阅者的。
为了观察每一个Sub-observable生成到完成的过程,我们只好比较笨的在interval的订阅里,又订阅了一次。
注意这里,,否则,我们订阅到的,就是对interval所有事件变换后的Observable,这样,就观察不到每一个Sub-observable的onCompleted事件了。
let interval = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).window(timeSpan: DispatchTimeInterval.seconds(4), count: 4, scheduler: MainScheduler.instance)
print("开始 - " + self.stamp())
_ = interval.subscribe(onNext: { (subObservable: Observable<Int>) in
print("============= Window Open ===============")
_ = subObservable.subscribe(onNext: { (value: Int) in
print("1 - " + self.stamp())
print(" Subscriber1: \(value)")
}, onError: nil, onCompleted: {
print("============ Window Closed ==============")
}) {
print("onDisposed")
}
})
执行过程
interval
-----------------0--1--2---------------3--4--5--6--7--8-----------------------------------
...... window(timeSpan: .......)
--------open----(0,1,2)---close---open---(3,4,5,6)----close---------------------------------
note
- 一、第一个窗口期,observable 是在 17:29:51 开始监听, 并在 17:29:52 收到第一个信号所以 一共三个信号。
buffer 和 window 的异同点:
相同点:
- 他们都有缓存区。
- 发送和信号的标准都是一样的,timeSpan 和 count 先到为准。
不同点:
1、
buffer :发送标准是攒够了就发一次,无论是 timeSpan 还是 count那个先满足条件。这个时间段就是信号发送的间隔。
window :实时监听没有信号发送间隔,observable原来怎样发送, window监听转化后还怎么监听,timeSpan 和 count 是窗口期开启和关闭的条件,而不是信号发送的间隔。
2、
- buffer :缓存区存储的是事件的值数组。[object1 , object2....... ]
- window :缓存区存储的是某个时间段里的所有事件,组成一个新的Observable。 observable<observable<Any>>