cdh集群安装

 

[if !supportLists]第1章        [endif]Cloudera Manager

1.1 CM简介

1.1.1 CM简介

Cloudera Manager是一个拥有集群自动化安装、中心化管理、集群监控、报警功能的一个工具,使得安装集群从几天的时间缩短在几个小时内,运维人员从数十人降低到几人以内,极大的提高集群管理的效率。

1.1.2 CM架构

1.2 环境准备

1.2.1 虚拟机准备

克隆三台虚拟机(hadoop102、hadoop103、hadoop104),配置好对应主机的网络IP、主机名称、关闭防火墙。

设置hadoop102、hadoop103、hadoop104的主机对应内存分别是:16G、4G、4G

1.2.2 SSH免密登录

配置hadoop102对hadoop102、hadoop103、hadoop104三台服务器免密登录。

1)生成公钥和私钥:

[root@hadoop102 .ssh]$

ssh-keygen -t rsa

然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)

2)将公钥拷贝到要免密登录的目标机器上

[root@hadoop102 .ssh]$

ssh-copy-id hadoop102

[root@hadoop102 .ssh]$

ssh-copy-id hadoop103

[root@hadoop102 .ssh]$

ssh-copy-id hadoop104

3)重复1和2的操作,配置hadoop103对hadoop102、hadoop103、hadoop104三台服务器免密登录。

1.2.3 集群同步脚本

1)在/root目录下创建bin目录,并在bin目录下创建文件xsync,文件内容如下:

[root@hadoop102 ~]$ mkdir bin

[root@hadoop102 ~]$ cd bin/

[root@hadoop102 bin]$ vi xsync

在该文件中编写如下代码

#!/bin/bash

#1 获取输入参数个数,如果没有参数,直接退出

pcount=$#

if((pcount==0)); then

echo no args;

exit;

fi


#2 获取文件名称

p1=$1

fname=`basename $p1`

echo fname=$fname


#3 获取上级目录到绝对路径

pdir=`cd -P $(dirname $p1); pwd`

echo pdir=$pdir


#4 获取当前用户名称

user=`whoami`


#5 循环

for((host=103; host<105; host++)); do

       echo ------------------- hadoop$host --------------

       rsync -av $pdir/$fname $user@hadoop$host:$pdir

done

2)修改脚本 xsync 具有执行权限

[root@hadoop102 bin]$ chmod 777 xsync

1.2.4 安装JDK(三台)

1)在hadoop102的/opt目录下创建module和software文件夹

[root@hadoop102 opt]# mkdir module

[root@hadoop102 opt]# mkdir software

2)用SecureCRT将jdk-8u144-linux-x64.tar.gz导入到hadoop102的/opt/software目录

3)在Linux系统下的opt目录中查看软件包是否导入成功

[root@hadoop102 software]$ ls

jdk-8u144-linux-x64.tar.gz

4)解压JDK到/opt/module目录下,并修改文件的所有者和所有者组为root

[root@hadoop102 software]$ tar -zxvf

jdk-8u144-linux-x64.tar.gz -C /opt/module/

[root@hadoop102 module]# chown root:root

jdk1.8.0_144/ -R

5)配置JDK环境变量

       (1)打开/etc/profile文件

[root@hadoop102 software]$ vi

/etc/profile

在profile文件末尾添加JDK路径

#JAVA_HOME

export JAVA_HOME=/opt/module/jdk1.8.0_144

export PATH=$PATH:$JAVA_HOME/bin

       (2)让修改后的文件生效

[root@hadoop102 jdk1.8.0_144]$ source

/etc/profile

6)测试JDK是否安装成功

[root@hadoop102 jdk1.8.0_144]# java

-version

java version "1.8.0_144"

7)将hadoop102中的JDK和环境变量分发到hadoop103、hadoop104两台主机

[root@hadoop102 opt]# xsync /opt/module/

[root@hadoop102 opt]# xsync /etc/profile


分别在hadoop103、hadoop104上source一下

[root@hadoop103 ~]$ source /etc/profile

[root@hadoop104 ~]# source /etc/profile

1.2.5 集群整体操作脚本

       1)在/root/bin目录下创建脚本xcall.sh

[root@hadoop102 bin]$ vim xcall.sh

       2)在脚本中编写如下内容

#! /bin/bash


for i in hadoop102 hadoop103 hadoop104

do

       echo --------- $i ----------

       ssh $i "$*"

done

3)修改脚本执行权限

[root@hadoop102 bin]$ chmod 777 xcall.sh

4)将/etc/profile文件追加到~/.bashrc后面

[root@hadoop102 module]# cat /etc/profile>> ~/.bashrc

[root@hadoop103 module]# cat /etc/profile>> ~/.bashrc

[root@hadoop104 module]# cat /etc/profile>> ~/.bashrc

5)测试

[root@hadoop102 bin]# xcall.sh jps

1.2.6 安装MySQL

注意:一定要用root用户操作如下步骤;先卸载MySQL再安装

1)安装包准备

       (1)查看MySQL是否安装

[root@hadoop102 桌面]# rpm

-qa|grep mysql

mysql-libs-5.1.73-7.el6.x86_64

       (2)如果安装了MySQL,就先卸载

[root@hadoop102 桌面]#

rpm -e --nodeps mysql-libs-5.1.73-7.el6.x86_64

       (3)上传mysql-libs.zip到hadoop102的/opt/software目录,并解压文件到当前目录

[root@hadoop102 software]# unzip

mysql-libs.zip

[root@hadoop102 software]# ls

mysql-libs.zip

mysql-libs

       (4)进入到mysql-libs文件夹下

 [root@hadoop102 mysql-libs]#ll

总用量76048

-rw-r--r--. 1 root root 18509960 3月 26 2015 MySQL-client-5.6.24-1.el6.x86_64.rpm

-rw-r--r--. 1 root root 3575135 12月  1 2013 mysql-connector-java-5.1.27.tar.gz

-rw-r--r--. 1 root root 55782196 3月 26 2015 MySQL-server-5.6.24-1.el6.x86_64.rpm

2)安装MySQL服务器

(1)安装MySQL服务端

[root@hadoop102 mysql-libs]# rpm -ivh

MySQL-server-5.6.24-1.el6.x86_64.rpm

(2)查看产生的随机密码

[root@hadoop102 mysql-libs]# cat /root/.mysql_secret

OEXaQuS8IWkG19Xs

(3)查看MySQL状态

[root@hadoop102 mysql-libs]# service mysql status

(4)启动MySQL

[root@hadoop102 mysql-libs]# service mysql start

3)安装MySQL客户端

(1)安装MySQL客户端

[root@hadoop102 mysql-libs]# rpm -ivh MySQL-client-5.6.24-1.el6.x86_64.rpm

(2)链接MySQL(密码替换成产生的随机密码)

[root@hadoop102 mysql-libs]# mysql -uroot -pOEXaQuS8IWkG19Xs

(3)修改密码

mysql>SET PASSWORD=PASSWORD('000000');

(4)退出MySQL

mysql>exit

4)MySQL中user表中主机配置

配置只要是root用户+密码,在任何主机上都能登录MySQL数据库。

(1)进入MySQL

[root@hadoop102 mysql-libs]# mysql -uroot -p000000

(2)显示数据库

mysql>show databases;

(3)使用MySQL数据库

mysql>use mysql;

(4)展示MySQL数据库中的所有表

mysql>show tables;

(5)展示user表的结构

mysql>desc user;

(6)查询user表

mysql>select User, Host, Password from user;

(7)修改user表,把Host表内容修改为%

mysql>update user set host='%' where host='localhost';

(8)删除root用户的其他host

mysql>

delete from user where Host='hadoop102';

delete from user where Host='127.0.0.1';

delete from user where Host='::1';

(9)刷新

mysql>flushprivileges;

(10)退出

mysql>quit;

1.2.7 创建CM用的数据库

在MySQL中依次创建监控数据库、Hive数据库、Oozie数据库、Hue数据库

1)启动数据库

[root@hadoop102 ~]# mysql -uroot -p000000

2)集群监控数据库

mysql> create database amon DEFAULT CHARSET utf8 COLLATE

utf8_general_ci;

3)Hive数据库

mysql> create database hive DEFAULT CHARSET utf8 COLLATE

utf8_general_ci;

4)Oozie数据库

mysql> create database oozie DEFAULT CHARSET utf8 COLLATE

utf8_general_ci;

5)Hue数据库

mysql> create database hue

DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

6)关闭数据库

mysql> quit;

1.2.8 下载第三方依赖

依次在三台节点(所有Agent的节点)上执行下载第三方依赖(注意:需要联网)

[root@hadoop102 ~]# yum -y

install chkconfig python bind-utils psmisc libxslt zlib sqlite cyrus-sasl-plain

cyrus-sasl-gssapi fuse fuse-libs redhat-lsb


[root@hadoop103 ~]# yum -y

install chkconfig python bind-utils psmisc libxslt zlib sqlite cyrus-sasl-plain

cyrus-sasl-gssapi fuse fuse-libs redhat-lsb


[root@hadoop104 ~]# yum -y

install chkconfig python bind-utils psmisc libxslt zlib sqlite cyrus-sasl-plain

cyrus-sasl-gssapi fuse fuse-libs redhat-lsb

1.2.9 关闭SELINUX

安全增强型Linux(Security-Enhanced Linux)简称SELinux,它是一个 Linux 内核模块,也是Linux的一个安全子系统。为了避免安装过程出现各种错误,建议关闭,有如下两种关闭方法:

1)临时关闭(不建议使用)

[root@hadoop102 ~]# setenforce 0

但是这种方式只对当次启动有效,重启机器后会失效。

2)永久关闭(建议使用)

修改配置文件/etc/selinux/config

[root@hadoop102 ~]# vim

/etc/selinux/config

将SELINUX=enforcing 改为SELINUX=disabled

SELINUX=disabled

3)同步/etc/selinux/config配置文件

[root@hadoop102 ~]# xsync

/etc/selinux/config

4)重启hadoop102、hadoop103、hadoop104主机

[root@hadoop102 ~]# reboot

[root@hadoop103 ~]# reboot

[root@hadoop104 ~]# reboot

1.2.10 配置NTP时钟同步

1)NTP服务器配置

[root@hadoop102 ~]# vi /etc/ntp.conf

①注释掉所有的restrict开头的配置

②修改#restrict

192.168.1.0 mask 255.255.255.0 nomodify notrap

为restrict

192.168.1.102 mask 255.255.255.0 nomodify notrap

③将所有server配置进行注释

④添加下面两行内容

server

127.127.1.0

fudge 127.127.1.0

stratum 10

2)启动NTP服务service ntpd start

[root@hadoop102 ~]# service ntpd start

3)NTP客户端配置(在agent主机上进行配置hadoop103,hadoop104)

[root@hadoop103 ~]# vi /etc/ntp.conf


①注释所有restrict和server配置

②添加server 192.168.1.102

4)手动测试

[root@hadoop103~]# ntpdate 192.168.1.102

显示如下内容为成功:

17 Jun 15:34:38 ntpdate[9247]: step time

server 192.168.1.102 offset 77556618.173854 sec

如果显示如下内容需要先关闭ntpd:

17 Jun 15:25:42 ntpdate[8885]: the NTP

socket is in use, exiting

5)启动ntpd并设置为开机自启(每个节点hadoop102,hadoop103,hadoop104)

[root@hadoop103 ~]#  chkconfig ntpd on

[root@hadoop103 ~]# service ntpd start

6)使用群发date命令查看结果

1.3 CM安装部署

1.3.1 CM下载地址

1)CM下载地址:http://archive.cloudera.com/cm5/cm/5/

2)离线库下载地址:http://archive.cloudera.com/cdh5/parcels

1.3.2 CM安装

注:以下所有操作均使用root用户

1)创建/opt/module/cm目录

[root@hadoop102 module]# mkdir –p

/opt/module/cm

2)上传cloudera-manager-el6-cm5.12.1_x86_64.tar.gz到hadoop102的/opt/software目录,并解压到/opt/module/cm目录

[root@hadoop102 software]# tar -zxvf

cloudera-manager-el6-cm5.12.1_x86_64.tar.gz -C /opt/module/cm

3)分别在hadoop102、hadoop103、hadoop104创建用户cloudera-scm

[root@hadoop102 module]#

useradd \

--system \

--home=/opt/module/cm/cm-5.12.1/run/cloudera-scm-server

\

--no-create-home \

--shell=/bin/false \

--comment "Cloudera SCM User"

cloudera-scm


[root@hadoop103 module]#

useradd \

--system \

--home=/opt/module/cm/cm-5.12.1/run/cloudera-scm-server

\

--no-create-home \

--shell=/bin/false \

--comment "Cloudera SCM User" cloudera-scm


[root@hadoop104 module]#

useradd \

--system \

--home=/opt/module/cm/cm-5.12.1/run/cloudera-scm-server

\

--no-create-home \

--shell=/bin/false \

--comment "Cloudera SCM User"

cloudera-scm

参数说明:

--system 创建一个系统账户

--home 指定用户登入时的主目录,替换系统默认值/home/<用户名>

--no-create-home 不要创建用户的主目录

--shell 用户的登录 shell 名

--comment 用户的描述信息

注意:Cloudera

Manager默认去找用户cloudera-scm,创建完该用户后,将自动使用此用户。

4)修改CM Agent配置

修改文件/opt/module/cm/cm-5.12.1/etc/cloudera-scm-agent/

config.ini的主机名称

[root@hadoop102 cloudera-scm-agent]# vim

/opt/module/cm/cm-5.12.1/etc/cloudera-scm-agent/config.ini

修改主机名称

server_host=hadoop102

5)配置CM的数据库

拷贝mysql-connector-java-5.1.27-bin.jar文件到目录/usr/share/java/    

[root@hadoop102 cm]# mkdir –p /usr/share/java/

[root@hadoop102 mysql-libs]# tar -zxvf

mysql-connector-java-5.1.27.tar.gz


[root@hadoop102 mysql-libs]# cp

/opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar

/usr/share/java/


[root@hadoop102 mysql-libs]# mv

/usr/share/java/mysql-connector-java-5.1.27-bin.jar

/usr/share/java/mysql-connector-java.jar

注意:jar包名称要修改为mysql-connector-java.jar

6

)使用CM自带的脚本,在MySQL中创建CM库

[root@hadoop102 cm-5.12.1]#

/opt/module/cm/cm-5.12.1/share/cmf/schema/scm_prepare_database.sh

mysql cm -hhadoop102-uroot -p000000 --scm-host hadoop102 scm

scm scm

参数说明

-h:Database host

-u:Databaseusername

-p:DatabasePassword

--scm-host:SCM server's hostname

7)分发cm

[root@hadoop102 module]# xsync

/opt/module/cm

8)创建Parcel-repo

[root@hadoop102 module]# mkdir -p /opt/cloudera/parcel-repo

[root@hadoop102 module]# chown

cloudera-scm:cloudera-scm /opt/cloudera/parcel-repo

9)拷贝下载文件manifest.json、CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha1、CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel到hadoop102的/opt/cloudera/parcel-repo/目录下

[root@hadoop102 parcel-repo]# ls

CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha1 

manifest.json

10)将CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha1:需改名为

CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha

[root@hadoop102 parcel-repo]# mv

CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha1

CDH-5.12.1-1.cdh5.12.1.p0.3-el6.parcel.sha

11)在hadoop102上创建目录/opt/cloudera/parcels,并修改该目录的所属用户及用户组为cloudera-scm

[root@hadoop102 module]# mkdir -p

/opt/cloudera/parcels

[root@hadoop102 module]# chown

cloudera-scm:cloudera-scm /opt/cloudera/parcels

12)分发/opt/cloudera/

[root@hadoop102 opt]# xsync

/opt/cloudera/

1.3.3 启动CM服务

1)启动服务节点:hadoop102

[root@hadoop102 cm]# /opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-server

start

Starting cloudera-scm-server:                              [确定]

2)启动工作节点:hadoop102、hadoop103、hadoop104

[root@hadoop102 cm]# /opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent

start


[root@hadoop103 cm]#/opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent start


[root@hadoop104 cm]#/opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent start

注意:启动过程非常慢,Manager启动成功需要等待5分钟左右,过程中会在数据库中创建对应的表需要耗费一些时间。

3)查看被占用则表示安装成功了!!!

[root@hadoop102 cm]# netstat -anp | grep

7180

tcp       0      0 0.0.0.0:7180                0.0.0.0:*                   LISTEN      5498/java

4)访问http://hadoop102:7180,(用户名、密码:admin)

1.3.4 关闭CM服务

1)关闭工作节点:hadoop102、hadoop103、hadoop104

[root@hadoop102 cm]# /opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent

stop

Stopping cloudera-scm-agent:                               [确定]

[root@hadoop103 cm]#

/opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent stop

Stopping cloudera-scm-agent:                               [确定]

[root@hadoop104 cm]#

/opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-agent stop

Stopping cloudera-scm-agent:                               [确定]

2)关闭服务节点:hadoop102

[root@hadoop102 cm]#

/opt/module/cm/cm-5.12.1/etc/init.d/cloudera-scm-server stop

停止cloudera-scm-server:                                 [确定]

1.4 CM的集群部署

1.4.1 接受条款和协议

1.4.2 选择免费

1.4.3 指定主机

1.4.4 选择CDH的版本5.12.1

1.4.5 等待下载安装

第2章数据采集模块

2.1 HDFS、YARN、Zookeeper安装

2.1.1 选择自定义安装

2.1.2 选择安装服务

2.1.3 分配节点

2.1.4 集群设置全部选默认即可

2.1.5 自动启动进程

2.1.6 修改HDFS的权限检查配置

关闭HDFS中的权限检查:dfs.permissions。

2.1.7 配置Hadoop支持LZO

1)点击主机,在下拉菜单中点击Parcel

2)点击配置

3)找到远程Parcel存储库URL,点击最后一栏的加号,增加一栏,输入http://archive.cloudera.com/gplextras/parcels/latest/,之后点击保存更改

4)返回Parcel列表,可以看到多出了LZO,选择下载,下载完成后选择分配,分配完成后选择激活。

5)安装完LZO后,打开HDFS配置,找到“压缩编码解码器”一栏,点击加号,添加com.hadoop.compression.lzo.LzopCodec后保存更改

6)打开YARN配置,找到MR 应用程序 Classpath,添加/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo-cdh4-0.4.15-gplextras.jar

7)更新过期配置,重启进程


2.2 Flume安装

2.2.1 日志采集Flume安装

1)添加服务

2)选择Flume,点击继续

3)选择节点

4)完成

2.2.2 日志采集Flume配置

1)Flume配置分析

Flume直接读log日志的数据,log日志的格式是app-yyyy-mm-dd.log。

2)Flume的具体配置如下:

(1)在CM管理页面上点击Flume

(2)在实例页面选择hadoop102上的Agent

(3)在CM管理页面hadoop102上Flume的配置中找到代理名称改为a1

(4)在配置文件如下内容(flume-kafka)

a1.sources=r1

a1.channels=c1 c2

a1.sinks=k1 k2


# configure source

a1.sources.r1.type = TAILDIR

a1.sources.r1.positionFile =

/opt/module/flume/log_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /tmp/logs/app.+

