Flink水位线并行度测试

(一)Demo解释

  • 1.开启nc服务: nc -lk 7777
  • 2.发送字符串数据格式: 0001 1569895882000 200
    • 第一个元素是key: 0001
    • 第二个元素是时间戳: 1569895882000
    • 第三个元素是要计算的数字: 200
  • 3.从数据源读取数据转成元组类型: (String, Long, Long), 例: (0001, 1569895882000, 200)
  • 4.添加水印,设置延迟时间10s
  • 5.设置滚动窗口每10s计算一次数据
  • 6.实现类WindowFunction
    • 窗口中数据,元组第3个元素,计算累加和
    • 收集数据元组类型: (String, Long) 例: (0001, 700)
    • 计算元组第3个数据的累加和,是为了更真实的模拟窗口计算效果
  • 7.版本 flink: 1.6.1, scal: 2.11.12
  • 8.代码关键位置,都加入了注释
package com.peach

import java.text.SimpleDateFormat

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

object WatermarkTest {
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // 设置并行度
        env.setParallelism(2)

        val socketStream: DataStream[String] = env.socketTextStream("node03", 7777)

        // 读取数据转换成元组 (key, 时间戳, 需计算的数据)
        val tupleStream: DataStream[(String, Long, Long)] = socketStream.map( {
            line =>
                val strings: Array[String] = line.split(" ")
                (strings(0), strings(1).toLong, strings(2).toLong)
        })


        // 添加水印
        val waterStream: DataStream[(String, Long, Long)] = tupleStream
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long, Long)] {

                    var currentMaxTimestamp = 0L
                    // 延迟时间设置 10s
                    val timeDiff = 10000L
                    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

                    override def getCurrentWatermark: Watermark = {
                        val watermark = new Watermark(currentMaxTimestamp - timeDiff)
                        watermark
                    }

                    override def extractTimestamp(element: (String, Long, Long), previousElementTimestamp: Long): Long = {

                        currentMaxTimestamp = Math.max(currentMaxTimestamp, element._2)

                        val id: Long = Thread.currentThread().getId

                        println("currentThreadId:" + id
                                + ",key:" + element._1
                                + ",eventtime:[" + element._2 + "|"
                                + sdf.format(element._2) + "],currentMaxTimestamp:["
                                + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) + "],watermark:["
                                + this.getCurrentWatermark.getTimestamp + "|"
                                + sdf.format(this.getCurrentWatermark.getTimestamp) + "]")

                        // NOTE: 返回的不是当前最大时间 currentMaxTimestamp
                        // 返回的是数据中的 事件时间
                        element._2
                    }
                })

        /*
            1. 设置滚动窗口10s
            2. 计算规则: 窗口内的数据,计算元组第3个元素的和
            3. 打印窗口结果
         */
        waterStream.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new SumWindowFunction)
                .print()


        env.execute("WatermarkTest")
    }
}

class SumWindowFunction extends WindowFunction[(String, Long, Long), (String, Long), Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long, Long)], out: Collector[(String, Long)]): Unit = {

        // 从key(Tuple)类型获取数据源中的key值
        val keyStr: String = key.getField[String](0)
        // 存储窗口所有数据中的时间戳数据
        val timestampBuffer: ArrayBuffer[Long] = ArrayBuffer[Long]()
        val ite: Iterator[(String, Long, Long)] = input.iterator

        // 存储计算和的数据
        var countResult: Long = 0L

        // 遍历窗口所有数据
        while (ite.hasNext) {
            val tup2: (String, Long, Long) = ite.next()
            // 逐个添加 时间戳
            timestampBuffer.append(tup2._2)

            // 累加元组中第3个数据的和
            countResult += tup2._3
        }

