pyspark RDD笔记

  • 运行spark程序首先需要声明SparkSession 创建一个自己的集群(下面使用单机演示)

声明SparkSession

很多书上就只有spark,sc等字符,如果不查资料很难知道这些从哪里来

from pyspark.sql import SparkSession
# 创建一个spark 回话,appName任意取
spark = SparkSession.builder.appName('learnspark').getOrCreate() 
# sc用于读取外部数据,spark功能非常强大,可以读取文件,s3,hdfs等等数据
sc = spark.sparkContext
  • 读取文件的方法很多,这里只介绍读取文件(我在工作中一般用s3读取,具体原理有待研究)
# 创建RDD的方法有两种,第一种使用外部文件
# 这里最后一个参数4表示数据集被划分分区个数,这里划分了4个分区
data = sc.textFile("/FileStore/tables/VS14MORT_txt-b42fa.gz", 4)

# 第二种方式,使用paralleliz集合
# ***一般来说,建议把每个数据集分为2-4个分区
data2 = sc.parallelize([['Amber', 23],['Bob',32], ['Lucy', 18], ['Amber', 12], ['Kent', 40], ['Julia', 25]], 4) 

VS14MORT_txt-b42fa.gz文件下载地址

  • 转换,这里的 ‘转换’ 是种惰性操作,实际上类似于把指令记录下来,到真正需要用的在执行

1. map()函数:类似于python中的map,对rdd中每一行进行操作

data2.map(lambda x: x[0]).take(10)
[out]:['Amber', 'Bob', 'Lucy', 'Amber', 'Kent', 'Julia']
# 注意,纯python方法会降低程序的运行速度,尽量使用spark内置的功能

2. filter()函数:类似于python中的filter,筛选满足条件的行

data2 = data2.filter(lambda x: x[1]>20)
data2.collect()
[out]: [['Amber', 23], ['Bob', 32], ['Kent', 40], ['Julia', 25]]

3. flatMap()函数:将结果作为一个list输出


data2 = data2.flatMap(lambda x: (x[0],x[0])) 
data2.collect()
[out]: ['Amber', 'Amber', 'Bob', 'Bob', 'Kent', 'Kent', 'Julia', 'Julia']
# 必须是tulpe或者list,不然一个字符一个字符输出
data2 = data2.flatMap(lambda x: x[0]) 
data2.collect()
[out]: ['A', 'm', 'b', 'e', 'r','B','o',..] # 太多了只截取前一部分
# 如果要正常输出应考虑lambda x: (x[0], )

4. distinct()函数:类似于set()

data2 = data2.map(lambda x:x[0]).distinct()
data2.collect()
[out]:['Amber', 'Julia', 'Bob', 'Lucy', 'Kent']
# 注意这是一个高开销的方法,一般和map一起用

5. leftOuterJoin()函数:左连接

rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
rdd3 = rdd1.leftOuterJoin(rdd2)  # rdd2去匹配rdd1
rdd3.collect()
[out]: [('c', (10, None)), ('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]
#这是一个高开销的方法

6. join()函数:相当于 innerjoin两个都有才合并

rdd3 = rdd1.join(rdd2)
rdd3.collect()
[out]:[('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]

7. intersection()函数:这里貌似rdd中元素必须完全一样才输出,有待研究...

rdd1 = sc.parallelize([('a', 2), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
rdd3 = rdd1.intersection(rdd2)
print(rdd3.collect()) # 返回空,没有完全一样的
[out]:[]
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
rdd3 = rdd1.intersection(rdd2)
print(rdd3.collect())
[out]:[('a', 1)]

8. repartition()函数:重新分区,这个功能很重要

在公司遇到过数据量太大, 集群默认分区数量默认值太小的情况,这里就需要重新分区

rdd1 = sc.parallelize([('a', 2), ('b', 4), ('c',10)])
print(rdd1.glom().collect(), len(rdd1.glom().collect())) # 最开始放在8分区里面,其中5个没有数据
[out]: [[], [], [('a', 2)], [], [], [('b', 4)], [], [('c', 10)]] 8

rdd1 = rdd1.repartition(3)
print(rdd1.glom().collect(), len(rdd1.glom().collect()))# 现在存放在4个分区里面,其中1个没数据
[out]: [[], [('a', 2), ('b', 4), ('c', 10)], []] 3

rdd1 = rdd1.repartition(2)
print(rdd1.glom().collect(), len(rdd1.glom().collect()))# 现在存放在2个分区里面,其中一个分区存2个数据
[out]: [[('b', 4)], [('a', 2), ('c', 10)]] 2
  • 操作,‘操作’的函数,会把‘转换’的付诸实施

1. take()函数:取前n条返回记录

rdd1 = sc.parallelize([('a', 2), ('b', 4), ('c',10)])
rdd1 = rdd1.map(lambda x: (x[0], x[1]+1))
rdd1.take(1)
[out]: [('a', 3)]

2. takeSample()函数:随机取得n个记录

rdd1 = sc.parallelize([('a', 2), ('b', 4), ('c',10)])
rdd1.takeSample(True, 5) # 第一个参数是否为有放回抽样,第二个参数n条记录,第三个参数seed
[out]: [('a', 2), ('c', 10), ('b', 4), ('a', 2), ('a', 2)]

3. reduce()函数:类似于python reduce

rdd1 = sc.parallelize([('a', 2), ('b', 4), ('c',10)])
rdd1.map(lambda x:x[1]).reduce(lambda x,y:x+y) # 这里不需要collect
[out]: 16

# 注意,如果是/则要看分区
rdd1 = rdd1.repartition(2)
rdd1.map(lambda x:x[1]).reduce(lambda x,y:x/y) # 2/(4/10) 如果再改分区可能会 (2/4)/10
[out]: 5.0

4. count()函数:返回rdd元素总数

rdd1.count()
[out]: 3

5. saveAsTextFile()函数:保存数据到文本

data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
data_key.saveAsTextFile('/FileStore/tables/tmp.txt')
# 这里[('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)]
# 会变为str([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)]) 字符串

# 读取时需要转换
def parseInput(row):
  row_split = eval(row) # 字符串需要用eval还原
  return row_split
data_key = sc.textFile('/FileStore/tables/tmp.txt').map(parseInput)
data_key.collect()
[out]: [('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)]

6. foreach()函数:和map功能类似,但是是for循环一个接一个做

def f(x): # 这个函数在jupyter中不输出,需要在终端中才能输出
    print(x)
data_key.foreach(f) 

7. collect()函数:rdd转化为python list, 运行时需考虑单机的内存

data_key.collect()
[out]: [('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)]

jupyter 完整代码

未完待续...

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343