一个作业,多个TTL-Flink SQL 细粒度TTL配置的实现(一)

(转自我的微信公众号 KAMI说 )

Flink 是当前最流行的分布式计算框架,其提供的 Table API 和 SQL 特性,使得开发者可以通过成熟,直观、简洁、表达力强的标准 SQL 描述计算逻辑,大大减少其学习、开发和维护成本。

Flink SQL 支持面向无边界输入流的流处理。然而。聚合统计、窗口统计等计算是有状态的。在流处理中,若这些状态数据随时间不断堆积、不断膨胀,会导致因为OOM频繁发生导致的作业崩溃、重启。

从 Flink 1.6 版本开始,社区引入了状态 TTL(Time-To-Live) 特性。在通过Flink SQL 实现流处理时,开发者可以为作业 SQL 设置TTL,实现过期状态的自动清理,从而防止作业状态无限膨胀。

然而,目前Flink SQL 只支持粗粒度的TTL设置,即一段 SQL 只能设置一个TTL。在一些常见的应用场景中,这不足够。

下面是一段计算DAU指标的 SQL 代码

SELECT
     t_date
   , COUNT(DISTINCT user_id) AS cnt_login
   , COUNT(DISTINCT CASE WHEN t_date = t_debut THEN user_id END) AS cnt_new
 FROM 
   (
     SELECT
        t_date
      , user_id
      , MIN(t_date) OVER (
              PARTITION BY user_id
              ORDER BY proctime
              ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
        ) AS t_debut
     FROM Login
 ) AS t
 GROUP BY t_date

这段SQL的业务意义很直观,就是计算实时每日登陆用户和新增登陆用户。

  • 第一层的窗口统计,计算每个用户有史以来最小的登陆日期,即其新增日期
  • 第二层的聚合统计,按天进行聚合,计算每天的登陆用户数和新增用户数

然而,在TTL的设置上,我们面临两难状况:

  • 不设置TTL。那么在第二层按天进行的聚合统计,COUNT DISTINCT计算带来的状态会随着天数近乎线性增长,状态会不断膨胀,带来OOM等一系列问题
  • 设置TTL,例如 n 天未访问的状态自动清理。那么在第一层的窗口统计,n天不活跃的用户的登陆日期状态就可能被清除,导致其后续再次登录时被误判为新增

要解决这个矛盾,我们实际上需要 Flink SQL 提供 TTL 的细粒度配置,即为一段SQL设置多个 TTL :

  • 第一层的窗口统计不设置TTL,所有用户的登陆日期状态永久保留
  • 第二层的聚合统计设置 n 天的 TTL,保证其状态不会无限增长

下面给大家介绍,如何实现Flink SQL的细粒度 TTL 配置。

大家都知道,在 Flink 中,通过 Table API 和 SQL 实现的流处理逻辑,最终会翻译为基于 DataStream API 实现的 DataStream 作业,返回这个作业输出的 DataStream (writeToSink 本质上也是先得到 DataStream 作业,再为其输出 DataStream 加上一个 DataStreamSink) 。
从一段 SQL 到 DataStream 作业,其过程简单描述如下:

  1. 在 TableEnvironment,即“表环境”,将数据源注册为动态表。例如,通过表环境的接口registerDataStream, 作为源的DataStream,即数据流, 在表环境注册为动态表
  2. 通过表环境的接口 sqlQuery,将 SQL 构造为 Table 对象
  3. 通过toAppendStream/toRetractedStream接口,即翻译接口,将 Table 对象表达的作业逻辑,翻译为 DataStream 作业。
从SQL到 DataStream 作业

在调用翻译接口,将 Table 对象翻译为 DataStream 作业时,通过翻译接口传入的 TTL 配置,递归传递到各个计算节点的翻译、构造逻辑里,使得翻译出来的 DataStream 算子的内部状态按照该 TTL 配置及时清理。

如果我们将上述计算DAU的SQL拆分成两段,前者作为一个中间结果,提供给后者调用。

SQL1:

    SELECT
       t_date
     , user_id
     , MIN(t_date) OVER (
             PARTITION BY user_id
             ORDER BY proctime
             ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
       ) AS t_debut
    FROM Login

