利用logstash的JDBC插件实现MySQL到Elasticsearch数据同步

注意:es 、logstash版本要一模一样
参考: https://www.elastic.co/guide/en/logstash-versioned-plugins/current/v4.3.16-plugins-inputs-jdbc.html

同步数据库数据到 Elasticsearch

1、需求:将数据库中的数据同步到 ES,借助 ES 的全文搜索,提高搜索速度
2、 需要把新增用户信息同步到 Elasticsearch 中
3、 用户信息 Update 后,需要能被更新到 Elasticsearch
4、支持增量更新
5、用户注销后,不能被 ES 所搜索到

搭建过程

准备user表的结构

  • last_updated 字段非常关键,这个是作为识别数据是否变动的依据
CREATE TABLE `user` (
  `id` int NOT NULL,
  `email` varchar(255) DEFAULT NULL,
  `is_deleted` bit(1) NOT NULL,
  `last_updated` bigint DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  `tags` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

拉取镜像并启动logstash容器

docker pull logstash:7.16.2
docker run --privileged --name logstash -p 5045:5045 -d logstash:7.16.2

logstash可以指定配置文件运行

logstash -f /usr/share/logstash/config/logstash-sample.conf
当然我们也可以直接修改容器中的默认配置:
vi /usr/share/logstash/pipeline/logstash.conf

  • 填入数据源MySQL的连接信息、用户和密码
  • 在指定cron表达式下调用statement指定的SQL脚本
  • output节点表示将SQL查询出来的数据导入到目的的ES服务
input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.4:3306/db_example?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "123456"
    #启用追踪,如果为true,则需要指定tracking_column
    use_column_value => true
    #指定追踪的字段,
    tracking_column => "last_updated"
    #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
    tracking_column_type => "numeric"
    #记录最后一次运行的结果
    record_last_run => true
    #上面运行结果的保存位置
    last_run_metadata_path => "jdbc-position.txt"
    statement => "SELECT * FROM user where last_updated >:sql_last_value;"
    schedule => " * * * * * *"
  }
}
output {
  elasticsearch {
    document_id => "%{id}"
    document_type => "_doc"
    index => "user"
    hosts => ["http://192.168.1.4:9200"]
  }
  stdout{
    codec => rubydebug
  }
}

修改logstash.yml,指定ES的地址

vi /usr/share/logstash/config/logstash.yml

http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://192.168.1.4:9200" ]

jdbc驱动

需要保证logstash的jdbc驱动和你的spring程序所用的jdbc驱动一致!
直接copy过去:

docker cp mysql-connector-java-8.0.17.jar logstash:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.17.jar'

logstash启动报错

TypeError: could not coerce Integer to class java.lang.String解决:
jdbc_password => "123456" 密码用引号包括,貌似logstash在读mysql密码时把12345当做Integer处理了,我们加入引号包裹让它被认作为字符串类型、

启动logstash后docker logs -f logstash 查看日志没问题启动正常,观察日志

随后到spring应用程序里修改数据库数据,随便insert一条新的

发现日志输出如下:

[2023-09-25T10:09:36,940][INFO ][logstash.inputs.jdbc     ] (0.025473s) SELECT * FROM user where last_updated >1695627545540;
[2023-09-25T10:09:37,010][INFO ][logstash.inputs.jdbc     ] (0.002776s) SELECT * FROM user where last_updated >1695636565237;
[2023-09-25T10:09:37,049][INFO ][logstash.inputs.jdbc     ] (0.002769s) SELECT * FROM user where last_updated >1695636565237;
[2023-09-25T10:09:37,111][INFO ][logstash.inputs.jdbc     ] (0.006902s) SELECT * FROM user where last_updated >1695636565237;
{
    "last_updated" => 1695636565237,
              "id" => 2,
            "name" => "yinkai",
            "tags" => "Elasticsearch,IntelliJ",
      "@timestamp" => 2023-09-25T10:09:36.969Z,
        "@version" => "1",
           "email" => "2542847562@qq.com",
      "is_deleted" => false
}
[2023-09-25T10:09:38,342][INFO ][logstash.inputs.jdbc     ] (0.002683s) SELECT * FROM user where last_updated >1695636565237;

说明已经正常从MySQL中load到了数据

到es中查询看下新加的数据有了没

POST /user/_search
{"query": {
    "term": {
      "name.keyword": {
        "value": "yinkai"
      }
    }
  }
}

