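The Storage module manages the data that Spark produces during computation. From the user's point of view, Spark programming targets the RDD, an abstract logical dataset, and the computation is expressed through transformations and actions on RDDs. Beneath that elegant surface, the Storage module diligently manages where and how the data is kept, and is the unsung hero behind the scenes.
Architecture of the Storage module
As the figure above shows, the Storage module mirrors the Driver/Executor layout and follows the standard Master-Slave pattern. A Block corresponds to a Partition of an RDD and is the unit in which data is stored, and BlockManager manages the Blocks on each node. BlockManager's Master role is created on the Driver, and its Slave role is created on the Executors. To understand this architecture diagram, first look at the constructor and fields of the BlockManager class: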
/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that #initialize() must be called before the BlockManager is usable.
 */
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    defaultSerializer: Serializer,
    val conf: SparkConf,
    val memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with Logging
The parameter list of BlockManager receives a BlockManagerMaster instance (the master parameter).
private val slaveEndpoint = rpcEnv.setupEndpoint(
  "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
  new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
Among its fields, BlockManager creates a BlockManagerSlaveEndpoint object.
BlockManager runs on every node (Driver and Executors) and can only be used after its initialize() method has been called. The initialize method:
def initialize(appId: String): Unit = {
  // Initialize the BlockTransferService and the ShuffleClient, both used during shuffle
  blockTransferService.init(this)
  shuffleClient.init(appId)

  blockManagerId = BlockManagerId(
    executorId, blockTransferService.hostName, blockTransferService.port)

  // External shuffle service configuration
  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }

  // Register with the master
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
}
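As you can see, the initialization method registers this BlockManager with the master and passes its slaveEndpoint along. Because every node does this, the master ends up knowing every slaveEndpoint.
The above can be summarized in two points:
- On the Driver side, the BlockManagerMaster is created and the BlockManagerMasterEndpoint is registered; the master endpoint holds references to all BlockManagerSlaveEndpoint instances
- On the Executor side, each BlockManager holds a reference to the BlockManagerMaster when it is instantiated, and registers the current node's BlockManagerSlaveEndpoint with the BlockManagerMasterEndpoint
So the actual structure of storage looks like storage-frame2 below, but to keep things simple it was drawn as storage-frame1 above.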
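The registration relationship summarized above can be sketched with plain Scala objects. This is a simplified model and not Spark's code: `MasterEndpoint`, `SlaveEndpoint`, and `NodeBlockManager` are hypothetical stand-ins for BlockManagerMasterEndpoint, BlockManagerSlaveEndpoint, and BlockManager, and direct method calls take the place of RPC.

```scala
import scala.collection.mutable

object RegistrationSketch {
  // Hypothetical stand-in for BlockManagerSlaveEndpoint: one per node.
  final class SlaveEndpoint(val executorId: String)

  // Hypothetical stand-in for BlockManagerMasterEndpoint: lives on the driver
  // and keeps a reference to every registered slave endpoint.
  final class MasterEndpoint {
    private val slaves = mutable.LinkedHashMap.empty[String, SlaveEndpoint]

    def register(slave: SlaveEndpoint): Unit = slaves.update(slave.executorId, slave)

    // Executor ids in registration order.
    def registeredIds: Seq[String] = slaves.keys.toSeq
  }

  // Hypothetical stand-in for BlockManager: every node is handed the same
  // master and creates its own slave endpoint.
  final class NodeBlockManager(executorId: String, master: MasterEndpoint) {
    val slaveEndpoint = new SlaveEndpoint(executorId)

    // Mirrors the registration step of initialize(): announce this node to the master.
    def initialize(): Unit = master.register(slaveEndpoint)
  }

  // Builds one master, registers a driver and two executors, returns the known ids.
  def demo(): Seq[String] = {
    val master = new MasterEndpoint
    Seq("driver", "exec-1", "exec-2").foreach { id =>
      new NodeBlockManager(id, master).initialize()
    }
    master.registeredIds
  }
}
```

Calling `RegistrationSketch.demo()` shows that the master ends up with one entry per node, including the driver's own BlockManager.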
BlockManagerMasterEndpoint maintains several data structures. They are worth describing on their own, because the source code below relies on them:
- private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
Mapping from block manager id to the block manager's information.
BlockManagerInfo records the memory usage of a Slave node, the status of its Blocks, and the reference to its BlockManagerSlaveEndpoint
- private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
Mapping from executor ID to block manager ID.
With this map the Master can quickly look up the BlockManagerId for a given executor ID
- private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
Mapping from block id to the set of block managers that have the block.
Records which BlockManagers hold a given Block; the value is a set because a Block may be replicated on several Slaves
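To make the roles of these maps concrete, here is a minimal sketch that keeps two of them up to date as blocks are reported. It is an illustration under simplifying assumptions: `BlockManagerId` is reduced to an executor id, block ids are plain strings, and the `Master` class with its `register`, `addBlock`, and `getLocations` methods is hypothetical rather than Spark's implementation.

```scala
import scala.collection.mutable

object MasterMetadataSketch {
  // Simplified stand-in: the real BlockManagerId also carries host and port.
  final case class BlockManagerId(executorId: String)

  final class Master {
    // executor id -> block manager id
    val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
    // block id -> every block manager that holds a copy of the block
    val blockLocations = mutable.HashMap.empty[String, mutable.HashSet[BlockManagerId]]

    def register(executorId: String): BlockManagerId = {
      val id = BlockManagerId(executorId)
      blockManagerIdByExecutor.update(executorId, id)
      id
    }

    // Record that `bm` now holds a copy of `blockId`.
    def addBlock(blockId: String, bm: BlockManagerId): Unit = {
      val holders = blockLocations.getOrElseUpdate(blockId, mutable.HashSet.empty[BlockManagerId])
      holders += bm
    }

    // All known holders of a block; empty if the block is unknown.
    def getLocations(blockId: String): Set[BlockManagerId] =
      blockLocations.get(blockId).map(_.toSet).getOrElse(Set.empty[BlockManagerId])
  }
}
```

A block reported by two executors yields two locations, which is the situation the set-valued `blockLocations` entry is designed for.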
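Message passing between Master and Slave
From the architecture above we know that the BlockManagerMaster on the Driver manages the metadata of all Blocks, while the Executors store the actual Block data. How do the Master and the Slaves communicate? The RDD unpersist() call is a good way to follow the process.
The unpersist method of an RDD releases its cached data: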
def unpersist(blocking: Boolean = true): this.type = {
  logInfo("Removing RDD " + id + " from persistence list")
  sc.unpersistRDD(id, blocking)
  storageLevel = StorageLevel.NONE
  this
}
It calls the unpersistRDD method of SparkContext:
/**
 * Unpersist an RDD from memory and/or disk storage
 */
private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
  env.blockManager.master.removeRdd(rddId, blocking)
  persistentRdds.remove(rddId)
  listenerBus.post(SparkListenerUnpersistRDD(rddId))
}
The blockManager here is the one on the Driver node, and the call goes to removeRdd() on its master:
/** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
  val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
  future.onFailure {
    case e: Exception =>
      logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}
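BlockManagerMaster sends a RemoveRdd message to driverEndpoint (the BlockManagerMasterEndpoint), logs a warning if the returned future fails, and waits for the result when blocking is true. The Slave nodes are notified by the master endpoint, not by BlockManagerMaster directly.
The Master-side work is finally carried out by the removeRdd() method of BlockManagerMasterEndpoint: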
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
  // First remove the metadata for the given RDD, and then asynchronously remove the blocks
  // from the slaves.

  // Find all blocks for the given RDD, remove the block from both blockLocations and
  // the blockManagerInfo that is tracking the blocks.
  // Step 1: delete the metadata that the Master keeps about this RDD
  val blocks: Iterable[RDDBlockId] =
    blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  blocks.foreach { blockId =>
    val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
    bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
    blockLocations.remove(blockId)
  }

  // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
  // The dispatcher is used as an implicit argument into the Future sequence construction.
  // Step 2: delete the data for this RDD on the Slaves
  val removeMsg = RemoveRdd(rddId)
  Future.sequence(
    blockManagerInfo.values.map { bm =>
      bm.slaveEndpoint.ask[Int](removeMsg)
    }.toSeq
  )
}
The Master first cleans up its own metadata for the RDD being removed: each of the RDD's Blocks is removed from the BlockManagerInfo entries that track it and from blockLocations. It then sends a RemoveRdd message to every slave node.
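The two phases (synchronous metadata cleanup, then an asynchronous fan-out collected with `Future.sequence`) can be tried out in isolation. The sketch below is a simplified model, not Spark's code: `RddBlockId`, `Slave`, `Master`, and `demo` are hypothetical names, and a local `Future` stands in for the RPC `ask` call.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RemoveRddSketch {
  // Simplified block id with an RDD id and a partition index.
  final case class RddBlockId(rddId: Int, splitIndex: Int)

  // Hypothetical stand-in for a slave; removeRdd models an asynchronous `ask`.
  final class Slave {
    val blocks = mutable.HashSet.empty[RddBlockId]

    // Replies with the number of blocks it removed.
    def removeRdd(rddId: Int)(implicit ec: ExecutionContext): Future[Int] = Future {
      val doomed = blocks.filter(_.rddId == rddId).toList
      doomed.foreach(id => blocks.remove(id))
      doomed.size
    }
  }

  final class Master(slaves: Seq[Slave]) {
    // Plays the role of blockLocations: block -> slaves that hold it.
    val blockLocations = mutable.HashMap.empty[RddBlockId, mutable.HashSet[Slave]]

    def removeRdd(rddId: Int)(implicit ec: ExecutionContext): Future[Seq[Int]] = {
      // Phase 1: drop the master's own metadata synchronously.
      val doomed = blockLocations.keys.filter(_.rddId == rddId).toList
      doomed.foreach(id => blockLocations.remove(id))
      // Phase 2: ask every slave and gather the replies into one Future.
      Future.sequence(slaves.map(s => s.removeRdd(rddId)))
    }
  }

  // Caches three blocks of RDD 7 and one of RDD 8, then removes RDD 7.
  def demo(): (Seq[Int], Set[RddBlockId]) = {
    import ExecutionContext.Implicits.global
    val s1 = new Slave
    val s2 = new Slave
    val master = new Master(Seq(s1, s2))
    def cache(id: RddBlockId, slave: Slave): Unit = {
      slave.blocks += id
      master.blockLocations.getOrElseUpdate(id, mutable.HashSet.empty[Slave]) += slave
    }
    cache(RddBlockId(7, 0), s1)
    cache(RddBlockId(7, 1), s1)
    cache(RddBlockId(8, 0), s1)
    cache(RddBlockId(7, 2), s2)
    val counts = Await.result(master.removeRdd(7), 5.seconds)
    (counts, master.blockLocations.keySet.toSet)
  }
}
```

`Future.sequence` keeps the replies in the same order as the slaves, so each count can be matched to the slave that produced it. Waiting on the combined future corresponds to the `blocking = true` case of unpersist.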
What does a BlockManagerSlaveEndpoint do when it receives this message from the Master?
case RemoveRdd(rddId) =>
  doAsync[Int]("removing RDD " + rddId, context) {
    blockManager.removeRdd(rddId)
  }
In the end it is BlockManager's removeBlock method that deletes the RDD's cached Blocks on the slave node:
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
  logDebug(s"Removing block $blockId")
  val info: BlockInfo = blockInfo.get(blockId).orNull
  if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
    try {
      info.synchronized {
        val level = info.level
        // Removals are idempotent in disk store and memory store. At worst, we get a warning.
        val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
        val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
        val removedFromExternalBlockStore =
          if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
        if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
          logWarning(s"Block $blockId could not be removed as it was not found in either " +
            "the disk, memory, or external block store")
        }
        blockInfo.remove(blockId)
        val status = getCurrentBlockStatus(blockId, info)
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, info, status)
        }
        Option(TaskContext.get()).foreach { tc =>
          val metrics = tc.taskMetrics()
          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
        }
      }
    } finally {
      pendingToRemove.remove(blockId)
    }
  } else {
    // The block has already been removed; do nothing.
    logWarning(s"Asked to remove block $blockId, which does not exist")
  }
}
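Storage data may live in memory, on disk, or in the external block store, so all three places have to be considered when an Executor removes the data of an RDD. After the removal, the bookkeeping structures and the metadata in TaskContext are updated.
After this exchange of messages between master and slave and the operations described above, the unpersist operation is complete and the RDD's cached data has been removed from Storage.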
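The removal logic on one node can be reduced to a small model. The sketch below is illustrative only: `Level`, `Node`, and `put` are hypothetical, the external block store and the TaskContext metrics are left out, and `println` stands in for logging. It keeps the two properties that matter here, namely that each store is consulted according to the block's storage level and that removing an unknown block is harmless.

```scala
import scala.collection.mutable

object RemoveBlockSketch {
  // Hypothetical simplified storage level: only the two flags used below.
  final case class Level(useMemory: Boolean, useDisk: Boolean)

  final class Node {
    val blockInfo = mutable.HashMap.empty[String, Level]
    val memoryStore = mutable.HashSet.empty[String]
    val diskStore = mutable.HashSet.empty[String]

    def put(blockId: String, level: Level): Unit = {
      blockInfo.update(blockId, level)
      if (level.useMemory) memoryStore += blockId
      if (level.useDisk) diskStore += blockId
    }

    // Returns true if the block was known to this node and has been dropped.
    def removeBlock(blockId: String): Boolean = blockInfo.get(blockId) match {
      case Some(level) =>
        // Both removals are evaluated; a block may be in memory and on disk at once.
        val fromMemory = level.useMemory && memoryStore.remove(blockId)
        val fromDisk = level.useDisk && diskStore.remove(blockId)
        if (!fromMemory && !fromDisk) {
          println(s"Block $blockId was not found in memory or on disk")
        }
        blockInfo.remove(blockId)
        true
      case None =>
        // Already removed or never stored: nothing to do.
        println(s"Asked to remove block $blockId, which does not exist")
        false
    }
  }
}
```

Calling `removeBlock` twice for the same id returns `true` and then `false`, and the second call leaves the stores untouched.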
The following lists the messages handled by BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint.
Messages handled by the master
// Registration sent through BlockManagerMaster to BlockManagerMasterEndpoint; once registered,
// the master endpoint keeps track of the Blocks held by that BlockManager
case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
  register(blockManagerId, maxMemSize, slaveEndpoint)
  context.reply(true)
// Report Block information to the Master; the Master records it and serves it to slaves on request
case _updateBlockInfo @ UpdateBlockInfo(
    blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
  context.reply(updateBlockInfo(
    blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
  listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
// Get the locations of a Block as a list of BlockManagerIds; a Block may be replicated on several nodes
case GetLocations(blockId) =>
  context.reply(getLocations(blockId))
// Get the locations of several Blocks at once
case GetLocationsMultipleBlockIds(blockIds) =>
  context.reply(getLocationsMultipleBlockIds(blockIds))
// Get the other BlockManagers; used when replicating a Block
case GetPeers(blockManagerId) =>
  context.reply(getPeers(blockManagerId))
// Look up the slave endpoint reference of the given executor
case GetExecutorEndpointRef(executorId) =>
  context.reply(getExecutorEndpointRef(executorId))
// Get the memory status of all Executors: maximum memory and remaining memory
case GetMemoryStatus =>
  context.reply(memoryStatus)
// Get the Storage status of every Executor: maximum usable memory and Block information
case GetStorageStatus =>
  context.reply(storageStatus)
// Get the status of a Block by blockId; mostly used in tests
case GetBlockStatus(blockId, askSlaves) =>
  context.reply(blockStatus(blockId, askSlaves))
// Select the Blocks that match a filter and return their BlockIds
case GetMatchingBlockIds(filter, askSlaves) =>
  context.reply(getMatchingBlockIds(filter, askSlaves))
// Remove the cached data of an RDD by its RDD id
case RemoveRdd(rddId) =>
  context.reply(removeRdd(rddId))
// Remove the data of a shuffle by its shuffleId
case RemoveShuffle(shuffleId) =>
  context.reply(removeShuffle(shuffleId))
// Remove a broadcast variable
case RemoveBroadcast(broadcastId, removeFromDriver) =>
  context.reply(removeBroadcast(broadcastId, removeFromDriver))
// Remove a Block
case RemoveBlock(blockId) =>
  removeBlockFromWorkers(blockId)
  context.reply(true)
// Delete the information the Master keeps about the BlockManager of the Executor with this executorId
case RemoveExecutor(execId) =>
  removeExecutor(execId)
  context.reply(true)
// Sent when BlockManagerMaster is stopped; it stops the Master endpoint
case StopBlockManagerMaster =>
  context.reply(true)
  stop()
// Heartbeat between Master and Slave, carried by the heartbeat between Executor and Driver
case BlockManagerHeartbeat(blockManagerId) =>
  context.reply(heartbeatReceived(blockManagerId))
// Check whether the Executor with this executorId has cached Blocks
case HasCachedBlocks(executorId) =>
  blockManagerIdByExecutor.get(executorId) match {
    case Some(bm) =>
      if (blockManagerInfo.contains(bm)) {
        val bmInfo = blockManagerInfo(bm)
        context.reply(bmInfo.cachedBlocks.nonEmpty)
      } else {
        context.reply(false)
      }
    case None => context.reply(false)
  }
Messages handled by the slave
// Remove a Block from this Executor by blockId
case RemoveBlock(blockId) =>
  doAsync[Boolean]("removing block " + blockId, context) {
    blockManager.removeBlock(blockId)
    true
  }
// Remove an RDD from this Executor by rddId
case RemoveRdd(rddId) =>
  doAsync[Int]("removing RDD " + rddId, context) {
    blockManager.removeRdd(rddId)
  }
// Remove the Blocks of a shuffle from this Executor by shuffleId;
// the registration in mapOutputTracker has to be removed first
case RemoveShuffle(shuffleId) =>
  doAsync[Boolean]("removing shuffle " + shuffleId, context) {
    if (mapOutputTracker != null) {
      mapOutputTracker.unregisterShuffle(shuffleId)
    }
    SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
  }
// Remove the Blocks of a broadcast variable from this Executor
case RemoveBroadcast(broadcastId, _) =>
  doAsync[Int]("removing broadcast " + broadcastId, context) {
    blockManager.removeBroadcast(broadcastId, tellMaster = true)
  }
// Return the status of a Block to the Master by blockId; mostly used in tests.
// Note that this operation is very time-consuming
case GetBlockStatus(blockId, _) =>
  context.reply(blockManager.getStatus(blockId))
// Return to the Master all BlockIds that match the filter; mostly used in tests
case GetMatchingBlockIds(filter, _) =>
  context.reply(blockManager.getMatchingBlockIds(filter))
// Return a thread dump of this Executor
case TriggerThreadDump =>
  context.reply(Utils.getThreadDump())
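Both endpoints follow the same shape: each message is a case class, and the handler is a pattern match that computes a reply. A minimal sketch of that shape, assuming nothing about Spark's RPC layer, is shown below. The message names are reused for readability, but the `ToSlave` trait, the `SlaveEndpoint` class, its `receive` method, and the string block ids of the form `rdd_<rddId>_<index>` are simplifications made for this example.

```scala
object SlaveDispatchSketch {
  // Simplified message protocol; sealed so the compiler can check the match below.
  sealed trait ToSlave
  final case class RemoveBlock(blockId: String) extends ToSlave
  final case class RemoveRdd(rddId: Int) extends ToSlave
  final case class GetBlockStatus(blockId: String) extends ToSlave

  // Hypothetical endpoint that tracks block ids only; the return value of
  // receive plays the role of context.reply(...).
  final class SlaveEndpoint(initialBlocks: Set[String]) {
    private var blocks: Set[String] = initialBlocks

    def receive(msg: ToSlave): Any = msg match {
      case RemoveBlock(blockId) =>
        blocks -= blockId
        true
      case RemoveRdd(rddId) =>
        // Reply with the number of blocks removed for this RDD.
        val doomed = blocks.filter(_.startsWith(s"rdd_${rddId}_"))
        blocks --= doomed
        doomed.size
      case GetBlockStatus(blockId) =>
        // Reduced to a presence check in this sketch.
        blocks.contains(blockId)
    }
  }
}
```

Because `ToSlave` is sealed, adding a new message type without handling it in `receive` produces a compiler warning, which is one reason this style suits message protocols.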