The following table summarizes terms you’ll see used to refer to cluster concepts:
Term Meaning
Application User program built on Spark. Consists of a driver program and executors on the cluster.
Application jar A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries, however, these will be added at runtime.
Driver program The process running the main() function of the application and creating the SparkContext
Cluster manager An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN)
Deploy mode Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster.
Worker node Any node that can run application code in the cluster
Executor A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
Task A unit of work that will be sent to one executor
Job A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you'll see this term used in the driver's logs.
Stage Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs.
spark://HOST:PORT Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
In general, configuration values explicitly set on a SparkConf take the highest precedence, then flags passed to spark-submit, then values in the defaults file.
Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. This can use up a significant amount of space over time and will need to be cleaned up. With YARN, cleanup is handled automatically, and with Spark standalone, automatic cleanup can be configured with the spark.worker.cleanup.appDataTtl property.
spark.driver.memory 1g Amount of memory to use for the driver process, i.e. where SparkContext is initialized. (e.g. 1g, 2g).
Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.
spark.driver.extraClassPath spark.driver.extraJavaOptions spark.driver.extraLibraryPath
Container的一些基本概念和工作流程如下:
(1) Container是YARN中资源的抽象,它封装了某个节点上一定量的资源(CPU和内存两类资源)。它跟Linux Container没有任何关系,仅仅是YARN提出的一个概念(从实现上看,可看做一个可序列化/反序列化的Java类)。
(2) Container由ApplicationMaster向ResourceManager申请的,由ResouceManager中的资源调度器异步分配给ApplicationMaster;
(3) Container的运行是由ApplicationMaster向资源所在的NodeManager发起的,Container运行时需提供内部执行的任务命令(可以使任何命令,比如java、Python、C++进程启动命令均可)以及该命令执行所需的环境变量和外部资源(比如词典文件、可执行文件、jar包等)。
另外,一个应用程序所需的Container分为两大类,如下:
(1) 运行ApplicationMaster的Container:这是由ResourceManager(向内部的资源调度器)申请和启动的,用户提交应用程序时,可指定唯一的ApplicationMaster所需的资源;
(2) 运行各类任务的Container:这是由ApplicationMaster向ResourceManager申请的,并由ApplicationMaster与NodeManager通信以启动之。
以上两类Container可能在任意节点上,它们的位置通常而言是随机的,即ApplicationMaster可能与它管理的任务运行在一个节点上。
Container是YARN中最重要的概念之一,懂得该概念对于理解YARN的资源模型至关重要,希望本文对学习Container这一概念有所帮助。
在学习Container之前,大家应先了解YARN的基本架构、工作流程。比如,大家应该了解一个应用程序的运行过程如下:
步骤1:用户将应用程序提交到ResourceManager上;
步骤2:ResourceManager为应用程序ApplicationMaster申请资源,并与某个NodeManager通信,以启动ApplicationMaster;
步骤3:ApplicationMaster与ResourceManager通信,为内部要执行的任务申请资源,一旦得到资源后,将于NodeManager通信,以启动对应的任务。
步骤4:所有任务运行完成后,ApplicationMaster向ResourceManager注销,整个应用程序运行结束。
上述步骤中,步骤2~3涉及到资源申请与使用,而这正是Container出现的地方。
在YARN中,ResourceManager中包含一个插拔式的组件:资源调度器,它负责资源的管理和调度,是YARN中最核心的组件之一。
当向资源调度器申请资源,需向它发送一个ResourceRequest列表,其中,每个ResourceRequest描述了一个资源单元的详细需求,而资源调度器则为之返回分配到的资源描述Container。每个ResourceRequest可看做一个可序列化Java对象,包含的字段信息(直接给出了Protocol Buffers定义)如下:
message ResourceRequestProto {
optional PriorityProto priority = 1; // 资源优先级
optional string resource_name = 2; // 资源名称(期望资源所在的host、rack名称等)
optional ResourceProto capability = 3; // 资源量(仅支持CPU和内存两种资源)
optional int32 num_containers = 4; // 满足以上条件的资源个数
optional bool relax_locality = 5 [default = true]; //是否支持本地性松弛(2.1.0-beta之后的版本新增加的,具体参考我的这篇文章:Hadoop新特性、改进、优化和Bug分析系列3:YARN-392)
}
从上面定义可以看出,可以为应用程序申请任意大小的资源量(CPU和内存),且默认情况下资源是本地性松弛的,即申请优先级为10,资源名称为“node11”,资源量为<2GB, 1cpu>的5份资源时,如果节点node11上没有满足要求的资源,则优先找node11同一机架上其他节点上满足要求的资源,如果仍找不到,则找其他机架上的资源。而如果你一定要node11上的节点,则将relax_locality置为false。
发出资源请求后,资源调度器并不会立马为它返回满足要求的资源,而需要应用程序的ApplicationMaster不断与ResourceManager通信,探测分配到的资源,并拉去过来使用。一旦分配到资源后,ApplicatioMaster可从资源调度器那获取以Container表示的资源,Container可看做一个可序列化Java对象,包含的字段信息(直接给出了Protocol Buffers定义)如下:
message ContainerProto {
optional ContainerIdProto id = 1; //container id
optional NodeIdProto nodeId = 2; //container(资源)所在节点
optional string node_http_address = 3;
optional ResourceProto resource = 4; //container资源量
optional PriorityProto priority = 5; //container优先级
optional hadoop.common.TokenProto container_token = 6; //container token,用于安全认证
}
一般而言,每个Container可用于运行一个任务。ApplicationMaster收到一个或多个Container后,再次将该Container进一步分配给内部的某个任务,一旦确定该任务后,ApplicationMaster需将该任务运行环境(包含运行命令、环境变量、依赖的外部文件等)连同Container中的资源信息封装到ContainerLaunchContext对象中,进而与对应的NodeManager通信,以启动该任务。ContainerLaunchContext包含的字段信息(直接给出了Protocol Buffers定义)如下:
message ContainerLaunchContextProto {
repeated StringLocalResourceMapProto localResources = 1; //Container启动以来的外部资源
optional bytes tokens = 2;
repeated StringBytesMapProto service_data = 3;
repeated StringStringMapProto environment = 4; //Container启动所需的环境变量
repeated string command = 5; //Container内部运行的任务启动命令,如果是MapReduce的话,Map/Reduce Task启动命令就在该字段中
repeated ApplicationACLMapProto application_ACLs = 6;
}
每个ContainerLaunchContext和对应的Container信息(被封装到了ContainerToken中)将再次被封装到StartContainerRequest中,也就是说,ApplicationMaster最终发送给NodeManager的是StartContainerRequest,每个StartContainerRequest对应一个Container和任务。
Delay scheduling机制是为了提高数据本地性提出的,它的基本思想是,当一个节点出现空闲资源时,调度器按照调度策略应将该资源分配给job1,但是job1没有满足locality的任务,考虑到性能问题,调度器暂时跳过该作业,而将空闲资源分配给其他有locality任务的作业,今后集群出现空闲资源时,job1将一直被跳过,知道它有一个满足locality的任务,或者达到了管理员事先配置的最长跳过时间,这时候不得不将资源分配给job1(不能让人家再等了啊,亲)。
YARN调度器是一个resource-centric调度器,调度时间复杂度是O(number of nodes),而JobTracker调度器是一个task-centric调度器,调度时间复杂度是O(number of tasks),在设计YARN调度策略时,一定要牢记这一点,这是保证YARN高扩展性的前提,切莫混淆了这两种调度。
对于FIFO,在hadoop中,就是有节点汇报心跳,然后遍历所有任务找出优先级最高的满足本地性的任务,调度任务执行;在yarn中根据各个队列资源请求,然后遍历节点,找到合适资源,将容器列表分派给队列。
Yarn,AM从RM申请资源,是pull模式,AM要求NM启动container是push模式, NM向RM汇报心跳,是Pull模式。