1. According to articles online, the SQL client works correctly with Flink 1.11.4 + iceberg-flink-runtime-0.11.1.jar (Iceberg 0.12 is newly released and errors out as soon as it is used). I also tried Flink 1.12.5 and Flink 1.13.2 as the client, and both failed (possibly my own fault; I have not yet tracked down the cause).
2. On the job side, Flink CDC works with either Flink 1.13.2 or 1.12.5, but some dependencies in the pom must be downgraded to 1.11.1, otherwise you will hit missing-class/missing-jar errors. This walkthrough uses Flink CDC (the flink-connector-mysql-cdc 2.0.0 jar) together with Flink 1.13.2 to monitor the MySQL binlog in real time and write the changes into Iceberg (binlog must be enabled in advance by editing the MySQL configuration file; a sample snippet is sketched below). This setup is very sensitive to version mismatches, and inconsistent versions produce all kinds of errors. The pom file and code I actually used, and personally verified, follow.
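For reference, a minimal my.cnf fragment that turns on row-based binlog might look like the following; the server-id and log file name are illustrative values, not taken from my actual setup:

[mysqld]
server-id        = 1           # any id that is unique in your replication topology
log-bin          = mysql-bin   # enable the binary log
binlog_format    = ROW         # Flink CDC requires row-based binlog
binlog_row_image = FULL        # capture full before/after row images

After restarting MySQL, SHOW VARIABLES LIKE 'log_bin'; and SHOW VARIABLES LIKE 'binlog_format'; can be used to confirm the settings took effect.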
3. The pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-mysql-iceberg</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>spark-hudi</module>
    </modules>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>3.0.0</spark.version>
        <flink.version>1.12.5</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- recently added -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.67</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- jars appended later -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.7</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>0.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink</artifactId>
            <version>0.11.0</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>${flink.version}</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>

    <!-- newly added: assembly packaging for the job jar -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
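With the assembly plugin configured as above, the job can be packaged into a fat jar and submitted roughly as follows; the jar path is a placeholder that depends on which module actually contains the code, and rowData2iceberg is the main class from section 5:

mvn clean package -DskipTests
./bin/flink run -c rowData2iceberg <path-to-the-jar-with-dependencies>.jar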
5. The code:
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class rowData2iceberg {
    private static final String HADOOP_CATALOG = "iceberg_hadoop_catalog";

    // define the Iceberg schema
    private static final Schema SCHEMA =
            new Schema(
                    Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "image", Types.BinaryType.get())
                    // Types.NestedField.optional(3, "age", Types.IntegerType.get()),
                    // Types.NestedField.optional(4, "address", Types.StringType.get()),
                    // Types.NestedField.optional(5, "score1", Types.IntegerType.get()),
                    // Types.NestedField.optional(6, "school", Types.StringType.get()),
                    // Types.NestedField.optional(7, "class", Types.StringType.get())
            );

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.setProperty("HADOOP_USER_NAME", "daizhihao");

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(60 * 1000L);
        checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000L);
        checkpointConfig.setTolerableCheckpointFailureNumber(10);
        checkpointConfig.setCheckpointTimeout(12 * 1000L);
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // define the MySQL columns to be captured
        TableSchema schema =
                TableSchema.builder()
                        .add(TableColumn.of("id", DataTypes.INT()))
                        .add(TableColumn.of("image", DataTypes.BINARY(2)))
                        // .add(TableColumn.of("age", DataTypes.INT()))
                        // .add(TableColumn.of("address", DataTypes.STRING()))
                        // .add(TableColumn.of("score1", DataTypes.INT()))
                        // .add(TableColumn.of("school", DataTypes.STRING()))
                        // .add(TableColumn.of("class", DataTypes.STRING()))
                        .build();
        RowType rowType = (RowType) schema.toRowDataType().getLogicalType();

        DebeziumDeserializationSchema<RowData> deserializer =
                new RowDataDebeziumDeserializeSchema(
                        rowType,
                        createTypeInfo(schema.toRowDataType()),
                        (rowData, rowKind) -> {},
                        ZoneId.of("Asia/Shanghai"));

        SourceFunction<RowData> sourceFunction = MySqlSource.<RowData>builder()
                .hostname("localhost")
                .serverTimeZone("UTC")
                .port(3306)
                .databaseList("demo") // monitor all tables under the demo database
                .tableList("demo.picture")
                .username("root")
                .password("root")
                // .hostname("Tgz3-eip-gzjfzxgputest1")
                // .port(23308)
                // .databaseList("eip_cs")
                .deserializer(deserializer)
                // .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        DataStreamSource<RowData> src = env.addSource(sourceFunction);
        // set checkpointing mode: exactly-once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        icebergSink_hadoop(src);
        env
                // .addSource(sourceFunction)
                // .print()
                .setParallelism(1); // use parallelism 1 for the sink to keep message ordering
        env.execute();
    }

    private static TypeInformation<RowData> createTypeInfo(DataType producedDataType) {
        final DataType internalDataType =
                DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);
        return (TypeInformation<RowData>)
                TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);
    }

    private static void icebergSink_hadoop(DataStream<RowData> src) {
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hadoop");
        properties.put("property-version", "1");
        properties.put("warehouse", "hdfs://192.168.163.101:9000/user/hive/warehouse");
        CatalogLoader catalogLoader =
                CatalogLoader.hadoop(HADOOP_CATALOG, new Configuration(), properties);
        icebergSink(src, catalogLoader);
    }

    private static void icebergSink(DataStream<RowData> input, CatalogLoader loader) {
        Catalog catalog = loader.loadCatalog();
        // Iceberg table identifier
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of("iceberg_db"), "image5");
        Table table;
        if (catalog.tableExists(identifier)) {
            table = catalog.loadTable(identifier);
        } else {
            table =
                    catalog.buildTable(identifier, SCHEMA)
                            .withPartitionSpec(PartitionSpec.unpartitioned())
                            .create();
        }
        // need to upgrade the format version to 2, otherwise 'java.lang.IllegalArgumentException:
        // Cannot write delete files in a v1 table'
        TableOperations operations = ((BaseTable) table).operations();
        TableMetadata metadata = operations.current();
        operations.commit(metadata, metadata.upgradeToFormatVersion(2));

        TableLoader tableLoader = TableLoader.fromCatalog(loader, identifier);
        FlinkSink.forRowData(input)
                .table(table)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Arrays.asList("id"))
                .writeParallelism(1)
                .build();
    }
}
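For context, the monitored MySQL table demo.picture only needs the two columns used above. A minimal sketch of its DDL follows; the image column type is an assumption chosen to mirror DataTypes.BINARY(2) in the code, not my actual table definition:

CREATE DATABASE IF NOT EXISTS demo;
CREATE TABLE demo.picture (
    id    INT PRIMARY KEY,  -- matches Types.IntegerType / DataTypes.INT()
    image BINARY(2)         -- assumption: mirrors DataTypes.BINARY(2); use your real image column type
);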
6. Checking the ingested data from the Flink 1.11 SQL client.
6.1 Start the Flink cluster: ./start-cluster.sh. The HDFS cluster also needs to be started beforehand.
6.2 Check the running processes; if StandaloneSessionClusterEntrypoint and TaskManagerRunner are present, the cluster started successfully.
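Before step 6.3, start the SQL client with the Iceberg runtime jar on its classpath; on my setup that looked roughly like the following (the jar path is only illustrative):

./bin/sql-client.sh embedded -j /path/to/iceberg-flink-runtime-0.11.1.jar shell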
6.3 After entering the SQL client, create a hadoop_catalog (note: when I used a hive_catalog instead, inserting data failed with an error; I have not yet figured out why):
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://192.168.163.101:9000/user/hive/warehouse',
'property-version'='1'
);
6.4 Switch to hadoop_catalog and use the iceberg_db database (if it does not exist, create it yourself with create database iceberg_db), as shown below.
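The Flink SQL for this step is simply:

USE CATALOG hadoop_catalog;
CREATE DATABASE IF NOT EXISTS iceberg_db;
USE iceberg_db;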
6.5 Query the table; if it returns data, the binlog monitoring pipeline is working:
select * from image5;
I arrived at the result above by following the link below and adapting it to my own component versions:
https://www.136.la/jingpin/show-126501.html