public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE WaterSensor (" +
"id STRING," +
"ts BIGINT," +
"vc BIGINT," +
// "`pt` TIMESTAMP(3),"+
// "WATERMARK FOR pt AS pt - INTERVAL '10' SECOND" +
"pt as PROCTIME() " +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'kafka_data_waterSensor'," +
"'properties.bootstrap.servers' = '127.0.0.1:9092'," +
"'properties.group.id' = 'test'," +
"'scan.startup.mode' = 'earliest-offset'," +
// "'json.fail-on-missing-field' = 'false'," +
// "'json.ignore-parse-errors' = 'true'," +
"'format' = 'json'" +
")"
);
tableEnv.executeSql("CREATE TABLE flinksink (" +
"componentname STRING," +
"componentcount BIGINT NOT NULL," +
"componentsum BIGINT" +
") WITH (" +
"'connector.type' = 'jdbc'," +
"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
"'connector.table' = 'flinksink'," +
"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
"'connector.username' = 'root'," +
"'connector.password' = 'root'," +
"'connector.write.flush.max-rows'='3'\r\n" +
")"
);
Table result = tableEnv.sqlQuery(
"SELECT " +
"id as componentname, " + //window_start, window_end,
"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
"FROM TABLE( " +
"TUMBLE( TABLE WaterSensor , " +
"DESCRIPTOR(pt), " +
"INTERVAL '10' SECOND)) " +
"GROUP BY id , window_start, window_end"
);
// //方式一:写入数据库
//// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
//
//方式二:写入数据库
tableEnv.createTemporaryView("ResultTable", result);
tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();
env.execute();
}
(7)FlinkSQL将kafka数据写入到mysql方式二
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 这里不展开zookeeper、kafka安装配置(1)首先需要启动zookeeper和kafka (2)定义一个k...
- 本章节主要演示从socket接收数据,通过滚动窗口每30秒运算一次窗口数据,然后将结果写入Mysql数据库 (1)...
- 本次使用版本kafka_2.12-2.70Apache-Flink 1.12Debezium 1.3环境均为本地启...