a1.sources.r1.fileHeader = true

a1.sources.r1.channels = c1 c2


#interceptor

a1.sources.r1.interceptors = i1 i2

a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder

a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder


# selector

a1.sources.r1.selector.type =

multiplexing

a1.sources.r1.selector.header = topic

a1.sources.r1.selector.mapping.topic_start = c1

a1.sources.r1.selector.mapping.topic_event = c2


# configure channel

a1.channels.c1.type = memory

a1.channels.c1.capacity=10000

a1.channels.c1.byteCapacityBufferPercentage=20


a1.channels.c2.type = memory

a1.channels.c2.capacity=10000

a1.channels.c2.byteCapacityBufferPercentage=20


# configure sink

# start-sink

a1.sinks.k1.type =

org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = topic_start

a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.sinks.k1.kafka.flumeBatchSize = 2000

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.channel = c1


# event-sink

a1.sinks.k2.type =

org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k2.kafka.topic = topic_event

a1.sinks.k2.kafka.bootstrap.servers =

hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.sinks.k2.kafka.flumeBatchSize = 2000

a1.sinks.k2.kafka.producer.acks = 1

a1.sinks.k2.channel = c2

            注意:com.atguigu.flume.interceptor.LogETLInterceptor和com.atguigu.flume.interceptor.LogTypeInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

(3)修改/opt/module/flume/log_position.json的读写权限

[root@hadoop102 module]# mkdir -p

/opt/module/flume

[root@hadoop102 flume]# touch

log_position.json

[root@hadoop102 flume]# chmod 777

log_position.json

[root@hadoop102 module]# xsync flume/

注意:Json文件的父目录一定要创建好,并改好权限

2.2.3 Flume拦截器

本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。

ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志

日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。

1)创建Maven工程flume-interceptor

2)创建包名:com.atguigu.flume.interceptor

3)在pom.xml文件中添加如下配置

<dependencies>

    <dependency>

        <groupId>org.apache.flume</groupId>

        <artifactId>flume-ng-core</artifactId>

        <version>1.7.0</version>

    </dependency>

</dependencies>

<build>

    <plugins>

        <plugin>

            <artifactId>maven-compiler-plugin</artifactId>

            <version>2.3.2</version>

            <configuration>

                <source>1.8</source>

                <target>1.8</target>

            </configuration>

        </plugin>

        <plugin>

            <artifactId>maven-assembly-plugin</artifactId>

            <configuration>

                <descriptorRefs>

                    <descriptorRef>jar-with-dependencies</descriptorRef>

                </descriptorRefs>

            </configuration>

            <executions>

                <execution>

                    <id>make-assembly</id>

                    <phase>package</phase>

                    <goals>

                        <goal>single</goal>

                    </goals>

                </execution>

            </executions>

        </plugin>

    </plugins>

</build>

4)在com.atguigu.flume.interceptor包下创建LogETLInterceptor类名

Flume ETL拦截器LogETLInterceptor

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override

    public void initialize() {

    }

    @Override

    public Event intercept(Event event) {

        // 1 获取数据

        byte[] body = event.getBody();

        String log =new String(body, Charset.forName("UTF-8"));

        // 2 判断数据类型并向Header中赋值

        if (log.contains("start")) {

            if (LogUtils.validateStart(log)){

                return event;

            }

        }else {

            if (LogUtils.validateEvent(log)){

                return event;

            }

        }

        // 3 返回校验结果

        return null;

    }

    @Override

    public Listintercept(List events) {

        ArrayListinterceptors =new ArrayList<>();

        for (Event event : events) {

            Event intercept1 =intercept(event);

            if (intercept1 != null){

               interceptors.add(intercept1);

            }

        }

       return interceptors;

    }

    @Override

    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override

        public Interceptor build() {

            return new LogETLInterceptor();

        }

        @Override

        public void configure(Context context) {

        }

    }

}


4)Flume日志过滤工具类

package com.atguigu.flume.interceptor;import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {

    public static boolean validateEvent(String log) {

        // 服务器时间| json

        // 1549696569054 |{"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}

        // 1切割

        String[] logContents = log.split("\\|");

        // 2 校验

        if(logContents.length != 2){

            return false;

        }

        //3 校验服务器时间

        if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){

            return false;

        }

        // 4 校验json

        if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){

            return false;

        }

        return true;

    }

    public static boolean validateStart(String log) {

        if (log == null){

            return false;

        }

        // 校验json

        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){

            return false;

        }

        return true;

    }

}


5)Flume日志类型区分拦截器LogTypeInterceptor

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override

    public void initialize() {

    }

    @Override

    public Event intercept(Event event) {

        // 区分日志类型:  body  header

        // 1获取body数据

        byte[] body = event.getBody();

        String log =new String(body, Charset.forName("UTF-8"));

        // 2 获取header

        Map headers =event.getHeaders();

        // 3 判断数据类型并向Header中赋值

       if (log.contains("start")) {

            headers.put("topic","topic_start");

        }else {

            headers.put("topic","topic_event");

        }

        return event;

    }

    @Override

    public List intercept(Listevents) {

        ArrayListinterceptors =new ArrayList<>();

        for (Event event : events) {

            Event intercept1 =intercept(event);

            interceptors.add(intercept1);

        }

        return interceptors;

    }

    @Override

    public void close() {

    }

    public static class Builder implements  Interceptor.Builder{

        @Override

        public Interceptor build() {

            return new LogTypeInterceptor();

        }

        @Override

        public void configure(Context context) {

        }

    }

}

6)打包

拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入flume的lib文件夹下面。

注意:为什么不需要依赖包?因为依赖包在flume的lib目录下面已经存在了。

7)采用root用户将flume-interceptor-1.0-SNAPSHOT.jar包放入到hadoop102的/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/flume-ng/lib/文件夹下面。

[root@hadoop102

lib]# ls | grep interceptor

flume-interceptor-1.0-SNAPSHOT.jar

8)分发Flume到hadoop103

[root@hadoop102

lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar

9)启动Flume

10)查看Flume日志

2.3 Kafka安装

可以选择在线安装和离线包安装,在线安装下载时间较长,离线包安装时间较短。这里我们为了节省时间,选择离线安装。

2.3.1 导入离线包

1)在hadoop102上创建/opt/cloudera/csd目录

[root@hadoop102 parcel-repo]# mkdir -p

/opt/cloudera/csd

2)上传KAFKA-1.2.0.jar到/opt/cloudera/csd目录,并修改所有者和所有者的组

[root@hadoop102 cloudera]# chown

cloudera-scm:cloudera-scm /opt/cloudera/csd/ -R

3)上传KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel、KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1到/opt/cloudera/parcel-repo目录,并修改KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1名称为KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha

[root@hadoop102 parcel-repo]# mv

/opt/cloudera/parcel-repo/KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1

/opt/cloudera/parcel-repo/KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha

4)ClouderManager中选择Parcel->检查Parcel->Kafka点击分配->激活

5)ClouderManager中选择Parcel->检查Parcel->Kafka点击分配->激活

2.3.2 在线下载安装包(不选)

1)点击主机,选择Parcel

2)找到Kafka点击下载,下载完成后点击分配进行分配,然后点击激活,出现已分配,已激活则证明分配激活成功

2.3.3 Kafka安装

[if !supportLists]3)  [endif]回到首页,点击添加服务

4)选择Kafka,点击继续

5)Kafka的Broker选择三台机器

6)修改Kafka的堆大小为256M

7)完成

8)重新部署客户端配置

9)点击重启过时服务

10)等待重启,重启完成后点击完成

2.3.4 查看Kafka Topic

[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]#

/opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1/bin/kafka-topics--zookeeper hadoop102:2181 --list

2.2.5 创建Kafka Topic

进入到/opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1目录下分别创建:启动日志主题、事件日志主题。

1)创建启动日志主题

[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$ bin/kafka-topics--zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181  --create --replication-factor 1 --partitions1 --topictopic_start

2)创建事件日志主题

[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$ bin/kafka-topics--zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181  --create --replication-factor 1 --partitions1 --topictopic_event

2.3.6 删除Kafka Topic

1)删除启动日志主题

[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$ bin/kafka-topics

--delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_start

2)删除事件日志主题

[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$ bin/kafka-topics

--delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_event

2.3.7 生产消息

[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$

bin/kafka-console-producer --broker-listhadoop102:9092 --topic topic_start

>hello world

>atguigu atguigu

2.3.8 消费消息

[root@hadoop103 KAFKA-4.0.0-1.4.0.0.p0.1]$

bin/kafka-console-consumer \

--bootstrap-server hadoop102:9092--from-beginning --topic topic_start

--from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

2.3.9 查看某个Topic的详情

[root@hadoop102 KAFKA-4.0.0-1.4.0.0.p0.1]$ bin/kafka-topics--zookeeper hadoop102:2181 --describe--topic topic_start

2.4 Flume消费Kafka数据写到HDFS

1)集群规划

 服务器hadoop102服务器hadoop103服务器hadoop104

Flume(消费Kafka)  Flume

2)Flume配置分析

3)Flume的具体配置如下:

       (1)在CM管理页面hadoop104上Flume的配置中找到代理名称

a1

在配置文件如下内容(kafka-hdfs)

## 组件

a1.sources=r1 r2

a1.channels=c1 c2

a1.sinks=k1 k2


## source1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.batchSize = 5000

a1.sources.r1.batchDurationMillis = 2000

a1.sources.r1.kafka.bootstrap.servers =hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.sources.r1.kafka.topics=topic_start


## source2

a1.sources.r2.type =org.apache.flume.source.kafka.KafkaSource

a1.sources.r2.batchSize = 5000

a1.sources.r2.batchDurationMillis = 2000

