Format
Flink 提供了一套与表连接器(table connector)一起使用的表格式(table format)。表格式是一种存储格式,定义了如何把二进制数据映射到表的列上。
Flink 支持以下格式:
Formats | Supported Connectors |
---|---|
CSV | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem |
JSON | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem, Elasticsearch |
Apache Avro | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem |
Confluent Avro | Apache Kafka, Upsert Kafka |
Debezium CDC | Apache Kafka, Filesystem |
Canal CDC | Apache Kafka, Filesystem |
Maxwell CDC | Apache Kafka, Filesystem |
Apache Parquet | Filesystem |
Apache ORC | Filesystem |
Raw | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem |
CSV Format
更详细信息参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/csv/
允许我们基于 CSV schema 进行解析和生成 CSV 数据。 目前 CSV schema 是基于 table schema 推断而来的。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.13.0</version>
</dependency>
Format Options
选项 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 声明使用的格式,这里应为 'csv'。 |
csv.field-delimiter | 可选 | , | String | 字段分隔符(默认 ','),必须为单字符。 <br />可以使用反斜杠字符指定一些特殊字符,例如 '\t' 代表制表符。<br />可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如 'csv.field-delimiter' = U&'\0001' 代表 0x01 字符。 |
csv.disable-quote-character | 可选 | false | Boolean | 是否禁止对引用的值使用引号(默认是 false)。<br />如果禁止,选项 'csv.quote-character' 不能设置。 |
csv.quote-character | 可选 | " | String | 用于围住字段值的引号字符(默认"). |
csv.ignore-parse-errors | 可选 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。<br />如果忽略字段的解析异常,则会将该字段值设置为null。 |
csv.array-element-delimiter | 可选 | ; | String | 分隔数组和行元素的字符串(默认 ';')。 |
csv.null-literal | 可选 | (none) | String | 是否将 "null" 字符串转化为 null 值。 |
Json Format
更详细信息参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/json/
能读写 JSON 格式的数据。当前,JSON schema 是从 table schema 中自动推导而得的。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.13.0</version>
</dependency>
Format Options
选项 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 声明使用的格式,这里应为 'json'。 |
json.fail-on-missing-field | 可选 | false | Boolean | 当解析字段缺失时,是跳过当前字段或行,<br />还是抛出错误失败(默认为 false,即抛出错误失败)。 |
json.ignore-parse-errors | 可选 | false | Boolean | 当解析异常时,是跳过当前字段或行,<br />还是抛出错误失败(默认为 false,即抛出错误失败)。 <br />如果忽略字段的解析异常,则会将该字段值设置为null。 |
Avro Format
更详细信息参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro/
允许基于 Avro schema 读取和写入 Avro 数据。目前,Avro schema 从 table schema 推导而来。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-avro</artifactId>
<version>1.13.0</version>
</dependency>
Format Options
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 声明使用的格式,这里应为'avro'。 |
avro.codec | 可选 | (none) | String | 仅用于 FileSystem Connector,avro 压缩编解码器。默认不压缩。目前支持:deflate、snappy、bzip2、xz。 |
Confluent Avro Format
Avro Schema Registry(avro-confluent)格式能读取被 io.confluent.kafka.serializers.KafkaAvroSerializer 序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer 反序列化的记录。
当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema version id 从配置的 Confluent Schema Registry 中获取 Avro Schema。
当以这种格式写入(序列化)记录时,Avro schema 是从 Table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。
Avro Schema Registry 格式只能与 Apache Kafka SQL 连接器或 Upsert Kafka SQL 连接器一起使用。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.13.0</version>
</dependency>
Format Options
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 声明使用的格式,这里应为 'avro-confluent'。 |
avro-confluent.schema-registry.url | 必选 | (none) | String | 用于获取/注册 schemas 的 Confluent Schema Registry 的URL。 |
Debezium Format
更详细信息参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/
Debezium 是一个 CDC 工具(Changelog Data Capture,变更数据捕获),可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的变更实时流式地传输到 Kafka 中。Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。
Flink 支持将 Debezium JSON 和 Avro 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如
- 将增量数据从数据库同步到其他系统
- 日志审计
- 数据库的实时物化视图(包括一个查询结果的数据库对象,类似 Snapshot)
- 关联维度数据库的变更历史。
Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 或 Avro 消息,输出到 Kafka 等存储中。 需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。
<!-- Debezium Avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.13.0</version>
</dependency>
<!-- Debezium Json -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.13.0</version>
</dependency>
Format Options
Debezium Avro
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 声明使用的格式,这里应为 'debezium-avro-confluent'。 |
debezium-avro-confluent.schema-registry.url | 必选 | (none) | String | 用于获取/注册 schemas 的 Confluent Schema Registry 的 URL。 |
Debezium Json
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 声明使用的格式,这里应为 'debezium-json'。 |
debezium-json.schema-include | 可选 | false | Boolean | 设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。<br />此选项表明 Debezium JSON 消息是否包含 schema。 |
debezium-json.ignore-parse-errors | 可选 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。<br />如果忽略字段的解析异常,则会将该字段值设置为null。 |
Metadata Column
以下 Format 元数据可以在表定义中作为只读(VIRTUAL)列使用。
键值 | 数据类型 | 描述 |
---|---|---|
ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | Connector 处理事件的时间。<br />对应 Debezium 记录的 ts_ms 字段 |
source.timestamp | TIMESTAMP_LTZ(3) NULL | Source 系统创建事件的时间。<br />对应 Debezium 记录的source.ts_ms 字段 |
source.database | STRING NULL | 原始数据库。<br />对应 Debezium 记录的 source.db 字段 |
source.table | STRING NULL | 原始数据库表。<br />对应 Debezium 记录的 source.table 或 source.collection 字段 |
Parquet Format
更详细信息参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/parquet/
允许读写 Parquet 数据。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_2.11</artifactId>
<version>1.13.0</version>
</dependency>
Create SQL
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
dt STRING
) PARTITIONED BY (dt) WITH (
'connector' = 'filesystem',
'path' = '/tmp/user_behavior',
'format' = 'parquet'
)
Orc Format
更详细信息参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/orc/
允许读写 Orc 数据。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-orc_2.11</artifactId>
<version>1.13.0</version>
</dependency>
Create SQL
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
dt STRING
) PARTITIONED BY (dt) WITH (
'connector' = 'filesystem',
'path' = '/tmp/user_behavior',
'format' = 'orc'
)
Kafka Connector
更详细信息参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
Source:Unbounded
Sink:Streaming Append
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.0</version>
</dependency>
Create SQL
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
Metadata Column
Key | 数据类型 | 描述 | R/W |
---|---|---|---|
topic | STRING NOT NULL | Topic name of the Kafka record. | R |
partition | INT NOT NULL | Partition ID of the Kafka record. | R |
headers | MAP NOT NULL | Headers of the Kafka record as a map of raw bytes. | R/W |
leader-epoch | INT NULL | Leader epoch of the Kafka record if available. | R |
offset | BIGINT NOT NULL | Offset of the Kafka record in the partition. | R |
timestamp | TIMESTAMP_LTZ(3) NOT NULL | Timestamp of the Kafka record. | R/W |
timestamp-type | STRING NOT NULL | Timestamp type of the Kafka record. Either "NoTimestampType", "CreateTime" (also set when writing metadata), or "LogAppendTime". | R |
Connector Options
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必选 | (无) | String | 指定使用的连接器,Kafka 连接器使用 'kafka'。 |
topic | Source 可选Sink 必选 | (无) | String | 当表用作 source 时读取数据的 topic 名。<br />支持用分号间隔的 topic 列表,如 'topic-1;topic-2'。<br />对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。 <br />当表被用作 sink 时,该配置表示写入的 topic 名。<br />sink 表不支持 topic 列表。 |
topic-pattern | 可选 | (无) | String | 匹配读取 topic 名称的正则表达式。<br />在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。<br />对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。 |
properties.bootstrap.servers | 必选 | (无) | String | 逗号分隔的 Kafka broker 列表。 |
properties.group.id | Source 必选 | (无) | String | Kafka source 的 consumer 组 id<br />对于 Kafka sink 可选填。 |
properties.* | 可选 | (无) | String | 可以设置和传递任意 Kafka 的配置项。 <br />后缀必须匹配在 Kafka 配置文档 中定义的配置键。Flink 将移除 "properties." 前缀并将变换后的配置键和值传入底层的 Kafka 客户端。 <br />例如,可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用 topic 的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 'key.deserializer' 和 'value.deserializer'。 |
format | 必选 | (无) | String | 用来序列化或反序列化 Kafka 消息的格式。 该配置项和 'value.format' 二者必需其一。 |
key.format | 可选 | (无) | String | 用来序列化和反序列化 Kafka 消息键(Key)的格式。 如果定义了键格式,则配置项 'key.fields' 也是必需的。 否则 Kafka 记录将使用空值作为键。 |
key.fields | 可选 | [] | List<String> | 表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空。 列表格式为 'field1;field2'。 |
value.format | 必选 | (无) | String | 用来序列化和反序列化 Kafka 消息体(Value)的格式。 该配置项和 'format' 二者必需其一。 |
value.fields-include | 可选 | ALL | 可选值:[ALL, EXCEPT_KEY] | 定义消息体(Value)格式如何处理消息键(Key)字段的策略。默认情况下,表结构中 'ALL' 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。 |
scan.startup.mode | 可选 | group-offsets | String | Kafka consumer 的启动模式。<br />有效值为:'earliest-offset','latest-offset','group-offsets','timestamp' 和 'specific-offsets'。 |
scan.startup.specific-offsets | 可选 | (无) | String | 在使用 'specific-offsets' 启动模式时为每个 partition 指定 offset,例如 'partition:0,offset:42;partition:1,offset:300'。 |
scan.startup.timestamp-millis | 可选 | (无) | Long | 在使用 'timestamp' 启动模式时指定启动的时间戳(单位毫秒)。 |
scan.topic-partition-discovery.interval | 可选 | (无) | Duration | Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。 |
sink.partitioner | 可选 | 'default' | String | Flink partition 到 Kafka partition 的分区映射关系,可选值有:- default:使用 Kafka 默认的分区器对消息进行分区。- fixed:每个 Flink partition 最终对应最多一个 Kafka partition。- round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。- 自定义 FlinkKafkaPartitioner 的子类:例如 'org.mycompany.MyPartitioner'。 |
sink.semantic | 可选 | at-least-once | String | 定义 Kafka sink 的语义。有效值为 'at-least-once','exactly-once' 和 'none'。请阅读下面介绍的一致性保障。 |
sink.parallelism | 可选 | (无) | Integer | 定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 |
特性
Key and Value Format
Kafka 消息的 Key 和 Value 部分都可以使用某种 Format 来序列化或反序列化成二进制数据。
Kafka 消息中 Key 是可选的,'format' 选项与 'value.format' 意义相同。以下语句将使用 Value Format 读取和写入消息。
CREATE TABLE KafkaTable (,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'format' = 'json',
'json.ignore-parse-errors' = 'true'
)
Record Value 将配置为以下的数据类型:
ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
以下示例展示了如何配置 Key Format 和 Value Format。使用 'key' 或 'value' 作为 Option 参数的前缀。
CREATE TABLE KafkaTable (
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.fields' = 'user_id;item_id',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'ALL'
)
Key 包含了在 'key.fields' 中列出的字段(使用 ';' 分隔)和字段顺序。 将配置为以下的数据类型:
ROW<`user_id` BIGINT, `item_id` BIGINT>
Value 将配置为以下的数据类型。由于配置为 'value.fields-include' = 'ALL',所以 Key 字段也会出现在 Value 的数据类型中:
ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
重名字段
如果 Key 和 Value 有重名字段,Connector 无法根据表结构信息将这些列区分开。
'key.fields-prefix' 配置项可以在表结构中为消息键字段指定一个前缀,进行区分。以下示例展示了在消息键和消息体中同时包含 version 字段的情况:
CREATE TABLE KafkaTable (
`k_version` INT,
`k_user_id` BIGINT,
`k_item_id` BIGINT,
`version` INT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.fields-prefix' = 'k_',
'key.fields' = 'k_version;k_user_id;k_item_id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
)
'value.fields-include' 需要配置为 'EXCEPT_KEY',排除属于 Key 的字段
key format:
ROW<`version` INT, `user_id` BIGINT, `item_id` BIGINT>
value format:
ROW<`version` INT, `behavior` STRING>
Topic 和 Partition 发现
topic 和 topic-pattern 配置项决定了 Source 消费的 topic(或 topic 的匹配规则)。topic 配置项可接受使用分号间隔的 topic 列表,例如 'topic-1;topic-2'
。topic-pattern 配置项使用正则表达式来探测匹配的 topic,例如 'topic-pattern' 设置为 'test-topic-[0-9]'
,则在作业启动时,所有匹配该正则表达式的 topic 都将被 consumer 订阅。
为允许 Consumer 在作业启动之后探测到动态创建的 topic,请将 'scan.topic-partition-discovery.interval' 配置为一个非负值。这将使 Consumer 能够探测匹配名称规则的新的 topic 的 partition。
请参阅 Kafka DataStream 连接器文档 以获取更多关于 topic 和 partition 探测的信息。
起始消费位置
scan.startup.mode
配置项决定了 Kafka consumer 的启动模式。可选值为:
- group-offsets:从 topic 中记录的 group 上次消费的位置开始消费。默认行为。
- earliest-offset:从最早 offset 开始。
- latest-offset:从最新 offset 开始。
- timestamp:从用户为每个 partition 指定的时间戳开始。
- specific-offsets:从用户为每个 partition 指定的偏移量开始。
如果使用了 'timestamp',必须使用另外一个配置项 scan.startup.timestamp-millis
来指定一个毫秒单位时间戳作为起始时间。
如果使用了 'specific-offsets',必须使用另外一个配置项 scan.startup.specific-offsets
来为每个 partition 指定起始偏移量, 例如,选项值 'partition:0,offset:42;partition:1,offset:300'
表示 partition 0 从偏移量 42 开始,partition 1 从偏移量 300 开始。
CDC Source
Flink 原生支持使用 Kafka 作为 CDC Source。如果 Kafka topic 中的消息是通过 CDC 工具从数据库捕获的变更事件,则可以使用 CDC 格式将消息解x析为 Flink SQL 系统中的插入(INSERT)、更新(UPDATE)、删除(DELETE)消息。
Flink 提供了几种 CDC format:
- debezium
- canal
- maxwell
Sink 分区
配置项 sink.partitioner
指定了从 Flink 分区到 Kafka 分区的映射关系。 默认情况下,Flink 使用 Kafka 默认分区器(org.apache.kafka.clients.producer.internals.DefaultPartitioner)来对消息分区。
默认分区器对没有消息键的消息使用粘性分区策略(sticky partition strategy,保证分配尽可能平衡) 进行分区,对含有消息键的消息使用哈希算法 murmur2 计算分区。
'fixed' 分区器会将同一个 Flink 分区中的消息写入同一个 Kafka 分区,从而减少网络连接的开销。
为了控制数据行到分区的路由,也可以自定义的 Sink 分区器(实现 FlinkKafkaPartitioner)。
一致性保证
默认情况下,如果查询在启用 checkpoint 模式下执行时,Kafka Sink 按照至少一次(at-lease-once)语义保证将数据写入到 Kafka Topic 中。
当 Flink checkpoint 启用时,Kafka Connector 可以提供精确一次(exactly-once)的语义保证。
除了启用 Flink checkpoint,还可以通过配置 sink.semantic
选项来选择三种不同的运行模式:
- none:Flink 不保证任何语义。已经写出的记录可能会丢失或重复。
- at-least-once(默认设置):保证没有记录会丢失(但可能会重复)。
- exactly-once:使用 Kafka 事务提供精确一次(exactly-once)语义。当使用事务向 Kafka 写入数据时,请将所有从 Kafka 中消费记录的应用中的
isolation.level
配置项设置成read_committed
或read_uncommitted
(默认值)。
请参阅 Kafka 文档 以获取更多关于语义保证的信息。
Source Per-Partition Watermark
Flink 对于 Kafka 支持发送每个分区的 watermark。Watermark 在 Kafka consumer 中生成。
合并每个分区的 Watermark 的方式与 streaming shuffle 时合并 Watermark 的方式一致。Source 输出的 watermark 由读取的分区中最小的 watermark 决定。如果 topic 中的某些分区闲置,watermark 生成器将不会向前推进。可以在表配置中设置 table.exec.source.idle-timeout
选项(set table.exec.source.idle-timeout='10000ms'
)来避免上述问题。
请参阅 Kafka watermark 策略 以获取更多细节。
Upsert Kafka Connector
更详细信息参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
Source:Unbounded
Sink:Streaming Upsert
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.0</version>
</dependency>
Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。
作为 Source,upsert-kafka connector 产生 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。
用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
作为 Sink,upsert-kafka connector 消费 changelog 流。会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
Create SQL
确保在DDL中定义主键
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'avro',
'value.format' = 'avro'
);
CREATE TABLE pageviews (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP,
user_region STRING,
WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',
'properties.bootstrap.servers' = '...',
'format' = 'json'
);
-- 计算 pv、uv 并插入到 upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*),
COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
Metadata Column
与常规 kafka connector 相同
Connector Options
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必选 | (none) | String | 指定使用的连接器,Upsert Kafka 连接器使用:'upsert-kafka'。 |
topic | 必选 | (none) | String | 用于读取和写入的 Kafka topic 名称。 |
properties.bootstrap.servers | 必选 | (none) | String | 以逗号分隔的 Kafka brokers 列表。 |
key.format | 必选 | (none) | String | 用于对 Kafka 消息中 key 部分序列化和反序列化的格式。 key 字段由 PRIMARY KEY 语法指定。 |
sink.buffer-flush.max-rows | 可选 | 0 | Integer | 缓存刷新前,最多能缓存多少条记录。 当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息(value 为 null 的消息)。 可以通过设置为 '0' 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 'sink.buffer-flush.max-rows' 和 'sink.buffer-flush.interval' 两个选项为大于零的值。 |
sink.buffer-flush.interval | 可选 | 0 | Duration | 缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。 |
特性
Key and Value Formats
可以参考普通 Kafka Connector 的特性。但是,Upsert-kafka connector 需要 Key 和 Value format,其中 Key 字段派生自主键约束。
CREATE TABLE KafkaTable (
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
...
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'EXCEPT_KEY'
)
主键约束
Upsert Kafka 始终以 upsert 方式工作,需要在 DDL 中定义主键。在具有相同主键值的消息按序存储在同一个分区的前提下,在 changlog source 定义主键意味着在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在 Kafka 消息的 key 中。
一致性保证
默认情况下,如果启用 checkpoint,Upsert Kafka sink 会保证至少一次将数据插入 Kafka topic。
这意味着,Flink 可以将具有相同 key 的重复记录写入 Kafka topic。但由于该连接器以 upsert 的模式工作,该连接器作为 source 读入时,可以确保具有相同主键值下仅最后一条消息会生效。因此,upsert-kafka 连接器可以像 HBase sink 一样实现幂等写入。
Source Per-Partition Watermarks
该特性与常规 Kafka Connector 相同