Spark 性能调优

1 Spark SQL性能调优

通过缓存数据、调优参数、增加并行度提升性能P94
1)缓存数据
构建一个内存中的列格式缓存表,Spark SQL仅扫描需要的列,并自动调整压缩比,使内存使用率和GC压力最小

2)调优参数
优化选项配置参数

配置可分区的数目 shuffle.partitions
配置将需要执行的sort溢出到磁盘上,否则在每个分区的内存中

3)增加并行度
合理设置并行度提升文件加载效率和并行执行效率
Spark的数据采用内存列式存储,实际执行查询阶段效率较高,相对而言,数据加载阶段耗时较长,对于如何提升数据加载效率,并行加载数据是一个优化方向。

2 Spark Streaming性能调优

重点需要考虑以下两件事情:

有效使用集群资源,减少每批次数据的处理时间;
设置合理的窗口大小,从而使数据尽可能得到处理(即数据处理和数据接收节奏一致)

优化运行时间
优化运行时间可以降低每个批次数据的处理时间,主要包括:

提升数据接收并行度

  • 通过提升Recerver的并发度:通过创建多个Dstream并配置从数据源接受不同分区的数据流,从而实现接受多个数据流;
  • 调整Receiver的RDD数据分区时间间隔:修改blockInterval参数,调整Receiver的blocking interval,对于大多数的Receiver,接受到的数据合并成大的数据块,然后存储在Spark的内存中;推荐block interval最小值是50毫秒

提升数据处理的并行度

  • 任务执行阶段并行度不高,则会造成集群资源利用率低下;确保均衡地使用整个集群的资源,而不是把任务集中在几个特定的节点上,对于包含Shuffle的操作,增加其并行度以确保更充分的使用集群资源

减少序列化和反序列化负担

  • 数据序列化只要包括两个方面:

RDD数据序列化:默认情况下RDD被保存为序列化字节数组来减少GC停顿
输入数据序列化,将获取的外部数据插入Spark,接收到的数据为字节型,需要反序列化为Spark的序列化格式。因此输入数据的反序列化开销可能会成为一个瓶颈

  • Spark Streaming默认将接收到的数据序列化存储,以减少内存的使用。序列化和反序列化,需要更多的CPU时间,更加高效的序列化方式(Kryo)和自定义的序列化接口,可以更高效的使用CPU。

减少任务提交和分发开销

  • 设置合适的Batch间隔,减少时延,可以将任务序列化(序列化的任务可以减少任务的大小,因此减少了发送到节点的时间),也可以使用粗粒度运行任务,相比细粒度有着更低的延迟。

优化内存使用

  • 合理设置DStream存储级别:RDD默认是MEMORY_ONLY,DStream默认是MEMORY_ONLY_SER,尽管保持数据的序列化和反序列化会带来更高的开销,但是却大大减少了GC停顿的情况
  • 及时清理持久化的RDD:会使用内置的内存清理策略LRU,设置自动定期清除旧的内容,设置spark.streaming.unpersist属性启动内存清理,减少Spark RDD内存的使用,提升GC性能
  • 并发垃圾收集策略:采用不同的GC策略进一步减少GC对Job运行的影响,例如:使用并行mark-and-sweep GC能减少GC的突然暂停情况

设置合适的批次大小

  • 处理数据的速度要跟的上数据流入的速度,如何设置Batch size和数据输入速度,确保系统能跟得上数据输入速度,可根据经验调整,查看日志获取总延迟,进行调整;如果延迟时间< Batch批处理时间,则系统稳定,如果延迟一直增加,说明系统的处理速度跟不上数据的输入速度

Spark Sreaming的容错处理
1)回顾RDD的容错处理


RDD的容错机制

2)Spark streaming的容错处理

  • 文件输入源:

有容错的文件系统,HDFS S3

  • 基于Recerver的输入源
    Spark1.2后,接受数据进行容错存储,并提前写日志(write ahead log),用来实现零数据丢失。

可靠的接收器
不可靠的接收器

  • 输出操作

所有的数据都以RDD操作的血统形式存在,任何重复计算都会得到相同的结果,这样一来,所有的DStream转换都确保恰有一次的语义。

3 Spark性能调优

Spark性能优化总结

优化的目的是

保证大数据量下任务运行成功
降低资源消耗
提高计算性能

1)程序优化:

在进行shuffle操作时,如reduceByKey、groupByKey,会划分新的stage。同一个stage内部使用pipe line进行执行,效率较高;stage之间进行shuffle,效率较低。shuffle会产生网络磁盘IO,故大数据量下,应进行代码结构优化,尽量减少shuffle操作。

