Flume的介绍和简单操作

Flume是什么

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

Flume的功能

  • 支持在日志系统中定制各类数据发送方,用于收集数据
  • 提供对数据简单处理,并写到各类数据接收方(可定制)的能力

Flume的组成

  • Agent:核心组件
    • source 负责数据的产生或搜集
    • channel 是一种短暂的存储容器,负责数据的存储持久化
    • sink 负责数据的转发

Flume的工作流示意图

  • 数据流模型


    image
  • 多Agent模型


    image
  • 合并模型


    image
  • 混合模型


    image

Flume的安装

下载安装包并解压

wget http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz

配置环境变量

vim ~/.bashrc

export FLUME_HOME=/usr/local/src/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin

source ~/.bashrc

Flume简单操作

  • netcat模式
    进入conf目录下编写netcat.conf文件,内容如下:
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = netcat
agent.sources.netcatSource.bind = localhost
agent.sources.netcatSource.port = 11111
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10

启动一个实例

(py27) [root@master conf]# pwd
/usr/local/src/apache-flume-1.8.0-bin/conf
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./netcat.conf --name agent -Dflume.root.logger=INFO,console

启动成功

18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume_netcat.conf
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Added sinks: loggerSink Agent: agent
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Creating channels
18/10/24 11:26:35 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Created channel memoryChannel
18/10/24 11:26:35 INFO source.DefaultSourceFactory: Creating instance of source netcatSource, type netcat
18/10/24 11:26:35 INFO sink.DefaultSinkFactory: Creating instance of sink: loggerSink, type: logger
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [netcatSource, loggerSink]
18/10/24 11:26:35 INFO node.Application: Starting new configuration:{ sourceRunners:{netcatSource=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcatSource,state:IDLE} }} sinkRunners:{loggerSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@262b92ac counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
18/10/24 11:26:35 INFO node.Application: Starting Channel memoryChannel
18/10/24 11:26:35 INFO node.Application: Waiting for channel: memoryChannel to start. Sleeping for 500 ms
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
18/10/24 11:26:36 INFO node.Application: Starting Sink loggerSink
18/10/24 11:26:36 INFO node.Application: Starting Source netcatSource
18/10/24 11:26:36 INFO source.NetcatSource: Source starting
18/10/24 11:26:36 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.155.120:11111]

然后新开一个终端,发送数据

(py27) [root@master apache-flume-1.8.0-bin]# telnet localhost 11111
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
OK

查看接收数据

18/10/24 11:30:15 INFO sink.LoggerSink: Event: { headers:{} body: 31 0D                                           1. }

注:如果没有telnet工具,请先安装:yum install telnet

  • Exec模式
    编写配置文件exec.conf
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec 
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10

启动实例

(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec.conf --name agent -Dflume.root.logger=INFO,console

启动成功后,创建配置文件中的exec.log文件

(py27) [root@master test_data]# ls
exec.log
(py27) [root@master test_data]# pwd
/home/master/FlumeTest/test_data
(py27) [root@master test_data]#

然后通过echo命令模拟日志的产生

(py27) [root@master test_data]# echo 'Hello World!!!' >> exec.log

查看接收的日志

18/10/25 09:19:52 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 21 21       Hello World!!! }

如何将日志保存到HDFS上
修改配置文件

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec 
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = hdfs 
agent.sinks.loggerSink.hdfs.path = /flume/%y-%m-%d/%H%M/
agent.sinks.loggerSink.hdfs.filePrefix = exec_hdfs_
agent.sinks.loggerSink.hdfs.round = true
agent.sinks.loggerSink.hdfs.roundValue = 1
agent.sinks.loggerSink.hdfs.roundUnit = minute
agent.sinks.loggerSink.hdfs.rollInterval = 3
agent.sinks.loggerSink.hdfs.rollSize = 20
agent.sinks.loggerSink.hdfs.rollCount = 5
agent.sinks.loggerSink.hdfs.useLocalTimeStamp = true
agent.sinks.loggerSink.hdfs.fileType = DataStream
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10

然后启动实例

