Spark SQL和DataFrames重要的类有:
- pyspark.sql.SQLContext: DataFrame和SQL方法的主入口
- pyspark.sql.DataFrame: 将分布式数据集分组到指定列名的数据框中
- pyspark.sql.Column :DataFrame中的列
- pyspark.sql.Row: DataFrame数据的行
- pyspark.sql.HiveContext: 访问Hive数据的主入口
- pyspark.sql.GroupedData: 由DataFrame.groupBy()创建的聚合方法集
- pyspark.sql.DataFrameNaFunctions: 处理丢失数据(空数据)的方法
- pyspark.sql.DataFrameStatFunctions: 统计功能的方法
-pyspark.sql.functions DataFrame:可用的内置函数 - pyspark.sql.types: 可用的数据类型列表
- pyspark.sql.Window: 用于处理窗口函数
11.pyspark.sqlDataFrameWriter:用于将[[DataFrame]]写入外部存储系统(例如文件系统,键值存储等)的接口。使用DataFrame.write()来访问这个。
11.1.format(source):指定输入数据源格式。
1.source:string,数据源名称,例如:'json','parquet'
>>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
11.2.jdbc(url,table,model=None,properties=None):通过JDBC将DataFrame的内容保存到外部数据库表中。
注:不要在大型集群上并行创建踏跺分区,否则spark可能会使外部数据系统崩溃
● url – 一个形式为jdbc:subprotocol:subname的JDBC URL
● table – 外部数据库中表的名称。
● mode – 指定数据已经存在时保存操作的行为:
● append: 将此DataFrame的内容附加到现有数据。
● overwrite: 覆盖现有数据。
● ignore: 如果数据已经存在,静默地忽略这个操作。
● error (默认): 如果数据已经存在,则抛出异常。
● properties – JDBC数据库连接参数,任意字符串标签/值的列表。 通常至少应该包括一个“用户”和“密码”属性。
11.3.json(path,schma=None):以指定的路径以JSON格式保存DataFrame的内容。
● path – 任何Hadoop支持的文件系统中的路径。
● mode –指定数据已经存在时保存操作的行为。
● append: 将此DataFrame的内容附加到现有数据。
● overwrite: 覆盖现有数据。
● ignore: 如果数据已经存在,静默地忽略这个操作。
● error (默认): 如果数据已经存在,则抛出异常。
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.write.json('file:///data/dfjson')
[root@slave1 dfjson]# ll
total 8
-rw-r--r-- 1 root root 0 Nov 24 12:08 part-r-00000-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root 25 Nov 24 12:08 part-r-00001-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root 0 Nov 24 12:08 part-r-00002-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root 23 Nov 24 12:08 part-r-00003-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root 0 Nov 24 12:08 _SUCCESS
[root@slave1 dfjson.json]# cat part*
{"name":"Alice","age":2}
{"name":"Bob","age":5}
11.4.mode(saveMode):指定数据或表已经存在的行为。
saveMode选项包括:
append: 将此DataFrame的内容附加到现有数据。
overwrite: 覆盖现有数据。
error: 如果数据已经存在,则抛出异常。
ignore: 如果数据已经存在,静默地忽略这个操作。
>>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
11.5.option(key,value): 添加一个底层数据源的输出选项。
11.6.options(**options):添加底层数据源的多个输出选项。
11.7.orc(path, mode=None, partitionBy=None):以指定的路径以ORC格式保存DataFrame的内容。
注:目前ORC支持只能与HiveContext一起使用。
● path – 任何Hadoop支持的文件系统中的路径。
● mode –指定数据已经存在时保存操作的行为:
append: 将此DataFrame的内容附加到现有数据。
overwrite: 覆盖现有数据。
ignore: 如果数据已经存在,静默地忽略这个操作。
error (默认): 如果数据已经存在,则抛出异常。
● partitionBy – 分区列的名称。
>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
11.8.parquet(path, mode=None, partitionBy=None)
将DataFrame的内容以Parquet格式保存在指定的路径中。
参数:● path – 任何Hadoop支持的文件系统中的路径。
● mode – 指定数据已经存在时保存操作的行为。
append: 将此DataFrame的内容附加到现有数据。
overwrite: 覆盖现有数据。
ignore: 如果数据已经存在,静默地忽略这个操作。
error (默认): 如果数据已经存在,则抛出异常。
● partitionBy – 分区列的名称。
>>> df.write.parquet("file:///data/dfparquet")
[root@slave1 dfparquet]# ll
total 24
-rw-r--r-- 1 root root 285 Nov 24 12:23 _common_metadata
-rw-r--r-- 1 root root 750 Nov 24 12:23 _metadata
-rw-r--r-- 1 root root 285 Nov 24 12:23 part-r-00000-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root 534 Nov 24 12:23 part-r-00001-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root 285 Nov 24 12:23 part-r-00002-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root 523 Nov 24 12:23 part-r-00003-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root 0 Nov 24 12:23 _SUCCESS
11.09.insertInto(tableName, overwrite=False):将DataFrame的内容插入到指定的表中。它要求DataFrame类的架构与表的架构相同。可以覆盖任何现有的数据。
11.10.partitionBy(*cols):按文件系统上的给定列对输出进行分区。如果指定,则输出将在文件系统上进行布局,类似于Hive的分区方案。
>>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
11.11.save(path=None, format=None, mode=None, partitionBy=None, **options):将DataFrame的内容保存到数据源。数据源由format和一组options指定。 如果未指定format,则将使用由spark.sql.sources.default配置的缺省数据源。
● path – Hadoop支持的文件系统中的路径。
● format – 用于保存的格式。
● mode – 指定数据已经存在时保存操作的行为。
append: 将此DataFrame的内容附加到现有数据。
overwrite: 覆盖现有数据。
ignore: 如果数据已经存在,静默地忽略这个操作。
error (默认): 如果数据已经存在,则抛出异常。
● partitionBy – 分区列的名称。
● options – all other string options
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.write.mode('append').save("file:///data/dfsave")
11.12.saveAsTable(name, format=None, mode=None, partitionBy=None, **options):将DataFrame的内容保存为指定的表格。在表已经存在的情况下,这个函数的行为依赖于由mode函数指定的保存模式(默认为抛出异常)。 当模式为覆盖时,[[DataFrame]]的模式不需要与现有表的模式相同。
append: 将此DataFrame的内容附加到现有数据。
overwrite: 覆盖现有数据。
error: 如果数据已经存在,则抛出异常。
ignore: 如果数据已经存在,静默地忽略这个操作。
参数:● name – 表名
● format – 用于保存的格式
● mode – 追加,覆盖,错误,忽略之一(默认:错误)
● partitionBy – 分区列的名称
● options – 所有其他字符串选项