【Flume】Flume 简单理解及使用实例

一、Flume简介

flume 是一个cloudera提供的 高可用高可靠,分布式的海量日志收集聚合传输系统。原名是 Flume OG (original generation),但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation,改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume)。

二、Flume OG 到 Flume NG

FLUM OG:

  • FLUM OG 有三种角色的节点:代理节点(agent)、收集节点(collector)、主节点(master)。
  • agent 从各个数据源收集日志数据,将收集到的数据集中到 Collector,然后由收集节点汇总存入 HDFS。master 负责管理 agent,collector 的活动。
  • agent、collector 都称为 node,node 的角色根据配置的不同分为 logical node(逻辑节点)、physical node(物理节点)。
  • agent、collector 由 source、sink 组成,代表在当前节点数据是从 source 传送到 sink。

窃取网上一张图用以说明:

Flume NG 取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume NG 另一个主要的不同点是读入数据和写出数据现在由不同的工作线程处理(称为Runner)。在 Flume OG 中,读入线程同样做写出工作(除了故障重试),如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。

FLUME NG:

  • NG 只有一种角色的节点:代理节点(agent)。
  • 没有 collector、master 节点,这是核心组件最核心的变化。
  • 去除了 physical nodes、logical nodes 的概念和相关内容。
  • agent 节点的组成也发生了变化。Flume NG 的 agent 由 source、sink、Channel 组成。

flume ng 节点组成图:

多 Agent 并联下的架构图:

三、Flume 的特性

flume 支持在日志系统中定制各类数据发送方,用于收集数据;同时支持对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个) Channel 中。可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。

Sink 负责持久化日志或者把事件推向另一个 Source。

flume 具备高可靠性:

当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:

  • end-to-end:收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。
  • Store on failure:这也是scribe采用的策略,当数据接收方crash崩溃时,将数据写到本地,待恢复后,继续发送。
  • Best effort:数据发送到接收方后,不会进行确认。

四、Flume 架构组成和核心概念

  • client: 生产数据的地方,运行在一个独立的线程。
  • event: 生产的数据,可以是日志记录、 avro 对象等,如果是文本文件通常是一行记录。
  • Flow: Event从源点到达目的点的迁移的抽象。
  • agent: flume 核心组件,flume 以 Agent 为最小的独立运行单位。一个 agent 就是一个 jvm, agent 又是由 source, channel, sink 等构建而成。

agent 由 source, channel, sink 等构建而成:

4.1 Source:从 Client 收集数据,传递给 Channel

不同的 source,可以接受不同的数据格式,比如监视外部源--目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,如果目录中有文件产生,就会立刻读取其内容。 source 组件可以处理各种格式的日志数据,eg:avro Sources、thrift Sources、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。

Source类型                   说明 

Avro Source                  支持Avro协议(实际上是Avro RPC),内置支持| 
Thrift Source                支持Thrift协议,内置支持
Exec Source                  基于Unix的command在标准输出上生产数据
JMS Source                   从JMS系统(消息、主题)中读取数据
Spooling Directory Source    监控指定目录内数据变更
Twitter 1% firehose Source   通过API持续下载Twitter数据,试验性质
Netcat Source                监控某个端口,将流经端口的每一个文本行数据作为Event输入
Sequence Generator Source    序列生成器数据源,生产序列数据
Syslog Sources               读取syslog数据,产生Event,支持UDP和TCP两种协议
HTTP Source                  基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式
Legacy Sources               兼容老的Flume OG中Source(0.9.x版本)

详细参考官网:http://flume.apache.org/FlumeUserGuide.html#flume-sources

4.2、Channel:连接 sources 和 sinks

有点像一个队列,是一个存储池,接收 source 的输出,直到有 sink 消费掉 channel 中的数据,channel 中的数据直到进入下一个 channel 或者进入 sink 才会被删除,当 sink 写入失败后,可以自动重启,不会造成数据丢失。临时存放的数据可以存放在memory Channel、jdbc Channel、file Channel、自定义。

Channel类型                   说明

Memory Channel                Event数据存储在内存中
JDBC Channel                  Event数据存储在持久化存储中,当前Flume Channel内置支持Derby
File Channel                  Event数据存储在磁盘文件中
Spillable Memory Channel      Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件
Pseudo Transaction Channel    测试用途
Custom Channel                自定义Channel实现