a1.sources.r2.kafka.bootstrap.servers =hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.sources.r2.kafka.topics=topic_event


## channel1

a1.channels.c1.type=memory

a1.channels.c1.capacity=100000

a1.channels.c1.transactionCapacity=10000


## channel2

a1.channels.c2.type=memory

a1.channels.c2.capacity=100000

a1.channels.c2.transactionCapacity=10000


## sink1

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d

a1.sinks.k1.hdfs.filePrefix = logstart-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = second


##sink2

a1.sinks.k2.type = hdfs

a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d

a1.sinks.k2.hdfs.filePrefix = logevent-

a1.sinks.k2.hdfs.round = true

a1.sinks.k2.hdfs.roundValue = 10

a1.sinks.k2.hdfs.roundUnit = second


## 不要产生大量小文件

a1.sinks.k1.hdfs.rollInterval = 10

a1.sinks.k1.hdfs.rollSize = 134217728

a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k2.hdfs.rollInterval = 10

a1.sinks.k2.hdfs.rollSize = 134217728

a1.sinks.k2.hdfs.rollCount = 0


## 控制输出文件是原生文件。

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k2.hdfs.fileType = CompressedStream


a1.sinks.k1.hdfs.codeC = lzop

a1.sinks.k2.hdfs.codeC = lzop


## 拼装

a1.sources.r1.channels = c1

a1.sinks.k1.channel= c1


a1.sources.r2.channels = c2

a1.sinks.k2.channel= c2

2.5 日志生成数据传输到HDFS

1)将log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar上传都hadoop102的/opt/module目录

2)分发log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar到hadoop103

[root@hadoop102

module]# xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar

3)在/root/bin目录下创建脚本lg.sh

[root@hadoop102 bin]$ vim lg.sh

       4)在脚本中编写如下内容

#! /bin/bash


   for iin hadoop102 hadoop103

   do

      ssh$i "java -classpath/opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jarcom.atguigu.appclient.AppMain$1 $2>/opt/module/test.log &"

   done

5)修改脚本执行权限

[root@hadoop102 bin]$ chmod 777 lg.sh

6)启动脚本

[root@hadoop102 module]$ lg.sh

第3章 数仓搭建环境准备

3.1 Hive安装

1)添加服务

2)添加Hive服务

3)将 Hive 服务添加到Cluster1

4)配置hive元数据

5)测试通过后继续

6)自动启动Hive进程

7)修改Hive配置

/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/

3.2 Oozie安装

3.2.1 添加Oozie服务

3.2.2 选择集群节点

3.2.3 选择有MySQL的节点安装

3.2.4 链接数据库

3.2.5 一路继续到完成

3.3 Hue安装

3.3.1 Hue概述

1)Hue来源

HUE=Hadoop User Experience(Hadoop用户体验),直白来说就一个开源的Apache Hadoop UI系统,由Cloudera Desktop演化而来,最后Cloudera公司将其贡献给Apache基金会的Hadoop社区,它是基于Python Web框架Django实现的。通过使用HUE我们可以在浏览器端的Web控制台上与Hadoop集群进行交互来分析处理数据。

2)Hue官网及使用者

官网网站:http://gethue.com/

3.3.2 安装前的准备

1)在LoadBalancer节点安装mod_ssl

[root@hadoop102 ~]# yum -y install mod_ssl

2)查看/usr/lib64/mysql下有没有libmysqlclient_r.so.16,如果没有,上传libmysqlclient_r.so.16到/usr/lib64/mysql,并软链接到/usr/lib64/

[root@hadoop102 ~]# ls /usr/lib64/mysql

[root@hadoop103mysql]#

scp /usr/lib64/mysql/libmysqlclient_r.so.16

root@hadoop102:/usr/lib64/mysql/


[root@hadoop102 ~]ln -s /usr/lib64/mysql/libmysqlclient_r.so.16

/usr/lib64/libmysqlclient_r.so.16

3.3.3 HUE安装步骤

1)添加HUE服务

2)选择集群节点

3)分配角色

4)配置数据库

5)安装完成

6)HUE页面

http://hadoop102:8888(未优化)或http://hadoop102:8889(优化)

第一次开启HUE会出现以下页面,此时输入的用户名和密码可以随意,之后登录页面以第一次输入的账号密码为依据。例如,用户名:admin 密码:admin

第4章 用户行为数仓搭建

4.1 ODS层

原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。

4.1.1 创建数据库

1)创建gmall数据库

hive (default)> create database gmall;

说明:如果数据库存在且有数据,需要强制删除时执行:drop database gmall cascade;

2)使用gmall数据库

hive (default)> use gmall;

4.1.2 创建启动日志表ods_start_log

1)创建输入数据是lzo输出是text,支持json解析的分区表

hive (gmall)>

drop table if exists ods_start_log;

CREATE EXTERNAL TABLE ods_start_log (`line`string)

PARTITIONED BY (`dt` string)

STORED AS

  INPUTFORMAT'com.hadoop.mapred.DeprecatedLzoTextInputFormat'

 OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

LOCATION'/warehouse/gmall/ods/ods_start_log';

说明Hive的LZO压缩:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO

4.1.3 ODS层加载数据脚本

1)在hadoop102的/root/bin目录下创建脚本

[root@hadoop102 bin]$ vim ods_log.sh

       在脚本中编写如下内容

#!/bin/bash


# 定义变量方便修改

APP=gmall


# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天

if [ -n "$1" ] ;then

   do_date=$1

else

   do_date=`date -d "-1 day" +%F`

fi


echo "===日志日期为$do_date==="

sql="

load data inpath '/origin_data/gmall/log/topic_start/$do_date'into table "$APP".ods_start_log partition(dt='$do_date');


"


hive -e "$sql"

说明1:

[ -n 变量值] 判断变量的值,是否为空

-- 变量的值,非空,返回true

-- 变量的值,为空,返回false

说明2:

查看date命令的使用,[root@hadoop102 ~]$date --help

2)增加脚本执行权限

[root@hadoop102 bin]$ chmod 777 ods_log.sh

3)脚本使用

[root@hadoop102 module]$ ods_log.sh 2019-02-10

4.2 DWD层启动表数据解析

4.2.1 创建启动表

1)建表语句

hive (gmall)>

drop table if exists dwd_start_log;

CREATE EXTERNAL TABLE dwd_start_log(

`mid_id` string,

`user_id` string,

`version_code` string,

`version_name` string,

`lang` string,

`source` string,

`os` string,

`area` string,

`model` string,

`brand` string,

`sdk_version` string,

`gmail` string,

`height_width` string, 

`app_time` string,

`network` string,

`lng` string,

`lat` string,

`entry` string,

`open_ad_type`string,

`action` string,

`loading_time`string,

`detail` string,

`extend1` string

)

PARTITIONED BY (dt string)

location '/warehouse/gmall/dwd/dwd_start_log/';

4.2.2 DWD层启动表加载数据脚本

1)在hadoop102的/root/bin目录下创建脚本

[root@hadoop102 bin]$ vim dwd_start_log.sh

       在脚本中编写如下内容

#!/bin/bash


# 定义变量方便修改

APP=gmall


# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天

if [ -n "$1" ] ;then

   do_date=$1

else

   do_date=`date-d "-1 day" +%F` 

fi


sql="

set hive.exec.dynamic.partition.mode=nonstrict;


insert overwrite table "$APP".dwd_start_log

PARTITION (dt='$do_date')

select

   get_json_object(line,'$.mid') mid_id,

   get_json_object(line,'$.uid') user_id,

   get_json_object(line,'$.vc') version_code,

    get_json_object(line,'$.vn') version_name,

   get_json_object(line,'$.l') lang,

   get_json_object(line,'$.sr') source,

   get_json_object(line,'$.os') os,

   get_json_object(line,'$.ar') area,

   get_json_object(line,'$.md') model,

    get_json_object(line,'$.ba')brand,

   get_json_object(line,'$.sv') sdk_version,

   get_json_object(line,'$.g') gmail,

   get_json_object(line,'$.hw') height_width,

   get_json_object(line,'$.t') app_time,

   get_json_object(line,'$.nw') network,

    get_json_object(line,'$.ln')lng,

   get_json_object(line,'$.la') lat,

   get_json_object(line,'$.entry') entry,

   get_json_object(line,'$.open_ad_type') open_ad_type,

   get_json_object(line,'$.action') action,

   get_json_object(line,'$.loading_time') loading_time,

   get_json_object(line,'$.detail') detail,

   get_json_object(line,'$.extend1') extend1

from "$APP".ods_start_log

where dt='$do_date';

"


hive -e "$sql"

2)增加脚本执行权限

[root@hadoop102 bin]$ chmod 777 dwd_start_log.sh

3)脚本使用

[root@hadoop102 module]$ dwd_start_log.sh2019-02-10

4.3 DWS层(需求:用户日活跃)

目标:统计当日、当周、当月活动的每个设备明细

4.3.1 每日活跃设备明细

1)建表语句

hive (gmall)>

drop table if exists dws_uv_detail_day;

create external table dws_uv_detail_day

(

    `mid_id` string COMMENT '设备唯一标识',

   `user_id` string COMMENT '用户标识',

   `version_code` string COMMENT '程序版本号',

   `version_name` string COMMENT '程序版本名',

   `lang` string COMMENT '系统语言',

   `source` string COMMENT '渠道号',

   `os` string COMMENT '安卓系统版本',

   `area` string COMMENT '区域',

   `model` string COMMENT '手机型号',

   `brand` string COMMENT '手机品牌',

   `sdk_version` string COMMENT 'sdkVersion',

   `gmail` string COMMENT 'gmail',

   `height_width` string COMMENT '屏幕宽高',

   `app_time` string COMMENT '客户端日志产生时的时间',

   `network` string COMMENT '网络模式',

   `lng` string COMMENT '经度',

   `lat` string COMMENT '纬度'

)

