Flink 使用介绍相关文档目录
SQL match recognize
本篇为Flink 使用之 CEP后续。如果您对CEP不了解,请先浏览Flink 使用之 CEP。
SQL的match_recognize
子句给予了SQL支持CEP的能力。下面逐个介绍match_recognize
子句的组成部分。
partition by 从句
分区基准字段,类似于DataStream API的keyby
。将业务数据划分为多个组,分别统计。该字段会作为查询结果返回。
order by 从句
排序基准字段,实际业务中通常为时间戳。
measures 从句
定义如何输出匹配的结果,类似于select语句。需要从匹配pattern中取出结果并映射字段名。
输出结果的schema为partition by基准字段 + measures中的字段。
可以使用聚合函数。CEP业务中除了常用的数学运算聚合函数外,常用的还有:
- first:返回一系列有序元素的第一个。
- last:返回一系列有序元素的最后一个。
这两个函数适用于从"X+"这种Pattern获取第一个满足X条件或最后一个满足X条件的元素。
one row per match/all rows per match
这一项为SQL的输出模式配置,用来规定每次匹配过后输出多少行。
目前Flink只支持one row per match。所以说和使用写代码CEP不同的是,SQL方式及时一次匹配到多个元素,也只能输出一个。所以说我们需要使用聚合函数进行计算。或者是在measures中,将匹配到的元素内容取出,作为输出表的字段展示(打宽)。
匹配后跳转模式
和Pattern API中匹配后跳过策略相同。
- after match skip to next row:从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配
- after match skip past last row:从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配
- after match skip to first patternName:从匹配成功的事件序列中第一个对应于patternName的事件开始进行下一次匹配
- after match skip to last patternName:从匹配成功的事件序列中最后一个对应于patternName的事件开始进行下一次匹配
pattern 从句
pattern从句格式为pattern (xxx) within interval 'xxx' time_unit
。表示捕获符合xxx pattern的一系列元素,且这些元素的时间需要在within
后面的时间范围内。
Pattern从句括号内需要填写pattern变量表达式,格式类似于正则表达式。例如:
PATTERN (A B+ C* D)
表示A之后跟随1个或多个B,然后0个或多个C,最后为D。
注意:pattern(A B)的含义为A和B必须紧密相连(相当于Pattern API中的consecutive
)。
Pattern的数量限定符和正则表达式使用方式类似。我们引用官网的解释。
- * — 0 or more rows
- + — 1 or more rows
- ? — 0 or 1 rows
- { n } — exactly n rows (n > 0)
- { n, } — n or more rows (n ≥ 0)
- { n, m } — between n and m (inclusive) rows (0 ≤ n ≤ m, 0 < m)
- { , m } — between 0 and m (inclusive) rows (m > 0)
除此之外,Pattern还支持greedy和reluctant。数量限定符默认是greedy类型,表示匹配尽可能多的元素,相反,reluctant类型回去匹配尽可能少的元素。和正则表达式类似,将数量限定符从greedy转化为reluctant类型需要在限定符后加一个?
字符。
define 从句
defines从句用于为pattern表达式配置精确的匹配条件。相当于Pattern API中的where
方法。
到这里为止SQL match_recognize各个从句的功能和编写方法已经介绍完了。但是离开实际案例,我们还是不能很好的掌握具体的用法。下一节我们引入一个具体的案例。
使用示例
场景:我们有实时的机房温度监控数据,机房温度过高会触发告警。我们想知道每个机房每次告警的起止时间和平均温度等数据。
数据源配置
首先我们配置数据源。实际生产中常用的数据源是Kafka。练习环境为了配置简单,我们使用SQL从CSV文件读入数据的方式。
要读取CSV格式,首先要引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
然后准备编造好的temp_record.csv
数据文件。其中每列的含义分别为:机架编号,时间戳,温度。
1,2021-09-14 15:37:03.3,30
1,2021-09-14 15:37:13.3,50
1,2021-09-14 15:37:23.3,55
1,2021-09-14 15:37:33.3,60
1,2021-09-14 15:37:43.3,55
1,2021-09-14 15:37:53.3,50
1,2021-09-14 15:38:03.3,45
2,2021-09-14 15:37:03.3,30
2,2021-09-14 15:37:13.3,50
2,2021-09-14 15:37:23.3,55
2,2021-09-14 15:37:33.3,45
2,2021-09-14 15:37:43.3,55
2,2021-09-14 15:37:53.3,50
2,2021-09-14 15:38:03.3,45
注意:CSV最后必须要有空行,否则解析时候报错。
接下来根据实例数据的schema,编写create table语句并检查是否能正常读入数据。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val createTableSql =
"""
|create table temp_record (
|rack_id int,
|ts timestamp(3),
|temp int,
|WATERMARK FOR ts AS ts - INTERVAL '1' SECOND)
|with (
|'connector'='filesystem',
|'path'='/path/to/temp_record.csv',
|'format'='csv'
|)
|""".stripMargin
tEnv.executeSql(createTableSql)
tEnv.executeSql("select * from temp_record").print()
如果一切无误,我们可以看到控制台打印出了CSV文件的内容,如下所示:
+----+-------------+-------------------------+-------------+
| op | rack_id | ts | temp |
+----+-------------+-------------------------+-------------+
| +I | 1 | 2021-09-14 15:37:53.300 | 50 |
| +I | 1 | 2021-09-14 15:37:23.300 | 55 |
| +I | 1 | 2021-09-14 15:37:43.300 | 55 |
| +I | 2 | 2021-09-14 15:37:43.300 | 55 |
| +I | 1 | 2021-09-14 15:38:03.300 | 45 |
| +I | 2 | 2021-09-14 15:37:03.300 | 30 |
| +I | 1 | 2021-09-14 15:37:03.300 | 30 |
| +I | 1 | 2021-09-14 15:37:13.300 | 50 |
| +I | 2 | 2021-09-14 15:37:33.300 | 45 |
| +I | 2 | 2021-09-14 15:37:13.300 | 50 |
| +I | 1 | 2021-09-14 15:37:33.300 | 60 |
| +I | 2 | 2021-09-14 15:37:53.300 | 50 |
| +I | 2 | 2021-09-14 15:38:03.300 | 45 |
| +I | 2 | 2021-09-14 15:37:23.300 | 55 |
+----+-------------+-------------------------+-------------+
到这里数据源配置完毕。
编写业务逻辑
在这个例子中。我们假设机器告警温度为大于等于50度。我们需要查询出每个机架的高温告警开始时间,高温告警结束时间,告警起始温度,告警结束温度和告警期间平均温度。按照业务需求,我们编写SQL如下。
val cepSql =
"""
|select * from temp_record
|match_recognize(
|partition by rack_id
|order by ts
|measures
|A.ts as start_ts,
|last(B.ts) as end_ts,
|A.temp as start_temp,
|last(B.temp) as end_temp,
|avg(B.temp) as avg_temp
|one row per match
|after match skip to next row
|pattern (A B+ C) within interval '90' second
|define
|A as A.temp < 50,
|B as B.temp >= 50,
|C as C.temp < 50
|)
|""".stripMargin
tEnv.executeSql(cepSql).print()
执行结果:
+----+-------------+-------------------------+-------------------------+-------------+-------------+-------------+
| op | rack_id | start_ts | end_ts | start_temp | end_temp | avg_temp |
+----+-------------+-------------------------+-------------------------+-------------+-------------+-------------+
| +I | 2 | 2021-09-14 15:37:03.300 | 2021-09-14 15:37:23.300 | 30 | 55 | 52 |
| +I | 2 | 2021-09-14 15:37:33.300 | 2021-09-14 15:37:53.300 | 45 | 50 | 52 |
| +I | 1 | 2021-09-14 15:37:03.300 | 2021-09-14 15:37:53.300 | 30 | 50 | 54 |
+----+-------------+-------------------------+-------------------------+-------------+-------------+-------------+
SQL部分解析:
- 我们需要将不同rack_id的数据分开统计,因此需要
partition by rack_id
。 - measures中使用
last(B.ts) as end_ts
获取最后一个对应pattern B的元素, 由define子句可知,pattern B为温度大于等于50的元素。所以它的含义为高温告警期间最后一个告警的温度。 - pattern by子句和measures子句两者共同决定的输出表格的schema。
- pattern子句和define子句两者一起确定了匹配模板,含义为一个温度低于50度的元素,紧跟一个或多个温度大于等于50度的元素,然后再跟一个温度低于50度的元素。