MapReduce 原理和源码-实战

Local模式下的 MapReduce 计算步骤(图解)

步骤详解

  1. "main"线程中完成input切片和Job提交
  • 本地构建submitJobDir临时目录
  • 根据InputPath文件数和blockSize大小进行分片: InputSplit[]
  • 将分片信息和作业配置一起写入到submitJobDir中;
  • 将submitJobDir下所有上传(本地告知)MR作业Job,完成提交;
  1. "?"线程完成作业信息的接收和解析
  • 处理submitJobDir提交上来的信息;
  • 根据conf信息创建相关类
  1. "Thread-3"的Job.run()线程执行Job的运算
  • 根据分片构建Map任务的List<RunnableWithThrowable>
  • 遍历提交每个MapTask线程任务;
    • 下接: "LocalJobRunner Map Task"线程的 JobRunner.MapTaskRunnable.run()
  • 根据job.reduces数量构建Reduce的线程任务reduceRunnables: List<RunnableWithThrowable>
  • 遍历并执行每个ReduceTask线程任务;
    • 下接: "pool-n-thread-1"线程中的 ReduceTaskRunnable.run()方法;
  1. "LocalJobRunner Map Task"线程: 完成一个分配Map计算;
  • 获取map对应分片信息和数据,
  • 构建MapTask任务
  • input.init加载分片输入数据;
  • mapper.run() 循环调用User定义的map()方法完成计算;
  • 将Map结果写入中间shuffle文件并告知Tracker;
  1. "pool-3-thread-1"线程完成一次Reduce计算
  • 构建ReduceTask任务;
  • 构建shuffleContext 洗牌任务并执行shuffle.run()洗牌;
  • 基于shuffle后的结果数据, 封装进KeyValueIterator迭代器中, 并执行reducer.reduce()聚合计算;
  • 将reduce计算结果写入保存到OutputPath路径上;
  • 结束Job并释放/消耗相应资源;

"main"线程切片和提交

JobSubmitter.submitJobInternal源码.png

构建上传目录submitJobDir的原理源码

源码详解:

JobSubmitter.submitJobInternal(){
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);{
        client.getStagingAreaDir();{
            Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, "/tmp/hadoop/mapred/staging"));
            user = ugi.getShortUserName() + rand.nextInt(Integer.MAX_VALUE);//username-randid;
            return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
        }
    }
    JobID jobId = submitClient.getNewJobID();{//LocalJobRunner.getNewJobID(): 以local+上面rand.nextInt(MAX)生成的随机数+0/n, 作为jobId;
        return new org.apache.hadoop.mapreduce.JobID("local" + randid, ++jobid);
    }
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
}
LocalJobRunner.getStagingAreaDir()方法.png
生成submitJobDir的构成.png

1.2 计算并写入split信息: writeSplits()

job.split文件: 应该是对应每个FileSplit, 他的Zfile类型和存储Path;

// job.split的内容
org.apache.hadoop.mapreduce.lib.input.FileSplitZfile:/E:/ws/ws-idea/ruoze-study/g9-proj-0701/G9-03/hadoop-cdh-demo/input/wc/words_33M.data   

/org.apache.hadoop.mapreduce.lib.input.FileSplitZfile:/E:/ws/ws-idea/ruoze-study/g9-proj-0701/G9-03/hadoop-cdh-demo/input/wc/words_10M.data

// job.splitmetainfo的内容, ?
4d45 5441 2d53 504c 0102 0109 6c6f 6361
6c68 6f73 7407 8c02 1000 0001 096c 6f63
616c 686f 7374 8fa2 8da0 0004 

MR Driver端的 Submit MR Job的逻辑

  • 先构建本地 submitJobDir临时目录;
  • 再 根据InputPath 获取文件分片 splits
  • 最后 分片和配置信息, 统一writeConf 写入到 conf文件中;
  • submitJob() 将Job和配置, jars都上传到远程MR服务;
UserDriver.main(){
    
    job.waitForCompletion(true){//Job: 
        if (state == JobState.DEFINE) {
            submit();{
                setUseNewAPI();//设置新的Api
                connect();{//创建Cluster对象,用于表示与目标FileSystem建立了连接;
                    if (cluster == null) {
                         cluster =  ugi.doAs(new PrivilegedExceptionAction<Cluster>() { return new Cluster(getConfiguration());  });
                    }
                }
                // 构建JobSubmitter , 主要是封装了: jtFs:目标文件系统(本地/hdfs), submitClient(通信客户端), hostAndPort: 地址端口;
                final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
                
                status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
                    run(){return submitter.submitJobInternal(Job.this, cluster);{//JobSubmitter.submitJobInternal()
                        // 校验Jobs 的输出格式和合法性;
                        checkSpecs(job);
                        
                        //根据(哈希)算法创建本地 submitJobDir目录,用于收集/存放/提交 Job所需配置和资源;
                        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
                        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
                        
                        try{
                            copyAndConfigureFiles(job, submitJobDir);
                            
                            /* 核心代码: 根据file文件数量, 创建相关MR任务的分片/分区: splits
                            *
                            */
                            int maps = writeSplits(job, submitJobDir);{//JobSubmitter.writeSplits()
                                if (jConf.getUseNewMapper()) {//mapred.mapper.new-api==true属性时,用新的api
                                    maps = writeNewSplits(job, jobSubmitDir);{
                                        List<InputSplit> splits = input.getSplits(job);{//FileInputFormat.getSplits()
                                            List<FileStatus> files = listStatus(job);{
                                                Path[] dirs = getInputPaths(job);{//FileInputFormat.getInputPaths()
                                                    String dirs = context.getConfiguration().get(INPUT_DIR, "");// FileInputFormat.setInputPaths() => mapreduce.input.fileinputformat.inputdir (INPUT_DIR) ,即传入Input路劲;
                                                    Path[] result = new Path[list.length];
                                                    return result;
                                                }
                                                
                                                // 根据是否递归,递归获取子文件
                                                boolean recursive = getInputDirRecursive(job);//INPUT_DIR_RECURSIVE参数(mapreduce.input.fileinputformat.input.dir.recursive)设定
                                            }
                                        }
                                        
                                        T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
                                        Arrays.sort(array, new SplitComparator());
                                        return array.length;
                                    }
                                }
                            }
                            conf.setInt(MRJobConfig.NUM_MAPS, maps);
                            
                            writeConf(conf, submitJobFile);{
                                FSDataOutputStream out = FileSystem.create(jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
                                conf.writeXml(out);
                            }
                            status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());
                        }finally{
                            jtFs.delete(submitJobDir, true); //清除 submitJobDir临时提交目录;
                        }
                    }}
                });
            }
        }
        if (verbose) {//打印状态;
            monitorAndPrintJob();
        }
    }
}


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343