partitioned by(dt string)

stored as parquet

location '/warehouse/gmall/dws/dws_uv_detail_day'

;

4.3.2 DWS层加载数据脚本

1)在hadoop102的/root/bin目录下创建脚本

[root@hadoop102 bin]$ vim dws_log.sh

       在脚本中编写如下内容

#!/bin/bash


# 定义变量方便修改

APP=gmall


# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天

if [ -n "$1" ] ;then

    do_date=$1

else

    do_date=`date-d "-1 day" +%F` 

fi



sql="

  sethive.exec.dynamic.partition.mode=nonstrict;


 insert overwrite table "$APP".dws_uv_detail_daypartition(dt='$do_date')

 select 

   mid_id,

   concat_ws('|', collect_set(user_id)) user_id,

   concat_ws('|', collect_set(version_code)) version_code,

   concat_ws('|', collect_set(version_name)) version_name,

   concat_ws('|', collect_set(lang)) lang,

   concat_ws('|', collect_set(source)) source,

   concat_ws('|', collect_set(os)) os,

   concat_ws('|', collect_set(area)) area,

   concat_ws('|', collect_set(model)) model,

   concat_ws('|', collect_set(brand)) brand,

   concat_ws('|', collect_set(sdk_version)) sdk_version,

   concat_ws('|', collect_set(gmail)) gmail,

   concat_ws('|', collect_set(height_width)) height_width,

   concat_ws('|', collect_set(app_time)) app_time,

   concat_ws('|', collect_set(network)) network,

   concat_ws('|', collect_set(lng)) lng,

   concat_ws('|', collect_set(lat)) lat

  from"$APP".dwd_start_log

  wheredt='$do_date' 

  groupby mid_id;

"


hive -e "$sql"

2)增加脚本执行权限

[root@hadoop102 bin]$ chmod 777 dws_log.sh

3)脚本使用

[root@hadoop102 module]$ dws_log.sh 2019-02-10

4.4 ADS层(需求:用户日活跃)

目标:当日活跃设备数

4.4.1 活跃设备数

1)建表语句

hive (gmall)>

drop table if exists ads_uv_count;

create external table ads_uv_count(

    `dt` string COMMENT '统计日期',

    `day_count` bigint COMMENT '当日用户数量'

) COMMENT '活跃设备数'

row format delimited fields terminated by

'\t'

location '/warehouse/gmall/ads/ads_uv_count/'

;

4.4.2 ADS层加载数据脚本

1)在hadoop102的/root/bin目录下创建脚本

[root@hadoop102 bin]$ vim ads_uv_log.sh

       在脚本中编写如下内容

#!/bin/bash


# 定义变量方便修改

APP=gmall


# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天

if [ -n "$1" ] ;then

   do_date=$1

else

   do_date=`date-d "-1 day" +%F` 

fi


sql="

  sethive.exec.dynamic.partition.mode=nonstrict;


insert into table"$APP".ads_uv_count

select 

 '$do_date' dt,

  daycount.ct

from

(

  select 

     '$do_date' dt,

      count(*) ct

   from"$APP".dws_uv_detail_day

  where dt='$do_date' 

)daycount;

"


hive -e "$sql"

2)增加脚本执行权限

[root@hadoop102 bin]$ chmod 777 ads_uv_log.sh

3)脚本使用

[root@hadoop102 module]$ ads_uv_log.sh2019-02-10

第5章 业务数仓搭建

5.1 业务数据生成

5.1.1 建表语句

1)通过SQLyog创建数据库gmall

2)设置数据库编码

3)导入建表语句(1建表脚本)

选择->1建表脚本.sql

4)重复步骤3的导入方式,依次导入:2商品分类数据插入脚本、3函数脚本、4存储过程脚本。

5.1.2 生成业务数据

1)生成业务数据函数说明

       init_data ( do_date_string VARCHAR(20) , order_incr_numINT, user_incr_num INT , sku_num INT , if_truncate BOOLEAN  ):

       参数一:do_date_string生成数据日期

       参数二:order_incr_num订单id个数

       参数三:user_incr_num用户id个数

       参数四:sku_num商品sku个数

       参数五:if_truncate是否删除数据

2)案例测试:

(1)需求:生成日期2019年2月10日数据、订单1000个、用户200个、商品sku300个、删除原始数据。

CALL init_data('2019-02-10',1000,200,300,TRUE);

(2)查询生成数据结果

SELECT * from base_category1;

SELECT * from base_category2;

SELECT * from base_category3;


SELECT * from order_info;

SELECT * from order_detail;


SELECT * from sku_info;

SELECT * from user_info;


SELECT * from payment_info;

5.2 业务数据导入数仓

5.2.1 Sqoop安装

1)添加服务

2)选择Sqoop

目前选择sqoop1即可

3)选择节点

5.2.2 Sqoop定时导入脚本

1)在/root/bin目录下创建脚本sqoop_import.sh

[root@hadoop102

bin]$ vim sqoop_import.sh

       在脚本中填写如下内容

#!/bin/bash


db_date=$2

echo

$db_date

db_name=gmall


import_data()

{

sqoop

import \

--connect

jdbc:mysql://hadoop102:3306/$db_name \

--username

root \

--password

000000 \

--target-dir  /origin_data/$db_name/db/$1/$db_date \

--delete-target-dir

\

--num-mappers

1 \

--fields-terminated-by

"\t" \

--query

"$2"'and  $CONDITIONS;'

}


import_sku_info(){

  import_data "sku_info"  "select

id, spu_id, price, sku_name, sku_desc,

weight, tm_id,

category3_id, create_time

  from sku_info where 1=1"

}


import_user_info(){

  import_data "user_info""select

id, name, birthday, gender, email,

user_level,

create_time

from user_info where 1=1"

}


import_base_category1(){

  import_data "base_category1""select

id, name from base_category1 where 1=1"

}


import_base_category2(){

  import_data "base_category2""select

id, name, category1_id from

base_category2 where

1=1"

}


import_base_category3(){

  import_data "base_category3""select id, name, category2_id from base_category3where 1=1"

}


import_order_detail(){

  import_data  "order_detail"  "select

    od.id,

    order_id,

    user_id,

    sku_id,

    sku_name,

    order_price,

    sku_num,

    o.create_time 

  from order_info o , order_detail od

  whereo.id=od.order_id

  andDATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'"

}


import_payment_info(){

  import_data "payment_info"   "select

    id, 

    out_trade_no,

    order_id,

    user_id,

    alipay_trade_no,

    total_amount, 

    subject,

    payment_type,

    payment_time

  from payment_info

  whereDATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"

}


import_order_info(){

  import_data  "order_info"  "select

    id,

    total_amount,

    order_status,

    user_id,

    payment_way,

    out_trade_no,

    create_time,

    operate_time 

  from order_info

  where  (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date' or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"

}


case

$1 in

  "base_category1")

     import_base_category1

;;

  "base_category2")

     import_base_category2

;;

  "base_category3")

     import_base_category3

;;

  "order_info")

     import_order_info

;;

  "order_detail")

     import_order_detail

;;

  "sku_info")

     import_sku_info

;;

  "user_info")

     import_user_info

;;

  "payment_info")

     import_payment_info

;;

   "all")

  import_base_category1

  import_base_category2

  import_base_category3

  import_order_info

  import_order_detail

  import_sku_info

  import_user_info

  import_payment_info

;;

esac

2)增加脚本执行权限

[root@hadoop102 bin]$ chmod 777 sqoop_import.sh

3)执行脚本导入数据

[root@hadoop102 bin]$ sqoop_import.sh all2019-02-10

5.3 ODS层

完全仿照业务数据库中的表字段,一模一样的创建ODS层对应表。

5.3.1 创建订单表

hive (gmall)>

drop table if exists ods_order_info;

create external table ods_order_info (

  `id` string COMMENT '订单编号',

  `total_amount` decimal(10,2) COMMENT '订单金额',

  `order_status` string COMMENT '订单状态',

 `user_id` string COMMENT '用户id' ,

    `payment_way` string

COMMENT '支付方式',  

    `out_trade_no` string

COMMENT '支付流水号',  

    `create_time` string

COMMENT '创建时间',  

    `operate_time` string

COMMENT '操作时间'

) COMMENT '订单表'

PARTITIONED BY ( `dt` string)

row format delimited  fields terminated by '\t'

location '/warehouse/gmall/ods/ods_order_info/'

;

5.3.2 创建订单详情表

hive (gmall)>

drop table if exists ods_order_detail;

create external table ods_order_detail(

  `id` string COMMENT '订单编号',

  `order_id` string  COMMENT '订单号',

 `user_id` string COMMENT '用户id' ,

    `sku_id` string

COMMENT '商品id',  

    `sku_name` string

COMMENT '商品名称',  

    `order_price` string

COMMENT '商品价格',  

    `sku_num` string

COMMENT '商品数量',  

    `create_time` string

COMMENT '创建时间'

) COMMENT '订单明细表'

PARTITIONED BY ( `dt` string)

row format delimited  fields terminated by '\t'

location '/warehouse/gmall/ods/ods_order_detail/'

;

5.3.3 创建商品表

hive (gmall)>

drop table if exists ods_sku_info;

create external table ods_sku_info(

  `id` string COMMENT 'skuId',

  `spu_id` string   COMMENT 'spuid',

 `price` decimal(10,2) COMMENT '价格' ,

    `sku_name` string

COMMENT '商品名称',  

    `sku_desc` string

COMMENT '商品描述',  

    `weight` string

COMMENT '重量',  

    `tm_id` string COMMENT

'品牌id',  

    `category3_id` string

COMMENT '品类id',  

    `create_time` string

COMMENT '创建时间'

) COMMENT '商品表'

