利用maxwell组件监听mysql之binlog日志进行实时同步数据

一 maxwell组件介绍

Maxwell是一个守护进程,它能监听并读取MySQL的binlog,然后解析输出为json,支持将数据输出到Kafka、Kinesis或其他流媒体平台,支持表和库过滤。

注意:对于增删改都有输出,但对于truncate操作,没有输出。

源码地址:https://github.com/zendesk/maxwell

下载地址: https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz

示意如下:

  mysql> insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem';
  maxwell: {
    "database": "test",
    "table": "maxwell",
    "type": "insert",
    "ts": 1449786310,
    "xid": 940752,
    "commit": true,
    "data": { "id":1, "daemon": "Stanislaw Lem" }
  }

  mysql> update test.maxwell set daemon = 'firebus!  firebus!' where id = 1;
  maxwell: {
    "database": "test",
    "table": "maxwell",
    "type": "update",
    "ts": 1449786341,
    "xid": 940786,
    "commit": true,
    "data": {"id":1, "daemon": "Firebus!  Firebus!"},
    "old":  {"daemon": "Stanislaw Lem"}
  }

二 设备与组件版本梳理:

1. linux内核版本(CentOS Linux 7):(命令:uname -a)

Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux

2. mysql版本:(SQL命令:select version(); 或 status)

Server version: 5.6.43-log MySQL Community Server (GPL)

3. maxwell版本:maxwell-1.21.1

4. kafka版本:kafka_2.11-2.2.0

5. zookeeper版本:zookeeper-3.4.5-cdh5.7.0

三 设备介绍

二台虚拟机,分别为salve1(192.168.175.21)和slave2(192.168.175.22),slave1上安装mysql和kafka,slave2上启动maxwell守护进程和zookeeper。

四 简要过程梳理

主流程示意图

如上图:这次主要介绍从binlog > maxwell > kafka的过程,而kafka后面的过程,就可以有很多种了,比如:

(1)binlog > maxwell > kafka > spark streaming > hdfs、kudu;

(2)binlog > maxwell > kafka > flume > hdfs;

(3)binlog > maxwell > kafka > es > kibana;

第一种spark streaming+hdfs、kudu,是目前我所在公司中使用的场景。简要流程梳理如下:

1. 在slave1上安装mysql;
2. 在slave2上启动maxwell,测试是否可以正常读取binlog;
3. maxwell初步测试ok;
4. 在slave2上安装并启动zk;
5. 在slave1上安装并启动kafka server;
6. 通过kafka producer和consumer测试启动是否成功;
7. 启动maxwell将解析后的json数据发送到kafka;
8. 启动kafka consumer测试数据是否成功发送。

五 详细过程

1 在slave1(192.168.175.21)上安装mysql

详细过程,可参考笔记: https://www.jianshu.com/p/09936d9c7bf2

(1)在创建root账号并设置远程访问之后,接着创建maxwell账号并设置远程访问和权限:
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';

mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

权限:Maxwell需要权限将状态存储在schema_database选项(默认Maxwell)指定的数据库中。

(2)针对maxwell配置mysql: 确保配置了server_id,并打开了基于行的复制。

可参考maxwell快速入门文档: https://github.com/zendesk/maxwell/blob/master/docs/docs/quickstart.md

$ vi my.cnf

[mysqld]
server_id=1
log-bin=master
binlog_format=row

注意: binlog_format是一个基于会话的属性需要关闭所有活动连接才能完全转换为基于行的复制。

mysql配置文件my.cnf查找方式:

$ whereis my

my:/etc/my.cnf

配置完成后,要重启mysql服务,方可生效

2 在slave2上启动maxwell,测试是否可以正常读取binlog

(1)在slave2上测试是否可以进行远程访问数据
mysql -h 192.168.175.21 -P 3306 - u root -proot   #root登录成功后,窗口无需关闭,后面还要接着测试用

mysql -h 192.168.175.21 -P 3306 -u maxwell -p111111
(2)下载并解压maxwell
#解压到指定的文件夹

tar xzvf maxwell-1.21.1.tar.gz -C /usr/loca/hadoop/app
(3)命令行启动maxwell,将解析后的日志输出到控制台进行测试
[root@slave2 maxwell-1.21.1]# pwd
/usr/local/hadoop/app/maxwell-1.21.1

[root@slave2 maxwell-1.21.1]# bin/maxwell --user='maxwell' --password='111111' --host='192.168.175.21' --producer=stdout

启动成功后,会展示如下日志内容:

[root@slave2 maxwell-1.21.1]# bin/maxwell --user='maxwell' --password='111111' --host='192.168.175.21' --producer=stdout

Using kafka version: 1.0.0
15:49:04,967 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
15:49:08,334 INFO  Maxwell - Maxwell v1.21.1 is booting (StdoutProducer), starting at Position[BinlogPosition[master.000002:984013], lastHeartbeat=1555141484690]
15:49:10,071 INFO  MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[master.000001:40754], lastHeartbeat=1555112822550])
15:49:12,759 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[master.000001:3816], lastHeartbeat=0])
15:49:13,122 INFO  MysqlSavedSchema - beginning to play deltas...
15:49:13,138 INFO  MysqlSavedSchema - played 1 deltas in 12ms
15:49:13,426 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: master.000002:984013
15:49:13,997 INFO  BinaryLogClient - Connected to 192.168.175.21:3306 at master.000002/984013 (sid:6379, cid:59)
15:49:14,003 INFO  BinlogConnectorLifecycleListener - Binlog connected.

