开始时间: 2019-11-26 预计时间7天。
作者:托马兹[美]
本书常用下载地址:
1.RDD章节文件下载:http://tomdrabas.com/data/VS14MORT.txt.gz
2.代码github地址:https://github.com/drabastomek/learningPySpark
3.pyspark的官方文档:pyspark官方文档
4.RDD算子的官网详细介绍:pyspark.RDD
1. 基本概念和RDD
pyspark中两个重要的概念是RDD和DataFrame,它俩都是分布在集群的节点中的不可变的数据集合(Pyspark还不支持dataset),两者的区别是,Dataframe是以命名列的方式组织数据的(类似pandas),而RDD的每一行都是一个string。
1.1 创建RDD
创建RDD的方式有2种,第一种使用sc.parallelize(),其参数需要是list或者array,而且list或者array中的元素种类可以随意(tuple, dic, list等等都可以)。另一种方式是直接从文件创建,sc.textFile(), 第二个参数代表数据集被划分的分区数。
rdd1 = sc.parallelize([('上海', 14), ('南京', 6), ('合肥', 4), ('郑州', 2), ('重庆', 1), ('天津', 1)])
rdd2 = sc.textFile('/Users/Lekang/his_ord.txt',4)
1.2 转换(transaction)
常见的转换操作:
- map(): 该方法应用在每一个RDD元素上,也就是对每一行做转换。
- filter(): 在RDD中筛选符合特定条件的数据元素。
- flatMap(): 与map类似,但返回的是一个扁平的结果,不是一个列表。
- distinct(): 返回去重后的结果,该方法开销很大,只在必要时使用。
- sample(False,0.2,789): 返回数据集的随机样本,第一个参数指定采样是否应该替换,第2个参数返回数据的分数(0.2),第3个参数是伪随机数产生器的种子。
user_list = rdd2.map(lambda row:row.split('\t')[0]).take(20)
user_startid = rdd2.map(lambda row:(row.split('\t')[0],row.split('\t')[1])).take(20)
start_gd_data = rdd2.filter(lambda row:row.split('\t')[1].startswith('B'))\
.map(lambda row:(row.split('\t')[0],row.split('\t')[1])).collect()
# 使用flatMap,take(20)时,返回的并不是20个tuple,而是打横后一共20个(这里tuple每个里包含2个元素,所以是10个tuple的结果)
user_startid_flat = rdd2.flatMap(lambda row:(row.split('\t')[0],row.split('\t')[1])).take(20)
user_list_sample = rdd2.map(lambda row:row.split('\t')[0]).sample(False,0.2,789).collect()
关联操作:
- leftOuterJoin()直接关联2个RDD,类似于sql中的left join。
- join()内连接2个RDD,类似于sql中的inner join。
- intersection()只返回两个RDD相同的数据。
- repartition(): 重新对数据集进行分区,参数指定重分区的个数。
rdd4 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd5 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
rdd4.leftOuterJoin(rdd5).repartition(2).collect()
rdd4.join(rdd5).collect()
rdd4.intersection(rdd5).collect()
1.3 操作(action)
- take():返回RDD中的指定条数的数据集,相比于collect()的全部返回,在应用大数据集时,更有用。
- takeSample():随机采样,有3个参数。第一个参数指定采样是否应该替换,第2个参数指定take的条数,第3个参数是伪随机数产生器的种子。
- reduce()使用指定的方法来减少RDD中的元素。 上边例子中使用的公式是x+y, 在多分区计算时候需要注意,如果rdd的数据在多个分区,reduce算子会现在每个分区内计算,最后将每个分区结果再应用公式计算。加法是没有问题的,如果应用除法可能会出现意想不到的结果。
- reduceByKey() : 在 键 - 键 基础上进行聚合。
- saveAsTextFile() :直接将RDD的内容保存到文件中,上边的例子rdd2有4个分区,类似Hadoop MapReduce的保存结果。
- foreach()对每一条记录应用一个定义好的函数。
rdd5.take(3)
rdd5.takeSample(False,2,782)
rdd5.map(lambda row:row[1]).reduce(lambda x,y : x+y)
rdd5.reduceByKey(lambda x,y:x+y).collect()
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)
data_reduce.reduce(lambda x,y : 1.0*x/y) #在一个分区内时,能得到预计的结果
data_reduce_2 = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
data_reduce_2.reduce(lambda x,y: 1.0*x/y) #在3个分区时,返回了未预计的结果
rdd2.count()
rdd2.map(lambda row:row.split('\t')[0]).saveAsTextFile('/Users/Lekang/his_ord_output.txt')
def my_print(x):
print "user_id:"+ str(x)
rdd2.map(lambda row:row.split('\t')[0]).foreach(my_print)
2.DataFrame
当使用RDD执行pyspark时,需要巨大的开销来执行,RDD转换最初都映射到java中的PythonRDD。Spark sql引擎较快的原因是Catalyst优化器。这个东西生成类似于以往sql的执行计划。
Dataframe和Catalyst优化器的意义在于和非优化的RDD查询比较时增加pyspark查询的性能。
因为,需要使用jupyter的图形化展示,需要安装ipython-sql。如果 Python2 和 Python3 同时有 pip,则使用方法如下:
#Python2:
python2 -m pip install XXX
#Python3:
python3 -m pip install XXX
#安装插件
python2 -m pip install ipython-sql
通过SparkSession创建dataframe,需要先对其初始化:
spark = SparkSession.builder.appName("dataframe_testing!").getOrCreate()
2.1 DataFrame初览
下面例子,通过一个简单的例子说明dataframe的使用规则:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
sc = SparkContext()
spark = SparkSession.builder.appName("dataframe_testing!").getOrCreate()
stringJSONRDD = sc.parallelize(("""
{ "id": "123",
"name": "Katie",
"age": 19,
"eyeColor": "brown"
}""",
"""{
"id": "234",
"name": "Michael",
"age": 22,
"eyeColor": "green"
}""",
"""{
"id": "345",
"name": "Simone",
"age": 23,
"eyeColor": "blue"
}""")
)
personJSON = spark.read.json(stringJSONRDD)
personJSON.createOrReplaceTempView("person") #通过dataframe创建临时表,为了使用sql语法
spark.sql("select * from person").show() #sql查询
personJSON.collect() #DataFrame API查询
personJSON.printSchema() #打印通过反射方式生成的dataframe的schema。
- 可以使用DataFrame API或者SQL来查询。
- 展示结果,可以使用show()或者collect(),在RDD部分已经介绍collect(),当数据集小时可以使用。
对于show(),如果不指定参数就展示10条数据,如果指定参数例如 show(n),那么就展示n条数据。
2.2 从RDD转化dataframe
RDD和Dataframe的最主要差别是schema,RDD没有Schema,如果为RDD指定了schema的话,就转化为了dataframe。
- personJSON通过反射方式,spark自动通过数据获取schema。上边代码展示了,自行定义schema的一种方法。
- StructType()来定义schema,参数是一个列表,列表没一个元素是StructField对象(列定义)。
- StructField()定义时,需要提供3个参数:(1)列名,字符串;(2)列的类型;(3)该列是否可以NULL。
同样地,查询时,可以使用Dataframe API,也可以使用SQL。
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'),
(234, 'Michael', 22, 'green'),
(345, 'Simone', 23, 'blue')])
schema = StructType( #一个列表定义每个列
[
StructField("id",LongType(),False),
StructField("name",StringType(),False),
StructField("age",IntegerType(),True),
StructField("eye_color",StringType(),True)
]
)
string_df = spark.createDataFrame(stringCSVRDD,schema) #自定义schema创建dataframe
string_df.createOrReplaceTempView("string_table")
string_sql = spark.sql("select id,name from string_table")
string_df.count() # count()函数
string_df.select("id","eye_color").filter("eye_color = 'blue'").show() # dataframe api
string_df.select(string_df.id,string_df.name).filter(string_df.eye_color == 'green').show()
spark.sql("select id,age,name from string_table where eye_color like 'b%'").show()
2.3 sql查询数据可视化
3. 准备数据建模
首先,了解一下python关于for和if语句的简写的方法,在下边代码中会使用到。
for和if语句的简写