本文是对大数据组件Flume的一个学习总结,共包括如下章节的内容:
- 简介
- 核心概念
- 使用场景
- 快速起步
- 小结
一、简介
Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。它具有基于流数据的简单灵活的架构、可靠的可靠性机制和许多故障转移和恢复机制,以及强大的容错性。它使用简单的可扩展数据模型,允许在线分析数据。很多大数据分析系统都通过flume来获取数据的输入。
Flume最早是Cloudera提供的日志收集系统,后来成为Apache下的项目,目前已经是Apache下的顶级项目。
Flume有两个版本,一个是老的Flume 0.9X版本,统称为Flume-og;另一个是经过重大重构后的版本Flume1.X,统称为Flume-ng。在本文中,我们使用的是最新的Flume-ng版本,版本号是Apache Flume 1.9.0。
Flume的官方网站是http://flume.apache.org。
Flume是用java开发的,其运行依赖java环境。Flume可以在windows和linux下运行,本文示例所用的是linux环境。
二、核心概念
Event:消息的基本单位,由header和body组成。
Agent:一个独立JVM进程,负责将一端外部来源产生的消息转 发到另一端外部的目的地。在分布式部署中,每个节点部署一个Agent。多个Agent就组成了Flume的分布式系统。
(一)Agent(代理)
Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方,如下图示例:
上图是Agent的一个最基本的结构,一个Agent中的source、channel、sink数量可以有多个,它们可相互组合使用,比如一个source可以对应多个channel,一个channel也可对应多个sink。它们的关系在配置文件中配置。后面会举例介绍。
下面接着介绍Agent的这三个核心组件的含义.
(二)Source(源)
Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。 Flume提供了很多内置的Source, 支持 Avro, log4j, syslog 和 http post(body为json格式),可以让应用程序同已有的Source直接打交道,如AvroSource, SyslogTcpSource。 如果内置的Source无法满足需要, Flume还支持自定义Source。
(三)Channel(通道)
Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直 到Sink处理完该事件。
Flume提供了多种内置的Channel类型,如MemoryChannel和FileChannel,也支持用户自定义Channel。
(四)Sink(接收器)
Sink从Channel中取出事件,然后将数据发到别处,可以向文件系统、数据库、 hadoop存数据, 也可以是其他flume agent的Source。Flume也提供了多种内置的Sink。
三、使用场景
Flume在英文中的意思是水道, 但Flume更像可以随意组装的消防水管,下面根据官方文档,展示几种Flow。
(一)多个agent顺序连接
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent 的数量,因为数据流经的路径变长了,出现故障将影响整个Flow上的Agent收集服务。
(二)多个Agent的数据汇聚到同一个Agent
这种情况应用的场景比较多,比如要收集Web网站的用户行为日志, Web网站为了可用性使用的负载集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。
(三)多级流
Flume还支持多级流,什么多级流?结合在云开发中的应用来举个例子,当syslog, java, nginx、 tomcat等混合在一起的日志流开始流入一个agent后,可以agent中将混杂的日志流分开,然后给每种日志建立一个自己的传输通道。
(四)load balance功能
上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上 ,这样起到负荷分担的效果。
四、快速起步
下面我们通过一个简单的实例来演示如何使用Flume,从而加深对Flume基本概念的理解。
(一)版本安装
从flume官网下载二进制的安装文件apache-flume-1.9.0-bin.tar.gz,解压后得到的目录结构如下:
flume的可执行程序(实际是个shell脚本)位于bin目录下,文件名是flume-ng。我们可以运行flume-ng version测试下当前环境是否正常,如:
$ bin/flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
如果能正常执行,显示上面的类似信息,说明当前环境是正常的(比如java环境)。上面测试并不需要修改flume的任何配置文件。
(二)配置文件
启动一个flume agent需要指定一个配置文件,该配置文件用于配置该agent的相关参数信息,如source,channel,sink等设置信息。配置文件是普通的键值对格式的文本文件。在安装目录下的conf目录下有样例文件可参考。
下面我们通过一个简单的例子来说明如何设置agent的配置文件。
每个flume agent都需要指定一个名称,配置文件的前三个参数都是以如下内容开始,如:
a1.sources=<list of sources>
a1.channels=<list of channel>
a1.sinks=<list of sinks>
上面的配置信息中,a1就是要启动的flume agent的名称,a1.sources的值指定该agent的source组件,一个agent可以配置多个source,所以a1.sources的值是个列表;同样a1.channels用于指定该agent的channel信息;a1.sinks用于指定该agent的sink信息。举例:
a1.sources=s1
a1.channels=c1
a1.sinks=k1
上面信息定义了一个名为a1的flume agent,它有一个名为s1的source(源)、一个名为c1的channel(通道),以及一个名为k1的sink(接收器)。
配置了agent的三个核心组件source,channel,sink后(实际只是指定了一个名称),下面就要接着配置这三个组件的具体信息,如类型等。
下面我们先配置source信息,例子如下:
a1.sources.s1.type=netcat
a1.sources.s1.bind=0.0.0.0
a1.sources.s1.port=12345
a1.sources.s1.channels=c1
上面的信息设置源s1的类型(type参数)为netcat,这是flume内置的一种源类型,它的作用是监听来自socket中的数据,需要指定监听的ip地址和端口号,bind参数设置Ip地址,0.0.0.0表示监听任何地址,port参数设置端口号,这里设置的端口号是12345。对每个源,还需要配置其对应的channel,一个源可以对应多个channel,所以属性名是复数channels,我们这里只对应一个channel,即上面配置的通道c1。
我们再配置channel的信息,例子如下:
a1.channels.c1.type = memory
上面配置c1通道为内存通道,通道数据既可以保存在内存中,也可以在文件中,其它采用默认值。
最后配置sink信息,例子如下:
a1.sinks.k1.type = logger
a1.sinks.k1.channel= c1
上面的配置中type指定sink的类型,这里的logger是flume内置的一种sink类型,主要用于调试,它会使用log4j将所有info级别的日志记录下来,这些日志来自于配置好的通道。channel 参数用于指定sink从哪个channel获取数据。一个sink只能从一个channel获取数据。
(三)启动Agent
配置好要启动的flume agent所需要的配置文件后,就可以启动agent了,即启动一个java进程。
假设我们上面的配置文件名称为hello.conf,保存到flume解压目录的conf目录下,该配置文件中完整的内容如下:
a1.sources=s1
a1.channels=c1
a1.sinks=k1
a1.sources.s1.type=netcat
a1.sources.s1.bind=0.0.0.0
a1.sources.s1.port=12345
a1.sources.s1.channels=c1
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sinks.k1.channel= c1
下面我们执行flume-ng脚本来启动agent。命令格式如下:
$bin/flume-ng agent -n a1 -c conf -f conf/hello.conf -Dflume.root.logger=INFO,console
上面的命令,各选项的含义如下:
1)flume-ng是可执行脚本,位于bin目录下
2)agent参数表示这是启动一个agent。
3)-n 参数用于指定本agent的名称,这里是a1,这样flume就会根据这个名称到配置文件中获取相关的配置信息。
4)-c 参数用于指定log4j的配置文件所在的目录,这里是conf目录
5)-f参数用于指定flume agent配置文件的名称
6)-Dflume.root.logger参数的作用是覆盖conf/log4j.properties中的信息,让采集的信息在控制台上输出。
执行上面命令后,会打印很多信息,该flume agent会等待socket客户端的连接和数据输入,这时我们利用操作系统的nc命令来往该agent发送信息。我们打开另外一个控制台界面,输入如下信息:
$ nc localhost 12345
hello,world
OK
good job
OK
上面我们运行nc命令连接flume,然后输入两行信息。
这时我们切换到运行flume agent的控制台界面,发现控制台上输出了如下信息,这正是nc程序发出的信息,如下:
2019-03-27 11:26:30,281 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 2C 77 6F 72 6C 64 hello,world }
2019-03-27 11:26:37,165 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 67 6F 6F 64 20 6A 6F 62 good job }
上面只是一个简单的例子,从这个例子中可以看出,使用flume并不是很复杂,在规划好启动多少agent之后。后面的主要工作是设置每个agent的配置信息,最基本的就是source,channel和sink信息。
五、小结
本文是对flume组件的一个简单入门学习,主要是对flume的核心概念进行了介绍,并通过一个简单的例子演示了如何使用flume。