目录
前言
一个多月没顾上这个专题,惭愧惭愧。十一期间尽量赶上进度吧。
上一篇文章讲完了BlockManager管理下的数据读取流程,今天就来看写入的流程,顺便将读写放在一起总结一下。
块写入流程
块写入的入口
由于距离上一篇已经过去很久了,这里再贴一次BlockManager提供的getOrElseUpdate()方法的源码。该方法提供了块读写的统一入口。
代码#31.1 - o.a.s.storage.BlockManager.getOrElseUpdate()方法
def getOrElseUpdate[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[T],
makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
get[T](blockId)(classTag) match {
case Some(block) =>
return Left(block)
case _ =>
}
doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
case None =>
val blockResult = getLocalValues(blockId).getOrElse {
releaseLock(blockId)
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
}
releaseLock(blockId)
Left(blockResult)
case Some(iter) =>
Right(iter)
}
}
该方法会首先根据块ID尝试读取数据(先从本地,后从远端)。如果获取不到,就调用传入的makeIterator函数将数据转化为迭代器并写入之。最终将读取或写入的数据封装在BlockResult结构中返回。写入数据的方法是doPutIterator(),下面来看它的代码。
doPutIterator()方法
代码#31.2 - o.a.s.storage.BlockManager.doPutIterator()方法
private def doPutIterator[T](
blockId: BlockId,
iterator: () => Iterator[T],
level: StorageLevel,
classTag: ClassTag[T],
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeMs = System.currentTimeMillis
var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
var size = 0L
if (level.useMemory) {
if (level.deserialized) {
memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
case Right(s) =>
size = s
case Left(iter) =>
if (level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.put(blockId) { channel =>
val out = Channels.newOutputStream(channel)
serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
}
size = diskStore.getSize(blockId)
} else {
iteratorFromFailedMemoryStorePut = Some(iter)
}
}
} else {
memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
case Right(s) =>
size = s
case Left(partiallySerializedValues) =>
if (level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.put(blockId) { channel =>
val out = Channels.newOutputStream(channel)
partiallySerializedValues.finishWritingToStream(out)
}
size = diskStore.getSize(blockId)
} else {
iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
}
}
}
} else if (level.useDisk) {
diskStore.put(blockId) { channel =>
val out = Channels.newOutputStream(channel)
serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
}
size = diskStore.getSize(blockId)
}
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
info.size = size
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, putBlockStatus)
}
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
val bytesToReplicate = doGetLocalBytes(blockId, info)
val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
scala.reflect.classTag[Any]
} else {
classTag
}
try {
replicate(blockId, bytesToReplicate, level, remoteClassTag)
} finally {
bytesToReplicate.dispose()
}
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
iteratorFromFailedMemoryStorePut
}
}
虽然它本质上调用的是doPut()方法,但是整个写入的主体逻辑是作为一个函数(即doPut()方法的柯里化参数)传入的,因此我们可以先看上面的逻辑。阅读代码,可以整理出如下的执行逻辑:
- 检查块的存储等级(即StorageLevel)是否会使用内存,如果会,就优先将块展开到内存中。
- 如果存储等级是反序列化的,就调用MemoryStore.putIteratorAsValues()方法(具体逻辑见文章#26)将块数据作为对象写入。反之,如果是序列化的,就调用MemoryStore.putIteratorAsBytes()方法将块数据作为字节流写入。
- 检查上一步调用MemoryStore相关方法的结果,是否成功地展开了。如果只展开了一部分,说明内存中无法容纳块数据,因此在存储等级会使用磁盘的情况下,要继续调用DiskStore.put()方法,将多出的数据序列化地溢写到磁盘。若最终仍然没有完全展开,就将剩余的数据记录在iteratorFromFailedMemoryStorePut这个迭代器中(类型为PartiallyUnrolledIterator)。
- 若该块的存储等级不允许使用内存,而只允许使用磁盘,就直接调用DiskStore.put()方法写到磁盘中。
- 块数据写入完毕之后,如果tellMaster标记为真,调用reportBlockStatus()方法将新块的信息报告给BlockManagerMaster。
- 检查块的存储等级是否有副本(即后缀为
_2
的),如果有,还需要调用replicate()方法将块向其他节点复制一份。 - 方法返回iteratorFromFailedMemoryStorePut迭代器。
可见,块写入的过程符合Spark积极使用内存的特征。另外,通过阅读replicate()方法的代码(这里略去),可以发现复制块的过程是阻塞的。这就是在一般情况下不推荐使用带副本的StorageLevel的原因,会造成块写入性能下降,以及造成较大的网络传输开销。
然后就来看看doPut()方法的逻辑。
doPut()方法
该方法是doPutBytes()和doPutIterator()两个方法公用的方法,为写入的逻辑做一些前置和后置工作。代码如下。
代码#31.3 - o.a.s.storage.BlockManager.doPut()方法
private def doPut[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[_],
tellMaster: Boolean,
keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
val putBlockInfo = {
val newInfo = new BlockInfo(level, classTag, tellMaster)
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
newInfo
} else {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
if (!keepReadLock) {
releaseLock(blockId)
}
return None
}
}
val startTimeMs = System.currentTimeMillis
var exceptionWasThrown: Boolean = true
val result: Option[T] = try {
val res = putBody(putBlockInfo)
exceptionWasThrown = false
if (res.isEmpty) {
if (keepReadLock) {
blockInfoManager.downgradeLock(blockId)
} else {
blockInfoManager.unlock(blockId)
}
} else {
removeBlockInternal(blockId, tellMaster = false)
logWarning(s"Putting block $blockId failed")
}
res
} catch {
case NonFatal(e) =>
logWarning(s"Putting block $blockId failed due to exception $e.")
throw e
} finally {
if (exceptionWasThrown) {
removeBlockInternal(blockId, tellMaster = tellMaster)
addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
}
}
if (level.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
logDebug("Putting block %s without replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
result
}
其中,putBody即为写入块数据的函数,keepReadLock标记表示是否在写入后还继续持有对块的读锁。方法的执行流程如下:
- 为块生成新的BlockInfo,并调用BlockInfoManager.lockNewBlockForWriting()加写锁,准备写入。BlockInfoManager的相关细节在文章#22中已经讲过,不再赘述。
- 调用putBody函数的逻辑,真正地写入块数据。
- 若写入成功,当keepReadLock为真时,就调用BlockInfoManager.downgradeLock()方法将原先持有的写锁降级为读锁,方便后续读取。反之,当keepReadLock为假时,就直接调用BlockInfoManager.unlock()方法直接释放锁。
- 若putBody未能写入全部的块数据(返回的迭代器不为空)或者中途抛出了异常,说明写入不成功,调用removeBlockInternal()方法移除失败的块。该方法的实现如下,比较简单。
代码#31.4 - o.a.s.storage.BlockManager.removeBlockInternal()方法
private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
}
blockInfoManager.removeBlock(blockId)
if (tellMaster) {
reportBlockStatus(blockId, BlockStatus.empty)
}
}
总结
为了方便理解,下面用图示出BlockManager读写流程的方法调用链。
由上图可见,BlockTransferService充当了本地BlockManager与远程BlockManager之间交互的媒介,要想补全BlockManager读写过程的全貌,还得必须研究BlockTransferService的实现细节才可以。下一篇文章会详细地讲解。