目录
前言
我们前面用4篇文章的时间讲解了Spark存储子系统中的内存部分,其内容相当多,包括内存池MemoryPool、内存管理器MemoryManager(包含两种实现:静态内存管理器StaticMemoryManager和统一内存管理器UnifiedMemoryManager)、内存项MemoryEntry、内存存储MemoryStore。相对而言,磁盘部分的实现就比较直接而简单一些,主要包含两个组件:磁盘块管理器DiskBlockManager、磁盘存储DiskStore。它们的内容都不是特别复杂,本文就研究一下DiskBlockManager。
磁盘块管理器DiskBlockManager
DiskBlockManager负责维护块数据与其在磁盘上存储位置的关系。先来看看它的构造方法与属性成员。
构造方法与属性成员
代码#27.1 - o.a.s.storage.DiskBlockManager类的构造与属性
private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)
private[spark] val localDirs: Array[File] = createLocalDirs(conf)
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private val shutdownHook = addShutdownHook()
// ......
}
DiskBlockManager接受两个参数:SparkConf实例与一个叫deleteFilesOnStop的布尔值。该值表示DiskBlockManager停止时是否要删除本地的存储目录,由BlockManager初始化它时指定。各个属性成员的含义解释如下:
- subDirsPerLocalDir:每个存储目录下子目录的最大数量,由spark.diskStore.subDirectories配置项指定,默认值64。
- localDirs:本地存储目录的数组,通过调用createLocalDirs()方法创建。
- subDirs:包含子目录的本地存储目录的二维数组,其中一维的大小是localDirs.length,另一维的大小是subDirsPerLocalDir。
- shutdownHook:DiskBlockManager的关闭钩子,通过调用addShutdownHook()方法来绑定。
下面我们就来看看createLocalDirs()方法。
创建本地存储目录
代码#27.2 - o.a.s.storage.DiskBlockManager.createLocalDirs()方法
private def createLocalDirs(conf: SparkConf): Array[File] = {
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
try {
val localDir = Utils.createDirectory(rootDir, "blockmgr")
logInfo(s"Created local directory at $localDir")
Some(localDir)
} catch {
case e: IOException =>
logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
None
}
}
}
该方法先调用通用工具类Utils中的getConfiguredLocalDirs()方法获取根目录,然后对每个根目录,调用Utils.createDirectory()方法创建存储目录。也就是说,所有磁盘存储的目录都是组织在一起的。Utils类的代码暂时就不细看了,看官只需知道getConfiguredLocalDirs()会依次检查如下几个环境变量或配置项中的路径即可:
- LOCAL_DIRS(仅限Spark on YARN部署);
- SPARK_EXECUTOR_DIRS;
- SPARK_LOCAL_DIRS;
- MESOS_DIRECTORY;
- spark.local.dir(默认值为java.io.tmpdir)。
然后,Utils.createDirectory()方法就会创建名称形如blockmgr-[UUID.randomUUID]
的一级存储目录,但不会创建子目录。那么哪里会创建子目录呢?答案在getFile()方法中,它除了名称所述的获取文件的功能外,也兼职创建子目录。
获取存储文件及创建子目录
代码#27.3 - o.a.s.storage.DiskBlockManager.getFile()方法
def getFile(filename: String): File = {
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
}
subDirs(dirId)(subDirId) = newDir
newDir
}
}
new File(subDir, filename)
}
def getFile(blockId: BlockId): File = getFile(blockId.name)
该方法的执行流程如下:
- 调用Utils.nonNegativeHash()方法,计算出文件名的哈希码的绝对值。
- 将哈希码与localDirs数组长度取余,作为目录的下标。再将哈希码与localDirs数组长度的商与subDirsPerLocalDir取余,作为子目录的下标。
- 检查文件对应的子目录是否存在。如果不存在的话,就根据子目录的下标来创建,并格式化为两位十六进制表示。
- 返回File对象。
另外,getFile()方法还有将BlockId作为输入的重载,由它可见,块对应的文件名与它本身的name字段有关。
通过上面的了解,DiskBlockManager磁盘存储的目录结构可以概括成下图。
除了获取单个文件之外,还有获取所有文件及所有块ID的getAllFiles()与getAllBlocks()方法,它们的实现都很简单,代码如下。
代码#27.4 - o.a.s.storage.DiskBlockManager.getAllFiles()/getAllBlocks()方法
def getAllFiles(): Seq[File] = {
subDirs.flatMap { dir =>
dir.synchronized {
dir.clone()
}
}.filter(_ != null).flatMap { dir =>
val files = dir.listFiles()
if (files != null) files else Seq.empty
}
}
def getAllBlocks(): Seq[BlockId] = {
getAllFiles().flatMap { f =>
try {
Some(BlockId(f.getName))
} catch {
case _: UnrecognizedBlockId =>
None
}
}
}
创建临时块文件
代码#27.5 - o.a.s.storage.DiskBlockManager.createTempLocalBlock()/createTempShuffleBlock()方法
def createTempLocalBlock(): (TempLocalBlockId, File) = {
var blockId = new TempLocalBlockId(UUID.randomUUID())
while (getFile(blockId).exists()) {
blockId = new TempLocalBlockId(UUID.randomUUID())
}
(blockId, getFile(blockId))
}
def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
var blockId = new TempShuffleBlockId(UUID.randomUUID())
while (getFile(blockId).exists()) {
blockId = new TempShuffleBlockId(UUID.randomUUID())
}
(blockId, getFile(blockId))
}
这两个方法比较简单,就是用来创建Spark计算过程中的中间结果以及Shuffle Write阶段输出的存储文件。它们的块ID分别用TempLocalBlockId和TempShuffleBlockId来表示。
绑定关闭钩子与关闭
代码#27.6 - o.a.s.storage.DiskBlockManager.addShutdownHook()/doStop()方法
private def addShutdownHook(): AnyRef = {
logDebug("Adding shutdown hook") // force eager creation of logger
ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
logInfo("Shutdown hook called")
DiskBlockManager.this.doStop()
}
}
private def doStop(): Unit = {
if (deleteFilesOnStop) {
localDirs.foreach { localDir =>
if (localDir.isDirectory() && localDir.exists()) {
try {
if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
Utils.deleteRecursively(localDir)
}
} catch {
case e: Exception =>
logError(s"Exception while deleting local spark dir: $localDir", e)
}
}
}
}
}
由代码可见,如果deleteFilesOnStop标记为真,则在DiskBlockManager关闭之前,会调用Utils.deleteRecursively()方法递归地删掉本地存储目录。
总结
本文介绍了DiskBlockManager的相关设计细节,主要包含其对Spark磁盘存储目录、子目录及文件的创建和管理。至于实际的文件读写,则由磁盘存储DiskStore来负责。DiskStore的实现也比MemoryStore要来得简单,下一篇文章会来探讨它。