Spark SQL是如何选择join策略的?

前言

我们都知道,Spark SQL上主要有三种实现join的策略,分别是Broadcast hash join、Shuffle hash join、Sort merge join。join的方法在之前写的这篇文章里已经做过了简要的介绍。不过笔者还没说过Catalyst是依据什么样的规则来选择join策略的,本文来简单补个漏。

Catalyst在由优化的逻辑计划生成物理计划的过程中,会根据org.apache.spark.sql.execution.SparkStrategies类中JoinSelection对象提供的规则按顺序确定join的执行方式。不过在此之前,需要先来看看三个基本的判断条件。

join判断条件

build table侧的选择

Hash join过程的第一步就是根据两表之中较小的那一个构建哈希表,这个小表就叫做build table。相应地,大表叫做probe table,因为需要拿小表形成的哈希表来“探测”它。对应代码如下。

    private def canBuildRight(joinType: JoinType): Boolean = joinType match {
      case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
      case _ => false
    }

    private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
      case _: InnerLike | RightOuter => true
      case _ => false
    }

可见,只有当join类型为inner-like(包含inner join与cross join两种)或right outer join时,左表才有可能作为build table。而在join类型为inner-like或者left outer/semi/anti join时,右表有可能作为build table。

顺便复习一下各种join类型的语义,用Venn图表示如下。

表如何被广播

如果有某个表的大小小于spark.sql.autoBroadcastJoinThreshold参数规定的值(默认值是10MB,可修改),那么它会被自动广播出去。对应代码如下。

    private def canBroadcast(plan: LogicalPlan): Boolean = {
      plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
    }

    private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : Boolean = {
      val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
      val buildRight = canBuildRight(joinType) && canBroadcast(right)
      buildLeft || buildRight
    }

除了上述阈值之外,Spark SQL还允许在语句里使用broadcast hint(即/* +BROADCAST(t) */)来手动指定要广播的表,判断逻辑如下所示。

    private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : Boolean = {
      val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
      val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
      buildLeft || buildRight
    }

根据阈值和根据hint广播表的方法如下,逻辑简单,不再废话了。

    private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : BuildSide = {
      val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
      val buildRight = canBuildRight(joinType) && canBroadcast(right)
      broadcastSide(buildLeft, buildRight, left, right)
    }

    private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : BuildSide = {
      val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
      val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
      broadcastSide(buildLeft, buildRight, left, right)
    }

这两个方法最终都调用了broadcastSide()方法确定应该广播哪个表。

    private def broadcastSide(
        canBuildLeft: Boolean,
        canBuildRight: Boolean,
        left: LogicalPlan,
        right: LogicalPlan): BuildSide = {
      def smallerSide =
        if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft

      if (canBuildRight && canBuildLeft) {
        // Broadcast smaller side base on its estimated physical size
        // if both sides have broadcast hint
        smallerSide
      } else if (canBuildRight) {
        BuildRight
      } else if (canBuildLeft) {
        BuildLeft
      } else {
        // for the last default broadcast nested loop join
        smallerSide
      }
    }

该方法先根据表的统计信息找出左表和右表中size较小的那个,如果左表和右表都能或者都不能作为build table,就将较小的表广播。否则,先判断右表是否可作为build table,可行的话优先广播右表,再判断左表。可见,broadcast hint只能表示用户广播表的偏好,实际执行时未必会按照broadcast hint指定的表来

是否可构造本地HashMap

Shuffle hash join过程中,如果数据量不大,就可以用本地哈希表保存Shuffle中间结果,提高效率。当逻辑计划的数据量小于广播阈值与Shuffle分区数的乘积,即小于spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions时,说明单个分区的数据量足够小,可以安全地构造本地HashMap。

    private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
      plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
    }

join策略选择

这部分源码都位于JoinSelection对象的apply()方法中。重要的话再说一次,策略的选择会按照效率从高到低的优先级来排。

Broadcast hash join

      // broadcast hints were specified
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
        Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

      // broadcast hints were not specified, so need to infer it from size and configuration.
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
        Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

可见是先根据broadcast hint来判断,其次是广播阈值。判断的条件在上一节已经说完了哈。

Shuffle hash join

      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
         if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
           && muchSmaller(right, left) ||
           !RowOrdering.isOrderable(leftKeys) =>
        Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
         if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
           && muchSmaller(left, right) ||
           !RowOrdering.isOrderable(leftKeys) =>
        Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

选择Shuffle hash join策略的条件比较严苛,大前提是不优先采用Sort merge join,即spark.sql.join.preferSortMergeJoin配置项为false。与Broadcast hash join相同地,Shuffle hash join也是先检查右表,后检查左表。以右表为例,还需要满足以下3个条件:

  • 右表能够作为build table;
  • 能够从右表构建本地HashMap;
  • 右表的数据量比左表小很多(即muchSmaller()方法),“很多”在代码中规定为3倍。

除去上述情况外,如果参与join的表的key无法被排序(即根本无法使用Sort merge join),那么也会fallback到Shuffle hash join策略。

Sort merge join

      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if RowOrdering.isOrderable(leftKeys) =>
        joins.SortMergeJoinExec(
          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

如果上面两种策略都不符合,并且参与join的key是可以排序的话,就会采取Sort merge join。这个要求不高,所以Spark SQL中非小表的join都会采用此策略。

Non equi-join

      // Pick BroadcastNestedLoopJoin if one side could be broadcast
      case j @ logical.Join(left, right, joinType, condition)
          if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

      case j @ logical.Join(left, right, joinType, condition)
          if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

      // Pick CartesianProduct for InnerJoin
      case logical.Join(left, right, _: InnerLike, condition) =>
        joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

      case logical.Join(left, right, joinType, condition) =>
        val buildSide = broadcastSide(
          left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
        // This join could be very slow or OOM
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

上面的三种经典情况都是equi-join,即等值连接。如果是非等值连接,它们就无能为力了。这时会先检查表是否可以被广播,如果可以,会使用Broadcast nested loop join策略。顾名思义,它的本质是Nested loop join(即之前讲过的二重循环扫描+比对),不过加上了广播build table而已。

如果表不能被广播,又分为两种情况:若join类型是inner join或者cross join的话,会直接将两表做笛卡尔积。若上述情况全部不满足,最后的方案是选择两个表中数据量较小的那个广播,即回到Broadcast nested loop join策略。可以预见,这两种情况的效率都是非常低的,要尽量避免。

The End

民那晚安晚安。

最近新冠疫情的输入病例有反弹,还是做好防护吧。

祝身体健康,百毒不侵。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容