Flink SQL 入门指北

1. Overview

本文主要来自官网,旨在整理处 Flink SQL 的基本语法和使用,基础向。

2. API 调用

2.1 Old Planner VS Blink Planner

  • Blink Planner 对代码生成机制做了改进、对部分算子进行了优化,提供了丰富实用的新功能,如维表 join、Top N、MiniBatch、流式去重、聚合场景的数据倾斜优化等新功能。

  • Blink Planner 的优化策略是基于公共子图的优化算法,包含了基于成本的优化(CBO)和基于规则的优化(CRO)两种策略,优化更为全面。同时,Blink Planner 支持从 catalog 中获取数据源的统计信息,这对CBO优化非常重要。

  • Blink Planner 提供了更多的内置函数,更标准的 SQL 支持,在 Flink 1.9 版本中已经完整支持 TPC-H ,对高阶的 TPC-DS 支持也计划在下一个版本实现。

Flink 1.11 已经默认使用 Blink Planner。

2.2 基本程序结构

1.创建 TableEnvironment ( old/blink planner + stream/batch )
2.创建表( tableEnv.connect 外部数据源 或者 tableEnv.fromDataStream )
3.查询表( Table API 或者 SQL )
4.输出表( table.insertInto("xxtable") 或者 table.toRetractStream[T]/toAppendStream[T])

2.3 创建 TableEnvironment

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

2.4 创建表

// 1.通过外部数据源创建
//数据格式:sensor_1,1547718225,22.8
tableEnv.connect(new Kafka()
      .version("0.11")
      .topic("sensor")
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
    )
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaInputTable")


// 2.通过 datastream 转换
val table1: Table = tableEnv.fromDataStream(stream)

2.5 查询表

