Hadoop学习笔记

参考:http://www.cnblogs.com/heavenwang/p/3988033.html

1. 基本概念

Hadoop 是Apache基金会下一个开源的分布式计算平台,它以分布式文件系统HDFS和MapReduce算法为核心,为用户提供了系统底层细节透明的分布式基础架构。
如下图Hadoop集群中有很多并行的机器来存储和分析数据,客户端把任务提交到集群,集群计算返回结果。

Hadoop强调把代码向数据迁移,即Hadoop集群中既包含数据又包含运算环境,并且尽可能让一段数据的计算发生在同一台机器上,代码比数据更加容易移动,Hadoop的设计理念即是把要执行的计算代码移动到数据所在的机器上去。


  • HDFS是一种分布式文件系统,数据被保存在计算机集群上,HDFS为HBase等工具提供了基础。
  • MapReduce,它是一个分布式、并行处理的编程模型,MapReduce把任务分为map(映射)阶段和reduce(化简)。实现并行。
  • Hive类似于SQL高级语言,用于运行存储在Hadoop上的查询语句。





2. Hadoop与SQL数据库

从总体上看,现在大多数数据应用处理的主力是关系型数据库,即SQL面向的是结构化的数据,而Hadoop则针对的是非结构化的数据,从这一角度看,Hadoop提供了对数据处理的一种更为通用的方式。

  1. 用scale-out代替scale-up
    拓展商用服务器的代价是非常昂贵的。要运行一个更大的数据库,就要一个更大的服务器,事实上,各服务器厂商往往会把其昂贵的高端机标称为“数据库级服务器”,不过有时候有可能需要处理更大的数据集,但却找不到更大的机器,而更为重要的是,高端机对于许多应用并不经济。

  2. 用键值对代替关系表
    关系型数据库需要将数据按照某种模式存放到具有关系型数据结构表中,但是许多当前的数据模型并不能很好的适应这些模型,如文本、图片、xml等,此外,大型数据集往往是非结构化或半结构化的。而Hadoop以键值对作为最基本的数据单元,能够灵活的处理较少结构化的数据类

  3. 用函数式编程(MapReduce)代替声明式查询(SQL)
    SQL从根本上说是一个高级声明式语言,它的手段是声明你想要的结果,并让数据库引擎判断如何获取数据。而在MapReduce程序中,实际的数据处理步骤是由你指定的。SQL使用查询语句,而MapReduce使用程序和脚本。MapReduce还可以建立复杂的数据统计模型,或者改变图像数据的处理格式。

  4. 用离线批量处理代替在线处理
    Hadoop并不适合处理那种对几条记录读写的在线事务处理模式,而适合一次写入多次读取的数据需求。




3. HDFS Hadoop Distributed File System

特点:高访问量,高容错性,线性拓展

HDFS采用了主从(Master/Slave)结构模型,一个HDFS集群是由一个NameNode和若干个DataNode组成的。其中NameNode作为主服务器,管理文件系统的命名空间和客户端对文件的访问操作;集群中的DataNode管理存储的数据。NameNode执行文件系统的命名操作,比如打开、关闭、重命名文件或目录等,它也负责数据块到具体DataNode的映射。DataNode负责处理文件系统客户端的文件读写请求,并在NameNode的同意调度下进行数据块的创建、删除和复制工作


HDFS是Master和Slave的结构

结构:

  • NameNode:master节点,管理HDFS的名称空间和数据块映射信息、配置副本策略和处理客户端请求
  • Secondary NameNode: 辅助master节点,备份namespace
  • DataNod: slave节点,实际存储数据、执行数据块的读写并汇报存储信息给NameNode



操作:

  • hadoop fs: 实际存储数据、执行数据块的读写并汇报存储信息给NameNode;
hadoop fs -ls / 
hadoop fs -lsr 
hadoop fs -mkdir /user/hadoop 
hadoop fs -put a.txt /user/hadoop/ 
hadoop fs -get /user/hadoop/a.txt / 
hadoop fs -cp src dst 
hadoop fs -mv src dst 
hadoop fs -cat /user/hadoop/a.txt 
hadoop fs -rm /user/hadoop/a.txt 
hadoop fs -rmr /user/hadoop/a.txt 
hadoop fs -text /user/hadoop/a.txt 
hadoop fs -copyFromLocal localsrc dst 与hadoop fs -put功能类似。 
hadoop fs -moveFromLocal localsrc dst
  • hadoop fsadmin
    dfsadmin是一个多任务的工具,我们可以使用它来获取HDFS的状态信息,以及在HDFS上执行的一系列管理操作。

    -report:查看文件系统的基本信息和统计信息。
    -safeadmin enter | leave | get | wait:安全模式命令。安全模式是NameNode的一种状态,在这种状态下,NameNode不接受对名字空间的更改(只读);不复制或删除块。NameNode在启动时自动进入安全模式,当配置块的最小百分数满足最小副本数的条件时,会自动离开安全模式。enter是进入,leave是离开。
    -refreshNodes:重新读取hosts和exclude文件,使新的节点或需要退出集群的节点能够被NameNode重新识别。这个命令在新增节点或注销节点时用到。

  • hadoop fsck
    HDFS支持fsck命令用以检查各种不一致。fsck用以报告各种文件问题,如block丢失或缺少block等。



4. MapReduce

是一个分布式计算框架,用来并行计算海量数据。

MapReduce 框架的核心步骤主要分两部分:Map 和Reduce。

Hadoop的MapReduce模型是通过输入key/value对进行运算得到输出key/value对。其分为Map过程和Reduce过程。

  • Map主要的工作是接收一个key/value对,产生一个中间key/value对,之后MapReduce把集合中所有相同key值的value放在一起并传递给Reduce函数。
  • Reduce函数接收key和相关的value集合并合并这些value值,得到一个较小的value集合。

下图是MapReduce的数据流图,体现了MapReduce处理大数据集的过程。这个过程就是将大数据分解为成百上千个小数据集,每个(或若干个)数据集分别由集群中的一个节点进行处理并生成的中间结果,然后这些中间结果又由大量的节点合并,形成最终结果。




例子1:Hadoop Word Count

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}



例子2:下载气象数据,求每年的最低温度
Min Temperature

 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinTemperature {

        public static void main(String[] args) throws Exception {
            if(args.length != 2) {
                System.err.println("Usage: MinTemperature<input path> <output path>");
                System.exit(-1);
            }

            Job job = new Job();
            job.setJarByClass(MinTemperature.class);
            job.setJobName("Min temperature");
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setMapperClass(MinTemperatureMapper.class);
            job.setReducerClass(MinTemperatureReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

MinTemperatureMapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

    public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

        private static final int MISSING = 9999;

        @Override 
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String year = line.substring(15, 19);

            int airTemperature;
            if(line.charAt(87) == '+') {
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }

            String quality = line.substring(92, 93);
            if(airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }

MinTemperatureReducer

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

    public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int minValue = Integer.MAX_VALUE;
            for(IntWritable value : values) {
                minValue = Math.min(minValue, value.get());
            }
            context.write(key, new IntWritable(minValue));
        }
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容