1. ResourceManager
在yarn中, ResourceManager负责集群中所有资源的统一管理和分配,它接收来自各个NodeManager的资源汇报信息,并根据这些信息把集群里的资源按照一定的策略分配给各个应用程序。
ResourceManager主要由以下几个部分组成:
用户交互
YARN分别针对普通用户、管理员和web提供了三种对外服务,分别对应ClientRMService、WebApp:
- ClientRMService: ClientRMService是为普通用户提供的服务,它会处理来自客户端各种RPC请求,比如提交应用程序、终止应用程序,获取应用程序运行状态等。
- AdminService : YARN为管理员提供了一套独立的服务接口,以防止大量的普通用户请求使管理员发送的管理命令饿死,管理员可通过这些接口管理集群,比如动态更新节点列表,更新ACL列表,更新队列信息等。
- WebApp: 为了更加友好地展示集群资源使用情况和应用程序运行状态等信息,YARN对外提供了一个Web 界面.
NM管理
- NMLivelinessMonitor: 监控NM是否活着。如果一个NodeManager在一定时间(默认为10min)内未汇报心跳信息,则认为它死掉了,会将其从集群中移除。
- NodesListManager: 维护正常节点和异常节点列表,管理exlude(类似于黑名单)和inlude(类似于白名单)节点列表,这两个列表均是在配置文件中设置的,可以动态加载。
- ResourceTrackerService: 处理来自NodeManager的请求,主要包括两种请求:注册和心跳,其中,注册是NodeManager启动时发生的行为,请求包中包含节点ID,可用的资源上限等信息,而心跳是周期性 行为,包含各个Container运行状态,运行的Application列表、节点健康状况
AM管理
AMLivelinessMonitor: 监控AM是否活着,如果一个ApplicationMaster在一定时间(默认为10min)内未汇报心跳信息,则认为它死掉了。AM本身会被重新分配到另外一个节点上(用户可指定每个ApplicationMaster的尝试次数,默认是1次)执行。
ApplicationMasterLauncher:与NodeManager通信,要求它为某个应用程序启动ApplicationMaster。
ApplicationMasterService:处理来自ApplicationMaster的请求,主要包括两种请求:注册和心跳。其中,注册是ApplicationMaster启动时发生的行为,请求包中包含其所在节点,RPC端口号和tracking URL等信息。而心跳是周期性 行为,包含请求资源的类型描述、待释放的Container列表等
Application管理
- ApplicationACLsManager: 管理应用程序访问权限,包含两部分权限:查看和修改,查看主要指查看应用程序基本信息,而修改则主要是修改应用程序优先级、杀死应用程序等。
- RMAppManager: 管理应用程序的启动和关闭。
- ContainerAllocationExpirer: YARN不允许AM获得Container后长时间不对其使用,因为这会降低整个集群的利用率。当AM收到RM新分配的一个Container后,必须在一定的时间(默认为10min)内在对应的NM上启动该Container, 否则,RM会回收该Container。
ResourceScheduler
ResourceScheduler是资源调度器,它按照一定的约束条件(比如队列容量限制等)将集群中的资源分配给各个应用程序,ResourceScheduler是一个插拔式模块,默认是FIFO实现,YARN还提供了Fair Scheduler和Capacity Scheduler两个多租户调度器。
2. ContainerAllocator
ContainerAllocator负责与ResourceManager通信,为作业申请资源。作业的每个任务资源需求可描述为四元组<Priority, hostname,capability,containers>,分别表示作业优先级、期望资源所在的host,资源量(这里暂时只说内存),container数目。 比如:
<10, “node1”, “memory:1G”, 3>//优先级是一个正整数,优先级值越小,优先级越高
<2, “”, “memory:1G”, 20> //表示这样的资源可来自任意一个节点,即不考虑数据本地性
ContainerAllocator周期性通过心跳与ResourceManager通信,ResourceManager每次会返回已经分配的container列表,完成的container列表等信息。
ContainerAllocator工作流程
当用户提交作业之后,MRAppMaster会为之初始化,并创建一系列map task和reduce task,由于reduce task会依赖于map task的结果,所以reduce task会延后调度,在ContainerAllocator中,当map task数目完成一定比例(由mapreduce.job.reduce.slowstart.completedmaps指定,默认是0.05,即5%)且Reduce Task可允许占用的资源(Reduce Task可占用资源比由yarn.app.mapreduce.am.job.reduce.rampup.limit指定)能够折合成整数个任务时,才会调度Reduce Task。考虑到Map Task和Reduce Task之间的依赖关系,因此,它们之间的状态也是不一样的,对于Map Task而言,会依次转移到以下几个状态中:scheduled->assigned->completed,对于Reduce Task而言,则按照以下流程:pending->scheduled->assigned->completed
其中,pengding表示等待ContainerAllocator发送资源请求,scheduled表示已经将资源请求发送给RM,但还没有收到分配的资源,assigned是已经收到RM分配的资源。Reduce Task之所有多出一个pending,主要是为了根据Map Task情况调整Reduce Task状态(在pengding和scheduled中相互转移)。进一步说,这主要是为了防止Map Task饿死,因为在YARN中不再有map slot和reduce slot的概念(这两个概念从一定程度上减少了作业饿死的可能性)只有内存、CPU等真实的资源,需要由ApplicationMaster控制资源申请的顺序,以防止可能产生的作业饿死。此外,ContainerAllocator将所有任务划分成三类,分别是failed Map、Map和Reduce,并分别赋予它们优先级5、20和10,也就是说,当三种任务同时有资源需求是,会优先分配给failed map,然后是reduce,最后是map。
总结起来,ContainerAllocator工作流程如下:
步骤1 将所有map task的资源需求一次性发送给RM
步骤2 如果达到了Reduce task调度条件,则开始为Reduce Task申请资源。
步骤3 如果任务运行失败,则会重新为该任务申请资源。
步骤4 如果一个节点失败的任务数目过多,则会撤销对该节点的所有资源申请请求。
ContainerAllocator类图
ContainerAllocator实际上是一接口,它只定义了三个事件:CONTAINER_REQ,,CONTAINER_DEALLOCATE和CONTAINER_FAILED,分别表示请求container,释放container和container运行失败。
ContainerAllocator的实现是RMContainerAllocator,它只接收和处理ContainerAllocator接口中定义的三种事件,它的运行是这三种事件驱动的。
RMContainerAllocator中最核心的框架是维护了一个心跳信息,在RMCommunicator类中实现如下:
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(rmPollInterval);
try {
heartbeat();
} catch (YarnException e) {
LOG.error("Error communicating with RM: " + e.getMessage() , e);
return;
} catch (Exception e) {
LOG.error("ERROR IN CONTACTING RM. ", e);
}
} catch (InterruptedException e) {
LOG.warn("Allocated thread interrupted. Returning.");
return;
}
}
protected synchronized void heartbeat() throws Exception {
LOG.info("Before Scheduling: " + getStat());
List<Container> allocatedContainers = getResources();
LOG.info("After Scheduling: " + getStat());
if (allocatedContainers.size() > 0) {
LOG.info("Before Assign: " + getStat());
scheduledRequests.assign(allocatedContainers);
LOG.info("After Assign: " + getStat());
}
……
}
其中,getResources()函数用于向RM发送心跳信息,并处理心跳应答。需要注意的是,有些情况下,心跳信息中并不包含新的资源请求信息,即空的心跳信息,这有以下几个作用
- 周期性发送心跳,告诉RM自己还活着。
- 周期性询问RM,以获取新分配的资源和各个container运行状况。
assign()函数是将收到的container分配给某个任务,如果这个container无法分配下去(比如内存空间不够),则是在下次心跳中通知RM释放该container,如果container可以分下去,则会释放对应任务的其他资源请求,同时会向TaskAttempt发送一个TA_ASSIGNED事件,以通知ContainerLauncher启动container。
推测执行机制
推测执行(Speculative Execution)是指在分布式集群环境下,因为程序bug、负载不均衡或者资源分布不均等原因,造成同一个job的多个task的执行速度不一致,有的task运行速度明显慢于其他task,(比如:一个job的某个task进度只有10%, 其他所有task已经执行完毕),这些task拖慢了作业的整体执行进度,为了避免这种情况发生,hadoop会为该task启动备份任务,让该speculative task和原始task同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果。
推测执行优化机制采用了典型的以空间换时间的优化策略,它同时启动多个相同task处理相同的数据块,哪个完成的早,则采用哪个task的结果,这样可防止拖后腿Task任务出现,进而提高作业计算速度,但是,这样却会占用更多的资源。设计合理的推测执行机制可在多用少量资源情况下,减少大作业的计算时间。
YARN中采用的算法
当决定一个任务是否可以启动备份任务时,采用了下面的计算方法:
总是取speculationValue值最大的任务并为之启动备份任务,speculationValue计算方法为:
speculationValue= estimatedEndTime – estimatedReplacementEndTime
其中,estimatedEndTime是通过预测算法推测的该任务的最终完成时刻,计算方法为:
estimatedEndTime = estimatedRunTime + taskAttemptStartTime
其中,taskAttemptStartTime为该任务的启动时间,而estimatedRunTime为推测出来的任务运行时间,计算方法如下:
estimatedRunTime= (timestamp – start) / Math.max(0.0001, progress)
其中,timestamp为当前时刻,而start为任务开始运行时间,(timestamp-start)表示已经运行时间,progress为任务运行进度(0.0~1.0)。
estimatedReplacementEndTime含义为:如果此刻启动该任务,(可推测出来的)任务最终可能的完成时刻。很明显,如果estimatedReplacementEndTime大于estimatedEndTime,则没必要启动备份任务,因为即使启动了,它的完成时刻也会大于当前正在运行任务的完成时刻,只有当estimatedReplacementEndTime小于estimatedEndTime时,才有必要启动备份任务。而MRv2总是选择speculationValue值最大的任务并为之启动备份任务,且启动备份任务之前需检查是否满足以下条件:
- 每个任务最多只能有一个备份任务
- 已经完成的任务数目比例不小于MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE(0.05,即5%),只有这样,才能有足够的历史信息估算estimatedReplacementEndTime
estimatedReplacementEndTime计算过方法为:{当前时刻}+{已经成功运行完成的任务所使用的平均运行时间}。
简单提炼一下核心思想:在某一时刻,判断一个任务是否拖后腿或者是否是值得为其运行备份任务时,则首先假设为其启动一个备份任务,那我们估算一下它的完成时间estimatedReplacementEndTime,同样,如果按照此刻该任务的计算速度,我们可以估算一下该任务最有可能的完成时间estimatedEndTime,如果estimatedEndTime与estimatedReplacementEndTime之差越大,则表明为该任务启动备份任务的价值越大。
Speculator是一个服务,它由DefaultSpeculator实现,DefaultSpeculator每隔一段事件会扫描一次所有正在运行的任务,如果一个任务可以启动备份任务,则会向Task发出一个T_ADD_SPEC_ATTEMPT事件,以启动另外一个任务实例。