Hadoop InputFormat

Input Splits and Records

  • 每个map处理一个输入分片split,输入分片是一个数据块
  • 每个分片内部包含若干记录record(key-value 对),由map依次处理
  • 都是逻辑概念,用来划分任务
package org.apache.hadoop.mapreduce;
public abstract class InputSplit {
    //获取分片的大小(字节),用来排序,优先处理最大的分片,以最小化运行时间
    public abstract long getLength() throws IOException, InterruptedException;

    //存储位置(主机名称),用来调度map任务到分片数据附近
    public abstract String[] getLocations() throws IOException, InterruptedException;
}

InputSplit由实现InputFormat接口的类创建,该类同时负责并将他们分割成记录

public abstract class InputFormat<K, V> {
      //获取所有的split,从中可以得到map任务的数量
      public abstract List<InputSplit> getSplits(JobContext context)
          throws IOException, InterruptedException;
      //获取输入分片内的记录
      public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException;
}

RecordReader类似于一个迭代器,用来获取split对应的键值对记录record

FileInputFormat

FileInputFormat是所有使用文件作为数据源的InputFormat的基类

该抽象类主要实现了getSplits方法,将文件分片,返回一个FileSplit列表,内部包含文件的位置信息和长度信息,从分片获取记录的工作由具体的子类来完成

InputFormat class hierarchy
  • TextInputFormat extends FileInputFormat<LongWritable, Text>:每条记录的Key表示该行在整个文件中的字节偏移,Value是该行的内容
  • KeyValueTextInputFormat extends FileInputFormat<Text, Text>:文件中的每行记录是一个键值对,分割符可以自行设定,默认是\t
  • NLineInputFormat extends FileInputFormat<LongWritable, Text>:设定每个split中含有固定N行的数据,产生的RecordReaderTextInputFormat相同,都是字节偏移作为key值
  • CombineFileInputFormat<K, V> extends FileInputFormat<K, V>:文件较小,数量大会导致很多map任务,每个map都会有额外的开销,使用CombineFileSplit将多个小文件打包为一个split,将一个mpper需要的输入数据量和HDFS块大小解耦
  • SequenceFileInputFormat<K, V>:用于读取二进制类型的SequenceFile,它是可分割的(内部由同步点),同时还支持压缩的键值对文件结构

输入路径

添加路径到输入列表,或者设定输入列表,路径可以是文件,目录或者通配符路径,

public static void addInputPath(Job job, Path path)
public static void addInputPaths(Job job, String commaSeparatedPaths)
public static void setInputPaths(Job job, Path... inputPaths)
public static void setInputPaths(Job job, String commaSeparatedPaths)

设定过滤器

//设定自定义的路径过滤器
public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter)
//默认过滤器,排除隐藏文件,以点或下划线开头的文件
private static final PathFilter hiddenFileFilter

相关配置属性

FileInputFormat相关的配置:

  // 输入目录
  public static final String INPUT_DIR = 
    "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";
  // 文件过滤器
  public static final String PATHFILTER_CLASS = 
    "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";
  // 递归读取输入目录
  public static final String INPUT_DIR_RECURSIVE =
    "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
  // 当剩余数据与分片大小比值大于SPLIT_SLOP时,继续分片, 否则停止分片
  private static final double SPLIT_SLOP = 1.1;   // 10% slop

FileInputFormat input splits

FileInputFormat只对大文件(超过HDFS块大小)的文件进行分割

控制split大小的属性

属性名称 类型 默认值 描述
SPLIT_MAXSIZE /mapreduce.input.fileinputformat.split.maxsize int 1 一个分片文件的最小有效字节数目
SPLIT_MINSIZE / mapreduce.input.fileinputformat.split.minsize long Long.MAX_VALUE 一个分片的最大有效字节数目
dfs.blocksize long 128 MB(134217728) HDFS的block大小

FileInputFormatcomputeSplitSize方法

Math.max(minSize, Math.min(maxSize, blockSize));

通常有:

minimumSize < blockSize < maximumSize

避免切分

两种方法

  • 设定split的最小值大于文件的最大大小
  • 重载其isSplitable,设定返回值为false

Split和HDFS块的关系

FileInputFormat的逻辑记录和HDFS的块之间通常并不完全匹配,导致本地split对应的map在执行过程中部分需要远程读数据,由此产生的开销并不明显。

Logical records and HDFS blocks for TextInputFormat

上图中一个文件被分成多行,并且行边界与HDFS块边界不对应,Splits取决于逻辑记录边界,因此第一个split包含第5行,它跨越第一个和第二个块,第二个split从第6行开始

FileSplite

FileInputFormat产生的Split都是FileSplit,本身不是数据,只是描述数据切分的状态信息

List<InputSplit> getSplits(JobContext context )
FileSplite字段信息
  • file:输入文件的路径
  • start:split起始位置的字节偏移
  • length:split长度
  • hosts:位置
  • hostInfos:记录位置以及标注数据是否位于内存

所以在FileInputFormat的具体子类在读取单个FileSplit时打开的文件是完整的文件,不是限定在FileSplit范围内的文件块,然后seek到对应的offset开始读取,除第一个split之外,每个split跳过第一条记录(第一条记录可能不完整),然后在到达结尾时确保超出split读取边界(读取下一个Split的第一条记录)

输出

一般只需要重写getRecordWriter(TaskAttemptContext taskAttemptContext)方法提供RecordWriter即可,不过可能需要对多个输出文件进行合并

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容

  • 摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-defini...
    wangliang938阅读 582评论 0 1
  • 要知道怎么对MapReduce作业进行调优前提条件是需要对Map-Reduce的过程了然于胸。 Map Side ...
    在路上很久了阅读 789评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • MapReduce框架结构## MapReduce是一个用于大规模数据处理的分布式计算模型MapReduce模型主...
    Bloo_m阅读 3,722评论 0 4
  • 思考问题 MapReduce总结 MapReduce MapReduce的定义MapReduce是一种编程模型, ...
    Sakura_P阅读 926评论 0 1