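1. Introduction to Maxwell
Maxwell reads the MySQL binlog, picks up changes in real time, and writes them as JSON to Kafka, Redis, Kinesis, SQS, Pub/Sub, RabbitMQ, a file, and other outputs.
Official site: http://maxwells-daemon.io
Download: https://github.com/zendesk/maxwell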
2. Set the MySQL binlog format to row
2.1 Check whether the binlog is enabled
mysql> show variables like '%log_bin%';
2.2 Exit mysql and open the configuration file, /etc/my.cnf (the path on macOS)
vim /etc/my.cnf
2.3 Set binlog_format to row
log-bin=mysql-bin
binlog_format=row
server-id=1
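The three settings above are easy to mistype, so a quick sanity check can help before restarting. The following is a minimal sketch, not part of Maxwell or MySQL: the class name BinlogConfigCheck and its helper methods are made up for illustration, and it only understands plain key=value lines.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BinlogConfigCheck {

    /** Parses simple key=value lines, skipping blanks, # comments and [section] headers. */
    static Map<String, String> parse(String text) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String line : text.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#") || trimmed.startsWith("[")) {
                continue;
            }
            int eq = trimmed.indexOf('=');
            String key = (eq < 0 ? trimmed : trimmed.substring(0, eq)).trim();
            String value = eq < 0 ? "" : trimmed.substring(eq + 1).trim();
            // MySQL accepts '-' and '_' interchangeably in option names, so normalize them.
            settings.put(key.replace('-', '_').toLowerCase(), value);
        }
        return settings;
    }

    /** True when the binlog is enabled, row-based, and a server id is set. */
    static boolean readyForMaxwell(Map<String, String> settings) {
        return settings.containsKey("log_bin")
                && "row".equalsIgnoreCase(settings.get("binlog_format"))
                && settings.containsKey("server_id");
    }

    public static void main(String[] args) {
        String sample = "[mysqld]\nlog-bin=mysql-bin\nbinlog_format=row\nserver-id=1\n";
        System.out.println("ready for Maxwell: " + readyForMaxwell(parse(sample)));
    }
}
```

In practice the file contents would be read from /etc/my.cnf instead of the hard-coded sample string.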
2.4 Restart MySQL
service mysqld restart
Explanation of the three binlog formats (statement, row, mixed):
https://www.cnblogs.com/xingyunfashi/p/8431780.html
3. Configure MySQL privileges
mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'XXXXXX';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
4. Run Maxwell to start listening
# Without filtering
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
--producer=kafka --kafka.bootstrap.servers=localhost:9092
By default, messages are written to the topic named maxwell.
# With database filtering
/usr/local/maxwell/bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
--producer=kafka --kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092 \
--kafka_topic=maxwells --filter 'exclude: dbName01.*, include: dbName02.*'
For more configuration options, see: http://maxwells-daemon.io/config/
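To reason about what a --filter string like the one above lets through, here is a rough sketch of the rule evaluation. It rests on two assumptions that should be checked against the Maxwell filtering documentation: that every table is included by default, and that rules are evaluated left to right with the last matching rule deciding. The class FilterSketch is hypothetical and is not Maxwell's own code; it handles only db.table and db.* style patterns, not regular expressions or column filters.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterSketch {

    /** One "include: db.table" or "exclude: db.table" rule; "*" matches anything. */
    static final class Rule {
        final boolean include;
        final String db;
        final String table;

        Rule(boolean include, String db, String table) {
            this.include = include;
            this.db = db;
            this.table = table;
        }

        boolean matches(String d, String t) {
            return (db.equals("*") || db.equals(d)) && (table.equals("*") || table.equals(t));
        }
    }

    /** Splits a comma-separated filter string into rules, keeping their order. */
    static List<Rule> parse(String spec) {
        List<Rule> rules = new ArrayList<>();
        for (String part : spec.split(",")) {
            String[] kv = part.trim().split(":", 2);
            String[] target = kv[1].trim().split("\\.", 2);
            rules.add(new Rule(kv[0].trim().equals("include"), target[0], target[1]));
        }
        return rules;
    }

    /** Assumption: included by default, and the last matching rule wins. */
    static boolean includes(List<Rule> rules, String db, String table) {
        boolean include = true;
        for (Rule rule : rules) {
            if (rule.matches(db, table)) {
                include = rule.include;
            }
        }
        return include;
    }

    public static void main(String[] args) {
        List<Rule> rules = parse("exclude: dbName01.*, include: dbName02.*");
        System.out.println("dbName01.orders -> " + includes(rules, "dbName01", "orders"));
        System.out.println("dbName02.orders -> " + includes(rules, "dbName02", "orders"));
        System.out.println("otherDb.orders  -> " + includes(rules, "otherDb", "orders"));
    }
}
```

Under these assumptions, a database that no rule mentions still passes through. To capture only dbName02, a filter such as 'exclude: *.*, include: dbName02.*' would be the pattern to try.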
5. Consume the messages from Kafka in code
# pom.xml
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
</dependency>
</dependencies>
# KafkaConsumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumer {
    public static void main(String[] args) {
        // Connection settings for the Kafka cluster
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        prop.put("group.id", "test");
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", "1000");
        prop.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Fully qualified because this class shares the name KafkaConsumer
        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
                new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
        // Subscribe to the topic Maxwell writes to ("maxwell" by default;
        // use the --kafka_topic value if one was set)
        consumer.subscribe(Arrays.asList("maxwell"));
        while (true) {
            // Poll for new records, waiting up to 100 ms
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Consumed record: " + record.value());
            }
        }
    }
}
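Each record.value() printed by the consumer is one Maxwell JSON event. As a rough sketch of what could be done with it, the snippet below pulls the top-level database, table and type fields out of a sample message so that events can be routed per table. The sample message follows the shape shown in the Maxwell quickstart. The class MaxwellEventFields is hypothetical, and the regex lookup is deliberately naive (it returns the first string value with that key and does not handle escaped quotes); a real consumer would more likely use a JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MaxwellEventFields {

    /** Returns the first string value found for the given key, or null if absent. */
    static String field(String json, String name) {
        Pattern pattern = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher matcher = pattern.matcher(json);
        return matcher.find() ? matcher.group(1) : null;
    }

    /** Builds a short "database.table type" label for an event. */
    static String describe(String json) {
        return field(json, "database") + "." + field(json, "table") + " " + field(json, "type");
    }

    public static void main(String[] args) {
        String sample = "{\"database\":\"test\",\"table\":\"maxwell\",\"type\":\"insert\","
                + "\"ts\":1449786310,\"xid\":940752,\"commit\":true,"
                + "\"data\":{\"id\":1,\"daemon\":\"Stanislaw Lem\"}}";
        System.out.println(describe(sample)); // prints "test.maxwell insert"
    }
}
```

Inside the polling loop, describe(record.value()) could replace the plain println to show which table each change came from.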
6. Testing
6.1 Run the code