数据同步解决方案canal

前言:

继上一篇网站首页高可用之后,今天来谈一谈canal+mysql+mq+Elasticsearch来实现一下几个功能:

1.数据监控微服务的开发
2.首页广告缓存更新的功能,
3.商品上架索引库导入数据功能,
4.商品下架索引库删除数据功能。

canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。一幅简单的图来了解一下


流程图

原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

环境部署

centos7+docker+canal(难不成有人没装好?下面简单讲一下,搞不通的去www.baidu.com

MySQL

1.查看当前MySQL是否开启了binlog日志

SHOW VARIABLES LIKE '%log_bin%'

2.如果log_bin的值为OFF是未开启,为ON是已开启。
修改/etc/my.cnf 需要开启binlog模式。

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

3.修改完成之后,重启mysqld的服务。
4.进入mysql

mysql -h localhost -u root -p

5.创建账号用于测试使用:使用root账号创建用户并授予权限

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON  *.* TO 'canal'@'%'; FLUSH PRIVILEGES;

canal服务端安装配置

1.下载地址canal
https://github.com/alibaba/canal/releases/tag/canal-1.0.24

下载位置

2.下载之后上传到linux系统中,解压缩到指定的目录/usr/local/canal
canan目录结构

3.修改 exmaple下的实例配置

vi conf/example/instance.properties
修改如图所示的几个参数
注意:自己的虚拟机ip+3306的端口,用户名和密码随意

4.指定读取位置
进入虚拟机中MySQL执行下面语句查看binlog所在位置

show master status;

大概显示如下:
+------------------+----------+--------------+------------------+-------------------+
|File |Position |Binlog_Do_DB |Binlog_Ignore_DB |Executed_Gtid_Set|
+------------------+----------+--------------+------------------+-------------------+
|mysql-bin.000001 |120 | | | |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00sec)

4.1 如果file中binlog文件不为 mysql-bin.000001 可以重置mysql
mysql > reset master;

5.查看canal配置文件

vim /usr/local/canal/conf/example/meta.dat

找到对应的binlog信息更改和MySQL查询结果一致即可

"journalName":"mysql-bin.000001","position":120,"

注意:如果不一致,可能导致未知错误

  1. 进入canal目录启动服务
./bin/startup.sh

7.查看日志:

cat /usr/local/canal/logs/canal/canal.log
启动成功

这样就表示启动成功了。

现在回到IntelliJ IDEA中

构建数据监控微服务
当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。我们就可以将解析出来的数据进行相应的逻辑处理。我们这里使用的一个开源的项目,它实现了springboot与canal的集成。比原生的canal更加优雅。
下载地址:
https://github.com/chenqian56131/spring-boot-starter-canal
使用前需要将starter-canal安装到本地仓库。我们可以参照它提供的canal-test,进行代码实现。(具体怎么实现,我想大家都是优秀的程序员了,maven命令应该不用我教吧)

数据监控微服务的搭建应该不用我手把手教吧!!!

算了谁叫我心太软, 我就粘贴部分代码仅供参考:
1.pom文件

<dependencies>
        <dependency>
            <groupId>com.xpand</groupId>
            <artifactId>starter-canal</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>
    </dependencies>

2.启动类
追加注解

@EnableCanalClient  //声明canal客户端  监听数据服务

3.application.properties
canal.client.instances.example.host=192.168.200.128
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.rabbitmq.host=192.168.200.128
4.监听类的部分展示代码:

@CanalEventListener  //声明当前类是canal监听类

监听方法:

//对于adupdate方法监听的是changgou_business数据库中的tb_ad表
    @ListenPoint(schema = "changgou_business", table = {"tb_ad"})
    public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData)

修改前数据:

 for (CanalEntry.Column column : rowData.getBeforeColumnsList())

修改后数据
for (CanalEntry.Column column : rowData.getAfterColumnsList())
就这样吧!

重点来了:接着广告缓存现在来实现广告缓存更新

需求分析:

当广告表的数据发生变化时,更新redis中的广告数据

实现思路

(1)修改数据监控微服务,监控广告表,当发生增删改操作时,提取广告位置key,发送到rabbitmq
(2)从rabbitmq中提取消息,通过OkHttpClient调用ad_update来实现对广告缓存数据的更新。

重点图
代码实现

1.用java代码实现一个简单的工作队列,用于接收广告更新通知
在监听类中当指定字段的值发生改变时,将数据发送到mq中
2.用java代码实现一个监听队列的监听类

@RabbitListener(queues = "队列名称")

通过OkHttpClient或者RestTemplate发起远程调用
3.测试,启动eureka和广告微服务,观察控制台输出和数据同步效果。

商品上架索引库导入数据

需求分析

利用canal监听数据库,当商品上架将商品的sku列表导入或更新索引库。

实现思路

