使用Logstash也可以把关系型数据库如mysql, postgresql中的数据同步到其它存储介质,下面介绍如何使用腾讯云Logstash产品同步mysql中的数据到Elasticsearch。
创建管道
在“管道管理”页面,点击“新建管道”按钮,创建一个管道:
进入管道配置页面,点击“引用模板”按钮,同时引用“input-jdbc”和“output-elasticsearch”两个模板:
在管道配置中,分别针对“input-jdbc”和“output-elasticsearch”进行配置,一些关键的配置参数说明如下:
input-jdbc
- jdbc_connection_string:数据库连接地址
- jdbc_user: 数据库账号
- jdbc_password: 数据库账号密码
- jdbc_driver_library: jdbc驱动jar包,在Logstash节点的/usr/local/service/logstash/extended-files目录下,有大多数版本的mysql以及postgresql数据库的jdbc驱动jar包,可根据需要直接引用,可用的驱动jar包列表如下:
- mysql-connector-java-5.1.27.jar
- mysql-connector-java-5.1.35.jar
- mysql-connector-java-5.1.39-bin.jar
- mysql-connector-java-5.1.39.jar
- mysql-connector-java-5.1.40.jar
- mysql-connector-java-5.1.43.jar
- mysql-connector-java-5.1.47.jar
- mysql-connector-java-5.1.48.jar
- mysql-connector-java-5.1.9.jar
- mysql-connector-java-6.0.2.jar
- mysql-connector-java-6.0.6.jar
- mysql-connector-java-8.0.11.jar
- mysql-connector-java-8.0.17.jar
- mysql-connector-java-8.0.18.jar
- postgresql-42.0.0.jar
- postgresql-42.1.4.jar
- postgresql-42.2.0.jar
- postgresql-42.2.10.jar
- postgresql-42.2.13.jar
- postgresql-42.2.1.jar
- postgresql-42.2.8.jar
- jdbc_driver_class: jdbc驱动类,对于mysql可填写“com.mysql.jdbc.Driver”,postgresql可填:“org.postgresql.Driver”
- jdbc_paging_enabled: 从数据库批量拉取数据时是否开启分页,可选值"true"或者"false"
- jdbc_page_size: jdbc分页大小
- statement: 用于拉取数据的sql语句
- tracking_column: 当在statement中指定了sql_last_value用于记录读取数据的offset时,使用数据库表中的哪个字段的值来记录offset
- use_column_value: 当在statement中指定了sql_last_value用于记录读取数据的offset时,是否使用数据库表中的字段;设置为true则使用tracking_column定义的字段,否则使用前一次sql语句执行时的时间戳
- schedule:是否开启定时任务持续执行sql语句,不设置的话则只会执行一次sql语句,执行结束后管道自动结束
- type:标识字段
查看更多参数的具体含义,请参考logstash-input-jdbc
output-elasticsearch
- hosts: elasticsearch集群地址列表
- user: elasticsearch集群账号
- password: elasticsearch集群密码
- index: 索引名称
- document_type: 索引type,对于不同版本的ES集群,该字段有不同的默认值,5.x及以下的集群,默认会使用input中指定的type字段,如果type字段不存在,则该字段的值为doc;6.x的集群,该字段默认值为doc;7.x的集群,该字段默认值为_doc; 8.x的集群,不会使用该字段
- document_id: 文档ID
查看更多参数,可以参考output-elasticsearch
在配置完管道后,点击“保存并部署”创建一个管道并自动部署:
实战案例
全量同步mysql表中的数据到Elasticsearch
当mysql某张表不再进行写入时,可以使用如下配置全量地把数据同步到Elasticsearch集群中,管道配置如下:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/logstash_test"
jdbc_user => "user"
jdbc_password => "xxxxx"
jdbc_driver_library => "/usr/local/service/logstash/extended-files/mysql-connector-java-5.1.40.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "5000"
statement => "select * from newTable0"
type => "jdbc"
}
}
output {
elasticsearch {
hosts => ["http://x.x.x.x:9200"]
user => "elastic"
password => "xxxxx"
index => "newTable0"
}
}
增量同步mysql表中的数据到Elasticsearch
当mysql某张表在持续进行写入,可以使用如下配置,通过sql_last_value记录offset,把数据增量地同步到Elasticsearch集群中,管道配置如下:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/logstash_test"
jdbc_user => "user"
jdbc_password => "xxxxx"
jdbc_driver_library => "/usr/local/service/logstash/extended-files/mysql-connector-java-5.1.40.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "5000"
statement => "select * from newTable0 where id > :sql_last_value"
use_column_value => true
tracking_column => "id"
type => "jdbc"
}
}
output {
elasticsearch {
hosts => ["http://x.x.x.x:9200"]
user => "elastic"
password => "xxxxx"
index => "newTable0"
}
}
注意,上述配置中指定了tracking_column为字段"id", 需要数据表中包含一个自增的"id"字段,当然可以根据实际情况使用不同的字段。
查看日志
在控制台中,查看日志,如果没有ERROR级别的日志,则说明管道配置没有问题:
查看数据写入情况
进入到output-elasticsearch中定义的输出端的ES集群对应的kibana页面,在Dev tools工具栏里查看索引是否存在,以及索引的文档数量是否正确: