一、 hadoop 离线部分
1. hadoop HA 集群都要启动哪些进程,他们的作用是什么
Namenode:(1) 维护文件系统的目录树,管理文件系统的 namespace、(2) 管理元数据信息、(3) 接收用户的请求
DFSZKFailoverController(ZKFC):负责namenode的故障切换
QuorumPeerMain:zookeeper 进程
DataNode:HDFS 的工作节点,负责实际的数据存储
ResourceManager:YARN 集群中负责资源协调和管理的进程
NodeManager:ResourceManager 在每台机器上的代理,负责容器管理,并监控它们的资源使用情况,以及向 ResourceManager/Scheduler 提供资源使用情况报告。
JournalNode:用来同步 active namenode 和standby namenode 之间的元数据
2. 简要说明下安装配置hadoop的步骤,描述即可,无需列出完整步骤
创建 hadoop 用户,安装 JDK,安装 SSH 并配置免密码通信
修改环境变量,配置 hosts
-
解压 hadoop 安装包,修改配置文件,分别是 core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml、hadoop-env.sh、yarn-env.sh、slaves
其中:
core-site.xml 主要配置:(1) hdfs 的 namenode 的地址、(2) hadoop 运行时产生文件的存储目录
hdfs-site.xml 主要配置:(1) 副本数量、(2) namenode 的工作目录、(3) datanode 的工作目录、(3) secondary namenode 的地址
yarn-site.xml 主要配置:(1) 指定 ResourceManager 的地址、(2) reducer获取数据的方式
mapred-site.xml 主要配置:mapreduce 运行的 framework 为 yarn
hadoop-env.sh 和 yarn-env.sh 主要配置:JAVA_HOME
slaves:配置 slave 节点
将安装包 copy 到其他机器
执行 Hadoop namenode -format 命令
start-dfs.sh、start-yarn.sh 启动集群
3. Hadoop HA 模式下集群启动的步骤
下面以7台虚拟机为例进行说明:
安装时的初始化集群步骤
启动 zookeeper 集群。
格式化ZooKeeper集群,目的是在ZooKeeper集群上建立HA的相应节点。
/home/hadoop/hadoop/bin/hdfs zkfc –formatZK
启动JournalNode集群。
/home/hadoop/hadoop/sbin/hadoop-daemon.sh start journalnode
格式化master1的namenode。
/home/hadoop/hadoop/bin/hdfs namenode -format -clusterId hellokitty
启动master1上的namenode。
/home/hadoop/hadoop/sbin/hadoop-daemon.sh start namenode
在master1ha上执行,将master1上的namenode数据同步到master1ha上。
/home/hadoop/hadoop/bin/hdfs namenode -bootstrapStandby
启动master1ha的namenode。
/home/hadoop/hadoop/sbin/hadoop-daemon.sh start namenode
将master1的namenode置成active状态,分别在 active 和 standby 上执行。
/home/hadoop/hadoop/sbin/hadoop-daemon.sh start zkfc
格式化并启动第二套集群,步骤同 4 ~ 8。
启动所有的datanode。
/home/hadoop/hadoop/sbin/hadoop-daemon.sh start datanode
启动 yarn。
/home/hadoop/hadoop/sbin/start-yarn.sh
停止集群的顺序
- 在master1上执行:
stop-dfs.sh
,stop-yarn.sh
- 在master2上执行:
stop-yarn.sh
- 停止zookeeper
启动的顺序
- 启动zookeeper
- 启动journalnode
- 在master1上执行命令,启动 hdfs 和 yarn
- 在master2上执行命令,启动 yarn
3. 简要说明执行命令
-
杀死一个 job
yarn application -kill <applicationid>
-
删除 hdfs 上的 /tmp/aaa 目录
hdfs dfs -rm -r /tmp/aaa
-
加入一个新存储节点和移除一个节点需要刷新集群状态的命令
新节点加入:hadoop-daemon.sh start datanode、yarn-daemon.sh start nodemanager
移除节点:hdfs dfsadmin -refreshNodes、yarn rmadmin -refreshNodes
4. 列出 hadoop 的调度器,并说明工作方法
-
默认的调度器 FIFO
Hadoop 中默认的调度器,它先按照作业的优先级高低,再按照到达时间的先后选择被执行的作业。
-
计算能力调度器 Capacity Scheduler ( YARN 默认调度器 )
可以看作是 FIFO Scheduler 的多队列版本。每个队列可以限制资源使用量。但是,队列间的资源分配以使用量作排列依据,使得容量小的队列有竞争优势。调度时,首先按以下策略选择一个合适队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值,选择一个该比值最小的队列;然后按以下策略选择该队列中一个作业:按照作业优先级和提交时间顺序选择,同时考虑用户资源量限制和内存限制。
-
公平调度器 Fair Scheduler
计算能力调度器类似,支持多队列多用户,每个队列中的资源量可以配置,==同一队列中的作业公平共享队列中所有资源。当单独一个作业在运行时,它将使用整个集群==。当有其它作业被提交上来时,系统会将任务(task)空闲资源(container)赋给这些新的作业,以使得每一个作业都大概获取到等量的CPU时间。
适用于异构集群的调度器LATE
现有的Hadoop调度器都是建立在同构集群的假设前提下,LATE建立在集群异构的情况。
- 适用于实时作业的调度器 Deadline Scheduler 和 Constraint-based Scheduler
这种调度器主要用于有时间限制的作业(Deadline Job),即给作业一个deadline时间,让它在该时间内完成。实际上,这类调度器分为两种,软实时(允许作业有一定的超时)作业调度器和 硬实时(作业必须严格按时完成)作业调度器。
- Deadline Scheduler主要针对的是软实时作业,该调度器根据作业的运行进度和剩余时间动态调整作业获得的资源量,以便作业尽可能的在deadline时间内完成。
- Constraint-based Scheduler主要针对的是硬实时作业,该调度器根据作业的deadline和当前系统中的实时作业运行情况,预测新提交的实时作业能不能在deadline时间内完成,如果不能,则将作业反馈给用户,让他重调整作业的deadline。
5. hive 保存元数据有哪些方式,各有哪些特点
三种方式:1. 内存数据库 derby、2. 本地 metastore、3. 远程 metastore
内存数据库 derby
使用derby存储方式时,运行hive会在当前目录生成一个derby文件和一个metastore_db目录。==这种存储方式的弊端是在同一个目录下同时只能有一个hive客户端能使用数据库,否则会提示如下错误(这是一个很常见的错误)。==
hive> show tables;
FAILED:
Error in metadata: javax.jdo.JDOFatalDataStoreException: Failed to
start database 'metastore_db', see the next exception for details.
NestedThrowables:
java.sql.SQLException: Failed to start database 'metastore_db', see the next exception for details.
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
本地和远程 metastore
本地元存储和远程元存储的区别是:本地元存储不需要单独起 metastore 服务,用的是跟 hive 在同一个进程里的 metastore 服务。远程元存储需要单独起 metastore 服务,然后每个客户端都在配置文件里配置连接到该 metastore 服务。远程元存储的 metastore 服务和 hive 运行在不同的进程里。在生产环境中,建议用远程元存储来配置 Hive Metastore。
6. mapreduce 如何实现二次排序
我是对的,可以参考 mapreduce-demos 的 secondarysort 类。
可以考虑将二次排序的字段组合在一起,形成组合 key,可以考虑封装对象,实现 WritableComparable 接口。
定义分区的 Partitioner 类,继承 org.apache.hadoop.mapreduce.Partitioner 类,根据 key 的第一个字段进行分区。
定义分组函数类 GroupingComparator,让相同 key 的数据变成一组,进入到 reduce。
main 方法中设置
job.setGroupingComparatorClass(XXX.class);
==TODO: 还有一种方法要补充。记得有一个 sort 什么的东西==
7. mapreduce 实现 join 的几种方法
1. map side join:
两份数据中,如果有一份数据比较小,小数据全部加载到内存,按关键字建立索引。大数据文件作为 map 的输入文件,对 map() 函数每一对输入,都能够方便地和已加载到内存的小数据进行连接。把连接结果按 key 输出,经过 shuffle 阶段,reduce 端得到的就是已经按 key 分组的,并且连接好了的数据。
这种方法,要使用 hadoop 中的 DistributedCache 把小数据分布到各个计算节点,每个 map 节点都要把小数据库加载到内存,按关键字建立索引。
但这种方法有明显的局限性:有一份数据比较小,在map端,能够把它加载到内存,并进行join操作。
2. reduce side join:
在 map 阶段, 把关键字作为 key 输出,并在 value 中标记出数据是来自 data1 还是 data2。因为在 shuffle 阶段已经自然按 key 分组,reduce 阶段,判断每一个 value 是来自 data1 还是 data2,在内部分成 2 组,做集合的乘积。
但这种方法有 2 个问题:
- map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。
- reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。
3. 使用内存服务器,扩大节点的内存空间
针对 map join,可以把一份数据存放到专门的内存服务器,在 map() 方法中,对每一个的输入对,根据 key 到内存服务器中取出数据,进行连接。
4. 使用 BloomFilter 过滤空连接的数据,实现 reduce 端 join
对其中一份数据在内存中建立 BloomFilter,另外一份数据在连接之前,用 BloomFilter 判断它的 key 是否存在,如果不存在,那这个记录是空连接,可以忽略。
适用场景
连接的列数据量很大,在分布式缓存中无法存储时,Bloom Filter 可解决这个问题,用很小的内存可有 MAP 端过滤掉不需要 JOIN 的数据,这样传到 REDUCE 的数据量减少,减少了网络传及磁盘 IO。
缺点:Bloom Filter 会有一定的错误率,但是错误率很低,用空间换取了时间。并且,最终的 JOIN 在 REDUCE 端还要进行比对,所以对最终结果无影响。
8. hive 的内部表和外部表的区别
内部表导入数据会将 hdfs 中的文件直接移动到 hive 的 warehouse 目录中去,相当于是剪切。当要删除这个内部表的时候,warehouse 中存储的文件也会一起删除,元数据信息也会删除。
外部表导入数据时数据不会移动到 hive 的数据仓库中,数据还是在 hdfs 中,当要删除外部表的时候,仅仅只是删除了元数据,hdfs 中的数据文件还是存在的。
9. hbase 的 rowkey 如何设计,列族如何设计
10. mapreduce 如何解决 shuffle 中的数据倾斜问题
详细可以参考美团技术博客。
方案一:对 key 进行打散
(1) 设置一个hash份数N,用来对条数众多的key进行打散。
(2) 对有多条重复 key 的那份数据进行处理:从 1 到 N 将数字加在 key 后面作为新 key,如果需要和另一份数据关联的话,则要重写比较类和分发类,如此实现多条 key 的平均分发。
(3) 上一步之后,key 被平均分散到很多不同的 reduce 节点。如果需要和其他数据关联,为了保证每个 reduce 节点上都有关联的 key,对另一份单一key的数据进行处理:循环的从1到N将数字加在 key 后面作为新 key
以此解决数据倾斜的问题,经试验大大减少了程序的运行时间。但此方法会成倍的增加其中一份数据的数据量,以增加 shuffle 数据量为代价,所以使用此方法时,要多次试验,取一个最佳的 hash 份数值。
用上述的方法虽然可以解决数据倾斜,但是当关联的数据量巨大时,如果成倍的增长某份数据,会导致 reduce shuffle 的数据量变的巨大,得不偿失,从而无法解决运行时间慢的问题。
方案二:增加 reduce 个数
针对唯一值较多,单个唯一值的记录数不会超过分配给 reduce 的内存的情况。如果发生了偶尔的数据倾斜情况,增加 reduce 个数可以缓解偶然情况下的某些 reduce 不小心分配了多个较多记录数的情况,但是对于第一种数据分布,单个值有大量记录, 这种值的所有纪录已经超过了分配给reduce 的内存的情况无效。
方案三:增加 reduce 的内存
增加 reduce 的内存大小显然有改善数据倾斜的可能,这种方式尤其适合数据分布第一种情况,单个值有大量记录, 这种值的所有纪录已经超过了分配给reduce 的内存,无论你怎么样分区这种情况都不会改变。当然这种情况的限制也非常明显, 1. 内存的限制存在,2. 可能会对集群其他任务的运行产生不稳定的影响。
方案四:自定义 partitioner
对数据进行手工做 sample 采样,观察数据的分布情况,然后根据数据分布自定义 partitioner。这种情况需要开发人员熟悉数据的分布类型。这种方式的缺点为:1. 手工做sample 非常耗时间,需要使用者对查询使用的数据集的分布有领域知识。2. 分配方式是死的,reduce 个数是确定的,一旦某种情况下发生倾斜,很难修正。