实时数仓中架构如下图:
实时架构主要数据来源有3块
日志文件(或设备上的心跳数据),由Flume或者Springboot程序直接写到kafka中
接口API,使用程序调用接口
业务数据库,大部分是使用mysql,mysql数据可以由binary log进行实时获取到kafka集群
一,实时获取业务数据库
离线获取mysql数据,一般通过JDBC获取数据,这样对mysql的负载也是很大;
实时获取mysql数据,一般通过binary log获取数据,这样对mysql的负载很小。
实时获取binlog的工具:canal/Maxwell/Debezium; canal是阿里开发的,Maxwell国外的,上升很快
1.1 canal介绍
- 原始场景;阿里otter中间件的一部分
otter是阿里用于进行异地数据库之间的同步框架,canal是其中一部分 - 场景1:更新缓存
- 场景2:抓取业务数据新增变化表,用于制作拉链表。
拉链表:需要知道数据今天变化情况,canal只是监控数据库数据变化,然后把数据抽到新的数据库中,做拉链表
canal不消耗mysql性能,canal监控mysql文件变化 - 场景3:抓取业务表的新增变化数据,用于制作实时统计
1.2 canal的工作原理
参考mysql主备复制实现
canal的工作原理很简单,就是把自己伪装成salve, 假装从master复制数据
读取binary log需要master授权和用户名密码;
Maxwell也是读取binary log
二,mysql的binlog
2.1 什么是binlog
MySQL的二进制日志可以说是MySQL最重要的日志了,他记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗时间,MySQL的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有1%的性能损耗。二进制的两个最重要的使用场景:
1. MySQL Replication在Master端开启binlog, Master把它的二进制日志传递给slaves来达到master-slave数据一致的目的
2. 数据恢复,通过使用mysqlbinlog工具来使数据恢复
二进制包括两类文件;二进制日志索引文件(文件名后缀为.index)用于记录所在的二进制文件
二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)语句事件
二进制日志文件量比较大
mysqlbinlog工具可以用binlog进行备份恢复
2.2 MySQL binlog开启
Linux(Centos 7)服务器上的MySQL:
#修改binlog
cd /etc/
vim my.cnf
#/etc/my.cnf是mysql下关键配置文件
#默认mysql数据目录是/var/lib/mysql
binlog文件也在这个目录下:文件中名类似:mysql-bin.000106
data数据文件是ibdata1
开启binlog:
修改配置文件:/etc/my.cnf
1. 添加:log-bin=mysql-bin
2. bin-format=row
3. binlog-do-db=testdb //指定binlog监控的数据库
4. server-id=1 //如果mysql是集群,canal的server-id要设置高点,不要与mysql重复了
2.3 binlog格式
statement 语句级
如:update xxx set xxx= xx where xxx =xxx,会把SQL记录在binlog中,会把这条SQL重新执行一遍,
优点: 记录的是语句,日志文件不会很大
缺点: 有可能照成mysql主从数据不一致,比如使用now(),rand()都有可能照成数据不一致row 行级
语句执行后会记录每一行的结果,在slave时,直接拿每一行的值去覆盖
优点: 解决数据不一致问题,大数据抽取binlog进行数据解析方便
缺点: 如果一次更改数据量很大,有几百万时,会出现大量日志;遇到批量执行的SQL出现时,会出现大量的冗余-
mixed 混合
一般情况下使用statement, 遇到几种特殊情况,使用row,
特殊情况:NOW(), RAND(), UUID(), UDF, INSERT DELAYED, AUTO_INCREMENT
优点: binlog日志文件不会太大,同时也能保证主从数据一致
缺点: 大数据去抽取binlog日志文件解析得到数据不方便,数据分析使用方去分析数据最方便直接,分析语句太麻烦2.4 重启Mysql
sudo systemctl restart mysql//重启mysql
sudo service sshd restart //重启sshd命令
ps -ef|grep mysql//查看mysql进程是否开启
改完/etc/my.cnf文件后需要把文件权限降级,
sudo chmod 644 /etc/my.cnf
新建一个canal账号,给canal使用,给canal的账号赋权限:SELECT,REPLICATION SLAVE, REPLICATION CLIENT
GRANT SELECT,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@'%' IDENTIFIED BY 'canal'
三,canal安装
安装前先看看canal的原理图,如下:
canal的github上的地址:https://github.com/alibaba/canal/releases/
这里下载1.14版本,
一个canal可以对应多个数据库配置,一个Server可以对应多个Instance,每个Instance可以独立监控一台MySQL服务器
下载后在Linux目录/usr/local中新建canal目录,把下载的文件canal.deployer-1.1.4.tar.gz
上传到该目录中,运行如下语句进行解压:
tar -zxvf canal.deployer-1.1.4.tar.gz
解压后可以把文件canal.deployer-1.1.4.tar.gz删了
下面需要修改配置文件:
-
需要修改配置文件
./conf/canal.properties
**1. canal.zkServers是配置zookeeper的信息,如果canal配置集群模式,这个需要配,如果只是单机版不需要配置,我这边是单机版,就没有配置了;
- canal.serverMode配置成kafka,这样canal会直接把数据写入到kafka中;
- canal.mq.servers 配置成kafka的broker server信息,让canal知道怎么把数据写入kafka**
-
需要修改配置文件./conf/example/instance.properties
**1. 修改canal.instance.master.address的值,改成你需要监控的mysql的host和port;- 修改canal.instance.dbUsername和canal.instance.dbPassword的值,改成你要监控的mysql的账号和密码
- 修改canal.mq.topic的值,改成在kafka中的topic
- canal.mq.partition表示数据写到kafka中哪个分片中,一般不知道分片,给注释掉
- canal.mq.partitionsNum表示建的topic有几个分区,可以根据数据量大小设置分区数
**注意: 1. canal是一个数据源一个topic,这样会把每张表的binlog都放在一个topic中,所以后面需要做实时分流
- 一个instance.properties对应一个数据库服务器,一个canal可以配置多个instance,可以配置多个instance.properties
- 一个instance中能监控多少个数据库,取决于mysql的配置文件/ect/my.cnf中binlog-do-db的值**
- 启动canal
#启动命令
cd ./canal/bin
./startup.sh
启动成功后,JPS后会出现CanalLauncher
四,Canal高可用配置
canal高可用配置原理如下图:
canal对比Maxwell, Maxwell没有高可用,服务故障了,就重启,keepalived工具可以监测进程是否挂了,挂了后可以触发重启
- copy当前服务器上的canal文件夹到standby服务器上,代码如下:
scp -r ./canal hadoop@dw-test-cluster-006:/usr/local/tools/
修改配置文件/conf/example/instance.properties
之前canal版本需要把server.id修改,不过1.1.4这版已经自动分配service.id了,不需要再修改配置了
这样两台机器的配置一样修改配置文件/conf/canal.properties中的zookeeper的配置
canal.zkServers = dw-test-cluster-001:2181,dw-test-cluster-002:2181,dw-test-cluster-003:2181,dw-test-cluster-004:2181,dw-test-cluster-005:2181,dw-test-cluster-006:2181,dw-test-cluster-007:2181
- 两个服务器都重启canal
./canal/bin/restart.sh
用JPS检查下服务是否启动成功。
如下图在改下配置,防止一个节点挂了,另一个节点发重复数据,如下图配置:
-
canal生成kafka topic时,分区平衡设置
在example/Instance.properties文件中加如下配置 表示全库全表的主键 canal.mq.partitionHash=.*\\..*:$pk