Spark产生背景
➢ MapReduce局限性
• 仅支持Map和Reduce两种语义操作
• 处理效率低,耗费时间长
• 不适合处理迭代计算、交互式处理、实时流处理等
• 更多的应用于大规模批处理场景
➢ 计算处理框架种类多,选型复杂
• 批处理:MapReduce、Hive、Pig
• 流式计算:Storm
• 交互式计算:Impala、Presto
• 机器学习算法:Mahout
➢ 希望能够简化技术选型,在一个统一的框架下,能够完成批处理、流式计算、交互式计算、机器学习算法等
Spark简介
➢ 由加州大学伯克利分校的AMP实验室开源
➢ 大规模分布式通用计算引擎
➢ 具有高吞吐、低延时、通用易扩展、高容错等特点
➢ 使用Scala语言开发,提供了丰富的开发API,支持Scala、Java、Python、R开发语言
➢ Spark提供多种运行模式
Spark特点
➢ 计算高效
• 使用内存计算引擎,提供Cache缓存机制支持迭代计算或多次数据共享,减少数据读取的IO开销
• DAG引擎,减少多次计算之间中间结果写到HDFS的开销
• 使用多线程池模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO操作
➢ 通用易用
• 提供了丰富的开发API,支持Scala、Java、Python、R开发语言 • 集成批处理、流处理、交互式计算、机器学习算法、图计算
➢ 运行模式多样
• Local、Standalone、Yarn、Mesos
Spark核心概念RDD
Resilient Distributed Datasets弹性分布式数据集
• Spark基于RDD进行计算
• 分布在集群中的只读对象集合(由多个Partition构成)
• 可以存储在磁盘或内存中
• 可以通过并行转换操作构造
• 失效后自动重构
RDD操作
➢ Transformation
• 将Scala集合或者Hadoop数据集构造一个新的RDD
• 通过已有的RDD产生新的RDD
• 只记录转换关系,不触发计算
• 如:map、filter等
➢ Action
• 通过RDD计算得到一个或者一组值
• 真正触发执行
• 如:count、collect、saveAsTextFile
rdd1.map(_+1).saveAsTextFile("hdfs://node01:9000/")
RDD常用Transformation
- map (func)
接收一个处理函数并行处理源RDD中的每个元素,返回与源RDD元素一一对应的新RDD - filter (func)
并行处理源RDD中的每个元素,接收一个处理函数根据定义的规则对RDD中的每个元素进行过滤处理,返回处理结果为true的元素重新组成新的RDD - flatMap (func)
与map函数相似, flatMap是map和flatten的组合操作, map函数返回的新RDD包含 的元素可能是嵌套类型,flatMap接收一个处理嵌套类型数据的函数,将嵌套类型 的元素展开映射成多个元素组成新的RDD - union (otherDataset)
将两个RDD进行合并,返回结果RDD中元素不去重 - distinct ([numTasks]))
对RDD中元素去重 - reduceByKey(func, [numTasks])
对KV类型的RDD中按Key分组,接收两个参数,第一个参数为处理函数,第二个 参数为可选参数设置reduce的任务数, reduceByKey能够在RDD分区本地提前进行 聚合运算,能够有效减少shuffle过程传输的数据量 - sortByKey([ascending],[numTasks])
对KV类型的RDD内部元素按照Key排序,排序过程会涉及到Shuffle - join (otherDataset,[numTasks])
对KV类型的RDD关联,只能是两个RDD之间关联,超过两个RDD关联需要使用 多次join函数,join函数只会关联出具有相同Key的元素,相当于SQL语句中inner join - repartition (numPartitions)
对RDD重新分区接收一个参数numPartitions分区数 - reduce(func)
处理RDD两两之间元素的聚集操作 - collect()
返回RDD中所有数据元素 - count()
返回RDD中元素个数 - first()
- 返回RDD中的第一个元素
- take(n)
返回RDD中的前n个元素 - saveAsTextFile(path)
将RDD写入到文本文件,保存到本地文件系统或者HDFS中 - saveAsSequenceFile(path)
将KV类型的RDD写入到SequenceFile文件保存到本地文件系统或者HDFS - countByKey()
返回KV类型的RDD每个Key有多少个元素 - foreach(func)
遍历RDD中所有元素,接收参数为func函数,常用操作是传入println函数打印所有元素
Transformation与Action对比
➢ 接口定义方式不同
• Transformation:RDD[X] -> RDD[Y]
• Action:RDD[X] -> Z
➢ 执行计算方式不同
• Transformation采用惰性执行方式,只记录RDD转化关系,不会触发真正计算执行
• Action真正触发计算执行
val rdd1 = sc.textFile("hdfs://192.168.183.101:9000/data/wc/in")
val rdd2 = rdd2.flatMap(_.split("\t"))
val rdd3= rdd3.map((_,1))
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.saveAsTextFile(“hdfs://192.168.183.100:9000/data/wc/out”)
RDD Dependency依赖
➢ Narrow Dependency窄依赖
• 父RDD中的分区最多只能被一个子
RDD的一个分区使用
• 子RDD如果只有部分分区数据丢失
或者损坏只需要从对应的父RDD重 新计算恢复
➢ Shuffle Dependency宽依赖
• 子RDD分区依赖父RDD所有分区
• 子RDD如果部分分区或者全部分区 数据丢失或者损坏需要从所有父 RDD重新计算,相对窄依赖付出的 代价更高,尽量避免宽依赖的使用
实例
# Word Count
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
# PI
val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
val x = math.random
val y = math.random
x*x + y*y < 1
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")
val rdd = sc.parallelize(1 to 100,2)
#默认分区大小为该程序所分配的资源CPU核数
val rdd = sc.parallelize(1 to 100)
print red.paratition.size
案例
- Word Count(词频数统计)
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object SparkWordCount {
def FILE_NAME:String = "word_count_results_";
def main(args:Array[String]) {
if (args.length < 1) {
println("Usage:SparkWordCount FileName");
System.exit(1);
}
val conf = new SparkConf().setAppName("Spark Exercise:
Spark Version Word Count Program");
val sc = new SparkContext(conf);
val textFile = sc.textFile(args(0));
val wordCounts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)
//print the results,for debug use.
//println("Word Count program running results:");
//wordCounts.collect().foreach(e => {
//val (k,v) = e
//println(k+"="+v)
//});
wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());
println("Word Count program running results are successfully saved.");
}
}
// 提交任务
./spark-submit \
--class com.ibm.spark.exercise.basic.SparkWordCount \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g --executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/*.txt
1. Spark 基础(上篇)
2.Spark 基础(下篇)
3.Spark运行原理
4.许鹏:从零开始学习,Apache Spark源码走读(一)
5.Hadoop经典案例Spark实现