Spark是什么?
Spark是一个分布式计算平台(Hadoop是一个包含分布式计算、存储、管理的大数据生态系统),它对标的是Hadoop的MapReduce,那么与Hadoop的MapReduce相比,它具有什么优势呢?
- 更快
- 更容易使用
除了Java之外,提供了Scala、Python、R的API; - 好用的库
基于Spark Core提供了Spark SQL、Spark Streaming、MLib、GraphX等库; - 运行方便
能够在生产环境下借助Spark生态,也可以在本地测试时独立运行。
Spark架构
Spark的架构图如上,其核心是Spark Core,可以从HDFS、HBase获取数据;它的运行方式可以是本地运行、独立运行、借助Mesos、YARN等运行;基于Spark Core,提供了Spark SQL(类似Hive)、Spark Streaming(类似Storm)、MLib、GraphX。
Spark的运行
1.四种运行模式
Spark的运行模式主要有Local,Standalone,yarn,mesos等,其中Local主要用于本地调试,而yarn是最常见的生产模式。
2.如何运行
Spark的bin目录下,有若干个启动脚本:
pyspark:python交互式接口
spark-class:java交互式接口
spark-shell:scala交互式接口
spark-submit:提交文件执行
Spark关键概念——RDD
1.RDD(Resilient Distributed Dataset) 弹性分布数据集概念
它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。
RDD特点如下:
(1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
(2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
(3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
(4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
(5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
2. 生成RDD
由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等。
测试时,也可以由数组等生产RDD。
3. RDD算子
RDD包含两大类算子:
- 转化操作(Transformation),从已有数据集转化出新的数据集;
常见的有:map、flatMap、filter、distinct、union、intersection、substract、cartesian、collect; - 启动操作(Action),对数据集进行计算,并返回结果;
常见的有:collect、count、countByValue、reduce、foreach。
Transformation都是惰性求值的,也就是说它不会马上进行计算,仅仅是计算转换操作,当遇到Action操作时,才会触发计算。
4. RDD的宽依赖和窄依赖
由于RDD是粗粒度的操作数据集,每个Transformation操作都会生成一个新的RDD,所以RDD之间就会形成类似流水线的前后依赖关系;RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。如图所示显示了RDD之间的依赖关系。
从图中可知:
窄依赖:是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作都会产生窄依赖;(独生子女)
宽依赖:是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;(超生)
在这里我们是从父RDD的partition被使用的个数来定义窄依赖和宽依赖,因此可以用一句话概括下:如果父RDD的一个Partition被子RDD的一个Partition所使用就是窄依赖,否则的话就是宽依赖。因为是确定的partition数量的依赖关系,所以RDD之间的依赖关系就是窄依赖;由此我们可以得出一个推论:即窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖。
一对固定个数的窄依赖的理解:即子RDD的partition对父RDD依赖的Partition的数量不会随着RDD数据规模的改变而改变;换句话说,无论是有100T的数据量还是1P的数据量,在窄依赖中,子RDD所依赖的父RDD的partition的个数是确定的,而宽依赖是shuffle级别的,数据量越大,那么子RDD所依赖的父RDD的个数就越多,从而子RDD所依赖的父RDD的partition的个数也会变得越来越多。
Shuffle和Stage
宽依赖会导致Shuffle,Shuffle即数据在分片上的重新分布,会产生网络传输是Spark执行过程中最耗时的部分。
调度器会在产生宽依赖的地方(Shuffle)形成一个stage,同一个stage内的RDD操作会流式执行,不会发生数据迁移。
数据集持久化
Spark中的持久化概念与数据库中的持久化概念稍有区别:Spark中数据集的持久化更偏向于缓存的概念,将中间结果保存下来,以便后续步骤重复利用,避免了一些耗时步骤的反复运行。
Spark提供了多种持久化级别
持久化级别 | 说明 |
---|---|
MEMORY_ONLY | 将RDD以非序列化的Java对象存储在JVM中。 如果没有足够的内存存储RDD,则某些分区将不会被缓存,每次需要时都会重新计算。 这是默认级别。 |
MEMORY_AND_DISK | 将RDD以非序列化的Java对象存储在JVM中。如果数据在内存中放不下,则溢写到磁盘上.需要时则会从磁盘上读取 |
MEMORY_ONLY_SER (Java and Scala) | 将RDD以序列化的Java对象(每个分区一个字节数组)的方式存储.这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的CPU。 |
MEMORY_AND_DISK_SER (Java and Scala) | 与MEMORY_ONLY_SER类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们。 |
DISK_ONLY | 将RDD分区存储在磁盘上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2等 | 与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上。 |
OFF_HEAP(实验中) | 与MEMORY_ONLY_SER类似,但将数据存储在堆内存中。 这需要启用堆内存。 |
共享变量(累加器和共享变量)
Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量。累加器相当于统筹大变量,常用于计数,统计。
广播变量:一些公共规则、公共配置可以使用广播
广播变量预先存储在Driver上,当Task需要广播变量时,它会查看它所在的Executor是否包含,若无所在Executor的BlockManager向Driver获取广播变量。
广播变量更新时,要先删除,再声明新的广播变量。
val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello xasxt")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("./words.txt")
lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}
sc.stop()
具体示例如上图,不使用广播变量,则Driver会把该变量发给每个Task。(在Driver定义,在Executor使用)
累加器:
累加器通常用于统计,在Driver定义,由Executor更新。
import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.accumulator(0)
sc.textFile("./records.txt",2).foreach {//两个变量
x =>{accumulator.add(1)
println(accumulator)}}
println(accumulator.value)
sc.stop()
}
}
参考文献:
https://www.cnblogs.com/qingyunzong/p/8899715.html#_label0_0(RDD介绍,超详细)
https://blog.csdn.net/databatman/article/details/53023818#4shuffle-%E5%92%8C-stage(shuffle讲的不错,调优讲的不错)
https://www.cnblogs.com/liuliliuli2017/p/6782687.html(共享变量)
https://www.cnblogs.com/LHWorldBlog/p/8424681.html(广播变量和累加器)