Spark Core源码精读计划#27:磁盘块管理器DiskBlockManager

目录

前言

我们前面用4篇文章的时间讲解了Spark存储子系统中的内存部分,其内容相当多,包括内存池MemoryPool、内存管理器MemoryManager(包含两种实现:静态内存管理器StaticMemoryManager和统一内存管理器UnifiedMemoryManager)、内存项MemoryEntry、内存存储MemoryStore。相对而言,磁盘部分的实现就比较直接而简单一些,主要包含两个组件:磁盘块管理器DiskBlockManager、磁盘存储DiskStore。它们的内容都不是特别复杂,本文就研究一下DiskBlockManager。

磁盘块管理器DiskBlockManager

DiskBlockManager负责维护块数据与其在磁盘上存储位置的关系。先来看看它的构造方法与属性成员。

构造方法与属性成员

代码#27.1 - o.a.s.storage.DiskBlockManager类的构造与属性

private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
  private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)

  private[spark] val localDirs: Array[File] = createLocalDirs(conf)
  if (localDirs.isEmpty) {
    logError("Failed to create any local dir.")
    System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
  }

  private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

  private val shutdownHook = addShutdownHook()
  
  // ......
}

DiskBlockManager接受两个参数:SparkConf实例与一个叫deleteFilesOnStop的布尔值。该值表示DiskBlockManager停止时是否要删除本地的存储目录,由BlockManager初始化它时指定。各个属性成员的含义解释如下:

  • subDirsPerLocalDir:每个存储目录下子目录的最大数量,由spark.diskStore.subDirectories配置项指定,默认值64。
  • localDirs:本地存储目录的数组,通过调用createLocalDirs()方法创建。
  • subDirs:包含子目录的本地存储目录的二维数组,其中一维的大小是localDirs.length,另一维的大小是subDirsPerLocalDir。
  • shutdownHook:DiskBlockManager的关闭钩子,通过调用addShutdownHook()方法来绑定。

下面我们就来看看createLocalDirs()方法。

创建本地存储目录

代码#27.2 - o.a.s.storage.DiskBlockManager.createLocalDirs()方法

  private def createLocalDirs(conf: SparkConf): Array[File] = {
    Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
      try {
        val localDir = Utils.createDirectory(rootDir, "blockmgr")
        logInfo(s"Created local directory at $localDir")
        Some(localDir)
      } catch {
        case e: IOException =>
          logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
          None
      }
    }
  }

该方法先调用通用工具类Utils中的getConfiguredLocalDirs()方法获取根目录,然后对每个根目录,调用Utils.createDirectory()方法创建存储目录。也就是说,所有磁盘存储的目录都是组织在一起的。Utils类的代码暂时就不细看了,看官只需知道getConfiguredLocalDirs()会依次检查如下几个环境变量或配置项中的路径即可:

  • LOCAL_DIRS(仅限Spark on YARN部署);
  • SPARK_EXECUTOR_DIRS;
  • SPARK_LOCAL_DIRS;
  • MESOS_DIRECTORY;
  • spark.local.dir(默认值为java.io.tmpdir)。

然后,Utils.createDirectory()方法就会创建名称形如blockmgr-[UUID.randomUUID]的一级存储目录,但不会创建子目录。那么哪里会创建子目录呢?答案在getFile()方法中,它除了名称所述的获取文件的功能外,也兼职创建子目录。

获取存储文件及创建子目录

代码#27.3 - o.a.s.storage.DiskBlockManager.getFile()方法

  def getFile(filename: String): File = {
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    val subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        if (!newDir.exists() && !newDir.mkdir()) {
          throw new IOException(s"Failed to create local dir in $newDir.")
        }
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }

    new File(subDir, filename)
  }

  def getFile(blockId: BlockId): File = getFile(blockId.name)

该方法的执行流程如下:

  1. 调用Utils.nonNegativeHash()方法,计算出文件名的哈希码的绝对值。
  2. 将哈希码与localDirs数组长度取余,作为目录的下标。再将哈希码与localDirs数组长度的商与subDirsPerLocalDir取余,作为子目录的下标。
  3. 检查文件对应的子目录是否存在。如果不存在的话,就根据子目录的下标来创建,并格式化为两位十六进制表示。
  4. 返回File对象。

另外,getFile()方法还有将BlockId作为输入的重载,由它可见,块对应的文件名与它本身的name字段有关。

通过上面的了解,DiskBlockManager磁盘存储的目录结构可以概括成下图。

图#27.1 - DiskBlockManager的目录结构

除了获取单个文件之外,还有获取所有文件及所有块ID的getAllFiles()与getAllBlocks()方法,它们的实现都很简单,代码如下。
代码#27.4 - o.a.s.storage.DiskBlockManager.getAllFiles()/getAllBlocks()方法

  def getAllFiles(): Seq[File] = {
    subDirs.flatMap { dir =>
      dir.synchronized {
        dir.clone()
      }
    }.filter(_ != null).flatMap { dir =>
      val files = dir.listFiles()
      if (files != null) files else Seq.empty
    }
  }

  def getAllBlocks(): Seq[BlockId] = {
    getAllFiles().flatMap { f =>
      try {
        Some(BlockId(f.getName))
      } catch {
        case _: UnrecognizedBlockId =>
          None
      }
    }
  }

创建临时块文件

代码#27.5 - o.a.s.storage.DiskBlockManager.createTempLocalBlock()/createTempShuffleBlock()方法

  def createTempLocalBlock(): (TempLocalBlockId, File) = {
    var blockId = new TempLocalBlockId(UUID.randomUUID())
    while (getFile(blockId).exists()) {
      blockId = new TempLocalBlockId(UUID.randomUUID())
    }
    (blockId, getFile(blockId))
  }

  def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
    var blockId = new TempShuffleBlockId(UUID.randomUUID())
    while (getFile(blockId).exists()) {
      blockId = new TempShuffleBlockId(UUID.randomUUID())
    }
    (blockId, getFile(blockId))
  }

这两个方法比较简单,就是用来创建Spark计算过程中的中间结果以及Shuffle Write阶段输出的存储文件。它们的块ID分别用TempLocalBlockId和TempShuffleBlockId来表示。

绑定关闭钩子与关闭

代码#27.6 - o.a.s.storage.DiskBlockManager.addShutdownHook()/doStop()方法

  private def addShutdownHook(): AnyRef = {
    logDebug("Adding shutdown hook") // force eager creation of logger
    ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
      logInfo("Shutdown hook called")
      DiskBlockManager.this.doStop()
    }
  }

  private def doStop(): Unit = {
    if (deleteFilesOnStop) {
      localDirs.foreach { localDir =>
        if (localDir.isDirectory() && localDir.exists()) {
          try {
            if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
              Utils.deleteRecursively(localDir)
            }
          } catch {
            case e: Exception =>
              logError(s"Exception while deleting local spark dir: $localDir", e)
          }
        }
      }
    }
  }

由代码可见,如果deleteFilesOnStop标记为真,则在DiskBlockManager关闭之前,会调用Utils.deleteRecursively()方法递归地删掉本地存储目录。

总结

本文介绍了DiskBlockManager的相关设计细节,主要包含其对Spark磁盘存储目录、子目录及文件的创建和管理。至于实际的文件读写,则由磁盘存储DiskStore来负责。DiskStore的实现也比MemoryStore要来得简单,下一篇文章会来探讨它。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容