Everything in this article is based on Flink 1.14.
StreamingJoinOperator.java
Stepping through the code in a debugger shows that the Regular Join logic is implemented in StreamingJoinOperator.java (every kind of Regular Join goes through it).
Pseudocode from the comment on processElement (+I = INSERT, -D = DELETE, +U = UPDATE_AFTER, -U = UPDATE_BEFORE):
if input record is accumulate
| if input side is outer
| | if there is no matched rows on the other side, send +I[record+null], state.add(record, 0)
| | if there are matched rows on the other side
| | | if other side is outer
| | | | if the matched num in the matched rows == 0, send -D[null+other]
| | | | if the matched num in the matched rows > 0, skip
| | | | otherState.update(other, old + 1)
| | | endif
| | | send +I[record+other]s, state.add(record, other.size)
| | endif
| endif
| if input side not outer
| | state.add(record)
| | if there is no matched rows on the other side, skip
| | if there are matched rows on the other side
| | | if other side is outer
| | | | if the matched num in the matched rows == 0, send -D[null+other]
| | | | if the matched num in the matched rows > 0, skip
| | | | otherState.update(other, old + 1)
| | | | send +I[record+other]s
| | | else
| | | | send +I/+U[record+other]s (using input RowKind)
| | | endif
| | endif
| endif
endif
if input record is retract
| state.retract(record)
| if there is no matched rows on the other side
| | if input side is outer, send -D[record+null]
| endif
| if there are matched rows on the other side, send -D[record+other]s if outer, send -D/-U[record+other]s if inner.
| | if other side is outer
| | | if the matched num in the matched rows == 0, this should never happen!
| | | if the matched num in the matched rows == 1, send +I[null+other]
| | | if the matched num in the matched rows > 1, skip
| | | otherState.update(other, old - 1)
| | endif
| endif
endif
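The pseudocode can be modeled as a small single-threaded Java program. This is a minimal sketch, not Flink's code. The class `RegularJoinSketch`, its `Entry` type and the string row format are hypothetical. The join condition is plain key equality. Every stored row carries a `numOfAssociations` counter, although the counter only matters for an outer side. The sketch always emits +I/-D, whereas for an inner join Flink forwards the input RowKind (+I/+U, -D/-U).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the processElement pseudocode; not Flink's API.
// Rows are Strings, the join condition is key equality, and each side's state maps
// join key -> stored rows, each with its numOfAssociations counter.
final class RegularJoinSketch {
    static final class Entry {
        final String row;
        int numOfAssociations; // how many other-side rows this row currently joins with

        Entry(String row, int numOfAssociations) {
            this.row = row;
            this.numOfAssociations = numOfAssociations;
        }
    }

    final boolean leftOuter;
    final boolean rightOuter;
    final Map<String, List<Entry>> leftState = new HashMap<>();
    final Map<String, List<Entry>> rightState = new HashMap<>();
    final List<String> out = new ArrayList<>();

    RegularJoinSketch(boolean leftOuter, boolean rightOuter) {
        this.leftOuter = leftOuter;
        this.rightOuter = rightOuter;
    }

    // Always prints the left row first, whichever side the input came from.
    private void emit(String kind, boolean inputIsLeft, String input, String other) {
        String left = inputIsLeft ? input : other;
        String right = inputIsLeft ? other : input;
        out.add(kind + "[" + left + ", " + right + "]");
    }

    void process(boolean inputIsLeft, String key, String row, boolean accumulate) {
        boolean inputOuter = inputIsLeft ? leftOuter : rightOuter;
        boolean otherOuter = inputIsLeft ? rightOuter : leftOuter;
        List<Entry> mine = (inputIsLeft ? leftState : rightState)
                .computeIfAbsent(key, k -> new ArrayList<>());
        List<Entry> matched = (inputIsLeft ? rightState : leftState)
                .getOrDefault(key, new ArrayList<>());

        if (accumulate) {
            if (otherOuter) {
                for (Entry other : matched) {
                    // first match: retract the other row's null-padded result
                    if (other.numOfAssociations == 0) emit("-D", inputIsLeft, null, other.row);
                    other.numOfAssociations++;
                }
            }
            for (Entry other : matched) emit("+I", inputIsLeft, row, other.row);
            if (matched.isEmpty() && inputOuter) emit("+I", inputIsLeft, row, null);
            mine.add(new Entry(row, matched.size())); // state.add(record, other.size)
        } else {
            // state.retract(record): drop one stored copy of the row
            for (int i = 0; i < mine.size(); i++) {
                if (mine.get(i).row.equals(row)) {
                    mine.remove(i);
                    break;
                }
            }
            if (matched.isEmpty() && inputOuter) emit("-D", inputIsLeft, row, null);
            for (Entry other : matched) emit("-D", inputIsLeft, row, other.row);
            if (otherOuter) {
                for (Entry other : matched) {
                    // last match gone: the other row is null-padded again
                    if (other.numOfAssociations == 1) emit("+I", inputIsLeft, null, other.row);
                    other.numOfAssociations--;
                }
            }
        }
    }

    public static void main(String[] args) {
        RegularJoinSketch fullOuter = new RegularJoinSketch(true, true);
        fullOuter.process(true, "1", "o1", true);   // left row, no match yet
        fullOuter.process(false, "1", "p1", true);  // right row matches o1
        fullOuter.process(false, "1", "p1", false); // right row is retracted
        System.out.println(fullOuter.out);
    }
}
```

For the full outer join in `main`, the sketch prints `[+I[o1, null], -D[o1, null], +I[o1, p1], -D[o1, p1], +I[o1, null]]`. The left row is first padded with null. That result is replaced once p1 matches it. When p1 is retracted, the null-padded row is emitted again.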
Data
order_log
order_id | movie_id | order_timestamp |
---|---|---|
1 | 1 | 2021-12-25 00:00:00 |
2 | 2 | 2021-12-25 00:01:00 |
3 | 3 | 2021-12-25 00:02:00 |
price_log
order_id | seat_price | price_timestamp |
---|---|---|
1 | 40 | 2021-12-25 00:00:01 |
3 | 80 | 2021-12-25 00:02:00 |
Left Join
Left Join Demo
INSERT INTO print
SELECT
o.order_id
,movie_id
,seat_price
,o.order_timestamp
FROM order_log o
LEFT JOIN price_log p ON o.order_id = p.order_id
How the Left Join works
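- When a record arrives on the left table, it is matched one by one against the rows in the right-table state. The state is keyed by the join key, so only right rows with the same key are compared.
  - If it matches, send +I[left_record, matched_right_record] and save (left_record, matched_right_record_number) to the left-table state for later joins. Here matched_right_record_number is the number of right rows it matched.
  - If it does not match, send +I[left_record, null] and save (left_record, 0) to the left-table state.
- When a record arrives on the right table, it is matched one by one against the rows in the left-table state. The right record is saved to the right-table state whether or not it matches.
  - If it matches:
    - If the matched left record's numOfAssociations is 0, send -D[matched_left_record, null], update the left-table state to (matched_left_record, numOfAssociations + 1), and send +I[matched_left_record, right_record]. This retracts the earlier result in which the left record had no right match, then emits the latest joined result.
    - If the matched left record's numOfAssociations is greater than 0, update the left-table state to (matched_left_record, numOfAssociations + 1) and send +I[matched_left_record, right_record].
  - Otherwise, do nothing.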
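The left join steps can be replayed on the sample data with a short Java sketch. The class `LeftJoinTrace` is hypothetical and is not Flink API. Both states are keyed by order_id. Only order_id, movie_id and seat_price are printed. The sketch assumes insert-only rows arriving in timestamp order, with order 3 arriving before price 3, because both carry 2021-12-25 00:02:00.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of the Left Join steps on the sample rows; not Flink's API.
// Assumes insert-only input arriving in timestamp order, order 3 before price 3.
final class LeftJoinTrace {
    // A left row plus how many right rows it currently joins with.
    static final class LeftEntry {
        final String record;
        int numOfAssociations;

        LeftEntry(String record, int numOfAssociations) {
            this.record = record;
            this.numOfAssociations = numOfAssociations;
        }
    }

    // Both states are keyed by the join key (order_id).
    final Map<Integer, List<LeftEntry>> leftState = new HashMap<>();
    final Map<Integer, List<String>> rightState = new HashMap<>(); // inner side: no counter
    final List<String> out = new ArrayList<>();

    // record is "order_id, movie_id"
    void onLeft(int orderId, String record) {
        List<String> matched = rightState.getOrDefault(orderId, new ArrayList<>());
        if (matched.isEmpty()) {
            out.add("+I[" + record + ", null]");
        }
        for (String right : matched) {
            out.add("+I[" + record + ", " + right + "]");
        }
        // save (left_record, matched_right_record_number)
        leftState.computeIfAbsent(orderId, k -> new ArrayList<>())
                .add(new LeftEntry(record, matched.size()));
    }

    // record is "seat_price"; it is stored whether or not it matches
    void onRight(int orderId, String record) {
        rightState.computeIfAbsent(orderId, k -> new ArrayList<>()).add(record);
        List<LeftEntry> matched = leftState.getOrDefault(orderId, new ArrayList<>());
        for (LeftEntry left : matched) {
            if (left.numOfAssociations == 0) {
                // retract the earlier null-padded result for this left row
                out.add("-D[" + left.record + ", null]");
            }
            left.numOfAssociations++;
        }
        // as in the pseudocode, all -D rows go out before the +I rows
        for (LeftEntry left : matched) {
            out.add("+I[" + left.record + ", " + record + "]");
        }
    }

    public static void main(String[] args) {
        LeftJoinTrace t = new LeftJoinTrace();
        t.onLeft(1, "1, 1"); // order 1, 00:00:00
        t.onRight(1, "40");  // price 1, 00:00:01
        t.onLeft(2, "2, 2"); // order 2, 00:01:00
        t.onLeft(3, "3, 3"); // order 3, 00:02:00
        t.onRight(3, "80");  // price 3, 00:02:00
        t.out.forEach(System.out::println);
    }
}
```

With that arrival order the program prints one row per line:

- +I[1, 1, null]
- -D[1, 1, null]
- +I[1, 1, 40]
- +I[2, 2, null]
- +I[3, 3, null]
- -D[3, 3, null]
- +I[3, 3, 80]

Order 2 stays null-padded because price_log has no row for it.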
Inner Join
Inner Join Demo
INSERT INTO print
SELECT
o.order_id
,movie_id
,seat_price
,o.order_timestamp
FROM order_log o
INNER JOIN price_log p ON o.order_id = p.order_id
How the Inner Join works
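- When a record arrives on the left table, it is matched one by one against the rows in the right-table state, and the left record is saved to the left-table state.
  - If it does not match, do nothing.
  - If it matches, send +I[left_record, matched_right_record].
- When a record arrives on the right table, it is matched one by one against the rows in the left-table state, and the right record is saved to the right-table state.
  - If it does not match, do nothing.
  - If it matches, send +I[matched_left_record, right_record].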
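Because neither side is outer, the inner join needs no numOfAssociations counter and never retracts anything for insert-only input. The hypothetical `InnerJoinTrace` class below is a sketch, not Flink API. It replays the sample rows with states keyed by order_id. For inner joins Flink forwards the input RowKind, so the output is +I only because the sample input is insert-only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of the Inner Join steps on the sample rows; not Flink's API.
final class InnerJoinTrace {
    // Both states are keyed by the join key (order_id); no counters are needed.
    final Map<Integer, List<String>> leftState = new HashMap<>();
    final Map<Integer, List<String>> rightState = new HashMap<>();
    final List<String> out = new ArrayList<>();

    // record is "order_id, movie_id"
    void onLeft(int orderId, String record) {
        leftState.computeIfAbsent(orderId, k -> new ArrayList<>()).add(record);
        for (String right : rightState.getOrDefault(orderId, new ArrayList<>())) {
            out.add("+I[" + record + ", " + right + "]");
        }
    }

    // record is "seat_price"
    void onRight(int orderId, String record) {
        rightState.computeIfAbsent(orderId, k -> new ArrayList<>()).add(record);
        for (String left : leftState.getOrDefault(orderId, new ArrayList<>())) {
            out.add("+I[" + left + ", " + record + "]");
        }
    }

    public static void main(String[] args) {
        InnerJoinTrace t = new InnerJoinTrace();
        t.onLeft(1, "1, 1");
        t.onRight(1, "40");
        t.onLeft(2, "2, 2"); // no price row for order 2: nothing is emitted
        t.onLeft(3, "3, 3");
        t.onRight(3, "80");
        t.out.forEach(System.out::println);
    }
}
```

The program prints only +I[1, 1, 40] and +I[3, 3, 80]. The result is the same whether order 3 or price 3 arrives first, since whichever row comes second finds the first one in state.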
Full Outer Join
Full Outer Join Demo
INSERT INTO print
SELECT
o.order_id
,movie_id
,seat_price
,o.order_timestamp
FROM order_log o
FULL OUTER JOIN price_log p ON o.order_id = p.order_id
How the Full Outer Join works
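- When a record arrives on the left table, it is matched one by one against the rows in the right-table state.
  - If it does not match, send +I[left_record, null] and save (left_record, 0) to the left-table state.
  - If it matches:
    - If the matched right record's numOfAssociations is 0, send -D[null, matched_right_record], update the right-table state to (matched_right_record, numOfAssociations + 1), send +I[left_record, matched_right_record], and save (left_record, matched_right_record_number) to the left-table state.
    - If the matched right record's numOfAssociations is not 0, update the right-table state to (matched_right_record, numOfAssociations + 1), send +I[left_record, matched_right_record], and save (left_record, matched_right_record_number) to the left-table state.
- When a record arrives on the right table, it is matched one by one against the rows in the left-table state.
  - If it does not match, send +I[null, right_record] and save (right_record, 0) to the right-table state.
  - If it matches:
    - If the matched left record's numOfAssociations is 0, send -D[matched_left_record, null], update the left-table state to (matched_left_record, numOfAssociations + 1), send +I[matched_left_record, right_record], and save (right_record, matched_left_record_number) to the right-table state.
    - If the matched left record's numOfAssociations is not 0, update the left-table state to (matched_left_record, numOfAssociations + 1), send +I[matched_left_record, right_record], and save (right_record, matched_left_record_number) to the right-table state.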
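In a full outer join both sides keep a numOfAssociations counter, so a right row can also be emitted null-padded first. The sample data shows this if price 3 happens to arrive before order 3; both carry 2021-12-25 00:02:00, so either arrival order is possible. The hypothetical `FullOuterJoinTrace` class below is a sketch, not Flink API. It assumes exactly that order and pads the missing left side as two nulls (order_id, movie_id).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of the Full Outer Join steps on the sample rows; not Flink's API.
// Assumes price 3 arrives before order 3 (both carry 2021-12-25 00:02:00).
final class FullOuterJoinTrace {
    static final String NULL_LEFT = "null, null"; // padding for order_id, movie_id
    static final String NULL_RIGHT = "null";      // padding for seat_price

    // A stored row plus how many other-side rows it currently joins with.
    static final class Entry {
        final String record;
        int numOfAssociations;

        Entry(String record, int numOfAssociations) {
            this.record = record;
            this.numOfAssociations = numOfAssociations;
        }
    }

    // Both states are keyed by the join key (order_id).
    final Map<Integer, List<Entry>> leftState = new HashMap<>();
    final Map<Integer, List<Entry>> rightState = new HashMap<>();
    final List<String> out = new ArrayList<>();

    private void emit(String kind, String left, String right) {
        out.add(kind + "[" + left + ", " + right + "]");
    }

    // record is "order_id, movie_id"
    void onLeft(int orderId, String record) {
        List<Entry> matched = rightState.getOrDefault(orderId, new ArrayList<>());
        if (matched.isEmpty()) emit("+I", record, NULL_RIGHT);
        for (Entry right : matched) {
            // first match: retract the right row's null-padded result
            if (right.numOfAssociations == 0) emit("-D", NULL_LEFT, right.record);
            right.numOfAssociations++;
        }
        for (Entry right : matched) emit("+I", record, right.record);
        leftState.computeIfAbsent(orderId, k -> new ArrayList<>())
                .add(new Entry(record, matched.size()));
    }

    // record is "seat_price"
    void onRight(int orderId, String record) {
        List<Entry> matched = leftState.getOrDefault(orderId, new ArrayList<>());
        if (matched.isEmpty()) emit("+I", NULL_LEFT, record);
        for (Entry left : matched) {
            // first match: retract the left row's null-padded result
            if (left.numOfAssociations == 0) emit("-D", left.record, NULL_RIGHT);
            left.numOfAssociations++;
        }
        for (Entry left : matched) emit("+I", left.record, record);
        rightState.computeIfAbsent(orderId, k -> new ArrayList<>())
                .add(new Entry(record, matched.size()));
    }

    public static void main(String[] args) {
        FullOuterJoinTrace t = new FullOuterJoinTrace();
        t.onLeft(1, "1, 1");
        t.onRight(1, "40");
        t.onLeft(2, "2, 2");
        t.onRight(3, "80"); // price 3 first: no order 3 in state yet
        t.onLeft(3, "3, 3");
        t.out.forEach(System.out::println);
    }
}
```

The program prints one row per line:

- +I[1, 1, null]
- -D[1, 1, null]
- +I[1, 1, 40]
- +I[2, 2, null]
- +I[null, null, 80]
- -D[null, null, 80]
- +I[3, 3, 80]

The last three rows show the right-side counterpart of the left join behavior: price 3 is first emitted with a null left side, then retracted once order 3 matches it.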