MapReduce computation steps in Local mode (illustrated)
Step-by-step breakdown
- The "main" thread performs input splitting and Job submission
- Builds the local submitJobDir staging directory
- Computes the splits (InputSplit[]) from the number of files under InputPath and the blockSize
- Writes the split information together with the job configuration into submitJobDir;
- Hands everything under submitJobDir over to the MR Job (in local mode, just a local notification), completing the submission;
- A "?" thread receives and parses the job information
- Processes the information submitted via submitJobDir;
- Creates the relevant classes based on the conf
- The "Thread-3" Job.run() thread executes the Job's computation
- Builds the map task List<RunnableWithThrowable> from the splits
- Iterates over and submits each MapTask thread task;
- Continues in: JobRunner.MapTaskRunnable.run() on the "LocalJobRunner Map Task" thread
- Builds the reduce thread tasks reduceRunnables: List<RunnableWithThrowable> based on job.reduces
- Iterates over and executes each ReduceTask thread task;
- Continues in: ReduceTaskRunnable.run() on the "pool-n-thread-1" thread;
- The "LocalJobRunner Map Task" thread: completes one assigned map computation;
- Fetches the split information and data for this map,
- Builds the MapTask
- input.init loads the split's input data;
- mapper.run() loops over the records, calling the user-defined map() method;
- Writes the map output to the intermediate shuffle files and notifies the Tracker;
- The "pool-3-thread-1" thread completes one reduce computation
- Builds the ReduceTask;
- Builds the shuffleContext shuffle task and runs shuffle.run();
- Wraps the shuffled result data in a KeyValueIterator and runs reducer.reduce() for the aggregation;
- Writes the reduce result to the OutputPath;
- Ends the Job and releases the associated resources;
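The thread handoff outlined above (Job.run() waiting for all map runnables before starting the reduce runnables) can be sketched with a plain thread pool. This is a simplified, hypothetical illustration using java.util.concurrent, not LocalJobRunner's actual code; only the names MapTaskRunnable and ReduceTaskRunnable come from the outline:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class LocalRunnerSketch {

    // Runs numSplits "map" runnables to completion, then numReduces "reduce"
    // runnables, mirroring the two-phase Job.run() flow; returns how many
    // maps had finished when the reduce ran.
    static int runJob(int numSplits, int numReduces) throws Exception {
        AtomicInteger mapsDone = new AtomicInteger();

        // Phase 1: one runnable per InputSplit, like mapRunnables
        ExecutorService mapPool = Executors.newFixedThreadPool(numSplits);
        List<Future<?>> maps = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            maps.add(mapPool.submit(() -> {
                // stand-in for MapTaskRunnable.run(): read split, mapper.run(), spill
                mapsDone.incrementAndGet();
            }));
        }
        for (Future<?> f : maps) f.get(); // Job.run() waits for all maps first
        mapPool.shutdown();

        // Phase 2: one runnable per reducer, like reduceRunnables
        ExecutorService reducePool = Executors.newFixedThreadPool(numReduces);
        Future<Integer> reduce = reducePool.submit(() -> {
            // stand-in for ReduceTaskRunnable.run(): shuffle, then reducer.reduce()
            return mapsDone.get();
        });
        int seenByReduce = reduce.get();
        reducePool.shutdown();
        return seenByReduce;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runJob(2, 1)); // both maps finish before the reduce runs
    }
}
```

The point of the two pools is the barrier between them: every map Future is joined before any reduce runnable is submitted, which is why the reduce always observes all map outputs.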
Splitting and submission on the "main" thread
How the submitJobDir upload directory is built (source)
Source walkthrough:
JobSubmitter.submitJobInternal(){
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);{
client.getStagingAreaDir();{
Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, "/tmp/hadoop/mapred/staging"));
user = ugi.getShortUserName() + rand.nextInt(Integer.MAX_VALUE);//username-randid;
return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
}
}
JobID jobId = submitClient.getNewJobID();{//LocalJobRunner.getNewJobID(): the jobId is "local" + the random number from rand.nextInt(MAX) above + an incrementing 0/n counter;
return new org.apache.hadoop.mapreduce.JobID("local" + randid, ++jobid);
}
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
}
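Put together, the staging path and job id in local mode are plain string concatenations. A sketch of that naming, where "alice" and the random id 123 are made-up values and the 4-digit padding follows the job_<identifier>_<seq> rendering of JobID.toString():

```java
public class StagingPathSketch {
    // Mirrors the naming seen above: <stagingRoot>/<user><randId>/.staging/<jobId>,
    // where the jobId renders as job_local<randId>_<zero-padded seq>.
    static String submitJobDir(String stagingRoot, String user, int randId, int jobSeq) {
        String jobId = String.format("job_local%d_%04d", randId, jobSeq);
        return stagingRoot + "/" + user + randId + "/.staging/" + jobId;
    }

    public static void main(String[] args) {
        // default root from JTConfig.JT_STAGING_AREA_ROOT; user and randId are illustrative
        System.out.println(submitJobDir("/tmp/hadoop/mapred/staging", "alice", 123, 1));
        // → /tmp/hadoop/mapred/staging/alice123/.staging/job_local123_0001
    }
}
```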
1.2 Computing and writing split info: writeSplits()
The job.split file: appears to record, for each FileSplit, its class type and storage Path;
// contents of job.split
org.apache.hadoop.mapreduce.lib.input.FileSplitZfile:/E:/ws/ws-idea/ruoze-study/g9-proj-0701/G9-03/hadoop-cdh-demo/input/wc/words_33M.data
/org.apache.hadoop.mapreduce.lib.input.FileSplitZfile:/E:/ws/ws-idea/ruoze-study/g9-proj-0701/G9-03/hadoop-cdh-demo/input/wc/words_10M.data
// contents of job.splitmetainfo: binary, starting with the ASCII magic "META-SPL"
4d45 5441 2d53 504c 0102 0109 6c6f 6361
6c68 6f73 7407 8c02 1000 0001 096c 6f63
616c 686f 7374 8fa2 8da0 0004
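The dump above is binary, but its first eight bytes are a readable magic header. Decoding them (bytes taken verbatim from the dump) yields "META-SPL", the split meta-info marker; the remaining bytes carry version and per-split host/offset data:

```java
public class SplitMetaMagic {
    // decodes raw byte values as ASCII characters
    static String decode(int[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int b : bytes) sb.append((char) b);
        return sb.toString();
    }

    public static void main(String[] args) {
        // the first 8 bytes of the job.splitmetainfo dump above
        int[] magic = {0x4d, 0x45, 0x54, 0x41, 0x2d, 0x53, 0x50, 0x4c};
        System.out.println(decode(magic)); // META-SPL
    }
}
```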
Submit MR Job logic on the MR Driver side
- First build the local submitJobDir staging directory;
- Then compute the file splits from the InputPath
- Finally write the splits and configuration together, via writeConf, into the conf file;
- submitJob() uploads the Job, configuration, and jars to the remote MR service;
UserDriver.main(){
job.waitForCompletion(true){//Job:
if (state == JobState.DEFINE) {
submit();{
setUseNewAPI();// switch on the new API
connect();{// create the Cluster object, which represents the connection to the target FileSystem;
if (cluster == null) {
cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() { return new Cluster(getConfiguration()); });
}
}
// build the JobSubmitter, which mainly wraps: jtFs (the target file system, local or HDFS), submitClient (the RPC client), and hostAndPort (address and port);
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
run(){return submitter.submitJobInternal(Job.this, cluster);{//JobSubmitter.submitJobInternal()
// validate the Job's output format and legality;
checkSpecs(job);
// create the local submitJobDir (named via the random id scheme above) to collect/stage the configuration and resources the Job needs for submission;
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
try{
copyAndConfigureFiles(job, submitJobDir);
/* Core code: build the splits for the MR tasks from the input files: splits */
int maps = writeSplits(job, submitJobDir);{//JobSubmitter.writeSplits()
if (jConf.getUseNewMapper()) {// use the new API when mapred.mapper.new-api == true
maps = writeNewSplits(job, jobSubmitDir);{
List<InputSplit> splits = input.getSplits(job);{//FileInputFormat.getSplits()
List<FileStatus> files = listStatus(job);{
Path[] dirs = getInputPaths(job);{//FileInputFormat.getInputPaths()
String dirs = context.getConfiguration().get(INPUT_DIR, "");// FileInputFormat.setInputPaths() => mapreduce.input.fileinputformat.inputdir (INPUT_DIR), i.e. the Input path passed in;
Path[] result = new Path[list.length];
return result;
}
// recursively list child files if configured to do so
boolean recursive = getInputDirRecursive(job);// set by INPUT_DIR_RECURSIVE (mapreduce.input.fileinputformat.input.dir.recursive)
}
}
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
Arrays.sort(array, new SplitComparator());
return array.length;
}
}
}
conf.setInt(MRJobConfig.NUM_MAPS, maps);
writeConf(conf, submitJobFile);{
FSDataOutputStream out = FileSystem.create(jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
conf.writeXml(out);
}
status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());
}finally{
jtFs.delete(submitJobDir, true); // remove the temporary submitJobDir staging directory;
}
}}
});
}
}
if (verbose) {// print the status;
monitorAndPrintJob();
}
}
}
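The writeSplits() path above ends in FileInputFormat.getSplits(), whose sizing rule is: splitSize = max(minSize, min(maxSize, blockSize)), and the file keeps being cut as long as the remainder exceeds a slop factor (1.1) times the split size. A standalone sketch of that loop; the 1.1 slop mirrors FileInputFormat's SPLIT_SLOP, while the file and block sizes below are only illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSizing {
    static final double SPLIT_SLOP = 1.1; // same slop factor as FileInputFormat

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // returns the [offset, length] pairs the split loop would produce for one file
    static List<long[]> split(long fileLen, long blockSize, long minSize, long maxSize) {
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        List<long[]> splits = new ArrayList<>();
        long remaining = fileLen;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits.add(new long[]{fileLen - remaining, splitSize});
            remaining -= splitSize;
        }
        if (remaining != 0) splits.add(new long[]{fileLen - remaining, remaining});
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024 * 1024;
        // a 33 MB file with a 32 MB block: 33/32 = 1.03 < 1.1,
        // so the slop keeps it as a single split
        System.out.println(split(33 * mb, 32 * mb, 1, Long.MAX_VALUE).size());
        // a 100 MB file with a 32 MB block: 3 full 32 MB splits plus a 4 MB tail
        System.out.println(split(100 * mb, 32 * mb, 1, Long.MAX_VALUE).size());
    }
}
```

The slop factor is why a file slightly larger than one block (such as a 33 MB file against a 32 MB block) still produces a single map task rather than a second tiny split.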