Spark Core源码精读计划#25:UnifiedMemoryManager——统一内存管理机制

目录

前言

在前文的末尾,我们分析了静态内存管理器StaticMemoryManager的优缺点,并指出统一内存管理器UnifiedMemoryManager能够弥补它的缺点,同时也是目前Spark内存管理的事实标准。本文尽可能深入地剖析UnifiedMemoryManager的具体实现。

统一内存管理器UnifiedMemoryManager

UnifiedMemoryManager与StaticMemoryManager相比,主要有两点改进:

  • 存储内存和执行内存不再是靠比例定死的,而是在一定条件下可以相互借用,更加灵活;
  • 存储内存和执行内存都可以在堆外分配了。

按照惯例,我们仍然从其构造开始看起。

构造方法

代码#25.1 - o.a.s.memory.UnifiedMemoryManager类的构造

private[spark] class UnifiedMemoryManager private[memory] (
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    onHeapStorageRegionSize,
    maxHeapMemory - onHeapStorageRegionSize) {

  private def assertInvariants(): Unit = {
    assert(onHeapExecutionMemoryPool.poolSize + onHeapStorageMemoryPool.poolSize == maxHeapMemory)
    assert(
      offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize == maxOffHeapMemory)
  }

  assertInvariants()

  // ......
}

其构造方法参数与StaticMemoryManager相比有微小的变化,需要传入堆内内存总量maxHeapMemory,以及堆内存储内存空间的量onHeapStorageRegionSize,堆内执行内存空间的量就是两者之差。另外,还会校验堆内、堆外内存池的大小,保证它们与规定的内存总量对的上。

但是,在代码#24.1初始化MemoryManager实现时,调用的UnifiedMemoryManager构造方法只有两个参数,这是因为其伴生对象实现了apply()方法。如果看官对Scala不太熟的话,可以去翻翻Scala官方文档,其中讲述了apply()方法的具体作用。下面还是来看代码。

计算内存量

代码#25.2 - o.a.s.memory.UnifiedMemoryManager.apply()方法

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
      numCores = numCores)
  }

可见,Spark可利用的内存总量(为避免混淆,下面叫“统一内存”)是调用getMaxMemory()方法计算出来的,存储内存占统一内存的初始比例(因为可以借用,所以是初始比例)由配置项spark.memory.storageFraction决定,默认值0.5。剩下的(1 - spark.memory.storageFraction)比例的内存就是执行内存了。

getMaxMemory()方法的代码如下。

代码#25.3 - o.a.s.memory.UnifiedMemoryManager.getMaxMemory()方法

  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }

    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong
  }

该方法的执行流程是:

  1. 通过Runtime.maxMemory()这个native方法取得当前JVM可用的最大内存(堆内存)。spark.testing.memory参数是测试参数,几乎不用。
  2. 取得保留内存的大小,由常量RESERVED_SYSTEM_MEMORY_BYTES定义,大小是300MB。这个量可以通过spark.testing.reservedMemory参数来改,但同样几乎不用。
  3. 以保留内存的1.5倍(也就是450MB)作为Driver和Executor的最小内存并校验之。
  4. 用堆内存量减去保留内存量,得到可用内存。spark.memory.fraction配置项指定了统一内存能实际利用的可用内存比例,默认值为0.6(60%),最终返回可用内存与spark.memory.fraction的乘积。

可见,UnifiedMemoryManager的堆内内存布局其实比StaticMemoryManager要简单。因为存储内存和执行内存之间的边界是浮动的,所以展开内存的比例以及安全比例都不再需要了。下面仍然先用一幅框图来形象地表示出来。

统一内存管理布局图示

图中Storage和Execution区域的界限是虚线,并且有上下箭头,表示它们之间的边界是浮动的。

#25.1 - Spark的统一内存管理布局

然后,我们来看看申请内存的方法的实现,其中也包含了借用内存的逻辑。

申请/借用存储内存

以下是重写的acquireStorageMemory()方法代码。

代码#25.4 - o.a.s.memory.UnifiedMemoryManager.acquireStorageMemory()方法

  override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        maxOnHeapStorageMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        maxOffHeapStorageMemory)
    }
    if (numBytes > maxMemory) {
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
        s"memory limit ($maxMemory bytes)")
      return false
    }
    if (numBytes > storagePool.memoryFree) {
      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
        numBytes - storagePool.memoryFree)
      executionPool.decrementPoolSize(memoryBorrowedFromExecution)
      storagePool.incrementPoolSize(memoryBorrowedFromExecution)
    }
    storagePool.acquireMemory(blockId, numBytes)
  }

该方法首先根据MemoryMode决定是在堆内还是在堆外申请存储内存。如果申请量没有超过存储内存池的空闲量,就可以直接调用StorageMemoryPool.acquireMemory()方法申请内存。但若存储池中剩余的内存不够分配,就会试图向执行池借用内存,借用的量为当前执行池空闲量与(块大小 - 当前存储池空闲量)两个量之间的较小者。然后会调用decrementPoolSize()方法缩小执行池,调用incrementPoolSize()方法扩大存储池。

