Spark SQL 入门指北

1. Overview

本文将介绍 Spark SQL 的基本概念和基本使用,并介绍自定义数据源和 catalyst 的基本概念。通过此文可以对 Spark SQL 建立一个初步了解。

2. Basic

2.1 什么是 Spark SQL?

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.

以上内容摘自官网,这里有两个重点:1. structured data processing 2. extra optimizations 即 Spark SQL 是专门处理结构化数据的一个 Spark 模块,通过额外的 schema 信息使得计算更加便捷,框架对计算也可以进行性能上的优化

2.2 Spark SQL 有哪些接口?

There are several ways to interact with Spark SQL including SQL and the Dataset API.

使用 Spark SQL 有两种方式,一类是通过 SQL ,一类是通过 Dataset API。其中前者更常用一些。

2.3 外部数据源获取 DataFrame/DataSet

DataFrame 可以通过 SparkSession 从已存在的 RDD,Hive 表,或者外部数据源读取。从 RDD 以及从 Hive 表获取 DataFrame 后面进行详细讲解。这里以外部数据源为例获取 DF。首先看一下 Spark 支持哪些外部数据源:

scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

可以看到有 csv,jdbc,json,orc,parquet,text 等。

2.3.1 DataFrame

以 json 为例,people.json 文件内容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

scala> df.createOrReplaceTempView("people")

scala> spark.sql("select name from people").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

上面代码演示了 DataFrame 的创建、以及如何使用 SQL 接口和 DataFrame API 的方式进行数据查询。

2.3.2 DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息。

scala> case class Person(name: String, age: Long)
defined class Person

