这几天有一个新需求,就是通过logstash来处理我们的日志系统,但是由于之前文本日志中抓取日志的数据量太大,也有太多的无用记录,所以提出通过kafka来过滤我们的日志,不同的子系统根据需要订阅相应的时间,然后通过logstash来将kafka中的消息传送到es中方便处理。
以上背景,但是比较特殊,我们系统所用的kafka版本是0.9,logstash中的正式版本中并没有直接针对kafka0.9版本的插件,logstash2.4可以使用kafka0.8,而logstash5.0就直接跳到了kafka0.10,所以导致logstash不能正确读取kafka的消息。
经过研究,只能通过更换logstash-input-kafka插件的版本来解决这个问题,具体更换方法如下:
- 在
https://rubygems.org/gems
这个网站中下载对应的logstash-input-kafka版本,kafka0.9需要下载4.X版本的插件,我下载的是logstash-input-kafka-4.2.0.gem这个版本。 - logstash我使用的是5.0版本的,首先在logstash下的bin目录中执行一下命令
a. logstash-plugin remove logstash-input-kafka #这个命令是用来移除logstash本身自带的logstash-input-kafka插件。
b. logstash-plugin install ../plugin/logstash-input-kafka-4.2.0.gem #安装指定的插件 - 安装完成即可进行测试。