主要的component
standalone模式下:
- Master+work 的组网模式,master可以配置HA,可以通过 zookeeper来配置master选举;
- 一个application对应一个 driver;driver 与master 通信, master 负责管理自己的资源分配,与work通信生成 executors。
- executor 为work 中的进程,我们可以简单看作JVM进程,运行对应的task;
- driver 与 executor 通信,使用netty 组件;下发任务,协调数据分发等;
yarn 模式 - 不同于standalone 模式,由yarn管理资源,使用resourcer manager 分配资源和动态生成 executors,是一个按需的过程;
而standalone,是一开始启动集群,work +master就在那里了; - 不管是master 还是 yarn 对于资源的调度都是一个很麻烦的事情; 基本的原则是让数据和计算节点越近越好,最优的情况是数据就存在计算节点上;
- yarn 有client 模式和 cluster模式。client模式下,driver 在提交任务的本机,而cluster模式,有集群选取一个节点作为运行driver的节点;
- 更细化来讲,对于yarn 模式,分配的单元为container;spark 依旧有一个动态的 applicationMaster,负责资源的申请和调度,只不过负责向resourcemanager申请; 并且 yarn 的第一个分配的container 就是会运行 appMaster; client模式里,appMaster只负责申请支援,而在cluster模式中, appMaster所在的节点,同样会运行 spark driver 程序,更加复杂一些;
数据模型层面RDD
分布式弹性数据集RDD,对于分布在集群中的数据进行建模,有了这样一个logical 数据集概念,我们就可以在数据集上增加一些列的操作,操作对应集群的运算任务,就这样 存储+计算模型就有了;
RDD 相应的操作有很多,可以参考RDD的API,主要有 创建, transform,控制,sink(收集);
不同的RDD API 对于性能也是有不同的消耗,我们需要根据需求进行取舍;
比如 reduceByKey,groupByKey都可以实现reduce操作,但是reducebykey, 会在map阶段之后本地进行一次merge。比如groupbykey效果好;
比如foreachPartition 和 partitionMap,都可以针对 parititon级别的数据进行操作,有batch 的效果,对于需要针对每个元素开启连接资源的任务,使用 partition级别的迭代效果要好很多;
RDD 有了链式操作之后,就需要走一个典型的任务构建。非常典型的分为 logical plan 和 physical plan。对应 DAG 图 和 Task Schedule Graph; 其中 DAG 图就是 把RDD 的操作 分为 不同的stage。主要用到了 窄依赖 和宽依赖的概念。
窄依赖就是 我们常用的 transform step,在一个节点可以完成, 而宽依赖 则是 需要shuffle的步骤,比如 group,reduce,join等; 一个 宽依赖 就是 DAG 中一个stage, spark 会构建一个stage 有向无环图,然后并行执行stage。 而每个stage 背后就是一系列的 taskset; task 会被调度到 具体的excutor中,也有非常复杂的调度算法进行处理;
存储模型层面
RDD 数据集 包装了 底层文件的各个block,各个节点通过blockManager来访问分布在集群中的 block数据块;
RDD 的持久化有多个级别,包含了 disk,memory,disk_memory, spark 尽可能的将RDD 的数据集缓存在中,以获得更快的计算和IO性能;我们也可以通过类似于cache()的方法将数据集存放在内存中,spark 调度策略里也会优先使用包含缓存数据的节点,就近计算对应任务;
在需要 shuffle 的task中, spark 也做了对应的优化;最开始是 hashShuffle。 如果有 N 个mapper,M 个 reducer,那么每个Mapper就会 生产M个文件,系统有 N*M个文件,小的文件会造成 磁盘性能的下降和 随机写的开销;
目前 spark 使用 sortShuffle。也就是说一个 mapper 自保存一个 sort 的 文件和一个index 文件,不同的 reducer 的input 被划分到不同的 offset里面,这样就解决了小文件 碎片化问题;
作为分布式系统,spark 也提供了 可配置的序列化方式,比如 java/kyro, 以及多种 压缩方式;
此外还提供了了 分布式counter/ Accumlator, 以及 broadcast 变量(可读),供我们使用;
sparkSQL
支持SQL语义的查询,使用antlr4 词法语法解析sql,构建逻辑和物理执行计划; 基于rdd构建了DataFrame数据集,可以让我们使用类似于pandas dataframe的概念来操作数据集; 支持jdbc的连接来创建数据集;
支持对hive 进行连接,然后sparkSql On Hive 的方式,运行交互式的查询, sparksql的性能会大幅高于 hive本身;
spark streaming
采用了消费生产者模式, 从数据源收集的数据,放入pool中,然后根据我们定义的 处理时间来划分一个batch。
比如定义了1s为一个batch 间隔,那么每1s 拿去 一个batch 然后生成 RDD,之后把一些列小的RDD 放入一个队列里面,由 Driver端的 JobGenerator 持续的生成一个一个的job; 可以看到不同的应用场景有不同的 driver来处理;
streamming也支持 window的概念,滚动和滑动窗口,目窗口的时间单位为秒级别; 相对于flink 无法处理低延迟的场景;
其他的很多概念和flink里面是差不多的,比如说 HA 时候的高可用,需要结合上游的组件来做容错,如果是kafka那么 checkpoint可以记住offset,如果是其他的数据源,可以在 额外的系统里面存储 checkpoint的数据集等等
spark graphx
graphx 算是平常工作中用的比较多的了。 最主要有两点,给你一个图的节点集合,如何存储在不同的分区中,涉及到的hash分片等算法,spark有随机hash,2D partition 等策略等;
如果确定了一个节点在哪个分区之后,就是确定存储哪些数据,分为点分割还是边分割,对于A-B-C 这样一个结构;
如果是点分割,存储的是 A-B; B-C;节点冗余,但是获取连接矩阵就特别方便,都在一个分区上;如果是边分割,
A,(A-B), B(B-C,B-A), C(C-B); 节点不会冗余,边冗余,拉去连接矩阵还是有开销;
各自的优点: 点分割,适合OLAP,不在乎存储的,空间换时间;
边分割,适合OLTP,空间少,hugegraph 就是oltp场景, 比如插入的时候,不需要为冗余的节点都插入边;
然后就是计算模型使用 pregrel,实际使用的是spark 的aggregate 方法, 沿着 边发送消息到目标节点,目标节点进行reduce,然后就是不断的迭代;
优化TIPS
数据倾斜:
- 倾斜的原因可能是Map 和 reducer端, map 端 文件大小不一样,而且采用了不支持切分的压缩算法,那么文件如果100G,则一个map就会以100G作为一个map;map就会出问题;map端的倾斜需要做一些预处理; 而reducer端是比较常见的倾斜;i
- 使用sample 函数然后再统计一下key的分布,看看哪些key倾斜,最针对性的处理
- 比如对于key 进行随机化处理,reduce之后,再进行一次 reduce,三段与多个阶段;
- 尽可能的不适用reduce 等需要shuffle的算子;比如 join的时候,可以使用 map join。也就是说 在 map函数里面自己实现;
这个在hive 或者spark 里面 的 table join之中比较常见; 大表join 小表的时候,小表 broadcast 到各个节点,然后map join【之所以叫map join,是因为在map阶段就解决了,没有reduce阶段】;另外可以考虑尽可能的增加资源,再不行就要随机化。 - 广播变量:需要序列化,缺省10M,由driver的blockManager管理;使用BT传输协议,是一个p2p传输协议,减少 Driver的传输负担;比如广播变量大了之后,分成M个小block;传给N个executor。executor 自己可以相互交换;
- 大表join(1)。找出一个大表中的 倾斜集合,分为 skew key 和 normal key,分为两部分。normal的key 正常join; skew 的key 单独join;这种方式就是 把数据集拆分出来 正常+异常的部分;
- 大表join(2)。 对于大表的数据做 随机化,相对小的表 做 扩容; 比如随机化 参数为10,那么小表就需要扩容10倍; 在一些场景可能合适;
TopN:
- 全局topN会有问题,可能数据太大,内存溢出,可以parition 之后,对每个pairtion 取 topN; 合并起来,在做一个排序,去topN;
- 对于分组TopN,如果groupBy之后又数据倾斜的内存溢出,这考虑随机化,或者是自定义分区器。就是获取key的枚举值,然后重分区到 各个枚举值中,然后再取topN; 总之要避免全局排序。或者是非常大的一个key的排序;
Yarn 的调度流程
- 创建applicationId,上传 jar 包+其他资源等;
- 由RM 分配第一个容器,构建 appMaster 做统一的为每个任务进行调度
- appMaster 告诉resourceManager自己要的资源,然后由其分配资源;appMaster使用心跳不断轮训RM分配的情况;
- appMaster 拿到分配的资源definition, 让 NodeManager启动executor
- appMaster解析好 stageScheduler和taskScheduler,不断下发任务,和spark cluster 模式是一样的;
Spakr on K8s
从spark2.3开始,支持k8s作为调度了. 目前也有 k8s-spark-operator
主要的思路,构建一个driver-pod,由driver pod 不断去申请 execution pod;类似于NR的调度方式;只不过换成了 POD;
** yarn vs k8s
k8s 更全面的隔离;