Spark是一种基于内存计算的计算引擎(通俗地说就是计算速度快),由于其可以将计算的中间结果存放在内存中,因此非常适合于迭代计算和交互式查询。
一、Spark的四大核心组件和集群资源调度器
1.1 Spark的四大核心组件
Spark core定义了Spark基本功能和模块,包含SparkSession、RDD、DAG、Lingage、Cache、broadcast等,并封装了底层通讯框架,是Spark的基础。我们常说的Spark四大核心组件分别是指Spark SQL、Spark Streaming、Spark ML 以及 Spark Graphx,它们担任的角色如下:
- Spark SQL:通常用于对结构化数据(比如RDBMS)进行SQL操作(比如ETL),交互式查询;
- Spark Streaming:流计算,严格地说是micro-batch流计算;与之类似的产品有Strom和Flink;关于区别可以参考:https://mp.weixin.qq.com/s/Fb1cW0oN7xYeb1oI2ixtgQ;
- Spark ML: 机器学习,可以利用Spark适合迭代计算的优势来加速机器学习训练,但通常算法人员用的比较多,而开发人员用的较少;
- Spark GraphX:图计算,没用过。
这四大核心组件都依赖于Spark core才能工作。
1.2 Spark的集群资源调度器
通常做大数据处理,至少要具有三个功能,即:
- 数据存储;
- 数据计算;
- 集群资源管理。
比如Hadoop 2.X 就是HDFS、MapReduce、YARN分别负责这三个功能。Spark则采用如下方式:
- 数据存储:由于Spark本身不支持数据存储模块,通常使用HDFS作为其存储引擎;
- 数据计算:Spark本身就是计算引擎;
- 集群资源管理:在Spark中称为ClusterManager,Spark自带Standalone scheduler,也可以利用第三方资源管理器,比如Hadoop YARN 和 Mesos以及最近很流行的k8s,即Spark on YARN(生产环境用得最多的方式)、Spark on Mesos 以及 Spark on Kubernetes。
二、Spark中常见的专业术语
名称 | 本质 | 角色 | 备注 |
---|---|---|---|
application | 提交给spark处理的应用程序,比如一个jar | 无 | 无 |
transformation操作 | 一种操作类型,抽象 | 无 | lazy,直至遇到下一个action操作才执行 |
action操作 | 一种操作类型,抽象 | 无 | 立即执行,非lazy |
job | action操作 | 无 | 1个action操作对应个job |
task | action或者transformation操作可以分成多个子任务,这里的子任务即task | 无 | 一般来说 task数=RDD分区数 |
stage | 执行整个application需要分成若干个stage依次先后完成,是一种抽象 | 无 | 当出现shuffler时,认为存在新的stage,一个stage通常包含多个task |
RDD | 弹性分布式数据集,一种数据结构 | Spark中独特的数据结构抽象 | 无 |
DAG | 有向无环图,由stages 和 tasks组成 | 用于指定执行application中的各stage及其task | 无 |
DAG scheduler | 接口 | 将整个application划分成一个或多个stage | 生成相应的task set放到task scheduler中 |
task scheduler | 接口 | 负责分配具体的task给executor进行处理,向 DAGScheduler 汇报执行情况 | 无 |
driver | 进程 | application中main方法所在进程,用于管理指定application中的action/transformation操作(即各stages)的执行先后顺序,协调tasks和stages | 如果把一个application视为一个建筑工程,那么driver就像是现场指挥,提交一个application就有一个driver |
master | 服务级别的进程 | 负责集群级别的全局调度,向cluster manager请求资源并使得这些资源能够为driver所用 | 一个集群只有一个处于working状态的active master(一台机器),如果是HA则可以多添加一个standby master(另一台机器) |
worker/slave | 服务级别的进程 | 即slave,多个executor可以运行在一个worker节点上 | 一台物理机器上可以布置多个worker节点,由于ip是一致的,通过port来区分同一台机器上不同的worker |
executor | 线程 | 真正干活的,用于启动线程池运行任务,负责处理task | 一个executor可以负责处理多个task,每个Application拥有独立的一组Executors,一个executor负责处理一个task |
cluster manager | 服务级别的进程 | 负责集群级别的资源分配,监控worker节点,保留master为worker节点请求的资源 | 一个集群只有一个 |
注:worker节点即slave(Spark采用的是主从架构),定义在spark配置路径{SPARK_HOME}/conf中的slaves文件(需要手动创建,通过copy 现有的slaves.template得到,并添加上worker主机名),比如:
# A Spark Worker will be started on each of the machines listed below.
# 注意此处的slave名必须和hosts文件中定义的slave名一致
# 每个slave占一行
slave1
同时slave(也包括master)还必须在hosts文件中定义,以让Spark知道worker所在机器的IP,比如:
# spark master & slaves
127.0.0.1 master
127.0.0.1 slave1
三、Spark的架构和部署方式
与Hadoop类似,Spark也是采用主从(master/slaves)架构,一个active master 节点(如果是高可用HA则还有一个standby master节点)和若干个worker节点(slaves)。
3.1 Spark的架构
如果将Spark集群类比为一个建筑公司,那么:
Spark集群 | 建筑公司 | 备注 |
---|---|---|
master | CEO | 无 |
worker/slave | 工程队 | active master只有一个,worker可以多个 |
executor | 工程队中的施工人员(真正干活的) | 一个worker中executor可以多个 |
driver | 某个工程的现场总指挥 | 一个application对应一个driver,多个application就会有多个driver,各driver之间互相隔离 |
cluster manager | 资源管家(负责整个公司的资源分配) | 一个集群只有一个cluster manager |
- 建筑公司视为众多客户服务的,就像是客户将需求提交到公司CEO,client将自己的application提交到Spark master(比如YARN),这里的client可以是spark-shell或者spark java/scala/python API;
- 每个client提交application时会请求所需的硬件资源,比如executor个数、executor memory大小、driver memory大小等,只有在Spark集群资源满足所需请求的条件下,新submitted的application才会被accepted,否则需要等待其他application完成或者退出而释放出足够的硬件资源;
- 就像是某位施工人员同一时刻只能在建筑项目中干活一样,某个executor同一时刻只能服务于某一个application,直至该application完成或者退出而得以释放。
3.2 Spark的部署方式
Spark的部署方式有两个因素决定:
- master:分单机模式(local、local[k]、local[*],其中k为正整数,通常用于开发时的debug)、集群模式(standalone、YARN、Mesos、k8s,生产环境中YARN最常见);
- deploy-mode:只是当master为集群模式时才生效,分client和cluster。
在生产环境中最常见的部署方式还是yarn-cluster,因为存在如下考虑:
- 生产环境中通常还需要用到Hadoop的MapReduce,其需要YARN来进行集群的资源分配;Spark on YARN就正好可以共用YARN来负责Hadoop和Spark的资源分配;
- 如果一直使用yarn-client模式提交application,由于driver进程运行在提交application的单机节点上,会导致提交任务的那个节点运行过多的driver进程,而driver进程又要与众多的executor进程通信,因此会导致提交任务的节点单机流量暴增,且加大了该节点的压力;
- 使用yarn-cluster模式提交application时,由于driver会被根据集群负载情况被自动合理地分配到集群中的某个节点上,因此可以实现负载均衡,防止单机流量暴增的出现。
综上,yarn-cluster模式有类似于负载均衡的作用,在需要提交多个spark任务时尽量使用yarn-cluster模式。
3.3 Spark参数配置的优先级
基本上有3处可以配置Spark的master和deploy-mode的取值:
- Spark集群的配置文件:spark-defaults.conf;-- 优先级最低;
-
spark-submit脚本或者SparkLauncher(java API);--优先级次之(参考博客:https://www.jianshu.com/p/36193b394a50
和https://www.jianshu.com/p/1d41174441b6); - 业务代码中通过SparkConf设置,--优先级最高,比如:
// initialize spark context
val sparkConf = new SparkConf().setAppName("app1").setMaster("yarn")
优先级就是说,当有两处以上对同一个参数进行了设置时,优先级高的生效。
四、Spark application提交和处理流程
4.1 Spark提交application的方式
目前Spark支持两种提交application的方式:
- spark-shell中执行命令:在终端terminal中输入spark-shell或者spark2-shell启动spark client,执行命令比如:
spark2-shell --executor-memory=512m
# 启动client后会显示:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
# 然后将application的业务代码逐句执行即可
......
执行spark-submit脚本:参考博客https://www.jianshu.com/p/1d41174441b6;
java API SparkLauncher类:参考博客https://www.jianshu.com/p/36193b394a50。
4.2 Spark处理所提交的application的流程
(1) 构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos、YARN或k8s)注册并申请运行Executor资源;
(2) 资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
(3) SparkContext根据driver端代码构建DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task;
(4) Task Scheduler将Task发放给Executor运行,同时SparkContext将应用程序代码发放给Executor;
(5) Task在Executor上运行,运行完毕释放所有资源。
五、Spark的共享变量
参考博客:https://www.jianshu.com/p/e09827bce476
六、Spark的监听
参考博客:https://www.jianshu.com/p/7d79092e2d9c
如有错误,敬请指正!
参考:
https://www.cnblogs.com/langfanyun/p/8098804.html
https://blog.csdn.net/addUpDay/article/details/89112712
https://www.jianshu.com/p/da20133ecfea
https://blog.csdn.net/songhao22/article/details/79069983
https://www.cnblogs.com/frankdeng/p/9301485.html