flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join(附源码)

1.序篇

flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码

书接上回,上节说到了博主发现由于在 flink sql 中 lookup join 访问外部维表存在的性能问题。

由此诞生了一个想法,以 Redis 维表为例,Redis 支持 pipeline 批量访问模式,因此 flink sql lookup join 能不能按照 DataStream 方式一样,先攒一批数据 ,然后使用 Redis pipeline 批量访问外部存储。博主亲切的将这个功能称为 flink sql batch lookup join,本节就是讲述博主基于 flink 源码对此功能的实现。

废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:

  1. 直接来一个实战案例:博主以曝光用户日志流关联用户画像(年龄、性别)维表为例介绍 batch lookup join 具有的基本能力(怎么配置参数,怎么写 sql,最终效果咋样)。
  2. batch lookup join:主要介绍 batch lookup join 的功能是从 flink transformation 出发,确定要 batch lookup join 涉及改动的地方以及其实现思路、原理。也会教给大家一些改动源码来实现自己想要的一些功能的思路。
  3. 总结及展望:目前的 batch lookup join 实现其实不符合 sql 的原始语义,后续大家可以按照 sql 标准自己做一些实现

2.来一个实战案例

2.1.预期的输入、输出数据

来看看在具体场景下,对应输入值的输出值应该长啥样。

需求指标:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的画像(性别,年龄段)数据。

来一波输入数据:

曝光用户日志流(show_log)数据(数据存储在 kafka 中):

log_id timestamp user_id
1 2021-11-01 00:01:03 a
2 2021-11-01 00:03:00 b
3 2021-11-01 00:05:00 c
4 2021-11-01 00:06:00 b
5 2021-11-01 00:07:00 c

用户画像维表(user_profile)数据(数据存储在 redis 中):

user_id(主键) age sex
a 12-18
b 18-24
c 18-24

注意:redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。如下图所示:

