数据同步利器 - canal

前言

大约两年以前,笔者在一个项目中遇到了数据同步的难题。

当时,系统部署了几十个实例,分为1个中心平台和N个分中心平台,而每一个系统都对应一个单独的数据库实例。

在数据库层面,有这样一个需求:

  • 中心平台数据库要包含所有系统平台的数据。
  • 分中心数据库只包含本系统平台的数据。
  • 在中心平台可以新增或修改 中心平台的数据,但要讲数据实时同步到对应的分中心平台数据库。

这几十个数据库实例之间,没有明确的主从关系,是否同步还要看数据的来源,所以并不能用MySQL的主从同步来做。

当时,笔者实验了几种方式,最后采用的方式是基于Mybatis拦截器机制 + 消息队列的方式来做的。

大概原理是通过Mybatis拦截器,拦截到事务操作,比如新增、修改和删除,根据自定义的数据主键(标识数据来源和去向),封装成对象,投递到消息队列对应的topic中去。然后,每个系统监听不同的topic,消费数据并同步到数据库。

在此后的一段时间里,知道了canal这个开源组件。发现它更直接,它可以从MySQL的binlog中解析数据,投递到消息队列或其它地方。

一、canal简介

说起canal,也是阿里巴巴存在数据同步的业务需求。所以从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

基于日志增量订阅&消费支持的业务:

  • 数据库镜像
  • 数据库实时备份
  • 多级索引 (卖家和买家各自分库索引)
  • search build
  • 业务cache刷新
  • 价格变化等重要业务消息

我们正可以基于canal的机制,来完成一系列如数据同步、缓存刷新等业务。

二、启动canal

1、修改MySQL配置

对于自建的MySQL服务, 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

然后创建一个账户,用来链接MySQL,作为 MySQL slave 的权限。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

2、下载

下载canal非常简单,访问 releases页面选择需要的包下载,然后将下载的包解压到指定的目录即可。

tar -zxvf canal.deployer-1.1.4.tar.gz -C /canal

解压完成后,我们可以看到这样一个目录:

image

3、修改配置

在启动之前,还需要修改一些配置信息。

首先,定位到canal/conf/example ,编辑instance.properties配置文件,重点有几项:

canal.instance.mysql.slaveId=1234               # canal模拟slaveid
canal.instance.master.address=127.0.0.1:3306    # MySQL数据库地址
canal.instance.dbUsername=canal                 # 作为slave角色的账户
canal.instance.dbPassword=canal                 # 作为slave角色的账户密码
canal.instance.connectionCharset = UTF-8        # 数据库编码方式对应Java中的编码类型
canal.instance.filter.regex=.*\\..*             # 表过滤的表达式
canal.mq.topic=example                          # MQ 主题名称

我们希望canal监听到的数据,要发送到消息队列中,还需要修改canal.properties文件,在这里主要是MQ的配置。在这里笔者使用的是阿里云版RocketMQ,参数如下:

# 配置ak/sk
canal.aliyun.accessKey = XXX
canal.aliyun.secretKey = XXX
# 配置topic
canal.mq.accessChannel = cloud
canal.mq.servers = 内网接入点
canal.mq.producerGroup = GID_**group(在后台创建)
canal.mq.namespace = rocketmq实例id
canal.mq.topic=(在后台创建)

4、启动

直接运行启动脚本即可运行:./canal/bin/startup.sh 。 然后打开logs/canal/canal.log文件,可以看到启动效果。

2020-02-26 21:12:36.715 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-02-26 21:12:36.746 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.44.128(192.168.44.128):11111]
2020-02-26 21:12:37.406 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

三、启动MQ监听

我们把canal监听到的数据,投送到了消息队列中,那么接下来就是写个监听程序来消费其中的数据。

为了方便,笔者直接使用的是阿里云版RocketMQ,测试代码如下:

public static void main(String[] args) {
    Properties properties = new Properties();
    // 您在控制台创建的 Group ID
    properties.put(PropertyKeyConst.GROUP_ID, "GID_CANAL");
    // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.AccessKey, "accessKey");
    // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.SecretKey, "secretKey");
    // 设置 TCP 接入域名,到控制台的实例基本信息中查看
    properties.put(PropertyKeyConst.NAMESRV_ADDR,"http://MQ_INST_xxx.mq-internet.aliyuncs.com:80");
    // 集群订阅方式(默认)
    // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("example","*",new CanalListener());
    consumer.start();
    logger.info("Consumer Started");
}

