利用动态资源分配优化Spark应用资源利用率

背景

在某地市开展项目的时候,发现数据采集,数据探索,预处理,数据统计,训练预测都需要很多资源,现场资源不够用。
目前该项目的资源3台旧的服务器,每台的资源 内存为128G,cores 为24 (core可暂时忽略,以下仅考虑内存即可) 。

案例分析

我们先对任务分别分析,然后分类。

数据采集基于DC,接的是Kafka的源,属于流式,常驻任务。kafka来新数据时才需要资源,空闲时可释放。目前占用的资源情况为:28( topic数)*2(线程数)*1G = 56G,且该值会随着带采集增量表数量的增加而增加。

数据探索主要是算法人员使用命令行或是使用智能融合平台的相关功能进行数据探索,属于临时任务,但会同时有多个并发,且使用的资源跟具体的要处理的数据量和业务有关,一般算法人员会将此值设置得很大。目前平台有两个Thrift server服务,都占用17G内存,共计34G。

​ 预处理单个需要的资源为 4G*4+4G=20G,可并发执行,耗用的资源等于13G*并发数。

​ 数据统计需要的资源为 5G*2+3G=13G,一般十分钟左右。

​ 算法训练需要的资源为12G*3=36G,此为默认值,现场一般都会调得比较高。

​ 算法预测需要的资源相对较少,此处先忽略。

可以看到任务大体分为以下几类:

  1. 常驻任务。此类任务一般在空闲时不需要资源,这是典型的动态资源使用场景。如:流式采集、Thrift Server、算法训练平台的预提交任务等。

  2. 临时任务。此类任务又分为两种:

    • 单一任务,且与数据量基本固定。此时需要的资源可以固化下来。另外对优先级极高的应用也可归为此类。
    • 单一任务,但与数据量相关。如每次面对的数据量不同,典型的应用是统计任务,数据量在不断增多,且每天的增量不固定,此时可以使用动态资源
    • 多个任务。此类任务一般是面临的场景完全未知,比如说预处理任务、训练任务。我们不清楚任务的内部详情,完全无法准确预估资源,只能设置最大值或是每次提交任务都单独设置。这其实要求用户有任务调优经验,对用户的要求较高。
  3. 定时任务。此类任务与临时任务类似,只是加上了简单调度功能。如数据统计。

    通过分析可以知道,很多Spark应用都是需要动态资源分配的,很多用户通过UI经常触发的任务也可使用动态资源规划在不损失更多资源的情况下变成常驻任务来提高响应速度。

动态资源分配机制

Spark提供了基于应用工作负载来动态分配资源的机制,这意味着应用可以根据需要想资源管理器(比如说YARN)释放资源和再请求资源。如果多个应用共享资源的话,这个特性是非常有用的。

需要首先说明的是,这套机制的基本单元是Executor,类似于其它产品中的Slot,这里的单个Executor的资源可通过 spark.executor.memoryspark.executor.cores分配配置其占用的内存及核数。

由于无法确切地知道什么时候需要请求Executor和移除Executor,Spark制定了一套请求和移除的机制。

  • 请求机制。如果查看队列中有挂起的任务,且挂起的时间超过spark.dynamicAllocation.schedulerBacklogTimeout 秒,则请求Executor,按轮次请求,每轮按指数增加,如:1, 2, 4, 8 ……

  • 移除机制。如果一个Executor空闲时间超过spark.dynamicAllocation.executorIdleTimeout秒,则移除。需要注意的是,在大多数场景下,这个与请求机制是互斥的,也就是说,如果还有挂起的任务,那就不应该释放资源。

    满足移除机制,还有两个细节需要处理才能移除Executor。

  • 给其他Executor提供shuffle数据服务。Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据,给其他Executor提供shuffle数据。当Executor进程任务过重,导致GC而不能为其他Executor提供shuffle数据时,会影响任务运行。External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。我们可以在Executor完成后就移除它,由External shuffle Service给其他Executor继续提供shuffle数据服务。

  • 缓存数据。写shuffle文件的时候,Executor也会缓存数据到磁盘或内存中,一旦Executor移除,这部分数据也会不可访问,因此只要有缓存数据,Executor就不会被移除。设置spark.dynamicAllocation.cachedExecutorIdleTimeout可在即使有缓存数据的情况下也能在超时的时候移除Executor,该值默认为无线大。后续这个可能会被优化,类似于使用External shuffle Service。

