12.Flume介绍

1. Flume简介

Apache Flume是一个分布式的、可靠的、可用的,从多种不同的源收集、聚集、移动大量日志数据到集中数据存储的系统。

目前,只能运行在Unix服务器上。

Flume基于流式数据的、使用简单的(借助配置文件即可)、健壮的、容错的。

Flume的简单体现在:写一个source、channel、sink之后,一条命令就能操作成功。

Flume、Kafka实时进行数据收集,Storm、Spark实时数据处理,Impala实时查询。

1.数据流架构

Flume event定义为一个数据流(包含字节和一些属性)的单元。

数据流模型.png

Flume source消费外部数据源传来的事件。外部数据源发送event给source,当然,外部数据源也可以是另一个Agent的sink。source收到event后,将其存储在一个或者多个channel中,channel是一个流式管道,这些event在channel中等待sink来消费。sink可以将event挪到外部存储(例如hdfs)或者传给另一个agent的source。同一个agent中的source和sink异步处理channel中的event。

2.可恢复性:

channel可以选择内存、文件或者其他一些方式来处理。使用内存作为channel,处理会比较快,但是并不安全。当agent进程意外退出时会丢失数据,所以这种处理方式多用于测试。
可以使用文件作为channel,这样数据可恢复。

3.Event:

event是flume数据传输的基本单元。Flume以事件的形式将数据从源头传到目的地。Event由可选的header和载有数据的一个byte array构成。(1)载有的数据对Flume是不透明的;(2)Header是容纳了key-value字符串对的无序集合,key在集合里是唯一的。(3)Header可以在上下文路由中使用扩展。

4.flume-cdh官方文档:

[官方文档][1]
[1]:http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6/FlumeUserGuide.html

2.Flume安装

1.选择和解压安装包:

为了和之前的软件版本兼容,这里还是选择cdh5.3.6的版本。

$ tar zxf flume-ng-1.5.0-cdh5.3.6.tar.gz -C /opt/modules/

2.修改配置文件:

修改conf/flume-env.sh文件,添加java环境变量。

export JAVA_HOME=/opt/modules/jdk1.7.0_67

3.flume运行:

Flume使用安装目录下bin/flume-ng进行执行程序,查看flume-ng的使用方法:

$ bin/flume-ng 

下面是一个使用案例:

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

参数说明:
(1)agent参数表示,启动一个agent。
(2)-n或者--name 指定agent的名字,在配置文件中定义的agent的名称,例如下边的样例的名字是a1。
(3)-c或者--conf 指定配置文件所在的目录。该目录下要包含flume-env.sh文件。
(4)-f或者--conf-file 指定具体的配置文件。
-f指定的是一个Flume Agent的配置,存储在本地配置文件,该配置文件包含对source、channel、sink的属性配置,和其相关联形成数据流的配置。

3.Flume官方实例:

下面是官方给出的一个样例,Flume Agent实时监控端口,收集数据,将其以日志的形式打印在控制台。

1.配置文件:

下面配置文件定义了一个agent,名为“a1”。a1有一个source,监听端口44444的数据。source、channel、sink的名称分别是r1、c1、k1。a1.channels.c1.type = memory 定义使用内存作为channel。

# example.conf: A single-node Flume configuration

#Name the components of the agent:
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#Describe/Define the source:
a1.sources.r1.type=netcat
a1.sources.r1.bind=hadoop-senior01.pmpa.com
a1.sources.r1.port=44444

#Describe/Define the channel:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动agent:

$ bin/flume-ng agent -n a1 -c conf -f conf/test-conf.properties -Dflume.root.logger=INFO,console

-Dflume.root.logger=INFO,console选项指定Flume记录日志并打印到控制台。
在启动了这个agent后,Flume会开启对端口44444的监听。

2.测试环境准备:

为了测试结果,需要使用telnet命令,如果没有安装telnet,使用下面方法安装:

$ yum install telnet
$ yum install telnet-server

netcat是Linux中一个强大的网络工具,nc命令是netcat命令的简称。使用root用户,安装软件包(使用rpm包方式安装)。

rpm -ivh nc-1.84-22.el6.x86_64.rpm 

使用nc命令,可以开启对某个端口的监听:

$ nc -l 44444

查看对应的端口是哪个进程在监听的:

$ lsof: -i 44444

3.测试flume实验结果:

启动了a1 Agent之后,flume就开始对端口44444进行监听。为了测试,我们在另一个终端,telnet本机的44444端口,并发送一些消息,这时候flume就会在控制台上打印反馈这些消息。
telnet的输入数据:


telnet测试输入.png

在telnet发送了数据后,flume会立即接收,并在控制台打印:


flume接收端口数据.png

4.结论:

Flume的开发就是编写配置文件,配置Agent中的source、channel、sink的类型和属性。
官方文档中,列出了很多Source、Channel、Sink的种类,数量众多,可以重点关注以下一些常用的类型:
(1)Source:Exec Source、Spooling Directory Source、Kafka Source、Syslog Sources、HTTP Source。
(2)Channel:Kafka Channel、File Channel
(3)Sink: HDFS Sink、HBaseSinks、MorphlineSolrSink、ElasticSearchSink。

对于Spooling Directory Source,只能放不可变、名字唯一的文件到目录中。

4.实时收集数据到HDFS:

实时监控某个日志文件,将数据收集存储到HDFS上。使用EXEC source。下面的例子实时监控Hive日志文件,放到HDFS目录中。

1.Hive日志文件:

首先需要Hive开启日志记录,conf下的log4j文件去掉template,并修改文件中的hive.log.dir的配置为自定义的路径,再启动Hive:

$ cp -a hive-log4j.properties.template hive-log4j.properties
$ cp -a hive-exec-log4j.properties.template hive-exec-log4j.properties
hive.log.dir=/opt/modules/hive-0.13.1-cdh5.3.6/logs

在logs目录下,我们会看到hive.log文件,我们就以监控这个日志文件为例。

2.准备HDFS目录:

在HDFS上创建一个单独的目录,/flume:

$ bin/hdfs dfs -mkdir /flume/hive_log

3.准备Hadoop jar包:

由于Flume将数据写入到HDFS文件中,从某种意义讲,Flume是HDFS的客户端,所以需要将HDFS Client JAR包放到flume的安装目录的lib目录下。具体包括下面4个jar包:

$ cp share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
$ cp share/hadoop/tools/lib/hadoop-auth-2.5.0-cdh5.3.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
$ cp share/hadoop/hdfs/hadoop-hdfs-2.5.0-cdh5.3.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/

4.编写配置文件:

在conf/新建一个配置文件,flume-tail.properties 。为了区别于之前的Agent,需要对agent、source、channel、sink重新命名。

#Name the components of the agent:
a2.sources=r2
a2.channels=c2
a2.sinks=k2

#Describe/Define the source:
a2.sources.r2.type=exec
a2.sources.r2.command=tail -f /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log
a2.sources.r2.shell=/bin/bash -c


#Describe/Define the channel:
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path=hdfs://hadoop-senior01.pmpa.com:8020/flume/hive_log/%Y%m/%H
a2.sinks.k2.hdfs.fileType=DataStream
a2.sinks.k2.hdfs.writeFormat=Text
a2.sinks.k2.hdfs.batchSize=10

#如果不设置下边两项的话,flume会在hdfs目录下生成很多小文件。
#设置二级目录,按小时切割:
a2.sinks.k2.hdfs.round=true
a2.sinks.k2.hdfs.roundValue=1
a2.sinks.k2.hdfs.roundUnit=hour

#设置文件回滚条件。
a2.sinks.k2.hdfs.rollInterval=60
a2.sinks.k2.hdfs.rollSize=1024000000
a2.sinks.k2.hdfs.rollCount=0
#使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp=true
a2.sinks.k2.hdfs.minBlockReplicas=1

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

对于HDFS sinks, roll的概念是:close current file and create a new one。

5.测试结果:

运行Agent,可以看到flume开始在hdfs上创建文件的日志。

$ bin/flume-ng agent -n a2 -c conf -f conf/flume-tail.properties -Dflume.root.logger=INFO,console

这时候我们看到hdfs上已经有文件了:

hive> dfs -text /flume/hive_log/201703/01/FlumeData.1490636605634.tmp;

tmp结尾的文件,表示正在写入的文件,文件写入完成后,后边的.tmp后缀会去掉,并开启一个新的.tmp文件。例如下面的例子,FlumeData.1490637118909文件已经写完,又开启了一个新的文件FlumeData.1490637180881.tmp。

