Spark SQL(二)DataFrame和DataSet

  • DataSet:
    A Dataset is a distributed collection of data:分布式的数据集(since Spark 1.6)
  • DataFrame:
    A DataFrame is a Dataset organized into named columns:以列(列名、列的类型、列值)的形式构成的分布式数据集,按照列赋予不同的名称,It is conceptually equivalent to a table in a relational database or a data frame in R/Python(概念上等于关系数据库中的表)

DataFrame和DataSet的关系为:DataFrame = Dataset[Row]

DataFrame它不是Spark SQL提出的,而是早起在R、Pandas语言就已经有了的。

怎样得到一个DataFrame呢,Spark 1.x.时使用SQLContext作为entry point:

 val sqlContext = new SQLContext(sc)
 val people = sqlContext.read.format("json").load(path) //peopel就是一个DataFrame

从Spark 2.0开始,使用SparkSession代替了SQLContext作为entry point:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

val df = spark.read.json("examples/src/main/resources/people.json") //df就是一个DataFrame
df.show()

DataFrame常用操作包括:

df.printSchema() // Print the schema in a tree format
df.select("name").show() // Select only the "name" column
df.select($"name", $"age" + 1).show() // Select everybody, but increment the age by 1
df.filter($"age" > 21).show() // Select people older than 21
df.groupBy("age").count().show() // Count people by age

还可以把DataFrame转换位临时表,达到使用sql语句操作文件的目的:

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

如上Temporary views仅是session-scoped的,session销毁了临时表就不存在了,想要创建可以在多个session中共享的表,以达到当前Spark application停掉时内部创建的临时表仍然有效的目的,可以创建全局临时表:

//Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

怎样得到一个DataSet呢,如下:

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

DataFrame和RDD互操作,有两种方式:

  1. Inferring the Schema Using Reflection:即反射,case class 前提:事先需要知道你的字段、字段类型
  2. Programmatically Specifying the Schema:编程,Row 这种代码比较繁琐,如果第一种情况不能满足你的要求(事先不知道列)
    选型:优先考虑第一种
  • Inferring the Schema Using Reflection
case class Person(name: String, age: Long)

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
  • Programmatically Specifying the Schema
    这种方式代码繁琐一些,有三部曲:
    1. Create an RDD of Rows from the original RDD;
    2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1
    3. Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

除了上边官网给的例子,再举一个:

val rdd = spark.sparkContext.textFile("E:/ATempFile/info.txt")
val infoRDD = rdd.map(_.split(",")).map(line=>Row(line(0).toInt,line(1),line(2).toInt))
val structType = StructType(Array(StructField("id",IntegerType,true),
        StructField("name",StringType,true),
        StructField("age",IntegerType,true)))
        
val infoDF = spark.createDataFrame(infoRDD,structType)

很明显,第一种方式代码更加简洁、方便。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容