SQL2:

  SELECT
      t_date
    , COUNT(DISTINCT user_id) AS cnt_login
    , COUNT(DISTINCT CASE WHEN t_date = t_debut THEN user_id END) AS cnt_new
  FROM t_middle
  GROUP BY t_date

从第一段 SQL 构建对应 Table 对象,再调用翻译接口,翻译成 DataStream 作业,其输出数据流为 s_middle。其可以使用 Row 作为流数据类型,各个字段的名称和类型可以通过 Table 对象的 Schema获得。显然,这个 DataStream 作业是原来完整DAU计算 DataStream 作业的一部分,其输出为一个中间结果。

然后,将这个中间结果数据流 s_middle 在表环境重新注册为动态表 t_middle ,各个字段的名称和类型可以通过 Table 对象的 Schema获得。这是第二段 SQL 需要调用的中间结果动态表。

最后,从第二段 SQL 构建对应 Table 对象,再调用翻译接口,加上 n 天的 TTL 配置,翻译成 DataStream 作业。显然,这个 DataStream 作业是原来完整DAU计算 DataStream 作业的另外一部分,其输出为完整的 DAU 计算结果。

显然,第一段 SQL 对应的计算节点,其状态 TTL 为永不过期。第二段 SQL 对应的计算节点,其状态 TTL 为 n 天后过期!TTL的细粒度配置实现!

归纳一下,如果要给 Flink SQL 设置细粒度TTL配置,我们只需要:
1. 将原来一段 SQL 代码,按照不同的TTL,改写为前后依赖的多个子 SQL。
2. 对于每个子 SQL,若不是最下游的,进行“翻译-重注册”:
a. 加上对应的 TTL 配置,翻译为 DataStream 作业,得到其输出数据流,其中,流数据类型使用 Row,各个字段的名称和类型可以通过 Table 对象的 Schema获得
b. 将中间结果数据流在表环境重新注册,表名为下游子SQL调用的表名,各个字段的名称和类型可以通过 Table 对象的 Schema获得
3. 最后一个子 SQL,加上对应的 TTL 配置,翻译成 DataStream 作业,其输出数据流即为完整计算的输出。


Flink SQL 细粒度TTL配置的实现

需要注意的是,处理时间(Process-Time)和事件时间(Event-Time)字段,对应的数据类型在Flink Table API & SQL 的包 flink-table 中是私有的,在外部访问会出错。
所以,在“翻译-重注册”过程中,需要特殊处理时间和事件时间字段:

  1. 通过 Table 对象的 Schema 找出时间特性字段,然后通过 Table.select 方法,剔除时间特性字段,再翻译成 DataStream 作业,得到中间结果数据流。
  2. 为中间结果数据流重新构造时间特性字段,在重注册为动态表时,按照原字段名重新声明。

总结一下,整个细粒度TTL配置的实现过程实施:

  1. 按 TTL 的不同,将 SQL 拆解为多个子 SQL
  2. 对每个子 SQL 进行“翻译-重注册”,包括时间特性字段的处理
  3. 最后一个子 SQL 完成翻译,得到的 DataStream 作业的输出便是完整计算逻辑的输出

细心的读者会发现,如果中间的计算过程包含聚合计算,翻译出的 DataStream 作业的输出数据流只能是带撤回标志位的数据流(简称撤回流)DataStream<Tuple<Boolean, Row>>,无法直接重注册到表环境中。上述的方法无法应用于有多层 TTL 配置不一样的聚合操作的 Flink SQL 中。

也就是说,要实现所有场景下的 Flink SQL 的细粒度 TTL 配置,我们必须实现撤回流注册为动态表这一特性。

本系列文的第二篇《Flink SQL 细粒度TTL配置的实现(二)》将给大家介绍具体的实现方法,需要对Flink Table API & SQL 的包 flink-table 的源码进行一点修改,尽情期待。

扫描下方二维码关注我的公众号“KAMI说”,有更多精彩原创内容哦~

KAMI说
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容