FLINK-CDC 之 MongoDB

MongoDB CDC 连接器允许从 MongoDB 读取快照数据和增量数据。

依赖项

为了设置 MongoDB CDC 连接器,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 SQL JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mongodb-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
  <version>2.3-SNAPSHOT</version>
</dependency>

SQL Client JAR

下载或者自己根据源码来获取 flink-sql-connector-mongodb-cdc-2.3-SNAPSHOT.jar,放到<FLINK_HOME>/lib/ 下

设置 MongoDB

MongoDB版本

MongoDB 版本 >= 3.6
我们使用变更流功能 change streams (3.6 版中的新功能)来捕获变更数据。

原理请看:
https://www.mongodb.com/docs/manual/changeStreams/

集群部署

需要副本集 replica sets 或 分片集群 sharded clusters 。
replica sets: https://www.mongodb.com/docs/manual/replication/
sharded clusters: https://www.mongodb.com/docs/manual/sharding/

存储引擎

需要WiredTiger 存储引擎。
https://www.mongodb.com/docs/manual/core/wiredtiger/#std-label-storage-wiredtiger

副本集协议版本

需要副本集协议版本 1 (pv1)。
从版本 4.0 开始,MongoDB 仅支持 pv1。 pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。

权限

MongoDB Kafka 连接器需要 changeStream 和读取权限。
您可以使用以下示例进行简单授权。
更详细的授权请参考 MongoDB 数据库用户角色。

use admin;
db.createUser({
  user: "flinkuser",
  pwd: "flinkpw",
  roles: [
    { role: "read", db: "admin" }, //read role includes changeStream privilege 
    { role: "readAnyDatabase", db: "admin" } //for snapshot reading
  ]
});

如何创建 MongoDB CDC 表

MongoDB CDC 表可以定义如下:

-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE products (
  _id STRING, // must be declared
  name STRING,
  weight DECIMAL(10,3),
  tags ARRAY<STRING>, -- array
  price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products'
);

-- read snapshot and change events from products collection
SELECT * FROM products;

注意

MongoDB 的更改事件记录在消息之前没有更新。 因此,我们只能将其转换为 Flink 的 UPSERT 变更日志流。 upsert 流需要唯一键,因此我们必须将 _id 声明为主键。 我们不能将其他列声明为主键,因为删除操作除了_id和分片键之外不包含键和值。

连接参数

//TODO 汉化表格,有时间再弄吧
原文连接:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#data-type-mapping

注意:heartbeat.interval.ms 如果集合变化缓慢,强烈建议设置一个大于 0 的适当值。 当我们从检查点或保存点恢复 Flink 作业时,心跳事件可以将 resumeToken 向前推送,以避免 resumeToken 过期。

可用元数据

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

Key DataType Description
database_name STRING NOT NULL Name of the database that contain the row.
collection_name STRING NOT NULL Name of the collection that contain the row.
op_ts TIMESTAMP_LTZ(3) NOT NULL 它表示在数据库中进行更改的时间。如果记录是从表的快照而不是更改流中读取的,则该值始终为 0.

扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    _id STRING, // must be declared
    name STRING,
    weight DECIMAL(10,3),
    tags ARRAY<STRING>, -- array
    price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
    suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'products'
);

特征

精确一次处理

MongoDB CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取更改流事件,即使发生故障,也只能进行一次处理。

Snapshot When Startup Or Not

配置选项 copy.existing 指定是否在 MongoDB CDC 消费者启动时进行快照。
默认为true。

Snapshot Data Filters

复制现有数据时描述过滤器的配置选项 copy.existing.pipeline。
这可以仅过滤所需的数据并改进复制管理器对索引的使用。

在以下示例中,$match 聚合运算符确保仅复制已关闭字段设置为 false 的文档。

'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'

Change Streams

我们集成了 MongoDB 的官方 Kafka 连接器以从 MongoDB 读取快照或更改事件,并通过 Debezium 的 EmbeddedEngine 驱动它。
MongoDB’s official Kafka Connector: https://www.mongodb.com/docs/kafka-connector/current/source-connector/

Debezium 的 EmbeddedEngine 提供了一种在应用程序进程中运行单个 Kafka Connect SourceConnector 的机制,它可以正确驱动任何标准的 Kafka Connect SourceConnector,即使是 Debezium 不提供的。

我们选择 MongoDB 的官方 Kafka 连接器而不是 Debezium 的 MongoDB 连接器,因为它们使用不同的变更数据捕获机制。

对于 Debezium 的 MongoDB 连接器,它读取每个副本集主节点的 oplog.rs 集合。
对于 MongoDB 的 Kafka 连接器,它订阅了 MongoDB 的 Change Stream。

MongoDB 的 oplog.rs 集合不保留更改记录在状态之前的更新,因此很难通过单个 oplog.rs 记录提取完整的文档状态并将其转换为 Flink 接受的更改日志流(仅插入、更新插入、全部) . 此外,MongoDB 5(2021 年 7 月发布)更改了 oplog 格式,因此无法使用当前的 Debezium 连接器。

Change Stream 是 MongoDB 3.6 为副本集和分片集群提供的一项新功能,它允许应用程序访问实时数据更改,而不会出现拖尾 oplog 的复杂性和风险。
应用程序可以使用更改流来订阅单个集合、数据库或整个部署上的所有数据更改,并立即对其做出反应。
Lookup Full Document for Update Operations 是 Change Stream 提供的一项功能,可以配置更改流以返回更新文档的最新多数提交版本。 因为这个特性,我们可以很方便的收集到最新的完整文档,并将 change log 转换为 Flink 的 Upsert Changelog Stream。

顺便说一句,DBZ-435 提到的 Debezium 的 MongoDB 变更流探索正在路线图中。
如果做到了,我们可以考虑集成两种源连接器供用户选择。

DataStream Source

MongoDB CDC 连接器也可以是 DataStream 源。 您可以创建一个 SourceFunction,如下所示:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;

public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("localhost:27017")
                .username("flink")
                .password("flinkpw")
                .databaseList("inventory") // set captured database, support regex
                .collectionList("inventory.products", "inventory.orders") //set captured collections, support regex
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}

注意:如果使用 database regex ,则需要 readAnyDatabase 角色。

数据类型映射

这个就不翻译了,没啥意思


说明,本文参考自 cdc官网,仅个人学习所用,任何组织和个人不得作为商业用途!

更多内容,请阅读原文:
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#data-type-mapping

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342