本案例出自于厦门大学数据库实验室,原采用的方法是PySpark, 在此基础之上,我们通过spark-sql、zeppelin及可视化的方式加以改进。
一 数据集说明
数据集来自数据网站Kaggle的美国新冠肺炎疫情数据集,该数据集以数据表us-counties.csv组织,其中包含了美国发现首例新冠肺炎确诊病例至今(2020-05-19)的相关数据。数据字典如下:
字段名称 字段含义 例子
date 日期 2020/1/21;2020/1/22;etc
county 区县(州的下一级单位) Snohomish;
state 州 Washington
cases 截止该日期该区县的累计确诊人数 1,2,3…
deaths 截止该日期该区县的累计死亡人数 1,2,3…
二 使用Zeppelin对数据进行分析
1 导入数据并注册为临时表
把数据存放到HDFS文件系统中,用到的方法是:
./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop
这里,我们可以把文件放到本地(本地指Linux)或者上传到hdfs皆可,为了表现不同,我们从本地直接导入文件,代码如下:
import org.apache.spark.sql.types._
## 定义schema
val fields = Array(StructField("date", StringType,true),
StructField("county", StringType,true),
StructField("state", StringType,true),
StructField("cases", IntegerType,true),
StructField("deaths", IntegerType,true))
val schema = StructType(fields)
val file = ""file:///home..." ## 本地使用file:///
val df = spark.read.option("header", "true").schema(schema).csv(file)
df.show(5)
# 注册为临时表
df.createOrReplaceTempView("usinfo")
2 计算每日的累计确诊病例数和死亡数
我们这里直接运用了spark-sql的方法,代码如下:
import org.apache.spark.sql.functions._
%sql
select date, sum(cases) as `累积确诊`, sum(deaths) as `累积死亡`
from usinfo
group by date
order by date asc
结果导入HDFS数据库存储,方法如下:
val df1 = result1.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.printSchema
df1.repartition(1).write.json("file:///.../result1.json")
具体结果如下图:
3 计算每日较昨日的新增确诊病例数和死亡病例数
这里注意,我们需要把上面的结果再注册一下临时表,方便下面使用spark-sql,在sql中我们使用了自连接的方法,最后查询的结果保存了到HDFS上,如下:
## 注册为临时表供下一步使用
df1.createOrReplaceTempView("ustotal")
## sql自连接
val df2 = spark.sql("""
select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease
from ustotal t1,ustotal t2
where t1.date = date_add(t2.date,1)
""")
# 再保存到HDFS
df2.repartition(1).write.json("file:///.../result2.json"
结果如下图:
4 统计截止5.19日 美国各州的累计确诊人数和死亡人数
操作类似:
val df3 = spark.sql("""
select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate
from usinfo
where date = '2020-05-19'
group by date,state
""")
# 再保存到HDFS
df3.repartition(1).write.json("file:///.../result3.json"
展示如下:
5 统计截止5.19全美各州的病死率
val df4 = spark.sql("""
select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate
from eachStateInfo
group by date
union
select 2 as sign,date,state,deathRate from eachStateInfo
""")
# 再保存到HDFS
df4.repartition(1).write.json("file:///.../result4.json"
展示结果: