How to sync data into Elasticsearch with Logstash
1. Download
https://download.elastic.co/logstash/logstash/logstash-2.4.1.zip
2. Installation
2.1 Unzip to the installation directory
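For example, downloading and unpacking under /opt (which matches the /opt paths used later in this walkthrough):
cd /opt
wget https://download.elastic.co/logstash/logstash/logstash-2.4.1.zip
unzip logstash-2.4.1.zip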
2.2 Install the logstash-input-jdbc plugin
cd logstash-2.4.1
bin/logstash-plugin install logstash-input-jdbc
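To confirm the plugin was actually installed, list the installed plugins (a quick check; the exact output format can vary between Logstash versions):
bin/logstash-plugin list | grep logstash-input-jdbc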
If the plugin install fails, it is usually because RubyGems has long been very hard to reach from inside China.
Use a reasonably recent RubyGems version; 2.6.x or later is recommended.
# a proxy may be needed for the following commands
$ gem update --system
$ gem -v
2.6.3
$ gem sources --add https://gems.ruby-china.org/ --remove https://rubygems.org/
$ gem sources -l
https://gems.ruby-china.org
# make sure gems.ruby-china.org is the only source listed
# if you hit SSL certificate problems you cannot resolve, use http://gems.ruby-china.org instead to sidestep SSL
(gem sources --add http://gems.ruby-china.org/ --remove https://gems.ruby-china.org/)
# also change the gem source in Logstash's own Gemfiles
cd logstash-2.4.1
vim Gemfile
vim Gemfile.jruby-1.9.lock
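If you prefer not to edit the files by hand, a rough sketch with sed (the exact wording of the source lines can differ between Logstash versions, so check the files first), followed by a retry of the install:
# point both files at the mirror instead of rubygems.org
sed -i 's#https://rubygems.org#https://gems.ruby-china.org#g' Gemfile
sed -i 's#https://rubygems.org#https://gems.ruby-china.org#g' Gemfile.jruby-1.9.lock
# retry the plugin installation
bin/logstash-plugin install logstash-input-jdbc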
3. Usage
3.1 logstash.conf configuration
# relative paths below are resolved against the directory the command is run from
input {
  jdbc {
    # JDBC driver jar
    jdbc_driver_library => "/opt/lib/mysql-connector-java-5.1.40.jar"
    # driver class
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # database connection
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    # database user
    jdbc_user => "root"
    # database password
    jdbc_password => "password"
    # schedule (minute, hour, day of month, month, day of week; all * means it runs every minute)
    schedule => "* * * * *"
    # path to the SQL statement file
    statement_filepath => "../scripts/patent.sql"
    # paging
    jdbc_paging_enabled => "true"
    # page size (default 100,000)
    jdbc_page_size => "100000"
    # ---- incremental updates ----
    # whether to record where the last run stopped (the value of the last record processed),
    # so the next run can continue incrementally instead of re-querying everything on large tables
    # default: true
    record_last_run => "true"
    # whether to track a specific column's value; if false, the timestamp of the last run is stored instead
    use_column_value => "true"
    # required when use_column_value is true: which field to track
    # (note: this is the field as it appears in the Elasticsearch document, not the raw SQL column)
    tracking_column => "updateDate"
    # file in which the last seen tracking_column value is recorded
    last_run_metadata_path => "../lastrun/logstash_jdbc_last_run_patent_payable_fee"
    # whether to wipe the last_run_metadata_path record; if true, every run effectively starts
    # from scratch and re-queries all database records
    # pitfall: while syncing fee-payment data, clean_run was set to true; after the first sync of
    # 500k rows, Logstash was stopped and restarted, and it queried everything from the beginning
    # again instead of using the saved tracking_column value
    # default: false
    clean_run => "false"
    # time zone setting
    # better left unset: it can cause records to be missed during the sync
    # after setting it, the time recorded in last_run became local time, while Elasticsearch still
    # stored standard UTC time (8 hours behind local time)
    jdbc_default_timezone => "Asia/Shanghai"
  }
}
output {
  elasticsearch {
    # Elasticsearch host
    hosts => "localhost:9200"
    # index (analogous to a database)
    index => "zl_dev"
    # type (analogous to a database table)
    document_type => "%{type}"
    # document id (use the primary key to avoid duplicates)
    document_id => "%{id}"
  }
}
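Before starting the pipeline, it is worth confirming that Elasticsearch actually answers at the configured host; a minimal check:
curl -s 'http://localhost:9200/?pretty'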
3.2 SQL script
Here :sql_last_value tracks the record's update time, which makes incremental updates straightforward.
SELECT * FROM t_dp_patent WHERE update_date > :sql_last_value
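After at least one scheduled run, the file configured in last_run_metadata_path holds the most recent updateDate as a small YAML value; roughly like the following (the exact serialization depends on the tracked column's type):
cat ../lastrun/logstash_jdbc_last_run_patent_payable_fee
# example output (hypothetical value): --- 2016-12-01 08:30:00.000000000 Z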
3.3 Run the script
cd /opt/logstash-2.4.1/bin
./logstash -f ../scripts/patent.conf &
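Before leaving it running in the background, the configuration can be syntax-checked, and after the first schedule tick the document count in the target index should start growing; a quick sketch (zl_dev is the index configured above):
# check the config without starting the pipeline (Logstash 2.x flag)
./logstash -f ../scripts/patent.conf --configtest
# after a scheduled run, confirm documents are arriving
curl -s 'http://localhost:9200/zl_dev/_count?pretty'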
4. Troubleshooting
The index mapping file:
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"type": "pattern",
"pattern": ",",
"tokenizer": "whitespace",
"char_filter": [ "html_strip"]
}
}
}
},
"mappings": {
"patent": {
"properties": {
"patentHolder": {
"type": "string",
"analyzer": "ik"
},
"patentHolder_": {
"type": "string",
"index":"not_analyzed"
},
"patentHolder_1": {
"type": "string",
"analyzer":"my_analyzer"
},
"applicant_": {
"type": "string",
"index":"not_analyzed"
},
"name": {
"type": "string",
"analyzer": "ik"
},
"name_": {
"type": "string",
"index":"not_analyzed"
},
"applicationNumber": {
"type": "string",
"analyzer": "ik"
},
"applicationNumber_": {
"type": "string",
"index":"not_analyzed"
},
"applicant": {
"type": "string",
"analyzer": "ik"
},
"applicant_": {
"type": "string",
"index":"not_analyzed"
},
"applicant_1": {
"type": "string",
"analyzer":"my_analyzer"
},
"agent": {
"type": "string",
"analyzer": "ik"
},
"agency": {
"type": "string",
"analyzer": "ik"
},
"agency_": {
"type": "string",
"index":"not_analyzed"
},
"applicantAddr": {
"type": "string",
"analyzer": "ik"
},
"inventor": {
"type": "string",
"analyzer": "ik"
},
"synoptic": {
"type": "string",
"analyzer": "ik"
},
"createDate": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
},
"updateDate": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
},
"ipc": {
"type": "string"
},
"loc": {
"type": "string"
},
"synopticPicId": {
"type": "string"
},
"censor": {
"type": "string"
},
"applicantArea": {
"type": "string"
},
"applicationDate": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
},
"notificationDate": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
},
"notificationNumber": {
"type": "string"
},
"id": {
"type": "long"
},
"keysHash": {
"type": "string"
},
"patentNo": {
"type": "string"
},
"type_": {
"type": "string"
},
"status": {
"type": "long"
},
"delFlag": {
"type": "long"
}
}
}
}
}
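For the analyzers above to take effect, the index has to be created with this mapping before Logstash writes its first document (otherwise Elasticsearch derives its own dynamic mapping). Assuming the JSON is saved as patent_mapping.json (a filename chosen here for illustration), it can be applied and spot-checked like this:
# create the zl_dev index with the mapping above (ES 2.x API)
curl -XPUT 'http://localhost:9200/zl_dev' -d @patent_mapping.json
# spot-check the comma-splitting analyzer
curl 'http://localhost:9200/zl_dev/_analyze?analyzer=my_analyzer&text=foo,bar'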