09spark生态圈复习笔记
1.RDD的数据结构是一个分布式集合,主要分为三部分组成:
数据分片、计算函数、RDD依赖(lineage血缘关系)
2.不同来源的数据 都需要:
经过转换变为RDD 再由Spark进行处理
3. 将内存中的普通的集合变为RDD
sc.parallelize(List(1,2,3),2)//指定分区个数
sc.makeRDD( List(1,2,3,4), 2 )
由一个rdd执行 Transformation操作产生新的rdd
sc.textFile()
4. RDD提供的方法
map - 将函数应用到rdd的每个元素中
filter - 用来从rdd中过滤掉不符合条件的数据
flatMap - 扁平map处理
distinct 去重
union 合并
intersection 交集
subtract 差集
reduceByKey 按照键来进行合并处理
5. PairRDD提供的方法
(1)aggregateByKey(zeroValue)(func1,func2)
(2)groupByKey
(3)join
(4)partitionBy
6. 普通RDD提供的方法
(1)++并集
(2)collect收集
(3)take获取前几个数据
(4)takeOrdered(n) 先将rdd中的数据进行升序排序 然后取前n个
(5)top(n) 先将rdd中的数据进行降序排序 然后取前n个
(6)cache缓存
7.rdd相关的方法可以分为(是否会懒执行):
Transformation与Action两种类型
如果一个方法是由RDD调用 执行后产生 另一个RDD 则这个方法属于Transformation方法,会懒执行
如果一个方法是由RDD调用 执行后不是产生另一个RDD 而是产生一个非RDD的结果 则这个方法是Action类型的方法,会立即执行之前所有的操作包括当前操作和当前操作之前为执行的懒执行操作。
8. 为什么SPark要采用懒执行机制呢?
Spark会将所有连续的懒执行的操作都不立即执行 而是组建出一个执行的有向无环图 称为DAG 直到遇到Action类型的操作 整个DAG有向无环图 才真正去执行
这样的目的在于 在DAG有向无环图执行的内部可以执行流水线优化减少shuffle的过程 提高执行效率。
9. rdd之间的依赖:
整个DAG有向无环图的执行 其实就是处理rdd为另一个rdd的过程 这个过程中父rdd和子rdd之间是有关系的这种关系称之为子rdd对父rdd依赖 这种依赖是通过在子rdd中保存父rdd的血缘关系了来实现。
10.窄依赖和宽依赖的含义及作用:
窄依赖:父RDD中的所有的分区都只面向一个子RDD中的分区
宽依赖:父RDD中有分区面向多个子RDD中的分区
窄依赖可以省略shuffle的过程 执行效率可以大大提高
而如果整个DAG中存在多个连续的窄依赖 则可以将这些连续的窄依赖合到一起连续执行 中间不执行shuffle 从而提高效率 这样的优化方式称之为流水线优化
整个spark在执行DAG的过程中 提升性能的关键就是 尽力的去应用流水线优化
11. spark的处理rdd的过程(即spark比hadoop快的原因)
spark在遇到Transformation类型操作时都不会立即执行 而是懒执行 若干步的Transformation类型的操作后 遇到Action类型操作时 必须要执行了 这时将所有之前的Transformation类型的操作和当前Action类型的操作组成一个DAG有向无环图 。
再从Action方法向前回溯 如果遇到的是窄依赖则应用流水线优化 继续向前找 直到遇到宽依赖 无法实现优化 则将这一次段执行过程组装为一个stage 再从当前宽依赖开始继续向前找 重复刚才的步骤 从而将这个DAG还分为若干的stage。
在stage内部可以执行流水线优化 而在stage之间没办法执行流水线优化 必然会有shuffle 但是这种机制已经尽力的去避免了shuffle
最终
一个DAG对应一个Spark的Job 而其中划分出来的stage对应的就是job当中的task 而又由于rdd中可能有多个分区 这个task可能有多个实例来分布式的并发处理数据
这样 减少了 task的数量 减少了shuffle的过程 - 减少了数据落地的情况 和 由于shuffle的全局栅栏造成对性能的影响。
这就是为什么spark比hadoop快的原因
12. spark中的shuffle的含义
spark中一旦遇到宽依赖就需要进行shuffle的操作
所谓的shuffle的操作的本质就是将数据汇总后重新分发的过程
这个过程数据要汇总到一起 数据量可能很大所以不可避免的需要进行数据落磁盘的操作 会降低程序的性能
所以spark并不是完全使用内存而不读写磁盘 只能说它尽力避免这样的过程来提高效率
13.spark项目的创建和提交
在scala中创建项目 导入spark相关的jar包
创建配置
基于配置生成sc
基于sc开发spark代码
将结果写出
将写好的项目打成jar
上传到服务器
通过命令将jar提交到spark中运行
14. SparkSql中的DataFrame的创建方法
SparkSql中有一个核心的数据结构叫做DataFrame 本质上是对RDD的一个封装 其中采用类似表的结构来存储数据
(1)将rdd转换为df
(2)将txt文件转换为df
(3)将json文件转换为df
(5)利用jdbc将关系型数据中的数据转换为df
15.SparkStreaming 中的DStream:
SparkStreaming 将输入的数据 按照时间为单位进行切片 切出一个个的批 称之为DStream 而DStream本质上就是Spark中的RDD 对DStream的处理最终会被翻译成对底层RDD的处理
16.SparkStreaming 中的DStream处理流程:
SparkStreaming在处理DStream数据时 是按照串行化的方式进行处理的。这也就意味着上一个DStream在当前算子中未处理完成时 下一个DStream即使到来也要阻塞等待。 而DStream内部本质上是RDD 本身就是分布式的数据集 可以分布式的处理。 所以SParkStreaming中对数据的并发的处理是体现在DStream对应的RDD本身的并发上 , 而放弃了批之间的并发。放弃了批之间的并发后 虽然造成的延时性上的一些损失 但是在可靠性保证 并发控制 程序开发复杂度的降低上 都带来了好处。我们在使用SparkStreaming时 应该合理的配置切片时间 和 每个算子的复杂程度 尽力的让每个算子都可以在切片时间内将数据处理完 这样 可以减少数据可能的堆积 以及 算子闲置的可能 实现最优的并发。
17.SparkStreaming 相对于Storm的优势:
相对于Storm ,SparkStreaming的优势并不体现在低延时 而是在高吞吐量上有自己独特的优势。
18.SparkStreaming 入门案例(包括checkPoint,滑动窗口的使用方法)
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc,Seconds(1));
val lines = ssc.textFileStream("file:///root/work/streamData");
val words = lines.flatMap(_.split(" "));
val wordCounts = words.map((_,1)).reduceByKey(_+_);
wordCounts.print();
ssc.start();
经过测试如上的代码确实可以监控指定的文件夹处理其中产生的新的文件
经过测试 发现 数据每次都会重新进行计算 每个block都会有一个独立的结果
而如果需要对历史数据进行累计处理 该怎么做呢
SparkStreaming提供了checkPoint机制 通过在临时文件中存储中间数据 为历史数据累计处理提供了可能性
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc,Seconds(1));
ssc.checkpoint("file:///root/work/streamCheckPoint");
val lines = ssc.textFileStream("file:///root/work/streamData");
val words = lines.flatMap(_.split(" "));
val wordCounts = words.map((_,1)).updateStateByKey{
(seq, op:Option[Int]) => { Some(seq.sum + op.getOrElse(0)) }
}
wordCounts.print();
ssc.start();
上面的例子实现了数据的累计统计
但是这上面的例子里所有的数据不停的累计 一直累计下去
很多的时候我们要的也不是这样的效果 我们希望能够每隔一段时间重新统计下一段时间的数据
这样的功能可以通过滑动窗口的方式来实现
在DStream中提供了如下的和滑动窗口相关的方法:
window(windowLength, slideInterval)
countByWindow(windowLength, slideInterval)
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
countByValueAndWindow(windowLength, slideInterval, [numTasks])
可以通过以上机制改造案例
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc,Seconds(1));
ssc.checkpoint("file:///root/work/streamCheckPoint");
val lines = ssc.textFileStream("file:///root/work/streamData");
val words = lines.flatMap(_.split(" "));
words.map((_,1)).reduceByKeyAndWindow( (x:Int,y:Int)=>x+y, Seconds(5), Seconds(5) ).print
ssc.start();
19. SparkStreaming可以从多种输入源获取数据
文件系统
ssc.textFileStream
hdfs
kafka
KafkaUtils.createStream(ssc, "localhost:2181", "group1", Map("jt"->1))
数据库
网络
ssc.socketTextStream
20. SparkStreaming可以通过多种渠道进行输出
print()(即打印到控制台) Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.
Python API This is called print() in the Python API.
saveAsTextFiles(prefix, [suffix]) Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
Python API This is not available in the Python API.
saveAsHadoopFiles(prefix, [suffix]) Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
Python API This is not available in the Python API.
foreachRDD(func) The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
项目
大数据中心项目
这个项目的主要目的是通过大数据中心对省电力公司下属所有集体企业现有系统进行业务数据融合,建立统一的数据模型,实现业务互联,数据互通,统一分析,统一服务
项目概述:
我们项目的数据来源全都是oracle或mysql数据库中的表结构的数据,我们通过informatic将最新数据(这里的最新是指按照业务需求的不同抽取表中当日、近一个月、近三个月、近一年、近三年的数据)导入到oracle数据库中,通过视图的方式进行处理后推送给展示大屏,然后通过sqoop将全量数据(包括所有历史数据)导入hdfs中,有些业务需要用到这些历史数据,我们通过hive创建外部表进行关联后处理业务数据,将数据处理后推送给展示大屏
对于informatic,我们目前是每天晚上将所有工作流都定时跑一边
对于sqoop,我们目前对大部分表是每个月6号跑一次,少部分表是每天晚上都跑一次
数据处理的频度和数据接入的频度一致,我们都是在数据接入完后紧接着就处理,然后推送
infa是通过ETS配置定时任务的
sqoop是通过ooize配置定时任务的
由于数据来源都是数据库,因此数据品质比较高,并不需要进行数据清洗,我们就是最后看看展示数据有没有问题,如果有问题再回溯找到问题就可以了
有些业务hive处理不了时我们也会写MR来处理
之所以数据分别储存在oracle 与hdfs中,是因为面对的业务场景不同
对于实时性要求高的业务场景(如:银行账户交易明细,要求5分钟更新一次),由oracle 提供数据服务oracle 中的表大部分只储存当月的数据,不储存历史数据)
对于无实时性要求的业务场景(如:近5年主营和辅营业务的收支情况变化趋势),由大数据平台提供数据服务
集群规模:
infa 1 64G
oracle 1 128G
hadoop 5 128G
每日处理的数据量:
我们数据处理的频率不太一样,hive\MR是每个月6号执行一次,视图是每天执行,综合下来每天接入处理的数据至少也有几亿条吧
我们有1500多张表要接入,数据量有的只有几条,有的有几千万条,我们每天跑的oracle中的视图有200-300个每个最多十几秒,hive任务有20几个每个10几分钟,MR任务有7、8个也是每个10几分钟
遇到问题:
平台搭建会遇到一些问题,尤其是kerberos服务安装以后,各种权限问题
infa导入时数据长度问题
sqoop导入时还要注意字段类型转换问题
空值或者key值分布不均过多造成的数据倾斜
需求举例:
银行账户交易明细(求出银行类别,账户名称,交易时间,交易金额,交易类别,借贷标记等)维度包括账户类别,收款方类别,交易类别
近5年年度资产负债率预算与执行(求出历年的资产负债率,本年是预算,往年是决算)维度包括采购类型、采购方式
三年以上应收账款,车辆使用情况,人员变动情况
农业服务平台项目
本项目主要目的是为虾稻产业园提供一套完整的管理系统,对各个生产环节进行监控、预警、调度,优化产业链
视频软件后台数据分析项目
这个app主要是做一对一视频聊天和短视频的,通过对用户、主播的行为分析以及对app浏览日志的分析,为产品的迭代、产品运营状况提供数据支持