//table api
val sensorTable = tableEnv.from("inputTable")
val resultTable = sensorTable
.select('id, 'temperature)
.filter('id === "sensor_1")

// SQL
val resultSqlTable = tableEnv.sqlQuery(
"""
|select id, temperature
|from inputTable
|where id = 'sensor_1'
""".stripMargin)

2.6 表转流的三种输出模式

  • 追加( Append )模式

    • 只做插入操作,和外部连接起只交换插入( insert )消息
  • 撤回 ( Retract )模式

    • 表和外部连接起交换添加( Add )和撤回( Retract )消息
    • 插入操作编码为 Add 消息,删除编码为 Retract 消息,更新编码为上一条的 Retract 和下一条的 Add 消息
    • 不能定义 Key
  • 更新( Upsert )模式

    • 更新和插入都被编码为 Upsert 消息,删除编码为 Delete 消息
    • 需要定义 Key

DataStream 只支持 Append 和 Retract 模式。(toRetractStream[T] & toAppendStream[T] )
外部文件系统的流支持哪种模式取决于具体实现,比如 Kakfa 只支持 Append 模式。

2.7 输出表

tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temp", DataTypes.DOUBLE())
      )
      .createTemporaryTable("inputTable")

    // 转换操作
    val sensorTable: Table = tableEnv.from("inputTable")
    // 简单转换
    val resultTable: Table = sensorTable
      .select('id, 'temp)
      .filter('id === "sensor_1")

    //  聚合转换
    val aggTable: Table = sensorTable
      .groupBy('id)    
      .select('id, 'id.count as 'count)

    // 输出到外部文件系统或者 DataStream
    val outputPath = "..."

    // 注册输出表
    tableEnv.connect(new FileSystem().path(outputPath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("outputTable")

    //aggTable.insertInto("outputTable")  aggTable 因为有修改操作,CsvTableSink 只实现了 AppendStreamTableSink,所以无法输出到文件。
    resultTable.insertInto("outputTable")

    resultTable.toAppendStream[(String, Double)].print("result")
    // aggTable 因为有修改操作不能使用 append,需要使用 Retract
    aggTable.toRetractStream[Row].print("agg")

3 动态表

3.1 DataStream 上的关系查询

关系代数 / SQL 流处理
关系(或表)是有界(多)元组集合。 流是一个无限元组序列。
对批数据(例如关系数据库中的表)执行的查询可以访问完整的输入数据。 流式查询在启动时不能访问所有数据,必须“等待”数据流入。
批处理查询在产生固定大小的结果后终止。 流查询不断地根据接收到的记录更新其结果,并且始终不会结束。

尽管存在这些差异,但是使用关系查询和 SQL 处理流并不是不可能的。高级关系数据库系统提供了一个称为 物化视图(Materialized Views) 的特性。物化视图被定义为一条 SQL 查询,缓存查询的结果。缓存的一个常见难题是防止缓存结果过期。当其定义查询的基表被修改时,物化视图将过期。 即时视图维护(Eager View Maintenance) 是一种一旦更新了物化视图的基表就立即更新视图的技术。

3.2 动态表 & 连续查询( Continuous Query )

动态表查询流程
  1. 将流转换为动态表。
  2. 在动态表上计算一个连续查询,生成一个新的动态表。
  3. 生成的动态表被转换回流。
流转为动态表

连续查询并生成新动态表

动态表转换回流(Retract模式)

4. 窗口和时间语义

关于窗口和时间语义的介绍可以参考这篇文章。之前是在流上进行讨论的。Flink 在表上同样支持相应的逻辑。

4.1 时间语义

可以通过 DDL 方式创建两种时间语义,但是比较晦涩,这里不做举例,感兴趣可以到官网查看。

4.1.1 processing time

注意处理时间属性一定不能定义在一个已有字段上

  • 流转表时:

    // 声明一个额外的字段作为时间属性字段
    val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)
    
  • 定义 tableSchema 时:

      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("timestamp",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE())
        .field("pt",DataTypes.TIMESTAMP(3)).proctime()    //将该字段定义为 processing time
      )
    

4.1.2 event time

  • 流转表时:
    // 基于 stream 中的事件产生时间戳和 watermark
    val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    
    // 声明一个额外的逻辑字段作为事件时间属性(数据来源于上面datastream定义好的字段),必须放在 schema 最后
    val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)
    
    
    // Option 2:
    
    // 从第一个字段获取事件时间,并且产生 watermark
    val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    
    // 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
    val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")
    
  • 定义 tableSchema 时:
        //需要注意这种方式的 source 必须实现 DefinedRowtimeAttributes 接口。如 KafkaTableSource 实现了该接口。CsvTableSource 则没有。 
    .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("timestamp",DataTypes.BIGINT())
        .rowtime(
            new Rowtime()
                .timestampsFromFiled("timestamp")
                .watermarksPeriodicBounded(1000)
        )
        .field("temperature",DataTypes.DOUBLE())
      )
    

4.2 窗口操作

窗口操作相当于对数据进行分组时,除了按照字段以外,增加了新的维度进行分组,一般是时间或者数据数量。

4.2.1 Group Windows

根据时间或者行数间隔,将行聚集在有限的组中,并对每个组的数据执行一次聚合函数。最终每个组得出一个结果,类似于传统对 group by 操作

// 基本使用结构
val table = input
        .window([w:GroupWindow] as "w") //定义窗口和别名 w
        .groupBy($"w",$"a")  //以属性 a 和窗口 w 作为分组的key
        .select($"a",$"b".sum)  //聚合字段b的值,求和

tumbling window
    - .window( Tumble over 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
    - .window( Tumble over 10.rows on $"a_proctime" as "w")
    - sql: tumble(ts, interval '10' second)

sliding windows 
    - .window( Slide over 10.minutes every 5.minutes on $"a_rowtime"/$"a_proctime as "w")
    - .window( Slide over 10.rows every 5.rows on $"a_proctime" as "w")
    - sql: hop(ts,interval '10' second,interval '10' second) p.s. 第二个是步长,第三个是窗口长度

session windows
    - .window( Session withGap 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
    - sql: session(ts,interval '10' second)

sql 辅助函数,xx = {tumble,hop,session}:
    - xx_start(ts, interval '10' second)
    - xx_end(ts, interval '10' second)
    - xx_rowtime(ts, interval '10' second)
    - xx_proctime(ts, interval '10' second)

4.2.2 Over Windows

针对每个输入行,进行开窗,增加一列表示结果,每个行都有自己所在窗口的结果。类似于传统的 over 操作

// 基本使用结构
val table = input
        .window([w:OverWindow] as "w") 
        .select($"a",$"b".sum over $"w", $"c".min over $"w")  

无界 over window
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_RANGE as "w")
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_ROW as "w")

有界 over window
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 1.minutes as "w")
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 10.rows as "w")
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342