设置缓存cache,根据程序缓存合适的RDD,设置缓存级别

优化Partition,重新分区或者合并小文件,降低并发量

2)资源配置:

设置合适的资源参数(executor的数量,内存大小,并发数量等)

查看日志
打印GC日志,或者查看本地的executor的日志,或者查看yarn web UI运行的日志

内存优化/GC优化:
可以减少整个堆内存的大小;可以让年轻代的对象尽快进入年老代,增加年老代的内存;可以让年老代更频繁的进行父gc。

3)其他优化
压缩处理、序列化、设置共享变量等


1)输入采用大文件

将数量众多的小文件合并成大一些的文件,对小文件做预处理

2)IZO压缩处理

3)Cache压缩

RDD Cache本身的目的是追求速度,减少重算步骤。但往往会对内存造成负担,因此缓存压缩也是性能优化、减小负担的一部分。下面主要介绍Spark中各种配置对压缩的影响。
1、利用spark.rdd.compress压缩(默认是不压缩的)
2、利用spark.io.compression.codec压缩(默认采用Snappy压缩)

4)序列化数据

Spark提供了一下两种序列化类库:
1、Java序列化

Java序列化: Java的ObjectOutputStream框架作为Spark序列化默认的序列化方法,只需要实现java.io.Serializable接口就可以直接使用。Java序列化很灵活,但是速度很慢,同时序列化的格式也很大。

2、Kryo序列化

Spark也可以支持Kryo的序列化库(version 2), Kryo序列化能够更快地序列化数据,而且比Java序列化更加高效,通常序列化速度为Java序列化的10倍。但是Kryo序列化并不支持所有的Serializable类型,并且使用Kryo序列化,需要注册后才能使用。

5)缓存

spark.executor.memory:决定了每个Executor可用内存的大小 ;
spark.storage memoryFraction:则决定了在这部分内存中有多少可以用于Memory Store管理RDD Cache数据;
剩下的内存:用来保证任务运行时各种其他内存空间的需要。

spark.executor.memory默认值为0.6,官方文档建议这个比值不要超过JVM Old Gen区域的比值。这也很容易理解,因为RDD Cache数据通常都是长期驻留内存的,理论上也就是说最终会被转移到Old Gen区域(如果该RDD还没有被删除),如果这部分数据允许的尺寸太大,势必把Old Gen区域占满,造成频繁的Full GC.

如何调整这个比值,取决于你的应用对数据的使用模式和数据的规模,粗略地来说,如果频繁发生Full GC,可以考虑降低这个比值,这样RDD Cache可用的内存空间减少(剩下的部分Cache数据就需要通过Disk Store写到磁盘上),会带来一定的性能损失,但是这会腾出更多的内存空间用于执行任务,减少Full GC发生的次数,反而可能改善程序运行的整体性能。

当发现JVM的垃圾收集经常消耗较高或耗尽内存时,以防任务运行缓慢,可以设置此值降低内存消耗。如果想改变此值为50%,可以通过在SparkConf中设置conf.set("spark storage.memoryFraction", "0.5")完成。结合使用序列化缓存,使用一个较小的缓存应足以减轻大多数垃圾收集的问题。

6)共享变量

Spark支持以下两种类型的共享变量:

  • 广播变量:可以在内存的所有节点中被访问,用于缓存变量(只读)。
  • 累加器:只能用来做加法的变量,如计数和求和。

在任务和驱动程序之间共享变量(如静态查找表),可以极大减少每个序列化任务的大小以及在集群中启动一个Job的代价。

7)流水线优化

宽依赖和窄依赖的根本区别是操作是否存在Shuffle操作。
窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,以及两个父RDD的分区对应于一个子RDD的分区。(一个或者多个父分区对应一个子分区)
宽依赖指子RDD的分区依赖于父RDD的所有分区,这是因为Shuffle类操作。(一个父分区对应多个子分区)

窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个Fork/Join (此Join非上文的 Join算子,而是指同步多个并行任务的屏障Barrier):把计算Fork到每个分区,算完后Join,然后Fork/Join下一个RDD的算子。如果直接翻译到物理实现,是很不经济的:
一是每一个 RDD (即使是中间结果)都需要物化到内存或存储中,费时费空间;
二是Join作为全局的屏障Barrier,是很昂贵的,会被最慢的那个节点拖死。如果子RDD的分区到父RDD的分区是窄依赖,就可以实施经典的Fusion优化,把两个Fork/Join合为一个;
如果连续的变换算子序列都是窄依赖,就可以把很多个Fork/Join并为一个,不但减少了大量的全局屏障Barrier,而且无需物化很多中间结果RDD,这将极大地提升性能。


Boy-20180717

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342