详细参考官网:http://flume.apache.org/FlumeUserGuide.html#flume-channels

4.3、Sink:从Channel收集数据,运行在一个独立线程

用于把数据发送到目的地的组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

详细参考官网:http://flume.apache.org/FlumeUserGuide.html#flume-sinks

flume可以支持:

  • 多级 flume的 agent,(即多个 flume 可以连成串,上一个 flume 可以把数据写到下一个 flume 上)
  • 支持扇入(fan-in):source 可以接受多个输入
  • 扇出(fan-out):sink 可以输出到多个目的地

五、Flume 使用实例

模拟使用 Flume 监听日志变化,并且把增量的日志文件写入到 hdfs 中。

根据需求,首先定义一下3大要素:

  • Source:监控日志文件内容更新,Exec Source(tail -f "file")
  • Sink:HDFS文件系统,hdfs sink
  • Channel:Source和sink之间的传递通道,可用 file channel 也可以用 Memory channel

5.1、编写配置文件

确定好了 Source/Sink/Channel 之后,开始编写配置文件。

gedit exec_tail.conf

内容如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## exec表示flume调用给的命令,然后从给的命令的结果中去拿数据
a1.sources.r1.type = exec
## 使用tail这个命令来读数据
a1.sources.r1.command = tail -F /home/hadoop/testdir/flumedata/test.log
a1.sources.r1.channels = c1

# Describe the sink
## 表示下沉到hdfs,类型决定了下面的参数
a1.sinks.k1.type = hdfs
## sinks.k1只能连接一个channel,source可以配置多个
a1.sinks.k1.channel = c1
## 下面的配置告诉用hdfs去写文件的时候写到什么位置,下面的表示不是写死的,而是可以动态的变化的。表示输出的目录名称是可变的
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/tailout/%y-%m-%d/%H%M/
##表示最后的文件的前缀
a1.sinks.k1.hdfs.filePrefix = events-
## 表示到了需要触发的时间时,是否要更新文件夹,true:表示要
a1.sinks.k1.hdfs.round = true
## 表示每隔1分钟改变一次
a1.sinks.k1.hdfs.roundValue = 1
## 切换文件的时候的时间单位是分钟
a1.sinks.k1.hdfs.roundUnit = minute
## 表示只要过了3秒钟,就切换生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 3
## 如果记录的文件大于20字节时切换一次
a1.sinks.k1.hdfs.rollSize = 20
## 当写了5个事件时触发
a1.sinks.k1.hdfs.rollCount = 5
## 收到了多少条消息往dfs中追加内容
a1.sinks.k1.hdfs.batchSize = 10
## 使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream:为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
##使用内存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5.2、启动 flume 监控

cd /opt/flume-1.8.0
bin/flume-ng agent -c conf -f conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console

命令参数解释:

  • agent:运行一个 Flume Agent
  • --conf / -c <conf>:在<conf>目录使用配置文件,指定配置文件放在什么目录
  • --conf-file / -f <file>:指定配置文件,这个配置文件必须在全局选项的 --conf 参数定义的目录下
  • --name / -n <name>: Agent 的名称,同配置文件中的相对应

5.3、模拟日志

hadoop@master:/opt/flume-1.8.0$ cd /home/hadoop/testdir/flumedata/

hadoop@master:~/testdir/flumedata$ while true
> do
> date >> test.log
> sleep 2
> done

查看日志变化:

hadoop@master:~/testdir/flumedata$ tail -f test.log 
Mon Aug 20 20:09:31 PDT 2018
Mon Aug 20 20:09:33 PDT 2018
Mon Aug 20 20:09:35 PDT 2018
Mon Aug 20 20:09:37 PDT 2018
Mon Aug 20 20:09:39 PDT 2018
Mon Aug 20 20:09:41 PDT 2018
Mon Aug 20 20:09:43 PDT 2018
Mon Aug 20 20:09:45 PDT 2018

通过 tail 命令,可以看到test.log在不停的追加数据。

到hdfs中进行查看,效果如下:

也可以到页面查看:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容