{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.6931472,
    "hits" : [
      {
        "_index" : "user",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 0.6931472,
        "_source" : {
          "last_updated" : 1695636565237,
          "id" : 2,
          "name" : "yinkai",
          "tags" : "Elasticsearch,IntelliJ",
          "@timestamp" : "2023-09-25T10:09:36.969Z",
          "@version" : "1",
          "email" : "2542847562@qq.com",
          "is_deleted" : false
        }
      }
    ]
  }
}

注意事项

1、确保Elasticsearch和logstash的版本一致
2、logstash的jdbc驱动要和应用的一致
3、索引会自动创建
4、删除的实现只能依赖逻辑删除,否则无法从mysql同步es
5、此方法对业务代码毫无侵入
6、但是数据库有定时器一直在查的压力

同步多个表怎么实现?

  • 只需要配置多个jdbc节点即可
  • 不同的jdbc节点下定义不同的type标识
  • 在output节点中根据定义的不同type标识指定不同的es节点(当然可以是同一个es实例,但是index肯定不一样)
input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.4:3306/db_example?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "123456"
    #启用追踪,如果为true,则需要指定tracking_column
    use_column_value => true
    #指定追踪的字段,
    tracking_column => "last_updated"
    #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
    tracking_column_type => "numeric"
    #记录最后一次运行的结果
    record_last_run => true
    #上面运行结果的保存位置
    last_run_metadata_path => "jdbc-position.txt"
    statement => "SELECT * FROM user where last_updated >:sql_last_value;"
    schedule => " * * * * * *"
        type => "user"
  }
    jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.4:3306/db_example?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "123456"
    #启用追踪,如果为true,则需要指定tracking_column
    use_column_value => true
    #指定追踪的字段,
    tracking_column => "last_updated"
    #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
    tracking_column_type => "numeric"
    #记录最后一次运行的结果
    record_last_run => true
    #上面运行结果的保存位置
    last_run_metadata_path => "jdbc-position2.txt"
    statement => "SELECT * FROM book where last_updated >:sql_last_value;"
    schedule => " * * * * * *"
        type => "book"

  }

}
output {
  if[type] == "user" {
    elasticsearch {
    document_id => "%{id}"
    document_type => "_doc"
    index => "user"
    hosts => ["http://192.168.1.4:9200"]
  }
  }

  if[type] == "book" {
   elasticsearch {
    document_id => "%{id}"
    document_type => "_doc"
    index => "book"
    hosts => ["http://192.168.1.4:9200"]
  }

  }

  stdout{
    codec => rubydebug
  }
}

logstash 和 canal的区别

1、在传输上,两者都可以看做传输数据的管道,但logstash可以从数据库读取数据(例:MySql),传输到ES中,而canal能做到的不止这些,它可以把MySql的数据读取出来,配合java代码(貌似只支持java),
将读取到的数据存储到任何代码能操作的地方,例如:文件、MySql、Redis、ES等等
2、在时效性上,logstash配置文件是使用定时器去同步数据,而canal是监听MySql的binlog日志,进而做到数据几乎实时同步(PS:实际开发中基本是1秒~2秒内)

3、在数据处理上,logstash可以读取多个表,然后分别存储到对应节点,或者可以在存储前进行联表查询,进而存储成一条数据。但当联表查询一对多时,存储成一条数据就变得不现实了,比如:文章搜索,并且连文章对应的评论一起搜索出来。如果用logstash的话,可能需要存成两张表,先根据条件搜索文章表,然后再循环查到的文章列表,再查询评论表,再拼装成数据,接口返回结果。但如果用canal的话,在数据存储到ES前,可以先把对应的评论查询出来,并且拼装成json字符串,当做文章表的单个字段存储到ES中,这样可以直接查询出来,避免反复地查询ES组装数据。在这一点上,理论上用canal查询会更快,更高效。

4、在对数据库的压力上,logstash的原理是定时扫描变动的表,所以对数据库有一定压力,并且如果有其他程序在进行某条语句更新,锁住了这条行数据,那logstash读取数据时,就会被“卡住”,如果这个时间过长,可能会影响服务器卡死。而canal由于是监听的binlog日志,所以几乎对MySql没有压力,并且binlog已经记录,不会存在数据变动的情况。

5、读取数据上,logstash可以读取数据库、文本文件,而canal读取的是binlog文件(binlog属于二进制文件)。

6、学习成本上,logstash相对简单,canal相对难一点。logstash时效较低,同步过程中对数据库造成的压力较大,但可以对全表进行扫描,适合做一次性的全量同步。canal时效性更高,同步过程中对数据库造成的压力更小,但仅提供对数据库增量日志的监听,适合做增量同步。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342