我们以数据源自kafka为例,进行spark作业调优的分析
1 资源评估
网络能力:
评估下使用的节点数、网络带宽,与所要处理的数据量,在网络能力上是否匹配。节点直接的网络是否符合预期。
计算能力:
估算下所拥有的节点的总核数,能否足以使得输入数据的处理完全并发。比如spark 读取kafka数据, topic为1000 partitions. 能够实际分配的总核数,也接近于1000,那么我们可以认为数据可以并发。
注意,在评估计算能力的时候,比如spark 实际是on yarn的,那么需要关注所使用的yarn 队列的实际资源情况,yarn对节点的资源管理情况(节点允许分配的内存、vcores)。需要注意vcore与实际节点物理cpu算力的差异。进行评估时,应以实际算力为准。
业务运算消耗:
根据实际spark 进行的运算,比如是否有和内存数据库的交互,再比如处理kafka数据的情况,每个paritition实际的数据量,根据业务的以为的经验,大概性能值。
通过资源评估,使得我们对所拥有的计算能力有一个大致的了解,不至于预期偏离太多。业务运算的消耗,可以在作业实际执行过程中进行观察。
2 初始参数的确定
partition数的选择
目前spark 读取kafka 主流都采用了direct API,这种方式下,partition的数目直接影响到了spark job中的task数。
paritition数目并非越大越好,较大的分区数会将任务切分的太细,task的多次加载的时间会被浪费。
过小的partition数目会将任务切分的太粗,不能够充分利用节点的所有计算能力进行并发。一个合适的分区数就是在两者之间进行平衡。
另外需要注意的是paritition数会对加大kafka集群的负载,需要关注kafka集群中每个broker负责的所有topic分区数是有限度的,在不同的kafka版本中会有差异。
kafka paritition分区数对kafka集群性能的影响可以参见:
https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster
https://www.confluent.io/blog/apache-kafka-supports-200k-partitions-per-cluster
spark executor 的配置
即executor 的个数,cores,内存该如何配置呢
以往的经验值中,executor cores是4一个比较好的选择。
executor内存需要和实际的业务逻辑结合在一起进行评估,需要注意的是,当partition数变少时,单个task所需的内存会变多,executor所需的内存会变多。
当cores较多时,executor所需的内存也会增加。
对于executor个数而言,最理想的情况下,spark job的一个task 能够利用一个core的算力, 从而所有的task都能够真正并行的运行。
我们可以从所拥有计算资源的算力出发,给出executor个数的选择。
比如集群能够提供1000core, 那么我们可以配置executor 为250 executors * 4 cores
注意应当是在集群内存满足的前提下。
另外需要注意的是,这里的算力, core应当是真实能够提供出来的。即,当我们按照这个提交spark作业时,节点的cpu load(top 命令看到的load avage)值在一个合理的水平,即load 值接近,但不高于节点的核数。
超过节点核数较多时,会导致节点cpu负载太高,不能如期并发。比节点核数小太多,则意味着节点的计算资源不能够充分利用。合理的executor值,应当是两者之间的平衡。
比如我们认为我们拥有2000core的计算资源,当按照500*4 进行运行时,节点的负载都很高时,就意味着我们需要调小executor个数。为此,我们选择了2000的paritition数,可以先不调整,观察节点的cpu load情况。
3 运行作业缓慢的排查
分析task的运行时长分布。
根据spark streaming页面上提供的task的运行信息,观察task 运行时长75% max 值分别是多少,是否相差太大。相差太大就要分析原因,是数据分布不均衡,还是节点计算能力有差异,亦或者节点太繁忙算力浮动较大。节点的max值,和该stage的运行时长的关系。如果不等于该stage的运行时长,则说明作业没有完全并发。
分析task的duration时长。
对duration时长进行排序,观察运行最快/慢的节点是否有相同点,节点的负载、网络、节点的规格等是否有差异等。比如是否是所有运行慢的task都跑在负载很高的节点上,而运行快的节点负载一般,后续则需要分析产生这样调度的原因。
欢迎各路大神补充:)