1. 安装 ElasticSearch
首先需要安装 ElasticSearch(7.9.2) 和 Logstash(7.9.3) 环境。
ElasticSearch 官网下载解压即可。es需要使用其他用户组权限执行,添加组添加用户chown即可
su es
./bin/elasticsearch
curl http://127.0.0.1:9200
2.安装 Logstash
官网下载Logstash,还需下载mysql-connector-java-8.0.21.jar(从数据库读取数据)
tar xvf logstash-7.9.3.tar.gz
cd logstash-7.9.3/bin
./logstash -e
有些版本需要安装logstash-input-jdbc插件 ./logstash-plugin install logstash-plugin
,我们使用7.9.3内置该插件省略安装
mysql-connector-java-8.0.21.jar 需要放在/usr/local/logstash-7.9.3/logstash-core/lib/jars
目录下
3.准备工作已经好了需要手动创建几个文件jdbc.conf ,jdbc.sql便于执行脚本
方便测试起见可以在logstash-7.9.3/bin/
下创建
jdbc.conf内容如下:
input {
stdin {
}
jdbc {
# 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/testa?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
jdbc_user => "root"
jdbc_password => "root"
# 下载连接数据库的驱动包,建议使用绝对地址
jdbc_driver_library => "/usr/local/logstash-7.9.3/logstash-core/lib/jars/mysql-connector-java-8.0.21.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
codec => plain { charset => "UTF-8"}
#use_column_value => true
#追踪的字段
tracking_column => AuthTime //根据这个字段来更新增量数据
record_last_run => true
#/lastrundata/logstash_jdbc_last_run_data 目录文件自行创建
last_run_metadata_path => "/usr/local/logstash-7.9.3/lastrundata/logstash_jdbc_last_run_data"
statement_filepath => "/usr/local/logstash-7.9.3/bin/jdbc.sql"
clean_run => false
# 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 不设置就是1分钟执行一次
schedule => "* * * * *"
type => "std"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
# Elasticsearch主机地址
hosts => "127.0.0.1:9200"
# Elasticsearch的索引的名称
index => "goods"
# 类型名称(类似数据库表名)
document_type => "spu"
# 主键名称(类似数据库主键)
document_id => "%{id}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
jdbc.conf
内容切记多余换行和回车符号,logstash执行解析会报语法错误,报错行修改即可
/usr/local/logstash-7.9.3/lastrundata/logstash_jdbc_last_run_data
文件内容为
--- !ruby/object:DateTime '2020-11-24 07:49:00.103092000 Z'
logstash执行后logstash_jdbc_last_run_data文件内容中时间会被更新
jdbc.sql
内容如下:
select id,Operator,Version,ClientInfo,AuthType,AuthTime from authorize where AuthTime > :sql_last_value
4. 执行命令
切换到目录 logstash-7.9.3/bin/
执行 ./logstash -f jdbc.conf
等待····
尝试执行 curl -XGET http://127.0.0.1:9200/goods/spu/_search
则可返回json数据,over~