动态资源分配配置

配置External shuffle Service

  1. 修改hadoop-env.sh ,将spark-2.1.0-yarn-shuffle.jar添加到classpath
    HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/lib/LOCALCLUSTER/spark/yarn/spark-2.1.0-yarn-shuffle.jar
    其中/usr/lib/LOCALCLUSTER/spark/为Spark home目录

  2. 修改yarn-site.xml ,关注如下标粗内容:

    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>

  3. 修改yarn-env.sh中的YARN_HEAPSIZE变量,默认值为1000(Mb)。提升这个变量的值可以避免shuffle时的GC问题。

  4. 重启所有节点的nodemanager服务

  5. 重启相关需要动态资源分配的服务

配置Spark程序

此处仅列出最常用的参数,具体见附录一

属性 默认值 说明
spark.executor.memory 1G 单个executor的内存,推荐1G,由于动态资源分配是基于Executor的,单个Executor的内存不宜过大。
spark.yarn.executor.memoryOverhead或spark.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 分配给单个executor的堆外内存 ,一个Executor可用的内存为 spark.executor.memory + spark.executor.memoryOverhead
spark.executor.cores 1 单个executor可用核数,与可并行执行的任务数相关,多个任务共享spark.executor.memory,增大可提供并行度,也会加大OOM的风险
spark.dynamicAllocation.enabled false 启用动态资源分配,必须设置为true
spark.shuffle.service.enabled false 启用外部shuffle服务,必须设置为true
spark.dynamicAllocation.minExecutors 0 最小可用cores,建议设置成1
spark.dynamicAllocation.maxExecutors infinity 最大可用cores,必须设置

一个实际的应用如下:

  /usr/lib/LOCALCLUSTER/SERVICE-SPARK-retro/sbin/start-thriftserver.sh  \
          --name "Awaken Insight Thrift Server" \
          --master yarn-client --queue applications-retro   \
          --conf spark.driver.memory=10g  
          --conf spark.yarn.executor.memoryOverhead=2048  
          --conf spark.eventLog.enabled=false \
          --conf spark.dynamicAllocation.enabled=true       \
          --conf spark.shuffle.service.enabled=true       \
          --conf spark.dynamicAllocation.minExecutors=1         \
          --conf spark.dynamicAllocation.maxExecutors=12         \
          --conf spark.executor.memory=1g       \
          --conf spark.executor.cores=1

可在spark-default.conf 下配置类似参数,可对所有应用生效 。(不推荐)

更好地使用动态资源分配

由于动态资源分配思想其实是建议将单个Executor的资源设置一个比较小的值,如1G。而实际上此值一般设置得比较大,主要是为了防止OOM。那为了更好地使用动态资源分配,必须解决此问题。

