- 相较于rdd,在数据挖掘中更常用的数据格式是DataFrame,由于Catalyst优化器的原因,DataFrame在python上并不比scala上慢多少
# 引入必要包
from pyspark.sql import SparkSession
from pyspark.sql import types
spark = SparkSession.builder.master("local").appName("learnsparkdf").enableHiveSupport().getOrCreate()
sc = spark.sparkContext
创建DataFrame
# 使用sc创建df
#方法一:通过json创建
stringJSONRDD = sc.parallelize(["""
{ "id": "123",
"name": "Katie",
"age": 19,
"eyeColor": "brown"
}""",
"""{
"id": "234",
"name": "Michael",
"age": 22,
"eyeColor": "green"
}""",
"""{
"id": "345",
"name": "Simone",
"age": 23,
"eyeColor": "blue"
}"""]
)
df = spark.read.json(stringJSONRDD)
df.show()
[out]:
+---+--------+---+-------+
|age|eyeColor| id| name|
+---+--------+---+-------+
| 19| brown|123| Katie|
| 22| green|234|Michael|
| 23| blue|345| Simone|
+---+--------+---+-------+
# 方法二,通过sc创建,通常要指定列名不然会变成[-1,-2]
list_rdd = sc.parallelize([('TOM', 23), ('JIM', 18), ('BOSE', 50), ('JAME',23), ('JAM')],4)
df2 = spark.createDataFrame(list_rdd)
df2.show()
print(df2.schema) # ***默认int数据类型为LongType
[out]:
+----+---+
| _1| _2|
+----+---+
| TOM| 23|
| JIM| 18|
|BOSE| 50|
|JAME| 23|
+----+---+
StructType(List(StructField(_1,StringType,true),StructField(_2,LongType,true)))
# 需要注意的是整数位LongType,与FloatType和DoubleType不能隐式转换
schema = types.StructType([
types.StructField('Name', types.StringType(), True), # 列名,数据类型,能否为空
types.StructField('Age', types.ShortType(), True),
])
df2 = spark.createDataFrame(list_rdd, schema)
df2.show()
print(df2.schema)
[out]:
+----+---+
|Name|Age|
+----+---+
| TOM| 23|
| JIM| 18|
|BOSE| 50|
|JAME| 23|
+----+---+
StructType(List(StructField(Name,StringType,true),StructField(Age,ShortType,true)))
数据类型总共有一下类型
"DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType",
"TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType",
"LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"
创建临时表
dataframe的操作通常会使用sql语句完成,下面有四个创建表的方法
#df.createGlobalTempView("tempViewName") 创建一个全局临时表,生命周期为程序的生命周期 **使用的时候 global_temp.tempViewName
#df.createOrReplaceGlobalTempView("tempViewName") 创建或者替换一个全局临时表,生命周期为程序的生命周期
#df.createOrReplaceTempView("tempViewName") 创建一个临时表,生命周期为当前SparkSession的生命周期
#df.createTempView("tempViewName") 创建或者替换一个临时表,生命周期为当前SparkSession的生命周期
# 删除临时表
# spark.catalog.dropTempView("tempViewName")
# spark.catalog.dropGlobalTempView("tempViewName")
创建表之后,剩余的操作就和sql基本一样,一般来说sql操作都会返回一个新的dataframe
查看临时表
# 创建临时表,查看信息,
df2.createOrReplaceTempView('df')
spark.sql("select * from df").show() #***注意返回的也是一个dataframe
[out]:
+----+---+
|Name|Age|
+----+---+
| TOM| 23|
| JIM| 18|
|BOSE| 50|
|JAME| 23|
+----+---+
[Row(Name='TOM', Age=23),
Row(Name='JIM', Age=18),
Row(Name='BOSE', Age=50),
Row(Name='JAME', Age=23)]
# 取出数据后
ret= spark.sql("select * from df").collect() # 也可以直接取出来
ret[0]['name'] 类似于字典和列表综合,这两种方法都可以获取元素
ret[0][0] 类似于字典和列表综合,这两种方法都可以获取元素
数据结构
printSchema()函数
输出dataframe schema结构
若不指定dataframe结构,系统会自动推断数据类型
df.printSchema()
[out]:
root
|-- age: long (nullable = true)
|-- eyeColor: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
API操作
dataframe API查询
dataframe可以通过take(),show()展示结果
可以使用select filter选择过滤数据,还有很多函数
df2.select("*").filter("age>18").show()
[out]:
+----+---+
|Name|Age|
+----+---+
| TOM| 23|
|BOSE| 50|
|JAME| 23|
+----+---+
保存文件
# 保存文件
# 一般来说可以存储为csv,json,不过更常见的是使用parquet存储
# Parquet仅仅是一种存储格式,它是语言、平台无关的,并且不需要和任何一种数据处理框架绑定,目前能够和Parquet适配的组件包括下面这些,可以看出基本上通常使用的查询引擎和计算框架都已适配,并且可以很方便的将其它序列化工具生成的数据转换成Parquet格式
df.rdd.getNumPartitions() # 获取分区数目
df.write.parquet('/FileStore/tables/testpar', mode = 'overwrite') # 以parquet格式保存数据
df.toPandas().to_csv('/FileStore/tables/testpar2.csv')# 以csv格式保存数据
from sklearn.externals import joblib
joblib.dump(df.toJSON().collect(), '/FileStore/tables/testpar3.json') # 以json格式保存
# 保存后的/FileStore/tables/testpar是一个文件加,有多少个分区数目就有多少个文件***********
df2 = spark.read.parquet('/FileStore/tables/testpar')# 读取parquet格式文件
jupyter 代码