scala> val peopleDS = spark.read.json("examples/src/main/resources/people.json").as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> peopleDS.map(_.
age   canEqual   copy   equals   hashCode   name   productArity   productElement   productIterator   productPrefix   toString

上面代码演示了 DataSet 的创建,可以看到只需要在创建 DataFrame 的基础上加上类型信息即可。在 map 时可以获取到对象结构内的成员变量,SQL 操作和 API 接口和 DataFrame 都是一样的,不再演示。

2.4 RDD 获取 DataFrame/DataSet

RDD 的数据来源 people.txt 内容如下:

Michael, 29
Andy, 30
Justin, 19

2.4.1 反射方式

scala> val txtRDD = sc.textFile("/examples/src/main/resources/people.txt")
txtRDD: org.apache.spark.rdd.RDD[String] = /examples/src/main/resources/people.txt 

scala> case class Person(name: String, age: Long)
defined class Person

scala> val peopleDF = txtRDD.map(_.split(",")).map(attrs => Person(attrs(0),attrs(1).trim.toLong)).toDF()
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

scala> peopleDF.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

2.4.2 编程方式

scala> val txtRDD = sc.textFile("/examples/src/main/resources/people.txt")
txtRDD: org.apache.spark.rdd.RDD[String] = /examples/src/main/resources/people.txt 

scala> val schema = StructType(Array(StructField("name",StringType,nullable = true),StructField("age",LongType,nullable = true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,LongType,true))

scala> val rowRDD = txtRDD.map(_.split(",")).map(attrs => Row(attrs(0),attrs(1).trim.toLong))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row

scala> val peopleDF = spark.createDataFrame(rowRDD,schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

scala> peopleDF.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

通过如上演示可以看到,反射方式的核心是通过 RDD[CaseClass].toDF()方式实现的,而编程方式的核心是通过 spark.createDataFrame(RDD[Row],schema) 实现的。

2.5 Hive 表获取 DataFrame/DataSet

想使用 hdfs 上的 hive 表,至少需要配置一个文件 hive-site.xml 到 Spark 的 conf 目录下,并在文件中通过spark.sql.warehouse.dir 指明数仓的地址。如果有其他关于 hdfs 的配置,则需要把 core-site.xml、hdfs-site.xml 配置到 conf 目录下。成功配置后直接通过spark.sql(...) 执行 SQL 语句即可。

3. Deep

3.1 自定义外部数据源

Spark 自带了很多外部数据源,如前面章节介绍的 json、csv等等。但有时这些数据格式无法满足我们的需求,此时我们可以通过自定义数据源的方式来解析我们的数据。自定义数据源的本质是使用 RDD 转 DF 中的编程方式,即 spark.createDataFrame(RDD[Row],schema),通过继承抽象类、实现接口来提供所需要的参数。

3.1.1 基本使用

class MyRelation(sqlContext:SQLContext,path:String,schema:StructType=null) extends BaseRelation with TableScan

这里通过继承一个抽象类 BaseRelation 来提供 schema,并通过实现一个接口 TableScan 来提供 RDD[Row]

假如我们要解析的数据格式如下:

19.167.29.40 [2018-03-04 21:10:16] (Android o,Meizu note 7) 三国志 104

抽象类 BaseRelation 中的 schema 方法如下实现:

override def schema: StructType = {
    if(userSchema != null){
      userSchema
    }else{
      StructType(
        Array(
          StructField("ip",StringType),
          StructField("date",StringType),
          StructField("androidVersion",StringType),
          StructField("phoneModel",StringType),
          StructField("gameName",StringType),
          StructField("gameId",IntegerType)
        ))
    }
  }

接口 TableScan 的方法 buildScan 方法如下实现:

override def buildScan(): RDD[Row] = {
    val rdd = sqlContext.sparkContext.textFile(path)
    rdd.map(line => line.split("\t")).map(attrs => Row(
      attrs(0),
      formatTime(attrs(1)),
      attrs(2).split(",")(0).filter(_!='('),
      attrs(2).split(",")(1).filter(_!=')'),
      attrs(3),
      attrs(4).toInt))
  }

此时已经通过两个方法提供了spark.createDataFrame(RDD[Row],schema)所需要的两个参数。最后再通过一个 Provider 关联我们的 MyRelation 即可。

package com.xiaoc024.spark.offline.datasource
class MySource extends RelationProvider {

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    val path = parameters.get("path")

    path match {
      case Some(p) => new CTxtRelation(sqlContext, p)
    }
  }
}

最后通过 spark.read.format("com.xiaoc024.spark.offline.datasource.MySource").option("path",path).load()进行使用

3.1.2 进阶使用

上一小节介绍的是自定义外部数据源的基本使用,还有一些进阶的使用方式,比如列裁剪优化、shortName 等等。其中优化查询效率的手段有:

  • 实现PrunedScan 接口实现列裁剪
  • 实现 PrunedFilteredScan 接口实现裁剪+谓词下推

上一小节中使用外部数据源,我们传入的是全类名,如果想使用更加简洁的方式可以让 MySource 实现DataSourceRegister接口,然后实现 shortName 方法。比如 override def shortName(): String = "ctxt" 但是有一点需要注意的是,单单这样做是不够的,我们还需要修改 spark-sql 的源码来注册 MySource,具体位置在 spark-sql jar包中的 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister中,将我们的 MySource 全类名添加到最后一行。此时就可以通过spark.read.format("ctxt").option("path",path).load()来进行使用了。

3.2 Catalyst 简介

Catalyst 是 Spark SQL 中的一种优化器,执行策略主要分为两个大的方向:基于规则优化(RBO)以及基于代价优化(CBO)。工作流程如下图:

这里只对流程做简要的概述,总共分为 5 个步骤:

  • sql/df --parser---> Unresolved Logical Plan

    解析 sql/df 为抽象语法树,不包含表的数据类型等任何信息,类似于建模

  • Unresolved Logical Plan --analyzer--> Resolved Logical Plan

    Analyzer 通过 Catalog 元数据信息遍历抽象语法树为每个结点进行数据类型绑定和函数绑定,生成未优化的逻辑执行计划

  • Resolved Logical Plan --Optimizer(RBO)---> Optimized Logical Plan

    对未优化的逻辑执行计划进一步使用 RBO 优化,如谓词下推(参与 join 的数据量减少)、常量累加(不用算很多次)、列裁剪(减少读取的数据量)生成优化的逻辑执行计划

  • Optimized Logical Plan --Query Planner----> Physical Plans

  • Physical Plans --CBO---> one Physical Plan

4. Ref

  1. http://spark.apache.org/docs/latest/sql-programming-guide.html
  2. https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335