简介
sparkSQL 是为了让开发人员摆脱 自己编写 RDD 原生代码而产生的,只需要写一句 SQL语句或者调用API,进行查询 或实现更复杂的数据分析,使得开发变得更简洁。
Spark SQL 允许开发人员直接处理RDD,同时也可查询Hive、HBase等外部数据源。
从Shark说起
Shark即Hive on Spark,为了实现与Hive兼容,Shark重用了Hive中的HiveQL解析,以近似认为仅从MapReduce作业 替换成了 Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作。
这样设计导致两个问题:
- 执行计划优化完全依赖于Hive,不方便添加新的优化策略
- Spark是线程级并行,而MapReduce是进程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题。
因此,在2014年的时候,Shark项目中止,并转向Spark SQL的开发。
Spark SQL设计
Spark SQL增加了Schema RDD(即带有Schema信息的RDD,即DataFrame),使用户可以在Spark SQL中执行SQL语句。
数据既可以来自RDD,也可以来自Hive、HDFS、Cassandra等外部数据源,也可以是json数据。
Spark SQL目前支持Scala、Java、Python三种语言,
Spark SQL可以很好地支持SQL查询,
一方面,可以编写Spark应用程序使用SQL语句进行数据查询,
另一方面,也可以使用标准的数据库连接器(比如JDBC)连接Spark进行SQL查询。
DataFrame
DataFrame 是什么?
DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,
Spark能够轻松实现从 MySQL 到 DataFrame的转化,并且支持SQL查询。
与RDD的区别:
RDD是分布式的 Java对象的数据集
RDD[Person]是以Person为类型参数,但是,Person类的内部结构对于RDD而言却是不可知的。
DataFrame是一种以RDD为基础的分布式数据集,也就是分布式的Row对象的集合(每个Row对象代表一行记录),提供了详细的结构信息,也就是我们经常说的模式(schema),
Spark SQL可以清楚地知道该数据集中包含哪些列、每列的名称和类型,
可以方便的通过 Spark SQL 进行查询
Schema是什么?
DataFrame中提供了详细的数据结构信息,从而使得SparkSQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么,
DataFrame中的数据结构信息,即为schema。
DataFrame 的创建
从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。
SparkSession实现了 SQLContext 及 HiveContext 所有功能。
先创建一个 SparkSession:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
然后从json文件中读取
//使支持RDDs转换为DataFrames及后续sql操作
import spark.implicits._
//后面用到的 spark 都是和上面的 spark 是同一个
val df = spark.read.json("file:///zyb/people.json")
json文件的内容为:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
展示 Dataframe 数据:
df.show()
//结果为:
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
看一下 DataFrame 的 Schema
df.printSchema()
//结果为:
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
通过 sql 语句进行查询
//必须注册为临时表才能供下面的查询使用
df.registerTempTable("peopleTempTab")
spark.sql("select name,age from peopleTempTab where age > 20").rdd
常用的 DataFrame操作
// 选择多列,并为 age 的列 ,都加1
df.select(df("name"),df("age")+1).show()
// 条件过滤
df.filter(df("age") > 20 ).show()
// 分组聚合
df.groupBy("age").count().show()
从 RDD 转换为 DataFrame
这里介绍两种转换方式:
1、利用反射机制推断RDD模式
这种方式要先确定类的成员组成,即Schema信息。
先定义一个 case class
case class Person(name: String, age: Long)
把从文件中读的字符串转换为 Person,然后调用 case class 的 toDF() 方法
val peopleDF = spark.sparkContext.textFile("file:///zyb/people.txt").map(_.split(",")).map(att => Person(att(0), att(1))).toDF()
2、使用编程方式定义RDD模式
先 生成一个 schema,然后生成一个 Row,然后调用 spark的 createDataFrame 方法把两者组装起来。
生成一个 RDD:
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
创建 schema:
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true), StructField("gender", StringType, true),StructField("age", IntegerType, true)))
创建Row对象,每个Row对象都是rowRDD中的一行:
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
下面 往 MySql 写数据时,也会用到这个 studentDF
相比较而言,还是第一种方式简单。
DataFrame 的两种保存方式
从 json文件中加载数据得到 DataFrame,load方式:
val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
或者 手动指定 格式:
val peopleDF = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
- 通过 DataFrame 的 write 方法
peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")
- 通过 DataFrame 的 rdd方式
peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")
DataFrame 读写 parquet
Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。
读 parquet 文件,返回的为 DataFrame
spark.read.parquet("file:///zyb/users.parquet")
通过 DataFrame 写成 parquet
peopleDF.write.parquet("file:///usr/local/spark/mycode/newpeople.parquet")
Spark SQL 支持连接到 MySql Server
1、创建一个表
//创建一个数据库 spark
create database spark;
//使用数据库
use spark;
//创建一个表
create table student (id int(4), name char(20), gender char(4), age int(4));
//往表里插入数据
insert into student values(1,"Xueqian",'F',23);
insert into student values(2,"wegfw",'M',24);
//从表中查询
select * from student
2、通过 jdbc 从MySql 中读数据
下载 jdbc 驱动
mysql-connector-java-5.1.40.tar.gz
把 驱动 放到 spark的 jars 目录下
启动 spark-shell,要附加一些参数,告诉jar的位置
./bin/spark-shell \ --jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \ --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar
启动 shell 后,执行以下命令连接数据库,读取数据
这里要告诉 MySql的 URL地址,drive是谁,表是哪个,用户和密码。
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()
显示 DataFrame
jdbcDF.show()
//结果为:
+---+--------+------+---+
| id| name|gender|age|
+---+--------+------+---+
| 1| Xueqian| F| 23|
| 2|Weiliang| M| 24|
+---+--------+------+---+
3、通过 jdbc 往MySql 中写数据
创建一个 prop变量 用来保存 JDBC连接参数
val prop = new Properties()
prop.put("user", "root") //表示用户名是root
prop.put("password", "hadoop") //表示密码是hadoop
prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver
连接数据库,采用append模式,表示追加记录到数据库spark的student表中
studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)
Spark SQL 支持连接到 Hive
编译Spark 源码,使Spark 支持Hive
Spark官方提供的版本,通常是不包含Hive支持的,需要采用源码编译,编译得到一个包含Hive支持的Spark版本。
编译 支持 Hive 的 Spark 源码步骤:
- 先到这个网址:http://spark.apache.org/downloads.html 下载源码
- 传到Linux,解压:tar -zxvf ./spark-2.1.0.tgz -C /home/hadoop/
- 查看 Hadoop版本,下面要用到。查看命令:hadoop version
- 编译
cd /home/hadoop/spark-2.1.0 ./dev/make-distribution.sh —tgz —name h27hive -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.1 -Phive -Phive-thriftserver -DskipTests
- -Phadoop-2.7 -Dhadoop.version=2.7.1
指定安装 spark 时的 hadoop 版本,要和电脑上安装的Hadoop的版本对应。 - -Phive -Phive-thriftserver
这两个选项让其支持Hive。 - -DskipTests
能避免测试不通过时发生的错误。 - h27hive
只是我们给编译以后的文件的一个名称,最终编译成功后会得到文件名“spark-2.1.0-bin-h27hive.tgz”
- -Phadoop-2.7 -Dhadoop.version=2.7.1
编译要花几个小时,而且可能还会遇到各种错误,建议直接在网上下载一个编译好的 Spark,提供一个下载地址:https://pan.baidu.com/s/1nv8Y2hj
配置
-
用vim编辑器打开了spark-env.sh文件,在这个文件增加一行内容,重复的就不需要添加:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath) export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 export CLASSPATH=$CLASSPATH:/usr/local/hive/lib export SCALA_HOME=/usr/local/scala export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop export HIVE_CONF_DIR=/usr/local/hive/conf export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/hive/lib/mysql-connector-java-5.1.40-bin.jar
-
为了让Spark能够访问Hive,需要把Hive的配置文件hive-site.xml拷贝到Spark的conf目录下:
cd /usr/local/sparkwithhive/conf cp /usr/local/hive/conf/hive-site.xml .
vim spark-env.sh
创建 Hive 表
我们进入Hive:
cd /usr/local/hive
./bin/hive
新建一个数据库sparktest,并在这个数据库下面创建一个表student,并录入两条数据。
hive> create database if not exists sparktest;//创建数据库sparktest
hive> show databases; //显示一下是否创建出了sparktest数据库
//下面在数据库sparktest中创建一个表student
hive> create table if not exists sparktest.student(
> id int,
> name string,
> gender string,
> age int);
hive> use sparktest; //切换到sparktest
hive> show tables; //显示sparktest数据库下面有哪些表
hive> insert into student values(1,'Xueqian','F',23); //插入一条记录
hive> insert into student values(2,'Weiliang','M',24); //再插入一条记录
hive> select * from student; //显示student表中的记录
读取 Hive 中的数据
Scala> import org.apache.spark.sql.Row
Scala> import org.apache.spark.sql.SparkSession
Scala> case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
Scala> val warehouseLocation = "spark-warehouse" //"file:${system:user.dir}/spark-warehouse"
Scala> val spark = SparkSession.builder().appName("Spark Hive Example"). \
config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
Scala> import spark.implicits._
Scala> import spark.sql
//下面是运行结果
scala> sql("SELECT * FROM sparktest.student").show()
+---+--------+------+---+
| id| name|gender|age|
+---+--------+------+---+
| 1| Xueqian| F| 23|
| 2|Weiliang| M| 24|
+---+--------+------+---+
向 Hive 中插入数据
先创建一个 DataFrame 数据:
scala> import java.util.Properties
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
//下面我们设置两条数据表示两个学生信息
scala> val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
//下面要设置模式信息
scala> val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
//下面创建Row对象,每个Row对象都是rowRDD中的一行
scala> val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
scala> val studentDF = spark.createDataFrame(rowRDD, schema)
往 Hive 中插入数据:
scala> studentDF.show()
+---+---------+------+---+
| id| name|gender|age|
+---+---------+------+---+
| 3|Rongcheng| M| 26|
| 4| Guanhua| M| 27|
+---+---------+------+---+
//下面注册临时表:tempTable
scala> studentDF.registerTempTable("tempTable")
//往 Hive 表:sparktest.student 中插入数据
scala> sql("insert into sparktest.student select * from tempTable")
对 SQL 进行查询优化
主要是对要查询的 存储的数据格式 进行优化,尽可能的使用列式格式 存储数据。
什么是列存储呢?
传统的数据库通常以行单位做数据存储,而列式存储(后文均以列存储简称)以列为单位做数据存储,如下:
列存储相比于行存储主要有以下几个优势:
- 数据即索引,查询是可以跳过不符合条件的数据,只读取需要的数据,降低 IO 数据量(行存储没有索引查询时造成大量 IO,建立索引和物化视图代价较大)
- 只读取需要的列,进一步降低 IO 数据量,加速扫描性能(行存储会扫描所有列)
- 由于同一列的数据类型是一样的,可以使用高效的压缩编码来节约存储空间
当然列存储并不是在所有场景都强于行存储,当查询要读取多个列时,行存储一次就能读取多列,而列存储需要读取多次。Spark 原始支持 parquet 和 orc 两个列存储,下文的实践使用 parquet
使用 Parquet 加速 Spark SQL 查询
使用 Parquet 格式的列存储主要带来三个好处:
- 大大节省存储空间
使用行存储占用 44G,将行存储转成 parquet 后仅占用 5.6G,节省了 87.2% 空间,使用 Spark 将数据转成列存储耗时4分钟左右(该值与使用资源相关) - 只读取指定行
select count(distinct f1) from tbInRow/tbInParquet
行存储耗时: 119.7s
列存储耗时: 3.4s
加速 35 倍 - 跳过不符合条件数据
select count(f1) from tbInRow/tbInParquet where f1 > 10000
行存储耗时: 102.8s
列存储耗时: 1.3s
加速 78 倍