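Preface
As the title says, after a lot of groundwork in the previous articles, we can finally look at BlockManager. It has been a long time coming.
The block manager, BlockManager, runs on every node of a Spark cluster. The BlockManager on each node manages the blocks in that node's memory and on its disks through MemoryManager, MemoryStore, DiskBlockManager and DiskStore, and it exchanges blocks with other nodes, which makes it a very large component. To keep this article from growing too long to finish, it focuses on the two most basic aspects: how BlockManager is initialized and how blocks are read. The write path and other logic (such as BlockTransferService) will be covered in separate articles.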
BlockManager initialization
Constructor and members
Code #30.1 - Constructor and members of the o.a.s.storage.BlockManager class
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {
  private[spark] val externalShuffleServiceEnabled =
    conf.getBoolean("spark.shuffle.service.enabled", false)
  val diskBlockManager = {
    val deleteFilesOnStop =
      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
    new DiskBlockManager(conf, deleteFilesOnStop)
  }
  private[storage] val blockInfoManager = new BlockInfoManager
  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
  private[spark] val memoryStore =
    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
  private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
  memoryManager.setMemoryStore(memoryStore)
  private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
  private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory
  private val externalShuffleServicePort = {
    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
    if (tmpPort == 0) {
      conf.get("spark.shuffle.service.port").toInt
    } else {
      tmpPort
    }
  }
  var blockManagerId: BlockManagerId = _
  private[spark] var shuffleServerId: BlockManagerId = _
  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
    new ExternalShuffleClient(transConf, securityManager,
      securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
  } else {
    blockTransferService
  }
  private val maxFailuresBeforeLocationRefresh =
    conf.getInt("spark.block.failures.beforeLocationRefresh", 5)
  private val slaveEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
  private var asyncReregisterTask: Future[Unit] = null
  private val asyncReregisterLock = new Object
  @volatile private var cachedPeers: Seq[BlockManagerId] = _
  private val peerFetchLock = new Object
  private var lastPeerFetchTime = 0L
  private var blockReplicationPolicy: BlockReplicationPolicy = _
  private[storage] val remoteBlockTempFileManager =
    new BlockManager.RemoteBlockDownloadFileManager(this)
  private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
  // ......
}
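BlockManager takes many constructor parameters. Types covered in earlier articles are not discussed again, but three components have not been covered in detail yet: MapOutputTracker, which tracks the output of map tasks during execution (that is, the input of reduce tasks) and belongs to the scheduling module; ShuffleManager, which manages the shuffle strategy and was analyzed in detail in articles outside this series; and BlockTransferService, which, as the name suggests, transfers blocks between nodes and will be covered in an upcoming article.
BlockManager implements the BlockDataManager and BlockEvictionHandler traits, meaning that it can manage block data and evict blocks from memory, respectively. So far, BlockManager is the only implementation of either trait.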
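Now let's look at the members of BlockManager. Components readers already know well (MemoryStore, DiskStore, and so on) are not repeated here; only the main new faces are covered.
- externalShuffleServiceEnabled: whether the external shuffle service is enabled, set by spark.shuffle.service.enabled and disabled by default. What is an external shuffle service? Traditionally, shuffle is handled entirely by the executors, which makes it very CPU- and I/O-intensive for them. If the Spark cluster runs on YARN, enabling the external shuffle service runs a long-lived YarnShuffleService inside each YARN NodeManager that serves the shuffle files written by the executors on that node. This takes load off the executors and keeps those files available even after the executor that wrote them has exited.
- futureExecutionContext: a daemon thread pool, capped at 128 threads, used to run some BlockManager operations asynchronously.
- blockManagerId: the ID of this BlockManager; its structure was described in the previous article.
- shuffleServerId: the ID of the entity that serves this node's shuffle files. Without the external shuffle service it is the same as this BlockManager's ID; otherwise a new ID is created using the external shuffle service port.
- shuffleClient: the client used to fetch shuffle files from other executors. Without the external shuffle service it is the BlockTransferService mentioned above; otherwise it is an ExternalShuffleClient instance. We won't dig into it for now.
- slaveEndpoint: the reference to this BlockManager's slave RPC endpoint, registered via RpcEnv.setupEndpoint().
- blockReplicationPolicy: the policy Spark uses to replicate blocks.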
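Two of the members above depend only on externalShuffleServiceEnabled. The following plain-Scala sketch (no Spark dependency) reproduces those two decisions from code #30.1 so the combinations are easy to see. The string "driver" is an assumption standing in for SparkContext.DRIVER_IDENTIFIER, and the shuffle clients are returned as names rather than real instances. Executors keep their files on stop when the service is enabled because the service must go on serving them after the executor is gone.

```scala
object ShuffleSettingsSketch {
  // Assumed stand-in for SparkContext.DRIVER_IDENTIFIER
  val DriverIdentifier = "driver"

  // Same expression that code #30.1 passes to DiskBlockManager
  def deleteFilesOnStop(externalShuffleServiceEnabled: Boolean, executorId: String): Boolean =
    !externalShuffleServiceEnabled || executorId == DriverIdentifier

  // Which component ends up as shuffleClient (returned as a name only)
  def shuffleClientKind(externalShuffleServiceEnabled: Boolean): String =
    if (externalShuffleServiceEnabled) "ExternalShuffleClient" else "BlockTransferService"
}
```

With the service disabled, every BlockManager cleans up its files on stop; with it enabled, only the driver does.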
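Initialization method
BlockManager is created in SparkEnv, but it only becomes usable after its initialize() method is called: by SparkContext on the driver and by Executor on the executors. The code is as follows.
Code #30.2 - o.a.s.storage.BlockManager.initialize() method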
def initialize(appId: String): Unit = {
  blockTransferService.init(this)
  shuffleClient.init(appId)
  blockReplicationPolicy = {
    val priorityClass = conf.get(
      "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
    val clazz = Utils.classForName(priorityClass)
    val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
    logInfo(s"Using $priorityClass for block replication policy")
    ret
  }
  val id =
    BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
  val idFromMaster = master.registerBlockManager(
    id,
    maxOnHeapMemory,
    maxOffHeapMemory,
    slaveEndpoint)
  blockManagerId = if (idFromMaster != null) idFromMaster else id
  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
  logInfo(s"Initialized BlockManager: $blockManagerId")
}
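BlockManager initialization proceeds as follows:
- Initialize BlockTransferService and ShuffleClient.
- Determine the block replication policy from spark.storage.replication.policy and create it via reflection. The default is RandomBlockReplicationPolicy, which places block replicas on randomly chosen nodes.
- Build a BlockManagerId from the executor ID, then call BlockManagerMaster.registerBlockManager() to register this ID together with the slave RPC endpoint. On success BlockManagerMaster returns the official ID; if it returns null, the locally built ID is kept.
- Build the shuffle service ID. If the current node is an executor and the external shuffle service is enabled, call registerWithExternalShuffleServer() to register with the external shuffle service (code omitted).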
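The ID decisions at the end of initialize() can be isolated in a few lines. This is a plain-Scala sketch, assuming a hypothetical BmId case class in place of BlockManagerId and a simplified isDriver check that treats the executor ID "driver" as the driver; registration and reflection are left out.

```scala
object InitIdSketch {
  // Hypothetical, reduced stand-in for BlockManagerId
  final case class BmId(executorId: String, host: String, port: Int)

  // Simplified version of BlockManagerId.isDriver (assumes the driver's ID is "driver")
  def isDriver(id: BmId): Boolean = id.executorId == "driver"

  // Returns (blockManagerId, shuffleServerId) the way the end of initialize() chooses them
  def resolveIds(
      localId: BmId,
      idFromMaster: BmId, // may be null, as in code #30.2
      externalShuffleEnabled: Boolean,
      shuffleServicePort: Int): (BmId, BmId) = {
    val blockManagerId = Option(idFromMaster).getOrElse(localId)
    val shuffleServerId =
      if (externalShuffleEnabled) BmId(localId.executorId, localId.host, shuffleServicePort)
      else blockManagerId
    (blockManagerId, shuffleServerId)
  }

  // Only executors register with the external shuffle service
  def shouldRegisterWithExternalShuffle(id: BmId, externalShuffleEnabled: Boolean): Boolean =
    externalShuffleEnabled && !isDriver(id)
}
```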
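After all this, readers may still not have a concrete feel for it (honestly, neither do I). So next, let's look at the block read path, which is one of BlockManager's main jobs and much less abstract.
Entry point for block reads and writes
BlockManager provides many methods for reading and writing blocks. One of them, getOrElseUpdate(), serves as a unified entry point for both. Because blocks can be materialized from RDDs, its caller is easy to find in the RDD class (specifically, RDD.getOrCompute()). For convenience, this article starts from it. Here is the source.
Code #30.3 - o.a.s.storage.BlockManager.getOrElseUpdate() method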
def getOrElseUpdate[T](
    blockId: BlockId,
    level: StorageLevel,
    classTag: ClassTag[T],
    makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
  get[T](blockId)(classTag) match {
    case Some(block) =>
      return Left(block)
    case _ =>
  }
  doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
    case None =>
      val blockResult = getLocalValues(blockId).getOrElse {
        releaseLock(blockId)
        throw new SparkException(s"get() failed for block $blockId even though we held a lock")
      }
      releaseLock(blockId)
      Left(blockResult)
    case Some(iter) =>
      Right(iter)
  }
}
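The method first tries to read the data by block ID (locally first, then remotely); a hit is returned as a BlockResult wrapped in Left. On a miss, it calls the supplied makeIterator function to produce the data as an iterator and writes it as a block via doPutIterator(). If the write succeeds (doPutIterator() returns None), the block is read back locally and also returned in Left; if the block could not be stored, doPutIterator() hands the iterator back and it is returned wrapped in Right.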
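The Left/Right contract is the subtle part. Below is a toy sketch, not Spark code: a hypothetical in-memory store whose capacity is counted in elements, with getOrElseUpdate() returning Left for a hit or a successful store and Right when the freshly computed data did not fit.

```scala
import scala.collection.mutable

object GetOrElseUpdateSketch {
  // Toy store whose capacity is counted in elements (hypothetical)
  final class ToyBlockStore(capacity: Int) {
    private val blocks = mutable.Map.empty[String, Vector[Int]]
    private var used = 0

    def get(id: String): Option[Vector[Int]] = blocks.get(id)

    // None = stored, Some(iterator) = did not fit, as doPutIterator() is used in code #30.3
    private def putIterator(id: String, values: Iterator[Int]): Option[Iterator[Int]] = {
      val materialized = values.toVector
      if (used + materialized.size <= capacity) {
        blocks(id) = materialized
        used += materialized.size
        None
      } else {
        Some(materialized.iterator)
      }
    }

    def getOrElseUpdate(id: String, makeIterator: () => Iterator[Int]): Either[Vector[Int], Iterator[Int]] =
      get(id) match {
        case Some(values) => Left(values) // cache hit
        case None =>
          putIterator(id, makeIterator()) match {
            case None       => Left(blocks(id)) // stored, read it back
            case Some(iter) => Right(iter)      // not stored, hand the data to the caller
          }
      }
  }
}
```

On a hit, makeIterator is never called, which is exactly why RDD.getOrCompute() can pass the expensive computation in lazily.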
Block read path
Below is the get() method called in code #30.3.
Code #30.4 - o.a.s.storage.BlockManager.get() method
def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val local = getLocalValues(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemoteValues[T](blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}
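This method first calls getLocalValues() to read the data locally (note: "locally" means the local executor). If nothing is found, it calls getRemoteValues() to fetch the data from a remote node. Let's look at each in turn.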
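The local-then-remote fallback in get() maps directly onto Option.orElse, whose argument is evaluated lazily. This is a minimal sketch with two hypothetical maps standing in for the local stores and the network, plus a counter that shows the remote path only runs on a local miss; the read-method labels are simplified.

```scala
object TwoTierGetSketch {
  final case class Result(values: Seq[Int], readMethod: String)

  final class TwoTierLookup(local: Map[String, Seq[Int]], remote: Map[String, Seq[Int]]) {
    var remoteCalls = 0 // counts how often the "network" is touched

    private def getLocalValues(id: String): Option[Result] =
      local.get(id).map(v => Result(v, "Memory"))

    private def getRemoteValues(id: String): Option[Result] = {
      remoteCalls += 1
      remote.get(id).map(v => Result(v, "Network"))
    }

    // orElse takes its argument by name, so the remote lookup only runs on a local miss
    def get(id: String): Option[Result] = getLocalValues(id).orElse(getRemoteValues(id))
  }
}
```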
Reading data locally
Code #30.5 - o.a.s.storage.BlockManager.getLocalValues() method
def getLocalValues(blockId: BlockId): Option[BlockResult] = {
  logDebug(s"Getting local block $blockId")
  blockInfoManager.lockForReading(blockId) match {
    case None =>
      logDebug(s"Block $blockId was not found")
      None
    case Some(info) =>
      val level = info.level
      logDebug(s"Level for block $blockId is $level")
      val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
      if (level.useMemory && memoryStore.contains(blockId)) {
        val iter: Iterator[Any] = if (level.deserialized) {
          memoryStore.getValues(blockId).get
        } else {
          serializerManager.dataDeserializeStream(
            blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
        }
        val ci = CompletionIterator[Any, Iterator[Any]](iter, {
          releaseLock(blockId, taskAttemptId)
        })
        Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
      } else if (level.useDisk && diskStore.contains(blockId)) {
        val diskData = diskStore.getBytes(blockId)
        val iterToReturn: Iterator[Any] = {
          if (level.deserialized) {
            val diskValues = serializerManager.dataDeserializeStream(
              blockId,
              diskData.toInputStream())(info.classTag)
            maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
          } else {
            val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
              .map { _.toInputStream(dispose = false) }
              .getOrElse { diskData.toInputStream() }
            serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
          }
        }
        val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
          releaseLockAndDispose(blockId, diskData, taskAttemptId)
        })
        Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
      } else {
        handleLocalReadFailure(blockId)
      }
  }
}
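It is a bit long, but the logic is clear. The method works as follows:
- Call BlockInfoManager.lockForReading() to take a read lock on the block and try to obtain its metadata, BlockInfo.
- If there is no BlockInfo, the block does not exist locally. Otherwise, check its StorageLevel, trying memory first and disk second.
- If the StorageLevel uses memory and the data is in MemoryStore, call MemoryStore.getValues() or getBytes() depending on whether the block is stored deserialized; serialized bytes are deserialized through SerializerManager. Either way, the result is an iterator over the block data.
- If the StorageLevel uses disk and the data is in DiskStore, first read the block's bytes from disk with DiskStore.getBytes(), then handle them according to whether the level is deserialized. Along the way, maybeCacheDiskValuesInMemory()/maybeCacheDiskBytesInMemory() try to cache the data read from disk in memory to speed up later reads.
- Release the block's read lock with releaseLock() or releaseLockAndDispose(). Because the call is wrapped in a CompletionIterator, the lock is released only once the returned iterator has been fully consumed, not when the method returns.
- Wrap the iterator, the read method and the block size in bytes in a BlockResult and return it. If neither the memory read nor the disk read succeeds, call handleLocalReadFailure() to handle the local read failure.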
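The "release the lock when the iterator is exhausted" idea, together with the memory-before-disk order, can be reproduced in a small toy. This sketch is not Spark code: CompletionIter is a simplified take on the idea behind Spark's CompletionIterator, Level is a hypothetical two-flag storage level, the read lock is just a counter, promotion to memory ignores memory limits, and the failure branch simply releases the lock and returns None instead of calling handleLocalReadFailure().

```scala
import scala.collection.mutable

object LocalReadSketch {
  // Runs `completion` exactly once, when the wrapped iterator is exhausted
  final class CompletionIter[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
    private var completed = false
    override def hasNext: Boolean = {
      val more = sub.hasNext
      if (!more && !completed) {
        completed = true
        completion()
      }
      more
    }
    override def next(): A = sub.next()
  }

  // Hypothetical, much reduced storage level
  final case class Level(useMemory: Boolean, useDisk: Boolean)

  final class ToyLocalStore {
    val memory = mutable.Map.empty[String, Vector[Int]]
    val disk = mutable.Map.empty[String, Vector[Int]]
    val levels = mutable.Map.empty[String, Level]
    // Number of read locks currently held per block
    val readLocks = mutable.Map.empty[String, Int].withDefaultValue(0)

    def getLocalValues(id: String): Option[(Iterator[Int], String)] =
      levels.get(id).flatMap { level =>
        readLocks(id) += 1 // plays the role of lockForReading()
        val release = () => readLocks(id) -= 1
        if (level.useMemory && memory.contains(id)) {
          Some((new CompletionIter(memory(id).iterator, release), "Memory"))
        } else if (level.useDisk && disk.contains(id)) {
          val values = disk(id)
          // Loosely like maybeCacheDiskValuesInMemory(): promote to memory (limits ignored)
          if (level.useMemory) memory(id) = values
          Some((new CompletionIter(values.iterator, release), "Disk"))
        } else {
          release()
          None
        }
      }
  }
}
```

The lock count stays at 1 while a caller is still iterating, which is the point of deferring the release.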
Hopefully that was clear enough. Next, the method that reads block data from remote nodes.
Reading data remotely
Code #30.6 - o.a.s.storage.BlockManager.getRemoteValues() method
private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val ct = implicitly[ClassTag[T]]
  getRemoteBytes(blockId).map { data =>
    val values =
      serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
    new BlockResult(values, DataReadMethod.Network, data.size)
  }
}
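This method is short because the main logic lives in getRemoteBytes(). That is to be expected: remote block data has to be serialized before it can be transferred and is deserialized back into objects once it arrives locally, so what is actually fetched is a byte stream. Below is the source of getRemoteBytes().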
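To make the "objects become bytes on the wire, then objects again" point concrete, here is a round trip in plain Scala. Java serialization is an assumption standing in for Spark's SerializerManager, and WireFormatSketch is a hypothetical helper; the record count is written first so the reader knows how many objects to read back.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object WireFormatSketch {
  // Sender side: objects -> bytes
  def serialize(values: Seq[String]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try {
      out.writeInt(values.size) // record count first
      values.foreach(v => out.writeObject(v))
    } finally {
      out.close()
    }
    buffer.toByteArray
  }

  // Receiver side: bytes -> iterator of objects
  def deserialize(data: Array[Byte]): Iterator[String] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try {
      val count = in.readInt()
      Vector.fill(count)(in.readObject().asInstanceOf[String]).iterator
    } finally {
      in.close()
    }
  }
}
```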
Code #30.7 - o.a.s.storage.BlockManager.getRemoteBytes() method
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
  logDebug(s"Getting remote block $blockId")
  require(blockId != null, "BlockId is null")
  var runningFailureCount = 0
  var totalFailureCount = 0
  val locationsAndStatus = master.getLocationsAndStatus(blockId)
  val blockSize = locationsAndStatus.map { b =>
    b.status.diskSize.max(b.status.memSize)
  }.getOrElse(0L)
  val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)
  val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
    remoteBlockTempFileManager
  } else {
    null
  }
  val locations = sortLocations(blockLocations)
  val maxFetchFailures = locations.size
  var locationIterator = locations.iterator
  while (locationIterator.hasNext) {
    val loc = locationIterator.next()
    logDebug(s"Getting remote block $blockId from $loc")
    val data = try {
      blockTransferService.fetchBlockSync(
        loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
    } catch {
      case NonFatal(e) =>
        runningFailureCount += 1
        totalFailureCount += 1
        if (totalFailureCount >= maxFetchFailures) {
          logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
            s"Most recent failure cause:", e)
          return None
        }
        logWarning(s"Failed to fetch remote block $blockId " +
          s"from $loc (failed attempt $runningFailureCount)", e)
        if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
          locationIterator = sortLocations(master.getLocations(blockId)).iterator
          logDebug(s"Refreshed locations from the driver " +
            s"after ${runningFailureCount} fetch failures.")
          runningFailureCount = 0
        }
        null
    }
    if (data != null) {
      return Some(new ChunkedByteBuffer(data))
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}
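The method works as follows:
- Call BlockManagerMaster.getLocationsAndStatus() to get the locations of all remote BlockManagers holding the block, together with its status. The larger of the block's disk and memory sizes is compared with maxRemoteBlockToMem; if the block is bigger, it is fetched into a temporary file through remoteBlockTempFileManager instead of into memory.
- Call sortLocations() to order the remote BlockManager locations using the topology information in BlockManagerId: BlockManagers on the same host come first, then those on nodes in the same rack (if rack information is available), and finally those on nodes in other racks.
- For each remote BlockManager, call BlockTransferService.fetchBlockSync() to fetch the block data synchronously, and return it wrapped in a ChunkedByteBuffer.
- If the block cannot be fetched from one remote BlockManager, try the next one. When the number of failures since the last refresh reaches the threshold set by spark.block.failures.beforeLocationRefresh (default 5), refresh the locations from the driver so that stale locations are not retried.
- Once the total number of failures reaches the number of locations originally obtained, the read is considered failed and None is returned.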
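The retry structure is easier to follow in isolation. The following is a minimal plain-Scala sketch, assuming a hypothetical Loc type in place of BlockManagerId (with an optional rack standing in for topology info) and caller-supplied fetch and refresh functions in place of fetchBlockSync() and master.getLocations(). It keeps the two counters from code #30.7 and the same-host, same-rack, other-rack ordering, but it is not a complete copy of sortLocations(), and temp-file handling is left out.

```scala
import scala.util.control.NonFatal

object RemoteFetchSketch {
  // Hypothetical, reduced stand-in for BlockManagerId plus its topology info
  final case class Loc(executorId: String, host: String, rack: Option[String])

  // Same host first, then same rack (only if our own rack is known), then the rest
  def sortLocations(self: Loc, locs: Seq[Loc]): Seq[Loc] = {
    val (sameHost, others) = locs.partition(_.host == self.host)
    self.rack match {
      case None => sameHost ++ others
      case Some(_) =>
        val (sameRack, otherRack) = others.partition(_.rack == self.rack)
        sameHost ++ sameRack ++ otherRack
    }
  }

  // Mirrors the retry structure of getRemoteBytes(); `fetch` throws on failure
  def fetchWithRetry(
      self: Loc,
      initial: Seq[Loc],
      refresh: () => Seq[Loc],
      fetch: Loc => Array[Byte],
      failuresBeforeRefresh: Int = 5): Option[Array[Byte]] = {
    val locations = sortLocations(self, initial)
    val maxFetchFailures = locations.size
    var runningFailures = 0
    var totalFailures = 0
    var locationIterator = locations.iterator
    while (locationIterator.hasNext) {
      val loc = locationIterator.next()
      val data: Option[Array[Byte]] =
        try Some(fetch(loc))
        catch {
          case NonFatal(_) =>
            runningFailures += 1
            totalFailures += 1
            if (totalFailures >= maxFetchFailures) return None // give up
            if (runningFailures >= failuresBeforeRefresh) {
              // Ask for fresh locations instead of retrying possibly stale ones
              locationIterator = sortLocations(self, refresh()).iterator
              runningFailures = 0
            }
            None
        }
      if (data.isDefined) return data
    }
    None
  }
}
```

Note that the give-up limit is fixed by the first location list, so a refresh can redirect the search but never extends the total number of attempts.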
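Summary
This article has walked through BlockManager initialization and how block data is read locally and remotely. The next two articles will fill in the details of block writes and BlockTransferService, which will give us the full picture of how BlockManager reads and writes blocks.
Good night.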