PARTITIONED BY ( `dt` string)

row format delimited  fields terminated by '\t'

location '/warehouse/gmall/ods/ods_sku_info/'

;

5.3.4 创建用户表

hive (gmall)>

drop table if exists ods_user_info;

create external table ods_user_info(

  `id` string COMMENT '用户id',

  `name` string COMMENT '姓名',

 `birthday` string COMMENT '生日' ,

    `gender` string

COMMENT '性别',  

    `email` string COMMENT

'邮箱',  

    `user_level` string

COMMENT '用户等级',  

    `create_time` string

COMMENT '创建时间'

) COMMENT '用户信息'

PARTITIONED BY ( `dt` string)

row format delimited  fields terminated by '\t'

location '/warehouse/gmall/ods/ods_user_info/'

;

5.3.5 创建商品一级分类表

hive (gmall)>

drop table if exists ods_base_category1;

create external table ods_base_category1(

  `id` string COMMENT 'id',

  `name` string COMMENT '名称'

) COMMENT '商品一级分类'

PARTITIONED BY ( `dt` string)

row format delimited  fields terminated by '\t'

location '/warehouse/gmall/ods/ods_base_category1/'

;

5.3.6 创建商品二级分类表

hive (gmall)>

drop table if exists ods_base_category2;

create external table ods_base_category2(

  `id` string COMMENT ' id',

  `name` string COMMENT '名称',

 category1_id string COMMENT '一级品类id'

) COMMENT '商品二级分类'

PARTITIONED BY ( `dt` string)

row format delimited  fields terminated by '\t'

location '/warehouse/gmall/ods/ods_base_category2/'

;

5.3.7 创建商品三级分类表

hive (gmall)>

drop table if exists ods_base_category3;

create external table ods_base_category3(

  `id` string COMMENT ' id',

  `name` string COMMENT '名称',

 category2_id string COMMENT '二级品类id'

) COMMENT '商品三级分类'

PARTITIONED BY ( `dt` string)

row format delimited  fields terminated by '\t'

location '/warehouse/gmall/ods/ods_base_category3/'

;

5.3.8 创建支付流水表

hive (gmall)>

drop table if exists `ods_payment_info`;

create external table  `ods_payment_info`(

    `id`  

bigint COMMENT '编号',

`out_trade_no` string COMMENT '对外业务编号',

    `order_id`

       string COMMENT '订单编号',

    `user_id`

        string COMMENT '用户编号',

    `alipay_trade_no`

string COMMENT '支付宝交易流水编号',

    `total_amount`

   decimal(16,2) COMMENT '支付金额',

    `subject`

        string COMMENT '交易内容',

    `payment_type` string

COMMENT '支付类型',

    `payment_time`

  string COMMENT '支付时间'

)  COMMENT '支付流水表'

PARTITIONED BY ( `dt` string)

row format delimited  fields terminated by '\t'

location '/warehouse/gmall/ods/ods_payment_info/'

;

5.3.9 ODS层数据导入脚本

1)在/root/bin目录下创建脚本ods_db.sh

[root@hadoop102

bin]$ vim ods_db.sh

       在脚本中填写如下内容

#!/bin/bash


  APP=gmall


# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天

if [ -n "$1" ] ;then

    do_date=$1

else

    do_date=`date-d "-1 day" +%F` 

fi


sql="

load data inpath '/origin_data/$APP/db/order_info/$do_date'  OVERWRITE into table"$APP".ods_order_info partition(dt='$do_date');


load data inpath'/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table "$APP".ods_order_detailpartition(dt='$do_date');


load data inpath'/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table "$APP".ods_sku_infopartition(dt='$do_date');


load data inpath

'/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table

"$APP".ods_user_info partition(dt='$do_date');


load data inpath

'/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table

"$APP".ods_payment_info partition(dt='$do_date');


load data inpath

'/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table

"$APP".ods_base_category1 partition(dt='$do_date');


load data inpath

'/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table

"$APP".ods_base_category2 partition(dt='$do_date');


load data inpath

'/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table

"$APP".ods_base_category3 partition(dt='$do_date');

"

hive -e "$sql"

2)增加脚本执行权限

[root@hadoop102 bin]$ chmod 777 ods_db.sh

3)采用脚本导入数据

[root@hadoop102 bin]$ ods_db.sh 2019-02-10

5.4 DWD层

对ODS层数据进行判空过滤。对商品分类表进行维度退化(降维)。

5.4.1 创建订单表

hive (gmall)>

drop table if exists dwd_order_info;

create external table dwd_order_info (

  `id` string COMMENT '',

  `total_amount` decimal(10,2) COMMENT '',

  `order_status` string COMMENT ' 1 2 3  4  5',

 `user_id` string COMMENT 'id' ,

    `payment_way` string

COMMENT '',  

    `out_trade_no` string

COMMENT '',  

    `create_time` string

COMMENT '',  

    `operate_time` string

COMMENT ''

)

PARTITIONED BY ( `dt` string)

stored as parquet

location '/warehouse/gmall/dwd/dwd_order_info/'

;

5.4.2 创建订单详情表

hive (gmall)>

drop table if exists dwd_order_detail;

create external table dwd_order_detail(

  `id` string COMMENT '',

  `order_id` decimal(10,2) COMMENT '',

 `user_id` string COMMENT 'id' ,

    `sku_id` string

COMMENT 'id',  

    `sku_name` string

COMMENT '',  

    `order_price` string

COMMENT '',  

    `sku_num` string

COMMENT '',

    `create_time` string

COMMENT ''

)

PARTITIONED BY (`dt` string)

stored as parquet

location '/warehouse/gmall/dwd/dwd_order_detail/'

;

5.4.3 创建用户表

hive (gmall)>

drop table if exists dwd_user_info;

create external table dwd_user_info(

  `id` string COMMENT 'id',

  `name` string COMMENT '',

 `birthday` string COMMENT '' ,

    `gender` string

COMMENT '',  

    `email` string COMMENT

'',  

    `user_level` string

COMMENT '',  

    `create_time` string COMMENT

''

)

PARTITIONED BY (`dt` string)

stored as parquet

location '/warehouse/gmall/dwd/dwd_user_info/'

;

5.4.4 创建支付流水表

hive (gmall)>

drop table if exists `dwd_payment_info`;

create external  table `dwd_payment_info`(

    `id`  

bigint COMMENT '',

`out_trade_no` string COMMENT '',

    `order_id`

       string COMMENT '',

    `user_id`

        string COMMENT '',

    `alipay_trade_no`

string COMMENT '',

    `total_amount`

   decimal(16,2) COMMENT '',

    `subject`

        string COMMENT '',

`payment_type`   string COMMENT '',

`payment_time` string COMMENT ''

PARTITIONED BY ( `dt` string)

stored as parquet

location '/warehouse/gmall/dwd/dwd_payment_info/'

;

5.4.5 创建商品表(增加分类)

hive (gmall)>

drop table if exists dwd_sku_info;

create external table dwd_sku_info(

  `id` string COMMENT 'skuId',

  `spu_id` string COMMENT 'spuid',

 `price` decimal(10,2) COMMENT '' ,

    `sku_name` string

COMMENT '',  

    `sku_desc` string

COMMENT '',  

    `weight` string

COMMENT '',  

    `tm_id` string COMMENT

'id',  

`category3_id` stringCOMMENT '1id',

`category2_id` string COMMENT '2id',

`category1_id` string COMMENT '3id',

`category3_name` string COMMENT '3',

`category2_name` string COMMENT '2',

`category1_name` string COMMENT '1',

`create_time` stringCOMMENT ''

)

PARTITIONED BY ( `dt` string)

stored as parquet

location '/warehouse/gmall/dwd/dwd_sku_info/'

;

5.4.6 DWD层数据导入脚本

1)在/root/bin目录下创建脚本dwd_db.sh

[root@hadoop102

bin]$ vim dwd_db.sh

       在脚本中填写如下内容

#!/bin/bash


# 定义变量方便修改

APP=gmall


# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天

if [ -n "$1" ] ;then

   do_date=$1

else

   do_date=`date-d "-1 day" +%F` 

fi


sql="


set hive.exec.dynamic.partition.mode=nonstrict;


insert overwrite table   "$APP".dwd_order_infopartition(dt)

select * from "$APP".ods_order_info

where dt='$do_date'  and id is not null;


insert overwrite table   "$APP".dwd_order_detailpartition(dt)

select * from"$APP".ods_order_detail

where dt='$do_date'   andid is not null;


insert overwrite table   "$APP".dwd_user_info partition(dt)

select * from "$APP".ods_user_info

where dt='$do_date'   andid is not null;


insert overwrite table   "$APP".dwd_payment_infopartition(dt)

select * from"$APP".ods_payment_info

where dt='$do_date'  andid is not null;


insert overwrite table   "$APP".dwd_sku_info partition(dt)

select 

   sku.id,

   sku.spu_id,

   sku.price,

   sku.sku_name, 

   sku.sku_desc, 

   sku.weight, 

   sku.tm_id, 

    sku.category3_id, 

    c2.id category2_id , 

    c1.id category1_id, 

    c3.namecategory3_name, 

    c2.namecategory2_name, 

    c1.name category1_name, 

   sku.create_time,

   sku.dt

from

   "$APP".ods_sku_info sku

join "$APP".ods_base_category3 c3 on sku.category3_id=c3.id

   join "$APP".ods_base_category2 c2 on c3.category2_id=c2.id

   join "$APP".ods_base_category1 c1 on c2.category1_id=c1.id

where sku.dt='$do_date'  andc2.dt='$do_date' 

