这个组件基于一个很现实的需求,系统随着业务发展而扩张时,总会碰到新业务需要整合老业务数据的现象,一般的微服务会实现为新业务系统从老业务系统获取数据。这里提供一种基于Mysql Binlog的思路,新业务系统可以通过监听老业务系统的数据变化,这样在不改变老业务系统代码的前提下,新业务系统可以获取到老业务系统数据变化的通知,及时更新自己的Redis或其他缓存。
也可以用于在同一个系统中,跨表整合数据。
我自己实现这个组件是为了跨微服务整合数据到Elastic Search。
组件使用起来比较简单,需要一个前提就是Mysql开启binlog且binlog格式为ROW,格式见https://zhuanlan.zhihu.com/p/26977878
1. 配置文件
binlog:
mysql:
hosts: #配置数据库的连接信息
- name: test-db
host: localhost
port: 3306
username: root
password: root
timeOffset: 28800000 #时区偏移 单位:毫秒
- name: test-db2
host: localhost
port: 3306
username: root
password: root
timeOffset: 28800000
2. bean扫描
@SpringBootApplication(scanBasePackages = {"net.giafei"})
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
3.监听
//hostName 即为配置文件中的name
//MysqlWatcher注解继承有Component注解,所以会被Spring创建为Bean
@MysqlWatcher(hostName = "test-db", database = "test_binlog", table = "test_table1")
public class TestTable1Watcher implements IMysqlDataListener<TestTable1> {
private Logger logger = LoggerFactory.getLogger(TestTable1Watcher.class);
@Override
public void onUpdate(TestTable1 from, TestTable1 to) {
logger.info("ID 为 {} 的条目数据变更", from.getId());
logger.info("\t变化前:" + JSON.toJSONString(from, SerializerFeature.WriteDateUseDateFormat));
logger.info("\t变化后:" + JSON.toJSONString(to, SerializerFeature.WriteDateUseDateFormat));
}
@Override
public void onInsert(TestTable1 data) {
logger.info("插入ID为 {} 的数据", data.getId());
}
@Override
public void onDelete(TestTable1 data) {
logger.info("ID 为 {} 的数据被删除", data.getId());
}
}
binlog解析通过组件mysql-binlog-connector-java,数据到Entity通过fastjson,其他部分比较简单就不再赘述了。