由上面的描述可以看出,借用内存的过程是比较保守的,也就是一次只会借用当时不足的内存量,不会多借。并且借到的内存有可能仍然无法满足需求,这时就只能把原先存储的一部分块淘汰掉了,这部分逻辑之前提到过,参见代码#23.4。

在统一内存管理机制下,展开内存虽然仍属于存储内存的一部分,但不再有边界,所以申请展开内存的方法与申请存储内存完全相同。

代码#25.5 - o.a.s.memory.UnifiedMemoryManager.acquireUnrollMemory()方法

  override def acquireUnrollMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    acquireStorageMemory(blockId, numBytes, memoryMode)
  }

申请/借用执行内存

这个流程就比较复杂一些了。

代码#25.6 - o.a.s.memory.UnifiedMemoryManager.acquireExecutionMemory()方法

  override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        onHeapStorageRegionSize,
        maxHeapMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        offHeapStorageMemory,
        maxOffHeapMemory)
    }

    def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
      if (extraMemoryNeeded > 0) {
        val memoryReclaimableFromStorage = math.max(
          storagePool.memoryFree,
          storagePool.poolSize - storageRegionSize)
        if (memoryReclaimableFromStorage > 0) {
          val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
            math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
          storagePool.decrementPoolSize(spaceToReclaim)
          executionPool.incrementPoolSize(spaceToReclaim)
        }
      }
    }

    def computeMaxExecutionPoolSize(): Long = {
      maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
    }

    executionPool.acquireMemory(
      numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
  }

来看其内部嵌套定义的方法maybeGrowExecutionPool(),它负责在执行内存不够用时,向存储内存池借用内存,逻辑是:

  1. 计算有多少内存能够从存储内存池回收回来。该大小为存储池的空闲空间与之前存储池向执行池借用过的内存量(注意这个描述)的较大值。
  2. 如果有内存可以回收,就调用存储池的freeSpaceToShrinkPool()方法,淘汰掉一部分存储块。淘汰掉的块所占内存是理论可回收量与实际需要的执行内存之间的较小值。
  3. 调用decrementPoolSize()方法缩小存储池,调用incrementPoolSize()方法扩大执行池。

从这个逻辑可以看出,执行内存不够用时,并不太像是“借用”(borrow)存储内存,而是“占用”或者“回收”(reclaim)。也就是说,执行池可以“要求”存储池淘汰掉自身持有的块来归还曾经借用的空间,而存储池并不会反过来要求执行池也同样归还。其原因在于,存储内存中的块可以方便地持久化到磁盘,而执行内存中的块大多为中间数据(比如Shuffle数据),比较难持久化,并且一旦淘汰掉这些中间数据,整个Task很可能就会直接失败,重算成本太高。

下图示出默认情况下(spark.memory.storageFraction=0.5),内存借用的流程。

图#25.2 - 内存借用示意

在代码#25.6中还有一个嵌套定义的方法computeMaxExecutionPoolSize(),它用于获得执行内存池的最大可能大小,比较简单,不再多说。

这两个嵌套方法都会当做执行内存池的acquireMemory()方法的参数,作为函数传进去。我们在前面略去了ExecutionMemoryPool类的解释过程,所以现在只是大致瞅一眼与申请内存有关的代码。

代码#25.7 - o.a.s.memory.ExecutionMemoryPool.acquireMemory()方法

  private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    if (!memoryForTask.contains(taskAttemptId)) {
      memoryForTask(taskAttemptId) = 0L
      lock.notifyAll()
    }

    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      maybeGrowPool(numBytes - memoryFree)

      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      val toGrant = math.min(maxToGrant, memoryFree)

      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L
  }

可见,ExecutionMemoryPool申请内存时是循环申请的,每次都调用参数中的maybeGrowPool()函数(实际上就是上面讲的maybeGrowExecutionPool()方法)来检查是否需要从StorageMemoryPool回收空间。如果分配到的内存比实际申请的少,或者该Task分配完毕之后的内存仍然小于每个Task应获得内存的最小值(即池子的大小除以当前活动Task数的两倍),就调用MemoryManager对象的wait()方法阻塞,直到有其他Task释放内存为止,再进入下一波循环,直到申请到足够的内存。

关于ExecutionMemoryPool,之后还会详细地解释,这里只要有个印象就行。

总结

本文详细阅读了UnifiedMemoryManager的相关源码,对Spark的统一内存管理机制有了深入的了解。当然,统一内存管理虽然先进,但也不代表万事无忧。比如当程序中cache了大量RDD并且不及时释放时,很多存储内存中的块都无法被淘汰,会造成Shuffle阶段频繁Full GC,作业执行变慢。关于Spark作业故障定位和内存调优的事情,不属于这个系列的范畴,笔者会专门挑个时间写一篇全面的总结(就像之前写的两篇Hive调优总结一样),最近实在是太忙了。

看官也可能会问,讲内存管理讲了这么长时间,结果都是在做bookkeeping的工作,内存的实际分配和释放逻辑到底在哪里呢?这个由内存存储类MemoryStore及其附属类来实现,下一篇文章就会讲到了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容