        // 时间戳 数据 数组
        val timeStampArray: Array[Long] = timestampBuffer.toArray

        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
        val  result: StringBuffer =   new StringBuffer("聚合数据的key为:" + keyStr + ",")
        result.append("窗口当中数据的条数为:" + timeStampArray.length + ",")
        result.append("窗口当中第一条数据为:" + sdf.format(timeStampArray.head) + ",")
        result.append("窗口当中最后一条数据为:" + sdf.format(timeStampArray.last) + ",")
        result.append("窗口起始时间为:" + sdf.format(window.getStart) + ",")
        result.append("窗口结束时间为:" + sdf.format(window.getEnd) + ",")
        result.append("!!!!!! 触发窗口计算\n")

        print(result.toString)

        // 收集 计算结果的元组数据 (key, 计算结果的数据)
        out.collect((keyStr, countResult))
    }
}

(二)准备测试,代码修改并行度为1,验证准备的数据

1.顺序发送3条数据

// 第1条数据  时间是:10:11:22  watermark: 10:11:12 待计算数据 200,
0001 1569895882000 200 
// 第2条数据  时间是:10:11:25 watermark: 10:11:15 待计算数据500  
0001 1569895885000 500  
// 第3条数据  时间是:10:11:40 watermark: 10:11:30  触发计算
0001 1569895900000 30   

打印结果如下:

// 第1条数据 watermark: 10:11:12
currentThreadId:38,key:0001,eventtime:[1569895882000|2019-10-01 10:11:22.000],currentMaxTimestamp:[1569895882000|2019-10-01 10:11:22.000],watermark:[1569895872000|2019-10-01 10:11:12.000]
// 第2条数据 watermark: 10:11:15
currentThreadId:38,key:0001,eventtime:[1569895885000|2019-10-01 10:11:25.000],currentMaxTimestamp:[1569895885000|2019-10-01 10:11:25.000],watermark:[1569895875000|2019-10-01 10:11:15.000]
// 第3条数据 watermark: 10:11:30 触发计算
currentThreadId:38,key:0001,eventtime:[1569895900000|2019-10-01 10:11:40.000],currentMaxTimestamp:[1569895900000|2019-10-01 10:11:40.000],watermark:[1569895890000|2019-10-01 10:11:30.000]
聚合数据的key为:0001,窗口当中数据的条数为:2,窗口当中第一条数据为:2019-10-01 10:11:22.000,窗口当中最后一条数据为:2019-10-01 10:11:25.000,窗口起始时间为:2019-10-01 10:11:20.000,窗口结束时间为:2019-10-01 10:11:30.000,!!!!!! 触发窗口计算
// 计算结果, 前2个数据被触发计算
(0001,700)

2.已确认 0001 1569895900000 30 可以触发窗口计算,
所以数据 0001 1569895901000 31 时间戳加1000, 时间增加1s, 也会触发窗口计算

// 第1条数据  时间是:10:11:22  watermark: 10:11:12 待计算数据 200,
0001 1569895882000 200 
// 第2条数据  时间是:10:11:25 watermark: 10:11:15 待计算数据500  
0001 1569895885000 500  
// 第3条数据  时间是:10:11:41 watermark: 10:11:31  触发计算
0001 1569895901000 31   

(三)并行度为2测试

1.顺序发送数据

// 第1条数据  时间是:10:11:22  watermark: 10:11:12 待计算数据 200,
0001 1569895882000 200 
// 第2条数据  时间是:10:11:25 watermark: 10:11:15 待计算数据500  
0001 1569895885000 500  
// 第3条数据  时间是:10:11:40 watermark: 10:11:30 待计算数据30  
0001 1569895900000 30   
// 第4条数据  时间是:10:11:41 watermark: 10:11:31 待计算数据31
0001 1569895901000 31  

打印结果如下:

currentThreadId:43,key:0001,eventtime:[1569895882000|2019-10-01 10:11:22.000],currentMaxTimestamp:[1569895882000|2019-10-01 10:11:22.000],watermark:[1569895872000|2019-10-01 10:11:12.000]
currentThreadId:41,key:0001,eventtime:[1569895885000|2019-10-01 10:11:25.000],currentMaxTimestamp:[1569895885000|2019-10-01 10:11:25.000],watermark:[1569895875000|2019-10-01 10:11:15.000]
currentThreadId:43,key:0001,eventtime:[1569895900000|2019-10-01 10:11:40.000],currentMaxTimestamp:[1569895900000|2019-10-01 10:11:40.000],watermark:[1569895890000|2019-10-01 10:11:30.000]
currentThreadId:41,key:0001,eventtime:[1569895901000|2019-10-01 10:11:41.000],currentMaxTimestamp:[1569895901000|2019-10-01 10:11:41.000],watermark:[1569895891000|2019-10-01 10:11:31.000]
聚合数据的key为:0001,窗口当中数据的条数为:2,窗口当中第一条数据为:2019-10-01 10:11:22.000,窗口当中最后一条数据为:2019-10-01 10:11:25.000,窗口起始时间为:2019-10-01 10:11:20.000,窗口结束时间为:2019-10-01 10:11:30.000,!!!!!! 触发窗口计算
1> (0001,700)

2.先看最后数据计算结果: 1> (0001,700)

  • 和是700, 说明前2条数据200+500被计算了
  • 输入第4个数据后,触发了窗口计算

3.查看currentThreadId(线程ID)有2个 43 和 41

  • 第3条数据线程43, 让线程43水位线满足触发条件, 每个线程都有独立的watermark
  • 第4条数据线程41, 让线程41水位线满足触发条件
  • 第4条数据后,因并行度为2,全部的线程都满足触发条件,所以执行计算

(四)并行度为4测试

  • 1.大胆假设
    • 并行度为4,就会有4个线程开启
    • 每个线程就会有独立的wartermark,就是4个watermark
    • 触发计算的条件就是,所有4个线程的watermark都满足后,而且窗口有数据,就会触发计算
  • 2.修改并行度为4, 运行程序验证
    发送数据如下:
0001 1569895882000 200
0001 1569895885000 500
0001 1569895900000 30
0001 1569895901000 31
0001 1569895902000 32
0001 1569895903000 33

打印运行结果:

currentThreadId:49,key:0001,eventtime:[1569895882000|2019-10-01 10:11:22.000],currentMaxTimestamp:[1569895882000|2019-10-01 10:11:22.000],watermark:[1569895872000|2019-10-01 10:11:12.000]
currentThreadId:45,key:0001,eventtime:[1569895885000|2019-10-01 10:11:25.000],currentMaxTimestamp:[1569895885000|2019-10-01 10:11:25.000],watermark:[1569895875000|2019-10-01 10:11:15.000]
currentThreadId:43,key:0001,eventtime:[1569895900000|2019-10-01 10:11:40.000],currentMaxTimestamp:[1569895900000|2019-10-01 10:11:40.000],watermark:[1569895890000|2019-10-01 10:11:30.000]
currentThreadId:49,key:0001,eventtime:[1569895902000|2019-10-01 10:11:42.000],currentMaxTimestamp:[1569895902000|2019-10-01 10:11:42.000],watermark:[1569895892000|2019-10-01 10:11:32.000]
currentThreadId:41,key:0001,eventtime:[1569895901000|2019-10-01 10:11:41.000],currentMaxTimestamp:[1569895901000|2019-10-01 10:11:41.000],watermark:[1569895891000|2019-10-01 10:11:31.000]
currentThreadId:45,key:0001,eventtime:[1569895903000|2019-10-01 10:11:43.000],currentMaxTimestamp:[1569895903000|2019-10-01 10:11:43.000],watermark:[1569895893000|2019-10-01 10:11:33.000]
聚合数据的key为:0001,窗口当中数据的条数为:2,窗口当中第一条数据为:2019-10-01 10:11:25.000,窗口当中最后一条数据为:2019-10-01 10:11:22.000,窗口起始时间为:2019-10-01 10:11:20.000,窗口结束时间为:2019-10-01 10:11:30.000,!!!!!! 触发窗口计算
1> (0001,700)

(五)结论

1、窗口[window_start_time,window_end_time)中有数据存在
2、watermark时间 >= window_end_time
3、并行度设置几个,就有几个独立的watermark,所有都要满足watermark时间 >= window_end_time

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容