摘要:本文整理自易车数据平台负责人王林红,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分:
- Flink 应用场景
- DTS 平台建设
- Flink CDC + Hudi 应用实践
- 未来规划
一、Flink 应用场景
Flink 在易车有丰富的应用场景,主要包含实时数仓建设和数据集成。
对于实时数仓建设,主要是数仓实时指标的开发,将离线指标逐步向实时指标过度;同时承接了公司各种实时大屏需求,并多次支持了公司内部的 818 购车节活动。
在实时监控方面,首先是日志监控,主要用来监控埋点情况,另外对于服务器日志,我们也进行统一收集和监控,监控服务响应和异常日志等,同时结合机器学习算法,做日志的聚类;在前端层面,监控前端接口超时、白屏等异常情况。最后也应用在一些业务实时监控、风控等场景。
在数据集成方面,使用 Flink 完成关系型数据库的实时接入,将 MySQL、SQL Server 等的数据实时接入到 Kafka 中;同时将 Kafka 的数据实时同步到 HDFS/Hive 中,实现数据的实时入仓、入湖;对于数据传输通道,我们使用 Flink 将 Kafka 的数据同步到下游存储引擎中,比如 TiDB、MySQL、ClickHouse、HDFS、Doris 等存储中,也实现一些异构数据源数据的实时同步。
二、DTS 平台建设
易车的数据集成主要分为两条线,一条离线数据集成,另一条是实时数据集成。本次主要介绍的是实时数据集成的演进过程,对于实时数据集成,最开始也是处在离线阶段,随着业务的发展,对数据的时效性越来越高,开始使用 Canal 同步 MySQL 的数据,然后使用 Spark 做微批计算,再之后引入 Flink,还是使用 Canal 同步接入 MySQL 数据,使用 Flink 进行数据的实时计算,再之后引入了 Flink CDC,基于 Flink CDC 做全增量一体化实时计算。
在使用 Canal+Flink 的早期阶段,整体流程如下,对于 MySQL 的数据,使用 Canal 通过解析 Binlog 的方式将数据同步到 Kafka 中,对于 SQL Server 的数据,通过 CDC 的方式同步到 Kafka 中,然后通过 Flink 进行加工计算,同步到下游系统如 HDFS 或 Kafka 中,在这个阶段,基本可以满足业务需求,也可以快速完成数据接入及后续开发。
但是也存在一些痛点问题,主要是,整个数据链路比较长,依赖的组件多,运维成本也比较高,另外 Canal 不支持全量数据的同步,全量和增量是割裂的两个阶段,并且对于不同数据源的接入,需要考虑不同的实时接入方案,维护也比较困难。
基于以上痛点问题,和我们的历史经验总结和评估,我们对数据集成工具提出了新的诉求。
希望可以分布式地去支撑大数据场景,工具能够线性扩展,可以方便的对接更多数据源。
希望用一个框架支撑流批一体的传输。
希望基于一个开源框架来开发,这个框架需要和 Hadoop 的整个生态有比较好地集成。并且我们的终极目标,是用一套统一的技术架构来覆盖离线和实时的所有数据集成场景。
基于以上诉求,我们把方案锁定在 Flink 技术栈中,决定基于 Flink CDC 自研实现流批一体的数据集成服务。为什么选择 Flink CDC?
-
Flink CDC 引入了无锁算法,读取阶段全程无锁,降低了因加锁而带来的对线上数据库的影响风险,同时降低了对数据库的压力。支持并发读取,在全量数据同步阶段,可以更快地完成海量数据同步,可以通过水平扩展节点数或增加并行度的方式来加快数据处理速度、加速海量数据的处理。
支持断点续传,全量阶段支持 Checkpoint,即使任务因某种原因退出了,也可通过保存的 Checkpoint 对任务进行恢复实现数据的断点续传。比如同步数据需要 1 天时间,但是同步任务运行 12 小时后失败了,不需要重跑整个数据同步任务,只需要从发生错误的位置重跑即可。
支持丰富的数据源,目前支持 MySQL、SQL Server、Oracle、TiDB、MongoDB 等,也方便的实现异构数据源的数据同步和数据融合。
端到端的一致性,支持 Exactly-Once 语义,保证全链路数据的准确性。
无缝对接 Flink 生态,复用 Flink 众多 Sink 能力。
Flink CDC 支持了丰富的数据源,源头支持 MySQL、Mongo、TiDB、Oracle、SQL Server 等,目标端支持 kakfa、Hudi、TiDB、Hive、Doris、ClickHouse 等。
同时 Flink CDC 作为新一代的数据集成框架,不仅可以替代传统的 DataX 和 Canal 做实时数据同步,将数据库的全量和增量数据一体化的同步到消息队列或下游系统中;也可以用于实时数据集成,将数据库数据实时入湖入仓;同时还支持强大的数据加工能力,可以通过 SQL API 或 DataStrean API 对数据库数据进行实时关联、打宽、聚合等。
在 2.0 版本中,Flink CDC 对于 MySQL CDC 支持了无锁读取、并发读取、全程断点续传等高级功能,实现 MySQL 数据的增量快照读取,在最新的 2.2 版本中,将增量快照读取算法抽象成了公共框架,也方便其他 Connector 的接入,其他 Connector 只需要接入这个框架就可以提供无锁算法,并发读取和断点续传的能力,十分方便其他连接器的扩展。
所以我们基于 Flink CDC 构建了 DTS 数据传输平台,在源端,目前已经集成支持了 MySQL、SQL Server、TiDB、Mongo、kafka 等数据源,在目标端,也集成了 Hudi、kafka、Doris、ClickHouse、HDFS、Hive 等数据源,方便业务进行数据实时入湖入仓,和异构数据源的传输、同步。
但是,在 DTS 平台建设过程中,我们也遇到了一些问题,比如元信息的字段映射,如何方便安全的将源库的字段类型映射成 Flink 的字段类型;在任务运行过程中,如何动态的增加新的同步表,包括如果业务源库字段变更了,下游系统如何处理?另外随着任务的增多,如何更好的对数据源信息进行维护,如一个业务库迁移,如何优雅的对任务中的数据源信息进行变更?
首先说元信息自动映射的问题,Flink CDC 支持丰富的数据源,这些数据源都需通过手工的方式映射成 Flink 的 DDL。手工映射表结构是比较繁琐的,尤其是数据源头多、映射关系比较复杂,每种数据源都有自己的映射关系,当表和字段数非常多的时候,手工映射也非常容易出错,对用户不友好,开发效率也不高。
为了解决上述问题,我们开发了统一数据源服务,我们将平台中使用到的数据源统一注册到数据源系统中,实现数据源的统一维护管理,同时实现表结构变更通知,影响分析等。
用户在实时计算平台创建表和创建同步任务时,选择对应的数据源,自动获取表的 Schema,通过模版化的方式创建表和数据同步任务,同时使用数据源 ID 对用户屏蔽连接串和账号密码信息,提升账号安全性。
另外数据源信息与任务信息关联,数据源变更或迁移,只需要修改数据源信息,降低修改成本。最后离线层的数据接入也依赖于统一数据源,离线和实时使用同一套元数据,便于流和批模型的统一。
上图是数据源改造前后的一个对比图,前面是原生的 MySQL 的流表,需要连接串、账号、密码信息,统一数据源之后,在链接串中只需要关注数据源 ID。
同时我们对 Connector 进行了改造,任务在执行时,会将具体的数据源 ID 替换为真实的链接串、账号和密码,对于 Kafka 流表也一样,在表创建时,只需要数据源 ID,任务执行时,会替换为 Kafka 的 Server 地址,对于 groupID,在任务中,通过 set 的方式进行设置。这种方式也方便我们进行后续 Kafka 集群的主备切换。对于其他的数据源,比如 Hbase、HDFS 等也做了类似的改造。
上图是我们自动建表的页面,选择对应的数据源,需要映射的源表,通过数据源服务自动获取源表的 Schema,自动做字段类型映射,通过模版化的方式,一键生成 Flink 的建表语句。
上图是数据同步的配置界面,用户主要选择对应的源和目标数据源、数据表,自动做字段映射,如果目标表不存在,会自动创建目标表。
得益于 MySQL CDC 动态加表功能,也可以在已有任务中,直接增加需要同步的表,添加的表会自动先同步该表的全量数据,然后再无缝切换到同步增量数据。遇到新增监控表时不用新起作业。同时也支持通过正则的方式配置分库分表的同步,另外对于源表字段变更,类型变更等,也做了一些适配。
接下来介绍下平台的整体架构,我们对 DTS 平台、调度平台、实时平台进行了深度的整合和集成,任务调度层集成到统一调度平台中,实现任务的统一管理,主要包含任务的运维管理、权限管理、资源管控、监控告警、和变量管理等。
对于实时平台,主要关注任务的开发、运维、和治理。
对于任务开发,提供 WebIDE 给用户进行任务的开发调试,同时提供语法智能校验、检测功能,便于用户发现代码语法问题。
对于任务运维,提供任务诊断、评分、健康检查、日志收集、作业快照(Checkpoint、Savapoint)、自动拉起、批量重启等功能,同时支持任务的容灾恢复。
对于任务治理,主要包含全链路的任务血缘和表血缘,方便的了解任务和表的血缘关系和对任务进行影响分析,还有数据层面质量监控,包括断流、数据量异常波动、数据比对等。
实时计算平台上主要支持 SQL 任务、Jar 包任务和 DTS 任务。对于 SQL 任务和 Jar 包任务,提供版本管理、资源管理:资源管理主要是将表、UDF、Connector 等资源统一管理,并通过模版化的方式和配置化的方式完成 Source、Sink 表的创建,降低用户开发成本;对于 TDS 任务,提供数据源管理、任务配置和数据校验等一些模块和服务。
通过服务平台化,打造一站式的任务开发管理平台,在平台上完成任务从开发到测试、发布、监控的全流程处理操作,降低用户使用平台的门槛。
对于核心的 DTS 数据传输架构如上,整个架构主要是基于 Flink 1.14 的 DataStream API 和 Flink CDC 2.2 构建,覆盖流批的场景,实现各种同步需求。
整个架构主要包含 Source 端、数据传输层、Sink 端, Source 和 Sink 端抽象出 SourceFactory 和 SinkFactory,方便实现对接各种类型数据源,在数据传输层,提供统一的基础服务框架,支持类型转换、自定义监控指标、数据校验等功能,如类型转换,DTS 中也支持了对 Canal 数据格式的适配。
目前 DTS 支持了公司内大部分数据传输管道,涵盖数据库,如 MySQL、SQL Server 和 TiDB 等;消息队列,如 Kafka、RocketMQ 等;以及大数据生态系统的各种组件,例如 HDFS、Hive、ClickHouse、Doris 等,覆盖了易车大部分实时流场景和少数离线场景。
这套数据集成架构如今在易车内部已稳定运行近一年时间,服务于众多产品线,整套架构对数据集成,有很大的收益。
统一了技术栈,通过 Flink 可以完成数据异构数据源的实时集成,同时支持流批一体。
通过平台化的操作,降低了数据接入、任务运维等的复杂度,也无需额外部署 Canal 等组件,降低运维成本,链路稳定性也得到了提升。
Flink CDC 全增量一体化的框架,解决了在数据集成方面全量、增量隔裂的痛点问题,实现了全增量一体化的数据集成。
三、Flink CDC + Hudi 应用实践
Flink CDC 的一个主要应用场景就是数据实时入湖,对于数据湖我们主要使用的 Hudi,Hudi 的主要特点如下:
Hudi 的 upsert 功能支持的比较成熟。
Hudi 的表文件可以存储在 HDFS 上,兼容 Hadoop 生态圈,可以使用 Hive、Spark、Presto 等引擎查询 Hudi 表。
Hudi 表的组织模式也很灵活,可以根据不同场景选择不同的表模式。
Hudi 已经集成了 Flink,便于我们计算引擎的统一。最后 Hudi 也有相对比较活跃的社区。
使用 Hudi 后,在没有引入 Flink CDC 之前,我们的数据入湖架构如下:
首先使用 Canal 通过解析 Binlog 的方式将增量数据同步到 Kafka 中,然后通过 DataX,将 MySQL 的全量数据同步到 HDFS 上,然后使用 bulk_insert 的方式初始化数据到 Hudi 中,完成全量数据的初始化,最后,使用 Flink 消费 Kafka 的数据,将数据写入到 Hudi 中,同时通过主键解决数据冲突问题。
大家可以看到,这个架构整体的链路比较长,操作频繁、维护成本比较高,涉及的组件比较多,对于完成数据接入工作量比较大,并且稳定性不好保证,如果一旦有数据问题,数据恢复、重导也是一件比较痛苦的事情。
在使用 Flink CDC 之后,结合 DTS 平台,架构如上。在 DTS 平台中,很方便的可以实现 MySQL 数据一键入湖,并且得益于 Flink CDC 全增量一体化框架,不用考虑全增量问题,同时也支持动态增加表功能,操作非常简单。
随着接入表的增多,对于同一个数据源下的数据同步任务,建立了过多数据库连接,导致 Binlog 重复读取,会对数据源库造成巨大的压力。另外有些 Task 同步的数据量很小,也会造成一定的资源浪费。
为了解决这个问题,我们使用 API 的方式读取数据,通过侧输出流的方式对 DataStream 进行分流,实现合并 Source 的功能。对于读取的同一数据源,同一任务只会建立一个数据库连接,Binlog 也只会读取一次,降低了对数据库的压力,方便的实现了单任务多表的数据实时入湖。
在数据实时同步写入 Hudi 时,Flink Hudi 的写入 Pipeline 算子如下。
第一个算子负责将快照数据+增量数据加载到 Flink 状态。接着经过一个 Bucket Assigner,它主要负责将已经转好的 HudiRecord 分配到特定的 File Group 中,接着分好 File Group 的 Record 会流入 Writer 算子执行真正的文件写入。再之后会接一个 Compaction 的算子,主要用来解决 MOR 表读放大的问题。
这个架构在实际的生产环境会遇到如下问题:
当数据量比较大的时候,Flink State 的膨胀会比较厉害,相应地会影响 Task 的写入速度以及 Checkpoint 的成功率。
对于 Compaction 算子,当在执行 Compaction 阶段时,会和数据读写算子进行资源的抢占,也会导致任务的背压、Checkpoint 超时等。
为了解决这个问题,我们把 Compaction 进行单独拆分,拆分为一个独立的调度任务,同时为了合并的合理性,对相关的合并计划也做了一些优化。
除此之外,我们还做了一些其他的优化和 bug 修复。
第一,在 Hudi 同步 Hive 分区时,会对 Hive 外表和 Hudi 表当前表结构、分区做比较,会获取 Hive 的所有分区,而我们在 Hive 层面对分区访问做了限制,超过分区数量限制,禁止访问,所以触发了该问题,我们通过修改源码,如果是分区表,对访问 Hive 的分区做了过滤,只访问最近一段时间的分区。
第二,在业务 MySQL 升级时,出现了混合模式的 Binlog,导致任务失败,也是修改了源码,忽略了一些 DML 的 Binlog 操作,具体 patch 可以参考 3319;
第三,解决了一些 Flink CDC 分片的 bug,如 Flink CDC 分片字段是 String 时,比较逻辑没有忽略大小写,导致抽取全量数据到内存,导致任务失败。Flink CDC 分片字段是 bigint 时,ID 差值较大,触发了 Flink CDC 的分片优化逻辑,但在优化逻辑后加载了大量数据到内存中,所以优化参数,降低数据分布因子等。
除此之外,还有一些其他优化实践,大部分可以查阅资料或在社区的帮助下解决,在这里再次感谢社区。
四、未来规划
简单介绍一下我们的未来规划:
第一,全量阶段的异步切片逻辑优化,目前数据全量读取阶段读取流程为首先通过主键对表进行 Snapshot Chunk 划分,再将 Snapshot Chunk 分发给多个 SourceReader。在所有 Snapshot Chunk 读取完成后,下发一个 Binlog chunk 进行增量部分的 Binlog 读取。Snapshot Chunk 划分及读取是顺序的,影响整个读取阶段的性能,导致大表全量接入阶段周期长。所以优化 Snapshot Chunk 划分切片的逻辑,增加异步读取策略,提升全量读取阶段性能
第二,目前我们的数据集成工具只覆盖了少量的离线场景,后续准备覆盖更多的离线数据集成场景。
第三,目前我们的实时数据集成的质量相对还比较薄弱,需要进一步加强和打磨。