Spark SortShuffleWriter写流程分析

Spark的shuffleWriter一共有三种,本文分析 SortShuffleWriter的shuffle写数据过程. SortShuffleWriter是最为复杂的shuffle writer。 在ShuffleMapTask中需要对数据分区内进行排序或者预聚合的场景下,都是使用该writer完成shuffle数据的写盘。

其核心流程分为如下几步:

  1. 在ExternalSorter中插入数据
  2. 将每个spill文件读取合并重新生成新的数据文件,在合并的过程中,如果有预聚合或者排序的操作,也会进行相关操作
  3. 生成数据文件对应的index文件。

以上步骤中最为复杂的即是将数据插入externalSorter中,本文重点分析此处逻辑。

  1. 首先将数据插入array中
  def insertAll(records: Iterator[Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined
    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        // 针对需要预聚合的场景,通过一个PartitionedAppendOnlyMap完成数据插入及聚合,其本质也是将数据存入array中
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        // 数据插入PartitionedPairBuffer中,本质是将数据存入Array中,实现较为简单
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }

本质上来讲,是将数据放入一个array中,对于输入的第n条记录,:

包含map端预聚合的数据写入

在有预聚合的场景下,核心逻辑是: 数据写入时,需要判断是否之前已经存在该key的记录,如果存在,则找出并进行聚合,如果不存在,则直接写入该记录。
由于涉及到查询之前是否已经存在该key,因此使用了hash值,这也是针对该场景使用PartitionedAppendOnlyMap的原因

数据插入的核心流程如下图所示:


数据插入

数据插入的代码逻辑如下:

 def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    ... 空值处理
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      
      if (curKey.eq(null)) {
      // 当前位置存储数据为空,即未与数据存储
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = k
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        incrementSize()
        return newValue
      } else if (k.eq(curKey) || k.equals(curKey)) {
         // 当前位置有值,且已存储值与当前值相等
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else {
       // 当前位置有值,且已存储值与当前值不等,则继续查找下一个可用存储位置
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }

不需要map端预聚合的数据写入场景

该实现简单

def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) {
    //如果array空间不足,则直接扩容,并将原有数组中数据copy至新数据即可
      growArray()
    }
     //  在位置array[2n]存入`(partition, key.asInstanceOf[AnyRef])`
    data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
    //  在位置array[2n+1]的位置存入`value.asInstanceOf[AnyRef]`,存入
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1
    afterUpdate()
  }
  1. 将当前内存中数据spill至磁盘

每插入32条记录,则查看当前的内存使用量是否已经超过申请到的可用内存大小,如果超过则再次进行内存申请,如果此时再次申请到的内存依然无法满足使用,则触发spill落盘操作。

在每次触发spill落盘时,会对array中的数据进行排序落盘(在真正落盘时,只会将key和value写入磁盘,partitionId不会落盘),排序的规则如下:

  1. 首先根据分区对记录进行排序
  /**
   * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
   */
  def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val partitionDiff = a._1 - b._1
        if (partitionDiff != 0) {
          partitionDiff
        } else {
          keyComparator.compare(a._2, b._2)
        }
      }
    }
  }
}
  1. 再在同一分区内,如果有定义ordering或者aggregator则会根据如下keyComparator对同一分区的记录进行排序
private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {
    override def compare(a: K, b: K): Int = {
      val h1 = if (a == null) 0 else a.hashCode()
      val h2 = if (b == null) 0 else b.hashCode()
      if (h1 < h2) -1 else if (h1 == h2) 0 else 1
    }
  })

  private def comparator: Option[Comparator[K]] = {
    if (ordering.isDefined || aggregator.isDefined) {
      Some(keyComparator)
    } else {
      None
    }
  }
  1. 在所有的数据都处理完之后,会将所有spill至磁盘及内存中array中的数据merge到同一个文件中,并生产index文件

和以上的数据spill值磁盘一样,在归并时,也会根据是否有ordering,aggregator等场景,确认各个spill文件归并时是否需要分区内有序以及书否需要merge。 经过归并可以得到一个保存所有shuffle数据有序文件。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342