[图片上传中...(image-9cec6d-1640484034347-18)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">user_profile redis</figcaption>

预期输出数据如下:

log_id timestamp user_id age sex
1 2021-11-01 00:01:03 a 12-18
2 2021-11-01 00:03:00 b 18-24
3 2021-11-01 00:05:00 c 18-24
4 2021-11-01 00:06:00 b 18-24
5 2021-11-01 00:07:00 c 18-24

2.2.batch lookup join sql 代码

batch lookup join sql 代码和原来的 lookup join sql 代码一模一样。如下 sql。

CREATE TABLE show_log (    log_id BIGINT,    `timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),    user_id STRING,    proctime AS PROCTIME())WITH (  'connector' = 'datagen',  'rows-per-second' = '10',  'fields.user_id.length' = '1',  'fields.log_id.min' = '1',  'fields.log_id.max' = '10');CREATE TABLE user_profile (    user_id STRING,    age STRING,    sex STRING    ) WITH (  'connector' = 'redis',  'hostname' = '127.0.0.1',  'port' = '6379',  'format' = 'json',  'lookup.cache.max-rows' = '500',  'lookup.cache.ttl' = '3600',  'lookup.max-retries' = '1');CREATE TABLE sink_table (    log_id BIGINT,    `timestamp` TIMESTAMP(3),    user_id STRING,    proctime TIMESTAMP(3),    age STRING,    sex STRING) WITH (  'connector' = 'print');-- lookup join 的 query 逻辑INSERT INTO sink_tableSELECT     s.log_id as log_id    , s.`timestamp` as `timestamp`    , s.user_id as user_id    , s.proctime as proctime    , u.sex as sex    , u.age as ageFROM show_log AS sLEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS uON s.user_id = u.user_id

可以看到 lookup join 和 batch lookup join 的代码是完全相同的,唯一的不同之处在于,batch lookup join 需要设置 table config 参数,如下图所示:

image.gif

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">table config</figcaption>

2.2.batch lookup join 效果

将原生 lookup join 和 batch lookup join 的效果做个对比:

原生的 lookup join:每输入一条数据,访问外部维表获取到结果输出一条数据,如下图所示。

[图片上传中...(image-d0c11f-1640484034346-16)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">lookup join</figcaption>

博主实现的 batch lookup join:是每攒够 30 条数据或者每 5s(防止数据量少的情况下,长时间不输出数据) 就利用 redis pipeline 能力访问外部存储一次。然后批量输出结果,如下图所示。大大提高了吞吐。

[图片上传中...(image-8ca2a3-1640484034346-15)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">batch lookup join</figcaption>

3.batch lookup join 实现

3.1.怎么知道应该改哪部分源码?

博主将通过下面几个问题去交给大家怎么改源码去实现自己的功能。

  1. 改源码的有哪些比较好的思路?
  • 结论:首先就是参考类似模块的实现(不会写,但是我会抄啊!),比如本文要实现 batch lookup join,必然要参考原生的 lookup join 去实现。
  1. 大家在改 flink 源码时,因为 flink 源码的模块太多了,项目非常庞大,往往第一步碰到的问题不是怎么去实现这个功能,而是应该在什么地方去改才能实现!
  • 结论:一个 flink 的任务(DataStream\Table\SQL)所有的精华精华精华都集中在 transformation 中!!!只要是涉及到算子实现的东西,小伙伴萌就可以到 transformation 中去寻找。可以将断点打在每一个 operator 的构造器或者 open 方法中就可以看到其实在哪一步构造和初始化的。这样就能顺着调用栈往前回溯而确定要改哪部分代码了。

3.2.lookup join 原理

3.2.1.transformation

在实现 batch lookup join 之前,当然要从原生的 lookup join 的实现开始入手,看看 flink 官方大大是怎么实现的,具体 transformation 如下图所示:

图片

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformation</figcaption>

具体的实现逻辑承载在 org.apache.flink.streaming.api.operators.ProcessOperatororg.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner 中。

3.2.2.LookupJoinRunner

LookupJoinRunner 中的数据处理逻辑集中在 processElement 中。

[图片上传中...(image-f60084-1640484034346-13)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">LookupJoinRunner</figcaption>

可以看到上图,LookupJoinRunner 又内嵌了一层 fetcher 来实现具体的 lookup 逻辑。

  1. 其中 fetcher:就是根据 flink sql lookup join 逻辑生成的 lookup join 的代码实例;
  2. 其中 collector:collector 的主要功能就是将原始数据 RowDatalookup 到的 RowData 的数据合并为 JoinedRowData 结果,然后输出。

接下来详细看看 fetcher 和 collector。

3.2.3.fetcher

[图片上传中...(image-f56c5a-1640484034346-12)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformation fetcher</figcaption>

把这个 fetcher 的代码 copy 出来瞅瞅。

[图片上传中...(image-55a820-1640484034346-11)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">fetcher</figcaption>

fetcher 内嵌了 RedisRowDataLookupFunction 来作为最终访问外部维表的函数。

3.2.4.RedisRowDataLookupFunction

访问 redis 获取到数据。

[图片上传中...(image-993279-1640484034346-10)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">RedisRowDataLookupFunction</figcaption>

3.2.5.collector

[图片上传中...(image-d159f8-1640484034346-9)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformation collector</figcaption>

把这个 collector 的代码 copy 出来瞅瞅。

[图片上传中...(image-dbe0b1-1640484034346-8)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">collector</figcaption>

3.3.lookup join 算子实现调用链

是不是感觉一个 lookup join 的调用链贼复杂。

因为 batch lookup join 是完全参考 lookup join 去实现的,所以接下来博主介绍一下整体的调用链关系,这就会方便后续设计 batch lookup join 实现方案的时候去确定具体修改哪一部分代码。

[图片上传中...(image-86993-1640484034346-7)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">调用链</figcaption>

整体的调用逻辑如下:

  1. ProcessOpeartor原始 RowData 传给 LookupJoinRunner
  2. LookupJoinRunner原始 RowData 传给根据 sql 代码生成的 fetcher
  3. fetcher 中把 原始 RowData 传给 RedisRowDataLookupFunction 然后去 lookup 维表,lookup 到的结果数据为 lookup RowData
  4. collector原始 RowDatalookup RowData 数据合并为 JoinedRowData 然后输出。

3.4.batch lookup join 设计思路

还是一样,先看看设计思路最终的结论,batch lookup join 算子调用链设计如下:

[图片上传中...(image-7758ad-1640484034346-6)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">batch lookup 调用链</figcaption>

详细说明一下设计思路:

  1. 如果想做到批量访问外部存储(Redis)的数据。可以推断出 RedisRowDataLookupFunction 的输入需要是 List<原始 RowData> ,输出需要是 List<lookup RowData>。其中输入数据输入到 RedisRowDataLookupFunction 中后,使用 Redis pipeline 去批量访问外部存储,然后把结果 List<lookup RowData> 输出。
  2. RedisRowDataLookupFunction 的输出数据为 List<lookup RowData> 推断出 collector 输入数据格式必然是 List<原始 RowData>。由于在 lookup join 中 collector 的逻辑就是将 原始 RowDatalookup RowData 合并为 JoinedRowData,将结果输出。因此 collector 这里就是将 List<原始 RowData>List<lookup RowData> 进行遍历合并,一条一条的输出 JoinedRowData
  3. 同样 RedisRowDataLookupFunction 的输入数据是 fetcher 传入的,则推断出 fetcher 输入数据格式必然是 List<原始 RowData>
  4. 由于 fetcher 输入是 List<原始 RowData>,则 LookupJoinRunner 输出到 fetcher 的数据也需要是 List<原始 RowData>。但是 ProcessOpeartor 只能传给 LookupJoinRunner 原始 RowData,因此可以得出我们的每攒 30 条数据或者每隔 5s 的逻辑就能确定需要在 LookupJoinRunner 中做了。

思路有了,那么 batch lookup join 涉及到的改动项也就能确认了。

  1. 新建一个 BatchLookupJoinRunner:实现攒批逻辑(每攒 30 条数据或者每隔 5s),其中攒批的数据放在 ListState 中,以防止丢失,在 table config 中的 is.dim.batch.mode 设置为 true 时使用此 BatchLookupJoinRunner
  2. 代码生成的 fetcher:将原来输入的 原始 RowData 改为 List<原始 RowData>
  3. 新建一个 RedisRowDataBatchLookupFunction:实现将输入的批量数据 List<原始 RowData> 拿到之后使用 redis pipeline 批量访问外部存储,获取到 List<lookup RowData> 结果数据给 collector
  4. 代码生成的 collector:将原来 lookup join 中的输入 原始 RowDatalookup RowData 改为 List<原始 RowData>List<lookup RowData>,添加遍历循环 List<原始 RowData>List<lookup RowData>,按顺序合并 List 中的每一项 原始 RowDatalookup RowData 输出 JoinedRowData 的逻辑。

3.5.batch lookup join 的最终效果

3.5.1.transformation

可以看到 is.dim.batch.mode 设置为 true 时,transformation 如下。transformation 中的重点处理逻辑就是 BatchLookupJoinRunner

[图片上传中...(image-93eb81-1640484034346-5)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">batch transformation</figcaption>

3.5.2.BatchLookupJoinRunner

[图片上传中...(image-4de508-1640484034346-4)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">BatchLookupJoinRunner</figcaption>

3.5.3.fetcher

sql 生成的 fetcher 代码如下:

[图片上传中...(image-7bad16-1640484034346-3)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">fetcher</figcaption>

3.5.4.RedisRowDataBatchLookupFunction

RedisRowDataBatchLookupFunction 拿到输入的 List 数据,调用 Redis pipeline 批量访问外部存储。

[图片上传中...(image-489d9d-1640484034346-2)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">RedisRowDataBatchLookupFunction</figcaption>

3.5.5.collector

sql 生成的 collector 代码如下:

[图片上传中...(image-3575fb-1640484034345-1)]

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">collector</figcaption>

3.6.待改进项

目前上述方案实现的不足之处如下:

  1. batch 的执行逻辑与 sql 原始的语义不一致。因为从 sql 上看是完全没有这种 batch lookup join 的语义的。
  2. 其中每 5s博主简单实现了下,完全基于数据驱动的每 5s 攒一批,不是基于 onTimer 驱动的。可能会出现来了一条数据之后,5 min 内都没有来数据,则数据就不输出了。
  3. 没有考虑实现代码的抽象,以实现功能为主,所以很多基于源码的改动都是直接 copy 出来了另一个方法实现。

4.xdm 怎么使用这个功能?

  1. git clone https://github.com/yangyichao-mango/flink/tree/release-1.13.2

  2. 在 clone 下来的项目的中,重新把下面两个模块 install (mvn clean install) 到本地仓库中。

    [图片上传中...(image-db4556-1640484034343-0)]

  3. 然后在你的项目中引用两个 blink 包即可使用。使用方法就是只需要把 table config 的 is.dim.batch.mode 设置为 true,代码还按照 lookup join 的方式写即可。

4.总结与展望

本文主要介绍了 flink sql batch lookup join 的使用方式,并介绍了其实现思路以及效果,主要内容如下:

  1. 直接来一个实战案例:博主以曝光用户日志流关联用户画像(年龄、性别)维表为例介绍 batch lookup join 具有的基本能力(怎么配置参数,怎么写 sql,最终效果咋样)。
  2. batch lookup join:主要介绍 batch lookup join 的功能是从 flink transformation 出发,确定要 batch lookup join 涉及改动的地方以及其实现思路、原理。也会教给大家一些改动源码来实现自己想要的一些功能的思路。
  3. 总结及展望:目前的 batch lookup join 实现其实不符合 sql 的原始语义,后续大家可以按照 sql 标准自己做一些实现
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容