[if !supportLists]第1章 [endif]Cloudera Manager
1.1 CM简介
1.1.1 CM简介
Cloudera Manager是一个拥有集群自动化安装、中心化管理、集群监控、报警功能的一个工具,使得安装集群从几天的时间缩短在几个小时内,运维人员从数十人降低到几人以内,极大的提高集群管理的效率。
1.1.2 CM架构
1.2 环境准备
1.2.1 虚拟机准备
克隆三台虚拟机(hadoop102、hadoop103、hadoop104),配置好对应主机的网络IP、主机名称、关闭防火墙。
设置hadoop102、hadoop103、hadoop104的主机对应内存分别是:16G、4G、4G
1.2.2 SSH免密登录
配置hadoop102对hadoop102、hadoop103、hadoop104三台服务器免密登录。
1)生成公钥和私钥:
[root@hadoop102 .ssh]$
ssh-keygen -t rsa
然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
2)将公钥拷贝到要免密登录的目标机器上
[root@hadoop102 .ssh]$
ssh-copy-id hadoop102
[root@hadoop102 .ssh]$
ssh-copy-id hadoop103
[root@hadoop102 .ssh]$
ssh-copy-id hadoop104
3)重复1和2的操作,配置hadoop103对hadoop102、hadoop103、hadoop104三台服务器免密登录。
1.2.3 集群同步脚本
1)在/root目录下创建bin目录,并在bin目录下创建文件xsync,文件内容如下:
[root@hadoop102 ~]$ mkdir bin
[root@hadoop102 ~]$ cd bin/
[root@hadoop102 bin]$ vi xsync
在该文件中编写如下代码
#!/bin/bash
#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi
#2 获取文件名称
p1=$1
fname=`basename $p1`
echo fname=$fname
#3 获取上级目录到绝对路径
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir
#4 获取当前用户名称
user=`whoami`
#5 循环
for((host=103; host<105; host++)); do
echo ------------------- hadoop$host --------------
rsync -av $pdir/$fname $user@hadoop$host:$pdir
done
2)修改脚本 xsync 具有执行权限
[root@hadoop102 bin]$ chmod 777 xsync
1.2.4 安装JDK(三台)
1)在hadoop102的/opt目录下创建module和software文件夹
[root@hadoop102 opt]# mkdir module
[root@hadoop102 opt]# mkdir software
2)用SecureCRT将jdk-8u144-linux-x64.tar.gz导入到hadoop102的/opt/software目录
3)在Linux系统下的opt目录中查看软件包是否导入成功
[root@hadoop102 software]$ ls
jdk-8u144-linux-x64.tar.gz
4)解压JDK到/opt/module目录下,并修改文件的所有者和所有者组为root
[root@hadoop102 software]$ tar -zxvf
jdk-8u144-linux-x64.tar.gz -C /opt/module/
[root@hadoop102 module]# chown root:root
jdk1.8.0_144/ -R
5)配置JDK环境变量
(1)打开/etc/profile文件
[root@hadoop102 software]$ vi
/etc/profile
在profile文件末尾添加JDK路径
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_144
export PATH=$PATH:$JAVA_HOME/bin
(2)让修改后的文件生效
[root@hadoop102 jdk1.8.0_144]$ source
/etc/profile
6)测试JDK是否安装成功
[root@hadoop102 jdk1.8.0_144]# java
-version
java version "1.8.0_144"
7)将hadoop102中的JDK和环境变量分发到hadoop103、hadoop104两台主机
[root@hadoop102 opt]# xsync /opt/module/
[root@hadoop102 opt]# xsync /etc/profile
分别在hadoop103、hadoop104上source一下
[root@hadoop103 ~]$ source /etc/profile
[root@hadoop104 ~]# source /etc/profile
1.2.5 集群整体操作脚本
1)在/root/bin目录下创建脚本xcall.sh
[root@hadoop102 bin]$ vim xcall.sh
2)在脚本中编写如下内容
#! /bin/bash
for i in hadoop102 hadoop103 hadoop104
do
echo --------- $i ----------
ssh $i "$*"
done
3)修改脚本执行权限
[root@hadoop102 bin]$ chmod 777 xcall.sh
4)将/etc/profile文件追加到~/.bashrc后面
[root@hadoop102 module]# cat /etc/profile>> ~/.bashrc
[root@hadoop103 module]# cat /etc/profile>> ~/.bashrc
[root@hadoop104 module]# cat /etc/profile>> ~/.bashrc
5)测试
[root@hadoop102 bin]# xcall.sh jps
1.2.6 安装MySQL
注意:一定要用root用户操作如下步骤;先卸载MySQL再安装
1)安装包准备
(1)查看MySQL是否安装
[root@hadoop102 桌面]# rpm
-qa|grep mysql
mysql-libs-5.1.73-7.el6.x86_64
(2)如果安装了MySQL,就先卸载
[root@hadoop102 桌面]#
rpm -e --nodeps mysql-libs-5.1.73-7.el6.x86_64
(3)上传mysql-libs.zip到hadoop102的/opt/software目录,并解压文件到当前目录
[root@hadoop102 software]# unzip
mysql-libs.zip
[root@hadoop102 software]# ls
mysql-libs.zip
mysql-libs
(4)进入到mysql-libs文件夹下
[root@hadoop102 mysql-libs]#ll
总用量76048
-rw-r--r--. 1 root root 18509960 3月 26 2015 MySQL-client-5.6.24-1.el6.x86_64.rpm
-rw-r--r--. 1 root root 3575135 12月 1 2013 mysql-connector-java-5.1.27.tar.gz
-rw-r--r--. 1 root root 55782196 3月 26 2015 MySQL-server-5.6.24-1.el6.x86_64.rpm
2)安装MySQL服务器
(1)安装MySQL服务端
[root@hadoop102 mysql-libs]# rpm -ivh
MySQL-server-5.6.24-1.el6.x86_64.rpm
(2)查看产生的随机密码
[root@hadoop102 mysql-libs]# cat /root/.mysql_secret
OEXaQuS8IWkG19Xs
(3)查看MySQL状态
[root@hadoop102 mysql-libs]# service mysql status
(4)启动MySQL
[root@hadoop102 mysql-libs]# service mysql start
3)安装MySQL客户端
(1)安装MySQL客户端
[root@hadoop102 mysql-libs]# rpm -ivh MySQL-client-5.6.24-1.el6.x86_64.rpm
(2)链接MySQL(密码替换成产生的随机密码)
[root@hadoop102 mysql-libs]# mysql -uroot -pOEXaQuS8IWkG19Xs
(3)修改密码
mysql>SET PASSWORD=PASSWORD('000000');
(4)退出MySQL
mysql>exit
4)MySQL中user表中主机配置
配置只要是root用户+密码,在任何主机上都能登录MySQL数据库。
(1)进入MySQL
[root@hadoop102 mysql-libs]# mysql -uroot -p000000
(2)显示数据库
mysql>show databases;
(3)使用MySQL数据库
mysql>use mysql;
(4)展示MySQL数据库中的所有表
mysql>show tables;
(5)展示user表的结构
mysql>desc user;
(6)查询user表
mysql>select User, Host, Password from user;
(7)修改user表,把Host表内容修改为%
mysql>update user set host='%' where host='localhost';
(8)删除root用户的其他host
mysql>
delete from user where Host='hadoop102';
delete from user where Host='127.0.0.1';
delete from user where Host='::1';
(9)刷新
mysql>flushprivileges;
(10)退出
mysql>quit;
1.2.7 创建CM用的数据库
在MySQL中依次创建监控数据库、Hive数据库、Oozie数据库、Hue数据库
1)启动数据库
[root@hadoop102 ~]# mysql -uroot -p000000
2)集群监控数据库
mysql> create database amon DEFAULT CHARSET utf8 COLLATE
utf8_general_ci;
3)Hive数据库
mysql> create database hive DEFAULT CHARSET utf8 COLLATE
utf8_general_ci;
4)Oozie数据库
mysql> create database oozie DEFAULT CHARSET utf8 COLLATE
utf8_general_ci;
5)Hue数据库
DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
6)关闭数据库
mysql> quit;
1.2.8 下载第三方依赖
依次在三台节点(所有Agent的节点)上执行下载第三方依赖(注意:需要联网)
[root@hadoop102 ~]# yum -y
install chkconfig python bind-utils psmisc libxslt zlib sqlite cyrus-sasl-plain
cyrus-sasl-gssapi fuse fuse-libs redhat-lsb
[root@hadoop103 ~]# yum -y
install chkconfig python bind-utils psmisc libxslt zlib sqlite cyrus-sasl-plain
cyrus-sasl-gssapi fuse fuse-libs redhat-lsb
[root@hadoop104 ~]# yum -y
install chkconfig python bind-utils psmisc libxslt zlib sqlite cyrus-sasl-plain
cyrus-sasl-gssapi fuse fuse-libs redhat-lsb
1.2.9 关闭SELINUX
安全增强型Linux(Security-Enhanced Linux)简称SELinux,它是一个 Linux 内核模块,也是Linux的一个安全子系统。为了避免安装过程出现各种错误,建议关闭,有如下两种关闭方法:
1)临时关闭(不建议使用)
[root@hadoop102 ~]# setenforce 0
但是这种方式只对当次启动有效,重启机器后会失效。
2)永久关闭(建议使用)
修改配置文件/etc/selinux/config
[root@hadoop102 ~]# vim
/etc/selinux/config
将SELINUX=enforcing 改为SELINUX=disabled
SELINUX=disabled
3)同步/etc/selinux/config配置文件
[root@hadoop102 ~]# xsync
/etc/selinux/config
4)重启hadoop102、hadoop103、hadoop104主机
[root@hadoop102 ~]# reboot
[root@hadoop103 ~]# reboot
[root@hadoop104 ~]# reboot
1.2.10 配置NTP时钟同步
1)NTP服务器配置
[root@hadoop102 ~]# vi /etc/ntp.conf
①注释掉所有的restrict开头的配置
②修改#restrict
192.168.1.0 mask 255.255.255.0 nomodify notrap
为restrict
192.168.1.102 mask 255.255.255.0 nomodify notrap
③将所有server配置进行注释
④添加下面两行内容
server
127.127.1.0
fudge 127.127.1.0
stratum 10
2)启动NTP服务service ntpd start
[root@hadoop102 ~]# service ntpd start
3)NTP客户端配置(在agent主机上进行配置hadoop103,hadoop104)
[root@hadoop103 ~]# vi /etc/ntp.conf
①注释所有restrict和server配置
②添加server 192.168.1.102
4)手动测试
[root@hadoop103~]# ntpdate 192.168.1.102
显示如下内容为成功:
17 Jun 15:34:38 ntpdate[9247]: step time
server 192.168.1.102 offset 77556618.173854 sec
如果显示如下内容需要先关闭ntpd:
17 Jun 15:25:42 ntpdate[8885]: the NTP
socket is in use, exiting
5)启动ntpd并设置为开机自启(每个节点hadoop102,hadoop103,hadoop104)
[root@hadoop103 ~]# chkconfig ntpd on
[root@hadoop103 ~]# service ntpd start
6)使用群发date命令查看结果
1.3 CM安装部署
1.3.1 CM下载地址
1)CM下载地址:http://archive.cloudera.com/cm5/cm/5/
2)离线库下载地址:http://archive.cloudera.com/cdh5/parcels
1.3.2 CM安装
注:以下所有操作均使用root用户
1)创建/opt/module/cm目录
[root@hadoop102 module]# mkdir –p
/opt/module/cm
2)上传cloudera-manager-el6-cm5.12.1_x86_64.tar.gz到hadoop102的/opt/software目录,并解压到/opt/module/cm目录
[root@hadoop102 software]# tar -zxvf
cloudera-manager-el6-cm5.12.1_x86_64.tar.gz -C /opt/module/cm
3)分别在hadoop102、hadoop103、hadoop104创建用户cloudera-scm
[root@hadoop102 module]#
useradd \
--system \
--home=/opt/module/cm/cm-5.12.1/run/cloudera-scm-server
\
--no-create-home \
--shell=/bin/false \
--comment "Cloudera SCM User"
cloudera-scm
[root@hadoop103 module]#
useradd \
--system \
--home=/opt/module/cm/cm-5.12.1/run/cloudera-scm-server
\
--no-create-home \
--shell=/bin/false \
--comment "Cloudera SCM User" cloudera-scm
[root@hadoop104 module]#
useradd \
--system \
--home=/opt/module/cm/cm-5.12.1/run/cloudera-scm-server
\
--no-create-home \
--shell=/bin/false \
--comment "Cloudera SCM User"
cloudera-scm
参数说明:
--system 创建一个系统账户
--home 指定用户登入时的主目录,替换系统默认值/home/<用户名>
--no-create-home 不要创建用户的主目录
--shell 用户的登录 shell 名
--comment 用户的描述信息
注意:Cloudera
Manager默认去找用户cloudera-scm,创建完该用户后,将自动使用此用户。
4)修改CM Agent配置
修改文件/opt/module/cm/cm-5.12.1/etc/cloudera-scm-agent/
config.ini的主机名称
[root@hadoop102 cloudera-scm-agent]# vim
/opt/module/cm/cm-5.12.1/etc/cloudera-scm-agent/config.ini
修改主机名称
server_host=hadoop102
5)配置CM的数据库
拷贝mysql-connector-java-5.1.27-bin.jar文件到目录/usr/share/java/
[root@hadoop102 cm]# mkdir –p /usr/share/java/
[root@hadoop102 mysql-libs]# tar -zxvf
mysql-connector-java-5.1.27.tar.gz
[root@hadoop102 mysql-libs]# cp
/opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar
/usr/share/java/
[root@hadoop102 mysql-libs]# mv
/usr/share/java/mysql-connector-java-5.1.27-bin.jar
/usr/share/java/mysql-connector-java.jar
注意:jar包名称要修改为mysql-connector-java.jar
6
)使用CM自带的脚本,在MySQL中创建CM库
[root@hadoop102 cm-5.12.1]#
/opt/module/cm/cm-5.12.1/share/cmf/schema/scm_prepare_database.sh
mysql cm -hhadoop102-uroot -p000000 --scm-host hadoop102 scm
scm scm
参数说明
-h:Database host
-u:Databaseusername
-p:DatabasePassword
--scm-host:SCM server's hostname
7)分发cm
[root@hadoop102 module]# xsync
/opt/module/cm
8)创建Parcel-repo
[root@hadoop102 module]# mkdir -p /opt/cloudera/parcel-repo
[root@hadoop102 module]# chown
cloudera-scm:cloudera-scm /opt/cloudera/parcel-repo
9)拷贝下载文件manifest.json、CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha1、CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel到hadoop102的/opt/cloudera/parcel-repo/目录下
[root@hadoop102 parcel-repo]# ls
CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha1
manifest.json
10)将CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha1:需改名为
CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha
[root@hadoop102 parcel-repo]# mv
CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha1
CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha
11)在hadoop102上创建目录/opt/cloudera/parcels,并修改该目录的所属用户及用户组为cloudera-scm
[root@hadoop102 module]# mkdir -p
/opt/cloudera/parcels
[root@hadoop102 module]# chown
cloudera-scm:cloudera-scm /opt/cloudera/parcels
12)分发/opt/cloudera/
[root@hadoop102 opt]# xsync
/opt/cloudera/
1.3.3 启动CM服务
1)启动服务节点:hadoop102
[root@hadoop102 cm]# /opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-server
start
Starting cloudera-scm-server: [确定]
2)启动工作节点:hadoop102、hadoop103、hadoop104
[root@hadoop102 cm]# /opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent
start
[root@hadoop103 cm]#/opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent start
[root@hadoop104 cm]#/opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent start
注意:启动过程非常慢,Manager启动成功需要等待5分钟左右,过程中会在数据库中创建对应的表需要耗费一些时间。
3)查看被占用则表示安装成功了!!!
[root@hadoop102 cm]# netstat -anp | grep
7180
tcp 0 0 0.0.0.0:7180 0.0.0.0:* LISTEN 5498/java
4)访问http://hadoop102:7180,(用户名、密码:admin)
1.3.4 关闭CM服务
1)关闭工作节点:hadoop102、hadoop103、hadoop104
[root@hadoop102 cm]# /opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent
stop
Stopping cloudera-scm-agent: [确定]
[root@hadoop103 cm]#
/opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent stop
Stopping cloudera-scm-agent: [确定]
[root@hadoop104 cm]#
/opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent stop
Stopping cloudera-scm-agent: [确定]
2)关闭服务节点:hadoop102
[root@hadoop102 cm]#
/opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-server stop
停止cloudera-scm-server: [确定]
1.4 CM的集群部署
1.4.1 接受条款和协议
1.4.2 选择免费
1.4.3 指定主机
1.4.4 选择CDH的版本5.12.1
1.4.5 等待下载安装
第2章数据采集模块
2.1 HDFS、YARN、Zookeeper安装
2.1.1 选择自定义安装
2.1.2 选择安装服务
2.1.3 分配节点
2.1.4 集群设置全部选默认即可
2.1.5 自动启动进程
2.1.6 修改HDFS的权限检查配置
关闭HDFS中的权限检查:dfs.permissions。
2.1.7 配置Hadoop支持LZO
1)点击主机,在下拉菜单中点击Parcel
2)点击配置
3)找到远程Parcel存储库URL,点击最后一栏的加号,增加一栏,输入http://archive.cloudera.com/gplextras/parcels/latest/,之后点击保存更改
4)返回Parcel列表,可以看到多出了LZO,选择下载,下载完成后选择分配,分配完成后选择激活。
5)安装完LZO后,打开HDFS配置,找到“压缩编码解码器”一栏,点击加号,添加com.hadoop.compression.lzo.LzopCodec后保存更改
6)打开YARN配置,找到MR 应用程序 Classpath,添加/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo-cdh4-0.4.15-gplextras.jar
7)更新过期配置,重启进程
2.2 Flume安装
2.2.1 日志采集Flume安装
1)添加服务
2)选择Flume,点击继续
3)选择节点
4)完成
2.2.2 日志采集Flume配置
1)Flume配置分析
Flume直接读log日志的数据,log日志的格式是app-yyyy-mm-dd.log。
2)Flume的具体配置如下:
(1)在CM管理页面上点击Flume
(2)在实例页面选择hadoop102上的Agent
(3)在CM管理页面hadoop102上Flume的配置中找到代理名称改为a1
(4)在配置文件如下内容(flume-kafka)
a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2
# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile =
/opt/module/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder
# selector
a1.sources.r1.selector.type =
multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20
# configure sink
# start-sink
a1.sinks.k1.type =
org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1
# event-sink
a1.sinks.k2.type =
org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.bootstrap.servers =
hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2
注意:com.atguigu.flume.interceptor.LogETLInterceptor和com.atguigu.flume.interceptor.LogTypeInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。
(3)修改/opt/module/flume/log_position.json的读写权限
[root@hadoop102 module]# mkdir -p
/opt/module/flume
[root@hadoop102 flume]# touch
log_position.json
[root@hadoop102 flume]# chmod 777
log_position.json
[root@hadoop102 module]# xsync flume/
注意:Json文件的父目录一定要创建好,并改好权限
2.2.3 Flume拦截器
本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。
ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志
日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。
1)创建Maven工程flume-interceptor
2)创建包名:com.atguigu.flume.interceptor
3)在pom.xml文件中添加如下配置
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
4)在com.atguigu.flume.interceptor包下创建LogETLInterceptor类名
Flume ETL拦截器LogETLInterceptor
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;
public class LogETLInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 1 获取数据
byte[] body = event.getBody();
String log =new String(body, Charset.forName("UTF-8"));
// 2 判断数据类型并向Header中赋值
if (log.contains("start")) {
if (LogUtils.validateStart(log)){
return event;
}
}else {
if (LogUtils.validateEvent(log)){
return event;
}
}
// 3 返回校验结果
return null;
}
@Override
public Listintercept(List events) {
ArrayListinterceptors =new ArrayList<>();
for (Event event : events) {
Event intercept1 =intercept(event);
if (intercept1 != null){
interceptors.add(intercept1);
}
}
return interceptors;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new LogETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
4)Flume日志过滤工具类
package com.atguigu.flume.interceptor;import org.apache.commons.lang.math.NumberUtils;
public class LogUtils {
public static boolean validateEvent(String log) {
// 服务器时间| json
// 1549696569054 |{"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}
// 1切割
String[] logContents = log.split("\\|");
// 2 校验
if(logContents.length != 2){
return false;
}
//3 校验服务器时间
if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){
return false;
}
// 4 校验json
if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){
return false;
}
return true;
}
public static boolean validateStart(String log) {
if (log == null){
return false;
}
// 校验json
if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){
return false;
}
return true;
}
}
5)Flume日志类型区分拦截器LogTypeInterceptor
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;import java.util.Map;
public class LogTypeInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 区分日志类型: body header
// 1获取body数据
byte[] body = event.getBody();
String log =new String(body, Charset.forName("UTF-8"));
// 2 获取header
Map headers =event.getHeaders();
// 3 判断数据类型并向Header中赋值
if (log.contains("start")) {
headers.put("topic","topic_start");
}else {
headers.put("topic","topic_event");
}
return event;
}
@Override
public List intercept(Listevents) {
ArrayListinterceptors =new ArrayList<>();
for (Event event : events) {
Event intercept1 =intercept(event);
interceptors.add(intercept1);
}
return interceptors;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new LogTypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
6)打包
拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入flume的lib文件夹下面。
注意:为什么不需要依赖包?因为依赖包在flume的lib目录下面已经存在了。
7)采用root用户将flume-interceptor-1.0-SNAPSHOT.jar包放入到hadoop102的/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/flume-ng/lib/文件夹下面。
[root@hadoop102
lib]# ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT.jar
8)分发Flume到hadoop103
[root@hadoop102
lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar
9)启动Flume
10)查看Flume日志
2.3 Kafka安装
可以选择在线安装和离线包安装,在线安装下载时间较长,离线包安装时间较短。这里我们为了节省时间,选择离线安装。
2.3.1 导入离线包
1)在hadoop102上创建/opt/cloudera/csd目录
[root@hadoop102 parcel-repo]# mkdir -p
/opt/cloudera/csd
2)上传KAFKA-1.2.0.jar到/opt/cloudera/csd目录,并修改所有者和所有者的组
[root@hadoop102 cloudera]# chown
cloudera-scm:cloudera-scm /opt/cloudera/csd/ -R
3)上传KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel、KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1到/opt/cloudera/parcel-repo目录,并修改KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1名称为KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha
[root@hadoop102 parcel-repo]# mv
/opt/cloudera/parcel-repo/KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1
/opt/cloudera/parcel-repo/KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha
4)ClouderManager中选择Parcel->检查Parcel->Kafka点击分配->激活
5)ClouderManager中选择Parcel->检查Parcel->Kafka点击分配->激活
2.3.2 在线下载安装包(不选)
1)点击主机,选择Parcel
2)找到Kafka点击下载,下载完成后点击分配进行分配,然后点击激活,出现已分配,已激活则证明分配激活成功
2.3.3 Kafka安装
[if !supportLists]3) [endif]回到首页,点击添加服务
4)选择Kafka,点击继续
5)Kafka的Broker选择三台机器
6)修改Kafka的堆大小为256M
7)完成
8)重新部署客户端配置
9)点击重启过时服务
10)等待重启,重启完成后点击完成
2.3.4 查看Kafka Topic
[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]#
/opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1/bin/kafka-topics--zookeeper hadoop102:2181 --list
2.2.5 创建Kafka Topic
进入到/opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1目录下分别创建:启动日志主题、事件日志主题。
1)创建启动日志主题
[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$ bin/kafka-topics--zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --create --replication-factor 1 --partitions1 --topictopic_start
2)创建事件日志主题
[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$ bin/kafka-topics--zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --create --replication-factor 1 --partitions1 --topictopic_event
2.3.6 删除Kafka Topic
1)删除启动日志主题
[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$ bin/kafka-topics
--delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_start
2)删除事件日志主题
[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$ bin/kafka-topics
--delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_event
2.3.7 生产消息
[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$
bin/kafka-console-producer --broker-listhadoop102:9092 --topic topic_start
>hello world
>atguigu atguigu
2.3.8 消费消息
[root@hadoop103 KAFKA-4.0.0-1.4.0.0.p0.1]$
bin/kafka-console-consumer \
--bootstrap-server hadoop102:9092--from-beginning --topic topic_start
--from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
2.3.9 查看某个Topic的详情
[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$ bin/kafka-topics--zookeeper hadoop102:2181 --describe--topic topic_start
2.4 Flume消费Kafka数据写到HDFS
1)集群规划
服务器hadoop102服务器hadoop103服务器hadoop104
Flume(消费Kafka) Flume
2)Flume配置分析
3)Flume的具体配置如下:
(1)在CM管理页面hadoop104上Flume的配置中找到代理名称
a1
在配置文件如下内容(kafka-hdfs)
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers =hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start
## source2
a1.sources.r2.type =org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers =hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000
## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second
## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
2.5 日志生成数据传输到HDFS
1)将log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar上传都hadoop102的/opt/module目录
2)分发log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar到hadoop103
[root@hadoop102
module]# xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar
3)在/root/bin目录下创建脚本lg.sh
[root@hadoop102 bin]$ vim lg.sh
4)在脚本中编写如下内容
#! /bin/bash
for iin hadoop102 hadoop103
do
ssh$i "java -classpath/opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jarcom.atguigu.appclient.AppMain$1 $2>/opt/module/test.log &"
done
5)修改脚本执行权限
[root@hadoop102 bin]$ chmod 777 lg.sh
6)启动脚本
[root@hadoop102 module]$ lg.sh
第3章 数仓搭建环境准备
3.1 Hive安装
1)添加服务
2)添加Hive服务
3)将 Hive 服务添加到Cluster1
4)配置hive元数据
5)测试通过后继续
6)自动启动Hive进程
7)修改Hive配置
/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/
3.2 Oozie安装
3.2.1 添加Oozie服务
3.2.2 选择集群节点
3.2.3 选择有MySQL的节点安装
3.2.4 链接数据库
3.2.5 一路继续到完成
3.3 Hue安装
3.3.1 Hue概述
1)Hue来源
HUE=Hadoop User Experience(Hadoop用户体验),直白来说就一个开源的Apache Hadoop UI系统,由Cloudera Desktop演化而来,最后Cloudera公司将其贡献给Apache基金会的Hadoop社区,它是基于Python Web框架Django实现的。通过使用HUE我们可以在浏览器端的Web控制台上与Hadoop集群进行交互来分析处理数据。
2)Hue官网及使用者
官网网站:http://gethue.com/
3.3.2 安装前的准备
1)在LoadBalancer节点安装mod_ssl
[root@hadoop102 ~]# yum -y install mod_ssl
2)查看/usr/lib64/mysql下有没有libmysqlclient_r.so.16,如果没有,上传libmysqlclient_r.so.16到/usr/lib64/mysql,并软链接到/usr/lib64/
[root@hadoop102 ~]# ls /usr/lib64/mysql
[root@hadoop103mysql]#
scp /usr/lib64/mysql/libmysqlclient_r.so.16
root@hadoop102:/usr/lib64/mysql/
[root@hadoop102 ~]ln -s /usr/lib64/mysql/libmysqlclient_r.so.16
/usr/lib64/libmysqlclient_r.so.16
3.3.3 HUE安装步骤
1)添加HUE服务
2)选择集群节点
3)分配角色
4)配置数据库
5)安装完成
6)HUE页面
http://hadoop102:8888(未优化)或http://hadoop102:8889(优化)
第一次开启HUE会出现以下页面,此时输入的用户名和密码可以随意,之后登录页面以第一次输入的账号密码为依据。例如,用户名:admin 密码:admin
第4章 用户行为数仓搭建
4.1 ODS层
原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。
4.1.1 创建数据库
1)创建gmall数据库
hive (default)> create database gmall;
说明:如果数据库存在且有数据,需要强制删除时执行:drop database gmall cascade;
2)使用gmall数据库
hive (default)> use gmall;
4.1.2 创建启动日志表ods_start_log
1)创建输入数据是lzo输出是text,支持json解析的分区表
hive (gmall)>
drop table if exists ods_start_log;
CREATE EXTERNAL TABLE ods_start_log (`line`string)
PARTITIONED BY (`dt` string)
STORED AS
INPUTFORMAT'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION'/warehouse/gmall/ods/ods_start_log';
说明Hive的LZO压缩:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO
4.1.3 ODS层加载数据脚本
1)在hadoop102的/root/bin目录下创建脚本
[root@hadoop102 bin]$ vim ods_log.sh
在脚本中编写如下内容
#!/bin/bash
# 定义变量方便修改
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
echo "===日志日期为$do_date==="
sql="
load data inpath '/origin_data/gmall/log/topic_start/$do_date'into table "$APP".ods_start_log partition(dt='$do_date');
"
hive -e "$sql"
说明1:
[ -n 变量值] 判断变量的值,是否为空
-- 变量的值,非空,返回true
-- 变量的值,为空,返回false
说明2:
查看date命令的使用,[root@hadoop102 ~]$date --help
2)增加脚本执行权限
[root@hadoop102 bin]$ chmod 777 ods_log.sh
3)脚本使用
[root@hadoop102 module]$ ods_log.sh 2019-02-10
4.2 DWD层启动表数据解析
4.2.1 创建启动表
1)建表语句
hive (gmall)>
drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`open_ad_type`string,
`action` string,
`loading_time`string,
`detail` string,
`extend1` string
)
PARTITIONED BY (dt string)
location '/warehouse/gmall/dwd/dwd_start_log/';
4.2.2 DWD层启动表加载数据脚本
1)在hadoop102的/root/bin目录下创建脚本
[root@hadoop102 bin]$ vim dwd_start_log.sh
在脚本中编写如下内容
#!/bin/bash
# 定义变量方便修改
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date-d "-1 day" +%F`
fi
sql="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table "$APP".dwd_start_log
PARTITION (dt='$do_date')
select
get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba')brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln')lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1
from "$APP".ods_start_log
where dt='$do_date';
"
hive -e "$sql"
2)增加脚本执行权限
[root@hadoop102 bin]$ chmod 777 dwd_start_log.sh
3)脚本使用
[root@hadoop102 module]$ dwd_start_log.sh2019-02-10
4.3 DWS层(需求:用户日活跃)
目标:统计当日、当周、当月活动的每个设备明细
4.3.1 每日活跃设备明细
1)建表语句
hive (gmall)>
drop table if exists dws_uv_detail_day;
create external table dws_uv_detail_day
(
`mid_id` string COMMENT '设备唯一标识',
`user_id` string COMMENT '用户标识',
`version_code` string COMMENT '程序版本号',
`version_name` string COMMENT '程序版本名',
`lang` string COMMENT '系统语言',
`source` string COMMENT '渠道号',
`os` string COMMENT '安卓系统版本',
`area` string COMMENT '区域',
`model` string COMMENT '手机型号',
`brand` string COMMENT '手机品牌',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT '屏幕宽高',
`app_time` string COMMENT '客户端日志产生时的时间',
`network` string COMMENT '网络模式',
`lng` string COMMENT '经度',
`lat` string COMMENT '纬度'
)
partitioned by(dt string)
stored as parquet
location '/warehouse/gmall/dws/dws_uv_detail_day'
;
4.3.2 DWS层加载数据脚本
1)在hadoop102的/root/bin目录下创建脚本
[root@hadoop102 bin]$ vim dws_log.sh
在脚本中编写如下内容
#!/bin/bash
# 定义变量方便修改
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date-d "-1 day" +%F`
fi
sql="
sethive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table "$APP".dws_uv_detail_daypartition(dt='$do_date')
select
mid_id,
concat_ws('|', collect_set(user_id)) user_id,
concat_ws('|', collect_set(version_code)) version_code,
concat_ws('|', collect_set(version_name)) version_name,
concat_ws('|', collect_set(lang)) lang,
concat_ws('|', collect_set(source)) source,
concat_ws('|', collect_set(os)) os,
concat_ws('|', collect_set(area)) area,
concat_ws('|', collect_set(model)) model,
concat_ws('|', collect_set(brand)) brand,
concat_ws('|', collect_set(sdk_version)) sdk_version,
concat_ws('|', collect_set(gmail)) gmail,
concat_ws('|', collect_set(height_width)) height_width,
concat_ws('|', collect_set(app_time)) app_time,
concat_ws('|', collect_set(network)) network,
concat_ws('|', collect_set(lng)) lng,
concat_ws('|', collect_set(lat)) lat
from"$APP".dwd_start_log
wheredt='$do_date'
groupby mid_id;
"
hive -e "$sql"
2)增加脚本执行权限
[root@hadoop102 bin]$ chmod 777 dws_log.sh
3)脚本使用
[root@hadoop102 module]$ dws_log.sh 2019-02-10
4.4 ADS层(需求:用户日活跃)
目标:当日活跃设备数
4.4.1 活跃设备数
1)建表语句
hive (gmall)>
drop table if exists ads_uv_count;
create external table ads_uv_count(
`dt` string COMMENT '统计日期',
`day_count` bigint COMMENT '当日用户数量'
) COMMENT '活跃设备数'
row format delimited fields terminated by
'\t'
location '/warehouse/gmall/ads/ads_uv_count/'
;
4.4.2 ADS层加载数据脚本
1)在hadoop102的/root/bin目录下创建脚本
[root@hadoop102 bin]$ vim ads_uv_log.sh
在脚本中编写如下内容
#!/bin/bash
# 定义变量方便修改
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date-d "-1 day" +%F`
fi
sql="
sethive.exec.dynamic.partition.mode=nonstrict;
insert into table"$APP".ads_uv_count
select
'$do_date' dt,
daycount.ct
from
(
select
'$do_date' dt,
count(*) ct
from"$APP".dws_uv_detail_day
where dt='$do_date'
)daycount;
"
hive -e "$sql"
2)增加脚本执行权限
[root@hadoop102 bin]$ chmod 777 ads_uv_log.sh
3)脚本使用
[root@hadoop102 module]$ ads_uv_log.sh2019-02-10
第5章 业务数仓搭建
5.1 业务数据生成
5.1.1 建表语句
1)通过SQLyog创建数据库gmall
2)设置数据库编码
3)导入建表语句(1建表脚本)
选择->1建表脚本.sql
4)重复步骤3的导入方式,依次导入:2商品分类数据插入脚本、3函数脚本、4存储过程脚本。
5.1.2 生成业务数据
1)生成业务数据函数说明
init_data ( do_date_string VARCHAR(20) , order_incr_numINT, user_incr_num INT , sku_num INT , if_truncate BOOLEAN ):
参数一:do_date_string生成数据日期
参数二:order_incr_num订单id个数
参数三:user_incr_num用户id个数
参数四:sku_num商品sku个数
参数五:if_truncate是否删除数据
2)案例测试:
(1)需求:生成日期2019年2月10日数据、订单1000个、用户200个、商品sku300个、删除原始数据。
CALL init_data('2019-02-10',1000,200,300,TRUE);
(2)查询生成数据结果
SELECT * from base_category1;
SELECT * from base_category2;
SELECT * from base_category3;
SELECT * from order_info;
SELECT * from order_detail;
SELECT * from sku_info;
SELECT * from user_info;
SELECT * from payment_info;
5.2 业务数据导入数仓
5.2.1 Sqoop安装
1)添加服务
2)选择Sqoop
目前选择sqoop1即可
3)选择节点
5.2.2 Sqoop定时导入脚本
1)在/root/bin目录下创建脚本sqoop_import.sh
[root@hadoop102
bin]$ vim sqoop_import.sh
在脚本中填写如下内容
#!/bin/bash
db_date=$2
echo
$db_date
db_name=gmall
import_data()
{
sqoop
import \
--connect
jdbc:mysql://hadoop102:3306/$db_name \
--username
root \
--password
000000 \
--target-dir /origin_data/$db_name/db/$1/$db_date \
--delete-target-dir
\
--num-mappers
1 \
--fields-terminated-by
"\t" \
--query
"$2"'and $CONDITIONS;'
}
import_sku_info(){
import_data "sku_info" "select
id, spu_id, price, sku_name, sku_desc,
weight, tm_id,
category3_id, create_time
from sku_info where 1=1"
}
import_user_info(){
import_data "user_info""select
id, name, birthday, gender, email,
user_level,
create_time
from user_info where 1=1"
}
import_base_category1(){
import_data "base_category1""select
id, name from base_category1 where 1=1"
}
import_base_category2(){
import_data "base_category2""select
id, name, category1_id from
base_category2 where
1=1"
}
import_base_category3(){
import_data "base_category3""select id, name, category2_id from base_category3where 1=1"
}
import_order_detail(){
import_data "order_detail" "select
od.id,
order_id,
user_id,
sku_id,
sku_name,
order_price,
sku_num,
o.create_time
from order_info o , order_detail od
whereo.id=od.order_id
andDATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'"
}
import_payment_info(){
import_data "payment_info" "select
id,
out_trade_no,
order_id,
user_id,
alipay_trade_no,
total_amount,
subject,
payment_type,
payment_time
from payment_info
whereDATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"
}
import_order_info(){
import_data "order_info" "select
id,
total_amount,
order_status,
user_id,
payment_way,
out_trade_no,
create_time,
operate_time
from order_info
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date' or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"
}
case
$1 in
"base_category1")
import_base_category1
;;
"base_category2")
import_base_category2
;;
"base_category3")
import_base_category3
;;
"order_info")
import_order_info
;;
"order_detail")
import_order_detail
;;
"sku_info")
import_sku_info
;;
"user_info")
import_user_info
;;
"payment_info")
import_payment_info
;;
"all")
import_base_category1
import_base_category2
import_base_category3
import_order_info
import_order_detail
import_sku_info
import_user_info
import_payment_info
;;
esac
2)增加脚本执行权限
[root@hadoop102 bin]$ chmod 777 sqoop_import.sh
3)执行脚本导入数据
[root@hadoop102 bin]$ sqoop_import.sh all2019-02-10
5.3 ODS层
完全仿照业务数据库中的表字段,一模一样的创建ODS层对应表。
5.3.1 创建订单表
hive (gmall)>
drop table if exists ods_order_info;
create external table ods_order_info (
`id` string COMMENT '订单编号',
`total_amount` decimal(10,2) COMMENT '订单金额',
`order_status` string COMMENT '订单状态',
`user_id` string COMMENT '用户id' ,
`payment_way` string
COMMENT '支付方式',
`out_trade_no` string
COMMENT '支付流水号',
`create_time` string
COMMENT '创建时间',
`operate_time` string
COMMENT '操作时间'
) COMMENT '订单表'
PARTITIONED BY ( `dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_order_info/'
;
5.3.2 创建订单详情表
hive (gmall)>
drop table if exists ods_order_detail;
create external table ods_order_detail(
`id` string COMMENT '订单编号',
`order_id` string COMMENT '订单号',
`user_id` string COMMENT '用户id' ,
`sku_id` string
COMMENT '商品id',
`sku_name` string
COMMENT '商品名称',
`order_price` string
COMMENT '商品价格',
`sku_num` string
COMMENT '商品数量',
`create_time` string
COMMENT '创建时间'
) COMMENT '订单明细表'
PARTITIONED BY ( `dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_order_detail/'
;
5.3.3 创建商品表
hive (gmall)>
drop table if exists ods_sku_info;
create external table ods_sku_info(
`id` string COMMENT 'skuId',
`spu_id` string COMMENT 'spuid',
`price` decimal(10,2) COMMENT '价格' ,
`sku_name` string
COMMENT '商品名称',
`sku_desc` string
COMMENT '商品描述',
`weight` string
COMMENT '重量',
`tm_id` string COMMENT
'品牌id',
`category3_id` string
COMMENT '品类id',
`create_time` string
COMMENT '创建时间'
) COMMENT '商品表'
PARTITIONED BY ( `dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_sku_info/'
;
5.3.4 创建用户表
hive (gmall)>
drop table if exists ods_user_info;
create external table ods_user_info(
`id` string COMMENT '用户id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日' ,
`gender` string
COMMENT '性别',
`email` string COMMENT
'邮箱',
`user_level` string
COMMENT '用户等级',
`create_time` string
COMMENT '创建时间'
) COMMENT '用户信息'
PARTITIONED BY ( `dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_user_info/'
;
5.3.5 创建商品一级分类表
hive (gmall)>
drop table if exists ods_base_category1;
create external table ods_base_category1(
`id` string COMMENT 'id',
`name` string COMMENT '名称'
) COMMENT '商品一级分类'
PARTITIONED BY ( `dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_base_category1/'
;
5.3.6 创建商品二级分类表
hive (gmall)>
drop table if exists ods_base_category2;
create external table ods_base_category2(
`id` string COMMENT ' id',
`name` string COMMENT '名称',
category1_id string COMMENT '一级品类id'
) COMMENT '商品二级分类'
PARTITIONED BY ( `dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_base_category2/'
;
5.3.7 创建商品三级分类表
hive (gmall)>
drop table if exists ods_base_category3;
create external table ods_base_category3(
`id` string COMMENT ' id',
`name` string COMMENT '名称',
category2_id string COMMENT '二级品类id'
) COMMENT '商品三级分类'
PARTITIONED BY ( `dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_base_category3/'
;
5.3.8 创建支付流水表
hive (gmall)>
drop table if exists `ods_payment_info`;
create external table `ods_payment_info`(
`id`
bigint COMMENT '编号',
`out_trade_no` string COMMENT '对外业务编号',
`order_id`
string COMMENT '订单编号',
`user_id`
string COMMENT '用户编号',
`alipay_trade_no`
string COMMENT '支付宝交易流水编号',
`total_amount`
decimal(16,2) COMMENT '支付金额',
`subject`
string COMMENT '交易内容',
`payment_type` string
COMMENT '支付类型',
`payment_time`
string COMMENT '支付时间'
) COMMENT '支付流水表'
PARTITIONED BY ( `dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_payment_info/'
;
5.3.9 ODS层数据导入脚本
1)在/root/bin目录下创建脚本ods_db.sh
[root@hadoop102
bin]$ vim ods_db.sh
在脚本中填写如下内容
#!/bin/bash
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date-d "-1 day" +%F`
fi
sql="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table"$APP".ods_order_info partition(dt='$do_date');
load data inpath'/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table "$APP".ods_order_detailpartition(dt='$do_date');
load data inpath'/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table "$APP".ods_sku_infopartition(dt='$do_date');
load data inpath
'/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table
"$APP".ods_user_info partition(dt='$do_date');
load data inpath
'/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table
"$APP".ods_payment_info partition(dt='$do_date');
load data inpath
'/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table
"$APP".ods_base_category1 partition(dt='$do_date');
load data inpath
'/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table
"$APP".ods_base_category2 partition(dt='$do_date');
load data inpath
'/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table
"$APP".ods_base_category3 partition(dt='$do_date');
"
hive -e "$sql"
2)增加脚本执行权限
[root@hadoop102 bin]$ chmod 777 ods_db.sh
3)采用脚本导入数据
[root@hadoop102 bin]$ ods_db.sh 2019-02-10
5.4 DWD层
对ODS层数据进行判空过滤。对商品分类表进行维度退化(降维)。
5.4.1 创建订单表
hive (gmall)>
drop table if exists dwd_order_info;
create external table dwd_order_info (
`id` string COMMENT '',
`total_amount` decimal(10,2) COMMENT '',
`order_status` string COMMENT ' 1 2 3 4 5',
`user_id` string COMMENT 'id' ,
`payment_way` string
COMMENT '',
`out_trade_no` string
COMMENT '',
`create_time` string
COMMENT '',
`operate_time` string
COMMENT ''
)
PARTITIONED BY ( `dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_order_info/'
;
5.4.2 创建订单详情表
hive (gmall)>
drop table if exists dwd_order_detail;
create external table dwd_order_detail(
`id` string COMMENT '',
`order_id` decimal(10,2) COMMENT '',
`user_id` string COMMENT 'id' ,
`sku_id` string
COMMENT 'id',
`sku_name` string
COMMENT '',
`order_price` string
COMMENT '',
`sku_num` string
COMMENT '',
`create_time` string
COMMENT ''
)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_order_detail/'
;
5.4.3 创建用户表
hive (gmall)>
drop table if exists dwd_user_info;
create external table dwd_user_info(
`id` string COMMENT 'id',
`name` string COMMENT '',
`birthday` string COMMENT '' ,
`gender` string
COMMENT '',
`email` string COMMENT
'',
`user_level` string
COMMENT '',
`create_time` string COMMENT
''
)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_user_info/'
;
5.4.4 创建支付流水表
hive (gmall)>
drop table if exists `dwd_payment_info`;
create external table `dwd_payment_info`(
`id`
bigint COMMENT '',
`out_trade_no` string COMMENT '',
`order_id`
string COMMENT '',
`user_id`
string COMMENT '',
`alipay_trade_no`
string COMMENT '',
`total_amount`
decimal(16,2) COMMENT '',
`subject`
string COMMENT '',
`payment_type` string COMMENT '',
`payment_time` string COMMENT ''
)
PARTITIONED BY ( `dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_payment_info/'
;
5.4.5 创建商品表(增加分类)
hive (gmall)>
drop table if exists dwd_sku_info;
create external table dwd_sku_info(
`id` string COMMENT 'skuId',
`spu_id` string COMMENT 'spuid',
`price` decimal(10,2) COMMENT '' ,
`sku_name` string
COMMENT '',
`sku_desc` string
COMMENT '',
`weight` string
COMMENT '',
`tm_id` string COMMENT
'id',
`category3_id` stringCOMMENT '1id',
`category2_id` string COMMENT '2id',
`category1_id` string COMMENT '3id',
`category3_name` string COMMENT '3',
`category2_name` string COMMENT '2',
`category1_name` string COMMENT '1',
`create_time` stringCOMMENT ''
)
PARTITIONED BY ( `dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_sku_info/'
;
5.4.6 DWD层数据导入脚本
1)在/root/bin目录下创建脚本dwd_db.sh
[root@hadoop102
bin]$ vim dwd_db.sh
在脚本中填写如下内容
#!/bin/bash
# 定义变量方便修改
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date-d "-1 day" +%F`
fi
sql="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table "$APP".dwd_order_infopartition(dt)
select * from "$APP".ods_order_info
where dt='$do_date' and id is not null;
insert overwrite table "$APP".dwd_order_detailpartition(dt)
select * from"$APP".ods_order_detail
where dt='$do_date' andid is not null;
insert overwrite table "$APP".dwd_user_info partition(dt)
select * from "$APP".ods_user_info
where dt='$do_date' andid is not null;
insert overwrite table "$APP".dwd_payment_infopartition(dt)
select * from"$APP".ods_payment_info
where dt='$do_date' andid is not null;
insert overwrite table "$APP".dwd_sku_info partition(dt)
select
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
sku.category3_id,
c2.id category2_id ,
c1.id category1_id,
c3.namecategory3_name,
c2.namecategory2_name,
c1.name category1_name,
sku.create_time,
sku.dt
from
"$APP".ods_sku_info sku
join "$APP".ods_base_category3 c3 on sku.category3_id=c3.id
join "$APP".ods_base_category2 c2 on c3.category2_id=c2.id
join "$APP".ods_base_category1 c1 on c2.category1_id=c1.id
where sku.dt='$do_date' andc2.dt='$do_date'
and c3.dt='$do_date'and c1.dt='$do_date'
andsku.id is not null;
"
hive -e "$sql"
2)增加脚本执行权限
[root@hadoop102 bin]$ chmod 777 dwd_db.sh
3)采用脚本导入数据
[root@hadoop102 bin]$ dwd_db.sh 2019-02-10
5.5 DWS层之用户行为宽表
1)为什么要建宽表
需求目标,把每个用户单日的行为聚合起来组成一张多列宽表,以便之后关联用户维度信息后进行,不同角度的统计分析。
5.5.1 创建用户行为宽表
hive (gmall)>
drop table if exists dws_user_action;
create external table dws_user_action
(
user_id string comment '用户id',
order_count bigint comment '下单次数',
order_amount decimal(16,2) comment '下单金额',
payment_count bigint comment '支付次数',
payment_amount decimal(16,2) comment '支付金额'
) COMMENT '每日用户行为宽表'
PARTITIONED BY (`dt` string)
stored as parquet
location'/warehouse/gmall/dws/dws_user_action/'
tblproperties("parquet.compression"="snappy");
5.5.2 用户行为数据宽表导入脚本
1)在/root/bin目录下创建脚本dws_db_wide.sh
[root@hadoop102
bin]$ vim dws_db_wide.sh
在脚本中填写如下内容
#!/bin/bash
# 定义变量方便修改
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date-d "-1 day" +%F`
fi
sql="
with
tmp_order as
(
select
user_id,
count(*) order_count,
sum(oi.total_amount) order_amount
from "$APP".dwd_order_info oi
where date_format(oi.create_time,'yyyy-MM-dd')='$do_date'
group by user_id
) ,
tmp_payment as
(
select
user_id,
sum(pi.total_amount) payment_amount,
count(*) payment_count
from "$APP".dwd_payment_info pi
where date_format(pi.payment_time,'yyyy-MM-dd')='$do_date'
group by user_id
)
insert overwrite table"$APP".dws_user_action partition(dt='$do_date')
select
user_actions.user_id,
sum(user_actions.order_count),
sum(user_actions.order_amount),
sum(user_actions.payment_count),
sum(user_actions.payment_amount)
from
(
select
user_id,
order_count,
order_amount,
0 payment_count,
0 payment_amount
from tmp_order
union all
select
user_id,
0 order_count,
0 order_amount,
payment_count,
payment_amount
from tmp_payment
)user_actions
group by user_id;
"
hive -e "$sql"
2)增加脚本执行权限
[root@hadoop102 bin]$ chmod 777 dws_db_wide.sh
3)增加脚本执行权限
[root@hadoop102 bin]$ dws_db_wide.sh
2019-02-10
5.6 ADS层(需求:GMV成交总额)
5.6.1 建表语句
hive (gmall)>
drop table if exists ads_gmv_sum_day;
create external table
ads_gmv_sum_day(
`dt` string COMMENT '统计日期',
`gmv_count` bigint COMMENT '当日gmv订单个数',
`gmv_amount` decimal(16,2) COMMENT '当日gmv订单总金额',
`gmv_payment` decimal(16,2) COMMENT '当日支付金额'
) COMMENT 'GMV'
row format delimited fields terminated by
'\t'
location '/warehouse/gmall/ads/ads_gmv_sum_day/'
;
5.6.2 数据导入脚本
1)在/root/bin目录下创建脚本ads_db_gmv.sh
[root@hadoop102
bin]$ vim ads_db_gmv.sh
在脚本中填写如下内容
#!/bin/bash
# 定义变量方便修改
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date-d "-1 day" +%F`
fi
sql="
insert into table "$APP".ads_gmv_sum_day
select
'$do_date' dt,
sum(order_count) gmv_count,
sum(order_amount) gmv_amount,
sum(payment_amount) payment_amount
from "$APP".dws_user_action
where dt ='$do_date'
group by dt;
"
hive -e "$sql"
2)增加脚本执行权限
[root@hadoop102 bin]$ chmod 777 ads_db_gmv.sh
3)执行脚本
[root@hadoop102 bin]$ ads_db_gmv.sh
2019-02-10
5.6.3 数据导出脚本
1)在MySQL中创建ads_gmv_sum_day表
DROP TABLE IF EXISTS ads_gmv_sum_day;
CREATE TABLE ads_gmv_sum_day(
`dt` varchar(200) DEFAULT NULL COMMENT '统计日期',
`gmv_count` bigint(20) DEFAULT NULL COMMENT '当日gmv订单个数',
`gmv_amount` decimal(16, 2) DEFAULT NULL COMMENT '当日gmv订单总金额',
`gmv_payment` decimal(16, 2) DEFAULT NULL COMMENT '当日支付金额'
) ENGINE = InnoDB CHARACTER SET = utf8
COLLATE = utf8_general_ci COMMENT = '每日活跃用户数量' ROW_FORMAT =
Dynamic;
2)在/root/bin目录下创建脚本sqoop_export.sh
[root@hadoop102
bin]$ vim sqoop_export.sh
在脚本中填写如下内容
#!/bin/bash
db_name=gmall
export_data() {
sqoop export \
--connect "jdbc:mysql://hadoop102:3306/${db_name}?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 000000 \
--table $1 \
--num-mappers 1 \
--export-dir /warehouse/$db_name/ads/$1 \
--input-fields-terminated-by
"\t" \
--update-mode allowinsert \
--update-key
"tm_id,category1_id,stat_mn,stat_date" \
--input-null-string '\\N' \
--input-null-non-string '\\N'
}
case $1 in
"ads_gmv_sum_day")
export_data "ads_gmv_sum_day"
;;
"all")
export_data "ads_gmv_sum_day"
;;
esac
3)增加脚本执行权限
[root@hadoop102 bin]$ chmod 777 sqoop_export.sh
4)执行脚本导入数据
[root@hadoop102 bin]$ sqoop_export.sh
all
5)在SQLyog查看导出数据
select * from ads_gmv_sum_day
5.7 Oozie基于Hue实现GMV指标全流程调度
5.7.1 执行前的准备
1)添加MySQL驱动文件
[root@hadoop102 ~]# cp /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/lib
[root@hadoop102 ~]# cp/opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/sqoop/lib/
[root@hadoop102 mysql-connector-java-5.1.27]#hadoop fs -put /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar/user/oozie/share/lib/lib_20190603220831/sqoop
[root@hadoop102 mysql-connector-java-5.1.27]#xsync /opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/lib
[root@hadoop102mysql-connector-java-5.1.27]# xsync /opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/sqoop/lib/
2)修改YARN的容器内存yarn.nodemanager.resource.memory-mb为4G
5)新建一个Hue账户
6)切换用户
5.7.2 在Hue中创建Oozie任务GMV
1)生成数据
CALL init_data('2019-02-12',300,200,300,FALSE);
2)安装Oozie可视化js,复制ext-2.2.zip到/opt/cloudera/parcels/CDH/lib/oozie/libext(或/var/lib/oozie)中,解压
[root@hadoop102 libext]# unzip
ext-2.2.zip
3)查看Hue的web页面
4)选择query->scheduler->workflow
5)点击My Workflow->输入gmv
6)点击保存
5.7.3 编写任务脚本并上传到HDFS
1)点击workspace,查看上传情况
2)上传要执行的脚本导HDFS路径
[root@hadoop102 bin]# hadoop fs -put /root/bin/*.sh/user/hue/oozie/workspaces/hue-oozie-1559457755.83/lib
3)点击左侧的->Documents->gmv
5.7.4 编写任务调度
1)点击编辑
2)选择actions
3)拖拽控件编写任务
4)选择执行脚本
5)Files再选择执行脚本
6)定义脚本参数名称
7)按顺序执行3-6步骤。
ods_db.sh、dwd_db.sh、dws_db_wide.sh、ads_db_gmv.sh和sqoop_export.sh
其中ods_db.sh、dwd_db.sh、dws_db_wide.sh、ads_db_gmv.sh只有一个参数do_date;
sqoop_export.sh只有一个参数all。
8)点击保存
5.7.5 执行任务调度
1)点击执行
2)点击提交
第6章 即席查询数仓搭建
6.1 Impala安装
6.1.1 添加服务
6.1.2 选择Impala服务
6.1.3 角色分配
注意:最好将StateStore和CataLog Sever单独部署在同一节点上。
6.1.4 配置Impala
6.1.5 启动Impala
6.1.6 安装成功
6.1.7 配置Hue支持Impala
6.2 Impala基于Hue查询
第7章Spark2.1安装
在CDH5.12.1集群中,默认安装的Spark是1.6版本,这里需要将其升级为Spark2.1版本。经查阅官方文档,发现Spark1.6和2.x是可以并行安装的,也就是说可以不用删除默认的1.6版本,可以直接安装2.x版本,它们各自用的端口也是不一样的。
Cloudera发布Apache Spark 2概述(可以在这里面找到安装方法和parcel包的仓库)
cloudera的官网可以下载相关的parcel 的离线安装包:https://www.cloudera.com/documentation/spark2/latest/topics/spark2_installing.html
Cloudera Manager及5.12.0版本的介绍:https://www.cloudera.com/documentation/enterprise/latest/topics/cm_ig_parcels.html#cmug_topic_7_11_5__section
7.1 升级过程
7.1.1 离线包下载
1)所需软件:http://archive.cloudera.com/spark2/csd/
2)Parcels 包的下载地址:http://archive.cloudera.com/spark2/parcels/2.2.0.cloudera1/
7.1.2 离线包上传
1)上传文件SPARK2_ON_YARN-2.1.0.cloudera1.jar到/opt/cloudera/csd/下面
2)上传文件SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel和SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel.sha1到/opt/cloudera/parcel-repo/
3)将SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel.sha1重命名为SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel.sha
[root@hadoop102 parcel-repo]# mv
/opt/cloudera/parcel-repo/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel.sha1
/opt/cloudera/parcel-repo/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel.sha
7.2 页面操作
7.2.1 更新Parcel
在cm首页点击Parcel,再点击检查新Parcel
7.2.2 点击分配
7.2.3 点击激活
7.2.4 回到首页点击添加服务
7.2.5 点击Spark2继续
7.2.6 选择一组依赖关系
7.2.7 角色分配
7.2.8 部署并启动
注意:这里我报了一个错:客户端配置 (id=12) 已使用 1 退出,而预期值为0
1)问题原因:最后找到原因是因为CM安装Spark不会去环境变量去找Java,需要将Java路径添加到CM配置文件
2)解决方法1(需要重启cdh):
[root@hadoop102 java]# vim
/opt/module/cm/cm-5.12.1/lib64/cmf/service/client/deploy-cc.sh
在文件最后加上
JAVA_HOME= /opt/module/jdk1.8.0_144
export JAVA_HOME= /opt/module/jdk1.8.0_144
3)解决方法2(无需重启cdh):
查看/opt/module/cm/cm-5.12.1/lib64/cmf/service/common/cloudera-config.sh
找到java8的home目录
cdh不会使用系统默认的JAVA_HOME环境变量,而是依照bigtop进行管理,因此我们需要在指定的/usr/java/jdk1.8目录下安装jdk。当然我们已经在/opt/module/jdk1.8.0_144下安装了jdk,因此创建一个连接过去即可
[root@hadoop102 ~]# ln -s /opt/module/jdk1.8.0_144/
/usr/java/jdk1.8
[root@hadoop103 ~]# ln -s
/opt/module/jdk1.8.0_144/ /usr/java/jdk1.8
[root@hadoop104 ~]# ln -s
/opt/module/jdk1.8.0_144/ /usr/java/jdk1.8
3)解决方法3(需要重启cdh):
找到hadoop102、hadoop103、hadoop104三台机器的配置,配置java主目录
7.2.9 命令行查看命令
第8章 Sentry安装
8.1 Sentry安装部署
8.1.1 添加Sentry服务
8.1.2 选择添加到集群
8.1.3 自定义Sentry角色分配
8.1.4 Mysql中创建Sentry库
8.1.5 配置数据库连接
8.1.6 成功完成Sentry的服务添加
8.2 Sentry与Hive集成使用
8.2.1 Hive的配置中需要配置以下参数
设置sentry.hive.testing.mode 为true
8.2.2 配置Hive使用Sentry服务
8.2.3 关闭Hive的用户模拟功能
8.3 Sentry与Hive的使用
8.3.1 使用beeline连接HiveServer2,并登录hive用户
[root@hadoop104 init.d]#beeline
Java
HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
support was removed in 8.0
Java
HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
support was removed in 8.0
Beeline
version 1.1.0-cdh5.12.1 by Apache Hive
beeline>
! connect
jdbc:hive2://hadoop104:10000
scan
complete in 484ms
Connecting
to jdbc:hive2://hadoop104:10000
Enter
username for jdbc:hive2://hadoop104:10000: hive
Enter
password for jdbc:hive2://hadoop104:10000: (直接回车)
Connected
to: Apache Hive (version 1.1.0-cdh5.12.1)
Driver:
Hive JDBC (version 1.1.0-cdh5.12.1)
Transaction
isolation: TRANSACTION_REPEATABLE_READ
0:
jdbc:hive2://hadoop104:10000>
注意:标红部分为输入的hive用户,输入的hive用户并未真正的校验。
8.3.2 创建一个admin角色
0: jdbc:hive2://hadoop104:10000> create role admin;
8.3.3 为admin角色赋予超级权限
0:jdbc:hive2://hadoop104:10000> grant all on server server1 to
role admin;
8.3.4 将admin角色授权给hive用户组
0: jdbc:hive2://hadoop104:10000> grant role admin to group hive;
8.3.5 创建test表
使用beeline登录hive用户,创建一个test表,并插入测试数据
0:jdbc:hive2://hadoop104:10000>
create table test (s1 string, s2 string) row format delimited fields terminated by ',';
0: jdbc:hive2://hadoop104:10000> insert into test values('a','b'),('1','2');
8.3.6 创建测试角色并授权给用户组
创建两个角色:
read:只能读default库test表,并授权给user_r用户组
write:只能写default库test表,并授权给user_w用户组
注意:集群所有节点必须存在user_r和user_w用户,用户默认用户组与用户名一致,赋权是针对用户组而不是针对用户。
[root@hadoop104 ~]# useradd user_r
[root@hadoop104
~]# id user_r
uid=502(user_r)
gid=503(user_r) 组=503(user_r)
[root@hadoop104
~]# useradd user_w
[root@hadoop104
~]# id user_w
uid=503(user_w)
gid=504(user_w) 组=504(user_w)
8.3.7 使用hive用户创建创建read和write角色,并授权read角色对test表select权限,write角色对test表insert权限
0: jdbc:hive2://hadoop104:10000> create role read;
0:
jdbc:hive2://hadoop104:10000> grant select on table test to role read;
0:
jdbc:hive2://hadoop104:10000> create role write;
0:
jdbc:hive2://hadoop104:10000> grant insert on table test to role write;
8.3.8 为user_r用户组授权read角色,为user_w用户组授权write角色
0: jdbc:hive2://hadoop104:10000> grant role read to group user_r;
0: jdbc:hive2://hadoop104:10000> grant
role write to group user_w;
8.3.9 beeline验证
1.使用user_r用户登录beeline进行验证
[root@hadoop104 init.d]#beeline
beeline>! connect jdbc:hive2://hadoop104:10000
scan complete in 4ms
Connecting to
jdbc:hive2://hadoop104:10000
Enter username for
jdbc:hive2://hadoop104:10000: user_r
Enter password for
jdbc:hive2://hadoop104:10000: (直接回车)
Connected to: Apache Hive (version
1.1.0-cdh5.12.1)
Driver: Hive JDBC (version
1.1.0-cdh5.12.1)
Transaction isolation:
TRANSACTION_REPEATABLE_READ
0:
jdbc:hive2://hadoop104:10000> show tables;
INFO : OK
+-----------+--+
| tab_name |
+-----------+--+
| test |
+-----------+--+
1
row selected (0.649 seconds)
0: jdbc:hive2://hadoop104:10000> select * from test;
INFO : OK
+----------+----------+--+
| test.s1 | test.s2 |
+----------+----------+--+
| a | b |
| 1 | 2 |
+----------+----------+--+
2
rows selected (0.485 seconds)
0:
jdbc:hive2://hadoop104:10000> insert
into test values("2", "222");
Error: Error while compiling statement: FAILED: SemanticException No valid privileges
User user_r does not have privileges for QUERY
The required privileges: Server=server1->Db=default->Table=test->action=insert; (state=42000,code=40000)
0:
jdbc:hive2://hadoop104:10000>
2.使用user_w用户登录beeline验证
[root@hadoop104 init.d]#beeline
beeline>
! connect jdbc:hive2://hadoop104:10000
Connecting
to jdbc:hive2://hadoop104:10000
Enter
username for jdbc:hive2://hadoop104:10000: user_w
Enter
password for jdbc:hive2://hadoop104:10000:
Connected
to: Apache Hive (version 1.1.0-cdh5.12.1)
Driver:
Hive JDBC (version 1.1.0-cdh5.12.1)
Transaction
isolation: TRANSACTION_REPEATABLE_READ
0:
jdbc:hive2://hadoop104:10000> select * from test;
Error: Error while compiling statement: FAILED: SemanticException No valid privileges
User user_w does not have privileges for QUERY
The required privileges: Server=server1->Db=default->Table=test->Column=s1->action=select; (state=42000,code=40000)
0:
jdbc:hive2://hadoop104:10000>insert into test values("2", "333");
…..
INFO : Total MapReduce CPU Time Spent: 20 seconds 30 msec
INFO : Completed executing command(queryId=hive_20190603173535_88f71295-e612-4b45-bff7-df1fc4310a54); Time taken: 58.184 seconds
INFO : OK
No
rows affected (59.17 seconds)
0:
jdbc:hive2://hadoop104:10000>
验证总结:
user_r用户所属组为user_r拥有test表读权限,所以只能对test表进行selecth和count操作不能进行insert操作;
user_w用户所属组为user_w拥有test表写权限,所以只能对test表进行insert操作不能进行select和count操作;
8.4 Kerberos在企业中的作用
CDH平台中的安全,认证(Kerberos/LDAP)是第一步,授权(Sentry)是第二步。如果要启用授权,必须先启用认证。但在CDH平台中给出了一种测试模式,即不启用认证而只启用Sentry授权。但强烈不建议在生产系统中这样使用,因为如果没有用户认证,授权没有任何意义形同虚设,用户可以随意使用任何超级用户登录HiveServer2或者Impala,并不会做密码校验。注:本文档仅适用于测试环境。