Flink on YARN Architecture
The startup of a Flink cluster on YARN can be divided into two broad stages:
1. The Flink client sends a request to start a Flink cluster on YARN.
2. The YARN RM accepts the request and directs NMs to allocate containers and start the Flink cluster.
(One of YARN_CONF_DIR, HADOOP_CONF_DIR, or HADOOP_CONF_PATH must be set so that Flink can reach HDFS and the YARN RM.)
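For example, a minimal sketch of the environment setup (the path is illustrative; use wherever your Hadoop client configuration actually lives):

```bash
# Make the Hadoop/YARN client configuration visible to Flink.
# /etc/hadoop/conf is a common default, but adjust it to your cluster.
export HADOOP_CONF_DIR=/etc/hadoop/conf
```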
Main startup flow
1. Processes
First, we start the Flink-on-YARN cluster with the command line below (the Flink client sends the request).
Installing Flink: it only needs to be installed on one node that can reach the YARN & HDFS clusters.
Startup script (command): ./bin/yarn-session.sh -n {num} -jm {num} -tm {num}
Running example: the final command executed inside yarn-session.sh is: java …org.apache.flink.yarn.cli.FlinkYarnSessionCli
/opt/meituan/flink-1.6.2/bin/yarn-session.sh -D yarn.container-start-command-template="/usr/local/jdk1.8.0_112/bin/java %jvmmem% %jvmopts% -Djob_name=appflow_log_fact -DjobID=10088 -Dengine_type=FLINK -Djob.suffix.topic=appflow-rtdw %logging% %class% %args% %redirects%" -d -n 120 -tm 4096 -jm 8000 -qu hadoop-rt.queue01 -s 1 -nm appflow_log_fact
This produces five processes in total:
**1 FlinkYarnSessionCli ---> YARN Client**
**1 YarnApplicationMasterRunner ---> AM + JobManager**
**3 YarnTaskManager ---> TaskManager**
That is, one client plus four containers: one container runs the AM, and three containers run the TaskManagers (see the jps sketch below).
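A quick way to verify this process layout is jps, which prints each JVM's main class (a sketch; PIDs and host layout are illustrative):

```bash
# On the node where yarn-session.sh was started:
jps
# 12345 FlinkYarnSessionCli

# On the NodeManager hosts that received containers:
jps
# 23456 YarnApplicationMasterRunner   # AM + JobManager
# 34567 YarnTaskManager
```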
2. Startup flow
1. During startup, FlinkYarnSessionCli first checks whether YARN has enough resources to start the requested containers. If it does, it uploads some Flink jars and configuration files to HDFS — mainly the dependency jars and config files needed to start the AM and TaskManager processes.
A brief outline of what FlinkYarnSessionCli does:
1) Load the relevant configuration from FLINK_CONF_DIR & (YARN_CONF_DIR | HADOOP_CONF_DIR).
2) Create a yarnClient and request an applicationId.
3) PUT the jars & conf the Flink cluster needs onto HDFS.
4) Package the env & cmd needed to launch the ApplicationMaster into a request object, submit it through the yarnClient, and wait for the response.
5) Once startup is confirmed, write the key information into a properties file and persist it to local disk.
Notes:
The HDFS path in step 3 defaults to /user/{user}/.flink/{applicationId} (see the listing sketch after these notes)
- If HDFS has no /user/{user} directory for this user, an exception is thrown.
- Because this step needs the applicationId, the applicationId must first be requested through the yarnClient.
Only step 4 actually asks YARN for resources to run the ApplicationMaster.
- The AM is not an implementation of a YARN interface; it is the launch command, environment variables, and related information packaged into the context.
The most important entry in the properties file written after a successful startup is the applicationId, which later Flink job submissions use to locate the cluster.
- The properties file is persisted to /tmp/.yarn-properties-{user} by default.
- If multiple sessions are started on one node, pay attention to this file's location (not yet investigated).
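To see what step 3 actually staged, the directory can be listed directly (a sketch; the applicationId is illustrative and the exact contents vary by Flink version):

```bash
# List the per-application staging directory uploaded by FlinkYarnSessionCli.
hdfs dfs -ls -R /user/$USER/.flink/application_1480493133223_0009
# Typically contains flink-conf.yaml, the flink-dist jar, lib/ jars,
# and the log4j/logback configuration files.
```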
2. The YARN client then asks the RM for a container in which to run the ApplicationMaster (the YarnApplicationMasterRunner process). The RM tells one of the NMs to start this container; the NM assigned to launch the AM first downloads the jars and configuration files uploaded in step 1 from HDFS to local disk, then starts the AM. The JobManager is started during this process as well, because the JobManager and the AM live in the same process; the AM uploads the JobManager's address back to HDFS as a file, and the TaskManagers download that file during their own startup to obtain the JobManager's address and communicate with it. The AM is also responsible for Flink's web service; Flink uses only ephemeral ports here, which allows users to run multiple YARN sessions in parallel.
Launch command:
Two components, three stages:
1) The RM accepts the request, finds an available NM, and has it start a container to run the AM.
2) The NM accepts the scheduling request and, based on the context information, downloads the jars & conf from HDFS to local disk, and generates a launcher script locally from the cmd & env.
3) Running the launcher script starts the ApplicationMaster (the source code shows that the cmd sent by the Flink client is: java … YarnSessionClusterEntrypoint).
A brief outline of what YarnSessionClusterEntrypoint does:
1) Start the Akka-based RPC service & metric registry service.
2) Start the HA service & heartbeat server.
3) Start the BLOB server & ArchivedExecutionGraphStore (temporary local directories are created for storage).
4) Start the web monitor service (the job management UI; see the REST sketch after the notes below).
5) Start the JobManager service, which manages the TaskManager processes.
Notes:
1) The path used in step 2 to store the jars & conf and the launcher script is /data/hadoop/yarn/local/usercache/{user}/appcache/application_{applicationId}/container_{applicationId}_…, which contains the following:
- launch_container.sh – launch command & environment variables
- flink-conf.yaml & log config files – startup configuration & logging configuration
- flink.jar & lib – runtime dependency jars
2) Step 3 runs YarnSessionClusterEntrypoint, which starts the JobManager; the subsequent TaskManagers are started and managed by the JobManager.
- In fact, in on-YARN mode, TaskManager startup is deferred until a Flink job is actually scheduled; and when no job has been received for a while, the TaskManagers exit automatically to release their resources.
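Once the web monitor from step 4 is up, the cluster can be probed through Flink's monitoring REST API (a sketch; host and port are illustrative — take them from the application's YARN tracking URL):

```bash
# The YARN tracking URL for the application points at the Flink web monitor;
# look it up with: yarn application -status <applicationId>
curl http://jm-host:43764/overview
# Returns a JSON cluster summary: TaskManager count, slots, running jobs, etc.
```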
3. Once the AM has started, it asks the RM for containers in which to start the TaskManagers. During startup, these also first download from HDFS the jars containing the TaskManager main class (in YARN mode this is YarnTaskManager) and the configuration files the startup depends on, such as the file holding the JobManager's address, and then launch YarnTaskManager via java -cp. Once all of this is ready, the cluster can accept jobs. This is quite similar to Spark on YARN's yarn-cluster mode: both split into two parts, one that prepares the workers and their tools (for Spark, starting the SparkContext; for Flink, initializing the ENV), and one that assigns the workers concrete work (executing the actual operations, triggered by actions and the like).
Launch commands:
Process information
**FlinkYarnSessionCli**
/home/hadoop/ym/jdk1.8.0_101/bin/java -Xmx512m -classpath /home/hadoop/ym/flink-1.1.3/lib/flink-dist_2.10-1.1.3.jar:/home/hadoop/ym/flink-1.1.3/lib/flink-python_2.10-1.1.3.jar:/home/hadoop/ym/flink-1.1.3/lib/log4j-1.2.17.jar:/home/hadoop/ym/flink-1.1.3/lib/slf4j-log4j12-1.7.7.jar::/home/hadoop/ym/hadoop-2.7.1/etc/hadoop: -Dlog.file=/home/hadoop/ym/flink-1.1.3/log/flink-xxxuser-yarn-session-db-180.photo.163.org.log -Dlog4j.configuration=file:/home/hadoop/ym/flink-1.1.3/conf/log4j-yarn-session.properties -Dlogback.configurationFile=file:/home/hadoop/ym/flink-1.1.3/conf/logback-yarn.xml org.apache.flink.yarn.cli.FlinkYarnSessionCli -j /home/hadoop/ym/flink-1.1.3/lib/flink-dist_2.10-1.1.3.jar -n 3 -jm 1024 -nm 1024 -st
**YarnApplicationMasterRunner**
/home/hadoop/ym/jdk1.8.0_101/bin/java -Xmx424M -Dlog.file=/home/hadoop/ym/hadoop-2.7.1/hadoop/nm/application_1480493133223_0009/container_1480493133223_0009_01_000001/jobmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnApplicationMasterRunner
**YarnTaskManager**
/home/hadoop/ym/jdk1.8.0_101/bin/java -Xms424m -Xmx424m -XX:MaxDirectMemorySize=424m -Dlog.file=/home/hadoop/ym/hadoop-2.7.1/hadoop/nm/application_1480493133223_0009/container_1480493133223_0009_01_000003/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskManager --configDir .
Additional notes:
Log management
FlinkYarnSessionCli is started by the client and managed by Flink, so its log output is stored under log/ in the Flink installation directory.
YarnSessionClusterEntrypoint is started by YARN and managed by YARN, so its log output is stored under the YARN-managed directory /data/hadoop/yarn/log/…
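Since the cluster-side logs are owned by YARN, they can also be retrieved with YARN's own tooling (a sketch; assumes log aggregation is enabled, and the applicationId is illustrative):

```bash
# Fetch the aggregated container logs (JobManager + TaskManagers) for the session.
yarn logs -applicationId application_1480493133223_0009 | less
```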
Process management
The FlinkYarnSessionCli process is managed by Flink; the YarnSessionClusterEntrypoint process is managed by YARN.
When YarnSessionClusterEntrypoint is stopped without going through FlinkYarnSessionCli, you have to use yarn application -kill …, but this cannot clean up the resources managed and controlled by FlinkYarnSessionCli, e.g. /tmp/.yarn-properties-{user}.
Issuing yarn application -kill … to stop the cluster stops the TaskManagers first and then the JobManager, but does not clean up the cache on HDFS.
Through FlinkYarnSessionCli's interactive mode, */tmp/.yarn-properties-{user}* & the HDFS cache can both be cleaned up in one go (see the sketch below).
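A sketch of the clean shutdown path, re-attaching the client to the running session (the applicationId is illustrative):

```bash
# Re-attach FlinkYarnSessionCli to the running session and stop it from there.
# Unlike `yarn application -kill`, this also cleans up the client-side
# /tmp/.yarn-properties-$USER file and the HDFS staging directory.
echo "stop" | ./bin/yarn-session.sh -id application_1480493133223_0009
```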
Job submission
In this mode, the client reads the local /tmp/.yarn-properties-{user} file to obtain the applicationId and locate the cluster, so jobs are best submitted from the node where FlinkYarnSessionCli was started; otherwise, the applicationId must be specified explicitly (see the sketch below).
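For example (a sketch; the jar and applicationId are illustrative):

```bash
# On the session node, the client locates the cluster via /tmp/.yarn-properties-$USER:
./bin/flink run ./examples/streaming/WordCount.jar

# From any other node, pass the applicationId explicitly:
./bin/flink run -yid application_1480493133223_0009 ./examples/streaming/WordCount.jar
```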
Cluster installation
In on-YARN mode, Flink only needs to be installed on a single node, because every subsequent process fetches its jars & conf from HDFS before starting.
Official documentation
The YARN client needs to access the Hadoop configuration to connect to the YARN resource manager and to HDFS. It determines the Hadoop configuration using the following strategy:
Test if YARN_CONF_DIR, HADOOP_CONF_DIR or HADOOP_CONF_PATH are set (in that order). If one of these variables are set, they are used to read the configuration.
If the above strategy fails (this should not be the case in a correct YARN setup), the client is using the HADOOP_HOME environment variable. If it is set, the client tries to access $HADOOP_HOME/etc/hadoop (Hadoop 2) and $HADOOP_HOME/conf (Hadoop 1).
When starting a new Flink YARN session, the client first checks if the requested resources (containers and memory) are available. After that, it uploads a jar that contains Flink and the configuration to HDFS (step 1).
The next step of the client is to request (step 2) a YARN container to start the ApplicationMaster (step 3). Since the client registered the configuration and jar-file as a resource for the container, the NodeManager of YARN running on that particular machine will take care of preparing the container (e.g. downloading the files). Once that has finished, the ApplicationMaster (AM) is started.
The JobManager and AM are running in the same container. Once they successfully started, the AM knows the address of the JobManager (its own host). It is generating a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The file is also uploaded to HDFS. Additionally, the AM container is also serving Flink’s web interface. All ports the YARN code is allocating are ephemeral ports. This allows users to execute multiple Flink YARN sessions in parallel.
After that, the AM starts allocating the containers for Flink’s TaskManagers, which will download the jar file and the modified configuration from the HDFS. Once these steps are completed, Flink is set up and ready to accept Jobs.