1. 优化数据结构
2. 修改并行度
1. 改变并行度可以改善数据倾斜的原因是因为如果某个task有100个key并且数据巨大,那么有可能导致OOM或者任务运行缓慢;
2. 此时如果把并行度变大,那么可以分解每个task的数据量,比如把该task分解给10个task, 那么每个task的数据量将变小,从而可以解决OOM或者任务执行慢.
对应reduceByKey而言可以传入并行度参数也可以自定义partition.
3. 增加并行度:改变计算资源并没有从根本上解决数据倾斜的问题,但是加快了任务运行的速度.
4. 这是加入有倾斜的key, 加随机数前缀,reduceByKey聚合操作可以分而治之,产生的结果是代前缀的,因此需要map操作去掉前缀,然后在进行reduceByKey操作.
3. 对数据做采样, 对数据倾斜的key增加随机的前缀.
(1) 针对如果倾斜的key比较少:
对与两个RDD1和RDD2 的join操作, 其中一个RDD, 比如RDD1的数据倾斜的key比较少(比如可以通过sample取样)在三个左右,那么这时候可以把RDD1转换为RDD11(倾斜的key),RDD12(不包含倾斜的key),然后分别和RDD2进行join操作得到的两个结果result1,result2再次join产生最终的result.
(2) 针对如果倾斜的key特别多.
如果特别多的key倾斜那么就不需要考虑某一个key了,把所有的key整体考虑即可,需要把整体的数据量变大;
比如10亿的数据变成500亿,这时候可以使用flatmap进行扩容,比如
scala> List(1,2,3,4,5)
res0: List[Int] = List(1, 2, 3, 4, 5)
scala> res0.flatMap(x => 1 to x )
res1: List[Int] = List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
但是具体扩容的数量要依据机器的各方面的配置.
第一我们要解决的是数据能够均匀的分布到各个节点,让集群能够正常运行起来.
伪代码:
对一个rdd使用flatmap,另外一个使用random
val rdd1 = RDD1.flatmap{
for(i <- 1 to 10) {
i+"_"+item;
}
}
val rdd2 = RDD2.map{
val random = Random(10)
random+"_"+item;
}
val result = rdd1.join(rdd2);
result.map{
item.split //去掉前缀.
}
4. 局部聚合+全局聚合
5. ETL
6. 尽量不要产生shuffle
(1) 对小批量的数据进行广播.
针对两个或者多个RDD进行join操作, 如果其中一个RDDD数据比较小可以采用broadcast的方式(然后进行map操作,mappartition 批量加载数据进行优化) 如果数据都比较大的话会给GC带来负担.不建议使用.
(2) 大表适合使用广播
7. tacheyon
8. 复用RDD.
9. 从数据源头开始考虑.
(1) 可以把key-values 变为key-subkey-values
(2) 提取聚集,预操作join, 把倾斜数据在上游进行操作.
(3) 把所有values的值进行组拼然后就可以形成一个单一的key-values.
(4) 针对比如大量的key倾斜,比如数十万的key倾斜,最简单的办法就是从硬件上去调整,增加cpu, 内存.