应用背景
欢迎各位访问链接中原创博客
Spark中行列转换,即数据的透视。
以上图为例,进行如下定义:
- 从左边这种变成右边这种,叫透视(pivot)
- 反之叫逆透视(unpivot)
Spark实现
python、scala、java均可以进行实现
构造样本数据
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('JupyterPySpark').enableHiveSupport().getOrCreate()
import pyspark.sql.functions as F
# 原始数据
df = spark.createDataFrame([('2018-01','项目1',100), ('2018-01','项目2',200), ('2018-01','项目3',300),
('2018-02','项目1',1000), ('2018-02','项目2',2000), ('2018-03','项目x',999)
], ['年月','项目','收入'])
对spark dataframe进行show格式查看
df.show()
'''
+-------+---+----+
| 年月| 项目| 收入|
+-------+---+----+
|2018-01|项目1| 100|
|2018-01|项目2| 200|
|2018-01|项目3| 300|
|2018-02|项目1|1000|
|2018-02|项目2|2000|
|2018-03|项目x| 999|
+-------+---+----+
'''
透视pivot
透视操作简单直接,逻辑如下
- 按照不需要转换的字段分组,本例中是年月;
- 使用pivot函数进行透视,透视过程中可以提供第二个参数来明确指定使用哪些数据项,(可以指定不再DataFrame中schema的字段);
- 汇总数字字段,本例中是收入;
- pivot 只能跟在groupby之后
具体代码如下:
df_pivot = df.groupBy('年月') \
.pivot('项目', ['项目1', '项目2', '项目3', '项目x', 'weizhi']) \
.agg(F.sum('收入')) \
.fillna(0)
print("============df_pivot===================")
df_pivot.show()
'''
+-------+----+----+---+---+------+
| 年月| 项目1| 项目2|项目3|项目x|weizhi|
+-------+----+----+---+---+------+
|2018-03| 0| 0| 0|999| 0|
|2018-02|1000|2000| 0| 0| 0|
|2018-01| 100| 200|300| 0| 0|
+-------+----+----+---+---+------+
'''
逆透视Unpivot
- Spark没有提供内置函数来实现unpivot操作,不过我们可以使用Spark SQL提供的stack函数来间接实现需求。有几点需要特别注意:
- 使用selectExpr在Spark中执行SQL片段;
- 如果字段名称有中文,要使用反引号` 把字段包起来;
具体代码如下:
# 逆透视Unpivot
unpivot_df = df_pivot.selectExpr("`年月`",
"stack(4, '项目1', `项目1`,'项目2', `项目2`, '项目3', `项目3`, '项目x', `项目x`) as (`项目`,`收入`)") \
.filter("`收入` > 0 ") \
.orderBy(["`年月`", "`项目`"]) \
unpivot_df.show()
'''
+-------+---+----+
| 年月| 项目| 收入|
+-------+---+----+
|2018-01|项目1| 100|
|2018-01|项目2| 200|
|2018-01|项目3| 300|
|2018-02|项目1|1000|
|2018-02|项目2|2000|
|2018-03|项目x| 999|
+-------+---+----+
'''
这个函数功能在实际的开发过程中还是很需要进行使用的,练习一下。