and  c3.dt='$do_date'and c1.dt='$do_date'

andsku.id is not null;


"

hive -e "$sql"

2)增加脚本执行权限

[root@hadoop102 bin]$ chmod 777 dwd_db.sh

3)采用脚本导入数据

[root@hadoop102 bin]$ dwd_db.sh 2019-02-10

5.5 DWS层之用户行为宽表

1)为什么要建宽表

需求目标,把每个用户单日的行为聚合起来组成一张多列宽表,以便之后关联用户维度信息后进行,不同角度的统计分析。

5.5.1 创建用户行为宽表

hive (gmall)>

drop table if exists dws_user_action;

create external table dws_user_action

(  

    user_id          string     comment '用户id',

    order_count     bigint     comment '下单次数',

    order_amount    decimal(16,2)  comment '下单金额',

    payment_count   bigint     comment '支付次数',

    payment_amount  decimal(16,2) comment '支付金额'

) COMMENT '每日用户行为宽表'

PARTITIONED BY (`dt` string)

stored as parquet

location'/warehouse/gmall/dws/dws_user_action/'

tblproperties("parquet.compression"="snappy");

5.5.2 用户行为数据宽表导入脚本

1)在/root/bin目录下创建脚本dws_db_wide.sh

[root@hadoop102

bin]$ vim dws_db_wide.sh

       在脚本中填写如下内容

#!/bin/bash


# 定义变量方便修改

APP=gmall


# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天

if [ -n "$1" ] ;then

   do_date=$1

else

   do_date=`date-d "-1 day" +%F` 

fi


sql="

with 

tmp_order as

(

   select

       user_id,

       count(*)  order_count,

       sum(oi.total_amount) order_amount

   from "$APP".dwd_order_info oi

   where date_format(oi.create_time,'yyyy-MM-dd')='$do_date'

   group by user_id

) ,

tmp_payment as

(

   select

       user_id,

       sum(pi.total_amount) payment_amount,

       count(*) payment_count

   from "$APP".dwd_payment_info pi

   where date_format(pi.payment_time,'yyyy-MM-dd')='$do_date'

   group by user_id

)

insert overwrite table"$APP".dws_user_action partition(dt='$do_date')

select

   user_actions.user_id,

   sum(user_actions.order_count),

   sum(user_actions.order_amount),

   sum(user_actions.payment_count),

   sum(user_actions.payment_amount)

from

(

   select

       user_id,

       order_count,

        order_amount,

       0 payment_count,

       0 payment_amount

   from tmp_order


   union all

   select

       user_id,

       0 order_count,

       0 order_amount,

       payment_count,

       payment_amount

   from tmp_payment

 )user_actions

group by user_id;

"


hive -e "$sql"

2)增加脚本执行权限

[root@hadoop102 bin]$ chmod 777 dws_db_wide.sh

3)增加脚本执行权限

[root@hadoop102 bin]$ dws_db_wide.sh

2019-02-10

5.6 ADS层(需求:GMV成交总额)

5.6.1 建表语句

hive (gmall)>

drop table if exists ads_gmv_sum_day;

create external table

ads_gmv_sum_day(

    `dt` string COMMENT '统计日期',

`gmv_count` bigint COMMENT '当日gmv订单个数',

`gmv_amount` decimal(16,2) COMMENT '当日gmv订单总金额',

`gmv_payment` decimal(16,2) COMMENT '当日支付金额'

) COMMENT 'GMV'

row format delimited fields terminated by

'\t'

location '/warehouse/gmall/ads/ads_gmv_sum_day/'

;

5.6.2 数据导入脚本

1)在/root/bin目录下创建脚本ads_db_gmv.sh

[root@hadoop102

bin]$ vim ads_db_gmv.sh

       在脚本中填写如下内容

#!/bin/bash


# 定义变量方便修改

APP=gmall


# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天

if [ -n "$1" ] ;then

   do_date=$1

else

   do_date=`date-d "-1 day" +%F`

fi


sql="

insert into table "$APP".ads_gmv_sum_day

select

   '$do_date' dt,

   sum(order_count)  gmv_count,

   sum(order_amount) gmv_amount,

   sum(payment_amount) payment_amount

from "$APP".dws_user_action

where dt ='$do_date'

group by dt;

"


hive -e "$sql"

2)增加脚本执行权限

[root@hadoop102 bin]$ chmod 777 ads_db_gmv.sh

3)执行脚本

[root@hadoop102 bin]$ ads_db_gmv.sh

2019-02-10

5.6.3 数据导出脚本

1)在MySQL中创建ads_gmv_sum_day表

DROP TABLE IF EXISTS ads_gmv_sum_day;

CREATE TABLE ads_gmv_sum_day(

 `dt` varchar(200) DEFAULT NULL COMMENT '统计日期',

 `gmv_count` bigint(20) DEFAULT NULL COMMENT '当日gmv订单个数',

 `gmv_amount` decimal(16, 2) DEFAULT NULL COMMENT '当日gmv订单总金额',

 `gmv_payment` decimal(16, 2) DEFAULT NULL COMMENT '当日支付金额'

) ENGINE = InnoDB CHARACTER SET = utf8

COLLATE = utf8_general_ci COMMENT = '每日活跃用户数量' ROW_FORMAT =

Dynamic;

2)在/root/bin目录下创建脚本sqoop_export.sh

[root@hadoop102

bin]$ vim sqoop_export.sh

       在脚本中填写如下内容

#!/bin/bash


db_name=gmall


export_data() {

sqoop export \

--connect "jdbc:mysql://hadoop102:3306/${db_name}?useUnicode=true&characterEncoding=utf-8"  \

--username root \

--password 000000 \

--table $1 \

--num-mappers 1 \

--export-dir /warehouse/$db_name/ads/$1 \

--input-fields-terminated-by

"\t" \

--update-mode allowinsert \

--update-key

"tm_id,category1_id,stat_mn,stat_date" \

--input-null-string '\\N'    \

--input-null-non-string '\\N'

}


case $1 in

 "ads_gmv_sum_day")

    export_data "ads_gmv_sum_day"

;;

  "all")

    export_data "ads_gmv_sum_day"

;;

esac

3)增加脚本执行权限

[root@hadoop102 bin]$ chmod 777 sqoop_export.sh

4)执行脚本导入数据

[root@hadoop102 bin]$ sqoop_export.sh

all

5)在SQLyog查看导出数据

select * from ads_gmv_sum_day

5.7 Oozie基于Hue实现GMV指标全流程调度

5.7.1 执行前的准备

1)添加MySQL驱动文件

[root@hadoop102 ~]# cp /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/lib


[root@hadoop102 ~]# cp/opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/sqoop/lib/


[root@hadoop102 mysql-connector-java-5.1.27]#hadoop fs -put /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar/user/oozie/share/lib/lib_20190603220831/sqoop


[root@hadoop102 mysql-connector-java-5.1.27]#xsync /opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/lib


[root@hadoop102mysql-connector-java-5.1.27]#  xsync /opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/sqoop/lib/

2)修改YARN的容器内存yarn.nodemanager.resource.memory-mb为4G

5)新建一个Hue账户

6)切换用户

5.7.2 在Hue中创建Oozie任务GMV

1)生成数据

CALL init_data('2019-02-12',300,200,300,FALSE);

2)安装Oozie可视化js,复制ext-2.2.zip到/opt/cloudera/parcels/CDH/lib/oozie/libext(或/var/lib/oozie)中,解压

[root@hadoop102 libext]# unzip

ext-2.2.zip

3)查看Hue的web页面

4)选择query->scheduler->workflow

5)点击My Workflow->输入gmv

6)点击保存

5.7.3 编写任务脚本并上传到HDFS

1)点击workspace,查看上传情况

2)上传要执行的脚本导HDFS路径

 [root@hadoop102 bin]# hadoop fs -put /root/bin/*.sh/user/hue/oozie/workspaces/hue-oozie-1559457755.83/lib

3)点击左侧的->Documents->gmv

5.7.4 编写任务调度

1)点击编辑

2)选择actions

3)拖拽控件编写任务

4)选择执行脚本

5)Files再选择执行脚本

6)定义脚本参数名称

7)按顺序执行3-6步骤。

ods_db.sh、dwd_db.sh、dws_db_wide.sh、ads_db_gmv.sh和sqoop_export.sh

       其中ods_db.sh、dwd_db.sh、dws_db_wide.sh、ads_db_gmv.sh只有一个参数do_date;

       sqoop_export.sh只有一个参数all。

8)点击保存

5.7.5 执行任务调度

1)点击执行

2)点击提交

第6章 即席查询数仓搭建

6.1 Impala安装

6.1.1 添加服务

6.1.2 选择Impala服务

6.1.3 角色分配

注意:最好将StateStore和CataLog Sever单独部署在同一节点上。

6.1.4 配置Impala

6.1.5 启动Impala

6.1.6 安装成功

6.1.7 配置Hue支持Impala

6.2 Impala基于Hue查询




第7章Spark2.1安装

在CDH5.12.1集群中,默认安装的Spark是1.6版本,这里需要将其升级为Spark2.1版本。经查阅官方文档,发现Spark1.6和2.x是可以并行安装的,也就是说可以不用删除默认的1.6版本,可以直接安装2.x版本,它们各自用的端口也是不一样的。

Cloudera发布Apache Spark 2概述(可以在这里面找到安装方法和parcel包的仓库)

cloudera的官网可以下载相关的parcel 的离线安装包:https://www.cloudera.com/documentation/spark2/latest/topics/spark2_installing.html

Cloudera Manager及5.12.0版本的介绍:https://www.cloudera.com/documentation/enterprise/latest/topics/cm_ig_parcels.html#cmug_topic_7_11_5__section

7.1 升级过程