(1)在数据监控微服务中监控tb_spu表的数据,当tb_spu发生更改且商品上架时,将spu的id发送到rabbitmq。
(2)在rabbitmq管理后台创建商品上架交换器(fanout)。使用分列模式的交换器是考虑商品上架会有很多种逻辑需要处理,导入索引库只是其中一项,另外还有商品详细页静态化等操作。这样我们可以创建导入索引库的队列和商品详细页静态化队列并与商品上架交换器进行绑定。
(3)搜索微服务从rabbitmq的导入索引库的队列中提取spu的id,通过feign调用商品微服务得到sku的列表,并且通过调用elasticsearch的高级restAPI 将sku列表导入到索引库。


重点图
代码实现

1.创建交换机和队列,并且在数据监控微服务中创建spu的监听类

索引库环境准备

  1. 这里我用的是elasticsearch 5.6.8,是一个比较老的版本的,6以后的API会有差异,建议大家先去看看。
  2. 创建索引库实体类,这里只展示部分代码
@Document(indexName = "skuinfo", type = "docs")
public class Skuinfo implements Serializable {
    //商品id,同时也是商品编号
    @Id
    @Field(index = true, store = true, type = FieldType.Keyword)
    private Long id;

    //SKU名称
    @Field(index = true, store = true, type = FieldType.Text, analyzer = "ik_smart")
    private String name;

    //商品价格,单位为:元
    @Field(index = true, store = true, type = FieldType.Double)
    private Long price;

    //库存数量
    @Field(index = true, store = true, type = FieldType.Integer)
    private Integer num;

    //商品图片
    @Field(index = false, store = true, type = FieldType.Text)
    private String image;

    //商品状态,1-正常,2-下架,3-删除
    @Field(index = true, store = true, type = FieldType.Keyword)
    private String status;

    //创建时间
    private Date createTime;

    //更新时间
    private Date updateTime;

    //是否默认
    @Field(index = true, store = true, type = FieldType.Keyword)
    private String isDefault;

    //SPUID
    @Field(index = true, store = true, type = FieldType.Long)
    private Long spuId;

    //类目ID
    @Field(index = true, store = true, type = FieldType.Long)
    private Long categoryId;

    //类目名称
    @Field(index = true, store = true, type = FieldType.Keyword)
    private String categoryName;

    //品牌名称
    @Field(index = true, store = true, type = FieldType.Keyword)
    private String brandName;

    //规格
    private String spec;

    //规格参数
    private Map<String, Object> specMap;

  部分代码省略......
}

搜索微服务搭建

1.部分pom文件:

  <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--springboot对elasticsearch的启动依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
         <!--mq消息启动依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

配置文件和启动类就不一一介绍了!
2.java代码实现mq交换机和队列,其实和上面的数据监控服务的交换机和队列一样
3.在商品服务中定义根据spu的id查询sku的数据,如果参数中携带spuid就查询指定sku数据否则就查询所有的sku数据(其实我这里这样写是为了以后的扩展)。

@GetMapping("spu/{spuId}")
    public List<Sku> findSkuListByspuid(@PathVariable("spuId") String spuId) {
        HashMap<String, Object> searchMap = new HashMap<>();

        //如果接收的的参数不是all,那么查询指定id
        if (!"all".equals(spuId)) {
            searchMap.put("spuId", spuId);
        }
        //如果接收的的参数是all, 那么查询审核状态为1的商品
        searchMap.put("status", "1");
        List<Sku> skuList = skuService.findList(searchMap);
        System.out.println("根据spu的id查询sku的数据和:" + skuList.size());
        return skuList;
    }

接下来将改接口暴露出来,通过feign的调用

 @GetMapping("sku/spu/{spuId}")
    public List<Sku> findSkuListByspuid(@PathVariable("spuId") String spuId);
搜索微服务批量导入数据逻辑

1.新增ESManagerMapper接口

public interface ESMangerMapper extends ElasticsearchRepository<Skuinfo, Long> {
}

2.创建接口EsManagerService

public interface EsMangerService {

    //创建索引库结构
    void createMappingAndIndex();

    //导入全部数据进入es
    void importAll();

    //根据spuid查询skuList,再导入索引库
    void importDataBySpuId(String spuId);

    //根据spuid删除es索引库中相关的sku数据
    void delDataBySpuId(String spuId);
}

3.创建实现类

@Service
public class EsMangerServiceImpl implements EsMangerService {

    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;

    @Autowired
    private SkuFeign skuFeign;

    @Autowired
    private ESMangerMapper esMangerMapper;

    //创建索引库
    @Override
    public void createMappingAndIndex() {
        //根据映射实体类来创建索引库
        elasticsearchTemplate.createIndex(Skuinfo.class);
        //根据映射实体类来创建映射
        elasticsearchTemplate.putMapping(Skuinfo.class);

        System.out.println("创建成功");
    }

