Cluster plan:
  hadoop002: Flume (log collection)
  hadoop003: Flume (log collection)
  hadoop004: Flume (log consumption)
Installation and deployment
1) Upload apache-flume-1.7.0-bin.tar.gz to the /opt/software directory on the Linux server.
2) Extract apache-flume-1.7.0-bin.tar.gz into /opt/module/:
tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
3) Rename apache-flume-1.7.0-bin to flume:
mv apache-flume-1.7.0-bin flume
4) Rename flume-env.sh.template under flume/conf to flume-env.sh and configure it:
mv flume-env.sh.template flume-env.sh
vi flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
Chapter 4: Flume Monitoring with Ganglia
4.1 Installing and Deploying Ganglia
1) Install the httpd service and PHP
sudo yum -y install httpd php
2) Install the other dependencies
sudo yum -y install rrdtool perl-rrdtool rrdtool-devel
sudo yum -y install apr-devel
3) Install Ganglia
sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
sudo yum -y install ganglia-gmetad
sudo yum -y install ganglia-web
sudo yum install -y ganglia-gmond
4) Modify the configuration file /etc/httpd/conf.d/ganglia.conf
sudo vim /etc/httpd/conf.d/ganglia.conf
Change the access rules so that the page is reachable from all hosts:
# Ganglia monitoring system php web frontend
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
Order deny,allow
Deny from all
Allow from all
# Allow from 127.0.0.1
# Allow from ::1
# Allow from .example.com
</Location>
5) Modify the configuration file /etc/ganglia/gmetad.conf
sudo vim /etc/ganglia/gmetad.conf
Change it to:
data_source "hadoop002" 192.168.xx.102
6) Modify the configuration file /etc/ganglia/gmond.conf
sudo vim /etc/ganglia/gmond.conf
Change it to:
cluster {
name = "hadoop002"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
# mcast_join = 239.2.11.71
host = 192.168.xx.102
port = 8649
ttl = 1
}
udp_recv_channel {
# mcast_join = 239.2.11.71
port = 8649
bind = 192.168.xx.102
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
7) Modify the configuration file /etc/selinux/config
sudo vim /etc/selinux/config
Change it to:
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
# targeted - Targeted processes are protected,
# mls - Multi Level Security protection.
SELINUXTYPE=targeted
Note: disabling SELinux only takes effect after a reboot. To apply it temporarily without rebooting, run:
sudo setenforce 0
8) Start Ganglia
sudo service httpd start
sudo service gmetad start
sudo service gmond start
9) Open the Ganglia page in a web browser
Note: if you still get a permission error after completing the steps above, change the permissions of the /var/lib/ganglia directory:
sudo chmod -R 777 /var/lib/ganglia
Flume components
1) Source
(1) Advantages of Taildir Source over Exec Source and Spooling Directory Source
Taildir Source: supports resuming from the last read position and monitoring multiple directories. Before Flume 1.6 you had to write a custom Source that recorded the read position of each file to get resume-on-restart behavior.
Exec Source can collect data in real time, but data is lost if Flume is not running or the shell command fails.
Spooling Directory Source monitors a directory but does not support resuming from a saved position.
(2) How should batchSize be set?
Answer: with events around 1 KB, 500-1000 is appropriate (the default is 100).
2) Channel
Use a Kafka Channel, which removes the need for a Sink and improves efficiency. The Kafka Channel stores its data in Kafka rather than inside Flume, so the data sits on disk and reliability is high.
Note that before Flume 1.7 the Kafka Channel was rarely used, because the parseAsFlumeEvent setting had no effect: whether it was set to true or false, records were always treated as Flume Events. As a result, the Flume headers were always written into the Kafka message along with the body, which is not what we need here; we only want the body written.
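With Flume 1.7's Kafka Channel and parseAsFlumeEvent = false (as configured below), each Kafka message value is just the raw log line. The following is a minimal, illustrative sketch, not part of the original project, of a plain Kafka consumer that prints a few records from the topic_start topic to verify this; it assumes kafka-clients 2.x on the classpath, and the class name and group id are made up for this example.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TopicStartPeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop002:9092,hadoop003:9092,hadoop004:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "peek-topic-start"); // hypothetical group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic_start"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // With parseAsFlumeEvent = false the value is the bare log line, no Flume headers.
                System.out.println(record.value());
            }
        }
    }
}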
Log-collection Flume configuration
1) Configuration overview
Flume reads the log files directly; the log files are named app-yyyy-mm-dd.log.
2) The Flume configuration is as follows:
(1) Create a file-flume-kafka.conf file in the /opt/module/flume/conf directory
vim file-flume-kafka.conf
Add the following content to the file:
a1.sources=r1
a1.channels=c1 c2
# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.xxxx.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.xxxx.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop002:9092,hadoop003:9092,hadoop004:9092
a1.channels.c1.kafka.topic = topic_start
# This must be set to false; the default is true
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop002:9092,hadoop003:9092,hadoop004:9092
a1.channels.c2.kafka.topic = topic_event
# This must be set to false; the default is true
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
Note: com.xxxx.flume.interceptor.LogETLInterceptor and com.xxxx.flume.interceptor.LogTypeInterceptor are the fully qualified class names of the custom interceptors; adjust them to match your own interceptor classes.
Once configured, distribute the file to the other nodes (hadoop004 does not need it).
For the interceptor, channel selector, and Kafka Channel configuration options, see the official Flume documentation.
Java implementation of the ETL interceptor: create the LogETLInterceptor class
package com.xxxx.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class LogETLInterceptor implements Interceptor {
    // Be careful to import the right classes; the interface has four methods to implement.
    @Override
    public void initialize() {
        // Initialization
    }

    @Override
    public Event intercept(Event event) {
        // Single-event processing.
        // ETL cleansing: a record with unbalanced braces is dirty data, e.g.
        // {"action":"1","ar":"MX","ba":"Huawei","detail":"325"," is invalid and must be filtered out.
        // 1. Get the log as a byte array.
        byte[] body = event.getBody();
        // Convert the byte array to a String with an explicit character set.
        String log = new String(body, Charset.forName("UTF-8"));
        // 2. Handle the two log types separately.
        if (log.contains("start")) {
            // Validate a start log.
            if (LogUtils.validateStart(log)) {
                return event;
            }
        } else {
            // Validate an event log.
            if (LogUtils.validateEvent(log)) {
                return event;
            }
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Multi-event processing.
        ArrayList<Event> interceptors = new ArrayList<>();
        // Keep only the events that pass validation.
        for (Event event : events) {
            Event intercept1 = intercept(event);
            // Add valid events to the result list.
            if (intercept1 != null) {
                interceptors.add(intercept1);
            }
        }
        return interceptors;
    }

    @Override
    public void close() {
        // Release resources.
    }

    // Static inner class implementing the Builder.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            // Instantiate the interceptor.
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
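Before packaging the jar, the interceptor can be exercised locally. The sketch below (illustrative only, not part of the original project) feeds one well-formed start log and one truncated event log through LogETLInterceptor; it assumes flume-ng-core is on the classpath and that LogUtils (defined next) lives in the same package.
package com.xxxx.flume.interceptor;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;

public class LogETLInterceptorTest {
    public static void main(String[] args) {
        Interceptor interceptor = new LogETLInterceptor.Builder().build();
        interceptor.initialize();
        // A complete start log: passes validation and is returned unchanged.
        Event good = EventBuilder.withBody("{\"action\":\"1\",\"en\":\"start\"}".getBytes(StandardCharsets.UTF_8));
        // A truncated event log (no closing brace): fails validation and is dropped.
        Event bad = EventBuilder.withBody("1600432697673|{\"cm\":{\"ln\":\"-49.4\"".getBytes(StandardCharsets.UTF_8));
        System.out.println(interceptor.intercept(good) != null); // true
        System.out.println(interceptor.intercept(bad) != null);  // false
        interceptor.close();
    }
}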
Create the LogUtils class:
package com.xxxx.flume.interceptor;
import org.apache.commons.lang.math.NumberUtils;
public class LogUtils {
    // Validate a start log.
    public static boolean validateStart(String log) {
        // Example record: {"action":"1","ar":"MX","ba":"Huawei","detail":"325","}
        if (log == null) {
            return false;
        }
        // After trimming, the log must start with { and end with }; otherwise reject it.
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
            return false;
        }
        // Well-formed data passes.
        return true;
    }

    // Validate an event log.
    public static boolean validateEvent(String log) {
        // Example record: 1600432697673|{"cm":{"ln":"-49.4","sv":"V2.9.2","os":"8.0.2","g":"UR801VVP@gmail.com",
        // Format: server timestamp | log content
        if (log == null) {
            return false;
        }
        // Split on the pipe character.
        String[] logContents = log.split("\\|");
        // Guard against out-of-bounds access.
        if (logContents.length != 2) {
            return false;
        }
        // Validate the server timestamp (must be exactly 13 characters, all digits).
        if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) {
            return false;
        }
        // Validate the log body format.
        if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")) {
            return false;
        }
        return true;
    }
}
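A quick sanity check of the two validators (illustrative only, not from the original project); it shows that an event log must be a 13-digit timestamp, a pipe, and a complete JSON object, while a start log only needs balanced braces.
package com.xxxx.flume.interceptor;

public class LogUtilsCheck {
    public static void main(String[] args) {
        System.out.println(LogUtils.validateStart("{\"action\":\"1\"}"));            // true
        System.out.println(LogUtils.validateStart("{\"action\":\"1\","));             // false: brace not closed
        System.out.println(LogUtils.validateEvent("1600432697673|{\"ett\":\"1\"}"));  // true
        System.out.println(LogUtils.validateEvent("16004326|{\"ett\":\"1\"}"));       // false: timestamp is not 13 digits
    }
}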
Type-routing interceptor: create the LogTypeInterceptor class
package com.xxxx.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Distinguish the log type (start vs event) using the body, then record it in the headers.
        // Get the body.
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));
        // Get the header map (a reference to the event's own map).
        Map<String, String> headers = event.getHeaders();
        // Business logic: decide the type and stamp it into the headers.
        if (log.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : events) {
            Event intercept1 = intercept(event);
            interceptors.add(intercept1);
        }
        return interceptors;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
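As with the ETL interceptor, a small local check (a sketch, not part of the original project) confirms that LogTypeInterceptor stamps the topic header that the multiplexing selector in file-flume-kafka.conf routes on: start logs get topic_start, everything else topic_event.
package com.xxxx.flume.interceptor;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;

public class LogTypeInterceptorTest {
    public static void main(String[] args) {
        Interceptor interceptor = new LogTypeInterceptor.Builder().build();
        Event start = EventBuilder.withBody("{\"en\":\"start\"}".getBytes(StandardCharsets.UTF_8));
        Event other = EventBuilder.withBody("1600432697673|{\"ett\":\"ad\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(interceptor.intercept(start).getHeaders().get("topic")); // topic_start
        System.out.println(interceptor.intercept(other).getHeaders().get("topic")); // topic_event
    }
}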
Package the project with Maven and upload the resulting jar to Flume's lib directory.
Configuration complete; start the cluster.
Start Flume in the background: bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
Cluster start/stop script:
#!/bin/bash
case $1 in
"start"){
for i in hadoop002 hadoop003
do
echo "------------- starting Flume on $i -------------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1 &"
done
};;
"stop"){
for i in hadoop002 hadoop003
do
echo "------------- stopping Flume on $i -------------"
# Filter on a unique string (the config file name) to avoid killing unrelated processes; $2 must be escaped as \$2 inside the double quotes.
ssh $i "ps -ef | grep file-flume-kafka.conf | grep -v grep | awk '{print \$2}' | xargs kill -9"
done
};;
esac
Note 1: nohup keeps a process running after you log out or close the terminal; the name means "no hang up", i.e. run the command without interruption. (Commonly used.)
Note 2: /dev/null is the Linux null device; anything written to it is discarded, hence its nickname "the black hole".
Note 3: awk splits fields on whitespace by default.
Note 4: xargs takes the output of the preceding command and passes it as arguments to the following command.
Consumer Flume configuration on hadoop004
Create a kafka-flume-hdfs.conf file in the /opt/module/flume/conf directory on hadoop004:
## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop002:9092,hadoop003:9092,hadoop004:9092
a1.sources.r1.kafka.topics=topic_start
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop002:9092,hadoop003:9092,hadoop004:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
## sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second
## Avoid generating lots of small files
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## Output file type: compressed stream (lzop)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## Wiring: bind sources and sinks to channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
Consumer Flume start/stop script for hadoop004 (f2.sh)
#! /bin/bash
case $1 in
"start"){
for i in hadoop004
do
echo " --------启动 $i 消费flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt 2>&1 &"
done
};;
"stop"){
for i in hadoop004
do
echo " --------停止 $i 消费flume-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
done
};;
esac
Change the script's permissions on hadoop002:
cd ~/bin
chmod 777 f2.sh
f2.sh start
f2.sh stop
Flume memory optimization
1) Problem: the consumer Flume may throw the following exception on startup:
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
2) Solution:
(1) Add the following setting to /opt/module/flume/conf/flume-env.sh on hadoop002:
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
(2) Distribute the configuration to hadoop003 and hadoop004:
xsync flume-env.sh
3) Flume memory parameters and tuning
The JVM heap is usually set to 4 GB or more, with Flume deployed on a dedicated server (e.g. 4 cores, 8 threads, 16 GB RAM).
-Xmx and -Xms are best set to the same value to avoid the performance impact of heap resizing; if they differ, frequent full GCs are likely.
-Xms is the initial (minimum) JVM heap size allocated at startup; -Xmx is the maximum heap size, allocated on demand. If the two are not equal, the heap may be too small at startup and trigger frequent full GCs.