背景
在某地市开展项目的时候,发现数据采集,数据探索,预处理,数据统计,训练预测都需要很多资源,现场资源不够用。
目前该项目的资源3台旧的服务器,每台的资源 内存为128G,cores 为24 (core可暂时忽略,以下仅考虑内存即可) 。
案例分析
我们先对任务分别分析,然后分类。
数据采集基于DC,接的是Kafka的源,属于流式,常驻任务。kafka来新数据时才需要资源,空闲时可释放。目前占用的资源情况为:28( topic数)*2(线程数)*1G = 56G,且该值会随着带采集增量表数量的增加而增加。
数据探索主要是算法人员使用命令行或是使用智能融合平台的相关功能进行数据探索,属于临时任务,但会同时有多个并发,且使用的资源跟具体的要处理的数据量和业务有关,一般算法人员会将此值设置得很大。目前平台有两个Thrift server服务,都占用17G内存,共计34G。
预处理单个需要的资源为 4G*4+4G=20G,可并发执行,耗用的资源等于13G*并发数。
数据统计需要的资源为 5G*2+3G=13G,一般十分钟左右。
算法训练需要的资源为12G*3=36G,此为默认值,现场一般都会调得比较高。
算法预测需要的资源相对较少,此处先忽略。
可以看到任务大体分为以下几类:
常驻任务。此类任务一般在空闲时不需要资源,这是典型的动态资源使用场景。如:流式采集、Thrift Server、算法训练平台的预提交任务等。
-
临时任务。此类任务又分为两种:
- 单一任务,且与数据量基本固定。此时需要的资源可以固化下来。另外对优先级极高的应用也可归为此类。
- 单一任务,但与数据量相关。如每次面对的数据量不同,典型的应用是统计任务,数据量在不断增多,且每天的增量不固定,此时可以使用动态资源
- 多个任务。此类任务一般是面临的场景完全未知,比如说预处理任务、训练任务。我们不清楚任务的内部详情,完全无法准确预估资源,只能设置最大值或是每次提交任务都单独设置。这其实要求用户有任务调优经验,对用户的要求较高。
-
定时任务。此类任务与临时任务类似,只是加上了简单调度功能。如数据统计。
通过分析可以知道,很多Spark应用都是需要动态资源分配的,很多用户通过UI经常触发的任务也可使用动态资源规划在不损失更多资源的情况下变成常驻任务来提高响应速度。
动态资源分配机制
Spark提供了基于应用工作负载来动态分配资源的机制,这意味着应用可以根据需要想资源管理器(比如说YARN)释放资源和再请求资源。如果多个应用共享资源的话,这个特性是非常有用的。
需要首先说明的是,这套机制的基本单元是Executor,类似于其它产品中的Slot,这里的单个Executor的资源可通过 spark.executor.memory
和 spark.executor.cores
分配配置其占用的内存及核数。
由于无法确切地知道什么时候需要请求Executor和移除Executor,Spark制定了一套请求和移除的机制。
请求机制。如果查看队列中有挂起的任务,且挂起的时间超过
spark.dynamicAllocation.schedulerBacklogTimeout
秒,则请求Executor,按轮次请求,每轮按指数增加,如:1, 2, 4, 8 ……-
移除机制。如果一个Executor空闲时间超过
spark.dynamicAllocation.executorIdleTimeout
秒,则移除。需要注意的是,在大多数场景下,这个与请求机制是互斥的,也就是说,如果还有挂起的任务,那就不应该释放资源。满足移除机制,还有两个细节需要处理才能移除Executor。
给其他Executor提供shuffle数据服务。Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据,给其他Executor提供shuffle数据。当Executor进程任务过重,导致GC而不能为其他Executor提供shuffle数据时,会影响任务运行。External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。我们可以在Executor完成后就移除它,由External shuffle Service给其他Executor继续提供shuffle数据服务。
缓存数据。写shuffle文件的时候,Executor也会缓存数据到磁盘或内存中,一旦Executor移除,这部分数据也会不可访问,因此只要有缓存数据,Executor就不会被移除。设置
spark.dynamicAllocation.cachedExecutorIdleTimeout
可在即使有缓存数据的情况下也能在超时的时候移除Executor,该值默认为无线大。后续这个可能会被优化,类似于使用External shuffle Service。
动态资源分配配置
配置External shuffle Service
修改hadoop-env.sh ,将spark-2.1.0-yarn-shuffle.jar添加到classpath
HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/lib/LOCALCLUSTER/spark/yarn/spark-2.1.0-yarn-shuffle.jar
其中/usr/lib/LOCALCLUSTER/spark/
为Spark home目录-
修改yarn-site.xml ,关注如下标粗内容:
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property> 修改
yarn-env.sh
中的YARN_HEAPSIZE
变量,默认值为1000(Mb)。提升这个变量的值可以避免shuffle时的GC问题。重启所有节点的nodemanager服务
重启相关需要动态资源分配的服务
配置Spark程序
此处仅列出最常用的参数,具体见附录一
属性 | 默认值 | 说明 |
---|---|---|
spark.executor.memory | 1G | 单个executor的内存,推荐1G,由于动态资源分配是基于Executor的,单个Executor的内存不宜过大。 |
spark.yarn.executor.memoryOverhead或spark.executor.memoryOverhead | executorMemory * 0.10, with minimum of 384 | 分配给单个executor的堆外内存 ,一个Executor可用的内存为 spark.executor.memory + spark.executor.memoryOverhead |
spark.executor.cores | 1 | 单个executor可用核数,与可并行执行的任务数相关,多个任务共享spark.executor.memory ,增大可提供并行度,也会加大OOM的风险 |
spark.dynamicAllocation.enabled | false | 启用动态资源分配,必须设置为true |
spark.shuffle.service.enabled | false | 启用外部shuffle服务,必须设置为true |
spark.dynamicAllocation.minExecutors | 0 | 最小可用cores,建议设置成1 |
spark.dynamicAllocation.maxExecutors | infinity | 最大可用cores,必须设置 |
一个实际的应用如下:
/usr/lib/LOCALCLUSTER/SERVICE-SPARK-retro/sbin/start-thriftserver.sh \
--name "Awaken Insight Thrift Server" \
--master yarn-client --queue applications-retro \
--conf spark.driver.memory=10g
--conf spark.yarn.executor.memoryOverhead=2048
--conf spark.eventLog.enabled=false \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=12 \
--conf spark.executor.memory=1g \
--conf spark.executor.cores=1
可在spark-default.conf 下配置类似参数,可对所有应用生效 。(不推荐)
更好地使用动态资源分配
由于动态资源分配思想其实是建议将单个Executor的资源设置一个比较小的值,如1G。而实际上此值一般设置得比较大,主要是为了防止OOM。那为了更好地使用动态资源分配,必须解决此问题。
Executor OOM类错误 (错误代码 137、143等)一般是由于堆内存 已达上限,Task需要更多的内存,而又得不到足够的内存而导致。因此,解决方案要从增加每个Task的内存使用量,满足任务需求 或 降低单个Task的内存消耗量,从而使现有内存可以满足任务运行需求两个角度出发。因此:
- 增加单个task的内存使用量
- 增加
spark.executor.memory
,使每个Task可使用内存增加。 - 降低Executor的可用Core的数量
spark.executor.cores
, 使Executor中同时运行的任务数减少,在总资源不变的情况下,使每个Task获得的内存相对增加。
- 降低单个Task的内存消耗量
降低单个Task的内存消耗量可从配制方式和调整应用逻辑两个层面进行优化:
-
配置方式
减少每个Task处理的数据量,可降低Task的内存开销,在Spark中,每个partition对应一个处理任务Task, 因此,在数据总量一定的前提下,可以通过增加partition数量的方式来减少每个Task处理的数据量,从而降低Task的内存开销。针对不同的Spark应用类型,存在不同的partition调整参数如下:
- P = spark.default.parallism (非SQL应用) 有父RDD的,以他们的partition数为主,没有的(如parallelize)取决于所有numExcutors*executorCore,最小为2.
- P = spark.sql.shuffle.partition (SQL 应用) 默认值200
通过增加P的值,可在一定程度上使Task现有内存满足任务运行
注: 当调整一个参数不能解决问题时,上述方案应进行协同调整
-
调整应用逻辑
Executor OOM 一般发生Shuffle阶段,该阶段需求计算内存较大,且应用逻辑对内存需求有较大影响,下面举例就行说明:-
选择合适的算子。 如:groupByKey 转换为 reduceByKey
一般情况下,groupByKey能实现的功能使用reduceByKey均可实现,而ReduceByKey存在Map端的合并,可以有效减少传输带宽占用及Reduce端内存消耗。
-
数据倾斜预处理
数据倾斜是指任务间处理的数据量存大较大的差异。
如左图所示,key 为010的数据较多,当发生shuffle时,010所在分区存在大量数据,不仅拖慢Job执行(Job的执行时间由最后完成的任务决定)。 而且导致010对应Task内存消耗过多,可能导致OOM. 而右图,经过预处理(加盐,此处仅为举例说明问题,解决方法不限于此)可以有效减少数据倾斜导致 的问题
-
注:上述举例仅为说明调整应用逻辑可以在一定程序上解决OOM问题,解决方法不限于上述举例
动态资源分配效果
本文主要针对 1051847284
条过车记录(约10亿)进行如下操作,分别记录时间。
类型 | SQL |
---|---|
count | select count(1) from sparta_pass_di |
全局排序 | select * from sparta_pass_di order by passTime desc limit 10 |
聚合排序 | select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; |
过滤查询 | select * from sparta_pass_di where plateNo = '粤GU54MX' limit 10; |
Thrift server V.S. spark sql
以融合平台的Thrift Server为例,先简单对比Thrift server与spark sql之间的性能差异,如下图结果可知,总资源一致的情况下基本没有太大差异:
sql/命令/时间 | spark-sql --master yarn-client --driver-memory 10G --driver-cores 1 --executor-memory 6G --executor-cores 1 --num-executors 2 --conf spark.sql.shuffle.partition=500 | start-thriftserver.sh --master yarn-client --driver-memory 10G --num-executors 2 --conf spark.driver.memory=10g --executor-memory 6g --conf spark.sql.shuffle.partition=500 |
---|---|---|
select count(1) from sparta_pass_di | 6 s | 7 s |
select * from sparta_pass_di order by passTime desc limit 10 | 21 min | 20 min |
select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; | 2.0 min | 1.4 min |
select * from sparta_pass_di where plateNo = '粤GU54MX' limit 10; | 0.4 s | 0.3 s |
静态资源分配 V.S. 动态资源分配
在上述前提下,对比静态资源分配和动态资源分配之间的差异,可以看到在明显耗时的全局排序耗时明显更短,其余性能差距不大,但是空闲资源会被释放。
sql/命令/时间 | spark-sql --driver-memory 10G --driver-cores 1 --executor-memory 1G --executor-cores 1 --num-executors 12 --conf spark.sql.shuffle.partition=500 | start-thriftserver.sh --driver-memory 10G --driver-cores 1 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=12 --conf spark.executor.memory=1g --conf spark.executor.cores=1 --conf spark.sql.shuffle.partition=500 |
---|---|---|
select count(1) from sparta_pass_di | 8 s | 10 s |
select * from sparta_pass_di order by passTime desc limit 10 | 11 min | 4.1 min |
select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; | 57 s | 51 s |
select * from sparta_pass_di where plateNo = '粤GU54MX' limit 10; | 0.5 s | 0.5 s |
shuffle.partition 多 V.S. 少
spark.sql.shuffle.partition
的默认值为200,增加spark.sql.shuffle.partition
到500,没有看到明显的性能提升
sql/命令/时间 | spark-sql \ --master yarn-client \ --driver-memory 10G \ --driver-cores 1 \ --executor-memory 6G \ --executor-cores 1 \ --num-executors 2 \ | spark-sql \ --master yarn-client \ --driver-memory 10G \ --driver-cores 1 \ --executor-memory 6G \ --executor-cores 1 \ --num-executors 2 \ --conf spark.sql.shuffle.partition=500 |
---|---|---|
select count(1) from sparta_pass_di | 9 s | 6 s |
select * from sparta_pass_di order by passTime desc limit 10 | 23 min | 21 min |
select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; | 1.6 min | 2.0 min |
select * from sparta_pass_di where plateNo = '粤GU54MX' limit 10; | 0.3 s | 0.4 s |
阶段性总结一下,动态资源分配在简单任务性能与静态资源分配差不多,在复杂任务中性能提升较多,可能是由于常驻的外部shuffle服务带来的性能提升,需要进一步测试验证。而由于实际应用中executor内存都分配得较大,总资源一定的情况下,使得任务的并行度较小,任务执行更慢(21 min V.S. 4.1 min)。通过增加spark.sql.shuffle.partition
来提升任务并行度,没有看到明显的性能提升。
可能约束
- 流式采集使用DC,是否可用动态分配。当前DC并未使用Spark,动态资源分配可能需要额外的开发。
- 算法引擎的资源管控可能会失效。由于资源未知,只设定了范围([最小值,最大值]),那资源管控到底以哪一个为主?
总结
本案例针对现场出现的资源不足问题做了分析,对任务进行了分类,然后引入动态分配机制,对融合平台的Thrift Server做了几组测试,可以看到动态资源分配优势较大,建议推广。
参考文献
- https://spark.apache.org/docs/latest/job-scheduling.html
- https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
- https://dzone.com/articles/spark-dynamic-allocation
- https://www.jianshu.com/p/10e91ace3378
附录一、动态资源分配参数说明
Dynamic Allocation (https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation)
Property Name | Default | Meaning |
---|---|---|
spark.dynamicAllocation.enabled |
false | Whether to use dynamic resource allocation, which scales the number of executors registered with this application up and down based on the workload. For more detail, see the description here. This requires spark.shuffle.service.enabled to be set. The following configurations are also relevant: spark.dynamicAllocation.minExecutors , spark.dynamicAllocation.maxExecutors , and spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio
|
spark.dynamicAllocation.executorIdleTimeout |
60s | If dynamic allocation is enabled and an executor has been idle for more than this duration, the executor will be removed. For more detail, see this description. |
spark.dynamicAllocation.cachedExecutorIdleTimeout |
infinity | If dynamic allocation is enabled and an executor which has cached data blocks has been idle for more than this duration, the executor will be removed. For more details, see this description. |
spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation.minExecutors |
Initial number of executors to run if dynamic allocation is enabled. If --num-executors (or spark.executor.instances ) is set and larger than this value, it will be used as the initial number of executors. |
spark.dynamicAllocation.maxExecutors |
infinity | Upper bound for the number of executors if dynamic allocation is enabled. |
spark.dynamicAllocation.minExecutors |
0 | Lower bound for the number of executors if dynamic allocation is enabled. |
spark.dynamicAllocation.executorAllocationRatio |
1 | By default, the dynamic allocation will request enough executors to maximize the parallelism according to the number of tasks to process. While this minimizes the latency of the job, with small tasks this setting can waste a lot of resources due to executor allocation overhead, as some executor might not even do any work. This setting allows to set a ratio that will be used to reduce the number of executors w.r.t. full parallelism. Defaults to 1.0 to give maximum parallelism. 0.5 will divide the target number of executors by 2 The target number of executors computed by the dynamicAllocation can still be overridden by the spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors settings |
spark.dynamicAllocation.schedulerBacklogTimeout |
1s | If dynamic allocation is enabled and there have been pending tasks backlogged for more than this duration, new executors will be requested. For more detail, see this description. |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
Same as spark.dynamicAllocation.schedulerBacklogTimeout , but used only for subsequent executor requests. For more detail, see this description. |
本文由博客一文多发平台 OpenWrite 发布!