目录
前言
我们用两篇文章的时间搞清楚了Spark存储中的“块”到底是怎么一回事,接下来我们就可以放心来看Spark Core存储子系统的细节了。前面已经提到过,Spark会同时利用内存和外存,尤其是积极地利用内存作为存储媒介。这点与传统分布式计算框架(如Hadoop MapReduce)的“内存仅用于计算,外存仅用于存储”的方式是非常不同的,同时也是Spark高效设计哲学的体现。接下来一段时间内,我们先研究Spark存储中的内存部分,再研究磁盘(外存)部分。
虽然BlockManager是Spark存储子系统的司令官,但它并不会直接管理块,而会将对内存和外存的管理分别组织起来。与内存存储相关的组件包括内存池MemoryPool、内存管理器MemoryManager、内存存储器MemoryStore。本文先来探索内存池和内存管理器的大体实现。
内存池MemoryPool
MemoryPool抽象类从逻辑上非常松散地定义了Spark内存池的一些基本约定,其完整源码如下。
代码#23.1 - o.a.s.memory.MemoryPool抽象类
private[memory] abstract class MemoryPool(lock: Object) {
@GuardedBy("lock")
private[this] var _poolSize: Long = 0
final def poolSize: Long = lock.synchronized {
_poolSize
}
final def memoryFree: Long = lock.synchronized {
_poolSize - memoryUsed
}
final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
require(delta >= 0)
_poolSize += delta
}
final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
require(delta >= 0)
require(delta <= _poolSize)
require(_poolSize - delta >= memoryUsed)
_poolSize -= delta
}
def memoryUsed: Long
}
在构造MemoryPool时,需要传入一个锁对象lock用于线程同步,该lock实际上就是后面会讲到的内存管理器MemoryManager。MemoryPool中定义了以下方法。
- poolSize:获得内存池的大小,单位为字节。
- memoryUsed:获得内存池中已占用内存的大小。该方法未提供具体实现,需要子类实现。
- memoryFree:获得内存池中空闲内存的大小,就是上述poolSize减去memoryUsed。
- incrementPoolSize():扩展内存池delta个字节的大小。该方法不能被覆写。
- decrementPoolSize():压缩内存池delta个字节的大小。注意已占用的内存不能被压缩掉,并且该方法也不能被覆写。
以上所有方法(以及其实现类的大部分方法)都由MemoryManager保证线程安全性,防止多线程同时操作内存池,造成分配混乱。
MemoryPool有两个实现类:StorageMemoryPool与ExecutionMemoryPool。顾名思义,StorageMemoryPool用于存储,比如RDD数据、广播变量数据的缓存与分发;ExecutionMemoryPool用于执行,这包含Spark的计算(连接、聚合、排序等等)和Shuffle过程。ExecutionMemoryPool严格上来讲不属于存储子系统的组成部分,因此本文先来看StorageMemoryPool。
存储内存池StorageMemoryPool
构造与属性成员
代码#23.2 - o.a.s.memory.StorageMemoryPool类的构造与属性成员
private[memory] class StorageMemoryPool(
lock: Object,
memoryMode: MemoryMode
) extends MemoryPool(lock) with Logging {
private[this] val poolName: String = memoryMode match {
case MemoryMode.ON_HEAP => "on-heap storage"
case MemoryMode.OFF_HEAP => "off-heap storage"
}
@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L
override def memoryUsed: Long = lock.synchronized {
_memoryUsed
}
private var _memoryStore: MemoryStore = _
def memoryStore: MemoryStore = {
if (_memoryStore == null) {
throw new IllegalStateException("memory store not initialized yet")
}
_memoryStore
}
// ......
}
StorageMemoryPool的构造方法参数除了锁对象之外,还有一个MemoryMode,它表示该内存池会使用哪部分的内存,其定义如下。
代码#23.3 - o.a.s.memory.MemoryMode枚举
@Private
public enum MemoryMode {
ON_HEAP,
OFF_HEAP
}
其中,ON_HEAP表示使用堆内内存,即每个Executor JVM使用的那部分内存;OFF_HEAP表示使用堆外内存,即Worker节点上的本机内存(native memory),需要通过Unsafe API(讲解的文章见这里)来分配。
Spark堆内内存和堆外内存的关系如下面的简图所示。
根据MemoryMode的不同,使用堆内内存时池子的名称为on-heap storage
,使用堆外内存时池子的名称为off-heap storage
。
StorageMemoryPool使用私有变量_memoryUsed来记录使用了多少内存,并覆写memoryUsed这个Getter方法来返回之。另外,它还必须有与其关联的MemoryStore实例。MemoryStore真正地负责块在内存中的存取,下一篇文章就会讲解到它。下面来看StorageMemoryPool提供的方法。
申请内存
申请内存的逻辑由acquireMemory()方法来实现。
代码#23.4 - o.a.s.memory.StorageMemoryPool.acquireMemory()方法
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
val numBytesToFree = math.max(0, numBytes - memoryFree)
acquireMemory(blockId, numBytes, numBytesToFree)
}
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long): Boolean = lock.synchronized {
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
}
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
}
enoughMemory
}
这两个方法的执行流程是:ID为blockId的块需要申请numBytesToAcquire字节的内存,首先检查当前的剩余内存量memoryFree是否能满足分配,如果不能,就调用MemoryStore.evictBlocksToFreeSpace()方法,释放出(numBytes - memoryFree)大小的内存。然后再次检查剩余内存量是否能满足分配,如果够,就将占用内存量增加。最终返回是否分配成功。
可见,acquireMemory()虽然名义上为申请内存,但实际上没有什么真正的内存分配操作,更多的是检查与记录而已。这是因为在Java/Scala环境中,对象内存的实际分配和释放一般都是由JVM来管理的,Spark只需要跟踪好就可以了。这也符合其父类MemoryPool类注释中的描述:“Manages bookkeeping for an adjustable-sized region of memory”,其中bookkeeping一词的含义即为“记账”,ExecutionMemoryPool也是同理。至于堆外内存的实际分配,是由基于Unsafe API的MemoryAllocator/MemoryConsumer组件来实现,这就是后话了。
释放内存
释放内存的逻辑则由releaseMemory()和releaseAllMemory()方法来实现。
代码#23.5 - o.a.s.memory.StorageMemoryPool.releaseMemory()/releaseAllMemory()方法
def releaseMemory(size: Long): Unit = lock.synchronized {
if (size > _memoryUsed) {
logWarning(s"Attempted to release $size bytes of storage " +
s"memory when we only have ${_memoryUsed} bytes")
_memoryUsed = 0
} else {
_memoryUsed -= size
}
}
def releaseAllMemory(): Unit = lock.synchronized {
_memoryUsed = 0
}
这个实现非常简单,就是将memoryUsed减去要释放的量,或者直接设为0。
下面再来看一看内存管理器MemoryManager的部分细节,它直接管理着MemoryPool,是Spark作业运行时内存管理的统一入口。
内存管理器MemoryManager
MemoryManager与MemoryPool一样,也是一个抽象类。Spark环境中的每个JVM实例都会持有一个MemoryManager,先来看它的属性成员和构造方法。
构造与属性成员
代码#23.6 - o.a.s.memory.MemoryManager抽象类的属性成员和构造方法
private[spark] abstract class MemoryManager(
conf: SparkConf,
numCores: Int,
onHeapStorageMemory: Long,
onHeapExecutionMemory: Long) extends Logging {
@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
protected[this] val offHeapStorageMemory =
(maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
// ......
}
MemoryManager需要4个构造方法参数:
- conf:即SparkConf;
- numCores:分配的CPU核心数;
- onHeapStorageMemory:用于存储的堆内内存的大小(字节);
- onHeapExecutionMemory:用于执行的堆内内存的大小(字节)。
MemoryManager初始化了4个内存池,分别是堆内、堆外的存储内存池,以及堆内、堆外的执行内存池。另外,堆外内存的最大值可以由配置项spark.memory.offHeap.size来指定,默认为0。堆外存储内存占总堆外内存的比例则由配置项spark.memory.storageFraction指定,默认0.5,即50%,剩下的就是堆外执行内存。这个参数在后面还会出现。
接下来对4个内存池分别调用其incrementPoolSize()方法,设定合适的容量,初始化完毕。
内存管理方法
MemoryManager中给出了一批内存管理方法的定义,这其中有些是抽象方法,需要其子类去实现。这些方法的清单如下。
代码#23.7 - o.a.s.memory.MemoryManager定义的内存管理方法
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
private[memory]
def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long
private[memory]
def releaseExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Unit = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
}
}
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
}
}
final def releaseAllStorageMemory(): Unit = synchronized {
onHeapStorageMemoryPool.releaseAllMemory()
offHeapStorageMemoryPool.releaseAllMemory()
}
final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
releaseStorageMemory(numBytes, memoryMode)
}
可见,acquireStorageMemory()、acquireUnrollMemory()和acquireExecutionMemory()三个用于申请内存的方法都需要子类去实现。什么是Unroll内存呢?RDD在被缓存之前,它所占用的内存空间是不连续的,而被缓存到存储内存之后,就以块的形式来存储,占用连续的内存空间了。Unroll就是这个将RDD固化在连续内存空间的过程,中文一般翻译为“展开”。Unroll过程使用的内存空间就是展开内存,它本质上是存储内存中比较特殊的一部分。
各个释放内存的方法则基本上代理了MemoryPool对应的释放方法,比较容易理解。
除此之外,MemoryManager类还提供了Tungsten机制下的一些内存管理相关的属性。现在铺开讲它还为时过早,看官目前只需知道Tungsten是DataBricks在3~4年前提出的Spark优化方案即可,前文提到的堆外内存管理即属于Tungsten机制的一部分。早在这个系列开始之前,我曾经写过一篇关于Tungsten Sort Shuffle的简单解析,可以参考这里。
总结
本文通过引入对内存池MemoryPool的介绍,搞清楚了用于存储的内存池StorageMemoryPool的基本逻辑,另外还对内存及MemoryPool的管理器——MemoryManager进行了简要的分析。由于Spark内存管理这一块知识的交叉性比较强,代码不能拆分得很开,所以难免会出现一些未接触过的概念,在之后的源码阅读过程中自然会逐渐了解。
在下一篇文章中,我们会重点解析MemoryManager的两种实现,即静态内存管理器StaticMemoryManager、统一内存管理器UnifiedMemoryManager,从而深刻理解Spark Core的内存管理模型。它也是Spark作业内存调优的基础。