1. object ExtractEquiJoinKeys
A pattern-matching extractor. The official comment reads:
A pattern that finds joins with equality conditions that can be evaluated using equi-join. Null-safe equality will be transformed into equality as joining key (replace null with default value).
So what is null-safe equality? There is a case class EqualNullSafe, documented as:
expr1 FUNC expr2 - Returns same result as the EQUAL(=) operator for non-null operands, but returns true if both are null, false if one of the them is null.
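In other words, besides comparing ordinary values, it treats two nulls as equal and a null against a non-null value as unequal. The source shows how this becomes a join key: each side is wrapped in Coalesce together with Literal.default of its data type, so a null key is replaced by that type's default value (0 for numeric types, for example) and rows with null keys can be matched to each other. A genuine default value would then collide with null, which is why the EqualNullSafe predicate is not filtered out of otherPredicates and is still checked as the residual join condition.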
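The difference between `=` and the null-safe `<=>` can be sketched in plain Scala, modelling SQL NULL as `None`. This is only an illustration under that assumption: `NullSafeEqualityDemo`, `sqlEq`, `nullSafeEq` and `coalesced` are hypothetical names, not Spark APIs.

```scala
object NullSafeEqualityDemo {
  // SQL NULL is modelled as None, a present value as Some(v).

  // Ordinary `=`: the result is NULL (None) as soon as either operand is NULL.
  def sqlEq[A](l: Option[A], r: Option[A]): Option[Boolean] =
    for (a <- l; b <- r) yield a == b

  // Null-safe `<=>`: always yields a definite Boolean.
  def nullSafeEq[A](l: Option[A], r: Option[A]): Boolean = (l, r) match {
    case (Some(a), Some(b)) => a == b
    case (None, None)       => true
    case _                  => false
  }

  // Rough analogue of Coalesce(key, Literal.default(IntegerType)):
  // a NULL key is replaced by the type's default value, assumed to be 0 here.
  def coalesced(key: Option[Int], default: Int = 0): Int =
    key.getOrElse(default)
}
```

Note that `coalesced(None)` and `coalesced(Some(0))` produce the same key even though `nullSafeEq(None, Some(0))` is false, so the coalesced key alone cannot decide the match.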
Source:
/**
 * A pattern that finds joins with equality conditions that can be evaluated using equi-join.
 *
 * Null-safe equality will be transformed into equality as joining key (replace null with default
 * value).
 */
object ExtractEquiJoinKeys extends Logging with PredicateHelper {
  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
  type ReturnType =
    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
    case join @ Join(left, right, joinType, condition) =>
      logDebug(s"Considering join on: $condition")
      // Find equi-join predicates that can be evaluated before the join, and thus can be used
      // as join keys.
      val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
      val joinKeys = predicates.flatMap {
        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
        case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
        case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
        // Replace null with default value for joining key, then those rows with null in it could
        // be joined together
        case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) =>
          Some((Coalesce(Seq(l, Literal.default(l.dataType))),
            Coalesce(Seq(r, Literal.default(r.dataType)))))
        case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) =>
          Some((Coalesce(Seq(r, Literal.default(r.dataType))),
            Coalesce(Seq(l, Literal.default(l.dataType)))))
        case other => None
      }
      val otherPredicates = predicates.filterNot {
        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => false
        case EqualTo(l, r) =>
          canEvaluate(l, left) && canEvaluate(r, right) ||
            canEvaluate(l, right) && canEvaluate(r, left)
        case other => false
      }

      if (joinKeys.nonEmpty) {
        val (leftKeys, rightKeys) = joinKeys.unzip
        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
      } else {
        None
      }
    case _ => None
  }
}
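It first collects all of the join's conditions (a conjunction is split recursively into and.left and and.right) and then divides them into two sequences, joinKeys and otherPredicates. The former holds an (l, r) pair for every equality whose two sides pass canEvaluate() against opposite children of the join; the latter holds every condition that was not captured that way. So what is canEvaluate()? Its source is: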
/**
 * Returns true if `expr` can be evaluated using only the output of `plan`. This method
 * can be used to determine when it is acceptable to move expression evaluation within a query
 * plan.
 *
 * For example consider a join between two relations R(a, b) and S(c, d).
 *
 * - `canEvaluate(EqualTo(a,b), R)` returns `true`
 * - `canEvaluate(EqualTo(a,c), R)` returns `false`
 * - `canEvaluate(Literal(1), R)` returns `true` as literals CAN be evaluated on any plan
 */
protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
  expr.references.subsetOf(plan.outputSet)
That is, every attribute the expression references must belong to the plan's output set.
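The three cases from the comment can be reproduced with a toy model in which an expression is reduced to the column names it references and a plan to the column names it outputs. `Expr`, `Plan` and `CanEvaluateDemo` are hypothetical stand-ins for illustration, not Catalyst classes.

```scala
object CanEvaluateDemo {
  // Hypothetical stand-ins: only the referenced / produced column names are kept.
  final case class Expr(references: Set[String])
  final case class Plan(output: Set[String])

  // Same subset check as the quoted canEvaluate, on the simplified types.
  def canEvaluate(expr: Expr, plan: Plan): Boolean =
    expr.references.subsetOf(plan.output)

  val r: Plan = Plan(Set("a", "b")) // relation R(a, b)
  val s: Plan = Plan(Set("c", "d")) // relation S(c, d)

  val aEqB: Expr = Expr(Set("a", "b"))         // EqualTo(a, b)
  val aEqC: Expr = Expr(Set("a", "c"))         // EqualTo(a, c)
  val literal: Expr = Expr(Set.empty[String])  // Literal(1) references nothing
}
```

A literal has an empty reference set, and the empty set is a subset of anything, which is why it can be evaluated on any plan.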
Plans that match the ExtractEquiJoinKeys pattern are then handled by the physical join strategy.
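To make the split into joinKeys and otherPredicates concrete, here is a small self-contained sketch on a toy expression tree. It covers only the EqualTo cases, and every type in it (`Col`, `Lit`, `EqualTo`, `GreaterThan`, `And`) is a simplified, hypothetical stand-in for the Catalyst class of the same name.

```scala
object ExtractKeysDemo {
  sealed trait Expr { def references: Set[String] }
  final case class Col(name: String) extends Expr { val references: Set[String] = Set(name) }
  final case class Lit(value: Int) extends Expr { val references: Set[String] = Set.empty }
  final case class EqualTo(l: Expr, r: Expr) extends Expr {
    val references: Set[String] = l.references ++ r.references
  }
  final case class GreaterThan(l: Expr, r: Expr) extends Expr {
    val references: Set[String] = l.references ++ r.references
  }
  final case class And(l: Expr, r: Expr) extends Expr {
    val references: Set[String] = l.references ++ r.references
  }

  // Flatten nested conjunctions: (p1 AND p2) AND p3 => Seq(p1, p2, p3).
  def splitConjunctivePredicates(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
    case other     => Seq(other)
  }

  def canEvaluate(e: Expr, output: Set[String]): Boolean = e.references.subsetOf(output)

  // Returns (leftKey, rightKey) when the predicate is usable as an equi-join key.
  def asKey(p: Expr, left: Set[String], right: Set[String]): Option[(Expr, Expr)] = p match {
    case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
    case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
    case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
    case _ => None
  }

  // Splits a join condition into join keys and the remaining predicates.
  def extract(cond: Expr, left: Set[String], right: Set[String]): (Seq[(Expr, Expr)], Seq[Expr]) = {
    val predicates = splitConjunctivePredicates(cond)
    val keys = predicates.flatMap(p => asKey(p, left, right))
    val others = predicates.filter(p => asKey(p, left, right).isEmpty)
    (keys, others)
  }
}
```

For a join of R(a, b) and S(c, d) on `a = c AND d = b AND a > 1`, this yields the keys `(a, c)` and `(b, d)`, with the second pair swapped so that the left-side expression comes first, and leaves `a > 1` as the only other predicate.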
2. object JoinSelection
Source:
/**
 * Select the proper physical plan for join based on joining keys and size of logical plan.
 *
 * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the
 * predicates can be evaluated by matching join keys. If found, Join implementations are chosen
 * with the following precedence:
 *
 * - Broadcast: if one side of the join has an estimated physical size that is smaller than the
 *     user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold
 *     or if that side has an explicit broadcast hint (e.g. the user applied the
 *     [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side
 *     of the join will be broadcasted and the other side will be streamed, with no shuffling
 *     performed. If both sides of the join are eligible to be broadcasted then the
 * - Shuffle hash join: if the average size of a single partition is small enough to build a hash
 *     table.
 * - Sort merge: if the matching join keys are sortable.
 *
 * If there is no joining keys, Join implementations are chosen with the following precedence:
 * - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
 * - CartesianProduct: for Inner join
 * - BroadcastNestedLoopJoin
 */
object JoinSelection extends Strategy with PredicateHelper {

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

    // --- BroadcastHashJoin --------------------------------------------------------------------

    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
      if canBuildRight(joinType) && canBroadcast(right) =>
      Seq(joins.BroadcastHashJoinExec(
        leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
      if canBuildLeft(joinType) && canBroadcast(left) =>
      Seq(joins.BroadcastHashJoinExec(
        leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

    // --- ShuffledHashJoin ---------------------------------------------------------------------

    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
      if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
        && muchSmaller(right, left) ||
        !RowOrdering.isOrderable(leftKeys) =>
      Seq(joins.ShuffledHashJoinExec(
        leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
      if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
        && muchSmaller(left, right) ||
        !RowOrdering.isOrderable(leftKeys) =>
      Seq(joins.ShuffledHashJoinExec(
        leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

    // --- SortMergeJoin ------------------------------------------------------------

    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
      if RowOrdering.isOrderable(leftKeys) =>
      joins.SortMergeJoinExec(
        leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

    // --- Without joining keys ------------------------------------------------------------

    // Pick BroadcastNestedLoopJoin if one side could be broadcasted
    case j @ logical.Join(left, right, joinType, condition)
      if canBuildRight(joinType) && canBroadcast(right) =>
      joins.BroadcastNestedLoopJoinExec(
        planLater(left), planLater(right), BuildRight, joinType, condition) :: Nil
    case j @ logical.Join(left, right, joinType, condition)
      if canBuildLeft(joinType) && canBroadcast(left) =>
      joins.BroadcastNestedLoopJoinExec(
        planLater(left), planLater(right), BuildLeft, joinType, condition) :: Nil

    // Pick CartesianProduct for InnerJoin
    case logical.Join(left, right, _: InnerLike, condition) =>
      joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

    case logical.Join(left, right, joinType, condition) =>
      val buildSide =
        if (right.stats(conf).sizeInBytes <= left.stats(conf).sizeInBytes) {
          BuildRight
        } else {
          BuildLeft
        }
      // This join could be very slow or OOM
      joins.BroadcastNestedLoopJoinExec(
        planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

    // --- Cases where this strategy does not apply ---------------------------------------------

    case _ => Nil
  }
}
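Start with the comment. The strategy first uses ExtractEquiJoinKeys to check whether at least some of the join's predicates can be evaluated by matching join keys. If so, it chooses among three joins based on those keys and on the plan sizes: broadcast hash join, shuffled hash join, and the most common one, sort merge join. If there are no join keys, two implementations remain: BroadcastNestedLoopJoin and CartesianProduct.
Now for the source. When join keys are available, how does it decide which join to use?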
2.1 Broadcast Join
Two kinds of method are involved: canBroadcast() and canBuildX (canBuildLeft or canBuildRight).
/**
 * Matches a plan whose output should be small enough to be used in broadcast join.
 */
private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats(conf).hints.isBroadcastable.getOrElse(false) ||
    (plan.stats(conf).sizeInBytes >= 0 &&
      plan.stats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
}
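As shown, canBroadcast() reads the AUTO_BROADCASTJOIN_THRESHOLD setting (spark.sql.autoBroadcastJoinThreshold). Setting it to -1 disables size-based broadcasting, because no non-negative size can be <= -1; a plan with an explicit broadcast hint still qualifies.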
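The effect of the threshold, including the -1 case, can be checked with a simplified version of the predicate. `Stats` and `CanBroadcastDemo` are hypothetical; the 10 MB figure is used as an example threshold, which I believe matches Spark's default.

```scala
object CanBroadcastDemo {
  // Hypothetical, simplified statistics: estimated size plus an optional broadcast hint.
  final case class Stats(sizeInBytes: BigInt, broadcastHint: Boolean = false)

  val MB: Long = 1024L * 1024L

  // Same shape as the quoted canBroadcast: hint OR (0 <= size <= threshold).
  def canBroadcast(stats: Stats, autoBroadcastJoinThreshold: Long): Boolean =
    stats.broadcastHint ||
      (stats.sizeInBytes >= 0 && stats.sizeInBytes <= autoBroadcastJoinThreshold)
}
```

With a 10 MB threshold a 5 MB plan qualifies and a 50 MB plan does not; with a threshold of -1 only a hinted plan qualifies.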
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
  case j: ExistenceJoin => true
  case _ => false
}

private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false
}
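canBuildRight() and canBuildLeft() return true when the join type allows that side to be the build side, which is the case when the other side is the one driving the join. The right side can be built for inner, left outer, left semi, left anti and existence joins; the left side only for inner and right outer joins.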
2.2 Shuffled Hash Join
This involves the canBuildLocalHashMap() and muchSmaller() methods and the PREFER_SORTMERGEJOIN setting, which is documented as:
When true, prefer sort merge join over shuffle hash join.
The source of canBuildLocalHashMap() is:
/**
 * Matches a plan whose single partition should be small enough to build a hash table.
 *
 * Note: this assume that the number of partition is fixed, requires additional work if it's
 * dynamic.
 */
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.stats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
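The method uses two settings. AUTO_BROADCASTJOIN_THRESHOLD, already seen in the broadcast join, is a very useful knob for query tuning. SHUFFLE_PARTITIONS (spark.sql.shuffle.partitions) is the number of partitions used when shuffling for joins or aggregations, and defaults to 200. Put differently, the check requires the average partition of the plan to be smaller than the broadcast threshold.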
The source of muchSmaller():
/**
 * Returns whether plan a is much smaller (3X) than plan b.
 *
 * The cost to build hash map is higher than sorting, we should only build hash map on a table
 * that is much smaller than other one. Since we does not have the statistic for number of rows,
 * use the size of bytes here as estimation.
 */
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.stats(conf).sizeInBytes * 3 <= b.stats(conf).sizeInBytes
}
So "much" here means at least three times smaller, measured by estimated size in bytes.
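A quick worked check of both size conditions, written against plain byte counts rather than logical plans. The object and parameter names are illustrative assumptions; the 10 MB threshold and 200 partitions are example values.

```scala
object ShuffledHashEligibilityDemo {
  val MB: Long = 1024L * 1024L

  // Total size must be below threshold * partitions, i.e. the average
  // partition must be smaller than the broadcast threshold.
  def canBuildLocalHashMap(
      sizeInBytes: BigInt,
      autoBroadcastJoinThreshold: Long,
      numShufflePartitions: Int): Boolean =
    sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions

  // a is "much smaller" than b when three copies of a still fit within b.
  def muchSmaller(a: BigInt, b: BigInt): Boolean = a * 3 <= b
}
```

With a 10 MB threshold and 200 partitions the limit is 2000 MB: a 1500 MB side passes, while a side of exactly 2000 MB does not because the comparison is strict.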
2.3 Sort Merge Join
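When the collected join keys are all of orderable data types, sort merge join can be used. It is what remains once the broadcast and shuffled hash cases have been ruled out. Keys that are not orderable never reach it: in the ShuffledHashJoin guards && binds tighter than ||, so the `|| !RowOrdering.isOrderable(leftKeys)` branch catches them first.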
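The order in which the equi-join cases are tried can be summarised as a single decision function. This is a sketch of the precedence only: `Facts` is a hypothetical record of the guard results for one join, and the returned strings merely name the operator that the strategy would pick.

```scala
object JoinPrecedenceDemo {
  // Hypothetical summary of the guard results for one join; not a Spark type.
  final case class Facts(
      broadcastRight: Boolean,      // canBuildRight(joinType) && canBroadcast(right)
      broadcastLeft: Boolean,       // canBuildLeft(joinType) && canBroadcast(left)
      hashRight: Boolean,           // canBuildRight && canBuildLocalHashMap(right) && muchSmaller(right, left)
      hashLeft: Boolean,            // canBuildLeft && canBuildLocalHashMap(left) && muchSmaller(left, right)
      preferSortMergeJoin: Boolean, // the PREFER_SORTMERGEJOIN setting
      keysOrderable: Boolean)       // RowOrdering.isOrderable(leftKeys)

  def select(f: Facts): String =
    if (f.broadcastRight) "BroadcastHashJoin(BuildRight)"
    else if (f.broadcastLeft) "BroadcastHashJoin(BuildLeft)"
    // Non-orderable keys always land here, whatever the other flags say.
    else if ((!f.preferSortMergeJoin && f.hashRight) || !f.keysOrderable)
      "ShuffledHashJoin(BuildRight)"
    else if (!f.preferSortMergeJoin && f.hashLeft) "ShuffledHashJoin(BuildLeft)"
    else "SortMergeJoin"
}
```

One consequence visible here is that with preferSortMergeJoin set to true, a shuffled hash join is chosen only when the keys are not orderable.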
3. BroadcastHashJoinExec
case class BroadcastHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan)
  extends BinaryExecNode with HashJoin with CodegenSupport {

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")

    val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
    streamedPlan.execute().mapPartitions { streamedIter =>
      val hashed = broadcastRelation.value.asReadOnlyCopy()
      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
      join(streamedIter, hashed, numOutputRows)
    }
  }
}

// join() as defined in the HashJoin trait:
protected def join(
    streamedIter: Iterator[InternalRow],
    hashed: HashedRelation,
    numOutputRows: SQLMetric): Iterator[InternalRow] = {

  val joinedIter = joinType match {
    case _: InnerLike =>
      innerJoin(streamedIter, hashed)
    case LeftOuter | RightOuter =>
      outerJoin(streamedIter, hashed)
    case LeftSemi =>
      semiJoin(streamedIter, hashed)
    case LeftAnti =>
      antiJoin(streamedIter, hashed)
    case j: ExistenceJoin =>
      existenceJoin(streamedIter, hashed)
    case x =>
      throw new IllegalArgumentException(
        s"BroadcastHashJoin should not take $x as the JoinType")
  }

  val resultProj = createResultProjection
  joinedIter.map { r =>
    numOutputRows += 1
    resultProj(r)
  }
}
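Of the two snippets above, the first is the definition and main method of BroadcastHashJoinExec, and the second is the join() method of the HashJoin trait it extends. HashJoin defines several paired members named (buildXXX, streamedXXX), such as (buildPlan, streamedPlan); they are not reproduced here, so see the source. The buildXXX side is the small table that gets hashed or broadcast, and the streamedXXX side is the large table. As I understand it, "streamed" means that its rows flow through an iterator and are processed one at a time.
So here, once buildPlan has been broadcast, execute() is called on streamedPlan and mapPartitions is applied to the resulting RDD[InternalRow], joining each partition against the broadcast small table.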
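The build/stream split can be illustrated without Spark by treating partitions as plain sequences. The sketch below assumes an inner join on an Int key and uses an ordinary `Map` in place of the broadcast HashedRelation; all names are hypothetical.

```scala
object BroadcastHashJoinDemo {
  type Row = (Int, String) // (join key, payload)

  def broadcastHashJoin(
      streamedPartitions: Seq[Seq[Row]],
      build: Seq[Row]): Seq[Seq[(Int, String, String)]] = {
    // Built once from the WHOLE small side; every partition sees the same table.
    val hashed: Map[Int, Seq[Row]] = build.groupBy(_._1)

    // Analogue of mapPartitions: each streamed partition is probed independently.
    streamedPartitions.map { streamedIter =>
      streamedIter.flatMap { case (key, streamedValue) =>
        hashed.getOrElse(key, Nil).map { case (_, buildValue) =>
          (key, streamedValue, buildValue)
        }
      }
    }
  }
}
```

The streamed side keeps its partitioning and is never shuffled; only the build side has to be available everywhere.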
4. ShuffledHashJoinExec
case class ShuffledHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan)
  extends BinaryExecNode with HashJoin {

  private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
    val buildDataSize = longMetric("buildDataSize")
    val buildTime = longMetric("buildTime")
    val start = System.nanoTime()
    val context = TaskContext.get()
    val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
    buildTime += (System.nanoTime() - start) / 1000000
    buildDataSize += relation.estimatedSize
    // This relation is usually used until the end of task.
    context.addTaskCompletionListener(_ => relation.close())
    relation
  }

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
      val hashed = buildHashedRelation(buildIter)
      join(streamIter, hashed, numOutputRows)
    }
  }
}
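ShuffledHashJoinExec and BroadcastHashJoinExec differ in how they build the hash table. The latter gets its HashedRelation from the broadcast. The former calls zipPartitions, which combines two RDDs that have the same number of partitions and passes the function one iterator per RDD, here (streamIter, buildIter), and then builds a HashedRelation from buildIter. In other words, the HashedRelation of a broadcast hash join holds the entire small table, whereas the HashedRelation of a shuffled hash join holds only the part of the small table that shares a partition with the corresponding large-table rows.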
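The per-partition hash table can be sketched in the same toy setting. Here `shuffle` stands in for hash partitioning by join key and `zip` for zipPartitions; both are simplifying assumptions (inner join, Int keys, modulo partitioning), and none of the names are Spark APIs.

```scala
object ShuffledHashJoinDemo {
  type Row = (Int, String) // (join key, payload)

  // Toy hash partitioning: rows with equal keys always land in the same partition.
  def shuffle(rows: Seq[Row], numPartitions: Int): Seq[Seq[Row]] =
    (0 until numPartitions).map { p =>
      rows.filter { case (key, _) => Math.floorMod(key, numPartitions) == p }
    }

  def shuffledHashJoin(
      streamed: Seq[Row],
      build: Seq[Row],
      numPartitions: Int): Seq[(Int, String, String)] =
    // zip plays the role of zipPartitions: partition i of one side meets partition i of the other.
    shuffle(streamed, numPartitions).zip(shuffle(build, numPartitions)).flatMap {
      case (streamIter, buildIter) =>
        // One hash table per partition, holding only this partition's build rows.
        val hashed: Map[Int, Seq[Row]] = buildIter.groupBy(_._1)
        streamIter.flatMap { case (key, streamedValue) =>
          hashed.getOrElse(key, Nil).map { case (_, buildValue) =>
            (key, streamedValue, buildValue)
          }
        }
    }
}
```

Because both sides are partitioned by the same function of the key, matching rows are guaranteed to meet in the same pair of partitions, so each small table is sufficient for its own partition.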
5. SortMergeJoinExec
case class SortMergeJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryExecNode with CodegenSupport {

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    val spillThreshold = getSpillThreshold
    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
      ...
      ...
    }
  }
}
Here too the two RDDs are combined with zipPartitions, and the join is performed by iterating over each pair of partitions.
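The body elided above is not reproduced in this post, so the following is only a generic sketch of what a merge join does inside one pair of partitions, not Spark's implementation. It assumes an inner join on an Int key and sorts its inputs itself; `SortMergeJoinDemo` and its members are hypothetical names.

```scala
object SortMergeJoinDemo {
  type Row = (Int, String) // (join key, payload)

  def sortMergeJoin(left: Seq[Row], right: Seq[Row]): Seq[(Int, String, String)] = {
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector
    val out = Vector.newBuilder[(Int, String, String)]
    var i = 0
    var j = 0
    while (i < l.length && j < r.length) {
      val leftKey = l(i)._1
      val rightKey = r(j)._1
      if (leftKey < rightKey) i += 1      // left row has no partner yet, advance left
      else if (leftKey > rightKey) j += 1 // right row has no partner, advance right
      else {
        // Find the end of the run of right rows sharing this key ...
        val runEnd = r.indexWhere(_._1 != rightKey, j) match {
          case -1  => r.length
          case end => end
        }
        // ... and pair that run with every left row carrying the same key.
        while (i < l.length && l(i)._1 == leftKey) {
          for (k <- j until runEnd) out += ((leftKey, l(i)._2, r(k)._2))
          i += 1
        }
        j = runEnd
      }
    }
    out.result()
  }
}
```

Both cursors only move forward, which is why the keys must be orderable: the merge relies on the sort order to know that a skipped row can never match later.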