(一)Demo解释
- 1.开启nc服务: nc -lk 7777
- 2.发送字符串数据格式: 0001 1569895882000 200
- 第一个元素是key: 0001
- 第二个元素是时间戳: 1569895882000
- 第三个元素是要计算的数字: 200
- 3.从数据源读取数据转成元组类型: (String, Long, Long), 例: (0001, 1569895882000, 200)
- 4.添加水印,设置延迟时间10s
- 5.设置滚动窗口每10s计算一次数据
- 6.实现类WindowFunction
- 窗口中数据,元组第3个元素,计算累加和
- 收集数据元组类型: (String, Long) 例: (0001, 700)
- 计算元组第3个数据的累加和,是为了更真实的模拟窗口计算效果
- 7.版本 flink: 1.6.1, scal: 2.11.12
- 8.代码关键位置,都加入了注释
package com.peach
import java.text.SimpleDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
object WatermarkTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 设置并行度
env.setParallelism(2)
val socketStream: DataStream[String] = env.socketTextStream("node03", 7777)
// 读取数据转换成元组 (key, 时间戳, 需计算的数据)
val tupleStream: DataStream[(String, Long, Long)] = socketStream.map( {
line =>
val strings: Array[String] = line.split(" ")
(strings(0), strings(1).toLong, strings(2).toLong)
})
// 添加水印
val waterStream: DataStream[(String, Long, Long)] = tupleStream
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long, Long)] {
var currentMaxTimestamp = 0L
// 延迟时间设置 10s
val timeDiff = 10000L
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
override def getCurrentWatermark: Watermark = {
val watermark = new Watermark(currentMaxTimestamp - timeDiff)
watermark
}
override def extractTimestamp(element: (String, Long, Long), previousElementTimestamp: Long): Long = {
currentMaxTimestamp = Math.max(currentMaxTimestamp, element._2)
val id: Long = Thread.currentThread().getId
println("currentThreadId:" + id
+ ",key:" + element._1
+ ",eventtime:[" + element._2 + "|"
+ sdf.format(element._2) + "],currentMaxTimestamp:["
+ currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) + "],watermark:["
+ this.getCurrentWatermark.getTimestamp + "|"
+ sdf.format(this.getCurrentWatermark.getTimestamp) + "]")
// NOTE: 返回的不是当前最大时间 currentMaxTimestamp
// 返回的是数据中的 事件时间
element._2
}
})
/*
1. 设置滚动窗口10s
2. 计算规则: 窗口内的数据,计算元组第3个元素的和
3. 打印窗口结果
*/
waterStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new SumWindowFunction)
.print()
env.execute("WatermarkTest")
}
}
class SumWindowFunction extends WindowFunction[(String, Long, Long), (String, Long), Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long, Long)], out: Collector[(String, Long)]): Unit = {
// 从key(Tuple)类型获取数据源中的key值
val keyStr: String = key.getField[String](0)
// 存储窗口所有数据中的时间戳数据
val timestampBuffer: ArrayBuffer[Long] = ArrayBuffer[Long]()
val ite: Iterator[(String, Long, Long)] = input.iterator
// 存储计算和的数据
var countResult: Long = 0L
// 遍历窗口所有数据
while (ite.hasNext) {
val tup2: (String, Long, Long) = ite.next()
// 逐个添加 时间戳
timestampBuffer.append(tup2._2)
// 累加元组中第3个数据的和
countResult += tup2._3
}
// 时间戳 数据 数组
val timeStampArray: Array[Long] = timestampBuffer.toArray
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
val result: StringBuffer = new StringBuffer("聚合数据的key为:" + keyStr + ",")
result.append("窗口当中数据的条数为:" + timeStampArray.length + ",")
result.append("窗口当中第一条数据为:" + sdf.format(timeStampArray.head) + ",")
result.append("窗口当中最后一条数据为:" + sdf.format(timeStampArray.last) + ",")
result.append("窗口起始时间为:" + sdf.format(window.getStart) + ",")
result.append("窗口结束时间为:" + sdf.format(window.getEnd) + ",")
result.append("!!!!!! 触发窗口计算\n")
print(result.toString)
// 收集 计算结果的元组数据 (key, 计算结果的数据)
out.collect((keyStr, countResult))
}
}
(二)准备测试,代码修改并行度为1,验证准备的数据
1.顺序发送3条数据
// 第1条数据 时间是:10:11:22 watermark: 10:11:12 待计算数据 200,
0001 1569895882000 200
// 第2条数据 时间是:10:11:25 watermark: 10:11:15 待计算数据500
0001 1569895885000 500
// 第3条数据 时间是:10:11:40 watermark: 10:11:30 触发计算
0001 1569895900000 30
打印结果如下:
// 第1条数据 watermark: 10:11:12
currentThreadId:38,key:0001,eventtime:[1569895882000|2019-10-01 10:11:22.000],currentMaxTimestamp:[1569895882000|2019-10-01 10:11:22.000],watermark:[1569895872000|2019-10-01 10:11:12.000]
// 第2条数据 watermark: 10:11:15
currentThreadId:38,key:0001,eventtime:[1569895885000|2019-10-01 10:11:25.000],currentMaxTimestamp:[1569895885000|2019-10-01 10:11:25.000],watermark:[1569895875000|2019-10-01 10:11:15.000]
// 第3条数据 watermark: 10:11:30 触发计算
currentThreadId:38,key:0001,eventtime:[1569895900000|2019-10-01 10:11:40.000],currentMaxTimestamp:[1569895900000|2019-10-01 10:11:40.000],watermark:[1569895890000|2019-10-01 10:11:30.000]
聚合数据的key为:0001,窗口当中数据的条数为:2,窗口当中第一条数据为:2019-10-01 10:11:22.000,窗口当中最后一条数据为:2019-10-01 10:11:25.000,窗口起始时间为:2019-10-01 10:11:20.000,窗口结束时间为:2019-10-01 10:11:30.000,!!!!!! 触发窗口计算
// 计算结果, 前2个数据被触发计算
(0001,700)
2.已确认 0001 1569895900000 30 可以触发窗口计算,
所以数据 0001 1569895901000 31 时间戳加1000, 时间增加1s, 也会触发窗口计算
// 第1条数据 时间是:10:11:22 watermark: 10:11:12 待计算数据 200,
0001 1569895882000 200
// 第2条数据 时间是:10:11:25 watermark: 10:11:15 待计算数据500
0001 1569895885000 500
// 第3条数据 时间是:10:11:41 watermark: 10:11:31 触发计算
0001 1569895901000 31
(三)并行度为2测试
1.顺序发送数据
// 第1条数据 时间是:10:11:22 watermark: 10:11:12 待计算数据 200,
0001 1569895882000 200
// 第2条数据 时间是:10:11:25 watermark: 10:11:15 待计算数据500
0001 1569895885000 500
// 第3条数据 时间是:10:11:40 watermark: 10:11:30 待计算数据30
0001 1569895900000 30
// 第4条数据 时间是:10:11:41 watermark: 10:11:31 待计算数据31
0001 1569895901000 31
打印结果如下:
currentThreadId:43,key:0001,eventtime:[1569895882000|2019-10-01 10:11:22.000],currentMaxTimestamp:[1569895882000|2019-10-01 10:11:22.000],watermark:[1569895872000|2019-10-01 10:11:12.000]
currentThreadId:41,key:0001,eventtime:[1569895885000|2019-10-01 10:11:25.000],currentMaxTimestamp:[1569895885000|2019-10-01 10:11:25.000],watermark:[1569895875000|2019-10-01 10:11:15.000]
currentThreadId:43,key:0001,eventtime:[1569895900000|2019-10-01 10:11:40.000],currentMaxTimestamp:[1569895900000|2019-10-01 10:11:40.000],watermark:[1569895890000|2019-10-01 10:11:30.000]
currentThreadId:41,key:0001,eventtime:[1569895901000|2019-10-01 10:11:41.000],currentMaxTimestamp:[1569895901000|2019-10-01 10:11:41.000],watermark:[1569895891000|2019-10-01 10:11:31.000]
聚合数据的key为:0001,窗口当中数据的条数为:2,窗口当中第一条数据为:2019-10-01 10:11:22.000,窗口当中最后一条数据为:2019-10-01 10:11:25.000,窗口起始时间为:2019-10-01 10:11:20.000,窗口结束时间为:2019-10-01 10:11:30.000,!!!!!! 触发窗口计算
1> (0001,700)
2.先看最后数据计算结果: 1> (0001,700)
- 和是700, 说明前2条数据200+500被计算了
- 输入第4个数据后,触发了窗口计算
3.查看currentThreadId(线程ID)有2个 43 和 41
- 第3条数据线程43, 让线程43水位线满足触发条件, 每个线程都有独立的watermark
- 第4条数据线程41, 让线程41水位线满足触发条件
- 第4条数据后,因并行度为2,全部的线程都满足触发条件,所以执行计算
(四)并行度为4测试
- 1.大胆假设
- 并行度为4,就会有4个线程开启
- 每个线程就会有独立的wartermark,就是4个watermark
- 触发计算的条件就是,所有4个线程的watermark都满足后,而且窗口有数据,就会触发计算
- 2.修改并行度为4, 运行程序验证
发送数据如下:
0001 1569895882000 200
0001 1569895885000 500
0001 1569895900000 30
0001 1569895901000 31
0001 1569895902000 32
0001 1569895903000 33
打印运行结果:
currentThreadId:49,key:0001,eventtime:[1569895882000|2019-10-01 10:11:22.000],currentMaxTimestamp:[1569895882000|2019-10-01 10:11:22.000],watermark:[1569895872000|2019-10-01 10:11:12.000]
currentThreadId:45,key:0001,eventtime:[1569895885000|2019-10-01 10:11:25.000],currentMaxTimestamp:[1569895885000|2019-10-01 10:11:25.000],watermark:[1569895875000|2019-10-01 10:11:15.000]
currentThreadId:43,key:0001,eventtime:[1569895900000|2019-10-01 10:11:40.000],currentMaxTimestamp:[1569895900000|2019-10-01 10:11:40.000],watermark:[1569895890000|2019-10-01 10:11:30.000]
currentThreadId:49,key:0001,eventtime:[1569895902000|2019-10-01 10:11:42.000],currentMaxTimestamp:[1569895902000|2019-10-01 10:11:42.000],watermark:[1569895892000|2019-10-01 10:11:32.000]
currentThreadId:41,key:0001,eventtime:[1569895901000|2019-10-01 10:11:41.000],currentMaxTimestamp:[1569895901000|2019-10-01 10:11:41.000],watermark:[1569895891000|2019-10-01 10:11:31.000]
currentThreadId:45,key:0001,eventtime:[1569895903000|2019-10-01 10:11:43.000],currentMaxTimestamp:[1569895903000|2019-10-01 10:11:43.000],watermark:[1569895893000|2019-10-01 10:11:33.000]
聚合数据的key为:0001,窗口当中数据的条数为:2,窗口当中第一条数据为:2019-10-01 10:11:25.000,窗口当中最后一条数据为:2019-10-01 10:11:22.000,窗口起始时间为:2019-10-01 10:11:20.000,窗口结束时间为:2019-10-01 10:11:30.000,!!!!!! 触发窗口计算
1> (0001,700)
(五)结论
1、窗口[window_start_time,window_end_time)中有数据存在
2、watermark时间 >= window_end_time
3、并行度设置几个,就有几个独立的watermark,所有都要满足watermark时间 >= window_end_time