以下文章全部基于 Flink 1.14
Interval Joins 支持 Join Type
经过自己测试支持情况如下表 (如有误,请指正)
Time Attributes | Inner Join | Left Join | Right Join | Full Outer Join |
---|---|---|---|---|
EventTime | ✔ | ✔ | ✔ | ✔ |
ProcessingTime | ✔ | ✔ | ✔ | ✔ |
Flink Sql Interval Join Demo
SELECT
o.order_id
,movie_id
,seat_price
,order_timestamp
,price_timestamp
FROM order_log o
LEFT JOIN price_log p
ON o.order_id = p.order_id
AND p.price_timestamp BETWEEN o.order_timestamp - INTERVAL '1' SECOND AND o.order_timestamp + INTERVAL '10' SECOND
p.price_timestamp BETWEEN o.order_timestamp - INTERVAL '1' SECOND AND o.order_timestamp + INTERVAL '10' SECOND
表示 price_log
表如果在 order_log
表 order_timestamp
的前一秒和后十秒区间内则关联输出
TimeIntervalJoin.java
debug 代码可以看到基于 EventTime 的 Interval Join
逻辑实现在 RowTimeIntervalJoin.java
,基于 ProcessingTime 的 Interval Join
逻辑实现在 ProcTimeIntervalJoin.java
,而这两者都继承自 TimeIntervalJoin.java
具体 join 核心逻辑在 processElement1 、processElement2 和 onTimer 方法中
TimeIntervalJoin 过程
不管是 Left Join、Right Join 还是 Full Outer Join 都是走以下过程。
processElement1 方法
- 左流数据到达之后,会先计算三个时间戳
- 根据左流数据时间
left_record_time
计算关联右流的时间区间下限right_lower
和上限right_upper
- 计算左流
left_watermark
和右流right_watermark
(在 EventTime 语义下这两个时间戳都等于当前的watermark
;在 ProcessingTime语义下都等于当前的processing_time
) - 计算右流需要过期处理的过期时间
right_expiration_time
- 如果右流过期时间
right_expiration_time
小于关联区间上限right_upper
,则遍历右流状态里的所有数据
- 如果 join on 条件为 true,则发送
+I[left_record, matched_right_record]
;否则啥也不干 - 然后如果右流数据的
right_record_time
小于等于右流过期时间right_expiration_time
(即右流这条数据永远不会被左流关联到) 并且 join type 为 Right Join 或者 Full Outer Join,则发送+I[null, reight_record]
;不管 join type 是什么类型都会清除这条右流数据
- 如果右流
right_watermark
小于关联区间上限right_upper
(说明这条左流数据还有可能被右流关联到) 则将这条左流数据放到左流状态中,并注册左流数据时间left_record_time
的定时器,用来清除过期数据 - 如果右流
right_watermark
大于关联区间上限right_upper
,并且这条左流数据未能与右流关联成功,并且 join type 为 Left Join 或者 Full Outer Join,则发送+I[left_record, null]
onTimer方法
- 遍历右流状态里的所有数据
- 如果右流数据
right_record_time
小于等于右流过期时间right_expiration_time
并且 join type 为 Right Join 或 Full Outer Join,则输出+I[null, right_record]
,并清除该数据 - 如果右流数据
right_record_time
大于右流过期时间right_expiration_time
,则找到最小的右流数据时间,并给它注册一个定时器,如果找不到的话,则清空右流状态
- 遍历左流状态里的所有数据,其他逻辑与右流相同
上述逻辑是左流数据到达的流程,如果右流数据到达也完全类似。