Input Splits and Records
- 每个map处理一个输入分片
split
,输入分片是一个数据块 - 每个分片内部包含若干记录
record
(key-value 对),由map依次处理 - 都是逻辑概念,用来划分任务
package org.apache.hadoop.mapreduce;
public abstract class InputSplit {
//获取分片的大小(字节),用来排序,优先处理最大的分片,以最小化运行时间
public abstract long getLength() throws IOException, InterruptedException;
//存储位置(主机名称),用来调度map任务到分片数据附近
public abstract String[] getLocations() throws IOException, InterruptedException;
}
InputSplit
由实现InputFormat
接口的类创建,该类同时负责并将他们分割成记录
public abstract class InputFormat<K, V> {
//获取所有的split,从中可以得到map任务的数量
public abstract List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException;
//获取输入分片内的记录
public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException;
}
RecordReader
类似于一个迭代器,用来获取split对应的键值对记录record
FileInputFormat
FileInputFormat
是所有使用文件作为数据源的InputFormat
的基类
该抽象类主要实现了getSplits
方法,将文件分片,返回一个FileSplit
列表,内部包含文件的位置信息和长度信息,从分片获取记录的工作由具体的子类来完成
-
TextInputFormat extends FileInputFormat<LongWritable, Text>
:每条记录的Key表示该行在整个文件中的字节偏移,Value是该行的内容 -
KeyValueTextInputFormat extends FileInputFormat<Text, Text>
:文件中的每行记录是一个键值对,分割符可以自行设定,默认是\t
-
NLineInputFormat extends FileInputFormat<LongWritable, Text>
:设定每个split中含有固定N行的数据,产生的RecordReader
与TextInputFormat
相同,都是字节偏移作为key值 -
CombineFileInputFormat<K, V> extends FileInputFormat<K, V>
:文件较小,数量大会导致很多map任务,每个map都会有额外的开销,使用CombineFileSplit
将多个小文件打包为一个split,将一个mpper需要的输入数据量和HDFS块大小解耦 -
SequenceFileInputFormat<K, V>
:用于读取二进制类型的SequenceFile,它是可分割的(内部由同步点),同时还支持压缩的键值对文件结构
输入路径
添加路径到输入列表,或者设定输入列表,路径可以是文件,目录或者通配符路径,
public static void addInputPath(Job job, Path path)
public static void addInputPaths(Job job, String commaSeparatedPaths)
public static void setInputPaths(Job job, Path... inputPaths)
public static void setInputPaths(Job job, String commaSeparatedPaths)
设定过滤器
//设定自定义的路径过滤器
public static void setInputPathFilter(Job job,
Class<? extends PathFilter> filter)
//默认过滤器,排除隐藏文件,以点或下划线开头的文件
private static final PathFilter hiddenFileFilter
相关配置属性
FileInputFormat相关的配置:
// 输入目录
public static final String INPUT_DIR =
"mapreduce.input.fileinputformat.inputdir";
public static final String SPLIT_MAXSIZE =
"mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE =
"mapreduce.input.fileinputformat.split.minsize";
// 文件过滤器
public static final String PATHFILTER_CLASS =
"mapreduce.input.pathFilter.class";
public static final String NUM_INPUT_FILES =
"mapreduce.input.fileinputformat.numinputfiles";
// 递归读取输入目录
public static final String INPUT_DIR_RECURSIVE =
"mapreduce.input.fileinputformat.input.dir.recursive";
public static final String LIST_STATUS_NUM_THREADS =
"mapreduce.input.fileinputformat.list-status.num-threads";
// 当剩余数据与分片大小比值大于SPLIT_SLOP时,继续分片, 否则停止分片
private static final double SPLIT_SLOP = 1.1; // 10% slop
FileInputFormat input splits
FileInputFormat
只对大文件(超过HDFS块大小)的文件进行分割
控制split大小的属性
属性名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
SPLIT_MAXSIZE /mapreduce.input.fileinputformat.split.maxsize
|
int | 1 | 一个分片文件的最小有效字节数目 |
SPLIT_MINSIZE / mapreduce.input.fileinputformat.split.minsize
|
long | Long.MAX_VALUE |
一个分片的最大有效字节数目 |
dfs.blocksize |
long | 128 MB(134217728) | HDFS的block大小 |
FileInputFormat
的computeSplitSize
方法
Math.max(minSize, Math.min(maxSize, blockSize));
通常有:
minimumSize < blockSize < maximumSize
避免切分
两种方法
- 设定split的最小值大于文件的最大大小
- 重载其
isSplitable
,设定返回值为false
Split和HDFS块的关系
FileInputFormat
的逻辑记录和HDFS的块之间通常并不完全匹配,导致本地split对应的map在执行过程中部分需要远程读数据,由此产生的开销并不明显。
上图中一个文件被分成多行,并且行边界与HDFS块边界不对应,Splits取决于逻辑记录边界,因此第一个split包含第5行,它跨越第一个和第二个块,第二个split从第6行开始
FileSplite
FileInputFormat
产生的Split都是FileSplit
,本身不是数据,只是描述数据切分的状态信息
List<InputSplit> getSplits(JobContext context )
- file:输入文件的路径
- start:split起始位置的字节偏移
- length:split长度
- hosts:位置
- hostInfos:记录位置以及标注数据是否位于内存
所以在FileInputFormat的具体子类在读取单个FileSplit时打开的文件是完整的文件,不是限定在FileSplit范围内的文件块,然后seek
到对应的offset开始读取,除第一个split之外,每个split跳过第一条记录(第一条记录可能不完整),然后在到达结尾时确保超出split读取边界(读取下一个Split的第一条记录)
输出
一般只需要重写getRecordWriter(TaskAttemptContext taskAttemptContext)
方法提供RecordWriter即可,不过可能需要对多个输出文件进行合并