Kafka因为具有优秀的并发读写性能在大数据收集传输过程中有的非常重要的作用。近期项目计划将Kafka作为数据建模分析的总线使用,但是因为项目的特殊性需求,我们在两个不同的地理位置上部署了两套Kafka队列,并且要求其中一套能把数据传递到另一套的指定队列,说简单点就是需要从一个Kafka队列里读数据,然后写到另一个Kafka队列里,读写Kafka队列的工具有很多,相比Apache Flume我们更倾向于Logstash。本文简单介绍一下何如基于Docker容器配置Logstash集成Kafka。
实验场景
假设有两个Kafka的Topic:
- 第一个Logstash实例读取实验机器的linux系统日志,将文本原样发送到Test2队列;
- 第二个Logtash实例从Test2队列读取消息,转发到Test3队列
实现用Logstash读/写队列实验。
Kafka环境准备
基于Docker搭建简单Kafka的指导请参考《基于Docker搭建分布式消息队列Kafka》
配置Logstash
基于Docker安装Logstash非常简单,不用准备一大堆环境变量什么的,直接拉镜像就行了。Logstash对Kafka的操作需要用到相应的插件,但是当前的版本默认就带这些插件,所以也不需要额外注意什么。
docker pull logstash
从日志文件到Kafka
logstash.conf
input {
file {
path => "/log/*"
}
}
output {
kafka {
topic_id => "test2"
bootstrap_servers => "192.168.2.249:9092"
}
stdout {}
}
配置很简单,我们为了方便调试,在output里加了一个stdout把日志输出到控制台,实际使用的时候最好还是去掉。这里还要注意一点,因为配置是跑在docker容器里面的,如果我们实际要监控的是宿主机的系统日志,那么在启容器的时候一定要把监控的目标路径映射到容器里,然后配置的path指向的是映射到容器里的位置。
logstash.conf所在的目录也要映射到容器里面,启动容器:
docker run -it --rm \
-v /root/config-dir:/config-dir \
-v /var/log/messages:/log/messages \
logstash -f /config-dir/logstash.conf
容器启动以后可以看到控制台上有日志输出,同时我们也可以登录到Kafka的容器里用命令查看kafka队列的写入情况:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.249:9092 --topic test2 --from-beginning
从Kafka到Kafka
logstash2.conf 从test2队列中读取数据,写入新的队列test3:
input{
kafka{
bootstrap_servers => "192.168.2.249:9092"
client_id => "test"
group_id => "test"
auto_offset_reset => "latest"
consumer_threads => 4
decorate_events => true
topics => ["test2"]
type => "bhy"
}
}
output {
kafka {
topic_id => "test3"
bootstrap_servers => "192.168.2.249:9092"
}
stdout {}
}
- auto_offset_reset => "latest" //从最新的偏移量开始消费
- decorate_events => true //此属性会将当前topic、offset、group、partition等信息也带到message中
- topics => ["logq","loge"] //数组类型,可配置多个topic
- type => "bhy" //所有插件通用属性,尤其在input里面配置多个数据源时很有用
启动容器同样需要映射相应的目录:
docker run -it --privileged --rm \
-v /root/config-dir:/config-dir \
logstash -f /config-dir/logstash2.conf
同样的,可以用kafka命令行查看消息是否正确写入。我们也可以用Kafka的UI工具来查看,结果如下,可以看到test2和test3同时都有数据写入