-rw-r--r--   3 natty supergroup       6372 2017-03-28 01:53 /flume/hive_log/201703/01/FlumeData.1490637118909
-rw-r--r--   3 natty supergroup          0 2017-03-28 01:53 /flume/hive_log/201703/01/FlumeData.1490637180881.tmp

5.数据仓库架构:

数据仓库架构.png

如何使用Flume? 通常在每一台应用服务器(例如Nginx、Apache服务器等)上安装Flume,这些服务器会产生很多日志,Flume将其抽取到HDFS上。
同时,也可以通过FTP将多台日志服务器上的日志放到统一的一台服务器上,然后在这个服务器上安装Flume来抽取到HDFS。


每台应用服务器上安装Flume.png
单独在一台服务器上安装Flume.png

6.Flume使用案例,监控某一个目录:

与前边的案例类似,我们只需要开发配置文件即可。按照之前的配置文件复制一个新的文件flume-directory.properties:

$ cp flume-tail.properties flume-directory.properties

很多应用服务器在产生日志时,都存在“回滚”的情况,例如在一个日志目录中,文件abc.tmp表示正在写入的文件,在写入完成后,生成一个文件abc.tmp.1000。我们的这个案例就是为了解决这个实际情况,Flume实时扫描一个文件夹,按照正则表达式筛选目标文件(这里要剔除类似abc.tmp这样的文件)。
这里source选用Spooling Directory Source;channel选用File Channel;sink选用HDFS sink。
下面是开发的配置文件:

#Name the components of the agent:
a3.sources=r3
a3.channels=c3
a3.sinks=k3

#Describe/Define the source:
a3.sources.r3.type=spooldir
a3.sources.r3.spoolDir=/opt/data/flume/spooldir
a3.sources.r3.ignorePattern=.*\.tmp$

#Describe/Define the channel:
a3.channels.c3.type=file
a3.channels.c3.checkpointDir=/opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/data/checkpointdir
a3.channels.c3.dataDirs=/opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/data/datadir



# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path=hdfs://hadoop-senior01.pmpa.com:8020/flume/hive_log/%Y%m/%H
a3.sinks.k3.hdfs.fileType=DataStream
a3.sinks.k3.hdfs.writeFormat=Text
a3.sinks.k3.hdfs.batchSize=10


#如果不设置下边两项的话,flume会在hdfs目录下生成很多小文件。
#设置二级目录,按小时切割:
a3.sinks.k3.hdfs.round=true
a3.sinks.k3.hdfs.roundValue=1
a3.sinks.k3.hdfs.roundUnit=hour

#设置文件回滚条件。
a3.sinks.k3.hdfs.rollInterval=60
a3.sinks.k3.hdfs.rollSize=1024000000
a3.sinks.k3.hdfs.rollCount=0
#使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp=true
a3.sinks.k3.hdfs.minBlockReplicas=1

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

由于source的fileSuffix配置项采用默认值,所有Flume处理完的文件,都加上了.COMPLETE后缀。
测试运行结果:

$ bin/flume-ng agent -n a3 -c conf -f conf/flume-directory.properties -Dflume.root.logger=INFO,console
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,830评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,992评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,875评论 0 331
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,837评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,734评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,091评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,550评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,217评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,368评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,298评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,350评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,027评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,623评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,706评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,940评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,349评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,936评论 2 341

推荐阅读更多精彩内容

  • 背景 Flume是由Cloudera公司开源的,分布式可靠,高可用的系统,它能够将不同数据源的海量日志数据进行高...
    Bloo_m阅读 1,082评论 0 0
  • 知识要点 . 前言 在一个完整的离线大数据处理系统中,除了hdfs+mapreduce+hive组成分析系统的核心...
    JN冰阅读 4,243评论 0 0
  • Flume的功能和架构特点 ** 功能 **flume 是一个分布式的,可靠的,可用的,可以非常有效率的对大数据的...
    心_的方向阅读 2,504评论 1 10
  • 面对以上的问题,我们如何将这些日志移动到hdfs集群上尼???? 第一种方案:使用shell脚本cp 文件,然后通...
    机灵鬼鬼阅读 1,373评论 1 1
  • 夜莺2517阅读 127,706评论 1 9