MRAppMaster
MRAppMaster是MapReduce的ApplicationMaster实现,它使得MapReduce计算框架可以运行于YARN之上。在YARN中,MRAppMaster负责管理MapReduce作业的生命周期,包括创建MapReduce作业,向ResourceManager申请资源,与NodeManage通信要求其启动Container,监控作业的运行状态,当任务失败时重新启动任务等。
YARN使用了基于事件驱动的异步编程模型,它通过事件将各个组件联系起来,并由一个中央事件调度器统一将各种事件分配给对应的事件处理器。在YARN中,每种组件是一种事件处理器,MRAppMaster在整个MapReduce任务中负责管理整个任务的生命周期。它是一个独立的进程org.apache.hadoop.mapreduce.v2.app.MRAppMaster#main,由AppClient向Yarn申请Container后启动。MRAppMaster是MapReduce对ApplicationMaster的实现,它让MapReduce任务能运行在Yarn上。它主要作用在于管理作业的生命周期:
- 作业的管理:作业的创建,初始化以及启动等
- 向RM申请资源和再分配资源
- Container的启动与释放
- 监控作业运行状态
- 作业恢复
当MRAppMaster启动时,它们会以服务的形式注册到MRAppMaster的中央事件调度器上,并告诉调度器它们处理的事件类型,这样,当出现某一种事件时,MRAppMaster会查询<事件,事件处理器>表,并将该事件分配给对应的事件处理器。
接下来,我们分别介绍MRAppMaster各种组件/服务的功能。
ContainerAllocator: 与ResourceManager通信,为作业申请资源。作业的每个任务资源需求可描述为四元组<Priority, hostname,capability,containers>,分别表示作业优先级、期望资源所在的host,资源量(当前仅支持内存),container数目。ContainerAllocator周期性通过RPC与ResourceManager通信,而ResourceManager会为之返回已经分配的container列表,完成的container列表等信息。
ClientService: ClientService是一个接口,由MRClientService实现。MRClientService实现了MRClientProtocol协议,客户端可通过该协议获取作业的执行状态(而不必通过ResourceManager)控制作业(比如杀死作业等)。
Job
表示一个MapReduce作业,与MRv1的JobInProgress功能一样,负责监控作业的运行状态。它维护了一个作业状态机,以实现异步控制各种作业操作。Task
表示一个MapReduce作业中的某个任务,与MRv1中的TaskInProgress功能类似,负责监控一个任务的运行状态。它维护了一个任务状态机,以实现异步控制各种任务操作。TaskAttempt:
表示一个任务运行实例,同MRv1中的概念一样。Speculator:完成推测执行功能。当一个任务运行速度明显慢于其他任务时,Speculator会为该任务启动一个备份任务,让其同慢任务一同处理同一份数据,谁先计算完成则将谁的结果作为最终结果,另一个任务将被杀掉。该机制可有效防止“拖后腿”任务拖慢整个作业的执行进度。
ContainerLauncher
与NodeManager通信,要求其启动一个Container。当ResourceManager为作业分配资源后,ContainerLauncher会将资源信息封装成container,包括任务运行所需资源、任务运行命令、任务运行环境、任务依赖的外部文件等,然后与对应的节点通信,要求其启动container。TaskAttemptListener
管理各个任务的心跳信息,如果一个任务一段时间内未汇报心跳,则认为它死掉了,会将其从系统中移除。同MRv1中的TaskTracker类似,它实现了TaskUmbilicalProtocol协议,任务会通过该协议汇报心跳,并询问是否能够提交最终结果。JobHistoryEventHandler
对作业的各个事件记录日志,比如作业创建、作业开始运行、一个任务开始运行等,这些日志会被写到HDFS的某个目录下,这对于作业恢复非常有用。当MRAppMaster出现故障时,YARN会将其重新调度到另外一个节点上,为了避免重新计算,MRAppMaster首先会从HDFS上读取上次运行产生的运行日志,以恢复已经运行完成的任务,进而能够只运行尚未运行完成的任务。Recovery
当一个MRAppMaster故障后,它将被调度到另外一个节点上重新运行,为了避免重新计算,MRAppMaster首先会从HDFS上读取上次运行产生的运行日志,并恢复作业运行状态。
MRAppMaster工作流程
- 用户向YARN中(RM)提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
- ResourceManager为该应用程序分配第一个Container,ResouceManag与某个NodeManager通信,启动应用程序ApplicationMaster,NodeManager接到命令后,首先从HDFS上下载文件(缓存),然后启动ApplicationMaser。
- 当ApplicationMaster启动后,它与ResouceManager通信,以请求和获取资源。ApplicationMaster获取到资源后,与对应NodeManager通信以启动任务。
( 如果该应用程序第一次在节点上启动任务,则NodeManager首先从HDFS上下载文件缓存到本地,然后启动该任务。) - ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它们的运行状态,直到运行结束,即重复步骤5~8
- ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源
- 一旦ApplicationMaster申请到资源后,ApplicationMaster就会将启动命令交给NodeManager,要求它启动任务。启动命令里包含了一些信息使得Container可以与ApplicationMaster进行通信。
- NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务(Container)
- 在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态
- 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己
ContainerLauncher
ContainerLauncher负责与NodeManager通信,以启动一个container。在YARN中,运行Task所需的全部信息被封装到Container中,包括所需资源、依赖的外部文件、jar包、运行时环境变量、运行命令等。ContainerLauncher通过ContainerManager协议与NodeManager通信,该协议定义了三个RPC接口,具体如下:
tartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException;//启动一个container
StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException;//停止一个container
GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException;//获取一个container运行情况
ContainerLauncher是一个接口,它定义了2种事件:
CONTAINER_REMOTE_LAUNCH 启动一个container。当ContainerAllocator为某个任务申请到资源后,会将运行该任务相关的所有信息封装到container中,并要求对应的节点启动该container
CONTAINER_REMOTE_CLEANUP 停止/杀死一个container。存在多种可能触发该事件的行为,常见的有,1)推测执行时一个任务运行完成,则需杀死另一个同输入数据的任务 2)用户发送一个杀死任务请求 3)任意一个任务运行结束时,YARN会触发一个杀死任务的命令,以便结束对应进程。
ContainerLauncher接口由ContainerLauncherImpl类实现,它是一个服务,接收和处理来自事件调度器发送过来的CONTAINER_REMOTE_LAUNCH和CONTAINER_REMOTE_CLEANUP两种事件,它采用了队列+线程池的方式异步并行处理这两种事件。
对于CONTAINER_REMOTE_LAUNCH事件,它会调用Container.launch()函数与对应的NodeManager通信,以启动container,代码如下:
proxy = getCMProxy(containerID, containerMgrAddress,
containerToken);//构造一个RPC client
ContainerLaunchContext containerLaunchContext =
event.getContainer();
StartContainerRequest startRequest = Records
.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
StartContainerResponse response = proxy.startContainer(startRequest);//调用RPC函数,获取返回值
对于CONTAINER_REMOTE_CLEANUP事件,它会调用Container. kill()函数与对应的NodeManager通信,以杀死一个container,代码如下:
proxy = getCMProxy(this.containerID, this.containerMgrAddress,
this.containerToken);
StopContainerRequest stopRequest = Records
.newRecord(StopContainerRequest.class);
stopRequest.setContainerId(this.containerID);
proxy.stopContainer(stopRequest);
总之,ContainerLauncherImpl是一个非常简单的服务,其最核心的代码组织方式是队列+进程池,以处理事件调度器发送过来的CONTAINER_REMOTE_LAUNCH和CONTAINER_REMOTE_CLEANUP两种事件。
作业恢复
在MRAppMaster中,记录日志是由服务JobHistoryEventHandler完成的,而作业恢复是由服务RecoveryService完成的。
同MRv1一样,MRv2也会对一些关键的时间记录日志,这主要有两个作用:(1)方便用户查看历史作业运行信息 (2)作业因故障重新启动后,可根据日志信息恢复之前已经运行完成的任务,以减少重新计算代价。
MRAppMaster采用的日志格式与MRv1一样,但有两个小的改动:
- 实现方式不同。MRv1采用了同步记录日志的方式,也就是说,每发生一个行为,会记录一次日志,然后才可以执行下面的代码。由于YARN引入了基于事件的异步编程模型,因此,MRAppMaster也采用了异步方式记录日志。
- 存储位置不同。尽管MRv1允许用户将作业日志存放到HDFS上,但默认是存储到本地的,MRAppMaster则不同,它直接将日志写到HDFS上,这样,当MRAppMaster失败后,另一个MRAppMaster启动时,可直接读取HDFS中上一个作业产生的日志,以恢复已经运行完成的任务。
作业恢复的过程是重新解析作业日志,以恢复各个任务运行状态的过程(重做日志),这是由RecoveryService完成的。如果用户将yarn.app.mapreduce.am.job.recovery.enable参数置为true(默认就是true),则MRAppMaster运行作业之前,首先会检查这是否是第一次运行该作业,如果不是,则从HDFS上读取上次运行的作业日志,并恢复作业的运行状态,然后才会按照正常流程执行。
public void init(final Configuration conf) {
boolean recoveryEnabled = conf.getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
boolean recoverySupportedByCommitter = committer.isRecoverySupported();
if (recoveryEnabled && recoverySupportedByCommitter
&& appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
recoveryServ = createRecoveryService(context);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
inRecovery = true;
}
}
public void start() {
if (inRecovery) {
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
amInfos = recoveryServ.getAMInfos();
}
……
}
生命周期
我们知道MRAppMaster中每个job,若干干Map Task和Reduce Task组成,每个Task进一步由若干个TaskAttempt组成,Job、Task和TaskAttempt的生命周期均由一个状态机表示,
作业的创建入口在MRAppMaster类中,如下所示:
public class MRAppMaster extends CompositeService {
public void start() {
...
job = createJob(getConfig());//创建Job
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
jobEventDispatcher.handle(initJobEvent);//发送JOB_INI,创建MapTask,ReduceTask
startJobs();//启动作业,这是后续一切动作的触发之源
...
}
protected Job createJob(Configuration conf) {
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
}
JobImpl会接收到.JOB_INIT事件,然后触发作业状态从NEW变为INITED,并触发函数InitTransition(),该函数会创建MapTask和 ReduceTask,
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
...
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
...
}
其中,createMapTasks函数实现如下:
private void createMapTasks(JobImpl job, long inputLength,
TaskSplitMetaInfo[] splits) {
for (int i=0; i < job.numMapTasks; ++i) {
TaskImpl task =
new MapTaskImpl(job.jobId, i,
job.eventHandler,
job.remoteJobConfFile,
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens,
job.clock, job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
}
}
作业启动
public class MRAppMaster extends CompositeService {
protected void startJobs() {
JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
dispatcher.getEventHandler().handle(startJobEvent);
}
}
JobImpl会接收到.JOB_START事件,会触发作业状态从INITED变为RUNNING,并触发函数StartTransition(),进而触发Map Task和Reduce Task开始调:
public static class StartTransition
implements SingleArcTransition<JobImpl, JobEvent> {
public void transition(JobImpl job, JobEvent event) {
job.scheduleTasks(job.mapTasks);
job.scheduleTasks(job.reduceTasks);
}
}
这之后,所有Map Task和Reduce Task各自负责各自的状态变化,ContainerAllocator模块会首先为Map Task申请资源,然后是Reduce Task,一旦一个Task获取到了资源,则会创建一个运行实例TaskAttempt,如果该实例运行成功,则Task运行成功,否则,Task还会启动下一个运行实例TaskAttempt,直到一个TaskAttempt运行成功或者达到尝试次数上限。当所有Task运行成功后,Job运行成功。一个运行成功的任务所经历的状态变化如下(不包含失败或者被杀死情况):