在这行Spark的应用程序时,Spark集群会启动Driver和Executor两种JVM线程,前者为主控进程,负责创建Spark上下文,提交Spark作业(Job),并将作业转化为计算任务(Task),在各个Executor进程间协调任务的调度,后者负责在工作节点上执行具体的计算任务,并将结果返回给Driver,同时为需要持久化的RDD提供存储功能。由于Driver的内存管理相对来说较为简单,本文主要对Executor的内存的管理进行分析,上下文中的Spark内存均特指Executor的内存。
堆内和堆外内存规划
作为一个JVM进程,Executor的内存管理建立在JVM的内存管理之上,Spark对JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark引入对外(Off-heap),使之可以直接在工作节点的系统内存中开辟空间,进一步优化内存的使用。
1.1堆内内存
堆内内存的大小,由Spark应用程序启动时spark.executor.memory参数配置。Executor内存的并发任务共享JVM堆内内存,这些任务在缓存RDD数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行Shuffle时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些Spark内部的对象实例,或者用户定义的Spark应用程序中的对象实例,均占用剩余的空间,不同的管理模式下,这三部分占用的空间大小各不同。
Spark对堆内存的管理是一种逻辑上的规划式的管理,因为对象实例占用内存的申请和释放都是由JVM完成的,Spark只能在申请和释放前记录这些内存。
- 申请内存:
- Spark在代码中new一个对象实例
- JVM从堆内内存分配空间,创建对象并返回对象引用
- Spark保存该对象的引用,记录该对象占用的内存
- 释放内存:
- Spark记录该对象释放的内存,删除该对象的引用
- 等待JVM的垃圾回收机制释放该对象占用的堆内内存
我们知道,JVM的对象可以以序列化的方式存储,序列化的过程是将对象转换为二进制字节流,本质上可以理解为将非连续空间的链式存储转化为连续空间或块存储,在访问时则需要进行反序列化。对于Spark中序列化的对象是字节流形式的,其占用的内存大小可直接计算,而对于非序列化的对象,其占用的内存是通过周期性的采样近似估算而得。此外,在被Spark标记为释放的对象实例,很有可能在实际上并没有被JVM回收。导致实际可用的内存小于Spark记录的可用内存,从而无法完全避免内存溢出(OOM)的异常。
1.2堆外内存
为了进一步优化内存的使用以及提高Shuffle时排序的效率,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。Spark可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的GC扫描和回收,提升了处理性能。堆外内存可以精确的申请和释放,而且序列化的数据占用空间可以被精确计算,所以相比与堆内内存来说降低了管理的难度,也降低了误差。
在默认情况下堆外内存并不启用,可以通过配置spark.memory.offHeap.enabled参数启用,并由spark.memory.offHeap
.size参数设定堆外空间的大小。除了没有other空间,堆外内存和堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。
内存空间分配
2.1 静态内存管理
在Spark最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在Spark引用程序运行期间均为固定的,但用户可以引用程序启动前进行配置
可用的存储内存 = systemMaxMemory * spark.storge.memoryFraction * spark.storage.safetyFraction
可用的执行内存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
其中这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时Spark并没有区别对待,和其他内存
一样交给了JVM去管理。
堆外的内存分配较为简单,只有存储内存和执行内存,由参数spark.memory.storageFraction决定,由于堆外内存占用空间可以被精确计算,所以无需再设定保险区域。
2.2 统一内存管理
Spark1.6之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对象的空间区域
其中重要的优化在于动态占用机制,其规则如下:
- 设定基本的存储内存和执行内存区域(spark.storage.storageFraction参数),该设定确定了双方各自拥有的空间的范围
- 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间
- 执行内存的空间被对方占用,可以让对方占用部分转存到硬盘,然后归还借用空间
-
存储内存的空间被对方占用后,无法让对方归还,需要考虑shuffle过程的很多因素实现起来较为复杂
凭借统一内存管理机制,Spark在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护Spark内存的难度,但并不意味着开发者可以高枕无忧,所以如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收。
3. 存储内存管理
3.1 RDD的持久化机制
RDD作为Spark最根本的数据抽象,是只读的分区记录(Partition)的集合,只能基于在稳定物理存储中的数据集上创建,或者由其他已有的RDD上执行转换操作产生一个新的RDD。转化后的RDD与已有的RDD之间产生依赖关系,构成了血统(Lineage)。凭借血统Spark保证了每一个RDD都可以被重新恢复。
Task在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查Checkpoint或者按照血统重新计算。所以如果一个 RDD 上要执行多次行动,可以在第一次行动中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在后面的行动时提升计算速度。事实上,cache 方法是使用默认的 MEMORY_ONLY 的存储级别将 RDD 持久化到内存,故缓存是一种特殊的持久化。 堆内和堆外存储内存的设计,便可以对缓存 RDD 时使用的内存做统一的规划和管理。
RDD的持久化由Spark的Storage模块负责,实现了RDD与物理存储的解耦。Storage模块负责管理Spark在计算过程中产生的数据,将那些在内存或磁盘、在本地或者远程存储数据的功能封装了起来。在具体实现时Driver端和Executor端的Storage模块构成了主从的架构,即Driver端BlackManager为Master,Executor端的BlockManager为Slave。Storage模块在逻辑上以Block为基本存储单位,RDD的每个Partition经过处理后唯一对应一个Block的(BlockId的格式为rdd_RDD-ID_PARTITION-ID)。Master负责整个Spark应用程序的Block的元数据信息的管理和维护。而Slave需要将Block的更新状态上报到Master,同时接收Master的命令,例如新增或删除一个RDD。
3.2 RDD缓存的过程
RDD在缓存到存储内存之前,Partition中的数据一般以迭代器(Iterator)的数据结构来访问。通过迭代器可以获取分区中每一条序列化或者非序列化的数据项(Record),这些Record的对象实例在逻辑上占用了JVM堆内内存的other部分的空间,同一Partition的不同Record的空间并不连续。
RDD在缓存到存储内存之后,Partition被转换成Block,Record在堆内或堆外存储内存中占用一块连续的空间。将Parititon由不连续的存储空间转换为连续存储空间的过程,Spark称之为展开(Unroll)。Block有序列化和非序列化两种存储格式,具体以哪中方式取决与该RDD的存储级别。每个Executor的Storage模块用一个链式Map结构(LinkedHashMap)来管理堆内和堆外存储内存中的所有Block对象的实例,对于这个LinkedHashMap新增和删除简介记录了内存的申请和释放。
因为不能保证存储空间可以一次容纳Iterator中的所有数据,当前的计算任务在Unroll时要向MemeoryManager申请足够的Unroll空间来临时占位,空间不足则Unroll失败,空间足够时可以继续进行。对于序列化的Partition,其所需的Unroll空间可以直接累加计算,一次申请。而非序列化的Partition则要在遍历Record过程中依次申请,即每读取一条Record,采样估算其所需的Unroll空间进行申请,空间不足时可以中断,释放已占用的Unroll空间。如果最终Unroll成功,当前Partition所占用的Unroll空间被转换为正常缓存RDD的存储空间。
3.3 淘汰和落盘
由于同一个Executor的所有的计算任务共享有限的存储内存空间,当有新的Block需要缓存但是剩余空间不足且无法动态占用时,就要对LinkedHashMap中的旧Block进行淘汰(Eviction),而被淘汰的Block如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(DROP),否则直接删除该Block。
存储内存的淘汰规则为:
- 被淘汰的Block要与新Block的MemoryMode相同,即同属于堆外或者堆内内存
- 新旧Block不能属于同一个RDD,避免循环淘汰
- 就Block所属RDD不能处于被读状态,避免引发一致性问题
- 遍历LinkedHashMap中Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新Block所需空间。其中LRU是LinkedHashMap的特性。
4. 执行内存管理
4.1 多任务间内存分配
Executor内运行的任务同样共享执行内存,Spark用一个HashMap结构保存了任务到内存耗费的影响。每个任务可占用的执行内存大小的范围为1/2N~1/N, 其中N为当前Executor内正在运行的任务个数。每个任务在启动之时,要向MemoryManager请求申请最少为1/2N的执行内存,如果不能满足要求则该任务被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。
4.2 Shuffle的内存占用
执行内存主要用来存储任务在执行Shuffle时占用的内存,Shuffle是按照一定规则对RDD数据重新分区的过程,我们来看Shuffle的Write和Read两个阶段对执行内存的使用:
- Shuffle Write
- 若在map端选择普通的排序方式,会常用ExternalSorter进行排序,在内存中存储数据时主要占用对内执行空间。
- 若在map端选择Tungsten的排序方式,则采用ShuffleExternalSorter直接以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
- Shuffle Read
- 在对reduce端的数据进行聚合时,要将数据交给Aggregator处理,在内存中存储数据时占用堆内执行空间。
- 如果需要进行最终结果排序,则要再次将数据交给ExternalSorter处理,占用堆内执行空间。
在ExternalSorter和Aggreator中,Spark会使用一种叫AppendOnlyMap的哈希表在堆内执行内存中存储数据,但在Shuffle过程中所有数据并不能都保存该Hash表中,当这个Hash表占用的内存会进行周期性采样,当其大到一定程度,无法再从MemoryManager申请到新的执行内存时,Spark就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后被归并(Merge)。