The YARN client needs to access the Hadoop configuration to connect to the YARN resource manager and to HDFS. It determines the Hadoop configuration using the following strategy:
Test if YARN_CONF_DIR, HADOOP_CONF_DIR or HADOOP_CONF_PATH are set (in that order). If one of these variables are set, they are used to read the configuration.
If the above strategy fails (this should not be the case in a correct YARN setup), the client is using the HADOOP_HOME environment variable. If it is set, the client tries to access $HADOOP_HOME/etc/hadoop (Hadoop 2) and $HADOOP_HOME/conf (Hadoop 1).
When starting a new Flink YARN session, the client first checks if the requested resources (containers and memory) are available. After that, it uploads a jar that contains Flink and the configuration to HDFS (step 1).
The next step of the client is to request (step 2) a YARN container to start the ApplicationMaster (step 3). Since the client registered the configuration and jar-file as a resource for the container, the NodeManager of YARN running on that particular machine will take care of preparing the container (e.g. downloading the files). Once that has finished, the ApplicationMaster (AM) is started.
The JobManager and AM are running in the same container. Once they successfully started, the AM knows the address of the JobManager (its own host). It is generating a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The file is also uploaded to HDFS. Additionally, the AM container is also serving Flink’s web interface. All ports the YARN code is allocating are ephemeral ports. This allows users to execute multiple Flink YARN sessions in parallel.
After that, the AM starts allocating the containers for Flink’s TaskManagers, which will download the jar file and the modified configuration from the HDFS. Once these steps are completed, Flink is set up and ready to accept Jobs.
编写Yarn Application
接口
Following are the important interfaces:
Client<-->ResourceManager
By using YarnClient objects.ApplicationMaster<-->ResourceManager
By using AMRMClientAsync objects, handling events asynchronously by AMRMClientAsync.CallbackHandlerApplicationMaster<-->NodeManager
Launch containers. Communicate with NodeManagers by using NMClientAsync objects, handling container events by NMClientAsync.CallbackHandler
Note
The three main protocols for YARN application (ApplicationClientProtocol, ApplicationMasterProtocol and ContainerManagementProtocol) are still preserved. The 3 clients wrap these 3 protocols to provide simpler programming model for YARN applications.
Under very rare circumstances, programmer may want to directly use the 3 protocols to implement an application. However, note that such behaviors are no longer encouraged for general use cases.
1.Writing a Simple Yarn Application
The first step that a client needs to do is to initialize and start a YarnClient.
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
Once a client is set up, the client needs to create an application, and get its application id.
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
The response from the YarnClientApplication for a new application also contains information about the cluster such as the minimum/maximum resource capabilities of the cluster. This is required so that to ensure that you can correctly set the specifications of the container in which the ApplicationMaster would be launched. Please refer to GetNewApplicationResponse for more details.
ContainerLaunchContext: The information defining the container in which the AM will be launched and run. The ContainerLaunchContext, as mentioned previously, defines all the required information needed to run the application such as the local Resources (binaries, jars, files etc.), Environment settings (CLASSPATH etc.), the Command to be executed and security Tokens (RECT).
// set the application submission context
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
LOG.info("Copy App Master jar from local filesystem and add to local environment");
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
FileSystem fs = FileSystem.get(conf);
addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
localResources, null);
// Set the log4j properties if needed
if (!log4jPropFile.isEmpty()) {
addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
localResources, null);
}
// The shell script has to be made available on the final container(s)
// where it will be executed.
// To do this, we need to first copy into the filesystem that is visible
// to the yarn framework.
// We do not need to set this as a local resource for the application
// master as the application master does not need it.
String hdfsShellScriptLocation = "";
long hdfsShellScriptLen = 0;
long hdfsShellScriptTimestamp = 0;
if (!shellScriptPath.isEmpty()) {
Path shellSrc = new Path(shellScriptPath);
String shellPathSuffix =
appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
Path shellDst =
new Path(fs.getHomeDirectory(), shellPathSuffix);
fs.copyFromLocalFile(false, true, shellSrc, shellDst);
hdfsShellScriptLocation = shellDst.toUri().toString();
FileStatus shellFileStatus = fs.getFileStatus(shellDst);
hdfsShellScriptLen = shellFileStatus.getLen();
hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
}
if (!shellCommand.isEmpty()) {
addToLocalResources(fs, null, shellCommandPath, appId.toString(),
localResources, shellCommand);
}
if (shellArgs.length > 0) {
addToLocalResources(fs, null, shellArgsPath, appId.toString(),
localResources, StringUtils.join(shellArgs, " "));
}
// Set the env variables to be setup in the env where the application master will be run
LOG.info("Set the environment for the application master");
Map<String, String> env = new HashMap<String, String>();
// put location of shell script into env
// using the env info, the application master will create the correct local resource for the
// eventual containers that will be launched to execute the shell scripts
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
// Add AppMaster.jar location to classpath
// At some point we should not be required to add
// the hadoop specific classpaths to the env.
// It should be provided out of the box.
// For now setting all required classpaths including
// the classpath to "." for the application jar
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
"./log4j.properties");
// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
// Set java executable command
LOG.info("Setting up app master command");
vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name
vargs.add(appMasterMainClass);
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
}
if (debugFlag) {
vargs.add("--debug");
}
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
// Service data is a binary blob that can be passed to the application
// Not needed in this scenario
// amContainer.setServiceData(serviceData);
// Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
// Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
Credentials credentials = new Credentials();
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null | | tokenRenewer.length() == 0) {
throw new IOException(
"Can't get Master Kerberos principal for the RM to use as renewer");
}
// For now, only getting tokens for the default file-system.
final Token<?> tokens[] =
fs.addDelegationTokens(tokenRenewer, credentials);
if (tokens != null) {
for (Token<?> token : tokens) {
LOG.info("Got dt for " + fs.getUri() + "; " + token);
}
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(fsTokens);
}
appContext.setAMContainerSpec(amContainer);
After the setup process is complete, the client is ready to submit the application with specified priority and queue.
// Set the priority for the application master
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);
// Submit the application to the applications manager
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
yarnClient.submitApplication(appContext);
At this point, the RM will have accepted the application and in the background, will go through the process of allocating a container with the required specifications and then eventually setting up and launching the AM on the allocated container.
There are multiple ways a client can track progress of the actual task.
It can communicate with the RM and request for a report of the application via the getApplicationReport() method of YarnClient.
// Get application report for the appId we are interested in
ApplicationReport report = yarnClient.getApplicationReport(appId);
2.Writing a Simple Yarn Application
步骤
- 根据交互获得applicationAttempId
- AM初始化完成之后,开启两个client,一个是连接RM,另一个是连接NM
- AM通过心跳上传信息和运行状态给RM
- 根据注册的Response信息,判断集群资源是否充足
- 根据任务的需求,AM请求一批container开始运行任务,我们可以计算需要多少个conainter
- 在continaer请求的分配的request发送出去之后,container会异步的发布
- event handler会异步的报告application的进度
- contianers会通过在NMs部署container发布线程
- 当AM决定task已经结束之后,会通过AM-RM clien取消注册并且停止client
The AM is the actual owner of the job. It will be launched by the RM and via the client will be provided all the necessary information and resources about the job that it has been tasked with to oversee and complete.
AM是job真正的owner。它会被RM发布,通过client会提供所有job需要的信息和资源
,这些信息和资源都是job监督和完成所需要的。As the AM is launched within a container that may (likely will) be sharing a physical host with other containers, given the multi-tenancy nature, amongst other issues, it cannot make any assumptions of things like pre-configured ports that it can listen on.
由于Am是通过container与其他container一起发布到到物理机器上,在一个多租户的场景中,包括其他问题,它没有办法保证一些提前定义的事情:比如某个指定的端口要被监听等When the AM starts up, several parameters are made available to it via the environment. These include the ContainerId for the AM container, the application submission time and details about the NM (NodeManager) host running the ApplicationMaster. Ref ApplicationConstants for parameter names.
当AM启动,有多个参数会从环境中生成。这些包括AM continaer的Container ID,application提交的时间和运行了ApplicationMaster的Node Manager的一些细节。关联的ApplicationConstants指定了parameter名称。All interactions with the RM require an ApplicationAttemptId (there can be multiple attempts per application in case of failures). The ApplicationAttemptId can be obtained from the AM’s container id. There are helper APIs to convert the value obtained from the environment into objects.
所有与RM的交互都需要ApplicationAttemptId(每个application可以有多个attempts,以防如果失败了).ApplicationAttemptID可以从AM'scontainer id获得。提供了一些helper的api把环境中获得的值转换成objects。
//获取appAttemptID
Map<String, String> envs = System.getenv();
String containerIdString =
envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
if (containerIdString == null) {
// container id should always be set in the env by the framework
throw new IllegalArgumentException(
"ContainerId not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
After an AM has initialized itself completely, we can start the two clients: one to ResourceManager, and one to NodeManagers. We set them up with our customized event handler, and we will talk about those event handlers in detail later in this article.
当AM已经完全初始化自己后,我们可以开启2个clinets:一个是针对ResourceManager,另一个是针对NodeManager。我们用自己自定义的event Handler创建它们,另外我们会在后面的章节中详细讲解event handler。
//开启连接RM的客户端
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
//开启连接NM的客户端
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
The AM has to emit heartbeats to the RM to keep it informed that the AM is alive and still running. The timeout expiry interval at the RM is defined by a config setting accessible via YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS with the default being defined by YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS. The ApplicationMaster needs to register itself with the ResourceManager to start heartbeating.
AM必须必通过心跳将AM存活并且在running的状态发送给RM。
ApplictionMaster需要在将它自己注册到ResourceManager,从心态开始。
// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
In the response of the registration, maximum resource capability if included. You may want to use this to check the application’s request.
在注册的response中,请求的最大资源如果是包含的。你可能希望使用这个去检查application的request
// Dump out information about cluster capability as seen by the
// resource manager
//获取集群最大的内存资源
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
//获取集群的最大的cpu资源
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
//请求的资源不能超过集群最大的资源
// A resource ask cannot exceed the max.
if (containerMemory > maxMem) {
LOG.info("Container memory specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerMemory + ", max="
+ maxMem);
containerMemory = maxMem;
}
if (containerVirtualCores > maxVCores) {
LOG.info("Container virtual cores specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerVirtualCores + ", max="
+ maxVCores);
containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
+ " previous AM's running containers on AM registration.");
Based on the task requirements, the AM can ask for a set of containers to run its tasks on. We can now calculate how many containers we need, and request those many containers.
基于task的需求,AM可以请求一批(set)的containers运行task的任务。我们可以计算我们需要多少containers,并且申请这些containers。
List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
+ " previous AM's running containers on AM registration.");
int numTotalContainersToRequest =
numTotalContainers - previousAMRunningContainers.size();
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
//通过循环的方式不断向RM请求资源,直到所有的资源都已分配
for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
In setupContainerAskForRM(), the follow two things need some set up:
Resource capability: Currently, YARN supports memory based resource requirements so the request should define how much memory is needed. The value is defined in MB and has to less than the max capability of the cluster and an exact multiple of the min capability. Memory resources correspond to physical memory limits imposed on the task containers. It will also support computation based resource (vCore), as shown in the code.
Priority: When asking for sets of containers, an AM may define different priorities to each set. For example, the Map-Reduce AM may assign a higher priority to containers needed for the Map tasks and a lower priority for the Reduce tasks’ containers.
private ContainerRequest setupContainerAskForRM() {
// setup requirements for hosts
// using * as any host will do for the distributed shell app
// set the priority for the request
Priority pri = Priority.newInstance(requestPriority);
// Set up resource type requirements
// For now, memory and CPU are supported so we set memory and cpu requirements
Resource capability = Resource.newInstance(containerMemory,
containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null,
pri);
LOG.info("Requested container ask: " + request.toString());
return request;
}
After container allocation requests have been sent by the application manager, contailers will be launched asynchronously, by the event handler of the AMRMClientAsync client. The handler should implement AMRMClientAsync.CallbackHandler interface.
When there are containers allocated, the handler sets up a thread that runs the code to launch containers. Here we use the name LaunchContainerRunnable to demonstrate. We will talk about the LaunchContainerRunnable class in the following part of this article.
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container ask, allocatedCnt="
+ allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer);
// launch and start the container on a separate thread to keep
// the main thread unblocked
// as all containers may not be allocated at one go.
launchThreads.add(launchThread);
launchThread.start();
}
}
On heart beat, the event handler reports the progress of the application.
在心跳中,event handler会把进度报告给application
@Override
public float getProgress() {
// set progress to deliver to RM on next heartbeat
float progress = (float) numCompletedContainers.get()
/ numTotalContainers;
return progress;
}
The container launch thread actually launches the containers on NMs. After a container has been allocated to the AM, it needs to follow a similar process that the client followed in setting up the ContainerLaunchContext for the eventual task that is going to be running on the allocated Container. Once the ContainerLaunchContext is defined, the AM can start it through the NMClientAsync.
container运行线程,实际上运行在NMs的conainter上。在container已经分配到NM之后。
// Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
// Set executable command
vargs.add(shellCommand);
// Set shell script path
if (!scriptPath.isEmpty()) {
vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
: ExecShellStringPath);
}
// Set args for the shell command if any
vargs.add(shellArgs);
// Add log redirect params
// 添加log的日志参数
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
// Set up ContainerLaunchContext, setting local resource, environment,
// command and token for constructor.
// Note for tokens: Set up tokens for the container too. Today, for normal
// shell commands, the container in distribute-shell doesn't need any
// tokens. We are populating them mainly for NodeManagers to be able to
// download anyfiles in the distributed file-system. The tokens are
// otherwise also useful in cases, for e.g., when one is running a
// "hadoop dfs" command inside the distributed shell.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
The NMClientAsync object, together with its event handler, handles container events. Including container start, stop, status update, and occurs an error.
After the ApplicationMaster determines the work is done, it needs to unregister itself through the AM-RM client, and then stops the client.
try {
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
LOG.error("Failed to unregister application", e);
}
amRMClient.stop();