    //导入全部sku数据进入es
    @Override
    public void importAll() {
        //查询sku所有数据
        List<Sku> skuList = skuFeign.findSkuListByspuid("all");
        if (skuList == null || skuList.size() <= 0) {
            MyBaseException.throwe(ResultCode.DATA_NOT_EXIST);
        }
        //skulist转换为json
        String skuJson = JSON.toJSONString(skuList);
        //json转化成skuinfo
        List<Skuinfo> skuinfoList = JSON.parseArray(skuJson, Skuinfo.class);

        //遍历集合 将规格信息装成map
        for (Skuinfo skuinfo : skuinfoList) {
            System.out.println("规格信息:" + skuinfo.getSpec());
            Map map = JSON.parseObject(skuinfo.getSpec(), Map.class);
            skuinfo.setSpecMap(map);
        }
        //导入索引库
        esMangerMapper.saveAll(skuinfoList);
    }


    //根据spuid查询skuList,再导入索引库
    @Override
    public void importDataBySpuId(String spuId) {
        //根据spuid查询skuList
        List<Sku> skuList = skuFeign.findSkuListByspuid(spuId);

        if (skuList == null || skuList.size() <= 0) {
            MyBaseException.throwe(ResultCode.DATA_NOT_EXIST);
        }
        //skulist转换为json
        String skuJson = JSON.toJSONString(skuList);
        //json转化成skuinfo
        List<Skuinfo> skuinfoList = JSON.parseArray(skuJson, Skuinfo.class);
        //遍历集合 将规格信息装成map
        for (Skuinfo skuinfo : skuinfoList) {
            System.out.println("规格信息:" + skuinfo.getSpec());
            Map map = JSON.parseObject(skuinfo.getSpec(), Map.class);
            skuinfo.setSpecMap(map);
        }
        //导入索引库
        esMangerMapper.saveAll(skuinfoList);
    }


    //根据spuid删除es索引库中相关的sku数据
    @Override
    public void delDataBySpuId(String spuId) {
        //根据spuid查询skuList
        List<Sku> skuList = skuFeign.findSkuListByspuid(spuId);
        if (skuList == null || skuList.size() <= 0) {
            MyBaseException.throwe(ResultCode.DATA_NOT_EXIST);
        }
        for (Sku sku : skuList) {
            String id = sku.getId();
            esMangerMapper.deleteById(Long.parseLong(id));
        }

    }
}

突然觉得CRUD的代码没什么亮点,到底写不写呢?

随随便便测试一下,创建索引库和导入全部数据都能成功
接下来是有当商品上架和下架时自动导入商品信息到es中?

4.商品上架的监听类,当数据库的商品上架时,数据监控服务canal监听到发送spuid到mq队列中,搜索微服务监听到mq中的消息时,会根据spuid去查询sku集合,并导入索引库中。

@Component
public class GoodsUpListener {
    @Autowired
    private EsMangerService esMangerService;
    @RabbitListener(queues = RabbitmqConfig.SEARCH_ADD_QUEUE)
    public void receiveMessage(String spuId) {
        System.out.println("导入索引库的spuid:" + spuId);
        //根据spuid查询skuList,再导入索引库
        esMangerService.importDataBySpuId(spuId);
    }
}

5.测试
5.1启动环境eureka 、elasticsearch 、canal服务端、canal数据监控微服务、rabbitmq
5.2启动商品微服务、搜索微服务
5.3修改spu表中某记录的为上架,观察控制台输出,启动kibana查询记录是否导入成功

商品下架索引库删除数据

需求分析

商品下架后将商品从索引库中移除。

实现思路

与商品上架的实现思路非常类似。
(1)在数据监控微服务中监控spu表的数据,当spu发生更改商品下架时,将spu的id发送到rabbitmq。
(2)在rabbitmq管理后台创建商品下架交换器(fanout)。使用分列模式的交换器是考虑商品下架会有很多种逻辑需要处理,索引库删除数据只是其中一项,另外还有删除商品详细页等操作。
(3)搜索微服务从rabbitmq的的队列中提取spu的id,通过调用elasticsearch的高级restAPI 将相关的sku列表从索引库删除。


重点图
代码实现

1.创建交换器与队列
这里我就不粘贴出来,随意发挥就好
2.canal服务中监听商品的下架,并把下架的spuid发送消息队列中
3.根据spuId删除索引数据,代码已经在上面演示了(什么?你说你找不到!)
4.监听类:接收mq消息,执行索引库删除

@Component
public class GoodsDeleteListener {
    @Autowired
    private EsMangerService esMangerService;
    @RabbitListener(queues = RabbitmqConfig.SEARCH_DELETE_QUEUE)
    public void receiveMessage(String spuId) {
        System.out.println("删除es索引库的spuid:" + spuId);
        //根据spuid删除es索引库中相关的sku数据
        esMangerService.delDataBySpuId(spuId);
    }
}
第二次来到简书,如果写的有误,欢迎在评论交流!(谢谢大家点赞评论关注)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,264评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,549评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,389评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,616评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,461评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,351评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,776评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,414评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,722评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,760评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,537评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,381评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,787评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,030评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,304评论 1 252
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,734评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,943评论 2 336