四、测试

把环境都部署好之后,我们进入测试阶段来看一看实际效果。

我们以一张t_account表为例,这里面记录着账户id和账户余额。

首先,我们新增一条记录,insert into t_account (id,user_id,amount) values (4,4,200);

此时,MQ消费到数据如下:

{
    "data": [{
        "id": "4",
        "user_id": "4",
        "amount": "200.0"
    }],
    "database": "seata",
    "es": 1582723607000,
    "id": 2,
    "isDdl": false,
    "mysqlType": {
        "id": "int(11)",
        "user_id": "varchar(255)",
        "amount": "double(14,2)"
    },
    "old": null,
    "pkNames": ["id"],
    "sql": "",
    "sqlType": {
        "id": 4,
        "user_id": 12,
        "amount": 8
    },
    "table": "t_account",
    "ts": 1582723607656,
    "type": "INSERT"
}

通过数据可以看到,这里面详细记录了数据库的名称、表的名称、表的字段和新增数据的内容等。

然后,我们还可以把这条数据修改一下:update t_account set amount = 150 where id = 4;

此时,MQ消费到数据如下:

{
    "data": [{
        "id": "4",
        "user_id": "4",
        "amount": "150.0"
    }],
    "database": "seata",
    "es": 1582724016000,
    "id": 3,
    "isDdl": false,
    "mysqlType": {
        "id": "int(11)",
        "user_id": "varchar(255)",
        "amount": "double(14,2)"
    },
    "old": [{
        "amount": "200.0"
    }],
    "pkNames": ["id"],
    "sql": "",
    "sqlType": {
        "id": 4,
        "user_id": 12,
        "amount": 8
    },
    "table": "t_account",
    "ts": 1582724016353,
    "type": "UPDATE"
}

可以看到,除了修改后的内容,canal还用old字段记录了修改前字段的值。

最后,我们删除这条数据:delete from t_account where id = 4;

相应的,MQ消费到数据如下:

{
    "data": [{
        "id": "4",
        "user_id": "4",
        "amount": "150.0"
    }],
    "database": "seata",
    "es": 1582724155000,
    "id": 4,
    "isDdl": false,
    "mysqlType": {
        "id": "int(11)",
        "user_id": "varchar(255)",
        "amount": "double(14,2)"
    },
    "old": null,
    "pkNames": ["id"],
    "sql": "",
    "sqlType": {
        "id": 4,
        "user_id": 12,
        "amount": 8
    },
    "table": "t_account",
    "ts": 1582724155370,
    "type": "DELETE"
}

监听到数据库表的变化之后,就可以根据自己的业务场景,对这些数据进行业务上的处理啦。

五、总结

可以看到,利用canal组件可以很方便的完成对数据变化的监听。如果利用消息队列来做数据同步的话,只有一点需要格外注意,即消息顺序性的问题。

binlog本身是有序的,但写入到mq之后如何保障顺序是值得关注的问题。

mq顺序性问题这里,可以看到canal的消费顺序性相关解答。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,636评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,890评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,680评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,766评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,665评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,045评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,515评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,182评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,334评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,274评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,319评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,002评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,599评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,675评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,917评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,309评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,885评论 2 341

推荐阅读更多精彩内容

  • 业务场景描述 在我们的项目中有些配置信息持久化在数据库中,这些配置信息又是在系统启动后自动加载并缓存在local或...
    点融黑帮阅读 14,507评论 2 20
  • 前言 如今大型的IT系统中,都会使用分布式的方式,同时会有非常多的中间件,如redis、消息队列、大数据存储等,但...
    IT米粉阅读 22,499评论 2 8
  • 在滚滚乌云中,耸立云端的城市高楼群仍然继续建造者,源源不断来来往往的运输车、树立在楼侧的安全建造的标语;...
    甄心_心灵守护者阅读 399评论 0 1
  • 你能不能说点实话 今天又整贫困结果和往年一样,一群人,人模狗样的说着自己如何可怜,我真想问问,你们用不用这样……实...
    呼吸的鲸鱼阅读 87评论 0 0
  • 非要穿上这件Tshirt ,还让我给她照相,一边吃着棒棒糖,一边搞怪表情,简直美翻了! 今天回老家呆了一天,和大姑...
    杜学智阅读 266评论 0 1