spark(1)介绍
1. 快速且通用的集群计算平台
- 扩充了流行的MapReduce计算模型
- 基于内存(发现hadoop在迭代式计算和交互式上的低效)
- 融合优点:批处理(hadoop)、迭代式运算(机器学习系统)、交互式查询(Hive)、流处理(Storm),降低了成本
- 和其他大数据工具整合很好,如hadoop、kafka
2. spark组件
3. Hadoop Spark 差异
- Hadoop离线处理,对时效性要求不高
- Spark用于时效性要求高及机器学习等领域
- Spark不具备HDFS的存储能力,要借助HDFS等持久化数据
spark(2)下载安装、shell
1. 虚拟机联网
2. 下载
3. 解压
- tar -zxvf spark-1.6.3-bin-hadoop2.6.tgz
4. 目录
- bin/spark shell(交互性、实时性)pyspark和spark-shell
- core、streaming、python主要是组件源代码
- example包含单机Spark job
5. scala shell
- 读取文件
val line = sc.textFile("../../testfile/helloSpark") //加载文本文件,返回RDD
line.count() //计算行数
line.first() //第一行
6. 修改日志级别
- 日志输出太多,改为显示WARN日志
conf/log4j
cp log4j.properties.template log4j.properties
vi log4j.properties
log4j.rootCategory=WARN,console(wq)
Spark(3)开发环境
1. Scala安装
- Spark 2.0 —— Scala 2.11
2. IDEA安装
3. IDEA插件
- Scala
- SBT
4. 新建项目
- Scala项目,用SBT打包
Scala 2.10.5 Spark 1.6.2 SBT 0.13.8 JDK 1.8
5. ssh无密码登录
- ssh localhost发现要输密码
ssh-keygen
touch authorized_keys(.ssh下)
cat id_rsa.pub > authorized_keys
chmod 600 authorized_keys
- ssh localhost试验是否登录成功
6. 第一个程序WordCount
- 创建一个Spark Context
- 加载数据
- 把每一行分割成单词
- 转换成pairs并且计数
- 新建Scala类(object类型)
- build,打成jar包
7. 启动集群
- (spark文件夹下)启动master ./sbin/start-master.sh
- 启动worker ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost.localdomain:7077
worker上打开localhost:8080,得到 ://localhost.localdomain:7077 - 提交作业 ./bin/spark-submit --master spark://localhost.localdomain:7077 --class WordCount /home/maixia/soft/imoocpro.jar
Spark(4)RDDS
1.Driver program
- 包含程序的main()方法,RDDs的定义和操作
- 管理很多结点(executors)
2. SparkContext
- Driver programs 通过SparkContext对象访问Spark
- SparkContext对象代表和一个集群的连接
- 在Shell中SparkContext自动创建好了就是sc
3. RDDs基础
- Resilient distributed datasets(弹性分布式数据集)
- 并行分布在整个集群中,是Spark分发数据和计算的基础抽象类
- 代表一个不可改变的分布式集合对象,Spart中所有计算都通过RDDs的创建、转换、操作完成
- 一个RDD内部有很多partitions(分片)组成,每个分片包括一部分数据,partitions可在集群不同节点上计算。
- 分片是Spark并行处理的单元,Spark顺序地、并行地处理分片
4. RDDs创建
- val rdd = sc.parallelize(Array(1,2,2,4),4) 测试用
rdd.foreach(println) - val rddText = sc.textFile("helloSpark.txt") 加载外部数据集
5. Scala匿名函数 类型推断
lines.filter(line => line.contains("world"))
- 定义一个匿名函数,接受一个参数line
- 使用line这个String类型变量上的contains方法,并且返回结果
- line的类型不需要指定,能够推断出来
6. Transformation
- 从一个RDD构建一个新的RDD,如map()和filter()
- 逐元素的Transformation
- map(),接收函数,把函数应用到RDD的每一个元素,返回新RDD
- filter(),接收函数,返回只包含满足filter()函数的元素的新RDD
- flatMap(),对每个输入元素,输出多个输出元素,将RDD中元素压扁后返回一个新的RDD
- 集合运算
- 并集:rdd1.union(rdd2)
- 交集:rdd1.intersection(rdd2)
- 差集:rdd1.subtract(rdd2)
- 去重:rdd.distinct()
7. Action
- 在RDD上计算出一个结果,把结果返回给driver program或保存在文件系统
- reduce(),接受一个函数,作用在RDD两个类型相同的元素上,返回新元素,可以实现RDD中元素累加、计数等聚集操作
- collect(),遍历整个RDD,向driver program返回RDD的内容(需要单机内存能够容纳下,因为数据要拷贝给driver,测试使用,大数据时使用saveAsTextFile())
- take(n),返回RDD的n个元素,同时尝试访问最少的partitions(返回结果无序,测试使用)
- top(),排序(根据RDD中数据的比较器)
- foreach(),计算RDD中每个元素,但不返回到本地(一般配合println打印数据,方便测试)
8. keyValue对RDDs
- 使用map()函数,返回key/value对(包含整行数据的RDD,把每行数据的第一个单词作为key)
val rdd2 = rdd1.map(line=>(line.split(" ")(0),line))
9. combineByKey()
- 最常用的基于key的聚合函数,返回类型可以与输入类型不一样
- 遍历partition中的元素,对于没见过的key使用createCombiner()函数,对于见过的使用mergeValue()函数,合计每个partition的结果时使用mergeCombiners()函数
//把成绩相加再求平均值
val score2=score.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))
val average=scores2.map{case(name,(num,score))=>(name,score/num)}
10. Spark特性
- 血统关系
- 延迟计算(Lazy Evaluation)
- action时才会真正计算,减少数据的传输
- 内部记录metadata表明transformations操作已被响应
- 加载数据也是延迟计算,数据只有在必要时候才会加载进去
- RDD.persist(),默认每次在RDD上进行action操作时,Spark都重新计算一遍,如需可重用使用RDD.persist(),unpersist()从缓存中移除
- SER表示序列号,对CPU占用高