- 使用Canal实现es数据实时同步
canal :github地址:https://github.com/alibaba/canal/
原理很简单:数据库开启bin-log日志,calan伪装成mysql的slave 读取bin-log日志
官方的说明:
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
最新Canal版本为 1.1.4 支持 ES7
(1)、 下载canal项目(git命令也可以 - - 随意.)
(2)、解压
(3)、打开终端 cd到项目目录
mvn clean install
说明:没有配置 maven 的 自己百度配置一下 - - .
安装完成后:打开target文件夹
就可以看到 tar包.
把
canal.deployer-1.1.5-SNAPSHOT.tar.gz
canal.adapter-1.1.5-SNAPSHOT.tar.gz
canal.admin-1.1.5-SNAPSHOT.tar.gz
上传到 centos .并解压.
mkdir canal-xxxx
tar -zxf canal.xxxx-1.1.5-SNAPSHOT.tar.gz -C /usr/soft/canal-xxxx
先新建目录,然后再解压到相应目录. xxxx 是不同的项目.
cd canal-deployer/conf/example
修改instance.properties
vim instance.properties
还有一些其他配置:稍后再说
最后切换到 bin目录 start.sh 即可
查看日志启动成功
日志在logs下面:
tail -200f example/example.log
canal1.1.4提供 web-ui
官方说明:
canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,
支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,
具体文档:[Canal Admin Guide](https://github.com/alibaba/canal/wiki/Canal-Admin-Guide)
进入到 canal-admin目录:主要配置在 conf中
首先:创建数据库canal_manager
然后编辑application.yml 修改成自己的数据库信息
修改instance-template.properties
最后切换到 bin目录 启动即可
访问路径:
http://192.168.0.196:8089
账号:admin
密码:123456
详情参考:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
进入 canal-adapter/conf 目录
这里需要配置 application.yml 和es7(因为我们使用的es是7.0.1版本)
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp # kafka rocketMQ
canalServerHost: 192.168.0.196:11111 #canal默认地址 可以在 canal-deployer 项目中配置
# zookeeperHosts: slave1:2181
# mqServers: 127.0.0.1:9092 #or rocketmq
# flatMessage: true
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
username:
password:
vhost:
srcDataSources:
defaultDS:
url: jdbc:mysql://192.168.0.201:3306/khfooddb?useUnicode=true #需要同步数据数据库地址
username: root
password: root
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
- name: es7 #es版本选择 es6 or es7
hosts: 192.168.0.196:9300 # 127.0.0.1:9200 for rest mode
properties:
# mode: transport # or rest
# # security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch
编辑 es7/mytest_user.yml
为什么是他呢。。。
因为:
相对应,也可以自定义。。。
vim es7/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: coremerchant_index # es 的index
# _type: _doc
_id: _id
upsert: true
# pk: id
sql: "select a.id as _id,a.id,a.name,a.status,concat(IFNULL(a.latitude, 0), ',', IFNULL(a.longitude, 0)) AS location from core_merchant a"
# objFields:
# _labels: array:;
# etlCondition: "where t.c_time>={}"
commitBatch: 3000
最后切换到 bin 目录 startup.sh
然后 tailf ../logs/adapter/adapter.log
查看启动日志
最后进行测试- -
自己数据库对应的表 INSERT INTO 一条数据,看看查看效果.