概述
spark的内存管理有两套方案,新旧方案分别对应的类是UnifiedMemoryManager和StaticMemoryManager。
旧方案是静态的,storageMemory(存储内存)和executionMemory(执行内存)拥有的内存是独享的不可相互借用,故在其中一方内存充足,另一方内存不足但又不能借用的情况下会造成资源的浪费。新方案是统一管理的,初始状态是内存各占一半,但其中一方内存不足时可以向对方借用,对内存资源进行合理有效的利用,提高了整体资源的利用率。
总的来说内存分为三大块,包括storageMemory、executionMemory、系统预留,其中storageMemory用来缓存rdd,unroll partition,存放direct task result、广播变量,在 Spark Streaming receiver 模式中存放每个 batch 的 blocks。executionMemory用于shuffle、join、sort、aggregation 中的缓存。除了这两者以外的内存都是预留给系统的。
旧方案 StaticMemoryManager
在SparkEnv中会创建memoryManager:
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}
默认使用的是统一管理方案UnifiedMemoryManager,这里我们简要的看看旧方案StaticMemoryManager。
storageMemory能分到的内存是:
systemMaxMemory * memoryFraction * safetyFraction
其中:
- systemMaxMemory :Runtime.getRuntime.maxMemory,即JVM能获得的最大内存空间。
- memoryFraction:由参数spark.storage.memoryFraction控制,默认0.6。
- safetyFraction:由参数spark.storage.safetyFraction控制,默认是0.9,因为cache block都是估算的,所以需要一个安全系数来保证安全。
executionMemory能分到的内存是:
systemMaxMemory * memoryFraction * safetyFraction
其中:
- systemMaxMemory :Runtime.getRuntime.maxMemory,即JVM能获得的最大内存空间。
- memoryFraction:由参数spark.shuffle.memoryFraction控制,默认0.2。
- safetyFraction:由参数spark.shuffle.safetyFraction控制,默认是0.8。
memoryFraction系数之外和安全系数之外的内存就是给系统预留的了。
executionMemory能分到的内存直接影响了shuffle中spill的频率,增加executionMemory可减少spill的次数,但storageMemory能cache的容量也相应减少。
execution 和 storage 被分配到内存后大小就一直不变了,每次申请内存都只能申请自己独有的不能相互借用,会造成资源的浪费。另外,只有 execution 内存支持 off heap,storage 内存不支持 off heap。
新方案 UnifiedMemoryManager
由于新方案中storageMemory和executionMemory是统一管理的,我们看看两者一共能拿到多少内存。
private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
// SPARK-12759 Check executor memory to fail fast if memory is insufficient
if (conf.contains("spark.executor.memory")) {
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
if (executorMemory < minSystemMemory) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$minSystemMemory. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
}
}
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
(usableMemory * memoryFraction).toLong
}
首先给系统内存reservedMemory预留了300M,若jvm能拿到的最大内存和配置的executor内存分别不足以reservedMemory的1.5倍即450M都会抛出异常,最后storage和execution能拿到的内存为:
(heap space - 300) * spark.memory.fraction (默认为0.6)
storage和execution各占所获内存的50%。
申请storage内存
为某个blockId申请numBytes大小的内存:
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
assertInvariants()
assert(numBytes >= 0)
val (executionPool, storagePool, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
maxOnHeapStorageMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapMemory)
}
// 申请的内存大于storage和execution内存之和
if (numBytes > maxMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxMemory bytes)")
return false
}
// 大于storage空闲内存
if (numBytes > storagePool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
storagePool.acquireMemory(blockId, numBytes)
}
- 若申请的numBytes比两者总共的内存还大,直接返回false,说明申请失败。
- 若numBytes比storage空闲的内存大,则需要向executionPool借用
- 借用的大小为此时execution的空闲内存和numBytes的较小值(个人观点应该是和<numBytes-storage空闲内存>的较小值)
- 减小execution的poolSize
- 增加storage的poolSize
即使向executionPool借用了内存,但不一定就够numBytes,因为不可能把execution正在使用的内存都接过来,接着调用了storagePool的acquireMemory方法在不够numBytes的情况下去释放storage中共cache的rdd,以增加storagePool.memoryFree的值:
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
val numBytesToFree = math.max(0, numBytes - memoryFree)
acquireMemory(blockId, numBytes, numBytesToFree)
}
计算出向execution借了内存后还差多少内存才能满足numBytes,即需要释放的内存numBytesToFree 。接着调用了acquireMemory方法:
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long): Boolean = lock.synchronized {
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
// back into this StorageMemoryPool in order to free memory. Therefore, these variables
// should have been updated.
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
}
enoughMemory
}
当numBytesToFree 大于0的情况下,就真的要去释放缓存在memory中的block,释放完后再看空闲内存是否能满足numBytes,若满足则将numBytes加到已使用的变量里。
看看当需要从storay中释放block的时候是怎么释放的:
private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
memoryMode: MemoryMode): Long = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
}
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
val entry = pair.getValue
if (blockIsEvictable(blockId, entry)) {
// We don't want to evict blocks which are currently being read, so we need to obtain
// an exclusive write lock on blocks which are candidates for eviction. We perform a
// non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
selectedBlocks += blockId
freedMemory += pair.getValue.size
}
}
}
}
def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
val data = entry match {
case DeserializedMemoryEntry(values, _, _) => Left(values)
case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
}
val newEffectiveStorageLevel =
blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
if (newEffectiveStorageLevel.isValid) {
// The block is still present in at least one store, so release the lock
// but don't delete the block info
blockInfoManager.unlock(blockId)
} else {
// The block isn't present in any store, so delete the block info so that the
// block can be stored again
blockInfoManager.removeBlock(blockId)
}
}
if (freedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
s"(${Utils.bytesToString(freedMemory)} bytes)")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one task should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
dropBlock(blockId, entry)
}
}
logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
freedMemory
} else {
blockId.foreach { id =>
logInfo(s"Will not store $id")
}
selectedBlocks.foreach { id =>
blockInfoManager.unlock(id)
}
0L
}
}
}
spark中内存中的block都是通过memoryStore来存储的,用
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
来维护了blockId和MemoryEntry(对应value的包装)的关联,另外方法中还定义了两个方法,blockIsEvictable方法是判断遍历到的blockId和当前blockId是否属于同一个rdd,因为不能提出同一个rdd的另外一个block。dropBlock方法就是真正执行从内存中移除block的,若StorageLevel包括了使用disk,则会写到磁盘文件。
整段代码的逻辑简单概述就是:遍历当前memoryStore中存的每个block(不是和当前请求的block属于同于同一rdd),直到block对应的内存之和大于所需释放的内存才停止遍历,也有可能遍历完了都还不能满足所需的内存。若能释放的内存满足所需的内存,则真正执行移除,否则不移除,因为不可能一个block在内存中一部分,在磁盘一部分,最后返回真正剔除block释放的内存。
总结一下向StorageMemory申请内存的过程(在MemoryMode.ON_HEAP模式下):
- 若numBytes大于storage和execution内存之和,抛异常。
- 若numBytes大于storage空闲内存,向execution借用min(executionFree,numBytes)大的内存,并更新各自的poolSize。
- 若申请完后还不够,则释放storage中的block来补足。
- memoryStore缓存的block大小满足需要补足的大小,则真正执行剔除(遍历block直到内存满足需求对应的block),否则不剔除。
- 最终若空闲内存满足numBytes则返回true,否则返回false。
申请execution内存
在execution内存不足向storage借用时,还是不满足所需内存的情况下能借多少借多少。看看在需要向execution申请内存时是怎么处理的(MemoryMode.ON_HEAP模式下):
override private[memory] def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
assertInvariants()
assert(numBytes >= 0)
val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
onHeapStorageRegionSize,
maxHeapMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
offHeapStorageMemory,
maxOffHeapMemory)
}
/**
* Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
*
* When acquiring memory for a task, the execution pool may need to make multiple
* attempts. Each attempt must be able to evict storage in case another task jumps in
* and caches a large block between the attempts. This is called once per attempt.
*/
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
// There is not enough free memory in the execution pool, so try to reclaim memory from
// storage. We can reclaim any free memory from the storage pool. If the storage pool
// has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
// the memory that storage has borrowed from execution.
val memoryReclaimableFromStorage = math.max(
storagePool.memoryFree,
storagePool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
// Only reclaim as much space as is necessary and available:
val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
storagePool.decrementPoolSize(spaceToReclaim)
executionPool.incrementPoolSize(spaceToReclaim)
}
}
}
/**
* The size the execution pool would have after evicting storage memory.
*
* The execution memory pool divides this quantity among the active tasks evenly to cap
* the execution memory allocation for each task. It is important to keep this greater
* than the execution pool size, which doesn't take into account potential memory that
* could be freed by evicting storage. Otherwise we may hit SPARK-12155.
*
* Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
* in execution memory allocation across tasks, Otherwise, a task may occupy more than
* its fair share of execution memory, mistakenly thinking that other tasks can acquire
* the portion of storage memory that cannot be evicted.
*/
def computeMaxExecutionPoolSize(): Long = {
maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
}
executionPool.acquireMemory(
numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
这里先讲解这里面的两个方法:
maybeGrowExecutionPool就是需要向storage借内存的方法,能借用的最大内存memoryReclaimableFromStorage 为storage的空闲内存和storage向execution借用的内存(即已经使用也要释放来归还)的较大值,若memoryReclaimableFromStorage为0,则说明storage之前没有向execution借用内存,并且此时storage没有空闲的内存可借。
最终申请借用的是所需内存和memoryReclaimableFromStorage的较小值(缺多少借多少),跟进storagePool.freeSpaceToShrinkPool方法看看其实现:
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val spaceFreedByEviction =
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
}
}
若storage空闲内存不足以所申请的内存,则需要通过释放storage中缓存的block来补充。
方法computeMaxExecutionPoolSize即计算的是execution拥有的最大可用内存。
接着通过这两个函数作为参数调用了方法executionPool.acquireMemory:
private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
// TODO: clean up this clunky method signature
// Add this task to the taskMemory map just so we can keep an accurate count of the number
// of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
if (!memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) = 0L
// This will later cause waiting tasks to wake up and check numTasks again
lock.notifyAll()
}
// Keep looping until we're either sure that we don't want to grant this request (because this
// task would have more than 1 / numActiveTasks of the memory) or we have enough free
// memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
// TODO: simplify this to limit each task to its own slot
while (true) {
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)
// In every iteration of this loop, we should first try to reclaim any borrowed execution
// space from storage. This is necessary because of the potential race condition where new
// storage blocks may steal the free execution memory that this task was waiting for.
maybeGrowPool(numBytes - memoryFree)
// Maximum size the pool would have after potentially growing the pool.
// This is used to compute the upper bound of how much memory each task can occupy. This
// must take into account potential free memory as well as the amount this pool currently
// occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
// we did not take into account space that could have been freed by evicting cached blocks.
val maxPoolSize = computeMaxPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
// How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
// Only give it as much memory as is free, which might be none if it reached 1 / numTasks
val toGrant = math.min(maxToGrant, memoryFree)
// We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
// if we can't give it this much now, wait for other tasks to free up memory
// (this happens if older tasks allocated lots of memory before N grew)
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
}
}
0L // Never reached
}
里面定义了一个Task能使用的execution内存:
val maxPoolSize = computeMaxPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
其中maxPoolSize 为从 storage 借用了内存后,executionMemoryPool 的最大可用内存,保证一个Task可用的内存在 1/2*numActiveTasks ~ 1/numActiveTasks 范围内,整体保证各个Task资源占用平衡。
向execution申请内存代码流程:
- 先获取Task目前已经分配到的内存。
- 当numBytes大于execution空闲内存,则会通过maybeGrowPool方法向storage借内存。
- 能获取的最大内存maxToGrant为numBytes和(maxMemoryPerTask - curMem)的较小值。
- 本次循环能获取真正的内存toGrant为maxToGrant和(execution向memory借用后可用的内存)的较小值。
- 若最终能申请的内存小于numBytes且申请的内存加上原来有的内存还不足以一个Task最小的使用内存minMemoryPerTask,则会阻塞,直到有足够的内存或者有新的Task进来减小了minMemoryPerTask的值。
否则直接返回本次分配到的内存。
对于向storage和execution申请内存以及相互借用内存的方式至此讲解完成。用到storage和execution内存的地方很多(看概述),其中缓存rdd会向storage申请内存,运行Task会向execution申请内存,接下来分别看看是在什么时候申请的。
缓存 RDD
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
每个rdd分区的数据都是通过对应的迭代器得到,其中若存储级别不为NONE,则会先尝试从储存介质中(内存、磁盘文件等)获取,第一次获取当然都没有,只有先计算完缓存起来以供后续的计算直接获取。缓存序列化和非序列化的数据的缓存方式不一样,非序列化的缓存的代码是:
memoryStore.putIteratorAsValues(blockId, iterator(), classTag)
private[storage] def putIteratorAsValues[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Number of elements unrolled so far
var elementsUnrolled = 0
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
val memoryCheckPeriod = 16
// Memory currently reserved by this task for this particular unrolling operation
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
val memoryGrowthFactor = 1.5
// Keep track of unroll memory used by this particular block / putIterator() operation
var unrollMemoryUsedByThisBlock = 0L
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[T]()(classTag)
// Request enough memory to begin unrolling
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
unrollMemoryUsedByThisBlock += initialMemoryThreshold
}
// Unroll this block safely, checking whether we have exceeded our threshold periodically
while (values.hasNext && keepUnrolling) {
vector += values.next()
if (elementsUnrolled % memoryCheckPeriod == 0) {
// If our vector's size has exceeded the threshold, request more memory
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
}
}
elementsUnrolled += 1
}
if (keepUnrolling) {
// We successfully unrolled the entirety of this block
val arrayValues = vector.toArray
vector = null
val entry =
new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
}
// Acquire storage memory if necessary to store this block in memory.
val enoughStorageMemory = {
if (unrollMemoryUsedByThisBlock <= size) {
val acquiredExtra =
memoryManager.acquireStorageMemory(
blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
if (acquiredExtra) {
transferUnrollToStorage(unrollMemoryUsedByThisBlock)
}
acquiredExtra
} else { // unrollMemoryUsedByThisBlock > size
// If this task attempt already owns more unroll memory than is necessary to store the
// block, then release the extra memory that will not be used.
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
}
if (enoughStorageMemory) {
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(size)
} else {
assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
"released too much unroll memory")
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = arrayValues.toIterator,
rest = Iterator.empty))
}
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, vector.estimateSize())
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = vector.iterator,
rest = values))
}
}
代码太长了,我自己看到都头大了,没事,咱一点一点的慢慢来~
参数中的blockId是一个block的唯一标示,格式是"rdd_" + rddId + "_" + splitIndex
,value就是该partition对应数据的迭代器,
- 通过reserveUnrollMemoryForThisTask方法向Storage申请initialMemoryThreshold(初始值可通过spark.storage.unrollMemoryThreshold配置,默认1M)的内存来unroll 迭代器:
跟进acquireUnrollMemory可看见底层调用的就是前面所讲的向storage申请内存的方法acquireStorageMemory,若申请成功则将对应的onHeapUnrollMemoryMap加上申请到的内存,即unroll使用的内存。def reserveUnrollMemoryForThisTask( blockId: BlockId, memory: Long, memoryMode: MemoryMode): Boolean = { memoryManager.synchronized { val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode) if (success) { val taskAttemptId = currentTaskAttemptId() val unrollMemoryMap = memoryMode match { case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap } unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory } success } }
- 若申请成功则跟新unrollMemoryUsedByThisBlock的值,即在该block上unroll使用的内存。
- 接着进行遍历,停止遍历的条件有两个,一是迭代器全部遍历完,二是没有申请到内存。
- 每迭代一条数据都会加到SizeTrackingVector类型的vector中(底层由数组实现),每迭代16次都会估算vector的大小是否超过了memoryThreshold(申请的内存)。
- 若超过了memoryThreshold,则会计算再次申请内存的大小,1.5倍当前vector大小-已经申请到的内存大小。
- 再次向Storage申请内存,若申请成功,则跟新unrollMemoryUsedByThisBlock,继续遍历进入下次循环,否则停止遍历。
- 循环结束后,若keepUnrolling 为 true,则说明values 一定被全部展开了;若为false,则没有全部被展开,说明没有申请到足够的内存来展开这个values,意味着该partition缓存到内存失败。
- 在values全部成功展开的前提下,会将vector构造成一个DeserializedMemoryEntry对象,其中包括数据的大小,接着会将展开后的数据大小和申请的内存大小作比较:
- 若申请的内存比数据小,则再次向storage申请对应的大小,申请成功则将unroll使用的内存转化到storage中去,转化对应的逻辑是:释放掉该Task占用的所有unroll内存,又向storage申请对应的内存,其实unroll内存就是storage内存,即操作的都是storage的内存,减去某值又加上某值,结果没有变,但流程还得这么走,因为为了将 MemoryStore 和 MemoryManager 的解耦。
- 若申请的内存比数据大,则释放掉对应的unroll内存,接着将unroll使用的内存转化到storage中去。
- 最后将blockId和对应的entry加入到memorySore所管理的entries中去。
缓存序列化rdd支持 ON_HEAP 和 OFF_HEAP,和缓存非序列化rdd的方式类似,只是以流的形式写到bytebuffer中,其中
MemoryMode 如果是 ON_HEAP,这里的 ByteBuffer 是 HeapByteBuffer(堆上内存);而如果是 OFF_HEAP,这里的 ByteBuffer 则是 DirectByteBuffer(指向的是堆外内存)。最后根据数据构建成SerializedMemoryEntry来保存在memoryStore的entries中。
shuffle中execution内存的使用
在shuffle write的时候,并不会直接将数据写到磁盘(详情请看Shuffle Write解析),而是先写到一个集合中,此集合占用的内存就是execution内存,初始给的大小是5M,可通过spark.shuffle.spill.initialMemoryThreshold
进行设置,每写一次数据就判断是否需要溢写到磁盘,溢写之前还尝试会向execution申请来避免溢写,代码如下:
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
shouldSpill
}
当insert&update的次数是32的倍数且当前集合的大小已经大于等于了已经申请到的内存,此时会尝试向execution申请更多的内存来避免spill,申请的大小为2倍当前集合大小减去已经申请到的内存大小,跟进acquireMemory方法:
public long acquireMemory(long size) {
long granted = taskMemoryManager.acquireExecutionMemory(size, this);
used += granted;
return granted;
}
这不就是我们前面讲的向execution申请内存的方法吗,这里就不再叙述。