1,spark的工作机制
用户在客户`端提交作业后,会由Driver运行main方法并创建SparkContext上下文,
SparkContext向资源管理器申请资源, 启动Execotor进程, 并通过执行rdd算子,形成DAG有向无环图,输入DAGscheduler, 然后通过DAGscheduler调度器, 将DAG有向无环图按照rdd之间的依赖关系划分为几个阶段,也就是stage, 输入task scheduler, 然后通过任务调度器taskscheduler将stage划分为task set分发到各个节点的executor中执行。
2,spark中stage是如何划分的
在DAG调度的过程中,Stage阶段的划分是根据是否有shuffle过程,也就是存在ShuffleDependency宽依赖的时候,需要进行shuffle,这时候会将作业job划分成多个Stage
整体思路:从后往前推,遇到宽依赖就断开,划分为一个 stage;遇到窄依赖就将这个 RDD 加入该 stage 中
3, spark的shuffle和调优
Spark中一旦遇到宽依赖就需要进行shuffle的操作,本质就是需要将数据汇总后重新分发的过程, 也就是数据从map task输出到reduce task输入的这段过程, 在分布式情况下,reduce task需要跨节点去拉取其它节点上的map task结果,这就要数据落磁盘,会影响性能,所以说spark通过流水线优化尽力的避免shuffle操作来提高效率
Spark的shuffle总体而言就包括两个基本的过程:Shuffle write 和Shuffle read
在发生shuffle操作的算子中,需要进行stage的划分,shuffle操作之前的一个stage称之为map task,shuffle操作后的一个stage称之为reduce task, 其中map task负责数据的组织,也就是将同一个key对应的value根据hash算法写入同一个下游task对应的分区文件中,也就是Shuffle write, 其中reduce task负责数据的聚合,也就是在上一个stage的task所在的节点上,把属于自己的各个分区文件都拉取过来进行聚合,这个过程称之为shuffle read
map task会将数据先保存在内存中,如果内存不够时,就溢写到磁盘文件中,reduce task 会读取各个节点上属于自己的分区磁盘文件到自己节点的内存中进行聚合
spark shuffle参数调优:
1, spark.shuffle.file.buffe: 32k,参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘
2, spark.reducer.maxSizeInFlight:48m 该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据
3, spark.shuffle.io.maxRetries:3, shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败
4, sparkStreaming调优
1,资源参数调优
增加Driver和Executor的内存,设置合理的cpu个数(--num-executors ,--executor-cores通常设置较多的Executor个数和较少的Executor core个数来达到资源最大使用率)
2,增加并行度parallelism:增加spark partition数量
spark.default.parallelism 可以设置spark 默认的分区数量,另外,在SparkStreaming+Kafka的案列中,如果采用Direct的方式从Kafka中获取数据,此时Kafka partition的数量和spark Rdd的分区数量是1:1的映射关系,最好的方式是直接增加Kafka的分区个数,比如提高到100或者更高;还有一种方式,是可以在创建InputDstream之后再repartition一个更大的并行度,然后再进行逻辑计算,但是实际操作中性能提升很小,是因为repartition会造成shuffle操作
3,设置合理的批处理时间和Kafka数据拉取速率maxRatePerPartition
Spark会每隔batchDuration时间去提交一次job(让程序去处理),如果job处理时间超过了batchDuration的设置,那么会导致后面的作业无法按时提交,随着时间推移,越来越多的作业被拖延,导致整个streaming作业被阻塞
批处理时间不能太短,如果每个批次产生的job不能再这个时间内处理完成,就会造成数据不断积压
另外根据生产者写入 Kafka 的速率以及 Streaming 本身消费数据的速率设置合理的 Kafka 读取速率(spark.streaming.kafka.maxRatePerPartition),使得 Spark Streaming 的每个 task 都能在 Batch 时间内及时处理完 Partition 内的数据,使 Scheduling Delay 尽可能的小
4,使用Kryo序列化
SparkStreaming官方在调优的时候第一个建议就是设置序列化
SparkStreaming在传输,使用对象的时候需要使用序列化和反序列化,Spark默认的是使用Java内置的序列化类, 官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右
Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好只要是自定义类型,都需要进行注册,因此对于开发者来说,这种方式比较麻烦
5,缓存需要经常使用的数据
对一些经常使用到的数据,我们可以显式地调用rdd.cache()来缓存数据,这样也可以加快数据的处理,但是我们需要更多的内存资源
6,spark优化之广播变量
使用广播变量在sparkContext中,可以大幅降低每一个序列化task这个对象的大小,集群中启动一个job的成本也会降低。如果你的task中使用了一个大对象(large object),考虑把他优化成一个广播变量。通常来说,一个task大于20KB就值得优化
5,spark的算子和分类
Spark算子一般分为TractionFormation类型的算子和Action类型的算子
1,Transformaction:这种变换并不触发提交作业,而是完成作业中间的处理. TractionFormation操作是懒执行的,也就是从一个Rdd转化为另一个Rdd的转化并不是马上执行,而是需要等到有Action操作的时候才会真正触发运算,一般是Map等方法
2,Action行动算子:这类算子会触发SparkContext提交job作业,并将数据落地到磁盘,进行shuffle,对性能开销比较大
Shuffle(Action)算子分类
去重 : def distinct()
聚合 : def reduceByKey() def groupByKey() aggregateByKey combinByKey()
排序 : def sortByKey() def sortBy()
重分区 : def reparation() ==def coalesce() [ˌkəʊəˈles]扩大或者缩小分区
集合或者表操作: join 内连接,intersection 交集 subtract 差集
6,Spark数据倾斜 shuffle操作问题解决(也可以作为遇到的问题)
1, 首先要知道发生数据倾斜的原理,数据倾斜就是进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,此时如果某个key对应的数据量特别大的话,就会发生数据倾斜
2, 数据倾斜问题发现和定位,通过spark web ui 来查看当前运行的stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜,知道数据倾斜发生在哪个stage之后,我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类型的算子,通过countByKey查看各个key的分布.
3,数据倾斜的 解决方案
(1),过滤少数导致倾斜的key
(2),提高shuffle操作的并行度
(3),如果是聚合类算子,使用局部聚合和全聚合的方式,下面是思路
方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。
(4),如果是join类型的算子,则使用随机前缀和扩容RDD进行join
(RDD中有大量的key导致数据倾斜)
方案实现思路:将含有较多倾斜key的RDD扩大多倍,与相对分布均匀的RDD配一个随机数。
7,遇见的问题 1,序列化,2,数据倾斜 见上面
报错:Serialization stack: Task not serializable
当我们要对Rdd做map,flapMap,filter等操作的时候是在excutor上完成的,当我们在driver中定义了一个变量,在Rdd中使用的时候,这个变量就需要分发到各个excutor上去,因为dirver和executor运行在不同的jvm中,这就需要涉及对象的序列化和反序列化,如果这个变量没有被序列化,就会报这个异常;
我所遇见的,有来自第三方的一个类对象定义在Driver端,但是他没有实现序列化的接口java.io.Serializable,而我又没法去修改它的源码,这时候我就想到自己定义一个类去继承这个第三方类,然后让自己定义的类实现java.io.serializable,然后再Rdd中调用自己的类,问题就得到了解决
另外的方式是将不可序列化的对象定义在闭包中,或者使用为其注册序列化类的方法;
8,spark中Repartion和Coalesce 的区别?
repartition和Coalesce都是用来对rdd进行重分区的操作,但是看源码可以发现
repartition内部是调用了coalesce方法,然后把shuffle设置为true的实现
减少分区的时候,最好使用coalesce,这样可以避免引起shuffle
增加分区的时候,由于需要重新分区,所以可以使用repartition,也就是coalesce shuffle=ture;