一、基本工具的安装和配置
1、logstash
从官网下载:
wget https://download.elastic.co/logstash/logstash/logstash-2.3.2.tar.gz
下载完后解压:
tar -xvf logstash-2.3.2.tar.gz
改名成logstash后,移动到/usr/lcoal中去:
mv logstash /usr/local
2、zookeeper
要记得安装java8的jdk!
首先在/usr/local下建立一个kafka文件夹,用来存放kafka和zookeeper:
mkdir /usr/local/kafka
进入kafka文件夹后,下载:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
下载完后解压:
tar -xvf apache-zookeeper-3.6.3-bin.tar.gz
创建软链接:
ln -s apache-zookeeper-3.6.3-bin zookeeper
进入zookeeper的conf文件夹,创建一个zoo.cfg文件,并将zoo_sample.cfg中的内容拷贝进来:
cp zoo_sample.cfg ./zoo.cfg
如图所示:
记住zookeeper的默认端口号是2181,今后会用到。
启动zookeeper:
进入zookeeper的bin文件夹中:
输入启动命令:
./zkServer.sh start
再查看状态,确认是否启动,若没有启动则restart。
3、kafka
进入/usr/local/kafka文件夹。
下载kafka:
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
解压:
tar -xvf kafka_2.13-2.8.0.tgz
创建软链接:
ln -s kafka_2.13-2.8.0 kafka
配置server.properties
vim kafka/config/server.properties
有两个需要注意的点:
zookeeper的连接按照实际情况配置。
二、启动相关工具,并完成输出
1、启动kafka,并创建一个topic
启动kafka:
bin/kafka-server-start.sh -daemon config/server.properties
创建topic:
bin/kafka-topics.sh --create --topic suricata-http-log --replication-factor 1 --partitions 1 --zookeeper localhost:2181
其中加粗的两项是需要自行修改的:
其中topic-id是suricata-http-log,而zookeeper的主机和端口则为localhost:2181。
2、创建logstash的输入输出配置文件
位置随意,如我个人文件夹中:
vim /home/canvas/Test/logstash-kafka.conf
conf文件就创建了。
在其中写入的内容如下:
input这里指明了我想将其输出的源文件eve-http.json和源文件的所在路径。
output这里指明了我们要输出往kafka。
注:这里本来指明了codec为json,但是后来输出时发现格式存在问题,所以就将其注释了。可能问题在于我的源文件本身就是json格式。
3、kafka启动消费者
虽然这个案例中不需要启动生产者,但还是在这里把启动producer的指令给打出来:
启动生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --sync --topic suricata-http-log
启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic suricata-http-log --from-beginning
在生产者界面输入字符串,回车确认后,消费者端也会有相同输出。
4、启动logstash
logstash/bin/logstash -f /home/canvas/Test/logstash-kafka.conf
有如下提示则说明成功启动:
启动成功后,当我们在/home/canvas/Test/logstash-kafka.conf中写入的input的file路径中的那个日志文件eve-http.json中有新日志产生时,kafka的消费者端,也会同步输出:
红线部分是kafka新增加的标记,日志文件中更新的内容在红线后的部分,跟日志源文件中的内容一致。
这样就完成我们的任务了。