1. Flume简介
Apache Flume是一个分布式的、可靠的、可用的,从多种不同的源收集、聚集、移动大量日志数据到集中数据存储的系统。
目前,只能运行在Unix服务器上。
Flume基于流式数据的、使用简单的(借助配置文件即可)、健壮的、容错的。
Flume的简单体现在:写一个source、channel、sink之后,一条命令就能操作成功。
Flume、Kafka实时进行数据收集,Storm、Spark实时数据处理,Impala实时查询。
1.数据流架构
Flume event定义为一个数据流(包含字节和一些属性)的单元。
Flume source消费外部数据源传来的事件。外部数据源发送event给source,当然,外部数据源也可以是另一个Agent的sink。source收到event后,将其存储在一个或者多个channel中,channel是一个流式管道,这些event在channel中等待sink来消费。sink可以将event挪到外部存储(例如hdfs)或者传给另一个agent的source。同一个agent中的source和sink异步处理channel中的event。
2.可恢复性:
channel可以选择内存、文件或者其他一些方式来处理。使用内存作为channel,处理会比较快,但是并不安全。当agent进程意外退出时会丢失数据,所以这种处理方式多用于测试。
可以使用文件作为channel,这样数据可恢复。
3.Event:
event是flume数据传输的基本单元。Flume以事件的形式将数据从源头传到目的地。Event由可选的header和载有数据的一个byte array构成。(1)载有的数据对Flume是不透明的;(2)Header是容纳了key-value字符串对的无序集合,key在集合里是唯一的。(3)Header可以在上下文路由中使用扩展。
4.flume-cdh官方文档:
[官方文档][1]
[1]:http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6/FlumeUserGuide.html
2.Flume安装
1.选择和解压安装包:
为了和之前的软件版本兼容,这里还是选择cdh5.3.6的版本。
$ tar zxf flume-ng-1.5.0-cdh5.3.6.tar.gz -C /opt/modules/
2.修改配置文件:
修改conf/flume-env.sh文件,添加java环境变量。
export JAVA_HOME=/opt/modules/jdk1.7.0_67
3.flume运行:
Flume使用安装目录下bin/flume-ng进行执行程序,查看flume-ng的使用方法:
$ bin/flume-ng
下面是一个使用案例:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
参数说明:
(1)agent参数表示,启动一个agent。
(2)-n或者--name 指定agent的名字,在配置文件中定义的agent的名称,例如下边的样例的名字是a1。
(3)-c或者--conf 指定配置文件所在的目录。该目录下要包含flume-env.sh文件。
(4)-f或者--conf-file 指定具体的配置文件。
-f指定的是一个Flume Agent的配置,存储在本地配置文件,该配置文件包含对source、channel、sink的属性配置,和其相关联形成数据流的配置。
3.Flume官方实例:
下面是官方给出的一个样例,Flume Agent实时监控端口,收集数据,将其以日志的形式打印在控制台。
1.配置文件:
下面配置文件定义了一个agent,名为“a1”。a1有一个source,监听端口44444的数据。source、channel、sink的名称分别是r1、c1、k1。a1.channels.c1.type = memory 定义使用内存作为channel。
# example.conf: A single-node Flume configuration
#Name the components of the agent:
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Describe/Define the source:
a1.sources.r1.type=netcat
a1.sources.r1.bind=hadoop-senior01.pmpa.com
a1.sources.r1.port=44444
#Describe/Define the channel:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动agent:
$ bin/flume-ng agent -n a1 -c conf -f conf/test-conf.properties -Dflume.root.logger=INFO,console
-Dflume.root.logger=INFO,console选项指定Flume记录日志并打印到控制台。
在启动了这个agent后,Flume会开启对端口44444的监听。
2.测试环境准备:
为了测试结果,需要使用telnet命令,如果没有安装telnet,使用下面方法安装:
$ yum install telnet
$ yum install telnet-server
netcat是Linux中一个强大的网络工具,nc命令是netcat命令的简称。使用root用户,安装软件包(使用rpm包方式安装)。
rpm -ivh nc-1.84-22.el6.x86_64.rpm
使用nc命令,可以开启对某个端口的监听:
$ nc -l 44444
查看对应的端口是哪个进程在监听的:
$ lsof: -i 44444
3.测试flume实验结果:
启动了a1 Agent之后,flume就开始对端口44444进行监听。为了测试,我们在另一个终端,telnet本机的44444端口,并发送一些消息,这时候flume就会在控制台上打印反馈这些消息。
telnet的输入数据:
在telnet发送了数据后,flume会立即接收,并在控制台打印:
4.结论:
Flume的开发就是编写配置文件,配置Agent中的source、channel、sink的类型和属性。
官方文档中,列出了很多Source、Channel、Sink的种类,数量众多,可以重点关注以下一些常用的类型:
(1)Source:Exec Source、Spooling Directory Source、Kafka Source、Syslog Sources、HTTP Source。
(2)Channel:Kafka Channel、File Channel
(3)Sink: HDFS Sink、HBaseSinks、MorphlineSolrSink、ElasticSearchSink。
对于Spooling Directory Source,只能放不可变、名字唯一的文件到目录中。
4.实时收集数据到HDFS:
实时监控某个日志文件,将数据收集存储到HDFS上。使用EXEC source。下面的例子实时监控Hive日志文件,放到HDFS目录中。
1.Hive日志文件:
首先需要Hive开启日志记录,conf下的log4j文件去掉template,并修改文件中的hive.log.dir的配置为自定义的路径,再启动Hive:
$ cp -a hive-log4j.properties.template hive-log4j.properties
$ cp -a hive-exec-log4j.properties.template hive-exec-log4j.properties
hive.log.dir=/opt/modules/hive-0.13.1-cdh5.3.6/logs
在logs目录下,我们会看到hive.log文件,我们就以监控这个日志文件为例。
2.准备HDFS目录:
在HDFS上创建一个单独的目录,/flume:
$ bin/hdfs dfs -mkdir /flume/hive_log
3.准备Hadoop jar包:
由于Flume将数据写入到HDFS文件中,从某种意义讲,Flume是HDFS的客户端,所以需要将HDFS Client JAR包放到flume的安装目录的lib目录下。具体包括下面4个jar包:
$ cp share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
$ cp share/hadoop/tools/lib/hadoop-auth-2.5.0-cdh5.3.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
$ cp share/hadoop/hdfs/hadoop-hdfs-2.5.0-cdh5.3.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
4.编写配置文件:
在conf/新建一个配置文件,flume-tail.properties 。为了区别于之前的Agent,需要对agent、source、channel、sink重新命名。
#Name the components of the agent:
a2.sources=r2
a2.channels=c2
a2.sinks=k2
#Describe/Define the source:
a2.sources.r2.type=exec
a2.sources.r2.command=tail -f /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log
a2.sources.r2.shell=/bin/bash -c
#Describe/Define the channel:
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path=hdfs://hadoop-senior01.pmpa.com:8020/flume/hive_log/%Y%m/%H
a2.sinks.k2.hdfs.fileType=DataStream
a2.sinks.k2.hdfs.writeFormat=Text
a2.sinks.k2.hdfs.batchSize=10
#如果不设置下边两项的话,flume会在hdfs目录下生成很多小文件。
#设置二级目录,按小时切割:
a2.sinks.k2.hdfs.round=true
a2.sinks.k2.hdfs.roundValue=1
a2.sinks.k2.hdfs.roundUnit=hour
#设置文件回滚条件。
a2.sinks.k2.hdfs.rollInterval=60
a2.sinks.k2.hdfs.rollSize=1024000000
a2.sinks.k2.hdfs.rollCount=0
#使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp=true
a2.sinks.k2.hdfs.minBlockReplicas=1
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
对于HDFS sinks, roll的概念是:close current file and create a new one。
5.测试结果:
运行Agent,可以看到flume开始在hdfs上创建文件的日志。
$ bin/flume-ng agent -n a2 -c conf -f conf/flume-tail.properties -Dflume.root.logger=INFO,console
这时候我们看到hdfs上已经有文件了:
hive> dfs -text /flume/hive_log/201703/01/FlumeData.1490636605634.tmp;
tmp结尾的文件,表示正在写入的文件,文件写入完成后,后边的.tmp后缀会去掉,并开启一个新的.tmp文件。例如下面的例子,FlumeData.1490637118909文件已经写完,又开启了一个新的文件FlumeData.1490637180881.tmp。
-rw-r--r-- 3 natty supergroup 6372 2017-03-28 01:53 /flume/hive_log/201703/01/FlumeData.1490637118909
-rw-r--r-- 3 natty supergroup 0 2017-03-28 01:53 /flume/hive_log/201703/01/FlumeData.1490637180881.tmp
5.数据仓库架构:
如何使用Flume? 通常在每一台应用服务器(例如Nginx、Apache服务器等)上安装Flume,这些服务器会产生很多日志,Flume将其抽取到HDFS上。
同时,也可以通过FTP将多台日志服务器上的日志放到统一的一台服务器上,然后在这个服务器上安装Flume来抽取到HDFS。
6.Flume使用案例,监控某一个目录:
与前边的案例类似,我们只需要开发配置文件即可。按照之前的配置文件复制一个新的文件flume-directory.properties:
$ cp flume-tail.properties flume-directory.properties
很多应用服务器在产生日志时,都存在“回滚”的情况,例如在一个日志目录中,文件abc.tmp表示正在写入的文件,在写入完成后,生成一个文件abc.tmp.1000。我们的这个案例就是为了解决这个实际情况,Flume实时扫描一个文件夹,按照正则表达式筛选目标文件(这里要剔除类似abc.tmp这样的文件)。
这里source选用Spooling Directory Source;channel选用File Channel;sink选用HDFS sink。
下面是开发的配置文件:
#Name the components of the agent:
a3.sources=r3
a3.channels=c3
a3.sinks=k3
#Describe/Define the source:
a3.sources.r3.type=spooldir
a3.sources.r3.spoolDir=/opt/data/flume/spooldir
a3.sources.r3.ignorePattern=.*\.tmp$
#Describe/Define the channel:
a3.channels.c3.type=file
a3.channels.c3.checkpointDir=/opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/data/checkpointdir
a3.channels.c3.dataDirs=/opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/data/datadir
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path=hdfs://hadoop-senior01.pmpa.com:8020/flume/hive_log/%Y%m/%H
a3.sinks.k3.hdfs.fileType=DataStream
a3.sinks.k3.hdfs.writeFormat=Text
a3.sinks.k3.hdfs.batchSize=10
#如果不设置下边两项的话,flume会在hdfs目录下生成很多小文件。
#设置二级目录,按小时切割:
a3.sinks.k3.hdfs.round=true
a3.sinks.k3.hdfs.roundValue=1
a3.sinks.k3.hdfs.roundUnit=hour
#设置文件回滚条件。
a3.sinks.k3.hdfs.rollInterval=60
a3.sinks.k3.hdfs.rollSize=1024000000
a3.sinks.k3.hdfs.rollCount=0
#使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp=true
a3.sinks.k3.hdfs.minBlockReplicas=1
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
由于source的fileSuffix配置项采用默认值,所有Flume处理完的文件,都加上了.COMPLETE后缀。
测试运行结果:
$ bin/flume-ng agent -n a3 -c conf -f conf/flume-directory.properties -Dflume.root.logger=INFO,console