快乐大数据第7课 Flume日志收集系统

Flume 日志收集系统

#安装

在node01下

在hadoop用户下

cd ~/apps

在此路径下 解压后是 flume-1.8.0

ll

cd conf

再把演示用的各种conf文件上传给node01

再通过scp命令把 这个flume-1.8.0传给另外4给node

cat example.conf

sources = r1 #多个source可以用空格隔开

channels = c1

sinks = k1

sources.r1.type = netcat #绑定一个本地端口,往flume里面传输数据

sources.r1.bind = localhost

sources.r1.port = 44444

sources.r1.channels = c1 #把source和channel关联起来

channels.c1.type = memory ¥在内存中存储

channels.c1.capacity = 1000 #可以存1000个event

channels.c1.transactionCapacity = 100 #一次事务提交多少给event

sinks.k1.type =logger #以日志的形式在黑窗口中打出来

sinks.k1.channel = c1 #在哪个channel中拉取数据

启动flume

再新开一个node01

telnet localhost 44444

访问这个端口

输入 哈哈哈

#详细介绍flume的组件

一个event在一个agent中的传输处理流程如下:source--interceptor--selector->channel->sink processor--sink->中心存储/下一级

agent

#1.avro source

Avro Source:支持Avro协议,接收RPC事件请求。Avro Source通过监

听Avro端口接收外部Avro客户端流事件(event),在Flume的多层架

构中经常被使用接收上游Avro Sink发送的event

? 关键参数说明

? type:类型名称avro

? bind :绑定的IP

? port :监听的端口

? threads:接收请求的线程数,当需要接收多个avro客户端的数据流时要设置合

适的线程数,否则会造成avro客户端数据流积压

? compression-type:是否使用压缩,如果使用压缩设则值为“deflate”,avro

source一般用于多个Agent组成的数据流,接收来自avro sink的event,如果avro

source设置了压缩,name上一阶段的avro sink也要设置压缩。默认值none

? channels:Source对接的Channel名称

#2.exec source

Exec Source:支持Linux命令,收集标准输的方式监听指定文件。

Exec Source可以实现实时的消息传输,但文件的位置,不支持断点续传,当Exec Source后续增加的消息丢失,一般在测试环境使用

关键参数说明

? type :source类型为exec

? command :Linux命令

? channels :Source对接的Channel名称。

-----演示Avro Source和exec Source

在启动前 先vim avrosource.conf

修改绑定的IP为192.16.183.101

在node01中启动 bin/flume-ng agent --conf conf --conf-file conf/avrosource.conf --name avroagent -Dflume.root.logger=INFO,console

接着新开一个node01(1)的窗口

mkdir -p /home/hadoop/apps/flume/execsource/

touch exectest.log

echo 123 > exectest.log

echo 34567 >> exectest.log

启动execagent

再新建一个窗口node01(2),启动execagent

再回到node01(1)的窗口

cd /home/hadoop/apps/flume/execsource/

echo 8040 >>  exectest.log

原理是 execsource.conf 监听node01/home/hadoop/apps/flume/execsource/exectest.log中日志文件的变化,收集日志里面的数据再传给avrosource

他接收后,在黑窗口中打印出数据

----演示Spooling Directory Source

Spooling Directory Source:监听一个文件夹,收集文件夹下文件数据,收集完文件数据会将文件名称的后缀改为.COMPLETED

缺点不支持已存在文件新增数据的收集,且不能够对嵌套文件夹递归监听

关键参数说明

? type :source类型为spooldir

? spoolDir:source监听的文件夹

? fileHeader :是否添加文件的绝对路径到event的header中,默认值false

? fileHeaderKey:添加到event header中文件绝对路径的键值,默认值file

? fileSuffix:收集完新文件数据给文件添加的后缀名称,默认值:

.COMPLETED

? channels :Source对接的Channel名称

先做好预备工作

在node01下

cd ~/apps/flume

mkdir spoolDir

mkdir selector

mkdir taildir

mkdir filechannel

mkdir multiplexing

启动Spooling Directory Source

在node01的一个新窗口中 bin/flume-ng agent --conf conf --conf-file conf/spooldirsource.conf --name a1 -Dflume.root.logger=INFO,console

换一个窗口

