总目录:https://www.jianshu.com/p/e406a9bc93a9
Hadoop - 子目录:https://www.jianshu.com/p/9428e443b7fd
Job的实例化
Configuration con = new Configuration(); //读取各项配置
Job job = Job.getInstance(con);//将配置加载到job作业里面。
job.setJarByClass(WordCountMain.class);//指定主类
Configuration做为Hadoop的一个基础功能承担着重要的责任,为Yarn、HSFS、MapReduce、NFS、调度器等提供参数的配置、配置文件的分布式传输(实现了Writable接口)等重要功能。
从上到下的实现的功能依次是
LOG
--记录日志
quietmode
--boolean类型,配置信息加载过程中是否静默,即有一些信息不会被记录,默认是true
resources
--ArrayList<Resource>类型,Resource是Configuration的内部类,有两个属性Object resource和String name;resources是一个对象数组,用于存储有关包含配置信息的对象
finalParameter
--Set<String>类型,所有被声明为final的变量集合,声明为final就表示不能被后续覆盖
loadDefault
--boolean类型,是否加载默认配置;
REGISTRY
--WeakHashMap<Configuration,Object>类型,用于多个对象的相关配置的注册及对它们进行管理,记录了所有的Configuration
defaultResources
--CopyOnWriteArrayList<String>类型,用于存储默认的配置资源名或路径
{…}
--加载默认的配置资源
properties
--java内置的Properties类型,存储所有配置信息,KV值
overlay
--Properties类型,是用户设置的而不是通过对资源解析得到的
classLoader
--ClassLoader类型,主要用于加载指定的类或者加载相关资源
varPat
--静态Pattern类型,对含有环境变量的值的进行转换的正则表达式
MAX_SUBST
--静态int类型,默认值是20,MAX_SUBST是设定对带有环境变量的值所能够深入解析的层次数,超出这个最大的层数的值将不能够解析。
文件的输入输出
FileInputFormat.addInputPath(job, new Path("输入路径"));
FileOutputFormat.setOutputPath(job, new Path("输出路径"));
先说输入路径
进入 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中
这是一个三目运算符,追加所有输入的目录,目录间用逗号分隔。
注:文件输入有两个方法,除了addinputpath还有一个setinputpaths,区别是后者一次只能处理一个输入路径。但是前者兼容后者。
之后是输出路径
进入org.apache.hadoop.mapreduce.lib.output.FileOutputFormat类中
输出路径是唯一的,这里会读取主类中设置的输出路径,但是会以最后一个为准,后面设置的会自动覆盖前面设置的。而且输出路径目录必须为空,且已经创建好。(IOException是读写异常)
Job的提交
第一层:waitForCompletion
job.setNumReduceTasks(1);//设置reduce任务数量
job.waitForCompletion(true);//job的提交
setNumReduceTasks源码没有什么意义,我们直接来看waitForCompletion。
我们会给waitForCompletion传输一个布尔值,来判断是否打印job的执行情况。
在org.apache.hadoop.mapreduce.Job类中
判断Job状态,如果处于DEFINE状态则通过submit()方法向集群提交job
if (state == JobState.DEFINE) {
submit();
}
判断是否需要打印job的执行过程
if (verbose) {
monitorAndPrintJob(); //verbose为TRUE时打印
} else {
。。。
//verbose为FALSE时轮询任务是否完成
Job.getCompletionPollInterval(cluster.getConf());
。。。
}
waitForCompletion的功能是向集群提交任务,并等待任务完成,这个方法里面我们有一条语句需要详细分析。
第二层:submit
submit();向集群提交任务,并立即返回。
进入submit方法后,首先会执行三个方法,分别是:
ensureState(JobState.DEFINE); //判断job类型是否是DEFIND
setUseNewAPI();//指定job使用的是新版mapreduce的API
connect();//对封装Configuration设置的集群的信息的cluster对象初始化
//创建真正的通信协议,被用于最终的job提交
之后产生一个对象负责job的运行进度
// 通过cluster中封装的集群信息(HDFS和client)获取JobSubmitter对象
//该对象负责最终向集群提交job并返回job的运行进度
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
之后会将JobState的状态从DEFINE转换为RUNNING,开始执行Job
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
第三层:submitJobInternal
这个方法会将任务提交到集群中,是真正干活的方法。
1.在执行任务前,首先会检查输出路径是否配置,是否存在;正确应已配置但未创建,其默认配置参数为mapreduce.output.fileoutputformat.outputdir。
2.获取job中的集群配置信息,添加到分布式缓存中。
3.通过JobSubmissionFiles中的getStagingDir()方法获取作业执行时相关资源的存放路径。
4.获取当前主机ip与主机名,封装进Configuration对象中。
5.生成JobID并封装进Job。
6.构建提交作业的路径
7.权限与密钥相关配置
8.拷贝副本
9.将jar包上传到HDFS中
10.分片 (重要)
11.程序正式开始执行,并返回作业状态。
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
checkSpecs(job); //1.检查作业输出路径是否配置并且是否存在,必须配置且不存在
Configuration conf = job.getConfiguration(); // 2.获取job中封装的Configuration对象
addMRFrameworkToDistributedCache(conf); //2.添加到分布式缓存中
// 3.通过getStagingDir获取作业执行时相关资源的存放路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
//4.获取job的当前主机的IP,并将ip、主机名等相关信息封装进Configuration对象中
InetAddress ip = InetAddress.getLocalHost();
。。。。
//5.生成作业ID,即jobID
JobID jobId = submitClient.getNewJobID();
//5.将jobID添加到job内
job.setJobID(jobId);
//6.构造提交作业路径, jobStagingArea + /jobID
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
。。。。
// get delegation token for the dir 7.关于job提交路径权限的设置
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
。。。。
//8.复制job与相关配置文件作为副本
copyAndConfigureFiles(job, submitJobDir);
//8.获取配置文件路径
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job 分片
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
//10.写分片数据文件job.splits和分片元数据文件job.splitmetainfo,计算map任务数
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
。。。。
//9. Write job file to submit dir将Job文件(jar包)上传到任务提交文件夹(HDFS)
writeConf(conf, submitJobFile);
。。。。
//11.真正的提交作业到集群,并返回作业状态到status成员
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
。。。。
分片:writeSplits
进行追踪后,可以很清晰的看到,这里是决定使用新版API或者旧版API的地方。
对新版API进行追踪。
新版API:writeNewSplits()
1.根据我们设置的inputFormat反射出一个inputFormat类型的对象input
2.调用他的getSplits方法来获取分片信息
3.调用JobSplitWriter.createSplitFiles方法将分片的信息写入到submitJobDir/job.split文件中。
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
//获取切片信息
List<InputSplit> splits = input.getSplits(job);
//创建一个数组,以split集合的切片大小为长度
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
分片实际方法:getSplits()
这个方法在org.apache.hadoop.mapreduce.lib.input.FileInputFormat类中。
这个方法和submitJobInternal方法一样,都是用来干活的。
我们需要注意这一个while循环。
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
这个循环的判定语句是剩余的字节大小是否大于预设分片大小的1.1倍。
private static final double SPLIT_SLOP = 1.1; // 10% slop
也就是说当剩余字节大于预设分片大小的110%后,对剩余的文件继续分片,否则直接将剩余文件生成一个分片。
我们在MapReduce那一篇的分片补充知识中说过,分片是有1.1倍冗余的。