注意:es 、logstash版本要一模一样
参考: https://www.elastic.co/guide/en/logstash-versioned-plugins/current/v4.3.16-plugins-inputs-jdbc.html
同步数据库数据到 Elasticsearch
1、需求:将数据库中的数据同步到 ES,借助 ES 的全文搜索,提高搜索速度
2、 需要把新增用户信息同步到 Elasticsearch 中
3、 用户信息 Update 后,需要能被更新到 Elasticsearch
4、支持增量更新
5、用户注销后,不能被 ES 所搜索到
搭建过程
准备user表的结构
- last_updated 字段非常关键,这个是作为识别数据是否变动的依据
CREATE TABLE `user` (
`id` int NOT NULL,
`email` varchar(255) DEFAULT NULL,
`is_deleted` bit(1) NOT NULL,
`last_updated` bigint DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
`tags` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
拉取镜像并启动logstash容器
docker pull logstash:7.16.2
docker run --privileged --name logstash -p 5045:5045 -d logstash:7.16.2
logstash可以指定配置文件运行
logstash -f /usr/share/logstash/config/logstash-sample.conf
当然我们也可以直接修改容器中的默认配置:
vi /usr/share/logstash/pipeline/logstash.conf
- 填入数据源MySQL的连接信息、用户和密码
- 在指定cron表达式下调用statement指定的SQL脚本
- output节点表示将SQL查询出来的数据导入到目的的ES服务
input {
jdbc {
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.1.4:3306/db_example?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
jdbc_user => "root"
jdbc_password => "123456"
#启用追踪,如果为true,则需要指定tracking_column
use_column_value => true
#指定追踪的字段,
tracking_column => "last_updated"
#追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
tracking_column_type => "numeric"
#记录最后一次运行的结果
record_last_run => true
#上面运行结果的保存位置
last_run_metadata_path => "jdbc-position.txt"
statement => "SELECT * FROM user where last_updated >:sql_last_value;"
schedule => " * * * * * *"
}
}
output {
elasticsearch {
document_id => "%{id}"
document_type => "_doc"
index => "user"
hosts => ["http://192.168.1.4:9200"]
}
stdout{
codec => rubydebug
}
}
修改logstash.yml,指定ES的地址
vi /usr/share/logstash/config/logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://192.168.1.4:9200" ]
jdbc驱动
需要保证logstash的jdbc驱动和你的spring程序所用的jdbc驱动一致!
直接copy过去:
docker cp mysql-connector-java-8.0.17.jar logstash:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.17.jar'
logstash启动报错
TypeError: could not coerce Integer to class java.lang.String解决:
jdbc_password => "123456" 密码用引号包括,貌似logstash在读mysql密码时把12345当做Integer处理了,我们加入引号包裹让它被认作为字符串类型、
启动logstash后docker logs -f logstash 查看日志没问题启动正常,观察日志
随后到spring应用程序里修改数据库数据,随便insert一条新的
发现日志输出如下:
[2023-09-25T10:09:36,940][INFO ][logstash.inputs.jdbc ] (0.025473s) SELECT * FROM user where last_updated >1695627545540;
[2023-09-25T10:09:37,010][INFO ][logstash.inputs.jdbc ] (0.002776s) SELECT * FROM user where last_updated >1695636565237;
[2023-09-25T10:09:37,049][INFO ][logstash.inputs.jdbc ] (0.002769s) SELECT * FROM user where last_updated >1695636565237;
[2023-09-25T10:09:37,111][INFO ][logstash.inputs.jdbc ] (0.006902s) SELECT * FROM user where last_updated >1695636565237;
{
"last_updated" => 1695636565237,
"id" => 2,
"name" => "yinkai",
"tags" => "Elasticsearch,IntelliJ",
"@timestamp" => 2023-09-25T10:09:36.969Z,
"@version" => "1",
"email" => "2542847562@qq.com",
"is_deleted" => false
}
[2023-09-25T10:09:38,342][INFO ][logstash.inputs.jdbc ] (0.002683s) SELECT * FROM user where last_updated >1695636565237;
说明已经正常从MySQL中load到了数据
到es中查询看下新加的数据有了没
POST /user/_search
{"query": {
"term": {
"name.keyword": {
"value": "yinkai"
}
}
}
}
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 0.6931472,
"hits" : [
{
"_index" : "user",
"_type" : "_doc",
"_id" : "2",
"_score" : 0.6931472,
"_source" : {
"last_updated" : 1695636565237,
"id" : 2,
"name" : "yinkai",
"tags" : "Elasticsearch,IntelliJ",
"@timestamp" : "2023-09-25T10:09:36.969Z",
"@version" : "1",
"email" : "2542847562@qq.com",
"is_deleted" : false
}
}
]
}
}
注意事项
1、确保Elasticsearch和logstash的版本一致
2、logstash的jdbc驱动要和应用的一致
3、索引会自动创建
4、删除的实现只能依赖逻辑删除,否则无法从mysql同步es
5、此方法对业务代码毫无侵入
6、但是数据库有定时器一直在查的压力
同步多个表怎么实现?
- 只需要配置多个jdbc节点即可
- 不同的jdbc节点下定义不同的type标识
- 在output节点中根据定义的不同type标识指定不同的es节点(当然可以是同一个es实例,但是index肯定不一样)
input {
jdbc {
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.1.4:3306/db_example?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
jdbc_user => "root"
jdbc_password => "123456"
#启用追踪,如果为true,则需要指定tracking_column
use_column_value => true
#指定追踪的字段,
tracking_column => "last_updated"
#追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
tracking_column_type => "numeric"
#记录最后一次运行的结果
record_last_run => true
#上面运行结果的保存位置
last_run_metadata_path => "jdbc-position.txt"
statement => "SELECT * FROM user where last_updated >:sql_last_value;"
schedule => " * * * * * *"
type => "user"
}
jdbc {
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.1.4:3306/db_example?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
jdbc_user => "root"
jdbc_password => "123456"
#启用追踪,如果为true,则需要指定tracking_column
use_column_value => true
#指定追踪的字段,
tracking_column => "last_updated"
#追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
tracking_column_type => "numeric"
#记录最后一次运行的结果
record_last_run => true
#上面运行结果的保存位置
last_run_metadata_path => "jdbc-position2.txt"
statement => "SELECT * FROM book where last_updated >:sql_last_value;"
schedule => " * * * * * *"
type => "book"
}
}
output {
if[type] == "user" {
elasticsearch {
document_id => "%{id}"
document_type => "_doc"
index => "user"
hosts => ["http://192.168.1.4:9200"]
}
}
if[type] == "book" {
elasticsearch {
document_id => "%{id}"
document_type => "_doc"
index => "book"
hosts => ["http://192.168.1.4:9200"]
}
}
stdout{
codec => rubydebug
}
}
logstash 和 canal的区别
1、在传输上,两者都可以看做传输数据的管道,但logstash可以从数据库读取数据(例:MySql),传输到ES中,而canal能做到的不止这些,它可以把MySql的数据读取出来,配合java代码(貌似只支持java),
将读取到的数据存储到任何代码能操作的地方,例如:文件、MySql、Redis、ES等等
2、在时效性上,logstash配置文件是使用定时器去同步数据,而canal是监听MySql的binlog日志,进而做到数据几乎实时同步(PS:实际开发中基本是1秒~2秒内)
3、在数据处理上,logstash可以读取多个表,然后分别存储到对应节点,或者可以在存储前进行联表查询,进而存储成一条数据。但当联表查询一对多时,存储成一条数据就变得不现实了,比如:文章搜索,并且连文章对应的评论一起搜索出来。如果用logstash的话,可能需要存成两张表,先根据条件搜索文章表,然后再循环查到的文章列表,再查询评论表,再拼装成数据,接口返回结果。但如果用canal的话,在数据存储到ES前,可以先把对应的评论查询出来,并且拼装成json字符串,当做文章表的单个字段存储到ES中,这样可以直接查询出来,避免反复地查询ES组装数据。在这一点上,理论上用canal查询会更快,更高效。
4、在对数据库的压力上,logstash的原理是定时扫描变动的表,所以对数据库有一定压力,并且如果有其他程序在进行某条语句更新,锁住了这条行数据,那logstash读取数据时,就会被“卡住”,如果这个时间过长,可能会影响服务器卡死。而canal由于是监听的binlog日志,所以几乎对MySql没有压力,并且binlog已经记录,不会存在数据变动的情况。
5、读取数据上,logstash可以读取数据库、文本文件,而canal读取的是binlog文件(binlog属于二进制文件)。
6、学习成本上,logstash相对简单,canal相对难一点。logstash时效较低,同步过程中对数据库造成的压力较大,但可以对全表进行扫描,适合做一次性的全量同步。canal时效性更高,同步过程中对数据库造成的压力更小,但仅提供对数据库增量日志的监听,适合做增量同步。