Flink SQL高效Top-N方案的实现原理

昨天的文章里恰好用Top-N Function来举了例子,那么择日不如撞日,今天接着聊吧。

Top-N

Top-N是我们应用Flink进行业务开发时的常见场景,传统的DataStream API已经有了非常成熟的实现方案,如果换成Flink SQL,又该怎样操作?好在Flink SQL官方文档已经给出了标准答案,我们只需要照抄就行,其语法如下:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

看官可能已经能够在日常工作中熟练应用这种查询风格了。那么,Flink内部是如何将它转化成高效的执行方案的呢?接下来基于最新的Flink 1.12版本稍微探究一下。

Logical Plan

使用EXPLAIN语句观察示例查询的执行计划(部分)如下:

EXPLAIN PLAN FOR SELECT * FROM (
  SELECT *,
    row_number() OVER(PARTITION BY merchandiseId ORDER BY totalQuantity DESC) AS rownum
  FROM (
    SELECT merchandiseId, sum(quantity) AS totalQuantity
    FROM rtdw_dwd.kafka_order_done_log
    GROUP BY merchandiseId
  )
) WHERE rownum <= 10

== Abstract Syntax Tree ==
LogicalProject(merchandiseId=[$0], totalQuantity=[$1], rownum=[$2])
+- LogicalFilter(condition=[<=($2, 10)])
   +- LogicalProject(merchandiseId=[$0], totalQuantity=[$1], rownum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
      +- ...

== Optimized Logical Plan ==
Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[merchandiseId], orderBy=[totalQuantity DESC], select=[merchandiseId, totalQuantity, w0$o0])
+- Exchange(distribution=[hash[merchandiseId]])
   +- ...

== Physical Execution Plan ==
Stage 1 : Data Source
    ...

    Stage 2 : Operator
        ...

        Stage 4 : Operator
            ...

            Stage 6 : Operator
                content : Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[merchandiseId], orderBy=[totalQuantity DESC], select=[merchandiseId, totalQuantity, w0$o0])
                ship_strategy : HASH

由执行计划可知,row_number() OVER(PARTITION BY ...)子句在逻辑计划阶段被优化成了名为Rank的RelNode(看官可参见Calcite的相关资料了解RelNode),可以用如下的简图说明。

负责这个优化的RelOptRule在Flink项目中名为FlinkLogicalRankRule。它将符合规则的开窗聚合操作(FlinkLogicalOverAggregate RelNode)和对排名的过滤操作(FlinkLogicalCalc RelNode)合并为FlinkLogicalRank。也就是说,只有严格符合上一节所述语法的查询才能得到优化。

FlinkLogicalRank节点会记录以下主要信息:

  • partitionKey:分组键。
  • orderKey:排序键与排序规则。
  • rankType:排名函数的类型,即ROW_NUMBER、RANK或者DENSE_RANK。
  • rankRange:排名区间(即Top-N一词中的N)。
  • strategy:Top-N结果的更新策略,目前有3种:
    • AppendFast:结果只追加,不更新;
    • Retract:类似于回撤流,结果会更新,前提是输入数据没有主键,或者主键与partitionKey不同;
    • UpdateFast:快速更新,前提是输入数据有主键,且结果单调递增/递减,还要求orderKey的排序规则与结果的单调性相反(例:ORDER BY sum(quantity) DESC)。可见它的效率最高,但是也最苛刻。
  • outputRankNumber:是否输出排名的序号,即在外层查询中是否有SELECT rownum子句。显然,如果不输出序号,在排名发生变化时可以大大减少回撤输出的数据量,降低Flink端的压力,具体可参见官方文档"No Ranking Output Optimization"一节。

Physical Plan

在流处理环境下,StreamPhysicalRankRule规则负责将FlinkLogicalRank逻辑节点转换成StreamPhysicalRankRule物理节点,并翻译成物理执行节点StreamExecRank。注意如果是分组Top-N(即有PARTITION BY子句),就会按照partitionKey的hash值分发到各个sub-task,否则会将并行度强制设为1,计算全局Top-N。另外从代码可以读出,Top-N语法目前仅支持ROW_NUMBER,暂时还不支持RANK和DENSE_RANK排名。

根据上文所述更新策略的不同,实际执行时采用的ProcessFunction也不同,如下类图所示。其中CleanupState接口表示支持空闲状态保留时间(idle state retention time)特性。

以最常用到的RetractableTopNFunction为例,当有一条累加数据到来时,处理流程可以用如下的简图来说明。

其中,dataState是MapState<RowData, List<RowData>>类型的状态,保存partitionKey与该key下面的流数据,用于容错。而treeMap是ValueState<SortedMap<RowData, Long>>类型的状态,顾名思义,它其中维护了一个TreeMap,用于计数及输出Top-N结果。至于这里为什么用了红黑树(TreeMap)而不是传统的最大/最小堆(PriorityQueue),自然是因为红黑树是对数复杂度的,相较于堆来说更适合Flink这种对时间敏感而对空间较不敏感的执行环境。

另外,我们一定要记得启用空闲状态保留时间,这样dataState和treeMap中的数据才不会永远积攒下去。不过空闲状态的清理并非确定性的,所以如果要计算有时间维度的排行榜(如按天、按小时等),需要把时间维度也加入PARTITION BY子句,而不是将保留时间设为对应的长度。

最后,在StreamExecRank中还提供了一个可配置的参数table.exec.topn.cache-size(默认值10000),即Top-N缓存的大小。如果Top-N的规模比较大,适当增加此值可以避免频繁访问状态,提高执行效率。

The End

本来想连带去重一起讲了的(毕竟去重与Top-N的实现方法类似,只是在物理计划上有区别),但是今天已经很累了,还是先回家休息吧。

家乡疫情有些严重,希望所有人都平安,百毒不侵。

国际庄加油,民那晚安晚安。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容