Hadoop
一. 引言
1.1 什么是大数据
大数据:(Big Data
):数据量级很大的应用处理。TB级 ,日数据增长GB级
K -- M---- G ---- T ----PB ---- EB ---ZB 1024
通过对海量数据进行分析,挖掘,进而发现数据内在的规律,从而为企业或者国家创造价值。
1.2 大数据特点
4V
是大数据典型的特点具体指的是:
# 1.Volume (大量)
数据量很大,至少是TB或者日均增加GB级
# 2.Variety (多样)
a.结构化数据 : 传统关系型数据库中的数据
b.半结构化数据: json xml mongodb存储的数据
c.非结构化数据: 音频 视频
# 3.Velocity(快速)
处理数据速度要快 注意:是相对速度快
# 4.Value (价值)
海量没有价值的数据中,分析出有价值的内容。
1.3 大数据的工作方向
# 1. 业务
电商的推荐系统,智能广告系统,专家系统,智慧城市,智能交通,金融大脑,智慧医疗,灾害预警....
# 2. 工作方向
大数据运维工程师,大数据开发工程师(实时计算,数据仓库,ETL,基本挖掘),数据分析师(算法)
1.4 大数据的起源
Google是最早面临大数据问题的公司。
1. GFS google File System
2. MapReduce
3. BigTable (NoSQL 数据库)
大数据起源可以说是google最早开源的3篇论文,开创了大数据时代
1.5 大数据处理的核心数据类型
大数据处理的核心数据类型通常为:
文本类型
1.6 大数据的数据来源
# 1.自己公司业务系统运行产生的日志 (nginx,log4j,数据库中的日志)
# 2.爬虫
# 3.行业数据 电信 医疗 政府.
1.7 大数据目前面临问题
# 1.存储
如何解决现有大数据中数据存储问题
# 2.统计|计算
如何解决现有大规模的数据集中统计和计算的问题
二. Hadoop的引言
2.1 解决问题
Hadoop
主要是用来解决大数据所面临的数据存储
和数据计算
的问题。
2.2 Hadoop诞生
2003-2004年,Google公布了部分GFS和MapReduce思想的细节,受此启发的Doug Cutting等人用2年的业余时间实现了DFS和MapReduce机制,使Nutch性能飙升。然后Yahoo招安Doug Gutting及其项目。
2005年,Hadoop作为Lucene的子项目Nutch的一部分正式引入Apache基金会。
2006年2月被分离出来,成为一套完整独立的软件,起名为Hadoop
Hadoop名字不是一个缩写,而是一个生造出来的词。是Hadoop之父Doug Cutting儿子毛绒玩具象命名的。
Hadoop之父
Doug Cutting
,可能所有人都间接用过他的作品,他是Lucene
、Nutch
、Hadoop
等项目的发起人
。是他,把高深莫测的搜索技术形成产品,贡献给我们;还是他,打造了目前在云计算和大数据领域里如日中天的Hadoop。
# Haoop核心设计
HDFS (Hadoop Distribute File System) -------> GFS
MapReduce -------> MapReduce
HBase -------> Big Table
apache
组织正式开源,并将hadoop作为apache顶级的开源项目之一
2.3 Hadoop的发行版本
版本 | 是否收费 | 使用难度 |
---|---|---|
Apache 开源的Hadoop | 免费 | ★★★★☆ |
Clouder(CDH) | $4000 (1个节点) | ★★★☆☆ |
Hortonworks | $12500(10个节点) | ★★★☆☆ |
华为hadoop | 未知(内部使用) | ☆☆☆☆☆ |
注意:在实际开发中Appache的Hadoop企业实际使用并不多。最原始(基础)版本。但是却是学习hadoop的基础。
2.4 hadoop的生态圈
# 1.hadoop核心 HDFS,MapReduce
# 2.Hive 通过SQl语句形式执行mapreduce
# 3.Hbase Nosql数据库
# 4.Flume 日志采集工具
# 5.Sqoop sql to hadoop 将数据导入到hadoop中
# 6.Zookeeper 协调服务工具
# 7.Mahout 算法库
# 8.Pig 是MapReduce的一个抽象,它是一个工具/平台,用于分析较大的数据集,并将它们表示为数据流。
三.Hadoop的安装(单机)
说明: hadoop的核心为
HDFS
和MapReduce
3.1 Hadoop的核心之HDFS
3.1.1 HDFS引言
# HDFS (Hadoop Distribute File System): Hadoop 的分布式文件存储系统,他核心解决的大数据的存储问题
3.1.2 HDFS基本架构图
-
NameNode:
是整个HDFS集群的总入口,存储着HDFS的集群的文件元数据(如:client上传文件的文件名 副本数 块数等相关信息)。 -
DataNode:
是真正用来负责存储数据的节点,一个DataNode就是一个真实的物理主机。 -
Block:
数据块,为了能通过多个节点保存大数据集,HDFS将大数据集文件切分成一块块的数据块,在现有hadoop2版本中默认一个块大小为128M。
3.1.3 Hadoop的安装
准备环境
# 0. 安装centos7.x 虚拟机,并启动
# 1. 输入hostname 查看当前主机名
# 2. 使用vim /etc/hostname 修改主机名
# 3. 重启centos 系统 reboot
# 4. 查看修改之后的主机名 hostname
# 5. 添加主机名与ip映射 vim /etc/hosts
加入 ip(当前ip地址) centos(主机名)
# 6. 检测主机名ip配置是否生效
# 7.关闭防火墙
`systemctl stop firewalld
`systemctl disable firewalld
配置java环境变量
# 0.下载jdk
wget https://download.oracle.com/otn/java/jdk/8u231-b11/5b13a193868b4bf28bcb45c792fce896/jdk-8u231-linux-x64.rpm
# 1. 安装jdk文件
rpm -ivh jdk-8u231-linux-x64.rpm
# 2. 配置环境变量 vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
export PATH=$PATH:$NODE_HOME/bin:$MAVEN_HOME/bin:$JAVA_HOME/bin
# 3. 重新载入配置
source /etc/profile
# 4. 检测配置是否生效
jps
java
javac
安装hadoop
# 0.下载hadoo
wget hadoop-2.9.2.tar.gz 注意:本次课程使用的事hadoop2.9.2版本 和 centos7.x
# 1. 上传hadoop软件包到系统中
hadoop-2.9.2.tar.gz
# 2. 解压到指定文件目录中
tar -zxvf hadoop-2.9.2.tar.gz -C /usr
# 3. 配置hadoop环境变量
export HADOOP_HOME=/usr/hadoop-2.9.2
export PATH=$PATH$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
# 4. 测试环境变量是否配置成功
直接输入hdfs命令查看效果
# 5. 查看hadoop的安装目录 tree -L 1 /usr/hadoop-2.9.2
[root@hadoop ~]# tree -L 2 hadoop-2.9.2
hadoop-2.9.2
├── bin
│ ├── container-executor
│ ├── hadoop
│ ├── hadoop.cmd
│ ├── hdfs
│ ├── hdfs.cmd
│ ├── mapred
│ ├── mapred.cmd
│ ├── rcc
│ ├── test-container-executor
│ ├── yarn
│ └── yarn.cmd
├── etc
│ └── hadoop
├── include
│ ├── hdfs.h
│ ├── Pipes.hh
│ ├── SerialUtils.hh
│ ├── StringUtils.hh
│ └── TemplateFactory.hh
├── lib
│ └── native
├── libexec
│ ├── hadoop-config.cmd
│ ├── hadoop-config.sh
│ ├── hdfs-config.cmd
│ ├── hdfs-config.sh
│ ├── httpfs-config.sh
│ ├── kms-config.sh
│ ├── mapred-config.cmd
│ ├── mapred-config.sh
│ ├── yarn-config.cmd
│ └── yarn-config.sh
├── LICENSE.txt
├── logs
│ ├── hadoop-root-datanode-hadoop.log
│ ├── hadoop-root-datanode-hadoop.out
│ ├── hadoop-root-datanode-hadoop.out.1
│ ├── hadoop-root-datanode-hadoop.out.2
│ ├── hadoop-root-datanode-hadoop.out.3
│ ├── hadoop-root-datanode-hadoop.out.4
│ ├── hadoop-root-datanode-hadoop.out.5
│ ├── hadoop-root-namenode-hadoop.log
│ ├── hadoop-root-namenode-hadoop.out
│ ├── hadoop-root-namenode-hadoop.out.1
│ ├── hadoop-root-namenode-hadoop.out.2
│ ├── hadoop-root-namenode-hadoop.out.3
│ ├── hadoop-root-namenode-hadoop.out.4
│ ├── hadoop-root-namenode-hadoop.out.5
│ ├── hadoop-root-secondarynamenode-hadoop.log
│ ├── hadoop-root-secondarynamenode-hadoop.out
│ ├── hadoop-root-secondarynamenode-hadoop.out.1
│ ├── hadoop-root-secondarynamenode-hadoop.out.2
│ ├── hadoop-root-secondarynamenode-hadoop.out.3
│ ├── hadoop-root-secondarynamenode-hadoop.out.4
│ ├── hadoop-root-secondarynamenode-hadoop.out.5
│ ├── SecurityAuth-root.audit
│ ├── userlogs
│ ├── yarn-root-nodemanager-hadoop.log
│ ├── yarn-root-nodemanager-hadoop.out
│ ├── yarn-root-nodemanager-hadoop.out.1
│ ├── yarn-root-nodemanager-hadoop.out.2
│ ├── yarn-root-nodemanager-hadoop.out.3
│ ├── yarn-root-nodemanager-hadoop.out.4
│ ├── yarn-root-nodemanager-hadoop.out.5
│ ├── yarn-root-resourcemanager-hadoop.log
│ ├── yarn-root-resourcemanager-hadoop.out
│ ├── yarn-root-resourcemanager-hadoop.out.1
│ ├── yarn-root-resourcemanager-hadoop.out.2
│ ├── yarn-root-resourcemanager-hadoop.out.3
│ ├── yarn-root-resourcemanager-hadoop.out.4
│ └── yarn-root-resourcemanager-hadoop.out.5
├── NOTICE.txt
├── README.txt
├── sbin
│ ├── distribute-exclude.sh
│ ├── FederationStateStore
│ ├── hadoop-daemon.sh
│ ├── hadoop-daemons.sh
│ ├── hdfs-config.cmd
│ ├── hdfs-config.sh
│ ├── httpfs.sh
│ ├── kms.sh
│ ├── mr-jobhistory-daemon.sh
│ ├── refresh-namenodes.sh
│ ├── slaves.sh
│ ├── start-all.cmd
│ ├── start-all.sh
│ ├── start-balancer.sh
│ ├── start-dfs.cmd
│ ├── start-dfs.sh
│ ├── start-secure-dns.sh
│ ├── start-yarn.cmd
│ ├── start-yarn.sh
│ ├── stop-all.cmd
│ ├── stop-all.sh
│ ├── stop-balancer.sh
│ ├── stop-dfs.cmd
│ ├── stop-dfs.sh
│ ├── stop-secure-dns.sh
│ ├── stop-yarn.cmd
│ ├── stop-yarn.sh
│ ├── yarn-daemon.sh
│ └── yarn-daemons.sh
└── share
├── doc
└── hadoop
-
bin 和 sbin
目录用来启动hdfs yarn 等可执行的脚本文件 -
etc
目录用来存放hadoop的配置文件 -
logs
目录用来存放hadoop的日志文件 -
share
用来存放hadoop的依赖jar第三方jar目录 -
lib
用来存放hadoop使用核心库文件
# 6.配置core-site.xml
vim /usr/hadoop-2.9.2/etc/hadoop/core-site.xml 加入如下配置:
<configuration>
<!--配置hdfs文件系统默认名称-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop(主机名):9000</value>
</property>
</configuration>
注意:名称是一个HDFS的URL
# 7.配置hdfs-site.xml
vim /usr/hadoop-2.9.2/etc/hadoop/hdfs-site.xml 加入如下配置:
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
# 8.配置slaves文件
vim /usr/hadoop-2.9.2/etc/hadoop/slaves 加入如下配置:
hadoop (当前主机名)
# 9.格式化hdfs
hdfs namenode -format (仅仅是第一次使用需要格式化)
`出现如下错误: /usr/hadoop-2.9.2/bin/hdfs:行319: /usr/java/jdk1.8.0_171-amd64/bin//bin/java: 没有那个文件或目录`参考步骤10解决
注意:这里的格式化是格式成hadoop可以识别的文件系统,比如我们买了一块硬盘我们需要格式化成windows或者mac,linux系统识别的文件系统,才能使用这个文件系统。
# 10.配置etc/hadoop目录中中hadoop-env.sh
将原来export JAVA_HOME=$JAVA_HOME ====修改为jdk安装目录==> export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
# 11.启动HDFS
start-dfs.sh 启动
stop-dfs.sh 关闭
# 12. 查看hadoop是否启动成功
jps 存在以下进程名称说明启动成功
5876 SecondaryNameNode
5702 DataNode
5995 Jps
5612 NameNode
注意:只要能看到NameNode 和 DataNode 说明启动成功
# 13. 访问hdfs提供的web界面
http://IP地址:50070/
3.2 查看HDFS日志
# 1.进入hadoop安装目录中logs目录中
hadoop-root-namenode-hadoop.log ---- namenode日志
hadoop-root-datanode-hadoop.log ---- datanode日志
hadoop-root-secondarynamenode-hadoop.log ---- secondnamenode 日志
yarn-root-resourcemanager-hadoop.log ---- resourcemanager 日志
yarn-root-nodemanager-hadoop.log ---- nodemanager 日志
注意:针对于hdfs日志规则为hadoop-用户名-服务名-主机名.log,针对于后续学习的yarn生成规则为yarn-用户名-服务名-主机名.log
3.3 修改hdfs默认数据位置
- 说明: 通过查看日志得知namenode数据和datanode数据默认都是存放在/tmp//tmp/hadoop-root/dfs下,这对于我们来说是不安全的,因为tmp目录为临时目录,系统可能会定期清除目录中文件,因此为了保证数据安全修改数据默认的存放位置
# 1.修改hadoop安装目录下etc/hadoop/core-site.xml 加入如下配置
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/hadoop-2.9.2/data</value>
</property>
3.4 配置SSH免密登录
SSH 为 [Secure Shell](https://baike.baidu.com/item/Secure Shell) 的缩写,由 IETF 的网络小组(Network Working Group)所制定;SSH 为建立在应用层基础上的安全协议。
从客户端来看,SSH提供两种级别的安全验证。
3.4.1基于口令的安全验证
只要你知道自己帐号和口令,就可以登录到远程主机。所有传输的数据都会被加密,但是不能保证你正在连接的服务器就是你想连接的服务器。可能会有别的服务器在冒充真正的服务器,也就是受到“中间人”这种方式的攻击。
3.4.2 基于密匙的安全验证
需要依靠密匙,也就是你必须为自己创建一对密匙,并把公用密匙放在需要访问的服务器上。如果你要连接到SSH服务器上,客户端软件就会向服务器发出请求,请求用你的密匙进行安全验证。服务器收到请求之后,先在该服务器上你的主目录下寻找你的公用密匙,然后把它和你发送过来的公用密匙进行比较。如果两个密匙一致,服务器就用公用密匙加密“质询”(challenge)并把它发送给客户端软件。客户端软件收到“质询”之后就可以用你的私人密匙解密再把它发送给服务器。
注意:第二种级别不仅加密所有传送的数据,而且“中间人”这种攻击方式也是不可能的(因为他没有你的私人密匙)。但是整个登录的过程可能需要10秒 。
3.4.3 ssh 登录过程
3.4.4 配置ssh
# 1. 生成ssh秘钥对
ssh-keygen -t rsa 然后回车几次就可以啦
# 2. 查看秘钥对生成位置
ls /root/.ssh 会发现在home目录中生成了两个文件
id_rsa(私钥) id_rsa.pub(公钥)
# 3. 将公钥加入另一台机器的受信列表中
ssh-copy-id hadoop(主机名)
cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys (和上面命令一样)
# 4. 再次查看/root/.ssh 目录 多出两个文件其中authorized_keys就是存放公钥列表文件
authorized_keys id_rsa id_rsa.pub known_hosts
# 5. 检测是否配置成功
ssh hadoop 不需要输入密码即可
四. HDFS的基本操作
4.1 Shell基本操作
4.1.1 命令总结
[root@hadoop ~]# hdfs dfs
Usage: hadoop fs [generic options]
[-appendToFile <localsrc> ... <dst>]
[-cat [-ignoreCrc] <src> ...]
[-checksum <src> ...]
[-chgrp [-R] GROUP PATH...]
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
[-chown [-R] [OWNER][:[GROUP]] PATH...]
[-copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] <path> ...]
[-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]
[-createSnapshot <snapshotDir> [<snapshotName>]]
[-deleteSnapshot <snapshotDir> <snapshotName>]
[-df [-h] [<path> ...]]
[-du [-s] [-h] [-x] <path> ...]
[-expunge]
[-find <path> ... <expression> ...]
[-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-getfacl [-R] <path>]
[-getfattr [-R] {-n name | -d} [-e en] <path>]
[-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
[-help [cmd ...]]
[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [<path> ...]]
[-mkdir [-p] <path> ...]
[-moveFromLocal <localsrc> ... <dst>]
[-moveToLocal <src> <localdst>]
[-mv <src> ... <dst>]
[-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
[-renameSnapshot <snapshotDir> <oldName> <newName>]
[-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]
[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
[-setfattr {-n name [-v value] | -x name} <path>]
[-setrep [-R] [-w] <rep> <path> ...]
[-stat [format] <path> ...]
[-tail [-f] <file>]
[-test -[defsz] <path>]
[-text [-ignoreCrc] <src> ...]
[-touchz <path> ...]
[-truncate [-w] <length> <path> ...]
[-usage [cmd ...]]
4.1.2 常见命令
# 1.查看目录结构
[root@hadoop1 ~]# hdfs dfs -ls /
# 2.上传文件到HDFS
[root@hadoop1 ~]# hdfs dfs -put aa.txt /
# 3.创建文件夹
[root@hadoop1 ~]# hdfs dfs -mkdir -p /bbb/cccc
[root@hadoop1 ~]# hdfs dfs -ls /
Found 2 items
-rw-r--r-- 1 root supergroup 58 2019-12-18 16:13 /aa.txt
drwxr-xr-x - root supergroup 0 2019-12-18 16:16 /bbb
[root@hadoop1 ~]# hdfs dfs -ls /bbb/cccc
# 4.查看文件内容
[root@hadoop1 ~]# hdfs dfs -cat /aa.txt
chenyn 1
xiaohei 1
wangwu 1
xiaohei 1
chenyn 1
zhangsan 1
[root@hadoop1 ~]# hdfs dfs -text /aa.txt
chenyn 1
xiaohei 1
wangwu 1
xiaohei 1
chenyn 1
zhangsan 1
# 5.删除文件
[root@hadoop1 ~]# hdfs dfs -rm /aa.txt
Deleted /aa.txt
# 6.删除空目录
[root@hadoop1 ~]# hdfs dfs -rm -r /bbb ----- 递归删除
Deleted /bbb
[root@hadoop1 ~]# hdfs dfs -mkdir -p /aa/bb/cc ----- 创建多级目录
[root@hadoop1 ~]# hdfs dfs -rm -r -f /aa ----- 强制删除
Deleted /aa
# 7.追加文件内容
[root@hadoop1 ~]# hdfs dfs -put aa.txt /
[root@hadoop1 ~]# hdfs dfs -cat /aa.txt
chenyn 1
xiaohei 1
wangwu 1
xiaohei 1
chenyn 1
zhangsan 1
[root@hadoop1 ~]# touch bb.txt
[root@hadoop1 ~]# echo "xiaohei 1" >> bb.txt
[root@hadoop1 ~]# cat bb.txt
xiaohei 1
[root@hadoop1 ~]# hdfs dfs -appendToFile bb.txt /aa.txt
[root@hadoop1 ~]# hdfs dfs -cat /aa.txt
chenyn 1
xiaohei 1
wangwu 1
xiaohei 1
chenyn 1
zhangsan 1
xiaohei 1
# 8.查看文件的校验核
[root@hadoop1 ~]# hdfs dfs -checksum /aa.txt
/aa.txt MD5-of-0MD5-of-512CRC32C 000002000000000000000000fb2fbd294298362dbaabfb7fc8724306
# 9.查看文件的权限
[root@hadoop1 ~]# hdfs dfs -ls -R /aa.txt
-rw-r--r-- 1 root supergroup 68 2019-12-18 16:35 /aa.txt
[root@hadoop1 ~]# hdfs dfs -chmod a+x /aa.txt
[root@hadoop1 ~]# hdfs dfs -ls -R /aa.txt
-rwxr-xr-x 1 root supergroup 68 2019-12-18 16:35 /aa.txt
# 10.从本地copy到hdfs中
[root@hadoop1 ~]# hdfs dfs -copyFromLocal bb.txt /bb.txt -----从本地复制文件到HDFS
[root@hadoop1 ~]# hdfs dfs -copyFromLocal bb.txt /bb.txt -----如果文件已经存在hdfs 复制失败
copyFromLocal: `/bb.txt': File exists
[root@hadoop1 ~]# hdfs dfs -copyFromLocal -f bb.txt /bb.txt -----如果文件已经存在hdfs 可以强制覆盖hdfs中文件
# 11.hdfs中复制文件
[root@hadoop1 ~]# hdfs dfs -mkdir /datas
[root@hadoop1 ~]# hdfs dfs -cp /aa.txt /datas
[root@hadoop1 ~]# hdfs dfs -ls /datas
Found 1 items
-rw-r--r-- 1 root supergroup 68 2019-12-18 16:54 /datas/aa.txt
# 12.从hdfs上下载文件到本地
[root@hadoop1 ~]# hdfs dfs -ls /
Found 3 items
-rwxr-xr-x 1 root supergroup 68 2019-12-18 16:35 /aa.txt
-rw-r--r-- 1 root supergroup 10 2019-12-18 16:50 /bb.txt
drwxr-xr-x - root supergroup 0 2019-12-18 16:54 /datas
[root@hadoop1 ~]# ls
aa.txt bb.txt hadoop-2.9.2.tar.gz jdk-8u171-linux-x64.rpm
[root@hadoop1 ~]# hdfs dfs -get /aa.txt /root/down.txt
[root@hadoop1 ~]# ls
aa.txt bb.txt down.txt hadoop-2.9.2.tar.gz jdk-8u171-linux-x64.rpm
# 13.查找某个路径下文件
[root@hadoop1 ~]# hdfs dfs -find / -name "aa.txt"
/aa.txt
/datas/aa.txt
# 14.将hdfs文件移动到hdfs另一个位置
[root@hadoop1 ~]# hdfs dfs -ls /
Found 3 items
-rwxr-xr-x 1 root supergroup 68 2019-12-18 16:35 /aa.txt
-rw-r--r-- 1 root supergroup 10 2019-12-18 16:50 /bb.txt
drwxr-xr-x - root supergroup 0 2019-12-18 16:54 /datas
[root@hadoop1 ~]# hdfs dfs -ls /datas
Found 1 items
-rw-r--r-- 1 root supergroup 68 2019-12-18 16:54 /datas/aa.txt
[root@hadoop1 ~]# hdfs dfs -mv /bb.txt /datas/bb.txt
[root@hadoop1 ~]# hdfs dfs -ls /
Found 2 items
-rwxr-xr-x 1 root supergroup 68 2019-12-18 16:35 /aa.txt
drwxr-xr-x - root supergroup 0 2019-12-18 17:03 /datas
[root@hadoop1 ~]# hdfs dfs -ls /datas
Found 2 items
-rw-r--r-- 1 root supergroup 68 2019-12-18 16:54 /datas/aa.txt
-rw-r--r-- 1 root supergroup 10 2019-12-18 16:50 /datas/bb.txt
4.2 Java操作HDFS
4.2.1 引入依赖
<properties>
<hadoop.version>2.9.2</hadoop.version>
</properties>
<dependencies>
<!--hadoop公共依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--hadoop client 依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
4.2.2 获取hdfs客户端
public class TestHDFS {
private FileSystem fileSystem; //hdfs客户端对象
@Before
public void before() throws IOException {
//hadoop文件系统的权限设置为root
System.setProperty("HADOOP_USER_NAME","root");
//用来对core-site.xml hdfs-site.xml进行配置
Configuration conf = new Configuration();
//连接hdfs
conf.set("fs.defaultFS","hdfs://10.15.0.4:9000");
//设置上传文件的副本集
conf.set("dfs.replication","1");
fileSystem = FileSystem.get(conf);
}
@After
public void close() throws IOException {
fileSystem.close();
}
}
注意: FileSystem是java操作HDFS的客户端对象
4.2.3 上传文件到hdfs
@Test
public void testUpload() throws IOException {
FileInputStream is = new FileInputStream("/Users/chenyannan/IdeaProjects/ideacode/hadoop_hdfs/pom.xml");
Path path = new Path("/pom.xml");
FSDataOutputStream os = fileSystem.create(path);
//参数1:输入流 参数2:输出流 参数3:缓冲区大小 参数4:是否关闭流
IOUtils.copyBytes(is,os,1024,true);
}
4.2.4 hdfs下载文件
// 1.第一种方式
@Test
public void testDownload() throws IOException {
Path source = new Path("/pom.xml");
Path des = new Path("/Users/chenyannan");
fileSystem.copyToLocalFile(source,des);
}
// 2.第二种方式
@Test
public void testDownload1() throws IOException {
Path path = new Path("/pom.xml");
FSDataInputStream in = fileSystem.open(path);
FileOutputStream os = new FileOutputStream("/Users/chenyannan/aa.xml");
IOUtils.copyBytes(in,os,1024,true);
}
4.2.5 hdfs创建目录
@Test
public void testMkdirs() throws IOException {
boolean mkdirs = fileSystem.mkdirs(new Path("/aa/cc/cc"));
System.out.println("mkdirs = " + mkdirs);
}
4.2.6 展示hdfs文件列表
@Test
public void testListFiles() throws IOException {
Path path = new Path("/");
RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(path, true);
while (listFiles.hasNext()){
LocatedFileStatus next = listFiles.next();
System.out.println("next = " + next);
}
}
4.2.7 展示hdfs目录和文件
@Test
public void testListDirs() throws IOException {
Path path = new Path("/");
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
System.out.println(fileStatus.isDirectory()+" "+fileStatus.getPath());
}
}
4.2.8 删除文件
@Test
public void testDelete() throws IOException {
Path path= new Path("/aa");
//参数1:目录路径 参数2:是否递归删除
fileSystem.delete(path,true);
}
4.3 HDFS配置文件的优先级详解
注意:hadoop的配置文件解析顺序java代码客户端优于 > hadoop目录中etc/中配置优于 >share中jar默认配置
五. NameNode的持久化
5.1 NameNode的数据存在哪?
首先,我们做个假设,如果存储在NameNode节点的磁盘中,因为经常需要进行随机访问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。
5.2 NameNode的持久化
NameNode数据存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的FsImage。这样又会带来新的问题,当在内存中的元数据更新时,如果同时更新FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦NameNode节点断电,就会产生数据丢失。因此,引入Edits文件(只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。这样,一旦NameNode节点断电,可以通过FsImage和Edits的合并,合成元数据。但是,如果一旦长时间添加数据到Edits中,会导致该文件数据过大,效率降低,而且一旦断电,恢复元数据需要的时间过长。因此,需要定期进行FsImage和Edits的合并,如果这个操作由NameNode节点完成,又会效率过低。因此,引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并。
总结:通过SecondaryNameNode 定期 对 FsImage 和 Edits文件的合并来保证NameNode中数据的高可用
5.3 持久化机制工作原理
# 第一阶段:NameNode启动
1>.第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
2>.客户端对元数据进行增删改的请求。
3>.NameNode记录操作日志,更新滚动日志。
4>.NameNode在内存中对数据进行增删改。
# 第二阶段:Secondary NameNode工作
1>.Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
2>.Secondary NameNode请求执行CheckPoint。
3>.NameNode滚动正在写的Edits日志。
4>.将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
5>.Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
6>.生成新的镜像文件fsimage.chkpoint。
7>.拷贝fsimage.chkpoint到NameNode。
8>.NameNode将fsimage.chkpoint重新命名成fsimage。
1>.NameNode启动时,先滚动Edits并生成一个空的edits.inprogress,然后加载Edits和Fsimage到内存中,此时NameNode内存就持有最新的元数据信息。
2>.Client开始对NameNode发送元数据的增删改的请求,这些请求的操作首先会被记录到edits.inprogress中(查询元数据的操作不会被记录在Edits中,因为查询操作不会更改元数据信息),如果此时NameNode挂掉,重启后会从Edits中读取元数据的信息。然后,NameNode会在内存中执行元数据的增删改的操作。
3>.由于Edits中记录的操作会越来越多,Edits文件会越来越大,导致NameNode在启动加载Edits时会很慢,所以需要对Edits和Fsimage进行合并(所谓合并,就是将Edits和Fsimage加载到内存中,照着Edits中的操作一步步执行,最终形成新的Fsimage)。
4>.SecondaryNameNode的作用就是帮助NameNode进行Edits和Fsimage的合并工作。
5>.SecondaryNameNode首先会询问NameNode是否需要CheckPoint(触发CheckPoint需要满足两个条件中的任意一个,定时时间到和Edits中数据写满了)。直接带回NameNode是否检查结果。
6>.SecondaryNameNode执行CheckPoint操作,首先会让NameNode滚动Edits并生成一个空的edits.inprogress,滚动Edits的目的是给Edits打个标记,以后所有新的操作都写入edits.inprogress,其他未合并的Edits和Fsimage会拷贝到SecondaryNameNode的本地,然后将拷贝的Edits和Fsimage加载到内存中进行合并,生成fsimage.chkpoint,然后将fsimage.chkpoint拷贝给NameNode,重命名为Fsimage后替换掉原来的Fsimage。
7>.NameNode在启动时就只需要加载之前未合并的Edits和Fsimage即可,因为合并过的Edits中的元数据信息已经被记录在Fsimage中
5.4 Checkpoint的时间参数设置
修改hdfs-site.xml配置合并时间
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>
修改hdfs-site.xml中操作次数和检查操作次数周期
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
<description>操作动作次数</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60</value>
<description> 1分钟检查一次操作次数</description>
</property>
六.HDFS完全分布式(简单版)
6.1 集群选型
# 1.NameNode 内存大 性能好一点的机器
# 2.DataNode 硬盘大 性能中等一般的机器即可
# 3.可以让NameNode 同时充当DataNode节点
6.2 集群搭建
# 1.克隆三台机器通过CRT连接
10.15.0.6
10.15.0.7
10.15.0.8
# 2.配置主机名以
vim /etc/hostname
NameNode修改为 : hadoop2
DataNode1修改为: hadoop3
DataNode2修改为: hadoop4
修改完成后必须重新启动
# 3.配置主机名与ip映射
vim /etc/hosts 三个机器配置一致
10.15.0.6 hadoop2
10.15.0.7 hadoop3
10.15.0.8 hadoop4
注意:配置完成后可以使用ping命令测试下是否配置成功
# 4.配置ssh免密登录
NameNode中执行: ssh-copyid root@hadoop2 ssh-copyid root@hadoop3 ssh-copyid root@hadoop4
# 5.安装jdk配置环境变量省略
# 6.安装hadoop 并配置环境变量省略
# 7.配置三个机器 hadoop-env.sh中环境变量设置
vim /usr/hadoop-2.9.2/etc/hadoop/hadoop-env.sh 文件
# 8.配置三台机器的core-site.xml文件
vim /usr/hadoop-2.9.2/etc/hadoop/core-site.xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop2:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/hadoop-2.9.2/data</value>
</property>
注意:在配置core-site.xml时想要使那台节点作为NameNode节点,就在配置文件中使用那个节点主机名作为fs.defaultFS的名字使用,图中使用的是Hadoop2作为NameNode,所以配置全部为Hadoop2
# 9.配置三台机器hdfs-site.xml配置文件
vim /usr/hadoop-2.9.2/etc/hadoop/hdfs-site.xml
注意:这里将副本数量修改为了3份,默认配置也是3份
# 10.配置三个机器slaves文件
vim /usr/hadoop-2.9.2/etc/hadoop/slaves
hadoop2
hadoop3
hadoop4
注意:slaves文件用来决定哪些节点为datanode,因为Hadoop2节点既要是NameNode也要是DataNode因此也要讲hadoop2放在salves文件中,这样才会认为自己也是一个dataNode节点
# 4.在hadoop2节点上进行Namenode格式化
[root@hadoop2 ~]# hdfs namenode -format
# 5.启动hdfs集群
在hadoop2上执行: start-dfs.sh 关闭使用: stop-dfs.sh
# 6.查看各个节点进程
# 7.上传文件到hdfs上测试
七.ZK搭建高可用HDFS集群
7.1 原理解析
QJM(Quorum Journal Manager)是Hadoop专门为Namenode共享存储开发的组件。其集群运行一组Journal Node,每个Journal 节点暴露一个简单的RPC接口,允许Namenode读取和写入数据,数据存放在Journal节点的本地磁盘。当Namenode写入edit log时,它向集群的所有Journal Node发送写入请求,当多数节点回复确认成功写入之后,edit log就认为是成功写入。例如有3个Journal Node,Namenode如果收到来自2个节点的确认消息,则认为写入成功。
而在故障自动转移的处理上,引入了监控Namenode状态的ZookeeperFailController(ZKFC)。ZKFC一般运行在Namenode的宿主机器上,与Zookeeper集群协作完成故障的自动转移。整个集群架构图如下:
7.2 搭建HDFS高可用集群
# 0.集群规划 和 环境准备
hadoop1 10.15.0.5 ---(zk cluster 这里zk集群放在单独一台机器搭建的是一个伪分布式,我们重点关注hdfs集群)
hadoop2 10.15.0.6 --- namenode(active) & datanode & DFSZKFailoverController(zkfc) & journalnode
hadoop3 10.15.0.7 --- datanode & namenode(standby) & DFSZKFailoverController(zkfc) & journalnode
hadoop4 10.15.0.8 --- datanode & journalnode
环境准备: centos7.x 必须安装: yum install psmisc -y
1.修改Linux主机名
2.修改IP
3.修改主机名和IP的映射关系 /etc/hosts
4.关闭防火墙
5.ssh免登陆
6.安装JDK,配置环境变量等 省略
# 1.安装zk(在一个机器上实现zk集群)
# 2.准备3个数据存放目录
mkdir -p /root/zkdata1
mkdir -p /root/zkdata2
mkdir -p /root/zkdata3
# 3.在每个数据文件夹中准备一个myid文件
touch /root/zkdata1/myid
touch /root/zkdata2/myid
touch /root/zkdata3/myid
# 4.编辑每个data目录中myid
vim /root/zkdata1/myid 输入 1
vim /root/zkdata2/myid 输入 2
vim /root/zkdata3/myid 输入 3
# 5.将zk配置文件复制三份到zkdata目录中
cp zk安装目录中/conf/zoo.cfg /root/zkdata1
cp zk安装目录中/conf/zoo.cfg /root/zkdata2
cp zk安装目录中/conf/zoo.cfg /root/zkdata3
# 6.分别修改zkdata目录中zoo.cfg配置端口号和数据目录位置
-
配置节点1 vim /root/zkdata1/zoo.cfg
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/root/zkdata clientPort=3001 server.1=主机名:3002:3003 server.2=主机名:4002:4003 server.3=主机名:5002:5003
-
配置节点2 vim /root/zkdata2/zoo.cfg
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/root/zkdata clientPort=4001 server.1=主机名:3002:3003 server.2=主机名:4002:4003 server.3=主机名:5002:5003
-
配置节点3 vim /root/zkdata3/zoo.cfg
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/root/zkdata clientPort=5001 server.1=主机名:3002:3003 server.2=主机名:4002:4003 server.3=主机名:5002:5003
# 7.启动zk节点
[root@c60 zookeeper]# ./bin/zkServer.sh start /root/zkdata1/zoo.cfg
[root@c60 zookeeper]# ./bin/zkServer.sh start /root/zkdata2/zoo.cfg
[root@c60 zookeeper]# ./bin/zkServer.sh start /root/zkdata3/zoo.cfg
# 8.查看zk角色信息
[root@c60 zookeeper]# ./bin/zkServer.sh status /root/zkdata1/zoo.cfg
# 9.查看其它3个hadoop机器主机名 回顾集群规划
hadoop1 10.15.0.5 --- (zkcluster 上面我们已经构建zk集群)
hadoop2 10.15.0.6 --- namenode(active) & datanode & DFSZKFailoverController(zkfc)
hadoop3 10.15.0.7 --- datanode & namenode(standby) & DFSZKFailoverController(zkfc)
hadoop4 10.15.0.8 --- datanode
# 1.配置hadoop的core-site.xml 三个机器一致内容如下:
vim /usr/hadoop-2.9.2/etc/hadoop/core-site.xml
<!--hdfs主要入口不再是一个具体机器而是一个虚拟的名称 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://ns</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/hadoop-2.9.2/data</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>hadoop1:3001,hadoop1:4001,hadoop1:5001</value>
</property>
<img src="https://upload-images.jianshu.io/upload_images/19301226-54c05119fed56846.png" alt="image-20191219133333624" />
注意:ha.zookeeper.quorum 用来指定zk集群的节点数
# 2.配置hdfs-site.xml
<!--指定hdfs的nameservice为ns,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>ns</value>
</property>
<!-- ns下面有两个NameNode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.ns</name>
<value>nn1,nn2</value>
</property>
<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns.nn1</name>
<value>hadoop2:9000</value>
</property>
<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns.nn1</name>
<value>hadoop2:50070</value>
</property>
<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns.nn2</name>
<value>hadoop3:9000</value>
</property>
<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns.nn2</name>
<value>hadoop3:50070</value>
</property>
<!-- 指定NameNode的元数据在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hadoop2:8485;hadoop3:8485;hadoop4:8485/ns</value>
</property>
<!-- 指定JournalNode在本地磁盘存放数据的位置 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/root/journal</value>
</property>
<!-- 开启NameNode故障时自动切换 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<!-- 配置失败自动切换实现方式 -->
<property>
<name>dfs.client.failover.proxy.provider.ns</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- 配置隔离机制,如果ssh是默认22端口,value直接写sshfence即可 -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<!-- 使用隔离机制时需要ssh免登陆 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
# 修改slaves文件指定哪些机器为DataNode
hadoop2
hadoop3
hadoop4
注意:由于配置内容比较多,这里就不截图了,保证三个机器配置一致即可
# 3.在任意一个namenode上执行如下命令:
[root@hadoop2 ~]# hdfs zkfc -formatZK
# 4.启动journalnode(分别在在hadoop2、hadoop3、hadoop4上执行)
[root@hadoop2 ~]# hadoop-daemon.sh start journalnode
[root@hadoop3 ~]# hadoop-daemon.sh start journalnode
[root@hadoop4 ~]# hadoop-daemon.sh start journalnode
使用:jps查看如过都出现如下进程说明启动成功
# 5.在hadoop2上执行(NameNode active)节点执行:
[root@hadoop2 ~]# hdfs namenode -format ns
# 6.启动hdfs集群
[root@hadoop2 ~]# start-dfs.sh
# 7.在standby 的 NameNode节点上执行如下命令:
[root@hadoop3 ~]# hdfs namenode -bootstrapStandby
[root@hadoop14 ~]# hadoop-daemon.sh start namenode
注意:启动standby的NameNode的
# 8.查看namenode(standby) 进程 和 hadoop4 进程
-
namenode (standby)
-
hadoop4 进程
# 9.访问namenode(active) 与 namenode(standby) hdfsweb界面
-
hadoop2(namenode active)
-
Hadoop3 (standby namenode)
# 10.停止正常NameNode进行测试
[root@hadoop2 ~]# jps
7426 DFSZKFailoverController
8499 NameNode
8631 Jps
7128 DataNode
5519 JournalNode
[root@hadoop2 ~]# kill 8499
[root@hadoop2 ~]# jps
7426 DFSZKFailoverController
7128 DataNode
8669 Jps
5519 JournalNode
-
hadoop2(namenode down)
-
hadoop3(namenode active)
八. MapReduce
8.1 计算
计算实际上也可以说是统计和分析,就是在大量的数据集中通过计算从而统计、分析出我们关注的数据,从而为我们创造相应的价值
8.2 如何解决大规模数据计算问题
- 抽样统计分析 ------> 结果不准确
- 全部数据统计 -------> 计算机性能要求极高
MapReduce就更好的解决了我们在处理大数据集的计算问题
8.2 MapReduce 引言
MapReduce是hadoop体系下的一种计算模型(计算框架|编程框架),主要是用来对存储在hdfs上的数据进行统计,分析的。
8.3 MapReduce的核心思想
Map Reduce:
计算框架分为两个部分: Map (局部统计) 和 Reduce (局部统计,汇总计算)
Yarn:
资源调度,任务监控 主要用来整合hadoop集群中的资源(CPU 内存),进行统一调度 同时监控任务的执行情况
Job作业:
一组MapReduce又统称为一个Job作业,在一个Hadoop集群中有很多job作业。
8.4 搭建yarn集群
8.4.1 配置mapred-site.xml
# 0.启动hdfs集群
# 1.复制 cp hadoop-2.9.2/etc/hadoop/mapred-site.xml.template hadoop-2.9.2/etc/hadoop/mapred-site.xml
# 2.编辑 vim hadoop-2.9.2/etc/hadoop/mapred-site.xml添加配置
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
8.4.2 配置yarn-site.xml
# 1.编辑 添加如下配置:
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>Hadoop</value>
</property>
上述yarn.resourcemanager.hostname配置中的:
hadoop
为当前机器的主机名
8.4.3 启动yarn
[root@hadoop hadoop]# start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /root/hadoop-2.9.2/logs/yarn-root-resourcemanager-hadoop.out
hadoop: starting nodemanager, logging to /root/hadoop-2.9.2/logs/yarn-root-nodemanager-hadoop.out
8.4.4 是否启动成功
[root@hadoop hadoop]# jps
4897 Jps
4724 NodeManager
3542 SecondaryNameNode
3367 DataNode
4619 ResourceManager
3229 NameNode
如果出现
NodeManager
和ResourceManager
这两个进程说明配置成功
8.4.5 mapreduce的web监控页面
http://10.15.0.4:8088/cluster
8.5 Job 作业总体流程
8.6 World Count 第一个案例
8.6.1 什么是Word Count
说明: World Count 简单说也叫单词计数统计,就是在一个文件中统计出每个单词出现的次数 如图:
- 说明: 解决上述问题,实际上使用曾经的 Core Java就可以很快的计算出来,为什么需要Hadoop呢?注意:如果数据非常少我们可以很快计算,但是如果数据非常大有1TB或者10TB这样的大文件我们可能很难计算结果。这个使用我们如果借助Hadoop为我们提供的Map Reduce 我们就能很好解决这个问题!
8.6.2 使用Map Reduce 完成 Word Count 的思路分析
8.6.3 Map Reduce 的第一个程序
-
准备数据上传的HDFS中
chenyn xiaohei xiaowang chenyn zhaoliu wangwu zhangsan xiaoming xiaochen chenyn chenyn xiaozhang xiaohei xiaoliu xiaozi xiaosun xiaochen
[root@hadoop5 ~]# touch data [root@hadoop5 ~]# vim data [root@hadoop5 ~]# hdfs dfs -mkdir -p /wordcount [root@hadoop5 ~]# hdfs dfs -put data /wordcount/data [root@hadoop5 ~]# hdfs dfs -cat /wordcount/data chenyn xiaohei xiaowang chenyn zhaoliu wangwu zhangsan xiaoming xiaochen chenyn chenyn xiaozhang xiaohei xiaoliu xiaozi xiaosun xiaochen
-
引入依赖
<properties> <hadoop.version>2.9.2</hadoop.version> </properties> <dependencies> <!--hadoop公共依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <!--hadoop client 依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <!--junit--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <!--map reduce--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> </dependencies>
-
开发Job作业编码
//word count job作业开发 public class WordCountJob extends Configured implements Tool { public static void main(String[] args) throws Exception { ToolRunner.run(new WordCountJob(),args); } @Override public int run(String[] strings) throws Exception { //创建job作业 Configuration conf = getConf(); Job job = Job.getInstance(conf); job.setJarByClass(WordCountJob.class); //设置Input Format job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("/wordcount/data")); //设置map阶段 job.setMapperClass(WordCountMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置Shuffle 阶段 默认 //设置reduce 阶段 job.setReducerClass(WordCountReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置 Output Formate job.setOutputFormatClass(TextOutputFormat.class); //注意:要求结果目录不能存在 FileSystem fileSystem = FileSystem.get(conf); Path res = new Path("/wordcount/res"); if(fileSystem.exists(res)) { fileSystem.delete(res,true); } TextOutputFormat.setOutputPath(job, res); //提交job作业 boolean b = job.waitForCompletion(true); System.out.println("作业执行状态 = " + b); return 0; } //开发Map阶段 public static class WordCountMap extends Mapper<LongWritable, Text,Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //value 就是读入的一行数据 String[] keys = value.toString().split(" "); for (String word : keys) { context.write(new Text(word),new IntWritable(1)); } } } //开发Reduce阶段 public static class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum+=value.get(); } context.write(key,new IntWritable(sum)); } } }
注意:在执行mapreduce作业过程中,一定涉及到数据数据的序列化,hadoop对原始基本数据类型进行了二次包装
hadoop中包装类型 java原始数据类型 Text String LongWritable Long IntWritable Integer FloatWritable Float DoubleWritable Double -
将wordcount打成jar包
-
上传到hadoop集群并执行wordcount jar包
[root@hadoop5 ~]# hadoop jar hadoop_wrodcount-1.0-SNAPSHOT.jar com.baizhi.wordcount.WordCountJob
-
查看执行结果
[root@hadoop5 ~]# hdfs dfs -text /wordcount/res/part-r-00000 chenyn 4 wangwu 1 xiaochen 2 xiaohei 2 xiaoliu 1 xiaoming 1 xiaosun 1 xiaowang 1 xiaozhang 1 xiaozi 1 zhangsan 1 zhaoliu 1
8.7 MapReduce 自动化运行配置
8.7.1 打包时指定main Class信息
默认直接通过maven插件打成jar包中没有指定main class 信息,因此在运行mapreduce的jar包时必须在指令后面明确指定main class 的信息是谁,这样日后在执行mapreduce作业时会加大执行的难度,因此我们需要在打jar包时指定main class信息,减少执行作业时的操作,如果需要在打包中指定main class 信息:只需要对打包插件进行配置即可:
<build>
<plugins>
<!-- 在打包插件中指定main class 信息 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<outputDirectory>${basedir}/target</outputDirectory>
<archive>
<manifest>
<mainClass>com.baizhi.wordcount.WordCountJob</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
加入上述配置之后再打包,日后就可以直接执行jar包,不需要额外指定main class 信息:
执行命令: clean package
8.7.2 使用wagon插件实现自动上传至hadoop集群
<build>
<!--扩展maven的插件中加入ssh插件-->
<extensions>
<extension>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh</artifactId>
<version>2.8</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>wagon-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<fromFile>target/test.jar 或者 ${project.build.finalName}.jar</fromFile>
<url>scp://user:password@192.168.20.128/root</url>
</configuration>
</plugin>
</plugins>
</build>
打包后直接执行wagon uplod-single
即可:
执行命令操作: clean package wagon:upload-single
8.7.3 使用wagon上传jar完成后远程执行job作业
wagon配置加入commands命令
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>wagon-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<fromFile>target/${project.build.finalName}.jar</fromFile>
<url>scp://root:1@10.15.0.5/root</url>
<commands>
<!-- 通过sh 执行shell脚本文件 -->
<command>nohup hadoop-2.9.2/bin/hadoop jar hadoop_wordcount-1.0-SNAPSHOT.jar > /root/mapreduce.out 2>&1 & </command>
</commands>
<displayCommandOutputs>true</displayCommandOutputs>
</configuration>
</plugin>
执行命令操作:clean package wagon:upload-single wagon:sshexec
任务执行成功:
8.8 配置历史服务器调试Map Reduce
8.8.1 配置mapped-site.xml 并同步集群配置
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop5:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop5:19888</value>
</property>
8.8.2 配置yarn-site.xml 并同步集群配置
<!--开启日志聚合-->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!--日志保存时间 单位秒 这里是7天-->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
8.8.3 启动历史服务器
[root@hadoop6 ~]# mr-jobhistory-daemon.sh start historyserver
[root@hadoop6 ~]# mr-jobhistory-daemon.sh stop historyserver