RXSwift - 使用connectable operator回放事件



    public func stamp() -> String {
        let date = Date()
        let formatter = DateFormatter()
        formatter.dateFormat = "HH:mm:ss"
        let result = formatter.string(from: date)

        return result
    }

replay : 当有 新的订阅者 加入后,回访指定个数的 最新信号。replay(N)可以指定回放个数 ,也可用 replayAll回放所有的历史事件。

 let  interval = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).replay(2)

        _ = interval.subscribe(onNext: { (num) in
            print("1 - " + self.stamp())
            print("     Subscriber1: \(num)")
        })
        
        _ = interval.connect()
        let current = DispatchTime.now()
         print("开始 - " + self.stamp())

        DispatchQueue.global().asyncAfter(deadline: current + 2) {
            _ = interval.subscribe(onNext: { (num) in
                print("2 - " + self.stamp())
                print("     Subscriber2: \(num)")
            })
        }
        
        DispatchQueue.global().asyncAfter(deadline: current + 4) {
            _ = interval.subscribe(onNext: { (num) in
                print("3 - " + self.stamp())
                print("     Subscriber3: \(num)")
            })
        }
        

执行过程

interval
-----------------0-------------1--------------2-------------3-------------4-----------------
     
Subscriber1
-----------------0-------------1--------------2-------------3-------------4-----------------
     
 
Subscriber2
-------------------------------1-------------2--------------3-------------4-----------------
     
 
Subscriber3
------------------------------------------------------------3--------------4-----------------

 
                            .....replay(2)
     
----------------------------(1,0,1)-----------------------(3,3,2,3)-------------------------

