转载--distributedShell和Unmanaged AM示例代码解析

示例执行

我使用ambari安装的hadoop环境,jar包在/usr/lib/hadoop-yarn中。 执行命令:
$ su hdfs $ hadoop jar hadoop-yarn-applications-distributedshell-2.2.0.2.0.6.0-101.jar org.apache.hadoop.yarn.applications.distributedshell.Client -jar hadoop-yarn-applications-distributedshell-2.2.0.2.0.6.0-101.jar -shell_command '/bin/date' -num_containers 10
需要切到hdfs用户,否则会有下面的错误提示:
15/05/05 09:17:42 INFO distributedshell.Client: Copy App Master jar from local filesystem and add to local environment 15/05/05 09:17:43 FATAL distributedshell.Client: Error running CLient org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x
原因是本地hdfs上的/user目录只对hdfs用户开放了写权限,root不可写。cloudera安装的时候可以选择所有服务使用同一个账户,不会存在权限的问题(但据说会造成安装变复杂)。 执行完后提示信息请见文章最后

client解析

AM,真正要做的任务由AM来调度。 Client的简化框架如下
public static void main(String[] args) { boolean result = false; try { Client client = new Client(); //1 创建Client对象 try { boolean doRun = client.init(args); //2 初始化 if (!doRun) { System.exit(0); } } result = client.run(); //3 运行 } if (result) { System.exit(0); } System.exit(2); }

  1. 创建Client对象
    创建时会指定本Client要用到的AM。 创建yarnClient。yarn将client与RM的交互抽象出了编程库YarnClient,用以应用程序提交、状态查询和控制等,简化应用程序。
    public Client(Configuration conf) throws Exception { this( //指定AM "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster", conf); Client(String appMasterMainClass, Configuration conf) { this.conf = conf; this.appMasterMainClass = appMasterMainClass; yarnClient = YarnClient.createYarnClient(); //创建yarnClient yarnClient.init(conf); opts = new Options(); //创建opts,后面解析参数的时候用 opts.addOption("appname", true, "Application Name. Default value - DistributedShell"); opts.addOption("priority", true, "Application Priority. Default 0"); }
  2. 初始化
    init会解析命令行传入的参数,例如使用的jar包、内存大小、cpu个数等。 代码里使用GnuParser解析:init时定义所有的参数opts(可以认为是一个模板),然后将opts和实际的args传入解析后得到一个CommnadLine对象,后面查询选项直接操作该CommnadLine对象即可,如cliParser.hasOption("help")和cliParser.getOptionValue("jar")。
    public boolean init(String[] args) throws ParseException { CommandLine cliParser = new GnuParser().parse(opts, args); amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10")); amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1")); shellCommand = cliParser.getOptionValue("shell_command"); appMasterJar = cliParser.getOptionValue("jar"); ...
  3. 执行
  • 先启动yarnClient,会建立跟RM的RPC连接,之后就跟调用本地方法一样。通过此yarnClient查询NM个数、NM详细信息(ID/地址/Container个数等)、Queue info(其实没用到,示例里只是打印了下调试用)。
    public class Client { public boolean run() throws IOException, YarnException { yarnClient.start(); YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
  • 收集提交AM所需的信息。
    YarnClientApplication app = yarnClient.createApplication(); //创建app GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); //AM需要的本地资源,如jar包、log文件 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); FileSystem fs = FileSystem.get(conf); addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), localResources, null); //添加localResource vargs.add(Environment.JAVA_HOME.$$() + "/bin/java"); vargs.add("-Xmx" + amMemory + "m"); vargs.add(appMasterMainClass); for (CharSequence str : vargs) { command.append(str).append(" "); //重新组织命令行 } //创建Container加载上下文,包含本地资源,环境变量,实际命令。 ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( localResources, env, commands, null, null, null); Resource capability = Resource.newInstance(amMemory, amVCores); appContext.setResource(capability); //请求使用的内存、cpu appContext.setAMContainerSpec(amContainer); appContext.setQueue(amQueue);
    重新组织出来的commands如下:
    $JAVA_HOME/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10
  • 提交AM(即appContext),并启动监控。 Client只关心自己提交到RM的AM是否正常运行,而AM内部的多个task,由AM管理。如果Client要查询应用程序的任务信息,需要自己设计与AM的交互。
    yarnClient.submitApplication(appContext); //客户端提交AM到RM return monitorApplication(appId);
    总的来说,Client做的事情比较简单,即建立与RM的连接,提交AM,监控AM运行状态。 > 有个疑问,走读代码没有看到jar包是怎么送到NM上去的。

Application Master解析

AM简化框架如下:
boolean doRun = appMaster.init(args); if (!doRun) { System.exit(0); } appMaster.run(); result = appMaster.finish();
yarn抽象了两个编程库,AMRMClient和NMClient(AM和RM都可以用),简化AM编程。

  • 设置RM、NM消息的异步处理方法
    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); amRMClient.start(); containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start();
  • 向RM注册
    RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
  • 计算需要的Container,向RM发起请求
    // Setup ask for containers from RM // Send request for containers to RM // Until we get our fully allocated quota, we keep on polling RM for // containers // Keep looping until all the containers are launched and shell script // executed on them ( regardless of success/failure). for (int i = 0; i < numTotalContainersToRequest; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); //请求指定个数的Container } private ContainerRequest setupContainerAskForRM() { Resource capability = Resource.newInstance(containerMemory, containerVirtualCores); //指定需要的memory/cpu能力 ContainerRequest request = new ContainerRequest(capability, null, null, pri);
    好吧,先假设上面的addContainerRequest会向RM发送请求。对于AM来说,接下来就是等待RM回消息告知分配的Container。 > Q:注释里说这里会一直循环,怎么理解?按说发起Container请求以后,异步等待RM的应答,在相应的处理中加载任务(前面已经注册了AMRM的回调方法)就行了
  • RM分配Container给AM,AM启动任务
    RMCallbackHandler RM消息的响应,由RMCallbackHandler处理。示例中主要对前两种消息 进行了处理。
    private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { //处理消息:Container执行完毕。在RM返回的心跳应答中携带。如果心跳应答中有已完成和新分配两种Container,先处理已完成 public void onContainersCompleted(List<ContainerStatus> completedContainers) { //处理消息:RM新分配Container。在RM返回的心跳应答中携带 public void onContainersAllocated(List<Container> allocatedContainers) { public void onShutdownRequest() {done = true;} //节点状态变化 public void onNodesUpdated(List<NodeReport> updatedNodes) {} public float getProgress() {
    onContainersAllocated收到分配的Container之后,会提交任务到NM。
    public void onContainersAllocated(List<Container> allocatedContainers) { LaunchContainerRunnable runnableLaunchContainer =new LaunchContainerRunnable(allocatedContainer, containerListener); //创建runnable容器 Thread launchThread = new Thread(runnableLaunchContainer); //新建线程 // launch and start the container on a separate thread to keep // the main thread unblocked // as all containers may not be allocated at one go. launchThreads.add(launchThread); launchThread.start(); //线程中提交Container到NM,不影响主流程
    简单分析下LaunchContainerRunnable。该类实现自Runnable,其run方法准备任务命令(本例即为date)。

private class LaunchContainerRunnable implements Runnable { public LaunchContainerRunnable( Container lcontainer, NMCallbackHandler containerListener) { this.container = lcontainer; this.containerListener = containerListener; } public void run() { vargs.add(shellCommand); //待执行的shell命令 vargs.add(shellArgs); //shell命令参数 List<String> commands = new ArrayList<String>(); commands.add(command.toString()); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, shellEnv, commands, null, allTokens.duplicate(), null); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); //异步启动Container
onContainersCompleted的功能比较简单,收到Container执行完毕的消息,检查其执行结果,如果执行失败,则重新发起请求,直到全部完成。

  • NMCallbackHandler NM消息的响应,由NMCallbackHandler处理。
    在distShell示例里,回调句柄对NM通知过来的各种事件的处理比较简单,只是修改AM维护的Container执行完成、失败的个数。这样等到有Container执行完毕后,可以重启发起请求。失败处理和上面Container执行完毕消息的处理类似,达到了上面问题里所说的loopback效果。
    static class NMCallbackHandler implements NMClientAsync.CallbackHandler { @Override public void onContainerStopped(ContainerId containerId) { @Override public void onContainerStatusReceived(ContainerId containerId, @Override public void onContainerStarted(ContainerId containerId,
    总的来说,AM做的事就是向RM/NM注册回调函数,然后请求Container;得到Container后提交任务,并跟踪这些任务的执行情况,如果失败了则重新提交,直到全部任务完成。

UnmanagedAM

distShell的Client提交AM到RM后,由RM将AM分配到某一个NM上的Container,这样给AM调试带来了困难。yarn提供了一个参数,Client可以设置为Unmanaged,提交AM后,会在客户端本地起一个单独的进程来运行AM。

public class UnmanagedAMLauncher { public void launchAM(ApplicationAttemptId attemptId){ Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM)); try { int exitCode = amProc.waitFor(); //等待AM进程结束 } finally { amCompleted = true; } } public boolean run() throws IOException, YarnException { appContext.setUnmanagedAM(true); //设置为Unmanaged rmClient.submitApplication(appContext); //提交AM ApplicationReport appReport = monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED, YarnApplicationState.KILLED, YarnApplicationState.FAILED, YarnApplicationState.FINISHED)); if (appReport.getYarnApplicationState() ==YarnApplicationState.ACCEPTED) { launchAM(attemptId); } } }
附:命令执行输出
15/05/05 09:12:22 INFO distributedshell.Client: Initializing Client 15/05/05 09:12:22 INFO distributedshell.Client: Running Client 15/05/05 09:12:23 INFO client.RMProxy: Connecting to ResourceManager at ty11.dtdream.com/10.168.250.59:8050 15/05/05 09:12:23 INFO distributedshell.Client: Got Cluster metric info from ASM, numNodeManagers=3 15/05/05 09:12:23 INFO distributedshell.Client: Got Cluster node info from ASM 15/05/05 09:12:23 INFO distributedshell.Client: Got node report from ASM for, nodeId=ty11.dtdream.com:45454, nodeAddressty11.dtdream.com:8042, nodeRackName/default-rack, nodeNumContainers0 15/05/05 09:12:23 INFO distributedshell.Client: Got node report from ASM for, nodeId=ty10.dtdream.com:45454, nodeAddressty10.dtdream.com:8042, nodeRackName/default-rack, nodeNumContainers0 15/05/05 09:12:23 INFO distributedshell.Client: Got node report from ASM for, nodeId=ty12.dtdream.com:45454, nodeAddressty12.dtdream.com:8042, nodeRackName/default-rack, nodeNumContainers0 15/05/05 09:12:23 INFO distributedshell.Client: Queue info, queueName=default, queueCurrentCapacity=0.0, queueMaxCapacity=1.0, queueApplicationCount=0, queueChildQueueCount=0 15/05/05 09:12:23 INFO distributedshell.Client: User ACL Info for Queue, queueName=root, userAcl=SUBMIT_APPLICATIONS 15/05/05 09:12:23 INFO distributedshell.Client: User ACL Info for Queue, queueName=root, userAcl=ADMINISTER_QUEUE 15/05/05 09:12:23 INFO distributedshell.Client: User ACL Info for Queue, queueName=default, userAcl=SUBMIT_APPLICATIONS 15/05/05 09:12:23 INFO distributedshell.Client: User ACL Info for Queue, queueName=default, userAcl=ADMINISTER_QUEUE 15/05/05 09:12:23 INFO distributedshell.Client: Max mem capabililty of resources in this cluster 4096 15/05/05 09:12:23 INFO distributedshell.Client: Copy App Master jar from local filesystem and add to local environment 15/05/05 09:12:23 INFO distributedshell.Client: Set the environment for the application master 15/05/05 09:12:23 INFO distributedshell.Client: Setting up app master command 15/05/05 09:12:23 INFO distributedshell.Client: Completed setting up app master command $JAVA_HOME/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --num_containers 10 --priority 0 --shell_command /bin/date 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr 15/05/05 09:12:23 INFO distributedshell.Client: Submitting application to ASM 15/05/05 09:12:23 INFO impl.YarnClientImpl: Submitted application application_1430207548681_0011 to ResourceManager at ty11.dtdream.com/10.168.250.59:8050 15/05/05 09:12:24 INFO distributedshell.Client: Got application report from ASM for, appId=11, clientToAMToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=default, appMasterRpcPort=-1, appStartTime=1430788343925, yarnAppState=ACCEPTED, distributedFinalState=UNDEFINED, appTrackingUrl=http://ty11.dtdream.com:8088/proxy/application_1430207548681_0011/, appUser=hdfs 15/05/05 09:12:25 INFO distributedshell.Client: Got application report from ASM for, appId=11, clientToAMToken=null, appDiagnostics=, appMasterHost=ty10.dtdream.com/10.252.142.223, appQueue=default, appMasterRpcPort=-1, appStartTime=1430788343925, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=http://ty11.dtdream.com:8088/proxy/application_1430207548681_0011/, appUser=hdfs ... 15/05/05 09:12:32 INFO distributedshell.Client: Got application report from ASM for, appId=11, clientToAMToken=null, appDiagnostics=, appMasterHost=ty10.dtdream.com/10.252.142.223, appQueue=default, appMasterRpcPort=-1, appStartTime=1430788343925, yarnAppState=FINISHED, distributedFinalState=SUCCEEDED, appTrackingUrl=http://ty11.dtdream.com:8088/proxy/application_1430207548681_0011/, appUser=hdfs 15/05/05 09:12:32 INFO distributedshell.Client: Application has completed successfully. Breaking monitoring loop 15/05/05 09:12:32 INFO distributedshell.Client: Application completed successfully

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容