(转)Spark DataFrame 开发指南

DataFrame 是 Spark 在 RDD 之后新推出的一个数据集,从属于 Spark SQL 模块,适用于结构化数据。对于我们这些用惯了数据库表和散列/字典结构的人来说,尤为亲切。

可以直接读取关系型数据库产生 DataFrame:

frompyspark.sqlimportSparkSessionspark = SparkSession \        .builder \        .appName("myapp") \        .config("spark.sql.shuffle.partitions",10) \        .getOrCreate()table ="(select * from users where province='jiangxi') as myusers"df = spark.read \                .format("jdbc") \                .option("driver","com.mysql.jdbc.Driver") \                .option("url","jdbc:mysql://"+dbhost+"/"+dbname) \                .option("dbtable", table) \                .option("user", dbuser) \                .option("password", dbpass) \                .option("partitionColumn", partitionColumn) \                .option("lowerBound", lowerBound) \                .option("upperBound", upperBound) \                .option("numPartitions","4") \                .load()

几个选项说明一下:

url - 数据库的 JDBC 连接串

dbtable - 可以是表名,也可以是一个子查询。如果是子查询的话,必须用括号括起来,并加别名,参见上面代码示例。

user - 数据库用户名

password - 数据库密码

partitionColumn - 用于并发分区的表字段。下面几个选项都是围绕这个字段来的。Spark 会根据分区数量按这个字段的上下限把取出来的数据等分成几份,并行处理。

lowerBound - 字段下限

upperBound - 字段上限

numPartitions - 分区数量

得到的 DataFrame,可以通过字段名来引用列或者某行的值,如:

username = df.nameage = df["age"]

而不像以前用 RDD,老是 split 成元组,然后 0、1、2、3... 地引用,很容易搞错位置。

调用 show 方法可以查看 DataFrame 里的数据:

df.show()

如果数据当中有中文的话,show 方法会报编码错误,需要提前设置一下环境变量:

export PYTHONIOENCODING=utf8

然后,就可以像 RDD 一样,进行各种数据变化、聚合操作了。不过遍查文档,你也找不到 DataFrame 的 map 方法,你需要用 select 和 withColumn 这两个方法来替代。其实回想 DataFrame 在 Saprk SQL 这个模块里,所以它的很多行为都会非常像关系数据库的 SQL 查询。

可以把每个 DataFrame 都想象成一个临时表,select 方法,就是从这个表里选择出一些字段,做一点变换,变成另外一个 DataFrame(另外一个临时表):

df2 = df.select(df['name'], df['age'] +1)df2.show()# +-------+---------+# |  name|(age + 1)|# +-------+---------+# |Michael|    null|# |  Andy|      31|# | Justin|      20|# +-------+---------+

在实际使用中,我们经常会在某个 DataFrame 基础上加上一个变换后的字段,如果用 select 来写,就是这样:

importpyspark.sql.functionsasFdf3 = df2.select("*", F.abs(df2.age).alias("abs_age"))

这里有三个新知识点:一个是在 select 中使用 "*",这样可以避免我们把 df2 的所有字段重新敲一遍。然后我们新增了一个字段,是 age 的绝对值。我们这里用到了 abs 函数,它来自 pyspark.sql.functions,是 spark 内置的众多变换函数之一。pyspark.sql.functions 里提供了相当多的变换函数,可以在文档里查到。这些变换函数结合加减乘除这些运算,基本上可以做完成任何变换了。实在搞不定的,可以用我们后面讲的 UDF(用户自定义函数)。第三点是用到了 alias 函数,来为新变换出的字段命名,类似 SQL 语句中的 "as"。下图是内置函数文档目录:

内置函数文档目录

上面这段代码使用 withColumn 会更加简洁,withColumn 就是在一个 DataFrame 的基础上增加或替换一个字段:

importpyspark.sql.functionsasFdf3 = df2.withColumn("abs_age", F.abs(df2.age))

然后有些字段变换成其他字段以后就没用了,可以 drop 掉它:

df4 = df.drop("name")df5 = df.select("*", df.age+1).drop(df.age)

我们到哪了?我们还是在说 RDD 的 map 方法,可以用 DataFrame 的 select、withColumn、alias、drop、加减乘除、pyspark.sql.functions 组合来替代。看似是用很多东西才替代了 RDD 的 map,但是实际开发的时候会很省事,省去了在 map 函数中每次都要把需要的字段逐一返回了,特别是有时候只是把某些字段在 pair 的 key 和 value 间移动。后面还会看到,DataFrame 可以直接对任意字段做 groupBy,而不用先移动摆好 pair 的 key 和 value。

如果你需要做的变换实在太复杂,无法用加减乘除和 pyspark.sql.functions 来搞定,那么 DataFrame 也支持自定义函数:udf。udf 本身是一个普通的内置函数,可以用它来包装普通的 python 函数,生成 select 和 withColumn 支持的变换函数:

importpyspark.sql.functionsasFimportpyspark.sql.typesasTdefdo_something(col1, col2, col3):returncol1*col1 + col2 / col3udf_dosth = F.udf(do_something, T.IntegerType())df6 = df.select(df.name, udf_dosth(df.age, df.born_year, df.children_count))

好了,到此可以先松口气,休息一下了。filter 方法 DataFrame 直接支持,不需要寻找替代品:

df.filter(df['age'] >21).show()# +---+----+# |age|name|# +---+----+# | 30|Andy|# +---+----+

DataFrame 可以很方便地做 join、groupBy 等操作,就像在写 SQL:

people = sqlContext.read.parquet("...")department = sqlContext.read.parquet("...")people.filter(people.age >30).join(department, people.deptId == department.id) \  .groupBy(department.name,"gender").agg({"salary":"avg","age":"max"})

其实可以真的把 DataFrame 注册成临时表,然后真的写 SQL:

df.createOrReplaceTempView("people")sqlDF = spark.sql("SELECT * FROM people")sqlDF.show()# +----+-------+# | age|  name|# +----+-------+# |null|Michael|# |  30|  Andy|# |  19| Justin|# +----+-------+

agg 这个方法是用来对 groupBy 之后的数据集做聚合操作的(对应 RDD 的 reduceByKey 方法)。它支持 avg、max、min、sum、count 等操作。但是这几个操作在实际使用中是远远不够用的,这时候我们还是需要自定义函数的。这种自定义函数叫做 UDAF( User Defined Aggregate Function)。UDAF 只在 Spark 的 scala 和 Java 中支持,pyspark并不支持。在 Scala 中,你需要重载 UserDefinedAggregateFunction 这个类即可。本文就不具体展示了,留待我稍后一篇专门介绍 Scala Spark 的文章里细说。

DataFrame API 参考:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

pyspark.sql.functions 参考:

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

作者:许伦

链接:https://www.jianshu.com/p/b1398f9f5a06

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容