摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-definitive-guide-note#toc_7
Developing a MapReduce ApplicationRunning on a ClusterLaunching a Job
MapReduce Web UI
Tuning a JobMapReduce Workflows
How MapReduce WorksJob Submission
Job Intialization
Task Assignment
Task ExecutionStreaming
Progress and Status UpdatesJOB Completion
MapReduce Types and FormatsTypes
Default MapReduce Jobstreaming
Input FormatsInputSplitFileInputFormat
split size 公式
Small files and CombineFileInputFormat
Preventing splitting
Processing a whole file as a record
Output FormatsMultiple Outputs
MapReduce FeaturesCounterTask counters
Job counters
sortingPREPARATION
PARITIAL SORT
TOTAL SORT
Secondary SortSTREAMING
Developing a MapReduce Application
Writing a program in MapReduce follows a certain pattern. Before we start writing a MapReduce program, however, we need to set up and configure the development.
Configuration class ( org.apache.hadoop.conf package)
Running on a Cluster
In a distributed setting, things are a little more complex. For a start, a jobs' classes must be packaged into a jobJAR file to send to the cluster. Hadoop will find the job JAR automactically by searching for the JAR on the drivers' classpath that contains the class set in the setJarByClass() method ( on JobConf or Job).
The client class pathuser's client-side classpath set by hadoop jar is made up of:
- The job Jar file- Any JAR files in the lib directory of the job JAR file, and the classes direcotry ( if present)- The classpath defined by HADOOP_CLASSPATH, if set.
Launching a Job
$ hadoop jar hadoop-examples.jar v2.MaxTemperatureDriver \ -conf conf/hadoop-cluster.xml input/ncdc/all max-temp
Job, Task, and Task Attempt IDs
job ID 根据 YARN application ID 生成. YARN application ID 由 YARN RM 生成 (与 RM 的composed time, counter maintained, 以及后面添加的app唯一标识序号 有关), app ID 一般长得样子是application_141041231231_0003
, 对应的jobID 就是把这前的application 替换为 job_. ==> job_141041231231_0003一个Job再细分为几个Task, 它们的ID是 将job_前缀替换为 task ==> task_141041231231_0003_m_00003. 前后缀加上任务编号, 以区别不同的task.
MapReduce Web UI
YARN Page: resource-manager-host:8088
resource manager page
job history page
mapreduce job page
Tuning a Job
" Can I Make it run faster" may come out of our minds, after our job is working.
You should think about these profile in checklist before you start trying to optimize at the task level.
Number of mappers 如果每个mapper只运行几秒就停了, 你应该让他们运行更长时间, 1分钟甚至更长. 对于small files 去看看 CombineFileInputFormat (本博客中有单独文章提及)
Number of reducers 每个reducer的建议时长至少是5分钟, 产出数据大小应是blocksize水平的. 书中后面会单独讲解.
Combiners 在 mapper-reducer 之间的shuffle过程中能否使用某Combiner 提升效率
Intermidiate compression map输出进行压缩可能提升效率
Custom serialization
Shuffle tweaks
MapReduce Workflows
这里继 求当年最高温度之后再尝试寻找一个新的例子 -- 求一年366天中, 历年在多种天气状态下的平均最大温度. 1.1号为例, 先取下雨天气的天气, 然后求1901年至2000年 每年1.1号的下雨天气的最大温度, 找到最大值, 再对各类天气状态的最大求均值.
这个分析需要进行分解:
计算 (日期-无年, 天气状态) 为key下的最大温度
求上面输出的key下平均值
结合之前的分析, 能看到这里的任务要分2步或以上的MapReduce完成.
How MapReduce Works
MapReduce Job 的Lifetime:
hadoop客户端, 用户提交MapReduce Job
YARN RM(resource manager) 来分配cluster上的资源
YARN NM(NameNode) 准备用于运算的containers
MapRededuce application master, 在job运行过程中的"协调人".
HDFS share job间的文件
Job Submission
submit() 完成: 向 RM 申领一个新的 application Id, 用来指向给MapReduce Job ID; 检测Job, 例如输出的目录(output files)是否已经存在, 如果已经存在则会报错; 运行Input的Split, 若这过程有问题, 例如Input Path 不存在, 则会报错返回给 MapReduce Program; 对运行Job时需要的文件: JAR file, 配置文件, 计算好的 Input Splits, 进行copy, 待JobId分配成功后将放入到以Id命名的Share directory. 在集群运行该JOB时会有大量NM对Jar 进行访问, 因此其copy的量会较大, 程序中用 mapreduce.client.submit.file.replication来的控制, 默认因子设置为10. 提交成功.
Job Intialization
RM 接收到 某项 submit的请求后, 会将由 YARN 的 scheduler 处理该请求 -- 给其分配container(RM 启动任务, NM管理的地方). 该项MapReduce Job的直接master其实是 Java application 中的主类 MRAppMaster , 这个类中会创建一系列的bookkeeping objects用来跟踪记录Job的处理进度. 因为Job是会以被再细分为若干项Task, 所以每项Task都会单独向MRAppMaster汇报其完成情况. 另外, 它还会接收到用集群对输入切分好的Input Splits, 然后为每Input Split创建mapper task, 同样也完成reducer的初始化. (reducer个数由 mapreduce.job.reducers指定).
uberized
application master 决定如何运行MapReduce job下的各项task. 一般来说, 每一项task单独被申请各自container, 等task执行完毕, container被NM回收. 这种情况下, 每个JVM仅仅执行一次task. 如果Job太小, application master 可能会用相同的JVM执行多个任务, 实现JVM的重用 -- 每个task依次在这个container里的JVM里顺序执行, 直到所有task被执行完毕. 这样master不必申请多次, 达到了uberlized 的效果.
application master 在 OutputCommitter上 执行 setupJob() 方法, 为task的输出创建临时woking space及输出目录. 详情还要查询 Output Committers
Task Assignment
non-uber task. application master 开始向 RM 为他创建好的 mapper/reducer申请container 资源完成任务. mapper的请求优先级会高于reducer -- 这是因为在执行sort, reducer 任务之前 所有mapper结果要完成. 对于 reduce的请求, 至少要等 5% 的map任务完成 才会开始接受受理. Reduce 任务可以在集群的任意位置执行, 但map task 受到数据局部性(data locality)制约. map, reduce task默认的分配内存是 1024MB. 该值的properties: mapreduce.map.memory.mb mapreduce.reduce.memory.mb, mapreduce.map.cpo.vcores, mapreduce.reduce.cpu.vcores
data locality , 在quora中找到了一个答案中的解释:
Data locality is a core concept of Hadoop, based on several assumptions around the use of MapReduce. In short, keep data on disks that are close to the CPU and RAM that will be used to process and store it. If you had a cluster of 100 machines, and needed to read a selection of records, the records should be adjacent on disk, fit into RAM and be processable (e.g sorted or computed) using that machine's CPU.
Task Execution
Streaming
Progress and Status Updates
user有必要获得他提交的job目录的运行情况. 包括每个task的status(运行中, 成功, 失败), Mapper和Reducer的完成进度. Mapper的完成度就是和task完成个数比例有关. Reducer则要复杂一些, 涉及到shuffle, reduce.
JOB Completion
application master, task containers 负责打扫, 清理. 执行OutputCommitter 's commitJob, 让用户看到预期的历史记录, 服务器使用信息.
Failures
任务失败, 遇到这种情况很正常, 我们应该学习怎样避免失败. 或者说如何解决这样的问题, 并避免.
失败的维度: task, application master, node manager, resource manger.
task failure 正常来说, 是最易看到的一类错误, 如果一个map/reduce task 抛出 runtime型的异常, JVM将会在其exit之前先把该信息报告给application master. app master 标记这项task为 failed. 然后释放container资源, 供下一个task使用.
另, 对于一个failed的task, app master 会再给他4次重试的机会(4 可进行修改, 参数为:mapreduce.map.maxattempts) app master 会在重新调度的时候尽可能的使用与先前不同的Node来执行这个失败的任务 4次不行的话 则整个Job标记为失败.
如果失败的task 超过一定个数, 则会激活 job failure的改变. Profiles: mapreudce.map.failures.maxpercent, mapreduce.reduce.failures.maxpercent (也就是说失败的个数小于这个百分比, 那么 appication将会继续执行其它的task.
application master failure
node manager failure 当node manager fail RM上看不到其返回的heartbeat. (比如10分钟内看不到, 时间配置:yearn.resourcemanager.nm.liveness-monitor.expiry-interval-ms)
resource manager failure
这个问题就比较严重了. 暂时先不想了解.
Shuffle and Sort
注, 上图中能看出, map的输出结果不是直接写到disk中, 而是先到一个内存中的缓存区(memory buffer), 这个默认大小是100MB(可通过 mapreduce.task.io.sort.mb 参数配置), 当缓存区的空间达到一定水平(默认是 80%mapreduce.map.sort.splill.percent 0.80) , 将会启动spill写到disk. (map持续向buffer中写 , 不会停止) , 要是 buffer达到100%了, 则map的输出则会暂停, 直到spill完成. 不过还要注意的就是, 写向disk之前, data也会按照reducer的要求进行partition. 按照给的或默认的方式, 在每个partition内执行combine.
compress map output通过来说, 对map的结果进行压缩处理, 将会提高Job的效率. 因为这样会节约磁盘空间, 减少向reducer转移的数据量大小. 默认, 是不进行压缩. 但可通过mapreduce.map.output.compress = true.
reducer side
MapReduce Types and Formats
Types
map: (K1, V1) -> list(K2, V2)reduce: (K2, list(V2)) -> list(K3, V3)
若增加了combiner
map: (K1, V1) -> list(K2, V2)combiner: (K2, list(V2)) -> list(K2, V2)reduce: (K2, list(V2)) -> list(K3, V3)
Input types: 输入的格式被 input format 指定, 例如: TextInputFormat 指定 是
Default MapReduce Job
如果MapReduce 没有 mapper, reducer会是怎样?
public MinimalMapReduce extends Configured implements Tool{ @Override public int run(String[] args)throws Exception { if (args.length != 2) { System.err.printf("Usage: %s [generic options] <input> <output> \n", getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } Job job = new Job(getConf()); job.setJarByClass(getClass()); FileInputFormat.addInputPath(job, new Path(args[0]); FileoutputFormat.addOutputPath(job, new Path(args[1]); return job.waitForCompletion(true) ? 0 : 1 ; } public static void main(String[] args ) throws Exception { int exitCode = ToolRunner.run(new MinimalMapReduce(), args); System.exit(exitCode); }}
每行是一个record(key,value分别是 line's offset + line ) 组成. 下面是MapReduce 的 driver程序, 使用精确的配置参数:
public class MinimalMapReduceWithDefaults extends Configured implements Tool { @Override public int run(String[] args) throws Exception{ Job job = JobBuilder.parseInputAndOuput(this, getConf(), args); if (job == null) { return -1; } job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(Mapper.Class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); job.setReducerClass(Reducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); return job.waitForCompletion(true)? :0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args); System.exit(exitCode); }}
以上就是MapReduce 一个Job的框架代码. 默认的input format 是 TextInputFormat , 提供了
keys the offset of the beginning of the line in the file.
values the line of text
mapper的泛用型定义
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); }}
默认partitioner : HashPartitioner 根据 map keyout 的key -hash值 & Integer.Max_VALUE, mod % reduce个数, 进行partition. 原理是让每个reducer处理的 mapOutput key的种类数是even的. (特殊情况当然也能想象: 某几种output key的个数非常多, 则就造成 个别reducer 处理的数据量非常非常大)
public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks){ return (key.hashCode() & Integer.Max_VALUE ) % numReduceTasks; } }
Choose the number of reducers
很多新人(说我呢) 都觉得reducer个数越多越好 -- 这样对于map/reduce之间的数据流有好处. 选择合适的reducer个数也不是一件容易的事. 使用多的reducer固然增大并行化, 让每个reducer处理的数据量减少. 然而, 这样你会得到很多的小文件 -- 相对来说, 把这种文件控制在一定水平内才是最优的策略. 通常来说, 一个reducer的处理时间控制在5分钟左右, 产出的数据量大小应该是和HDFS的blocksize相当的.
reducer generic type
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException { for (VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } }}
在reducer处理之前数据会先经过shuffle的排序.
streaming
Input Formats
为了处理输入数据更加效率, Hadoop也提供了不仅仅TextInputFormat一种方式. 为了讲的更详尽一点, 还得从它的继承和父类开始.
InputSplit
Hadoop里对于输入的分割, 定义了InputSplit这个抽象类, 表示一个mapper处理的输入数据, 其中有2个抽象方法需要实现:
public abstract class InputSplit{ public abstract long getLength() throws IOException, InterruptedException; public abstract String[] getLocations() throws IOException, InterruptedException; }
这里2个方法功能也非常简单, 目的是为了获得输入的字节长度大小和位置信息. 位置用来分割时将map task处理的数据更接近, 大小信息方便将较大的先被处理, 这样方便减少job的运行时间. (贪心算法)
MapReduce里用InputFormat完成对InputSplits的创建. 也是一个抽象类:
public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; }
这里的getSplits()方法对输入进行分割, 然后把结果发送到application master, am分配给mapper. mapper将会使用 createRecordReader() 实现的结果, 获得Split的结果 -- RecordReader, 原来输入中record的数据流. 下面是mapper类中run()方法, 完成对 record流的解析处理.
public void run(Context context) throws IOException, InterruptedException{ setup(context) while (context.nextKeyValue()){ map(context.getCurrentKey(), context.getCurrentValue(), context); cleanup(context); }}
FileInputFormat
FileInputFormat 是对 InputFormat类的继承, 用于对文件输入的指定, 它能干两件事: 1. 定义文件在Job中的输入格式; 2. 完成文件的Split. 不过一般的, 在实际工作是使用的它的子类. 见下图:
input paths
FileInputFormat 首先对输入的路径定义了几个常用方法:
public static void addInputPath(Job job, Path path)public static void addInputPaths(Job job, String commaSeparatedPaths)public static void setInputPath(Job job, Path ... input Paths)public static void setInputPaths(Job job, String commaSeparatedPaths)
add~方法用来将一个路径或一组路径添加到输入的列表中. set~方法则是用新的参数替换掉原来已有的输入列表. (比如add 加了3个, set之后加的3个将 不再)
注, 这里路径支持 glob pattern
注, 虽然用户没有自己指定, 但FileInputFormat将自动过滤那些隐藏文件(以 . 或者 _ 开头的文件)
FileInputFormat input splits
FileInputFormat 如何产生input splits> ? 它只能split那些"large" files. 这里大小的定义是和HDFS block 大小有关. 当然这里也有知道有这几个property
- mapreduce.input.fileinputformat.split.minsize 最小分割值, 默认为1- mapreduce.input.fileinputformat.split.maxsize 最大分割值, 默认为 Long.MAX_VALUE- dfs.blocksize long 一般是128 MB(即为 134217728)
最小分割常常是1个byte, 有时有的format 会定义一个下界. Application 可能也会对minSplitSize进行设置, 把它设为一个大于HDFS block 的值, 但这未必是一个好方式. 最大分割, 只有当这个值小于 block size才会起作用.
split size 公式
max(minimumSize, min(maximumSize, blockSize))
默认情况: minimumSize < blockSize < maximumSize
Small files and CombineFileInputFormat
HDFS 是更擅长于处理个数少, 而块头大的数据文件, 相较于个数多,而却很小的小文件簇来说.
如果是面临着非常多的小文件, 比如10000个大小均小于10MB的输入文件群, 再直接用FileInputFormat就不合适了 -- 它是对每个文件进行分割.
为什么说小文件太多 HDFS反而应付不好>>?MapReduce工作模式对集群上磁盘间的文件转移要求很高, 小文件过多, 无形之中加大了对文件的seek工作. 再者, 太多的小文件加大了namenode的管理mapping的工作. 一个可取的策略是将这些小文件通过合并形成sequencedfiles -- key作为文件名, value为文件内容. 但要是目前已经有这些小文件了, 这时应考虑使用 CombineFileInputFormat.
Preventing splitting
有时我们想对一整个文件作为一个Input SPlit, 而不想切分. 方法1: 将minimum split size 设置的非常大. 方法2: 对FileInputFormat实现的子类 重写它的 isSplittable()方法. 例子
import org.apache.hadoop.fs.Path;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;public class NonSplittableTextInputFormat extends TextInputFormat{ @Override protected boolean isSplitable(JobContext context, Path, file){ return false; }}
Processing a whole file as a record
有些特殊情况, 要求我们以多个目录中各个文件作为record. (比如某个时间戳下产生的数据, 以时间戳命名) 对于这样的场景, 可使用 WholeFileInputFormat
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { @Override protected boolean isSplitable(JobContext context, Path file){ return false; } @Override publick RecordReader<NullWrtable, BytesWritable> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeFileRecordReader reader = new WholeFileRecordReader(); reader.initialize(split, context); return reader; }}
注, 由于整个文件内容作为这里record的value, 因此肯定是不可分的. 同之前, 重写isSplitable()方法.
The RecordReader used by WholeFileInputFormat for reading a whole file as a record
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {private FileSplit fileSplit;private Configuration conf;private BytesWritable value = new BytesWritable();private boolean processed = false; @Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.fileSplit = (FileSplit) split; this.conf = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { byte[] contents = new byte[(int) fileSplit.getLength()]; Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(conf); FSDataInputStream in = null; try { in = fs.open(file); IOUtils.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); }finally{ IOUtils.closeStream(in); } processed = true; return true } return false;}@Overridepublic NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get();}@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException { return value;}@Overridepublic float getProgress() throws IOException{ return processed ? 1.0f : 0.0f;}@Override public void close() throws IOException { //do nothing}}
Text Input
TextInputFormat 示例文本 共四行文本
On the top of the Crumpetty Tree The Quangle Wangle sat, But his face you could not see, On account of his Bea ver Hat.
经TextInputFormat, 得到的 对是这样的对:
(0, On the top of the Crumpetty Tree) (33, The Quangle Wangle sat,) (57, But his face you could not see,) (89, On account of his Bea ver Hat.)
The Relationship Between Input Splits and HDFS Blocks
以这里的record形式(一行一行的文本)为例, HDFS中的block并不会考虑太多 - 不会强制要求一行文本全部放在同一个block中. 但Split不会把同一行文本分配在2个不同的Input Split中.
几点要注意的地方:
- Controlling the maximum line length 如果使用 TextInputFormat, User可以对每行的长度进行控制. -- mapreduce.input.linerecordreader.line.maxlength - KeyValueTextInputFormat 若每一行已经具备了明文的<Key,Value> 结构, 可使用这个子类方便的实现Input的格式指定. 利用参数 mapreduce.input.keyvaluelinerecordreader.key.value.separator 指定每行的分隔符. 默认是tab键. - 若将多行指定为一条 record, 请研究 NLineInputFormat .
Multiple Inputs
MultipleInputs.addInputPath(job, path1, TextInputFormat.class, MaxTemperatureMapper.class);MultipleInputs.addInputPath(job, path2, TextInputFormat.class, MaxTemperatureMapper.class);
Output Formats
OutputFormat 的类结构图如下所示:
直接跳到多目标输出:
Multiple Outputs
默认情况下, FileOutputFormat 和 其子类将会将reducer的输出结果以 这样的形式命名: "part-r-00000", 如果我们要想达到"不同reducer输出到不同的路径", 研究一下 MultipleOutputs 这个类.
example Partitioning data
还是之前 天气数据的例子, 根据天气状态(station)进行partition.
方法: 在reducer处理过程中, 对station进行处理 -- 1. 把拥有相同station的map输出 partition到同一个; 2. 设置reducer个数等于station的种类数. partitioner的长相:
public class StationPartitioner extends Partitioner<LongWritable, Text> { private NcdcRecordParse parser = new NcdcRecordParser(); @Override pubic int getPartition(LongWritable key, Text value, int numPartitions) { parser.parse(value); } private int getPartition(String stationId) { /// }}
这里省略掉的 getPartition 方法, 简单的说是把已有的stationID 转化成 partition 的索引.
思考: 这样做的方式存在哪些不足?
实现partition需要我们已经对输出信息掌握, 比如这里 要先知道都有哪些station, 才能进行处理. 即使事先给你一个参考字典, 但是若出现了未知的情况, 难免会有麻烦.
Partition的个数被人为的指定了, 这样极可能导致uneven-sized partitions -- 绝大多数reducer处理非常少量的数据, 这决不是一种高效的思路. 如果极个别的reducer 消耗时间明显长于其它的reducer, 那么这样reducer将直接决定job的执行时间.
为了让Job尽快完成, 默认使用 HashPartitioner 完成Partition (尽量以免 unevenly-sized partition)
multipleoutputs 例子
public class PartitionByStationUsingMultipleOutputs extends Configured implement Tool { static class StationMapper extends Mapper<LongWritable, Text, Text, Text> { private NcdcRecord parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); context.write(new Text(parser.getStationId()), value); } } static class MultipleOutputsReducer extends Reducers<Text, Text, NullWritable, Text> { private MultipleOutputs<NullWritable, Text> multipleOutputs; @Override protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs = new MultipleOutputs<NullWritable, Text>(context); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value: values) { multipleOutputs.write(NullWritable.get(), value, key.toString()); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputANdOutput(this, getConf(), args); if (job == null) return -1; job.setMapperClass(StationMapper.class); job.setMapOutputKeyClass(Text.class); job.setReducerClass(MultipleOutputReducer.class); job.setOutputKeyClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputs(), args); System.exit(exitCode); } }}
注, 注意 MultipleOutputs.write() 写的key, value.
我想目标输出创建任意的子目录>? 没问题, 看这段代码吧
@Override protected void reduce(Text key, Iterable<Text>values, Context context) throws IOException, InterruptedException { for (Text value : values) { parser.parse(value); String basePath = String.format("%s/%s/part", parser.getStationId(), parser.getYear()); multipleOutputs.write(NullWritable.get(), value, basePath); }}
注: LazeOutput
FileOutputFormat 创建的 part-r-nnnnn files 即使这个文件是空的(大小为0) , 也会同样创建. 如果不希望这样, 可使用 LazyOutputFormat. 使用 Streaming的话则是 -lazyOutput 参数.
MapReduce Features
这一章, 讨论 counter, joining, 和 sorting
Counter
在任务执行过程中的一些任务信息的反馈, 有些内容我们希望看到稍加统计的结果, 由于统计本身并不复杂, 也不会用到太多高深的技巧, 大多是对job, task等的计数类反馈, 所以这里先来简单了解一下Counter -- 根据所有计数对象的类型, 分参悟了以下几种:
MapReduce task counters -- org.apache.hadoop.mapreduce.TaskCounter
Filesystem counters -- org.apache.hadoop.mapreduce.FileSYstemCounter
FileInputFormat counters -- org.apahce.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat counters -- org.apahce.hadoop.mapreduce.lib.output.FileOutputFormatCounter
Job counters -- org.apache.hadoop.mapreduce.JobCounter
Task counters
显然, 这是一类针对task而收集信息的工具. 例如 MAP_INPUT_RECORDS 等, 有很多我们在MapReduce WebUI 上看到的信息, 其实都是出自于它们的返回结果. 下面来见表:
MAP_INPUT_RECORDS 统计每个map处理的records个数. 最后聚合, 得到整个Job的输入record个数.
SPLIT_RAW_BYTES input-split 对象的bytes, 由于是在原来输入数据又增加了分割的offset, 因此会大于真正的 total input size.
MAP_OUTPUT_RECORDS map output 产出的record个数. 通过每个map的OutputCollector()调用其 collect()方法来完成.
MAO_OUTPUT_BYTES map output产出的非压缩类bytes大小, 通过每个map的OutputCollector()调用其 collect()方法来完成.
MAP_OUTPUT_MATERIALIZE_BYTES map output 直接向Disk产出的bytes大小(包括压缩类的文件的大小)
COMBINE_INPUT_RECORDS 被所有combiners处理过的 input records个数.
COMBINE_OUTPUT_RECORDS 被所有combiners处理过的 output records个数.
REDUCE_INPUT_GROUPS 所有reducer处理的key种类数, 对reducer执行reduce() 方法累增得到.
REDUCE_INPUT_RECORDS 所有reducer处理的 input records 个数.
REDUCE_OUTPUT_RECORDS 所有reducer处理的 output records 个数.
REDUCE_SHUFFLE_BYTES map output 到 reducer过程中 shuffle用到的bytes大小
SPILLED_RECORDS 所有map/reduce过程中spill到磁盘中的records个数
CPU_MILLISECONDS CPU 对该任务的消耗 毫秒
PHYSICAL_MEMORY_BYTES 任务消耗的内存大小
VIRTUAL_MEMORY_BYTES 任务消耗的虚拟内存大小
COMMITTED_HEAP_BYTES JVM可用的内存大小
GC_TIME_MILLIS GC消耗时间 毫秒
SHUFFLED_MAPS map output 产生的文件数, (被shuffle之后再由reducer处理)
FAILED_SHUFFLE shuffle过程中 map output copy失败的个数
MERGED_MAP_OUTPUTS map output 被合并的个数 (在Shuffle端处理)
BYTES_READ filesystem task counter, map/reduce 读入的bytes
BYTES_WRITTEN filesystem task counter, map/reduce 写入的bytes
READ_OPS filesystem task counter, map/reduce 读的操作个数
LARGE_READ_OPS filesystem task counter, map/reduce 读的操作个数, 限定于 large read(如对于大型目录列表的读入)
WRITE_OPS filesystem task counter, map/reduce 写的操作个数
Job counters
job counter 与其它类的counter 有所不同, 全由application master操纵. 它们用来对Job进行统计汇总, 如:
TOTAL_LAUNCHED_MAPS mapper 启动个数
TOTAL_LAUNCHED_REDUCES reducer 启动个数
TOTAL_LAUNCHED_UBERTASKS uber task的个数
NUM_FAILED_MAPS mapper失败个数
NUM_FAILED_REDUCES reducer失败个数
NUM_KILLED_MAPS mapper killed 个数
NUM_KILLED_REDUCES reducer killed 个数
其它内容暂时略过.
sorting
The ability to sort data is at the heart of MapReduce.
这一部分将会接触到MapReduce中如何使用Sort来重新组织数据流, 以及不同的sort 方式.
PREPARATION
以之前的温度数据为例, 由于要求某些温度数据的最大值, 而原始数据是TEXT结构, 显然不能应照数值型进行排序, 那么 TEXT -> INT(或其它的FLOAT, DOUBLE) 转化过程就要考虑是否有invalid data. 在map端要对不合理的数据过滤处理. 下面看一个实现 数值转型的 程序
public class SortDataPreprocessor extends Configured implements Toll { static class CleannerMapper extends Mapper<LongWritable, Text, IntWritable, Text> { private NcdcRecordParser parser =new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) { parser.parse(value); if (parser.isValidTemprature)) { context.write(new IntWritable(parser.getAirTemperature()), value); } } } @Override public int run(String[] args) throws Exception{ Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1 } job.setMapperClass(CleannerMapper.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueKeyClass(Text.class); job.setNumReduceTasks(0); job.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setCompressorClass(job.GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortDataPreprocessor(), args); System.exit(exitCode); }}
PARITIAL SORT
默认情况下, MapReduce会根据input records的key进行排序.
Example 9-4. A MapReduce program for sorting a SequenceFile with IntWritable keys using the default HashPartitioner
public class SortByTemperatureUsingHashPartitioner extends Configured implements Tool{ @Override public int run (String[] args) throws Exception{ Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) return -1; job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputKeyClass(IntWrtable.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.Block); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortByTemperatureUsingHashPartitioner(), args); System.exit(exitCode); } }
w假定使用30个reducer来执行程序:
$ hadoop jar hadoop-examples.jar SortByTemperatureUsingHashPartitioner \ -D mapreduce.job.reduces=30 input/folder output/folder
TOTAL SORT
How can you produce a globally sorted file using Hadoop? 如果只有一个partition 可能答案就解决了, 但我们面临问题是多个. 要如何做到全局性排序呢. -- 把构造partitioner时与要排序的值结合到一起. 比如我们有4个partition, 然后把 < -10度的放在第一个, [-10, 0) 放在第2个, [0, 10) 放在第3个, 其余是第4个. 然后在每个partition中对温度进行排序.
MapReduce program for sorting a SequenceFile with IntWritable keys using the TotalOrderPartitioner to globally sort the day
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool {@Override public int run(String[] args) throws Exception{ Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) return -1 ; }job.setInputFormatClass(SequenceFileInputFormat.class);job.setOutputKeyClass(IntWritable.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.classs); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); job.setPartitionerClass(TotalORderPartitioner.class); InputSampler.Sampler<IntWrtable, Text> sampler = new InputSampler.RandomSample<IntWritable, Text>(0.1, 10000, 10); InputSampler.writePartitionFile(job, sampler); // Add to DistributedCacheConfiguration conf = job.getConfiguration(); String partitionFile = TotalOrderPartitioner.getPartitionFile(conf); URI partitionUri = new URI(partitionFile); job.addCacheFile(partitionUri); return job.waitForCompletion(true) ? 0 : 1; }public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortByTemperatureUsingTotalOrderPartitioner(), args); System.exit(exitCode); }}
Secondary Sort
Application to find the maximum temperature by sorting temperatures in the key
public class MaxTemperatureUsingSecondarySort extends Tool{ static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); if (parser.isValidTemperature()){ context.write(new IntPair(parser.getYearInt(), parser.getAirTemperture()), NullWritable.get()); } } } static class MaxTemperatureReducer extends Reducer<IntPair, NullWritalbe, IntPair, NullWritalbe> { @Override protected void reduce(IntPair key, Iterable<NullWritable> values, Context context throws IOException, InterruptedException{ context.write(key, NullWritable.get()); } } public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> { @Override public int getPartition(IntPair key, NullWritable value, int numPartitions){ // multiply by 127 to perform some mixing return Math.abs(key.getFirst() * 127 ) % numPartitions; } } public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) return cpm; return - IntPair.compare(ip1.getSecond(), ip2.getSecond()); } } public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); return cmp; } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job==null) return -1; job.setMapperClass(MaxTemperatureMapper.class); job.setSortComparatorClass(KeyComparator.class); job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(IntPair.class); job.setOutputValueClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args ) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); }}
STREAMING
to do a secondary sort in Streaming, we can take advantage of a couple of library classes that Hadoop provides.
$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \ -D stream.num.map.output.key.fields=2 \ -D mapreduce.partition.keypartitioner.options=-k1, 1 \ -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ -D mapreduce.partition.keycomparator.options="-k1n k2nr" \ -input input/ncdc/all \ -output output-secondarysport-streaming \ -mapper ... -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ -reducer ch...
joining datasets