MapReduce输入输出格式之输入格式

1 常用输入格式

输入格式 特点 使用的RecordReader 是否使用FileInputFormat的getSplits
TextInputFormat 以行偏移量为key,以换行符前的字符为Value LineRecordReader
KeyValueTextInputFormat 默认分割符为”\t”,根据分割符来切分行,前为key,后为value KeyValueLineRecordReader,内部使用LineRecordReader
NLineInputFormat 根据属性mapreduce.input.lineinputformat.linespermap所设置的行数为每片split的行数 LineRecordReader 覆盖FileInputFormat的getSplits
SequenceFileInputFormat 使用Hadoop特有文件格式SequenceFile.Reader进行读写,读取二进制文件 SequenceFileRecordReader
DBInputFormat 通过与数据建立连接,将读取的数据根据map数进行分片 DBRecordReader 继承InputFormat,实现分片和RecordReader
InputFormat层级.JPG

2 自定义InputFormat的流程

1)如果是文本格式的数据,那么实现一个XXInputForamt继承FileInputFormat
2)重写 FileInputFormat 里面的 isSplitable() 方法。如果文件是压缩文件的话则不能切割,一般都是支持切割
3)重写 FileInputFormat 里面的 createRecordReader()方法
4)自定义XXRecordReader,来读取特定的格式

XXRecordReader中需要重点实现以下两个的方法
        @Override
        public void initialize(InputSplit input, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit split=(FileSplit)input;
            Configuration job=context.getConfiguration();
            Path file=split.getPath();
            FileSystem fs=file.getFileSystem(job);
           
            FSDataInputStream fileIn=fs.open(file);
            //红色标记这部分对于文本型数据来说基本是一样的
            in=new LineReader(fileIn,job);
            line=new Text();
            lineKey=new Text();
            lineValue = new Text();
        }

        //此方法读取每行数据,完成自定义的key和value
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int linesize=in.readLine(line);//每行数据
            if(linesize==0) return false;
            String[] pieces = line.toString().split("\\s+");//解析每行数据
            ...
            lineKey.set(“key”);//完成自定义key数据
            lineValue.set(“value”);//封装自定义value数据
            return true;
        }       

3 多个输入

1)如果输入格式存在多种,可以设置不同Mapper处理不同的数据源

MultipleInputs.addInputPath(job,ncdcInputPath,TextInputFormat.class,NCDCTemperatureMapper.class);

2)存在多种输入格式,而只有一个Mapper则可使用

public static void addInputPath(Job job,Path path,class< ? extends InputFormat> inputFormatClass);
使用job.setMapperClass();

4. InputFormat及其子类解决的是针对不同的数据格式分片和读取问题

4.1 getSplits方法中实现如何分片?

1)根据不同的数据来源采取不同的切片方式
例子1:文本格式的数据来源
通过计算确认splitSize的大小,假如输入文件为100M,那么splitSize则为64M,那么文件会被切分为64M和36M两个分片输出。

public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be splitted
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }

例子2:数据来源为数据库
通过查询需要读取的条数,然后再除以map_task数,得出每个map_task需要处理的条数,进行切分

public List<InputSplit> getSplits(JobContext job) throws IOException {

    ResultSet results = null;  
    Statement statement = null;
    try {
      statement = connection.createStatement();

      results = statement.executeQuery(getCountQuery());//查询总条数
      results.next();

      long count = results.getLong(1);
      int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
      long chunkSize = (count / chunks);

      results.close();
      statement.close();

      List<InputSplit> splits = new ArrayList<InputSplit>();

      // Split the rows into n-number of chunks and adjust the last chunk
      // accordingly
      for (int i = 0; i < chunks; i++) {
        DBInputSplit split;

        if ((i + 1) == chunks)
          split = new DBInputSplit(i * chunkSize, count);
        else
          split = new DBInputSplit(i * chunkSize, (i * chunkSize)
              + chunkSize);

        splits.add(split);
      }

      connection.commit();
      return splits;
    } catch (SQLException e) {
      throw new IOException("Got SQLException", e);
    } finally {
      try {
        if (results != null) { results.close(); }
      } catch (SQLException e1) {}
      try {
        if (statement != null) { statement.close(); }
      } catch (SQLException e1) {}

      closeConnection();
    }
  }

2)RecordReader类实现如何读取数据
一般的文本格式一般使用LineRecordReader进行读取,然后根据需求进行处理

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,392评论 5 470
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,258评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,417评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,992评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,930评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,199评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,652评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,327评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,463评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,382评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,432评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,118评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,704评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,787评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,999评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,476评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,057评论 2 341

推荐阅读更多精彩内容

  • 先思考问题 我们处在一个大数据的时代已经是不争的事实,这主要表现在数据源多且大,如互联网数据,人们也认识到数据里往...
    墙角儿的花阅读 7,335评论 0 9
  • 摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-defini...
    wangliang938阅读 578评论 0 1
  • 目的这篇教程从用户的角度出发,全面地介绍了Hadoop Map/Reduce框架的各个方面。先决条件请先确认Had...
    SeanC52111阅读 1,699评论 0 1
  • 思考问题 Mapper类 Mapper类 四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOU...
    Sakura_P阅读 846评论 0 3
  • 一路逡巡只等归零, 三三两两再遇重启。 一行人, 踏遍世间,留下足迹。 另一群人, 寻找那行人,踏过的脚印。 演变...
    炀歧阅读 726评论 43 37