Preface: Test Environment
Linux: CentOS 6.5
Flume: 1.6.0-cdh5.7.0
Kafka: 2.12-0.11.0
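Before starting, it may help to confirm the installed versions. The check below is a minimal sketch, assuming $FLUME_HOME and $KAFKA_HOME are set and their bin directories are on the PATH:
$ flume-ng version
$ ls $KAFKA_HOME/libs/ | grep kafka_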
I. Define the Topic Used by Kafka
Start Kafka:
$ kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
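The broker needs a running ZooKeeper. If no ZooKeeper ensemble is available yet, the standalone instance bundled with Kafka can be started first (this assumes the default zookeeper.properties shipped with Kafka):
$ zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties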
Create the topic:
$ kafka-topics.sh --create --zookeeper hadoopmaster:2181 --replication-factor 1 --partitions 1 --topic test_topic
Check that the topic was created successfully:
$ kafka-topics.sh --list --zookeeper hadoopmaster:2181
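Optionally, describe the topic to confirm its partition and replication settings:
$ kafka-topics.sh --describe --zookeeper hadoopmaster:2181 --topic test_topic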
II. Define the Flume Agents
1. Configure Agent 1 (exec-memory-avro), which tails a log file and forwards new lines over Avro
# Define the agent's source, sink, and channel names
exec-memory-avro.sources=exec-source
exec-memory-avro.sinks=avro-sink
exec-memory-avro.channels=memory-channel
# Define an exec source that tails the monitored file
exec-memory-avro.sources.exec-source.type=exec
exec-memory-avro.sources.exec-source.command=tail -F /home/hadoop/data/test.log
exec-memory-avro.sources.exec-source.shell=/bin/sh -c
# Define an avro sink with the target hostname and port
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoopmaster
exec-memory-avro.sinks.avro-sink.port = 22222
# Define a memory channel and bind the source and sink to it
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
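Save the above as $FLUME_HOME/conf/exec-memory-avro.conf, the file name referenced by the start command in Part III. If the monitored file does not exist yet, it can be created in advance; the path matches the exec source command above:
$ mkdir -p /home/hadoop/data
$ touch /home/hadoop/data/test.log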
2. Configure Agent 2 (avro-memory-kafka), which receives Avro events from Agent 1 and writes them to Kafka
# Same structure as above; this agent receives the messages sent by Agent 1's avro sink
avro-memory-kafka.sources=avro-source
avro-memory-kafka.sinks=kafka-sink
avro-memory-kafka.channels=memory-channel
avro-memory-kafka.sources.avro-source.type=avro
avro-memory-kafka.sources.avro-source.bind=hadoopmaster
avro-memory-kafka.sources.avro-source.port=22222
# Kafka sink configuration; see the "Kafka Sink" section of the official Flume documentation for details
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = hadoopmaster:9092
avro-memory-kafka.sinks.kafka-sink.topic = test_topic
# Number of messages to accumulate before writing a batch to Kafka
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
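Save this second configuration as $FLUME_HOME/conf/avro-memory-kafka.conf, matching the start command in Part III.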
III. Start Flume and Kafka
1. Start Agent 2 (avro-memory-kafka) first
$ flume-ng agent --name avro-memory-kafka --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console
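Once Agent 2 is up, its avro source should be listening on port 22222. A quick, optional check (assuming netstat is available, as it is on CentOS 6.5):
$ netstat -an | grep 22222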
2. Then start Agent 1 (exec-memory-avro)
$ flume-ng agent --name exec-memory-avro --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console
3. Start a Kafka consumer
$ kafka-console-consumer.sh --zookeeper hadoopmaster:2181 --topic test_topic
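The --zookeeper option uses the old consumer, which is deprecated in Kafka 0.11; connecting directly to the broker with the new consumer also works, for example:
$ kafka-console-consumer.sh --bootstrap-server hadoopmaster:9092 --topic test_topic --from-beginning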
IV. Test by Appending to the Monitored File
Append test lines to the monitored file (the commands below assume the current directory is /home/hadoop/data, where test.log lives):
$ echo test1 >> test.log
$ echo test2 >> test.log
$ echo test3 >> test.log
$ echo test4 >> test.log
$ echo test5 >> test.log
$ echo test6 >> test.log
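To generate a larger burst of messages, for example to observe the batchSize = 5 behavior of the Kafka sink, a simple loop like the following sketch can be used instead:
$ for i in $(seq 1 20); do echo "test_$i" >> test.log; done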
If the corresponding messages appear in the Kafka consumer console, the test is successful.