Executor OOM类错误 (错误代码 137、143等)一般是由于堆内存 已达上限,Task需要更多的内存,而又得不到足够的内存而导致。因此,解决方案要从增加每个Task的内存使用量,满足任务需求 或 降低单个Task的内存消耗量,从而使现有内存可以满足任务运行需求两个角度出发。因此:

  1. 增加单个task的内存使用量
  • 增加spark.executor.memory,使每个Task可使用内存增加。
  • 降低Executor的可用Core的数量 spark.executor.cores , 使Executor中同时运行的任务数减少,在总资源不变的情况下,使每个Task获得的内存相对增加。
  1. 降低单个Task的内存消耗量
    降低单个Task的内存消耗量可从配制方式和调整应用逻辑两个层面进行优化:
  • 配置方式

    减少每个Task处理的数据量,可降低Task的内存开销,在Spark中,每个partition对应一个处理任务Task, 因此,在数据总量一定的前提下,可以通过增加partition数量的方式来减少每个Task处理的数据量,从而降低Task的内存开销。针对不同的Spark应用类型,存在不同的partition调整参数如下:

    • P = spark.default.parallism (非SQL应用) 有父RDD的,以他们的partition数为主,没有的(如parallelize)取决于所有numExcutors*executorCore,最小为2.
    • P = spark.sql.shuffle.partition (SQL 应用) 默认值200
      通过增加P的值,可在一定程度上使Task现有内存满足任务运行
      注: 当调整一个参数不能解决问题时,上述方案应进行协同调整
  • 调整应用逻辑
    Executor OOM 一般发生Shuffle阶段,该阶段需求计算内存较大,且应用逻辑对内存需求有较大影响,下面举例就行说明:

    • 选择合适的算子。 如:groupByKey 转换为 reduceByKey
      一般情况下,groupByKey能实现的功能使用reduceByKey均可实现,而ReduceByKey存在Map端的合并,可以有效减少传输带宽占用及Reduce端内存消耗。


      选择合适的算子
    • 数据倾斜预处理

      数据倾斜是指任务间处理的数据量存大较大的差异。
      如左图所示,key 为010的数据较多,当发生shuffle时,010所在分区存在大量数据,不仅拖慢Job执行(Job的执行时间由最后完成的任务决定)。 而且导致010对应Task内存消耗过多,可能导致OOM. 而右图,经过预处理(加盐,此处仅为举例说明问题,解决方法不限于此)可以有效减少数据倾斜导致 的问题

数据倾斜预处理

注:上述举例仅为说明调整应用逻辑可以在一定程序上解决OOM问题,解决方法不限于上述举例

动态资源分配效果

本文主要针对 1051847284 条过车记录(约10亿)进行如下操作,分别记录时间。

类型 SQL
count select count(1) from sparta_pass_di
全局排序 select * from sparta_pass_di order by passTime desc limit 10
聚合排序 select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10;
过滤查询 select * from sparta_pass_di where plateNo = '粤GU54MX' limit 10;

Thrift server V.S. spark sql

以融合平台的Thrift Server为例,先简单对比Thrift server与spark sql之间的性能差异,如下图结果可知,总资源一致的情况下基本没有太大差异:

sql/命令/时间 spark-sql --master yarn-client --driver-memory 10G --driver-cores 1 --executor-memory 6G --executor-cores 1 --num-executors 2 --conf spark.sql.shuffle.partition=500 start-thriftserver.sh --master yarn-client --driver-memory 10G --num-executors 2 --conf spark.driver.memory=10g --executor-memory 6g --conf spark.sql.shuffle.partition=500
select count(1) from sparta_pass_di 6 s 7 s
select * from sparta_pass_di order by passTime desc limit 10 21 min 20 min
select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; 2.0 min 1.4 min
select * from sparta_pass_di where plateNo = '粤GU54MX' limit 10; 0.4 s 0.3 s

静态资源分配 V.S. 动态资源分配

在上述前提下,对比静态资源分配和动态资源分配之间的差异,可以看到在明显耗时的全局排序耗时明显更短,其余性能差距不大,但是空闲资源会被释放。

sql/命令/时间 spark-sql --driver-memory 10G --driver-cores 1 --executor-memory 1G --executor-cores 1 --num-executors 12 --conf spark.sql.shuffle.partition=500 start-thriftserver.sh --driver-memory 10G --driver-cores 1 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=12 --conf spark.executor.memory=1g --conf spark.executor.cores=1 --conf spark.sql.shuffle.partition=500
select count(1) from sparta_pass_di 8 s 10 s
select * from sparta_pass_di order by passTime desc limit 10 11 min 4.1 min
select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; 57 s 51 s
select * from sparta_pass_di where plateNo = '粤GU54MX' limit 10; 0.5 s 0.5 s

shuffle.partition 多 V.S. 少

spark.sql.shuffle.partition的默认值为200,增加spark.sql.shuffle.partition到500,没有看到明显的性能提升

sql/命令/时间 spark-sql \ --master yarn-client \ --driver-memory 10G \ --driver-cores 1 \ --executor-memory 6G \ --executor-cores 1 \ --num-executors 2 \ spark-sql \ --master yarn-client \ --driver-memory 10G \ --driver-cores 1 \ --executor-memory 6G \ --executor-cores 1 \ --num-executors 2 \ --conf spark.sql.shuffle.partition=500
select count(1) from sparta_pass_di 9 s 6 s
select * from sparta_pass_di order by passTime desc limit 10 23 min 21 min
select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; 1.6 min 2.0 min
select * from sparta_pass_di where plateNo = '粤GU54MX' limit 10; 0.3 s 0.4 s

