一、Join 逻辑计划生成
和 Join 相关的逻辑层的优化规则主要包含以下几种:
ReorderJoin
EliminateOuterJoin
以及
中和 Join 相关的 predicate pushDown
二、Join 物理计划生成和选取
2.1、基本概念
在 Spark SQL 中,参与 Join 操作的两张表分别被称为流式表(StreamTable)和构件表(BuildTable),不同表的角色在 Spark SQL 中会通过一定的策略进行设定。通常来讲,系统会将大表设置为 StreamTable,小表设置为 BuildTable。流式表的迭代器为 streamIter,构建表的迭代器为 buildIter。遍历 streamIter 的每一条记录,然后在 buildIter 中查找匹配的记录。这个查找过程称为 build 过程。每次 build 操作的结果为一条 JoinedRow(A, B)
,其中 A 来自 streamedIter,B 来自 buildIter。
再例如,在 BroadcastHashJoin 中需要决定广播哪个数据表。这里的 BuildSide 可以简单理解为 “构建的一边”。
在 Spark 中,BuildSide 作为一个抽象类,包含 BuildLeft 和 BuildRight 两个子类,一般在构造 Join 的执行算子时,都会传入一个 BuildSide 的构造参数。在 JoinSelection 中通过 canBuildRight
和 canBuildLeft
判断一个 Join 类型能否 “构建” 右表和左表。
2.2、物理计划选取顺序
Join 物理执行计划的选取在 JoinSelection 中进行,其主要逻辑如下:
如果是一个等值 join(equi-join)且包含 join hint,我们依次查看 join hint:
-
broadcast hint
:如果 join 类型支持,使用 broadcast hash join。如果 left 和 right 都有 broadcast hint,选择 size 较小的一侧(基于统计数据)进行 broadcast -
sort merge hint
:如果 join keys 是可排序的,使用 sort merge join。 -
shuffle hash hint
:如果 join 类型支持,如果 left 和 right 都设置了 shuffle hash hints,选择 size 较小的一侧作为 build side -
shuffle replicate NL hint
:如果 join type 为 inner like,使用 cartesian product join(笛卡尔积)
JoinSelection 通过 ExtractEquiJoinKeys
来判断是否为等值 Join 并提取相关信息:
如果没有指定 hint 或 hint 不适用,Join 选择顺序如下:
- 尝试选用 broadcast hash join:如果 join 类型支持,且 join 的一侧 size 足够小能够 broadcast。如果都足够小,选择更小的一侧进行 broadcast(基于统计数据)
- 尝试选用 shuffle hash join:如果 join 类型支持,且 join 的一侧 size 足够小能够构建 local hash map,且该侧 size 显著小于另一侧,且
spark.sql.join.preferSortMergeJoin
为 false - 尝试选用 sort merge join:如果 join keys 是可排序的
- 尝试选用笛卡尔积:如果是
inner like join
- 尝试选用 broadcast nested loop join:最后的兜底手段,可能会 OOM,如果这里 OOM 了,也没办法了
2.3、等值 Join 情况
注①:
createJoinWithoutHint 如下: