- 运行spark程序首先需要声明SparkSession 创建一个自己的集群(下面使用单机演示)
声明SparkSession
很多书上就只有spark,sc等字符,如果不查资料很难知道这些从哪里来
from pyspark.sql import SparkSession
# 创建一个spark 回话,appName任意取
spark = SparkSession.builder.appName('learnspark').getOrCreate()
# sc用于读取外部数据,spark功能非常强大,可以读取文件,s3,hdfs等等数据
sc = spark.sparkContext
- 读取文件的方法很多,这里只介绍读取文件(我在工作中一般用s3读取,具体原理有待研究)
# 创建RDD的方法有两种,第一种使用外部文件
# 这里最后一个参数4表示数据集被划分分区个数,这里划分了4个分区
data = sc.textFile("/FileStore/tables/VS14MORT_txt-b42fa.gz", 4)
# 第二种方式,使用paralleliz集合
# ***一般来说,建议把每个数据集分为2-4个分区
data2 = sc.parallelize([['Amber', 23],['Bob',32], ['Lucy', 18], ['Amber', 12], ['Kent', 40], ['Julia', 25]], 4)
- 转换,这里的 ‘转换’ 是种惰性操作,实际上类似于把指令记录下来,到真正需要用的在执行
1. map()函数:类似于python中的map,对rdd中每一行进行操作
data2.map(lambda x: x[0]).take(10)
[out]:['Amber', 'Bob', 'Lucy', 'Amber', 'Kent', 'Julia']
# 注意,纯python方法会降低程序的运行速度,尽量使用spark内置的功能
2. filter()函数:类似于python中的filter,筛选满足条件的行
data2 = data2.filter(lambda x: x[1]>20)
data2.collect()
[out]: [['Amber', 23], ['Bob', 32], ['Kent', 40], ['Julia', 25]]
3. flatMap()函数:将结果作为一个list输出
data2 = data2.flatMap(lambda x: (x[0],x[0]))
data2.collect()
[out]: ['Amber', 'Amber', 'Bob', 'Bob', 'Kent', 'Kent', 'Julia', 'Julia']
# 必须是tulpe或者list,不然一个字符一个字符输出
data2 = data2.flatMap(lambda x: x[0])
data2.collect()
[out]: ['A', 'm', 'b', 'e', 'r','B','o',..] # 太多了只截取前一部分
# 如果要正常输出应考虑lambda x: (x[0], )
4. distinct()函数:类似于set()
data2 = data2.map(lambda x:x[0]).distinct()
data2.collect()
[out]:['Amber', 'Julia', 'Bob', 'Lucy', 'Kent']
# 注意这是一个高开销的方法,一般和map一起用
5. leftOuterJoin()函数:左连接
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
rdd3 = rdd1.leftOuterJoin(rdd2) # rdd2去匹配rdd1
rdd3.collect()
[out]: [('c', (10, None)), ('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]
#这是一个高开销的方法
6. join()函数:相当于 innerjoin两个都有才合并
rdd3 = rdd1.join(rdd2)
rdd3.collect()
[out]:[('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]
7. intersection()函数:这里貌似rdd中元素必须完全一样才输出,有待研究...
rdd1 = sc.parallelize([('a', 2), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
rdd3 = rdd1.intersection(rdd2)
print(rdd3.collect()) # 返回空,没有完全一样的
[out]:[]
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
rdd3 = rdd1.intersection(rdd2)
print(rdd3.collect())
[out]:[('a', 1)]
8. repartition()函数:重新分区,这个功能很重要
在公司遇到过数据量太大, 集群默认分区数量默认值太小的情况,这里就需要重新分区
rdd1 = sc.parallelize([('a', 2), ('b', 4), ('c',10)])
print(rdd1.glom().collect(), len(rdd1.glom().collect())) # 最开始放在8分区里面,其中5个没有数据
[out]: [[], [], [('a', 2)], [], [], [('b', 4)], [], [('c', 10)]] 8
rdd1 = rdd1.repartition(3)
print(rdd1.glom().collect(), len(rdd1.glom().collect()))# 现在存放在4个分区里面,其中1个没数据
[out]: [[], [('a', 2), ('b', 4), ('c', 10)], []] 3
rdd1 = rdd1.repartition(2)
print(rdd1.glom().collect(), len(rdd1.glom().collect()))# 现在存放在2个分区里面,其中一个分区存2个数据
[out]: [[('b', 4)], [('a', 2), ('c', 10)]] 2
- 操作,‘操作’的函数,会把‘转换’的付诸实施
1. take()函数:取前n条返回记录
rdd1 = sc.parallelize([('a', 2), ('b', 4), ('c',10)])
rdd1 = rdd1.map(lambda x: (x[0], x[1]+1))
rdd1.take(1)
[out]: [('a', 3)]
2. takeSample()函数:随机取得n个记录
rdd1 = sc.parallelize([('a', 2), ('b', 4), ('c',10)])
rdd1.takeSample(True, 5) # 第一个参数是否为有放回抽样,第二个参数n条记录,第三个参数seed
[out]: [('a', 2), ('c', 10), ('b', 4), ('a', 2), ('a', 2)]
3. reduce()函数:类似于python reduce
rdd1 = sc.parallelize([('a', 2), ('b', 4), ('c',10)])
rdd1.map(lambda x:x[1]).reduce(lambda x,y:x+y) # 这里不需要collect
[out]: 16
# 注意,如果是/则要看分区
rdd1 = rdd1.repartition(2)
rdd1.map(lambda x:x[1]).reduce(lambda x,y:x/y) # 2/(4/10) 如果再改分区可能会 (2/4)/10
[out]: 5.0
4. count()函数:返回rdd元素总数
rdd1.count()
[out]: 3
5. saveAsTextFile()函数:保存数据到文本
data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
data_key.saveAsTextFile('/FileStore/tables/tmp.txt')
# 这里[('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)]
# 会变为str([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)]) 字符串
# 读取时需要转换
def parseInput(row):
row_split = eval(row) # 字符串需要用eval还原
return row_split
data_key = sc.textFile('/FileStore/tables/tmp.txt').map(parseInput)
data_key.collect()
[out]: [('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)]
6. foreach()函数:和map功能类似,但是是for循环一个接一个做
def f(x): # 这个函数在jupyter中不输出,需要在终端中才能输出
print(x)
data_key.foreach(f)
7. collect()函数:rdd转化为python list, 运行时需考虑单机的内存
data_key.collect()
[out]: [('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)]