介绍
canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。
原理
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
一、mysql开启binlog模式
- 1.1 查看当前mysql是否开启binlog模式,ON是已开启。
SHOW VARIABLES LIKE '%log_bin%';
- 1.2 如果log_bin的值为OFF是未开启。修改/etc/my.cnf 需要开启binlog模式。
#添加
#二进制日志存储路径
log-bin=/var/lib/mysql/mysql-bin
#server_id 不能与canal配置文件中的canal_id冲突
server_id=1234
修改完成之后,重启mysqld的服务。
- 1.3 创建 canal 用户,以canal身份访问数据库
create user canal@'%' IDENTIFIED by 'Canal123&';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
二、canal服务端安装配置
- 2.1 下载地址canal
链接:https://pan.baidu.com/s/1fukqi7ZirAo-MLHedc18hA
提取码:ye8n
https://github.com/alibaba/canal/releases/tag/canal-1.0.24
上传,解压后
- 2.2 修改 exmaple下的实例配置
vi conf/example/instance.properties
- 2.3 进入mysql中执行下面语句查看binlog所在位置
show master status;
如果file中binlog文件不为 mysql-bin.000001 可以重置mysql
mysql> reset master;
查看canal配置文件
vi /usr/local/canal/conf/example/meta.dat
找到对应的binlog信息更改一致即可
"journalName":"mysql-bin.000001","position":120,"
- 2.4 启动服务
bin/startup.sh
查看日志
cat /usr/local/canal/logs/canal/canal.log
三、数据监控微服务
- 3.1 pom引入依赖
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
我们这里使用的一个开源的项目,它实现了springboot与canal的集成。
下载 https://github.com/chenqian56131/spring-boot-starter-canal
将starter-canal 项目 mvn install 到本地仓库。
链接:https://pan.baidu.com/s/1ckOikMgyP9-HOkj_Ar3kWw
提取码:8eue
-
3.2 创建启动类
3.3 添加配置文件application.properties
canal:
client:
instances:
example:
#canal客户端Ip
host: 192.168.3.156
#canal客户端端口
port: 11111
- 3.4 编写监听
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.*;
@CanalEventListener
public class CanalDataEventListener {
/*
eventType 事件类型:增删改等
rowData 更新行的数据
*/
//新增监听
@InsertListenPoint
public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
rowData.getAfterColumnsList().forEach((column) ->
System.out.println("列名:" + column.getName() + "-------变成数据:" + column.getValue()));
}
//编辑监听
@UpdateListenPoint
public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
//更新前的数据
rowData.getBeforeColumnsList().forEach((column) ->
System.out.println("修改前列名:" + column.getName() + "-------变成数据:" + column.getValue()));
//更新后的数据
rowData.getAfterColumnsList().forEach((column) ->
System.out.println("修改后列名:" + column.getName() + "-------变成数据:" + column.getValue()));
}
//删除监听
@DeleteListenPoint
public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
rowData.getBeforeColumnsList().forEach((column) ->
System.out.println("删除前列名:" + column.getName() + "-------变成数据:" + column.getValue()));
}
//自定义监听
@ListenPoint(
eventType = {CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE}, // 监听的事件类型
schema = {"gyg_member"}, // 数据库
table = {"tb_menu"}, //表
destination = "example" //指向 canal客户端 的 example配置
)
public void onEventMyListener(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
rowData.getAfterColumnsList().forEach((column) ->
System.out.println("列名:" + column.getName() + "-------变成数据:" + column.getValue()));
}
}
四、CanalUtil
为了方便获取返回值,创建CanalUtil
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CanalUtil {
public static Map<String, String> convertToMap(List<CanalEntry.Column> columnsList) {
Map<String, String> map = new HashMap();
columnsList.forEach(c -> map.put(c.getName(), c.getValue()));
return map;
}
调用
@InsertListenPoint
public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
Map<String, String> afterMap = CanalUtil.convertToMap( rowData.getAfterColumnsList());
System.out.println(JSON.toJSONString(afterMap));
}
五、Canal 使用
1.1 若Canal服务出现宕机,MySQL的binlog仍存储数据,并等待Canal服务消费。当需要同步Redis中的数
据时,为防止Redis服务出现宕机问题从而导致数据无法同步。Canal服务调用RabbitMQ,由RabbitMQ消费
端进行对Redis的数据同步。1.2 RabbitMQ搭建
1.3 待补充