1.persist 与 cache 区别
persist源码内部调用了
persist(self, storageLevel=StorageLevel.MEMORY_ONLY),可以设置persist的级别;
cache() 内部调用了persist,设置persis水平为MEMORY_ONLY
def cache(self):
"""
Persist this RDD with the default storage level (C{MEMORY_ONLY}).
"""
self.is_cached = True
self.persist(StorageLevel.MEMORY_ONLY)
return self
2.输出压缩格式方法
df.saveAsTextFile(savepath, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
3.sc.wholeTextFiles 与 sc.textFile
- sc.textFiles(path) 能将path 里的所有文件内容读出,以文件中的每一行作为一条记录的方式;
- sc.wholeTextFiles 返回的是[(key, val), (key, val)...]的形式,其中key是文件路径,val是文件内容,这里我们要注意的重点是:每个文件作为一个记录!这说明这里的 val 将不再是 list 的方式为你将文件每行拆成一个 list的元素, 而是将整个文本的内容以字符串的形式读进来,也就是说val = '...line1...\n...line2...\n' 这时需要你自己去拆分每行.
4.aggregateByKey用法
aggregateByKey(zeroValue,seqFunc,combFunc,numPartitions=None, partitionFunc=<function portable_hash at 0x0000020066CC9620>)
用于对key进行聚合的转化操作。
参数:
zeroValue: 初始化参数,一般设置为不影响后面函数的计算,例如如果是累加就初始化为0,如果是list累加,就初始化为[]等等。
seqFunc: 元素操作
combFunc:聚合操作
numPartitions: 分区数量
例:
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
zeroValue = [] #初始元素
mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)])
mergeComb = (lambda agg1,agg2: agg1 + agg2 )
y = x.aggregateByKey(zeroValue,mergeVal,mergeComb)
y.collect()
Out[59]: [('B', [(1, 1), (2, 4)]), ('A', [(3, 9), (4, 16), (5, 25)])]
x.collect()
Out[60]: [('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
5.mapPartition与map区别
两者的主要区别是调用的粒度不一样:map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。假设一个rdd有10个元素,分成3个分区。如果使用map方法,map中的输入函数会被调用10次;而使用mapPartitions方法的话,其输入函数会只会被调用3次,每个分区调用1次。
例如:
rdd1 = sc.parallelize(range(10), 3)
def myfuncPerElement(line):
print("run in element")
return line**2
def myfuncPartition(line):
print("run in partition")
yield sum(line)
rdd1.map(myfuncPerElement).collect()
rdd1.mapPartitions(myfuncPartition).collect()
在spark shell中运行上述代码,可看到打印了3次run in partition,打印了10次run in element,
从输入函数(myfuncPerElement、myfuncPerPartition)层面来看,map是推模式,数据被推到myfuncPerElement中;mapPartitons是拉模式,myfuncPerPartition通过迭代子从分区中拉数据。
MapPartitions的优点:
这两个方法的另一个区别是在大数据集情况下的资源初始化开销和批处理处理,如果在myfuncPerPartition和myfuncPerElement中都要初始化一个耗时的资源,然后使用,比如数据库连接。在上面的例子中,myfuncPerPartition只需初始化3个资源(3个分区每个1次),而myfuncPerElement要初始化10次(10个元素每个1次),显然在大数据集情况下(数据集中元素个数远大于分区数),mapPartitons的开销要小很多,也便于进行批处理操作。
MapPartitions的缺点:
如果是普通的map操作,一次function的执行就处理一条数据;那么如果内存不够用的情况下,比如处理了1千条数据了,那么这个时候内存不够了,那么就可以将已经处理完的1千条数据从内存里面垃圾回收掉,或者用其他方法,腾出空间来。所以说普通的map操作通常不会导致内存的OOM异常。但是MapPartitions操作,对于大量数据来说,比如甚至一个partition,100万数据,一次传入一个function以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就OOM,内存溢出。
什么时候比较适合用MapPartitions系列操作:
数据量不是特别大的时候,都可以用这种MapPartitions系列操作,性能还是非常不错的,是有提升的。但是也有过出问题的经验,MapPartitions只要一用,直接OOM,内存溢出,崩溃。在项目中,自己先去估算一下RDD的数据量,以及每个partition的量,还有自己分配给每个executor的内存资源。看看一下子内存容纳所有的partition数据,行不行。如果行,可以试一下,能跑通就好。性能肯定是有提升的。但是试了一下以后,发现,不行,OOM了,那就放弃吧。
6.stage的划分
(1) Rdd的依赖关系:
Rdd的依赖有两种:
* 宽依赖(Wide Dependency)
* 窄依赖(Narrow Dependency)
以下图说明RDD的窄依赖和宽依赖:
(2)窄依赖
窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为
• 一个父RDD的分区对应于一个子RDD的分区
• 两个父RDD的分区对应于一个子RDD 的分区。
如上面的map,filter,union属于第一类窄依赖,而join with inputs co-partitioned(对输入进行协同划分的join操作)则为第二类窄依赖
(3)宽依赖
宽依赖指多个子RDD的分区会依赖同一个parent RDD分区,这是shuffle类操作,上图中的groupByKey和对输入未协同划分的join操作就是宽依赖。
(4)宽窄依赖与容错性
Spark基于lineage的容错性是指,如果一个RDD出错,那么可以从它的所有父RDD重新计算所得,如果一个RDD仅有一个父RDD(即窄依赖),那么这种重新计算的代价会非常小。Spark基于Checkpoint(物化)的容错机制何解?在上图中,宽依赖得到的结果(经历过Shuffle过程)是很昂贵的,因此,Spark将此结果物化到磁盘上了,以备后面使用。
7.排序分区函数
repartitionAndSortWithinPartitions(numPartitions=None,ascending=True, partitionFunc=lambda x: hash(x[0]))
该函数的作用是指定数据分区数量以及分区规则,并根据key进行排序。
numPartitions指定重新分区的数量,默认按照key升序排序,partitionFunc可以指定分区的key。
例如:
rdd = sc.parallelize([((0,10), 10), ((3,4), 80), ((2,1), 6), ((0,1), 8), ((3,2), 12), ((1,4), 3), ((3,20), 4), ((3,4), 18), ((3,78), 8), ((3,20), 8)])
#rdd.glom().collect()
rdd2 = rdd.repartitionAndSortWithinPartitions(10, partitionFunc= lambda x: hash(x[0]))
rdd2.glom().collect()
[out1:]
[[((0, 1), 8), ((0, 10), 10)],
[((1, 4), 3)],
[((2, 1), 6)],
[((3, 2), 12),
((3, 4), 80),
((3, 4), 18),
((3, 20), 4),
((3, 20), 8),
((3, 78), 8)],
[],
[],
[],
[],
[],
[]]
8.Spark 运行脚本参数设置
(1)--master yarn采用yarn来管理资源调度
yarn cluster: 这个就是生产环境常用的模式,所有的资源调度和计算都在集群环境上运行。
yarn client: 这个是说Spark Driver和ApplicationMaster进程均在本机运行,而计算任务在cluster上。
(2)--deploy-mode client 或 cluster, 默认是client
(3)--num-executors用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
(4)--executor-memory用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
(5)--executor-cores用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。
(6)--driver-memory用于设置Driver进程的内存。driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。
(7)--conf spark.default.parallelism用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。一般设置的个数是num-executors * executor-cores的2-3倍。如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者几个,那么大部分的Executor进程可能根本就没有task执行,也就是白白浪费了资源!另外,每个task执行的速度不一样,如果设置的task低于或者等于num-executors * executor-cores,就会有很多task执行完需要等待没有执行完的task,这样也会浪费集群资源。如果输入数据是gz文件,那这个参数在读取数据阶段是不起作用的,默认一个gz文件一个task(分区)。
def calculate_num(value):
"""
计算phone和imei 1:N 对应关系
:param value:("0|19999269677",[['201807', '00000000000033'], ['201806', '00000000000033'], ['201805', '00000000000033']])
:return: imei_num:(1|imei,[num,month(最新月份)])phone_num子目录:(0|phone,[num,month(最新月份)])
"""
uniqSet = set()
latestMonth = "" ## 最新月份
for val in value[1]:
if latestMonth == "":
latestMonth = val[0]
##只输出最新月份的统计值即可
if latestMonth != "" and JOB_CURRENT_MONTH != latestMonth:
return (value[0], [0, latestMonth])
## 如果是IMEI,需要截取前14位
if value[0][0:2] == "0|":
uniqSet.add(val[1][0:14])
else:
uniqSet.add(val[1])
return (value[0], [len(uniqSet), latestMonth])
phone_num = filein.map(lambda line: (("0" + "|" + line[3], line[5]), line[4])). \
repartitionAndSortWithinPartitions(ascending=False, partitionFunc=lambda x: hash(x[0])).map(
lambda line: (line[0][0], [line[0][1], line[1]])).groupByKey().map(calculate_num).\
filter(lambda line: line[1][0] != 0)