MongoDB CDC 连接器允许从 MongoDB 读取快照数据和增量数据。
依赖项
为了设置 MongoDB CDC 连接器,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 SQL JAR 包的 SQL 客户端的两个项目的依赖关系信息。
Maven
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
<version>2.3-SNAPSHOT</version>
</dependency>
SQL Client JAR
下载或者自己根据源码来获取 flink-sql-connector-mongodb-cdc-2.3-SNAPSHOT.jar,放到<FLINK_HOME>/lib/ 下
设置 MongoDB
MongoDB版本
MongoDB 版本 >= 3.6
我们使用变更流功能 change streams (3.6 版中的新功能)来捕获变更数据。
原理请看:
https://www.mongodb.com/docs/manual/changeStreams/
集群部署
需要副本集 replica sets 或 分片集群 sharded clusters 。
replica sets: https://www.mongodb.com/docs/manual/replication/
sharded clusters: https://www.mongodb.com/docs/manual/sharding/
存储引擎
需要WiredTiger 存储引擎。
https://www.mongodb.com/docs/manual/core/wiredtiger/#std-label-storage-wiredtiger
副本集协议版本
需要副本集协议版本 1 (pv1)。
从版本 4.0 开始,MongoDB 仅支持 pv1。 pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。
权限
MongoDB Kafka 连接器需要 changeStream 和读取权限。
您可以使用以下示例进行简单授权。
更详细的授权请参考 MongoDB 数据库用户角色。
use admin;
db.createUser({
user: "flinkuser",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" }, //read role includes changeStream privilege
{ role: "readAnyDatabase", db: "admin" } //for snapshot reading
]
});
如何创建 MongoDB CDC 表
MongoDB CDC 表可以定义如下:
-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE products (
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);
-- read snapshot and change events from products collection
SELECT * FROM products;
注意
MongoDB 的更改事件记录在消息之前没有更新。 因此,我们只能将其转换为 Flink 的 UPSERT 变更日志流。 upsert 流需要唯一键,因此我们必须将 _id 声明为主键。 我们不能将其他列声明为主键,因为删除操作除了_id和分片键之外不包含键和值。
连接参数
//TODO 汉化表格,有时间再弄吧
原文连接:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#data-type-mapping
注意:heartbeat.interval.ms 如果集合变化缓慢,强烈建议设置一个大于 0 的适当值。 当我们从检查点或保存点恢复 Flink 作业时,心跳事件可以将 resumeToken 向前推送,以避免 resumeToken 过期。
可用元数据
以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。
Key | DataType | Description |
---|---|---|
database_name | STRING NOT NULL | Name of the database that contain the row. |
collection_name | STRING NOT NULL | Name of the collection that contain the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 它表示在数据库中进行更改的时间。如果记录是从表的快照而不是更改流中读取的,则该值始终为 0. |
扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);
特征
精确一次处理
MongoDB CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取更改流事件,即使发生故障,也只能进行一次处理。
Snapshot When Startup Or Not
配置选项 copy.existing 指定是否在 MongoDB CDC 消费者启动时进行快照。
默认为true。
Snapshot Data Filters
复制现有数据时描述过滤器的配置选项 copy.existing.pipeline。
这可以仅过滤所需的数据并改进复制管理器对索引的使用。
在以下示例中,$match 聚合运算符确保仅复制已关闭字段设置为 false 的文档。
'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'
Change Streams
我们集成了 MongoDB 的官方 Kafka 连接器以从 MongoDB 读取快照或更改事件,并通过 Debezium 的 EmbeddedEngine 驱动它。
MongoDB’s official Kafka Connector: https://www.mongodb.com/docs/kafka-connector/current/source-connector/
Debezium 的 EmbeddedEngine 提供了一种在应用程序进程中运行单个 Kafka Connect SourceConnector 的机制,它可以正确驱动任何标准的 Kafka Connect SourceConnector,即使是 Debezium 不提供的。
我们选择 MongoDB 的官方 Kafka 连接器而不是 Debezium 的 MongoDB 连接器,因为它们使用不同的变更数据捕获机制。
对于 Debezium 的 MongoDB 连接器,它读取每个副本集主节点的 oplog.rs 集合。
对于 MongoDB 的 Kafka 连接器,它订阅了 MongoDB 的 Change Stream。
MongoDB 的 oplog.rs 集合不保留更改记录在状态之前的更新,因此很难通过单个 oplog.rs 记录提取完整的文档状态并将其转换为 Flink 接受的更改日志流(仅插入、更新插入、全部) . 此外,MongoDB 5(2021 年 7 月发布)更改了 oplog 格式,因此无法使用当前的 Debezium 连接器。
Change Stream 是 MongoDB 3.6 为副本集和分片集群提供的一项新功能,它允许应用程序访问实时数据更改,而不会出现拖尾 oplog 的复杂性和风险。
应用程序可以使用更改流来订阅单个集合、数据库或整个部署上的所有数据更改,并立即对其做出反应。
Lookup Full Document for Update Operations 是 Change Stream 提供的一项功能,可以配置更改流以返回更新文档的最新多数提交版本。 因为这个特性,我们可以很方便的收集到最新的完整文档,并将 change log 转换为 Flink 的 Upsert Changelog Stream。
顺便说一句,DBZ-435 提到的 Debezium 的 MongoDB 变更流探索正在路线图中。
如果做到了,我们可以考虑集成两种源连接器供用户选择。
DataStream Source
MongoDB CDC 连接器也可以是 DataStream 源。 您可以创建一个 SourceFunction,如下所示:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
public class MongoDBSourceExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
.hosts("localhost:27017")
.username("flink")
.password("flinkpw")
.databaseList("inventory") // set captured database, support regex
.collectionList("inventory.products", "inventory.orders") //set captured collections, support regex
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute();
}
}
注意:如果使用 database regex ,则需要 readAnyDatabase 角色。
数据类型映射
这个就不翻译了,没啥意思
说明,本文参考自 cdc官网,仅个人学习所用,任何组织和个人不得作为商业用途!