阶段性总结一下,动态资源分配在简单任务性能与静态资源分配差不多,在复杂任务中性能提升较多,可能是由于常驻的外部shuffle服务带来的性能提升,需要进一步测试验证。而由于实际应用中executor内存都分配得较大,总资源一定的情况下,使得任务的并行度较小,任务执行更慢(21 min V.S. 4.1 min)。通过增加spark.sql.shuffle.partition来提升任务并行度,没有看到明显的性能提升。

可能约束

  1. 流式采集使用DC,是否可用动态分配。当前DC并未使用Spark,动态资源分配可能需要额外的开发。
  2. 算法引擎的资源管控可能会失效。由于资源未知,只设定了范围([最小值,最大值]),那资源管控到底以哪一个为主?

总结

本案例针对现场出现的资源不足问题做了分析,对任务进行了分类,然后引入动态分配机制,对融合平台的Thrift Server做了几组测试,可以看到动态资源分配优势较大,建议推广。

参考文献

  1. https://spark.apache.org/docs/latest/job-scheduling.html
  2. https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
  3. https://dzone.com/articles/spark-dynamic-allocation
  4. https://www.jianshu.com/p/10e91ace3378

附录一、动态资源分配参数说明

Dynamic Allocation (https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

Property Name Default Meaning
spark.dynamicAllocation.enabled false Whether to use dynamic resource allocation, which scales the number of executors registered with this application up and down based on the workload. For more detail, see the description here. This requires spark.shuffle.service.enabled to be set. The following configurations are also relevant: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors, and spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio
spark.dynamicAllocation.executorIdleTimeout 60s If dynamic allocation is enabled and an executor has been idle for more than this duration, the executor will be removed. For more detail, see this description.
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity If dynamic allocation is enabled and an executor which has cached data blocks has been idle for more than this duration, the executor will be removed. For more details, see this description.
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors Initial number of executors to run if dynamic allocation is enabled. If --num-executors (or spark.executor.instances) is set and larger than this value, it will be used as the initial number of executors.
spark.dynamicAllocation.maxExecutors infinity Upper bound for the number of executors if dynamic allocation is enabled.
spark.dynamicAllocation.minExecutors 0 Lower bound for the number of executors if dynamic allocation is enabled.
spark.dynamicAllocation.executorAllocationRatio 1 By default, the dynamic allocation will request enough executors to maximize the parallelism according to the number of tasks to process. While this minimizes the latency of the job, with small tasks this setting can waste a lot of resources due to executor allocation overhead, as some executor might not even do any work. This setting allows to set a ratio that will be used to reduce the number of executors w.r.t. full parallelism. Defaults to 1.0 to give maximum parallelism. 0.5 will divide the target number of executors by 2 The target number of executors computed by the dynamicAllocation can still be overridden by the spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors settings
spark.dynamicAllocation.schedulerBacklogTimeout 1s If dynamic allocation is enabled and there have been pending tasks backlogged for more than this duration, new executors will be requested. For more detail, see this description.
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout Same as spark.dynamicAllocation.schedulerBacklogTimeout, but used only for subsequent executor requests. For more detail, see this description.

本文由博客一文多发平台 OpenWrite 发布!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容

  • 资源分配是Spark任务中需要深入理解的,如果相关的参数设置不合理,将会造成资源分配不均的情况,可能导致某些任务大...
    zhuamengjio阅读 1,096评论 0 0
  • 原文:https://tech.meituan.com/spark-tuning-basic.html Spark...
    code_solve阅读 1,211评论 0 10
  • 1. 概述 在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-...
    CoderJed阅读 578评论 0 1
  • 1. 概述 在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-...
    数据萌新阅读 465评论 0 0
  • 透过窗 看着一群奔跑的孩子 一模一样的孩子 在新港遇到过 在育秀遇到过 在国际同样遇到过 他们究竟有什么不同呢? ...
    漫彧阅读 425评论 1 1