Flink SQL Query 语法(一)

主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/overview/

SELECT 语句和 VALUES 语句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定,会以 Table 的形式返回 SELECT (或 VALUE)的查询结果。Table 可被用于 SQL 或 Table API 查询、转换为 DataSet 或 DataStream、输出到 TableSink。SQL 与 Table API 的查询可以进行无缝融合、整体优化。

为了可以在 SQL 查询中访问到表,需要先在 TableEnvironment 中注册表(可以通过 TableSource、Table、CREATE TABLE 语句、DataStream 或 DataSet 注册)。为方便起见 Table.toString() 将会在其 TableEnvironment 中以唯一的名称自动注册表,并返回名称。

注意: 查询若包括了不支持的 SQL 特性,将会抛出 TableException

指定查询

以下示例显示如何在已注册和内联表上指定 SQL 查询。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 从外部数据源获取一个 DataStream
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

// 查询一个未注册的 Table
Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
Table result = tableEnv.sqlQuery(
  "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

// 查询一个注册的 Table
tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"));

// 执行 sqlQuery() 返回 Table 对象
Table result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

final Schema schema = new Schema()
    .field("product", DataTypes.STRING())
    .field("amount", DataTypes.INT());

// 创建并注册 TableSink
tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(...)
    .withSchema(schema)
    .createTemporaryTable("RubberOrders");

// 调用 executeSql() 执行 INSERT SQL,查询结果写入 TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

执行查询

SELECT 语句或者 VALUES 语句可以通过 TableEnvironment.executeSql() 方法来执行,该方法返回 TableResult 对象用于包装查询的结果,一个 Table 对象可以通过 Table.execute() 方法执行获取查询结果。TableResult.collect() 方法返回一个可以关闭的行迭代器(除非所有的数据都被收集到本地,否则一个查询作业永远不会结束。所以通过 CloseableIterator#close() 方法主动地关闭作业以防止资源泄露)。 还可以通过 TableResult.print() 方法将查询结果打印到控制台。TableResult 中的结果数据只能被访问一次,因此一个 TableResult 实例中,collect() 方法和 print() 方法不能被同时使用。

TableResult.collect()TableResult.print() 的行为在不同的 checkpointing 模式下略有不同。

  • 对于批作业或没有配置任何 checkpointing 的流作业,TableResult.collect()TableResult.print() 既不保证 Exactly-once、也不保证 At-least-once。查询结果在产生后可被客户端即刻访问,但作业失败和重启时将会报错。
  • 对于配置了 Exactly-once checkpointing 的流作业,TableResult.collect()TableResult.print() 保证 Exactly-once。一条结果数据只有在其关联的 checkpointing 完成后才能在客户端被访问。
  • 对于配置了 At-least-once checkpointing 的流作业,TableResult.collect()TableResult.print() 保证 At-least-once。查询结果在产生后可被客户端即刻访问,同一条结果可能被多次传递给客户端。

语法

Flink 通过支持标准 ANSI SQL的 Apache Calcite 解析 SQL。以下“BNF-语法”描述了批处理和流处理查询中所支持的 SQL 特性的超集。

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

tablePath:
  [ [ catalogName . ] schemaName . ] tableName

systemTimePeriod:
  FOR SYSTEM_TIME AS OF dateTimeExpression

dynamicTableOptions:
  /*+ OPTIONS(key=val [, key=val]*) */

key:
  stringLiteral

val:
  stringLiteral

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
      MATCH_RECOGNIZE '('
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN '(' pattern ')'
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      ')'

measureColumn:
      expression AS alias

pattern:
      patternTerm [ '|' patternTerm ]*

patternTerm:
      patternFactor [ patternFactor ]*

patternFactor:
      variable [ patternQuantifier ]

patternQuantifier:
      '*'
  |   '*?'
  |   '+'
  |   '+?'
  |   '?'
  |   '??'
  |   '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  |   '{' repeat '}'

Flink SQL 对于标识符(表、属性、函数名)的命名策略类似于 Java 的词法约定:

  • 标识符大小写敏感

  • 通过反引号,可以允许标识符带有非字母的字符

    • SELECT a AS `my field` FROM t
      

字符串文本常量需要被单引号包起来(如 SELECT 'Hello World' )。两个单引号表示转义(如 SELECT 'It''s me.')。字符串文本常量支持 Unicode 字符,如需明确使用 Unicode 编码,请使用以下语法:

  • 使用反斜杠(\)作为转义字符(默认):SELECT U&'\263A'
  • 使用自定义的转义字符: SELECT U&'#263A' UESCAPE '#'

操作符

WITH

WITH 提供了编写辅助语句的方法,以便在更大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression,CTE),可以认为它定义了只存在于一个查询中的临时视图。

WITH 语法:

WITH <with_item_definition> [ , ... ]
SELECT ... FROM ...;

<with_item_defintion>:
    with_item_name (column_name[, ...n]) AS ( <select_query> )

下面的示例定义了一个 CTE:orders_with_total,并在 GROUP BY 查询中使用它。

WITH orders_with_total AS (
    SELECT order_id, price + tax AS total
    FROM Orders
)

SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

SELECT & WHERE

SELECT 语句的一般语法为:

SELECT select_list FROM table_expression [ WHERE boolean_expression ]

table_expression 可以是任何数据源(表、视图、VALUES 子句、多个表的 Join 结果、子查询)。下面的事例读取 Orders 表的所有列:

SELECT * FROM Orders

select_list 指定 * 表示解析所有的列,但是不建议在生产环境中使用,会降低性能,建议只查询需要的列:

SELECT order_id, price + tax FROM Orders

查询可以使用 VALUES 子句,每个元组(Tuple)对应一个 Row,并且可以设置别名:

SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)

WHERE 语句可以过滤 Row:

SELECT price + tax FROM Orders WHERE id = 10

可以对每行数据的指定列调用函数(内置、自定义函数,自定义函数必须提前注册):

SELECT PRETTY_PRINT(order_id) FROM Orders

SELECT DISTINCT

如果指定 SELECT DISTINCT,则将从结果集中删除重复行(每组重复中保留一行)。

SELECT DISTINCT id FROM Orders

对于流式查询,计算查询结果所需的状态(State)可能会无限增长。状态大小取决于不同行的数量。可<u>以为查询配置适当的状态生存时间(TTL),以防止状态大小过大。这可能会影响查询结果的正确性</u>。

Windowing TVF(1.13)

Window 是流处理的核心。Windows 将流拆分为有限大小的片段应用计算。只有流处理支持。

Flink 1.13 提供了几个 Table-valued functions(TVF,区别于 Group Window Function),将表中的元素划分为 windows,包括:

- 滚动窗口(Tumbling windows)

- 滑动窗口(Hop, Sliding windows)

- 累加窗口(Cumulate windows)

- 会话窗口(Session windows,TVF 暂不支持)

每个元素在逻辑上可以属于多个窗口,具体取决于所使用的窗口函数。TVF 必须和聚合操作一起使用:

假设存在一个 Bid

Flink SQL> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+

Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+
|          bidtime | price | item |
+------------------+-------+------+
| 2020-04-15 08:05 |  4.00 | C    |
| 2020-04-15 08:07 |  2.00 | A    |
| 2020-04-15 08:09 |  5.00 | D    |
| 2020-04-15 08:11 |  3.00 | B    |
| 2020-04-15 08:13 |  1.00 | E    |
| 2020-04-15 08:17 |  6.00 | F    |
+------------------+-------+------+

滚动窗口(Tumbling windows)

指定一个固定大小的窗口,并且不重叠,语法:

TUMBLE(TABLE data, DESCRIPTOR(timecol), size)

-- data: 表名,表需要有时间属性字段
-- timecol: 表中的时间属性字段,用于划分窗口
-- size: 窗口大小

设定一个10分钟大小的滚动窗口,

SELECT window_start, window_end, SUM(price)
FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
  
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+

滑动窗口(Hop, Sliding windows)

指定一个固定大小的窗口,设定滑动间隔,元素会被指定给多个窗口,语法:

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

-- data: 表名,表需要有时间属性字段
-- timecol: 表中的时间属性字段,用于划分窗口
-- size: 窗口大小
-- slide:窗口滑动的大小

设定一个10分钟大小,每5分钟滑动的窗口,

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
  
  
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
| 2020-04-15 08:15 | 2020-04-15 08:25 |  6.00 |
+------------------+------------------+-------+

累加窗口(Cumulate windows)

指定一个窗口的最大规模,按照指定时间间隔增长累加,直到达到窗口的最大规模,每次窗口增长会进行一次计算,可以理解为多次计算的滚动窗口,语法:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

-- data: 表名,表需要有时间属性字段
-- timecol: 表中的时间属性字段,用于划分窗口
-- size: 窗口最大大小
-- step:窗口增长大小

设定一个10分钟大小,每2分钟累计一次的窗口,

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

    
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:06 |  4.00 |
| 2020-04-15 08:00 | 2020-04-15 08:08 |  6.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:12 |  3.00 |
| 2020-04-15 08:10 | 2020-04-15 08:14 |  4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:16 |  4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容