1. 通过 Flink SQL 将 MySQL 中的数据同步到另一张 MySQL 表
套路
官网示例:
// create a TableEnvironment for batch or streaming execution
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an input Table
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
// register an output Table
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
// create a Table object from a Table API query
Table table2 = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...
总结为:
准备环境 ----> 准备源表 -----> 准备目标表 -----> 执行 INSERT 将源表数据写入目标表
2.我的实现:
package com.wudl.flink.examples;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Copies every row of one MySQL table into another MySQL table with Flink SQL.
 *
 * <p>The job registers a source table and a sink table through the {@code jdbc}
 * connector and then runs a single {@code insert into ... select * from ...}
 * statement between them.
 *
 * <p>Created: 2021-08-24 23:28
 *
 * @author wudl
 */
public class FlinkSqlMysqlToMySql {

    // NOTE(review): connection settings are hard-coded for this example; move them
    // to external configuration before using this anywhere but a local demo.
    private static final String DB_URL = "jdbc:mysql://192.168.1.180:3306/MyFlink";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "123456";

    /** Column list shared by the source and the sink table. */
    private static final String SCHEMA = "id INT ,name STRING";

    /** Physical MySQL table names. */
    private static final String SOURCE_TABLE = "student";
    private static final String SINK_TABLE = "SinkStudent";

    /** Names under which the tables are registered in the Flink catalog. */
    private static final String FLINK_SOURCE_TABLE = "mysource_student";
    private static final String FLINK_SINK_TABLE = "SinkStudent";

    /**
     * Registers both tables, runs the copy and waits for it to finish.
     *
     * @param args ignored
     * @throws IllegalStateException if the insert job fails or the wait is interrupted
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Select the SQL dialect.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // 2. Register the source and sink tables.
        tableEnv.executeSql(jdbcTableDdl(FLINK_SOURCE_TABLE, SCHEMA, SOURCE_TABLE));
        tableEnv.executeSql(jdbcTableDdl(FLINK_SINK_TABLE, SCHEMA, SINK_TABLE));

        // 3. Copy the data. executeSql only submits the INSERT job, so wait for the
        //    result before reporting success; otherwise a failed job would still
        //    print the success message.
        String insertSql =
                String.format("insert into %s select * from %s", FLINK_SINK_TABLE, FLINK_SOURCE_TABLE);
        try {
            tableEnv.executeSql(insertSql).await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the insert job", e);
        } catch (java.util.concurrent.ExecutionException e) {
            throw new IllegalStateException("Insert job failed: " + insertSql, e);
        }
        System.out.println("数据写入mysql成功");
    }

    /**
     * Builds the {@code CREATE TABLE} statement for a JDBC-backed table.
     *
     * <p>All values are passed as format arguments rather than concatenated into the
     * template, so a {@code %} inside the URL or credentials cannot break formatting.
     * No {@code driver} option is emitted; the connector is documented to derive the
     * driver class from the URL when the option is absent.
     *
     * @param flinkTable name to register in the Flink catalog
     * @param columns column definitions placed between the parentheses
     * @param physicalTable name of the table in MySQL
     * @return the DDL statement
     */
    private static String jdbcTableDdl(String flinkTable, String columns, String physicalTable) {
        return String.format(
                "CREATE TABLE IF NOT EXISTS %s (%s) "
                        + "WITH ("
                        + "'connector' = 'jdbc',"
                        + "'url' = '%s',"
                        + "'table-name' = '%s',"
                        + " 'username' = '%s',"
                        + " 'password' = '%s'"
                        + " )",
                flinkTable, columns, DB_URL, physicalTable, USERNAME, PASSWORD);
    }
}