(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec_hdfs.conf --name agent -Dflume.root.logger=INFO,console

然后可以看到它把exec.log文件里的日志给写到了HDFS上

18/10/25 09:54:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/10/25 09:54:26 INFO hdfs.BucketWriter: Creating /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Closing /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Renaming /flume/18-10-25/0954/exec_hdfs_.1540475666623.tmp to /flume/18-10-25/0954/exec_hdfs_.1540475666623
18/10/25 09:54:32 INFO hdfs.HDFSEventSink: Writer callback called.

我们进入HDFS查看,可以看到log里的内容

(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0954
Found 1 items
-rw-r--r--   3 root supergroup         15 2018-10-25 09:54 /flume/18-10-25/0954/exec_hdfs_.1540475666623
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0954/exec_hdfs_.1540475666623
Hello World!!!

然后我们再次写入写的log,然后再查看

//写入新的log
(py27) [root@master test_data]# echo 'test001' >> exec.log               
(py27) [root@master test_data]# echo 'test002' >> exec.log
//进入HDFS目录查看
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25
Found 2 items
drwxr-xr-x   - root supergroup          0 2018-10-25 09:54 /flume/18-10-25/0954
drwxr-xr-x   - root supergroup          0 2018-10-25 09:56 /flume/18-10-25/0956
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0956
Found 1 items
-rw-r--r--   3 root supergroup         16 2018-10-25 09:56 /flume/18-10-25/0956/exec_hdfs_.1540475766338
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0956/exec_hdfs_.1540475766338
test001
test002
  • 故障转移实例
    首先需要三台机器,master、slave1、slave2,然后分别配置实例并启动,master上的agent实例发送日志,slave1和slave2接收日志
    master配置
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink1 loggerSink2

agent.sinkgroups = group

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink1.type = avro
agent.sinks.loggerSink1.hostname = slave1
agent.sinks.loggerSink1.port = 52020
agent.sinks.loggerSink1.channel = memoryChannel

agent.sinks.loggerSink2.type = avro
agent.sinks.loggerSink2.hostname = slave2
agent.sinks.loggerSink2.port = 52020
agent.sinks.loggerSink2.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

agent.sinkgroups.group.sinks = loggerSink1 loggerSink2


agent.sinkgroups.group.processor.type = failover
agent.sinkgroups.group.processor.loggerSink1 = 10
agent.sinkgroups.group.processor.loggerSink2 = 1
agent.sinkgroups.group.processor.maxpenalty = 10000

slave1配置

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave1
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

slave2配置

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave2
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

分别启动master、slave1、slave2的agent,然后在mater上写入日志,然后观察谁收到了

//master
(py27) [root@master test_data]# echo 'hello' >> exec.log  
//slave1
18/10/25 10:53:53 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
//slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726                             

发现是slave1收到数据,然后我们把slave1的agent关掉,再次发送日志

//master
(py27) [root@master test_data]# echo '11111' >> exec.log      
//slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31                                  11111 }

然后再次启动slave1的agent

//master
(py27) [root@master test_data]# echo '22222' >> exec.log      
//slave1
18/10/25 10:58:21 INFO sink.LoggerSink: Event: { headers:{} body: 32 32 32 32 32                                  22222 }
//slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31                                  11111 }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容

  • 博客原文 翻译作品,水平有限,如有错误,烦请留言指正。原文请见 官网英文文档 引言 概述 Apache Flume...
    rabbitGYK阅读 11,437评论 13 34
  • 介绍 概述 Apache Flume是为有效收集聚合和移动大量来自不同源到中心数据存储而设计的可分布,可靠的,可用...
    ximengchj阅读 3,513评论 0 13
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • 1. Flume简介 Apache Flume是一个分布式的、可靠的、可用的,从多种不同的源收集、聚集、移动大量日...
    奉先阅读 4,459评论 2 5
  • 一、Flume简介 flume 是一个cloudera提供的 高可用高可靠,分布式的海量日志收集聚合传输系统。原名...
    w1992wishes阅读 1,884评论 0 7