Pyspark实战指南

开始时间: 2019-11-26 预计时间7天。
作者:托马兹[美]

本书常用下载地址:
1.RDD章节文件下载:http://tomdrabas.com/data/VS14MORT.txt.gz
2.代码github地址:https://github.com/drabastomek/learningPySpark
3.pyspark的官方文档:pyspark官方文档
4.RDD算子的官网详细介绍:pyspark.RDD

1. 基本概念和RDD

pyspark中两个重要的概念是RDD和DataFrame,它俩都是分布在集群的节点中的不可变的数据集合(Pyspark还不支持dataset),两者的区别是,Dataframe是以命名列的方式组织数据的(类似pandas),而RDD的每一行都是一个string。

1.1 创建RDD

创建RDD的方式有2种,第一种使用sc.parallelize(),其参数需要是list或者array,而且list或者array中的元素种类可以随意(tuple, dic, list等等都可以)。另一种方式是直接从文件创建,sc.textFile(), 第二个参数代表数据集被划分的分区数。

rdd1 = sc.parallelize([('上海', 14), ('南京', 6), ('合肥', 4), ('郑州', 2), ('重庆', 1), ('天津', 1)])
rdd2 = sc.textFile('/Users/Lekang/his_ord.txt',4)

1.2 转换(transaction)

常见的转换操作:

  1. map(): 该方法应用在每一个RDD元素上,也就是对每一行做转换。
  2. filter(): 在RDD中筛选符合特定条件的数据元素。
  3. flatMap(): 与map类似,但返回的是一个扁平的结果,不是一个列表。
  4. distinct(): 返回去重后的结果,该方法开销很大,只在必要时使用。
  5. sample(False,0.2,789): 返回数据集的随机样本,第一个参数指定采样是否应该替换,第2个参数返回数据的分数(0.2),第3个参数是伪随机数产生器的种子。
user_list = rdd2.map(lambda row:row.split('\t')[0]).take(20)
user_startid = rdd2.map(lambda row:(row.split('\t')[0],row.split('\t')[1])).take(20)
start_gd_data = rdd2.filter(lambda row:row.split('\t')[1].startswith('B'))\
    .map(lambda row:(row.split('\t')[0],row.split('\t')[1])).collect()
# 使用flatMap,take(20)时,返回的并不是20个tuple,而是打横后一共20个(这里tuple每个里包含2个元素,所以是10个tuple的结果)
user_startid_flat = rdd2.flatMap(lambda row:(row.split('\t')[0],row.split('\t')[1])).take(20)
user_list_sample = rdd2.map(lambda row:row.split('\t')[0]).sample(False,0.2,789).collect()

关联操作:

  1. leftOuterJoin()直接关联2个RDD,类似于sql中的left join。
  2. join()内连接2个RDD,类似于sql中的inner join。
  3. intersection()只返回两个RDD相同的数据。
  4. repartition(): 重新对数据集进行分区,参数指定重分区的个数。
rdd4 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd5 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

rdd4.leftOuterJoin(rdd5).repartition(2).collect()
rdd4.join(rdd5).collect()
rdd4.intersection(rdd5).collect()

1.3 操作(action)

  1. take():返回RDD中的指定条数的数据集,相比于collect()的全部返回,在应用大数据集时,更有用。
  2. takeSample():随机采样,有3个参数。第一个参数指定采样是否应该替换,第2个参数指定take的条数,第3个参数是伪随机数产生器的种子。
  3. reduce()使用指定的方法来减少RDD中的元素。 上边例子中使用的公式是x+y, 在多分区计算时候需要注意,如果rdd的数据在多个分区,reduce算子会现在每个分区内计算,最后将每个分区结果再应用公式计算。加法是没有问题的,如果应用除法可能会出现意想不到的结果。
  4. reduceByKey() : 在 键 - 键 基础上进行聚合。
  5. saveAsTextFile() :直接将RDD的内容保存到文件中,上边的例子rdd2有4个分区,类似Hadoop MapReduce的保存结果。
  6. foreach()对每一条记录应用一个定义好的函数。
