前言:
logstash是什么?为什么需要logstash?援引官网的介绍:Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的 “存储库” 中。(我们的存储库当然是 Elasticsearch。)
简单来说,logstash就是用于将不同来源的数据(如JDBC、http网络、kafka、log4j日志等)统一管理,输入到Elasticsearch中,以作为整个搜索引擎的数据源。例如:存储在Mysql中 的数据,可以用JDBC来导入到Elasticsearch中(需要安装logstash-input-jdbc插件),还可以设置定时任务,支持增量导入等。
本文主要介绍logstash-input-jdbc插件的安装,以及简单的.conf配置文件,实现执行定时任务,从Mysql增量导入数据到Elasticsearch的简单效果。
1.安装logstash-input-jdbc
首先,在安装好Elasticsearch的环境中,装logstash,下载按照官网的步骤来就可以。logstash是一个数据导入工具,支持多种数据源导入到Elasticsearch中,导入不同的数据源需要用到不同的插件,要想导入Mysql的数据到Elasticsearch,需要安装插件——logstash-input-jdbc。
Elasticsearch、logstash下载地址:https://www.elastic.co/downloads
插件github地址:logstash-plugins/logstash-input-jdbc
安装此插件以及依赖需要在ruby环境下,ruby默认的镜像在国外,所以很慢,需要手动将镜像网站https://rubygems.org/替换为国内的https://gems.ruby-china.com/
具体步骤如下:(MacOS系统)
1.1安装gem
Gem是一个管理Ruby库和程序的标准包,MacOS系统自带gem环境,可以用gem -v查看版本,gems.ruby-china建议gem版本最好>2.6.x,如果版本较低,可以用 gem update --system更新
gem -v
gem update --system
1.2给gem更换网站源
添加源https://gems.ruby-china.com/,删除旧源https://rubygems.org/
$ gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
$ gem sources -l
gem sources -l命令的作用是查看当前gem的源,在替换成功后,应该显示如下:
*** CURRENT SOURCES ***
https://gems.ruby-china.com/
确保只有 http://gems.ruby-china.com,即替换成功。
1.3使用 Bundler 的Gem 源代码镜像命令或修改Gemfile
$ bundle config mirror.https://rubygems.org https://gems.ruby-china.com
使用bundle命令替换掉镜像配置后即可执行第4步,插件安装,本人用此方法没成功,用的是第二种——修改Gemfile。修改Gemfile需要对logstash安装文件目录下的两处文件——Gemfile和Gemfile.lock进行修改,Mac下直接用记事本打开修改就OK,服务器环境可以用vim修改:
cd /usr/local/logstash-6.4.2
vim Gemfile
#修改第1行出现的source,将source "https://rubygems.org",改为source "https://gems.ruby-china.com/"
vim Gemfile.lock
#修改GEM下的remote,将remote: https://rubygems.org,改为remote: https://gems.ruby-china.com/
1.4安装logstash-input-jdbc插件
执行完上面3步后,终于可以安装插件了,启动logstash的bin目录下的logstash-plugin命令,执行安装。
bin/logstash-plugin install logstash-input-jdbc
安装完成不报错就应该是成功了,可用bin/logstash-plugin list查看所有已安装插件,加上--verbose参数可以显示插件版本号。
bin/logstash-plugin list
bin/logstash-plugin list --verbose
2.配置logstash的conf文件
logstash-input-jdbc插件安装完成后,就可以配置.conf文件导入mysql的数据了,详细请戳官方文档,此处以我的配置文件为例:
文件名:logstash-mysql.conf,路径/usr/local/logstash-6.4.2/logstash-mysql.conf
input {
jdbc {
#jar包可以存放在任意路径
jdbc_driver_library => "/usr/local/logstash-6.4.2/mysql-connector-java-5.1.36.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/DB_NAME"
jdbc_user => "mysqlUser"
jdbc_password => "userPass"
#schedule设置每分钟执行
schedule => "* * * * *"
statement => "SELECT * FROM blog_article WHERE blog_article.edit_date >= :sql_last_value"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "articles"
#index表示存入elasticsearch中的索引名字
}
stdout { codec => json_lines }
}
mysql-connector-java这个jar包下载: https://mvnrepository.com/artifact/mysql/mysql-connector-java
可以看到,在logstash的conf文件中可以配置schedule,即定时任务;配置statement,即定时执行的SQL语句,此处:
statement => "SELECT * FROM TABLE_NAME WHERE TABLE_NAME.edit_date >= :sql_last_value"
例中,blog_article表中edit_date列记录了数据编辑时间,当有新增的文章或者有修改的文章时,这些记录的edit_date时间>上次sql执行的时间sql_last_value,于是这些内容就是要导入到Es中的所谓【增量】。
3.启动Logstash导入Mysql中的数据
配置好.conf文件后,启动logstash时即可直接根据conf文件启动,从而实现数据的导入。启动命令如下:
/usr/local/logstash-6.4.2/bin/logstash -f /usr/local/logstash-6.4.2/logstash-mysql.conf