\color{#DEB887}{执行结果:}

\color{#DEB887}{开始 - 13:52:06}

\color{#DEB887}{1 - 13:52:07}
\color{#DEB887}{ \qquad Subscriber1: 0}

\color{#DEB887}{1 - 13:52:08}
\color{#DEB887}{ \qquad Subscriber1: 1}
\color{#DEB887}{2 - 13:52:08}
\color{#DEB887}{\qquad Subscriber2: 0 \qquad replay}
\color{#DEB887}{2 - 13:52:08 }
\color{#DEB887}{\qquad Subscriber2: 1 \qquad replay}

\color{#DEB887}{1 - 13:52:09}
\color{#DEB887}{\qquad Subscriber1: 2}
\color{#DEB887}{2 - 13:52:09}
\color{#DEB887}{\qquad Subscriber2: 2}

\color{#DEB887}{1 - 13:52:10}
\color{#DEB887}{\qquad Subscriber1: 3}
\color{#DEB887}{2 - 13:52:10}
\color{#DEB887}{\qquad Subscriber2: 3}
\color{#DEB887}{3 - 13:52:10}
\color{#DEB887}{\qquad Subscriber3: 2 \qquad replay}
\color{#DEB887}{3 - 13:52:10}
\color{#DEB887}{\qquad Subscriber3: 3 \qquad replay}

\color{#DEB887}{1 - 13:52:11}
\color{#DEB887}{\qquad Subscriber1: 4}
\color{#DEB887}{2 - 13:52:11}
\color{#DEB887}{\qquad Subscriber2: 4}
\color{#DEB887}{3 - 13:52:11}
\color{#DEB887}{ \qquad Subscriber3: 4}

note

  • 因为 13:52:06 开始 延迟两秒, 13:52:08 replay 一次。13:52:06 延迟 4秒,13:52:10 replay 一次

  • 一、在 13:52:06 开始运行,并在 13:52:07 Subscriber 1 监听到了第一个信号 0。

  • 二、进入第二秒(13:52:08) Subscriber 1 首先监听到了 信号 1, 紧接着 Subscriber 2加入监听, replay 为 Subscriber 回放了 , 信号0 和 信号1。

  • 三、在 second 3 (13:52:09 )中 Subscriber 1 还是继续监听信号,此时最新的信号是 2 。 对于Subscriber 2 replay 已经重放过了信号,所以现在它也正常了, Subscriber 继续监听信号 ,监听的结果是 2.

  • 四、 首先要知道到 13:52:10 秒 observable 已经发生的信号有: 0 、 1、 2、 3。\Subscriber 1 和 Subscriber 2 继续监听最新的信号 结果是 3。 Subscriber 3 是新加入的,replay要为它 回放两个 最新的信号 2 、3, 所以 Subscriber 3 监听的记过是 2 、3。

  • 五、13:52:11 开始 Subscriber 1、Subscriber 2 、 Subscriber 3 都进入稳定监听期,不会有回放事件产生。

buffer:为事件回放提供缓冲区。

回顾知识点: 默认情况订阅者是不共享Observalbe的。我很迷茫不知道你在说啥

     let  interval = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).buffer(timeSpan:     DispatchTimeInterval.seconds(4), count: 2, scheduler: MainScheduler.instance)

        _ = interval.subscribe(onNext: { (num) in
            print("1 - " + self.stamp())
            print("     Subscriber1: \(num)")
        })
        
        let current = DispatchTime.now()
         print("开始 - " + self.stamp())

        DispatchQueue.global().asyncAfter(deadline: current + 2) {
            _ = interval.subscribe(onNext: { (num) in
                print("2 - " + self.stamp())
                print("     Subscriber2: \(num)")
            })
        }

buffer 用法解析:(timeSpan 和 count 那个条件先满足都会发信号)

  • \color{}{\mathbf{timeSpan}}:缓冲区的时间跨度,尽管 interval 每隔 1秒钟发生一次事件,但是进过 buffer 处理后,就变成了 \color{}{\mathbf{最长 timeSpan 秒发生一次}}事件了,事件的值,就是所有事件值构成的数组,如果 timeSpan 过后没有任何事件发生,就向事件的订阅者发送一个空数组。

  • \color{}{\mathbf{count}}:缓冲区在timeSpan 时间里可以缓存的最大事件数量,\color{}{\mathbf{当达到这个值之后,buffer 就会立即把缓存的事件用一个数组发送给订阅者,并重置 timeSpan。}}

  • \color{}{\mathbf{scheduler}}:表示Observable事件序列发生在主线程

执行过程

// count 先满足条件
interval
-----------------0-------------1--------------2-------------3-------------------------------
     
Subscriber1                         发给订阅和,重置timeSpan
----------------------------(0,1)------------------------(2,3)-----------------------------
     
 
Subscriber2
-----------------------------------------------------------(0,1)----------------------------
                      ......buffer(timeSpan: ......)
     
----------------------------(0,1)-----------------------(2,3)(0,1)-------------------------

\color{#DEB887}{1 - 16:34:02}
\color{#DEB887}{\qquad Subscriber1: [0, 1]}

\color{#DEB887}{1 - 16:34:04}
\color{#DEB887}{\qquad Subscriber1: [2, 3]}
\color{#DEB887}{2 - 16:34:04}
\color{#DEB887}{\qquad Subscriber2: [0, 1]}

\color{#DEB887}{1 - 16:34:06}
\color{#DEB887}{\qquad Subscriber1: [4, 5]}
\color{#DEB887}{1 - 16:34:06}
\color{#DEB887}{\qquad Subscriber2: [2, 3]}

note

代码展示的是 count 先满足条件的情况。

  • Second 1 (16:34:02) Subscriber 1 收到第一个信号 【0,1】。

  • Second 2 (16:34:04)Subscriber 2 加入监听,因为它是延迟 2 秒加入监听,\ 所以收到的 信号是 【0,1】,
    Subscriber 1 监听的信号是 【2,3】。

window:由 Observable 填充缓冲区;除了用缓冲区的事件值作为数组发送给订阅者之外,我们还可以让某个时间段内的所有事件,组成一个新的 Observable,这个就是 window。参数使用和buffer 一样,不赘述。

我们的事件序列就会每隔4秒打开一个窗口,每个窗口周期最多处理4个事件,然后关闭当前窗口,打开新的窗口。

要注意每一个窗口周期中的事件,是随着interval中的事件实时发送给订阅者的。\color{}{ \small{ \mathbf{而不是“攒”够了一个窗口周期的 事件后,再发送一个Sub-observable。}} }

为了观察每一个Sub-observable生成到完成的过程,我们只好比较笨的在interval的订阅里,又订阅了一次。

注意这里,\color{}{ \small{ \mathbf{不要用flatMap直接对interval进行变换}} },否则,我们订阅到的,就是对interval所有事件变换后的Observable,这样,就观察不到每一个Sub-observable的onCompleted事件了。


        let  interval = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).window(timeSpan: DispatchTimeInterval.seconds(4), count: 4, scheduler: MainScheduler.instance)
        
        print("开始 - " + self.stamp())
        
        _ = interval.subscribe(onNext: { (subObservable: Observable<Int>) in
            
            print("============= Window Open ===============")
            
            _ = subObservable.subscribe(onNext: { (value: Int) in
                print("1 - " + self.stamp())
                print("     Subscriber1: \(value)")
            }, onError: nil, onCompleted: {
                print("============ Window Closed ==============")
            }) {
                print("onDisposed")
            }
        })
       

执行过程

interval
-----------------0--1--2---------------3--4--5--6--7--8-----------------------------------
     
            ...... window(timeSpan: .......)
     
--------open----(0,1,2)---close---open---(3,4,5,6)----close---------------------------------

\color{#DEB887}{执行结果:}

\color{#DEB887}{开始 - 17:29:51}

\color{#DEB887}{Window Open}
\color{#DEB887}{1 - 17:29:52}
\color{#DEB887}{\qquad Subscriber1: 0}
\color{#DEB887}{1 - 17:29:53}
\color{#DEB887}{\qquad Subscriber1: 1}
\color{#DEB887}{1 - 17:29:54}
\color{#DEB887}{\qquad Subscriber1: 2}
\color{#DEB887}{Window Closed}
\color{#DEB887}{onDisposed}

\color{#DEB887}{Window Open}
\color{#DEB887}{1 - 17:29:55}
\color{#DEB887}{\qquad Subscriber1: 3}
\color{#DEB887}{1 - 17:29:56}
\color{#DEB887}{\qquad Subscriber1: 4}
\color{#DEB887}{1 - 17:29:57}
\color{#DEB887}{\qquad Subscriber1: 5}
\color{#DEB887}{1 - 17:29:58}
\color{#DEB887}{\qquad Subscriber1: 6}
\color{#DEB887}{Window Closed}
\color{#DEB887}{onDisposed}

note

  • 一、第一个窗口期,observable 是在 17:29:51 开始监听, 并在 17:29:52 收到第一个信号所以 一共三个信号。

buffer 和 window 的异同点:

相同点:

  • 他们都有缓存区。
  • 发送和信号的标准都是一样的,timeSpan 和 count 先到为准。

不同点:

1、
  • buffer :发送标准是攒够了就发一次,无论是 timeSpan 还是 count那个先满足条件。这个时间段就是信号发送的间隔。

  • window :实时监听没有信号发送间隔,observable原来怎样发送, window监听转化后还怎么监听,timeSpan 和 count 是窗口期开启和关闭的条件,而不是信号发送的间隔。

2、
  • buffer :缓存区存储的是事件的值数组。[object1 , object2....... ]
  • window :缓存区存储的是某个时间段里的所有事件,组成一个新的Observable。 observable<observable<Any>>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342