cd ~/apps/flume/spoolDir

echo 134 > test1

echo 477 >> test1    不会响应,也不能监听子文件夹下面的数据

echo 477 >> test2  新的文件就会收集到

---演示Kafka source

先创建kafka 主题 先在node03,node04,node05上启动kafka

在node03上, cd ~/apps/kafka_2.11-0.10.2.1

bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181,192.168.183.102:2181,192.168.183.103:2181,192.168.183.104:2181,192.168.183.105:2181, --replication-factor 2 --partitions 3 --topic flumetopictest1

在node01上先vim kafkasource.conf的主机名称

然后

bin/flume-ng agent --conf conf --conf-file conf/kafkasource.conf --name kafkasourceagent -Dflume.root.logger=INFO,console

在node03上启动kafka的客户端

cd /home/hadoop/apps/kafka_2.11-0.10.2.1

bin/kafka-console-producer.sh --broker-list 192.168.183.103:9092,192.168.183.104:9092 --topic flumetopictest1

写入12345

node01上就会输出写入的结果

---演示taildir source

Taildir Source:监听一个文件夹或者文件,通过正则表达式匹配需要监听的数据源文件,Taildir Source通过将监听的文件位置写入到文件中来实现断点

续传,并且能够保证没有重复数据的读取

关键参数说明

? type:source类型TAILDIR

? positionFile:保存监听文件读取位置的文件路径

? idleTimeout:关闭空闲文件延迟时间,如果有新的记录添加到已关闭的空闲文件,taildir srouce将继续打开该空闲文件,默认值120000毫秒(2分钟)

? writePosInterval:向保存读取位置文件中写入读取文件位置的时间间隔,默认值

3000毫秒

? batchSize:批量写入channel最大event数,默认值100

? maxBackoffSleep:每次最后一次尝试没有获取到监听文件最新数据的最大延迟时间,默认值5000毫秒

先做好预备工作 在node01下

cd ~/apps/flume/taildir

mkdir test1

mkdir test2

mkdir position

在node01中启动 在flume-1.8.0文件夹下

bin/flume-ng agent --n a1 --conf conf --conf-file conf/taildirsource.conf --name taildiragent -Dflume.root.logger=INFO,console

在另一个窗口中

cd ~/apps/flume/taildir/test1

echo 123 > test.log

cd ~/apps/flume/taildir/test2

echo 4590  > file2.log

----演示filechannel

先做好预备工作 在node01下

cd  /home/hadoop/apps/flume/filechannel

mkdir data

mkdir checkpoint

mkdir backup

cd /home/hadoop/apps/flume-1.8.0

bin/flume-ng agent --conf conf --conf-file conf/filechannle.conf --name a1 -Dflume.root.logger=INFO,console

在node01的另一个窗口

telnet localhost 44444

发送12345

再开一个node01的窗口

cd /home/hadoop/apps/flume/filechannel/data 发现已经创建了文件。

cd /home/hadoop/apps/flume/filechannel/checkpoint 已经创建了检查点文件

----演示kafkachannel(首选)

存储容量更大,容错更好

关键参数说明:

? type:Kafka Channel类型org.apache.flume.channel.kafka.KafkaChannel

? kafka.bootstrap.servers:Kafka broker列表,格式为ip1:port1, ip2:port2…,建

议配置多个值提高容错能力,多个值之间用逗号隔开

? kafka.topic:topic名称,默认值“flume-channel”

? kafka.consumer.group.id:Consumer Group Id,全局唯一

? parseAsFlumeEvent:是否以Avro FlumeEvent模式写入到Kafka Channel中,

默认值true,event的header信息与event body都写入到kafka中

? pollTimeout:轮询超时时间,默认值500毫秒

? kafka.consumer.auto.offset.reset:earliest表示从最早的偏移量开始拉取,latest

表示从最新的偏移量开始拉取,none表示如果没有发现该Consumer组之前拉

取的偏移量则抛异常

首先做好预备工作

在node01上

cd /home/hadoop/apps/flume-1.8.0/conf

vim kafkachannel.conf(修改kafka broker的机器号)

在node03上

创建一个topic

cd /home/hadoop/apps/kafka_2.11-0.10.2.1

bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181 --replication-factor 1 --partitions 3 --topic flumechannel2

在node01上启动agent

cd /home/hadoop/apps/flume-1.8.0

bin/flume-ng agent --conf conf --conf-file conf/kafkachannel.conf --name a1 -Dflume.root.logger=INFO,console

在node01的一个新窗口中

telnet localhost 44444  发送数据123456789

----演示HDFSsink

在node01上,cd /home/hadoop/apps/flume-1.8.0

bin/flume-ng agent --conf conf --conf-file conf/hdfssink.conf --name a1 -Dflume.root.logger=INFO,console

在node01上

使用telnet发送数据

telnet localhost 44444    发送12345555

在node01的一个新窗口上,

hadoop fs -ls /data/flume/20180811 可以发现一个前缀为hdfssink的文件

---演示kafkasink(略)

---演示replicating seletor

cd /home/hadoop/apps/flume-1.8.0/conf

先修改 vim replicating_selector.conf  kafka的server

修好后

在node03上创建kakfka的 topic

cd /home/hadoop/apps/kafka_2.11-0.10.2.1

bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181 --replication-factor 1 --partitions 3 --topic FlumeSelectorTopic1

在node01上启动agent

cd /home/hadoop/apps/flume-1.8.0/

bin/flume-ng agent --conf conf --conf-file conf/replicating_selector.conf --name a1

新开一个node01的窗口

telnet localhost 44444 

再在一个新窗口中可以发现

cd /home/hadoop/apps/flume/selector  发现已经写入数据

在node03 启动kafka客户端监听主题

bin/kafka-console-consumer.sh --zookeeper 192.168.183.101:2181 --from-beginning --topic FlumeSelectorTopic1

回到node01的窗口

telnet localhost 44444  发送数据 78834    发送node03的kafka已经读到了

----演示Multiplexing  Channel Selector

在node01上 cd ~/apps/flume/multiplexing

mkdir k11

mkdir k22

mkdir k33

修改四个配置文件绑定的端口号

avro_sink1.conf avro_sink2.conf avro_sink3.conf multiplexing.conf

在node01上

bin/flume-ng agent --conf conf --conf-file conf/multiplexing_selector.conf --name a3 -Dflume.root.logger=INFO,console

再分别启动三个阶段的agent

在node01的一个新窗口下

cd /home/hadoop/apps/flume-1.8.0

bin/flume-ng agent --conf conf --conf-file conf/avro_sink1.conf --name agent1 >/dev/null 2>&1 &

bin/flume-ng agent --conf conf --conf-file conf/avro_sink2.conf --name agent2 >/dev/null 2>&1 &

bin/flume-ng agent --conf conf --conf-file conf/avro_sink3.conf --name agent3 >/dev/null 2>&1 &

jps后

发现的application进程就是新的agent进程

看看端口在不在

lsof -i:44444

lsof -i:44445

lsof -i:44446 发现端口正常监听

在node01的一个新窗口中

telnet localhost 44444  发送4444444444

telnet localhost 44445  发送5555555555

telnet localhost 44446  发送6666666666

查看

cd /home/hadoop/apps/flume/multiplexing/k11

cat 1533997666003-10

发现了4444444444这个数据

cd /home/hadoop/apps/flume/multiplexing/k12

cat 1533997666003-10

发现5555555555这个数据

cd /home/hadoop/apps/flume/multiplexing/k12

cat 1533997666003-10

发现6666666666这个数据

---sink processor

多个sink processor需要

负载均衡或者容错的processor

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容

  • title: Flume构建日志采集系统date: 2018-02-03 19:45tags: [flume,k...
    溯水心生阅读 16,123评论 3 25
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • Linux-Server-Notes PMS /home/softwareluke/图片/2017-09-11 0...
    燕京博士阅读 559评论 0 1
  • 游走数年,有诸多成效,今方到此请吕兄和伊伯兄,再次出山定计,一切皆可,万事俱成,上光听说我打听到,两位仁兄的落落脚...
    士心武夫阅读 176评论 0 0
  • 去年的昨天,我接到了安妮的电话。 说真的,我接到她的电话时很惊讶。因为她已经很久没有和我联系过了。如果我没有记错的...
    会疼这思念阅读 484评论 20 3