Spark2.1和2.2 SQL物理执行策略关键源码分析

1. 文章开始之前

先附上一句SQL,使用tpc-ds的表结构,我们围绕这句SQL讲。

  • SQL:

SQL> select
avg(cs_ext_discount_amt)
from
catalog_sales, date_dim
where
d_date between '1999-02-22'
and
cast('1999-05-22' as date)
and
d_date_sk = cs_sold_date_sk
group by cs_sold_date_sk;

  • 逻辑计划:
Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
+- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
   +- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
      :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
      :  +- Filter isnotnull(cs_sold_date_sk#24)
      :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
      +- Project [d_date_sk#58]
         +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
            +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]

2. 物理计划源码分析

2.1 物理策略

def strategies: Seq[Strategy] =
      extraStrategies ++ (
      FileSourceStrategy ::
      DataSourceStrategy ::
      DDLStrategy ::
      SpecialLimits ::
      Aggregation ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)

其中,extraStrategies是提供给外部人员可以自己添加的策略。调用这些strategies的代码如下:

// Collect physical plan candidates.
val candidates = strategies.iterator.flatMap(_(plan))

将strategies逐个去应用在逻辑计划上,然后做flat操作,返回一个PhysicalPlan的iterator。那么每个策略什么作用?

2.1.1 FileSourceStrategy

一个针对Hadoop文件系统做的策略,当执行计划的底层Relation是HadoopFsRelation时会调用到,用来扫描文件。

2.1.2 DataSourceStrategy

Spark针对DataSource预定义了四种scan接口,TableScanPrunedScanPrunedFilteredScanCatalystScan(其中CatalystScan是unstable的,也是不常用的),如果开发者(用户)自己实现的DataSource是实现了这四种接口之一的,在scan到执行计划的底层Relation时,就会调用来扫描文件。

2.1.3 DDLStrategy(2.2中已经消失了,2.1中有)

会在create table的时候调用,因为后续版本不会存在,所以不做解释。

2.1.4 SpecialLimits

在Spark SQL中加limit n时候回调用到(如果不指定,Spark 默认也会limit 20),在源码中,会给每种case的limit节点的子节点使用PlanLater,这是个很神奇的东西下文会讲到。

2.1.5 Aggregation

顾名思义,执行聚合函数的策略。

2.1.6 JoinSelection

执行join的策略。Join的执行策略也同样分BroadcastJoin(也就是MapSideJoin),和ShuffledJoin,这个之后的文章会展开讲。

2.1.7 InMemoryScans

当数据在内存中被缓存过,就会用到该策略。

2.1.8 BasicOperators

一些基本操作的执行策略,如flatMap,sort,project等,但是实际上大都是给这些节点的子节点套上一个PlanLater

2.2 PlanLater

Spark SQL物理计划里一个非常重要的概念。字面意思很好理解,就是之后再计划。那么经过以上策略逐个去执行以后,原来的逻辑计划会变成什么样呢?

ReturnAnswer
+- GlobalLimit 21
   +- LocalLimit 21
      +- PlanLater Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
         , Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
         +- PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
            , Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
            +- PlanLater Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
               :- PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
                  , Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
               :  +- Filter isnotnull(cs_sold_date_sk#24)
               :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
               +- PlanLater Project [d_date_sk#58]
                  , Project [d_date_sk#58]
                  +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
                     +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]

有什么差别呢?主要有二:

    1. 顶层多了个ReturnAnswerLimit节点
    1. AggregateProjectJoin节点都用了PlanLater

(其实Filter节点也是可以用PlanLater的,但是由于逻辑计划已经将Filter下推至底部,所以最底部的Project->Filter->Relation的三层节点是可以直接调用一个策略去执行的,因此只需要三层节点的最上层也就是Project节点使用PlanLater即可。)

言归正传,语法树顶部多了ReturnAnswerLimit节点,很容易理解,Limit是Spark SQL默认限制行数,ReturnAnswer是将结果返回。那么加的PlanLater有什么作用?我的理解是,将物理计划分割成一段段,每一段物理计划会有其对应策略来执行。具体源码如下:

  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...

    // Collect physical plan candidates.
    val candidates = strategies.iterator.flatMap(_(plan))

    // The candidates may contain placeholders marked as [[planLater]],
    // so try to replace them by their child plans.
    val plans = candidates.flatMap { candidate =>
      val placeholders = collectPlaceholders(candidate)

      if (placeholders.isEmpty) {
        // Take the candidate as is because it does not contain placeholders.
        Iterator(candidate)
      } else {
        // Plan the logical plan marked as [[planLater]] and replace the placeholders.
        placeholders.iterator.foldLeft(Iterator(candidate)) {
          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
            // Plan the logical plan for the placeholder.
            val childPlans = this.plan(logicalPlan)

            candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
              childPlans.map { childPlan =>
                // Replace the placeholder by the child plan
                candidateWithPlaceholders.transformUp {
                  case p if p == placeholder => childPlan
                }
              }
            }
        }
      }
    }

    val pruned = prunePlans(plans)
    assert(pruned.hasNext, s"No plan for $plan")
    pruned
  }

可以看到,经过策略迭代器和flat过后的candidates候选计划们(一般来说只有一个,是最顶层的planLater),然后收集placeholder(其实就是planlater),这个时候对placeholders进行迭代,并对每个placeholder的child plan递归调用plan方法。举例文章这句SQL,递归调用plan方法,得到每个placeholder及其child plan节点(也就是 case (candidatesWithPlaceholders, (placeholder, logicalPlan))这句话的placeholder和logicalPlan两个变量)如下:

placeholder:
PlanLater Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]

logicalPlan:
Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
+- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
   +- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
      :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
      :  +- Filter isnotnull(cs_sold_date_sk#24)
      :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
      +- Project [d_date_sk#58]
         +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
            +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
placeholder:
PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]

logicalPlan:
Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
+- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
   :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
   :  +- Filter isnotnull(cs_sold_date_sk#24)
   :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
   +- Project [d_date_sk#58]
      +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
         +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
placeholder:
PlanLater Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)

logicalPlan:
Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
:- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
:  +- Filter isnotnull(cs_sold_date_sk#24)
:     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
+- Project [d_date_sk#58]
   +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
      +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
placeholder:
PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]

logicalPlan:
Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
+- Filter isnotnull(cs_sold_date_sk#24)
   +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
placeholder:
PlanLater Project [d_date_sk#58]

logicalPlan:
Project [d_date_sk#58]
+- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
   +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]

那么可以看到,递归到最底处,就是project->filter->relation的三层节点组合,由于我实际是重写过了DataSource,这个时候会调用DataSourceStrategy,去读取获取数据,然后递归逐个返回根据每个planLater分割点会有对应的策略去对数据进行相应的操作。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容