Spark introduced DataFrame to provide high-level functions that let Spark handle computations on structured data more effectively, allowing the Catalyst optimizer and the Tungsten execution engine to accelerate big-data analysis automatically.
After DataFrame was released, developers sent back a lot of feedback; one of the main complaints was the lack of compile-time type safety. To address this, Spark introduced the new Dataset API, a typed extension of the DataFrame API. The Dataset API extends the DataFrame API with static typing and with user-defined functions written in existing Scala or Java code. Compared with the traditional RDD API, the Dataset API also offers better memory management, which yields noticeable performance gains, especially in long-running jobs.
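A minimal sketch of the type-safety difference (assuming a spark-shell session where sqlContext.implicits._ is in scope, as in the examples below; the User case class is hypothetical):
case class User(name: String, age: Long)
val users = Seq(User("a", 30), User("b", 17)).toDS()
// Typed Dataset API: field names and types are checked by the Scala compiler
users.filter(_.age >= 18).show
// users.filter(_.agee >= 18)            // does not compile: no field `agee`
// Untyped DataFrame API: the same mistake only fails at runtime (AnalysisException)
// users.toDF().filter($"agee" >= 18).show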
# Create a Dataset from a local collection
case class Data(a: Int, b: String)
// toDS() comes from sqlContext.implicits._ (pre-imported in spark-shell)
val ds = Seq(Data(1, "one"), Data(2, "two")).toDS()
ds.collect()
ds.show()
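In the shell, collect() returns Array(Data(1,one), Data(2,two)) and show() prints roughly:
+---+---+
|  a|  b|
+---+---+
|  1|one|
|  2|two|
+---+---+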
# Create a Dataset from JSON
case class Person(name: String, zip: Long)
val df = sqlContext.read.json(sc.parallelize("""{"zip": 94709, "name": "Michael"}""" :: Nil))
// as[Person] turns the DataFrame into a typed Dataset by matching column names
df.as[Person].collect()
df.as[Person].show()
# WordCount on a Dataset
import org.apache.spark.sql.functions._
val ds = sqlContext.read.text("hdfs://node-1.sxt.cn:9000/wc").as[String]
// DataFrame-style aggregation over the split words
val result = ds.flatMap(_.split(" ")).filter(_ != "").toDF().groupBy($"value")
  .agg(count("*") as "numOccurrences").orderBy($"numOccurrences".desc)
// Typed alternative: group by the lower-cased word and count per group
val wordCount = ds.flatMap(_.split(" ")).filter(_ != "").groupBy(_.toLowerCase()).count()
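Both variants are lazy; an action is needed to materialize the counts (the output depends on the contents of /wc):
result.show()
wordCount.collect().foreach(println)   // prints (word, count) tuples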
# The same WordCount, step by step: create the Dataset
val lines = sqlContext.read.text("hdfs://node-1.sxt.cn:9000/wc").as[String]
# Transform the Dataset
val words = lines.flatMap(_.split(" ")).filter(_ != "")
# Inspect the contents of the Dataset
words.collect
words.show
# Group and count
val counts = words.groupBy(_.toLowerCase).count()
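counts is a Dataset[(String, Long)]; one way to view it sorted (a sketch, renaming the tuple columns positionally):
counts.toDF().toDF("word", "count").orderBy($"count".desc).show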
--------------------------------------------------------------------------------------------------------------
{"name": "UC Berkeley", "yearFounded": 1868, "numStudents": 37581}
{"name": "MIT", "yearFounded": 1860, "numStudents": 11318}
# Upload the data to HDFS: /usr/local/hadoop-2.6.4/bin/hdfs dfs -put schools.json /
# Define a case class
case class University(name: String, numStudents: Long, yearFounded: Long)
# Create the Dataset
val schools = sqlContext.read.json("hdfs://node-1.sxt.cn:9000/schools.json").as[University]
# Operate on the Dataset
// avoid `sc` as a parameter name here, since it shadows the shell's SparkContext
schools.map(school => s"${school.name} is ${2015 - school.yearFounded} years old").show
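Given the two records in schools.json, the output contains (subject to show()'s default 20-character truncation):
UC Berkeley is 147 years old
MIT is 155 years old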
# JSON -> DataFrame
val df = sqlContext.read.json("hdfs://node-1.sxt.cn:9000/person.json")
df.where($"age" >= 20).show
// col() comes from org.apache.spark.sql.functions (imported above)
df.where(col("age") >= 20).show
df.printSchema
# DataFrame -> Dataset
case class Person(age: Long, name: String)
val ds = df.as[Person]
ds.filter(_.age >= 20).show
# Dataset -> DataFrame
val df2 = ds.toDF
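Both conversions are metadata-only; a quick check that the round trip preserves the schema:
df2.printSchema   // identical to df.printSchema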
import org.apache.spark.sql.types._
// DataFrame version: bucket ages into decades
df.where($"age" > 0).groupBy((($"age" / 10).cast(IntegerType) * 10).as("decade"))
  .agg(count("*")).orderBy($"decade").show
// Typed Dataset version of the same aggregation (agg takes typed columns, hence .as[Long])
ds.filter(_.age > 0).groupBy(p => (p.age / 10) * 10).agg(count("name").as[Long])
  .toDF().withColumnRenamed("value", "decade").orderBy("decade").show
val df = sqlContext.read.json("hdfs://node-1.sxt.cn:9000/student.json")
case class Student(name: String, age: Long, major: String)
val studentDS = df.as[Student]
// typed select: each column is given its Scala type, yielding a Dataset of tuples
studentDS.select($"name".as[String], $"age".as[Long]).filter(_._2 > 19).collect()
// typed grouping by a field of the case class
studentDS.groupBy(_.major).count().collect()
import org.apache.spark.sql.functions._
studentDS.groupBy(_.major).agg(avg($"age").as[Double]).collect()
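GroupedDataset.agg accepts up to four typed columns, so several aggregates can be computed in one pass (a sketch):
studentDS.groupBy(_.major).agg(count("*").as[Long], avg($"age").as[Double]).collect()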
case class Major(shortName: String, fullName: String)
val majors = Seq(Major("CS", "Computer Science"), Major("Math", "Mathematics")).toDS()
val joined = studentDS.joinWith(majors, $"major" === $"shortName")
joined.map(s => (s._1.name, s._2.fullName)).show()
joined.explain()
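joinWith keeps both sides intact and returns a Dataset[(Student, Major)], which is why the map above reaches into _1 and _2; pattern matching reads a little better (a sketch):
joined.map { case (student, major) => (student.name, major.fullName) }.show()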