Spark SQL解决了什么问题
这个之前,先说下Hive,Hive有自己的语言Hive SQL(HQL),利用sql语句查询,然后走的是MapReduce程序,提交到集群上运行.这样的话有个很大的优势,那就是它相比MapReduce节省了很多的代码,很多..
但是也有个致命的缺陷,那就是MapReduce.(后面我想写一篇MapReduce从仰望到失望,从失望到绝望...) 前面也说过,MR相比Spark的RDD,性能速度正如官方所说的,有百倍之差.. 既然Spark这么强,那为何不出一个Spark SQL直接对应Hive呢,底层走的是Spark呢? 于是就有了Spark SQL.它将Spark SQL转换成RDD,然后提交到集群执行,执行效率也是MR和spark的差距。
Spark sql宏观了解
Spark sql是Spark的一个组件,Spark sql自己也有两个组件:DataFrame / DataSet.
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用.
从上图可以看出,SparkSQL可以看做是一个转换层,向下对接各种不同的结构化数据源,向上提供不同的数据访问方式。
DataFrame / DataSet / RDD的关系
之前我们讲,RDD是Spark的基石,因为其他的spark框架都是运行在Spark core上的.但是在我们Spark sql里面,就有点区别了.
在Spark sql中,DataSet是核心,没有之一.但是DataSet仅限于Spark sql中,不能在其他框架中使用,所以RDD依旧还是spark的基石,依旧是核心.而DataFrame已经被DataSet替换了,DataFrame能实现的功能,DataSet都能实现,相反,DataFrame却不能.
三者的关系如下:
RDD + schema(数据的结构信息) = DataFrame = DataSet[Row]
RDD 0.x版本发布, DataFrame1.3版本发布, DataSet1.6版本发布.
RDD: 提供了很多实用简单的API, www.jianshu.com/p/a3a64f51ddf4 ,这是我之前写的RDD的
DataFrame: DataFrame可以理解为一个传统数据库的二维表格,除了数据以外,还记录着数据的结构信息,即schema.DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好
DataSet: DataSet[Row] = Dataframe ; 它是Dataframe API的一个扩展,是spark最新的数据抽象,也是Spark SQL中最核心的组件,它基本代替了Dataframe,也有完全代替Dataframe的趋势.
注: RDD不支持spark sql的操作
RDD / DataSet / Dataframe之间的转换
上面说到RDD不支持Spark sql的操作,但是Spark生态圈只提供了Spark core一个计算框架,且Spark生态圈都是基于Spark core进行计算的,所以Spark core对接Spark sql的方式就是:将RDD转换为DataSet / Dataframe,且三者之间支持互相转换!
转换之前先聊一下DataFrame支持两种查询方式:一种是DSL风格,另外一种是SQL风格,dataFrame支持两者查询风格
DSL: 你需要引入 import spark.implicit._ 这个隐式转换,可以将DataFrame隐式转换成RDD。
SQL: 你需要将DataFrame注册成一张表格,且需要通过sparkSession.sql 方法来运行你的SQL语句
用Spark-shell来操作Spark SQL,spark作为SparkSession的变量名,sc作为SparkContext的变量名.
将文件中的数据转换成DataSet //先case一个类 case class person(name:String,age:Int) //将文件里面的数据转换成DataSet val peopleDF2 =spark.sparkContext.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(para=> Person(para(0).trim,para(1).trim.toInt)).toDF //制成一张person的表 peopleDF2.createOrReplaceTempView("persons") //查询 val teen =spark.sql("select * from persons where age between 13 and 30") //因为DataFrame=DataSet[row],所以要访问row对象中的每一个元素,可以通过这种row[0]角标的方式来访问,上面是通过反射获取schema teen.map(row => "name:" + row(0)).show
//将RDD转换成DataFrame var peopleRDD = sc.textFile("../examples/src/main/resources/people.txt") peopleRDD.collect //以”,”切割,得到一个Array,然后再用map对里面的每一个元素都进行转换,最后用toDF方法给这两个起名字 val peopleDF =peopleRDD.map(_.split(",")).map(para=>(para(0).trim(),para(1).trim().toInt)).toDF("name","age")
//将 DataFrame转换成RDD val aa = peopleDF.rdd //对,没错,就是一行!一个方法搞定
//将RDD转换成DataSet case class person(name:String,age:Int) //先定义一个case实例,最后是直接toDS就ok了 val peopleDF= peopleRDD.map(_.split(",")).map(para=>person(para(0).trim(),para(1).trim().toInt)).toDS
//将DataSet转换成RDD val bb = peopleDF.rdd //和上面一样,一个方法搞定
//将DataFrame转换成DataSet peopleDF.as[person] as那个case的类
将DataSet转换成DataFrame PeopleDS.toDF 用的toDF方法
Spark SQL链接Hive
Spark SQL和Hive的连接有两种,一种是Spark内置的Hive,一种是Spark连接外部的Hive.
内置Hive: 内置Hive和Spark会完美地兼容,但是我用的都是外置的Hive
外置Hive: 这是Spark连接Hive的主要模式; 实现方式:
1. 需要将hive-site.xml 拷贝到spark的conf目录下。
2. 如果hive的metestore使用的是mysql数据库,那么需要将mysql的jdbc驱动包放到spark的jars目录下。
3. 用bin/spark-shell打开时候,第一次需要在后面加--confspark.sql.warehouse.dir=hdfs:hadoop101:9000/spark_warehouse