Introduction
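In business application development, MySQL is probably the most commonly used database. Each business department runs its own MySQL cluster. To break down the resulting data silos, the data has to be synchronized and integrated. To keep that synchronization timely, we usually replicate changes in real time with CDC (change data capture). This post walks through how I used Flink CDC to sync MySQL into Hudi to build a real-time data lake.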
MySQL configuration
- Enable the binlog
Edit the config file with vim /usr/local/etc/my.cnf and add the following settings:
# Default Homebrew MySQL server config
[mysqld]
# Only allow connections from localhost
bind-address = 127.0.0.1
log-bin = mysql-bin
binlog-format = ROW
server_id = 1
Restart MySQL (Homebrew install): mysql.server restart
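You can check the edit mechanically before restarting. The following is a minimal Python sketch. It assumes the file parses as INI, which holds for the simple config above. The function name check_mycnf and the list of required options are my own choices, not part of MySQL or Flink CDC.

```python
import configparser

# Options the step above sets in [mysqld]; None means "any value is fine".
REQUIRED = {
    "log_bin": None,         # binlog enabled (value is the file basename)
    "binlog_format": "ROW",  # CDC needs row-level change events
    "server_id": None,       # must be set when the binlog is enabled
}


def normalize(key):
    # MySQL treats '-' and '_' in option names as equivalent.
    return key.strip().lower().replace("-", "_")


def check_mycnf(text):
    """Return a list of problems found in the [mysqld] section of my.cnf text."""
    parser = configparser.ConfigParser(allow_no_value=True, interpolation=None)
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    opts = {normalize(k): v for k, v in parser.items("mysqld")}
    problems = []
    for key, expected in REQUIRED.items():
        if key not in opts:
            problems.append(f"{key} is not set")
        elif expected is not None and (opts[key] or "").upper() != expected:
            problems.append(f"{key} should be {expected}, got {opts[key]}")
    return problems


sample = """# Default Homebrew MySQL server config
[mysqld]
# Only allow connections from localhost
bind-address = 127.0.0.1
log-bin = mysql-bin
binlog-format = ROW
server_id = 1
"""
print(check_mycnf(sample))  # []
```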
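- Create the account. If MySQL rejects the password as too weak, relax the password-validation policy first. These are the MySQL 5.7 variable names; MySQL 8.0 uses validate_password.policy and validate_password.length.
SET GLOBAL validate_password_policy=0;
SET GLOBAL validate_password_length=6;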
CREATE USER 'flinkuser'@'localhost' IDENTIFIED BY 'flinkpw';
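Why lower the policy at all? Under the plugin defaults (MEDIUM policy, minimum length 8), flinkpw is rejected. The sketch below approximates the LOW and MEDIUM rules in Python to show which checks fail. check_password is a hypothetical helper. It models only the length and character-class checks, not everything the server-side plugin enforces.

```python
def check_password(password, policy=1, length=8,
                   mixed_case_count=1, number_count=1, special_char_count=1):
    """Approximate MySQL validate_password rules for policy 0 (LOW) and 1 (MEDIUM).

    Defaults mirror the plugin defaults (MEDIUM, length 8, one of each class).
    The STRONG policy's dictionary check is not modeled.
    """
    if policy not in (0, 1):
        raise ValueError("only LOW (0) and MEDIUM (1) are modeled")
    failures = []
    if len(password) < length:
        failures.append("length")
    if policy == 1:
        if sum(c.isdigit() for c in password) < number_count:
            failures.append("number")
        if (sum(c.islower() for c in password) < mixed_case_count
                or sum(c.isupper() for c in password) < mixed_case_count):
            failures.append("mixed_case")
        if sum(not c.isalnum() for c in password) < special_char_count:
            failures.append("special_char")
    return failures


print(check_password("flinkpw"))                      # ['length', 'number', 'mixed_case', 'special_char']
print(check_password("flinkpw", policy=0, length=6))  # []
```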
- Grant the privileges the CDC source needs
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser'@'localhost';
- Check the grants
SHOW GRANTS FOR 'flinkuser'@'localhost';
- Reload the grant tables (GRANT takes effect immediately, so this step is optional)
FLUSH PRIVILEGES;
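You can also check the SHOW GRANTS output programmatically. This is a minimal sketch. It assumes the rows are fetched as plain strings and that privilege names appear exactly as in the GRANT statement above. missing_privileges is a hypothetical helper.

```python
import re

# Global privileges granted to flinkuser in the step above.
REQUIRED = {"SELECT", "RELOAD", "SHOW DATABASES",
            "REPLICATION SLAVE", "REPLICATION CLIENT"}

# Captures the privilege list and the object of a "GRANT ... ON ... TO" row.
GRANT_RE = re.compile(r"^GRANT\s+(.+?)\s+ON\s+(\S+)\s+TO\s", re.IGNORECASE)


def missing_privileges(grant_rows, required=REQUIRED):
    """Return required global privileges not covered by SHOW GRANTS rows."""
    granted = set()
    for row in grant_rows:
        match = GRANT_RE.match(row.strip())
        if not match or match.group(2) != "*.*":
            continue  # only global (*.*) grants count here
        privs = {p.strip().upper() for p in match.group(1).split(",")}
        if "ALL PRIVILEGES" in privs:
            return set()
        granted |= privs
    return set(required) - granted


rows = ["GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, "
        "REPLICATION CLIENT ON *.* TO `flinkuser`@`localhost`"]
print(missing_privileges(rows))  # set()
```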
Hadoop configuration
- Set the environment variable
export HADOOP_CLASSPATH=`hadoop classpath`
- Start the cluster
sbin/start-all.sh
- Process list
flink-1.13.5 % jps
57988 SecondaryNameNode
64612 YarnSessionClusterEntrypoint
57749 NameNode
58183 ResourceManager
58279 NodeManager
60969 SqlClient
64733 Jps
57855 DataNode
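A quick way to confirm the Hadoop daemons came up is to compare jps output against the expected names. The small sketch below does that with two hypothetical helpers, parse_jps and missing_daemons. The expected set covers only the five Hadoop daemons in the listing above. YarnSessionClusterEntrypoint and SqlClient appear only after the Flink steps below.

```python
# Daemons that start-all.sh should leave running, as in the listing above.
HADOOP_DAEMONS = {"NameNode", "SecondaryNameNode", "DataNode",
                  "ResourceManager", "NodeManager"}


def parse_jps(output):
    """Map process name -> pid from `jps` lines such as '57749 NameNode'."""
    procs = {}
    for line in output.splitlines():
        parts = line.split()
        # Skip prompts and anything that is not exactly "<pid> <name>".
        if len(parts) == 2 and parts[0].isdigit():
            procs[parts[1]] = int(parts[0])
    return procs


def missing_daemons(output, expected=HADOOP_DAEMONS):
    """Return the expected daemons absent from the jps output, sorted."""
    return sorted(set(expected) - set(parse_jps(output)))


listing = """57988 SecondaryNameNode
64612 YarnSessionClusterEntrypoint
57749 NameNode
58183 ResourceManager
58279 NodeManager
60969 SqlClient
64733 Jps
57855 DataNode"""
print(missing_daemons(listing))  # []
```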
Flink configuration
- Add the dependencies (put these jars in Flink's lib/ directory)
flink-sql-connector-mysql-cdc-2.2.1.jar
hudi-flink-bundle_2.11-0.10.0.jar
Note: to ensure version compatibility, it is best to build these jars yourself.
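One common compatibility problem is mixing jars built for different Scala versions, such as the _2.11 suffix in hudi-flink-bundle_2.11-0.10.0.jar. The sketch below inspects a listing of Flink's lib/ directory for the two connector jars and for mixed Scala suffixes. check_lib, the lib/ location, and the flink-dist_2.11-1.13.5.jar name in the example are assumptions about a typical Flink 1.13 distribution, not something stated above.

```python
import re

# Matches a Scala binary-version suffix such as '_2.11-' in 'flink-dist_2.11-1.13.5.jar'.
SCALA_SUFFIX = re.compile(r"_(2\.\d+)-")

REQUIRED_PREFIXES = ("flink-sql-connector-mysql-cdc-", "hudi-flink-bundle_")


def check_lib(jar_names, required_prefixes=REQUIRED_PREFIXES):
    """Report missing connector jars and mixed Scala suffixes in a lib/ listing."""
    problems = [f"missing {p}*.jar" for p in required_prefixes
                if not any(name.startswith(p) for name in jar_names)]
    scala = set()
    for name in jar_names:
        match = SCALA_SUFFIX.search(name)
        if match:
            scala.add(match.group(1))
    if len(scala) > 1:
        problems.append("mixed Scala versions: " + ", ".join(sorted(scala)))
    return problems


lib = ["flink-dist_2.11-1.13.5.jar",
       "flink-sql-connector-mysql-cdc-2.2.1.jar",
       "hudi-flink-bundle_2.11-0.10.0.jar"]
print(check_lib(lib))  # []
```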
- Start a yarn-session cluster (note: the job depends on Hadoop storage, so the cluster must be started in YARN mode)
bin/yarn-session.sh -nm flink-session-cluster -d
- Start the Flink SQL client
bin/sql-client.sh embedded -s yarn-session
- Create the MySQL CDC source table
CREATE TABLE mysql_user (
id INT,
name STRING,
age INT,
dt STRING,
score DOUBLE,
create_at STRING,
update_at STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'wlapp',
'table-name' = 'user'
);
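The source and the Hudi sink declare the same seven columns. One way to keep them from drifting apart is to generate both statements from a single column list. This is a minimal Python sketch. USER_COLUMNS and create_table_ddl are hypothetical names, and the printed statement is meant to be pasted into the SQL client.

```python
# Columns of wlapp.user as declared in the source table above.
USER_COLUMNS = [
    ("id", "INT"), ("name", "STRING"), ("age", "INT"), ("dt", "STRING"),
    ("score", "DOUBLE"), ("create_at", "STRING"), ("update_at", "STRING"),
]


def create_table_ddl(table, columns, primary_key, options):
    """Render a Flink SQL CREATE TABLE ... WITH (...) statement."""
    body = [f"  {name} {typ}," for name, typ in columns]
    body.append(f"  PRIMARY KEY({primary_key}) NOT ENFORCED")
    opts = ",\n".join(f"  '{k}' = '{v}'" for k, v in options.items())
    return (f"CREATE TABLE {table} (\n" + "\n".join(body)
            + f"\n) WITH (\n{opts}\n);")


source_ddl = create_table_ddl("mysql_user", USER_COLUMNS, "id", {
    "connector": "mysql-cdc",
    "hostname": "localhost",
    "port": "3306",
    "username": "flinkuser",
    "password": "flinkpw",
    "database-name": "wlapp",
    "table-name": "user",
})
print(source_ddl)
```

Calling the same function with the Hudi table name and its connector options yields the matching sink statement.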
- Create the Hudi sink table
CREATE TABLE hudi_user(
id INT,
name STRING,
age INT,
dt STRING,
score DOUBLE,
create_at STRING,
update_at STRING,
PRIMARY KEY(id) NOT ENFORCED
)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://localhost:9000/user/warehouse/wlapp.db/user',
'table.type' = 'COPY_ON_WRITE',
'write.insert.drop.duplicates' = 'true'
);
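Both tables declare PRIMARY KEY(id). That key is what lets updates and deletes from the binlog land on the right Hudi record. The toy model below illustrates the idea using Flink's changelog row kinds (+I insert, -U update-before, +U update-after, -D delete), applied to an in-memory dict keyed by id. apply_changelog is a hypothetical helper. It is an illustration only, not how Hudi actually writes files.

```python
def apply_changelog(table, events):
    """Apply (row_kind, row) changelog events to a dict keyed by row['id'].

    +I (insert) and +U (update-after) upsert the row, -D (delete) removes it,
    and -U (update-before) is skipped because the matching +U replaces it.
    """
    for kind, row in events:
        key = row["id"]
        if kind in ("+I", "+U"):
            table[key] = row
        elif kind == "-D":
            table.pop(key, None)
        elif kind != "-U":
            raise ValueError(f"unknown row kind: {kind}")
    return table


events = [
    ("+I", {"id": 1, "name": "tom", "age": 20}),
    ("+I", {"id": 2, "name": "amy", "age": 30}),
    ("-U", {"id": 1, "name": "tom", "age": 20}),
    ("+U", {"id": 1, "name": "tom", "age": 21}),
    ("-D", {"id": 2, "name": "amy", "age": 30}),
]
print(apply_changelog({}, events))  # {1: {'id': 1, 'name': 'tom', 'age': 21}}
```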
- Run the ETL job
INSERT INTO hudi_user SELECT * FROM mysql_user;
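INSERT INTO ... SELECT * matches columns by position rather than by name. That works here because the two DDLs list the same columns in the same order. The small sketch below performs that check. positional_mismatches is a hypothetical helper that compares (name, type) pairs position by position.

```python
def positional_mismatches(source_cols, sink_cols):
    """Compare two [(name, type)] schemas position by position.

    INSERT INTO sink SELECT * FROM source pairs columns by position, not by
    name, so the two lists should agree in order, name, and type.
    """
    if len(source_cols) != len(sink_cols):
        return [f"column count differs: {len(source_cols)} vs {len(sink_cols)}"]
    return [f"position {i}: {src} vs {dst}"
            for i, (src, dst) in enumerate(zip(source_cols, sink_cols))
            if src != dst]


mysql_user = [("id", "INT"), ("name", "STRING"), ("age", "INT"),
              ("dt", "STRING"), ("score", "DOUBLE"),
              ("create_at", "STRING"), ("update_at", "STRING")]
hudi_user = list(mysql_user)
print(positional_mismatches(mysql_user, hudi_user))  # []
```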
- After the job starts, check the binlog status from the MySQL client
SHOW MASTER STATUS;
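SHOW MASTER STATUS reports the current binlog File and Position. If you take one reading before and one after changing a row in wlapp.user, the position should move forward. The minimal sketch below makes that comparison. binlog_coordinate, binlog_advanced, and the sample readings are hypothetical. An advancing position only shows that MySQL is writing the binlog, not that the Flink job has consumed it.

```python
def binlog_coordinate(file, position):
    """Turn ('mysql-bin.000003', 154) into a comparable (3, 154) tuple."""
    base, _, seq = file.rpartition(".")
    if not base or not seq.isdigit():
        raise ValueError(f"unexpected binlog file name: {file}")
    return int(seq), int(position)


def binlog_advanced(before, after):
    """True if the (File, Position) reading `after` is past `before`."""
    return binlog_coordinate(*after) > binlog_coordinate(*before)


# Hypothetical readings of SHOW MASTER STATUS taken before and after an UPDATE.
before = ("mysql-bin.000003", 154)
after = ("mysql-bin.000003", 1024)
print(binlog_advanced(before, after))  # True
```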
Results
- Figure: YARN resource management UI
- Figure: Flink job management UI
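Summary
In this example, the key points are enabling the MySQL binlog, granting the user the right privileges, and running Flink in yarn-session mode. Standalone mode does not work, because the job depends on Hadoop storage and needs to read configuration from the Hadoop cluster.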