在slave2上root连接mysql的窗口中,执行insert,delete,update操作:

MySQL [mysql]> insert into tb_dept (id,name,description) values(16,'xiaoman','manger');
Query OK, 1 row affected (0.02 sec)

MySQL [mysql]> delete from tb_dept where id = 16;
Query OK, 1 row affected (0.01 sec)

MySQL [mysql]> update tb_dept set name='xiaofei' where id=14;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

在maxwell的stdout窗口中会产生如下日志:

{"database":"mysql","table":"tb_dept","type":"insert","ts":1555142065,"xid":6349,"commit":true,"data":{"Id":16,"Name":"xiaoman","description":"manger"}}

{"database":"mysql","table":"tb_dept","type":"delete","ts":1555142096,"xid":6361,"commit":true,"data":{"Id":16,"Name":"xiaoman","description":"manger"}}

{"database":"mysql","table":"tb_dept","type":"update","ts":1555142157,"xid":6383,"commit":true,"data":{"Id":14,"Name":"xiaofei","description":"sales"},"old":{"Name":"xiaoming1"}}

3 在slave2上安装并启动zk

具体安装方式,可参考笔记:https://www.jianshu.com/p/10d5a20ab9b7

注意启动完成后,要检查zk是否安装成功

#查看状态

zkServer.sh status

4 在slave1上安装并启动kafka server:

(1)具体启动安装方式,可参考笔记:https://www.jianshu.com/p/3d017bdbfb3c

修改kafka配置文件 $KAFKA_HOME/config/server.properties:

broker.id=1
listeners=PLAINTEXT://slave1:9092
log.dirs=/usr/local/app/tmp/kafka-logs
zookeeper.connect=slave2:2181
(2)通过kafka producer和consumer测试启动是否成功

5 在slave1的kafka上创建名为maxwell的topic

(1)创建topic
kafka-topics.sh --create --zookeeper slave2:2181 --replication-factor 1 --partitions 1 --topic maxwell
(2)检查topic是否创建成功
#查看topic列表:

kafka-topics.sh --list --zookeeper slave2:2181

#查看topic具体描述:

kafka-topics.sh --describe --zookeeper slave2:2181  --topic maxwell

6 在slave1上启动消费topic名称为maxwell的kafka consumer:

kafka-console-consumer.sh --zookeeper slave2:2181 --topic maxwell --from-beginning

7 在slave2上启动maxwell进程,将解析后的json数据输出到kafka:

bin/maxwell --user='maxwell' --password='111111' --host='192.168.175.21' --producer=kafka --kafka.bootstrap.servers=192.168.175.21:9092 --kafka_topic=maxwell

注意:启动之前,需要将输出到stdout上的maxwell进程停掉,否则会报错。

同样,启动成功后,会输出Binlog连接成功的日志信息。如下:

16:22:19,127 INFO  AppInfoParser - Kafka version : 1.0.0
16:22:19,129 INFO  AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
16:22:19,391 INFO  Maxwell - Maxwell v1.21.1 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[master.000002:1084309], lastHeartbeat=1555143612778]
16:22:20,916 INFO  MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[master.000001:40754], lastHeartbeat=1555112822550])
16:22:22,861 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[master.000001:3816], lastHeartbeat=0])
16:22:23,178 INFO  MysqlSavedSchema - beginning to play deltas...
16:22:23,192 INFO  MysqlSavedSchema - played 1 deltas in 7ms
16:22:23,482 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: master.000002:1084309
16:22:23,887 INFO  BinaryLogClient - Connected to 192.168.175.21:3306 at master.000002/1084309 (sid:6379, cid:64)
16:22:23,894 INFO  BinlogConnectorLifecycleListener - Binlog connected.

8 验证

(1)在slave1中的root账号登录的mysql窗口中,执行一条更新操作:
MySQL [mysql]> update tb_dept set name='xiaofei123' where id=14;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0
(2)随机在消费topic=maxwell的kafka consumer中输出日志如下:
{"database":"mysql","table":"tb_dept","type":"update","ts":1555143780,"xid":6888,"commit":true,"data":{"Id":14,"Name":"xiaofei123","description":"sales"},"old":{"Name":"xiaofei"}}

到此,流程梳理完毕!

六 遗留问题:

1 mysql数据库

设置远程访问之后,在本地一直访问不了,尝试修改密码同样访问不了。猜测是由于设置远程访问时的%百分号影响的。故而这里访问数据库用的都是远程访问的,其实正常生产环境,我们也都是远程进行访问的。

ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES)

2 kafka:

在将maxwell进程和kafka server运行在同一台虚拟机上时,启动kafka consumer时,经常报ConsumerRebalanceFailedException的异常,如下:没有找到具体原因,最终通过kafka server和maxwell不运行在一起将问题解决。其实生产环境,一般kafka都是有专用的集群,也都不会和maxwell运行在一起。

[2019-04-13 13:12:03,599] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: console-consumer-72779_slave2-1555132312263-9d33f2d8 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:660)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:967)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:1001)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:163)
    at kafka.consumer.OldConsumer.<init>(BaseConsumer.scala:75)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:63)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

如有疑问,请给我留言。我会在看到之后,第一时间给你回复的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容