rdd5.take(3)
rdd5.takeSample(False,2,782)
rdd5.map(lambda row:row[1]).reduce(lambda x,y : x+y)
rdd5.reduceByKey(lambda x,y:x+y).collect()

data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)
data_reduce.reduce(lambda x,y : 1.0*x/y)   #在一个分区内时,能得到预计的结果
data_reduce_2 = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
data_reduce_2.reduce(lambda x,y: 1.0*x/y)  #在3个分区时,返回了未预计的结果

rdd2.count()
rdd2.map(lambda row:row.split('\t')[0]).saveAsTextFile('/Users/Lekang/his_ord_output.txt')

def my_print(x):
    print "user_id:"+ str(x)

rdd2.map(lambda row:row.split('\t')[0]).foreach(my_print)

2.DataFrame

当使用RDD执行pyspark时,需要巨大的开销来执行,RDD转换最初都映射到java中的PythonRDD。Spark sql引擎较快的原因是Catalyst优化器。这个东西生成类似于以往sql的执行计划。
Dataframe和Catalyst优化器的意义在于和非优化的RDD查询比较时增加pyspark查询的性能。
因为,需要使用jupyter的图形化展示,需要安装ipython-sql。如果 Python2 和 Python3 同时有 pip,则使用方法如下:

#Python2:
python2 -m pip install XXX
#Python3:
python3 -m pip install XXX

#安装插件
python2 -m pip install ipython-sql

通过SparkSession创建dataframe,需要先对其初始化:

spark = SparkSession.builder.appName("dataframe_testing!").getOrCreate()

2.1 DataFrame初览

下面例子,通过一个简单的例子说明dataframe的使用规则:

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
sc = SparkContext()
spark = SparkSession.builder.appName("dataframe_testing!").getOrCreate()

stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)
personJSON = spark.read.json(stringJSONRDD)
personJSON.createOrReplaceTempView("person")  #通过dataframe创建临时表,为了使用sql语法
spark.sql("select * from person").show()      #sql查询
personJSON.collect()                          #DataFrame API查询
personJSON.printSchema()                      #打印通过反射方式生成的dataframe的schema。
  1. 可以使用DataFrame API或者SQL来查询。
  2. 展示结果,可以使用show()或者collect(),在RDD部分已经介绍collect(),当数据集小时可以使用。
    对于show(),如果不指定参数就展示10条数据,如果指定参数例如 show(n),那么就展示n条数据。

2.2 从RDD转化dataframe

RDD和Dataframe的最主要差别是schema,RDD没有Schema,如果为RDD指定了schema的话,就转化为了dataframe。

  1. personJSON通过反射方式,spark自动通过数据获取schema。上边代码展示了,自行定义schema的一种方法。
  2. StructType()来定义schema,参数是一个列表,列表没一个元素是StructField对象(列定义)。
  3. StructField()定义时,需要提供3个参数:(1)列名,字符串;(2)列的类型;(3)该列是否可以NULL。
    同样地,查询时,可以使用Dataframe API,也可以使用SQL。
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'),
                               (234, 'Michael', 22, 'green'), 
                               (345, 'Simone', 23, 'blue')])
schema = StructType(           #一个列表定义每个列
    [
        StructField("id",LongType(),False),
        StructField("name",StringType(),False),
        StructField("age",IntegerType(),True),
        StructField("eye_color",StringType(),True)
    ]
)
string_df = spark.createDataFrame(stringCSVRDD,schema)     #自定义schema创建dataframe
string_df.createOrReplaceTempView("string_table")
string_sql = spark.sql("select id,name from string_table")
string_df.count()                 # count()函数
string_df.select("id","eye_color").filter("eye_color = 'blue'").show()   # dataframe api
string_df.select(string_df.id,string_df.name).filter(string_df.eye_color == 'green').show()

spark.sql("select id,age,name from string_table where eye_color like 'b%'").show()

2.3 sql查询数据可视化

3. 准备数据建模

首先,了解一下python关于for和if语句的简写的方法,在下边代码中会使用到。
for和if语句的简写

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容