1. Overview
本文主要来自官网,旨在整理处 Flink SQL 的基本语法和使用,基础向。
2. API 调用
2.1 Old Planner VS Blink Planner
Blink Planner 对代码生成机制做了改进、对部分算子进行了优化,提供了丰富实用的新功能,如维表 join、Top N、MiniBatch、流式去重、聚合场景的数据倾斜优化等新功能。
Blink Planner 的优化策略是基于公共子图的优化算法,包含了基于成本的优化(CBO)和基于规则的优化(CRO)两种策略,优化更为全面。同时,Blink Planner 支持从 catalog 中获取数据源的统计信息,这对CBO优化非常重要。
Blink Planner 提供了更多的内置函数,更标准的 SQL 支持,在 Flink 1.9 版本中已经完整支持 TPC-H ,对高阶的 TPC-DS 支持也计划在下一个版本实现。
Flink 1.11 已经默认使用 Blink Planner。
2.2 基本程序结构
1.创建 TableEnvironment ( old/blink planner + stream/batch )
2.创建表( tableEnv.connect 外部数据源 或者 tableEnv.fromDataStream )
3.查询表( Table API 或者 SQL )
4.输出表( table.insertInto("xxtable") 或者 table.toRetractStream[T]/toAppendStream[T])
2.3 创建 TableEnvironment
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
2.4 创建表
// 1.通过外部数据源创建
//数据格式:sensor_1,1547718225,22.8
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sensor")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable")
// 2.通过 datastream 转换
val table1: Table = tableEnv.fromDataStream(stream)
2.5 查询表
//table api
val sensorTable = tableEnv.from("inputTable")
val resultTable = sensorTable
.select('id, 'temperature)
.filter('id === "sensor_1")
// SQL
val resultSqlTable = tableEnv.sqlQuery(
"""
|select id, temperature
|from inputTable
|where id = 'sensor_1'
""".stripMargin)
2.6 表转流的三种输出模式
-
追加( Append )模式
- 只做插入操作,和外部连接起只交换插入( insert )消息
-
撤回 ( Retract )模式
- 表和外部连接起交换添加( Add )和撤回( Retract )消息
- 插入操作编码为 Add 消息,删除编码为 Retract 消息,更新编码为上一条的 Retract 和下一条的 Add 消息
- 不能定义 Key
-
更新( Upsert )模式
- 更新和插入都被编码为 Upsert 消息,删除编码为 Delete 消息
- 需要定义 Key
DataStream 只支持 Append 和 Retract 模式。(toRetractStream[T] & toAppendStream[T]
)
外部文件系统的流支持哪种模式取决于具体实现,比如 Kakfa 只支持 Append 模式。
2.7 输出表
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable")
// 转换操作
val sensorTable: Table = tableEnv.from("inputTable")
// 简单转换
val resultTable: Table = sensorTable
.select('id, 'temp)
.filter('id === "sensor_1")
// 聚合转换
val aggTable: Table = sensorTable
.groupBy('id)
.select('id, 'id.count as 'count)
// 输出到外部文件系统或者 DataStream
val outputPath = "..."
// 注册输出表
tableEnv.connect(new FileSystem().path(outputPath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("outputTable")
//aggTable.insertInto("outputTable") aggTable 因为有修改操作,CsvTableSink 只实现了 AppendStreamTableSink,所以无法输出到文件。
resultTable.insertInto("outputTable")
resultTable.toAppendStream[(String, Double)].print("result")
// aggTable 因为有修改操作不能使用 append,需要使用 Retract
aggTable.toRetractStream[Row].print("agg")
3 动态表
3.1 DataStream 上的关系查询
关系代数 / SQL | 流处理 |
---|---|
关系(或表)是有界(多)元组集合。 | 流是一个无限元组序列。 |
对批数据(例如关系数据库中的表)执行的查询可以访问完整的输入数据。 | 流式查询在启动时不能访问所有数据,必须“等待”数据流入。 |
批处理查询在产生固定大小的结果后终止。 | 流查询不断地根据接收到的记录更新其结果,并且始终不会结束。 |
尽管存在这些差异,但是使用关系查询和 SQL 处理流并不是不可能的。高级关系数据库系统提供了一个称为 物化视图(Materialized Views) 的特性。物化视图被定义为一条 SQL 查询,缓存查询的结果。缓存的一个常见难题是防止缓存结果过期。当其定义查询的基表被修改时,物化视图将过期。 即时视图维护(Eager View Maintenance) 是一种一旦更新了物化视图的基表就立即更新视图的技术。
3.2 动态表 & 连续查询( Continuous Query )
- 将流转换为动态表。
- 在动态表上计算一个连续查询,生成一个新的动态表。
- 生成的动态表被转换回流。
4. 窗口和时间语义
关于窗口和时间语义的介绍可以参考这篇文章。之前是在流上进行讨论的。Flink 在表上同样支持相应的逻辑。
4.1 时间语义
可以通过 DDL 方式创建两种时间语义,但是比较晦涩,这里不做举例,感兴趣可以到官网查看。
4.1.1 processing time
注意处理时间属性一定不能定义在一个已有字段上
-
流转表时:
// 声明一个额外的字段作为时间属性字段 val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)
-
定义 tableSchema 时:
.withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) .field("pt",DataTypes.TIMESTAMP(3)).proctime() //将该字段定义为 processing time )
4.1.2 event time
- 流转表时:
// 基于 stream 中的事件产生时间戳和 watermark val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...) // 声明一个额外的逻辑字段作为事件时间属性(数据来源于上面datastream定义好的字段),必须放在 schema 最后 val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime) // Option 2: // 从第一个字段获取事件时间,并且产生 watermark val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...) // 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了 val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")
- 定义 tableSchema 时:
//需要注意这种方式的 source 必须实现 DefinedRowtimeAttributes 接口。如 KafkaTableSource 实现了该接口。CsvTableSource 则没有。 .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .rowtime( new Rowtime() .timestampsFromFiled("timestamp") .watermarksPeriodicBounded(1000) ) .field("temperature",DataTypes.DOUBLE()) )
4.2 窗口操作
窗口操作相当于对数据进行分组时,除了按照字段以外,增加了新的维度进行分组,一般是时间或者数据数量。
4.2.1 Group Windows
根据时间或者行数间隔,将行聚集在有限的组中,并对每个组的数据执行一次聚合函数。最终每个组得出一个结果,类似于传统对 group by 操作
// 基本使用结构
val table = input
.window([w:GroupWindow] as "w") //定义窗口和别名 w
.groupBy($"w",$"a") //以属性 a 和窗口 w 作为分组的key
.select($"a",$"b".sum) //聚合字段b的值,求和
tumbling window
- .window( Tumble over 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
- .window( Tumble over 10.rows on $"a_proctime" as "w")
- sql: tumble(ts, interval '10' second)
sliding windows
- .window( Slide over 10.minutes every 5.minutes on $"a_rowtime"/$"a_proctime as "w")
- .window( Slide over 10.rows every 5.rows on $"a_proctime" as "w")
- sql: hop(ts,interval '10' second,interval '10' second) p.s. 第二个是步长,第三个是窗口长度
session windows
- .window( Session withGap 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
- sql: session(ts,interval '10' second)
sql 辅助函数,xx = {tumble,hop,session}:
- xx_start(ts, interval '10' second)
- xx_end(ts, interval '10' second)
- xx_rowtime(ts, interval '10' second)
- xx_proctime(ts, interval '10' second)
4.2.2 Over Windows
针对每个输入行,进行开窗,增加一列表示结果,每个行都有自己所在窗口的结果。类似于传统的 over 操作
// 基本使用结构
val table = input
.window([w:OverWindow] as "w")
.select($"a",$"b".sum over $"w", $"c".min over $"w")
无界 over window
- .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_RANGE as "w")
- .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_ROW as "w")
有界 over window
- .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 1.minutes as "w")
- .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 10.rows as "w")