1、使用场景
随着业务及数据量的增长,数据库中的数据大致可以分为两类,一类为操作型数据,另一类为分析型数据。其中,操作型数据通常与日常业务紧密相关且可进行增删改查,而分析型数据通常为历史数据,用于统计分析,仅能查询不可增删改。此外,分析型数据有时需要对业务数据进行数据清洗得到。因此,可以将分析型数据导入数据仓库hive中,spark再定时从hive中取出数据进行分析。以城市空气质量预测为例,空气监测点分布在城市中的各个地方,定时地将数据上传至平台中,为了对城市空气质量进行预测,需定期将城市中各监测点的小时数据取平均值后存入hive中,spark再定期从hive中取出数据进行预测分析。
2、spark存入hive
spark存入hive表有两种方式,一种调用方式DF.write.saveAsTable,另一种方式调用hiveContext.sql将数据导入hive中。首先,spark从数据库中读取原始数据并进行数据清洗,求出城市中所有点的平均值代码如下:
mpInfoList = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
.option("spark.mongodb.input.uri", MONITOR_POINT_INFO_URL) \
.option("pipeline", matchCity) \
.load().select("ID").rdd.map(lambda x: x.ID).collect()
print(mpInfoList)
airQualityData = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
.option("spark.mongodb.input.uri", INPUT_URL)\
.load()
airQualityData = airQualityData.filter(airQualityData.NodeIdentifier.isin(mpInfoList))
airQualityData = airQualityData.groupBy('ComponentTime')\
.avg('pm25', 'temp', 'press', 'humi', 'wind_speed', 'wind_dir')\
.orderBy(airQualityData.ComponentTime)\
.withColumnRenamed('avg(pm25)', 'pm25')\
.withColumnRenamed('avg(temp)', 'temp')\
.withColumnRenamed('avg(press)', 'press')\
.withColumnRenamed('avg(humi)', 'humi')\
.withColumnRenamed('avg(wind_speed)', 'wind_speed')\
.withColumnRenamed('avg(wind_dir)', 'wind_dir')
随后,将清洗后的数据存入hive中,代码如下:
data.write.saveAsTable("test.airData", None, "overwrite", None)
3、spark从hive中读取数据
调用sparkSession.sql从hive中读取数据,代码如下:
data = spark.sql("select * from test.airData")
data1 = data.orderBy(data.ComponentTime)