7.1.1 离线包下载

1)所需软件:http://archive.cloudera.com/spark2/csd/

2)Parcels 包的下载地址:http://archive.cloudera.com/spark2/parcels/2.2.0.cloudera1/

7.1.2 离线包上传

1)上传文件SPARK2_ON_YARN-2.1.0.cloudera1.jar到/opt/cloudera/csd/下面

2)上传文件SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel和SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel.sha1到/opt/cloudera/parcel-repo/

3)将SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel.sha1重命名为SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel.sha

[root@hadoop102 parcel-repo]# mv

/opt/cloudera/parcel-repo/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel.sha1

/opt/cloudera/parcel-repo/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-el6.parcel.sha

7.2 页面操作

7.2.1 更新Parcel

在cm首页点击Parcel,再点击检查新Parcel

7.2.2 点击分配

7.2.3 点击激活

7.2.4 回到首页点击添加服务

7.2.5 点击Spark2继续

7.2.6 选择一组依赖关系

7.2.7 角色分配

7.2.8 部署并启动

注意:这里我报了一个错:客户端配置 (id=12) 已使用 1 退出,而预期值为0

1)问题原因:最后找到原因是因为CM安装Spark不会去环境变量去找Java,需要将Java路径添加到CM配置文件

2)解决方法1(需要重启cdh):

[root@hadoop102 java]# vim

/opt/module/cm/cm-5.12.1/lib64/cmf/service/client/deploy-cc.sh

在文件最后加上

JAVA_HOME= /opt/module/jdk1.8.0_144

export JAVA_HOME= /opt/module/jdk1.8.0_144

3)解决方法2(无需重启cdh):

查看/opt/module/cm/cm-5.12.1/lib64/cmf/service/common/cloudera-config.sh

找到java8的home目录

cdh不会使用系统默认的JAVA_HOME环境变量,而是依照bigtop进行管理,因此我们需要在指定的/usr/java/jdk1.8目录下安装jdk。当然我们已经在/opt/module/jdk1.8.0_144下安装了jdk,因此创建一个连接过去即可

[root@hadoop102 ~]# ln -s /opt/module/jdk1.8.0_144/

/usr/java/jdk1.8

[root@hadoop103 ~]# ln -s

/opt/module/jdk1.8.0_144/ /usr/java/jdk1.8

[root@hadoop104 ~]# ln -s

/opt/module/jdk1.8.0_144/ /usr/java/jdk1.8

3)解决方法3(需要重启cdh):

找到hadoop102、hadoop103、hadoop104三台机器的配置,配置java主目录

7.2.9 命令行查看命令

第8章 Sentry安装

8.1 Sentry安装部署

8.1.1 添加Sentry服务

8.1.2 选择添加到集群

8.1.3 自定义Sentry角色分配

8.1.4 Mysql中创建Sentry库

8.1.5 配置数据库连接

8.1.6 成功完成Sentry的服务添加

8.2 Sentry与Hive集成使用

8.2.1 Hive的配置中需要配置以下参数

设置sentry.hive.testing.mode 为true


8.2.2 配置Hive使用Sentry服务

8.2.3 关闭Hive的用户模拟功能


8.3 Sentry与Hive的使用

8.3.1 使用beeline连接HiveServer2,并登录hive用户

[root@hadoop104 init.d]#beeline

Java

  HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M;

  support was removed in 8.0

Java

  HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M;

  support was removed in 8.0

Beeline

  version 1.1.0-cdh5.12.1 by Apache Hive

beeline>

  ! connect

  jdbc:hive2://hadoop104:10000

scan

  complete in 484ms

Connecting

  to jdbc:hive2://hadoop104:10000

Enter

  username for jdbc:hive2://hadoop104:10000: hive

Enter

  password for jdbc:hive2://hadoop104:10000: (直接回车)

Connected

  to: Apache Hive (version 1.1.0-cdh5.12.1)

Driver:

  Hive JDBC (version 1.1.0-cdh5.12.1)

Transaction

  isolation: TRANSACTION_REPEATABLE_READ

0:

  jdbc:hive2://hadoop104:10000>

注意:标红部分为输入的hive用户,输入的hive用户并未真正的校验。

8.3.2 创建一个admin角色

0: jdbc:hive2://hadoop104:10000> create role admin;


8.3.3 为admin角色赋予超级权限

0:jdbc:hive2://hadoop104:10000> grant all on server server1 to

  role admin;


8.3.4 将admin角色授权给hive用户组

0: jdbc:hive2://hadoop104:10000> grant  role admin to group hive;


8.3.5 创建test表

使用beeline登录hive用户,创建一个test表,并插入测试数据

0:jdbc:hive2://hadoop104:10000> 

  create table test (s1 string, s2 string) row format delimited fields  terminated by ',';


0: jdbc:hive2://hadoop104:10000>  insert into test values('a','b'),('1','2');


8.3.6 创建测试角色并授权给用户组

创建两个角色:

read:只能读default库test表,并授权给user_r用户组

write:只能写default库test表,并授权给user_w用户组

注意:集群所有节点必须存在user_r和user_w用户,用户默认用户组与用户名一致,赋权是针对用户组而不是针对用户。

[root@hadoop104 ~]# useradd user_r

[root@hadoop104

  ~]# id user_r

uid=502(user_r)

  gid=503(user_r) 组=503(user_r)

[root@hadoop104

  ~]# useradd user_w

[root@hadoop104

  ~]# id user_w

uid=503(user_w)

  gid=504(user_w) 组=504(user_w)

8.3.7 使用hive用户创建创建read和write角色,并授权read角色对test表select权限,write角色对test表insert权限

0: jdbc:hive2://hadoop104:10000>  create role read;


0:

  jdbc:hive2://hadoop104:10000> grant select on table test to role read;


0:

  jdbc:hive2://hadoop104:10000> create role write;


0:

  jdbc:hive2://hadoop104:10000> grant insert on table test to role write;



8.3.8 为user_r用户组授权read角色,为user_w用户组授权write角色

0: jdbc:hive2://hadoop104:10000> grant role read to group user_r;


0: jdbc:hive2://hadoop104:10000> grant

  role write to group user_w;


8.3.9 beeline验证

1.使用user_r用户登录beeline进行验证

[root@hadoop104  init.d]#beeline

beeline>! connect  jdbc:hive2://hadoop104:10000

scan complete in 4ms

Connecting to

  jdbc:hive2://hadoop104:10000

Enter username for

  jdbc:hive2://hadoop104:10000: user_r

Enter password for

  jdbc:hive2://hadoop104:10000: (直接回车)

Connected to: Apache Hive (version

  1.1.0-cdh5.12.1)

Driver: Hive JDBC (version

  1.1.0-cdh5.12.1)

Transaction isolation:

  TRANSACTION_REPEATABLE_READ

0:

  jdbc:hive2://hadoop104:10000> show  tables;

INFO  : OK

+-----------+--+

|  tab_name  |

+-----------+--+

|  test      |

+-----------+--+

1

  row selected (0.649 seconds)

0:  jdbc:hive2://hadoop104:10000>  select * from test;

INFO  : OK

+----------+----------+--+

|  test.s1  | test.s2  |

+----------+----------+--+

|  a        | b        |

|  1        | 2        |

+----------+----------+--+

2

  rows selected (0.485 seconds)

0:

  jdbc:hive2://hadoop104:10000> insert

  into test values("2", "222");

Error: Error while compiling statement: FAILED:  SemanticException No valid privileges

 User user_r does  not have privileges for QUERY

 The required  privileges: Server=server1->Db=default->Table=test->action=insert;  (state=42000,code=40000)

0:

  jdbc:hive2://hadoop104:10000>


2.使用user_w用户登录beeline验证

[root@hadoop104 init.d]#beeline

beeline>

  ! connect jdbc:hive2://hadoop104:10000

Connecting

  to jdbc:hive2://hadoop104:10000

Enter

  username for jdbc:hive2://hadoop104:10000: user_w

Enter

  password for jdbc:hive2://hadoop104:10000:

Connected

  to: Apache Hive (version 1.1.0-cdh5.12.1)

Driver:

  Hive JDBC (version 1.1.0-cdh5.12.1)

Transaction

  isolation: TRANSACTION_REPEATABLE_READ

0:

  jdbc:hive2://hadoop104:10000> select  * from test;

Error: Error while compiling statement: FAILED:  SemanticException No valid privileges

 User user_w does  not have privileges for QUERY

 The required  privileges:  Server=server1->Db=default->Table=test->Column=s1->action=select;  (state=42000,code=40000)

0:

  jdbc:hive2://hadoop104:10000>insert into test values("2", "333");

…..

INFO  : Total MapReduce CPU Time Spent: 20  seconds 30 msec

INFO  : Completed executing  command(queryId=hive_20190603173535_88f71295-e612-4b45-bff7-df1fc4310a54);  Time taken: 58.184 seconds

INFO  : OK

No

  rows affected (59.17 seconds)

0:

  jdbc:hive2://hadoop104:10000>

验证总结:

user_r用户所属组为user_r拥有test表读权限,所以只能对test表进行selecth和count操作不能进行insert操作;

user_w用户所属组为user_w拥有test表写权限,所以只能对test表进行insert操作不能进行select和count操作;

8.4 Kerberos在企业中的作用

CDH平台中的安全,认证(Kerberos/LDAP)是第一步,授权(Sentry)是第二步。如果要启用授权,必须先启用认证。但在CDH平台中给出了一种测试模式,即不启用认证而只启用Sentry授权。但强烈不建议在生产系统中这样使用,因为如果没有用户认证,授权没有任何意义形同虚设,用户可以随意使用任何超级用户登录HiveServer2或者Impala,并不会做密码校验。注:本文档仅适用于测试环境。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容