Untyped operations: if you look at them closely, you will see that they cover essentially all of ordinary SQL syntax.
The basic untyped operations are:
select
where
join
groupBy
agg
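To make the claim that these operators cover ordinary SQL more concrete, here is a minimal sketch that runs one query twice. The first run chains the operators listed above. The second run is a SQL string over temporary views. Both results are returned as sets so they can be compared. The object name UntypedVsSql, the avg_salary alias, and the inline copy of the sample data (used instead of the JSON files) are assumptions made for illustration. Spark SQL must be on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Hypothetical demo object: same query via untyped operators and via SQL
object UntypedVsSql {
  lazy val spark: SparkSession =
    SparkSession.builder().appName("UntypedVsSql").master("local[*]").getOrCreate()

  // Assumption: inline copies of employee.json / department.json
  def inputs(): (DataFrame, DataFrame) = {
    import spark.implicits._
    val employee = Seq(
      ("Leo", 25L, 1L, "male", 20000L),
      ("Marry", 30L, 2L, "female", 25000L),
      ("Jack", 35L, 1L, "male", 15000L),
      ("Tom", 42L, 3L, "male", 18000L),
      ("Kattie", 21L, 3L, "female", 21000L),
      ("Jen", 30L, 2L, "female", 28000L),
      ("Jen", 19L, 2L, "female", 8000L)
    ).toDF("name", "age", "depId", "gender", "salary")
    val department = Seq(
      (1L, "Technical Department"),
      (2L, "Financial Department"),
      (3L, "HR Department")
    ).toDF("id", "name")
    (employee, department)
  }

  // where -> WHERE, join -> JOIN ... ON, groupBy -> GROUP BY,
  // agg -> aggregate function, select -> SELECT list
  def viaOperators(): Set[(String, String, Double)] = {
    val (employee, department) = inputs()
    employee
      .where("age > 20")
      .join(department, employee("depId") === department("id"))
      .groupBy(department("name"), employee("gender"))
      .agg(avg(employee("salary")).as("avg_salary"))
      .select("name", "gender", "avg_salary")
      .collect()
      .map(r => (r.getString(0), r.getString(1), r.getDouble(2)))
      .toSet
  }

  // The same query written as a SQL string over temporary views
  def viaSql(): Set[(String, String, Double)] = {
    val (employee, department) = inputs()
    employee.createOrReplaceTempView("employee")
    department.createOrReplaceTempView("department")
    spark.sql(
      """SELECT d.name, e.gender, AVG(e.salary) AS avg_salary
        |FROM employee e JOIN department d ON e.depId = d.id
        |WHERE e.age > 20
        |GROUP BY d.name, e.gender""".stripMargin)
      .collect()
      .map(r => (r.getString(0), r.getString(1), r.getDouble(2)))
      .toSet
  }

  def main(args: Array[String]): Unit = {
    val a = viaOperators()
    val b = viaSql()
    println(s"same result: ${a == b}")
    a.toSeq.sortBy(t => (t._1, t._2)).foreach(println)
    spark.stop()
  }
}
```

Sets are used because neither form guarantees row order, so comparing sequences could fail even when the contents match.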
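Practice: here we rework the earlier example that computes each department's average salary and age, so that it uses all of the untyped operators.
Input data: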
employee.json:
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "female", "salary": 8000}
department.json:
{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}
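Both files are in JSON Lines format, with one complete object per line, which is what spark.read.json expects by default. The sketch below reads two of the employee lines from an in-memory Dataset[String] instead of a file (an assumption to keep it self-contained; this overload of DataFrameReader.json needs Spark 2.2 or later). It returns the inferred schema. JSON integers are inferred as LongType, which is why the Employee case class in the program uses Long fields. The object name JsonLinesSchema is hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

// Hypothetical demo object: inspect the schema Spark infers from JSON Lines
object JsonLinesSchema {
  lazy val spark: SparkSession =
    SparkSession.builder().appName("JsonLinesSchema").master("local[*]").getOrCreate()

  def employeeSchema(): StructType = {
    import spark.implicits._
    // Assumption: two lines copied from employee.json, held in memory
    val lines = Seq(
      """{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}""",
      """{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}"""
    ).toDS()
    spark.read.json(lines).schema
  }

  def main(args: Array[String]): Unit = {
    employeeSchema().printTreeString() // prints each field with its inferred type
    spark.stop()
  }
}
```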
Code:
package com.spark.ds

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object UnTypedOperation {
  // Case classes matching the JSON records (the untyped operators below do not need them)
  case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Long)
  case class Department(id: Long, name: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("UnTypedOperation")
      .master("local")
      .config("spark.sql.warehouse.dir", "D:/spark-warehouse")
      .getOrCreate()
    import spark.implicits._

    // Both inputs are JSON Lines files, loaded as untyped DataFrames (Dataset[Row])
    val employee = spark.read.json("inputData/employee.json")
    val department = spark.read.json("inputData/department.json")

    employee
      .where("age > 20")                               // WHERE age > 20
      .join(department, $"depId" === $"id")             // JOIN ... ON depId = id
      .groupBy(department("name"), employee("gender"))  // GROUP BY department name, gender
      .agg(avg(employee("salary")))                     // AVG(salary), output column "avg(salary)"
      .select($"name", $"gender", $"avg(salary)")       // SELECT list
      .show()

    spark.stop()
  }
}
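The program above only averages salary, although the exercise also mentions age. Here is a sketch of an agg call that computes both. It uses as(...) so the output columns get readable names instead of the generated avg(salary). The aliases avg_salary and avg_age, the object name DeptSalaryAndAge, and the inline sample data are assumptions for illustration. The join condition is written with employee("depId") and department("id"), which is equivalent to $"depId" === $"id" here because neither column name occurs in both inputs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Hypothetical demo object: average salary and average age per department and gender
object DeptSalaryAndAge {
  lazy val spark: SparkSession =
    SparkSession.builder().appName("DeptSalaryAndAge").master("local[*]").getOrCreate()

  // Returns (department, gender) -> (avg_salary, avg_age)
  def stats(): Map[(String, String), (Double, Double)] = {
    import spark.implicits._
    // Assumption: inline copies of the sample records
    val employee = Seq(
      ("Leo", 25L, 1L, "male", 20000L),
      ("Marry", 30L, 2L, "female", 25000L),
      ("Jack", 35L, 1L, "male", 15000L),
      ("Tom", 42L, 3L, "male", 18000L),
      ("Kattie", 21L, 3L, "female", 21000L),
      ("Jen", 30L, 2L, "female", 28000L),
      ("Jen", 19L, 2L, "female", 8000L)
    ).toDF("name", "age", "depId", "gender", "salary")
    val department = Seq(
      (1L, "Technical Department"),
      (2L, "Financial Department"),
      (3L, "HR Department")
    ).toDF("id", "name")

    employee
      .where($"age" > 20) // Column-based filter, equivalent to where("age > 20")
      .join(department, employee("depId") === department("id"))
      .groupBy(department("name"), employee("gender"))
      .agg(
        avg(employee("salary")).as("avg_salary"),
        avg(employee("age")).as("avg_age"))
      .collect()
      .map(r => (r.getString(0), r.getString(1)) -> (r.getDouble(2), r.getDouble(3)))
      .toMap
  }

  def main(args: Array[String]): Unit = {
    stats().toSeq.sortBy(_._1).foreach(println)
    spark.stop()
  }
}
```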
Output:
+--------------------+------+-----------+
|                name|gender|avg(salary)|
+--------------------+------+-----------+
|       HR Department|female|    21000.0|
|Technical Department|  male|    17500.0|
|Financial Department|female|    26500.0|
|       HR Department|  male|    18000.0|
+--------------------+------+-----------+
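The numbers can be checked by hand. For example, the Financial Department female row is (25000 + 28000) / 2 = 26500.0, because the second Jen (age 19, salary 8000) is removed by where("age > 20"). The row order above is not guaranteed, since groupBy does not sort its output. Below is a sketch that adds orderBy so the rows come back in a fixed order. The object name SortedOutput, the avg_salary alias, and the inline data are assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Hypothetical demo object: same aggregation, but with a deterministic row order
object SortedOutput {
  lazy val spark: SparkSession =
    SparkSession.builder().appName("SortedOutput").master("local[*]").getOrCreate()

  def sortedRows(): Seq[(String, String, Double)] = {
    import spark.implicits._
    // Assumption: inline copies of the sample records
    val employee = Seq(
      ("Leo", 25L, 1L, "male", 20000L),
      ("Marry", 30L, 2L, "female", 25000L),
      ("Jack", 35L, 1L, "male", 15000L),
      ("Tom", 42L, 3L, "male", 18000L),
      ("Kattie", 21L, 3L, "female", 21000L),
      ("Jen", 30L, 2L, "female", 28000L),
      ("Jen", 19L, 2L, "female", 8000L)
    ).toDF("name", "age", "depId", "gender", "salary")
    val department = Seq(
      (1L, "Technical Department"),
      (2L, "Financial Department"),
      (3L, "HR Department")
    ).toDF("id", "name")

    employee
      .where("age > 20")
      .join(department, $"depId" === $"id")
      .groupBy(department("name"), employee("gender"))
      .agg(avg(employee("salary")).as("avg_salary"))
      .orderBy($"name", $"gender") // sort by department name, then gender
      .collect()
      .map(r => (r.getString(0), r.getString(1), r.getDouble(2)))
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    sortedRows().foreach(println)
    spark.stop()
  }
}
```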