Apache Flink的Exactly-Once机制
Apache Flink是目前市场最受关注的流计算处理引擎,相较于Spark Streaming的依托Spark Core实现的微批处理模型,Flink是一个纯粹的流处理引擎,其基于操作符的连续流模型,可以达到微秒级别的延迟。
Flink实现了流批一体化模式,实现按照事件处理和无序处理两种形式,基于内存计算。强大高效的反压机制和内存管理,基于轻量级分布式快照checkpoint机制,从而自动实现了Exactly-Once一致性语义。
1. 数据源端
支持可靠的数据源(如kafka), 数据可重读
Apache Flink内置FlinkKafkaConsumer010类,不依赖于 kafka 内置的消费组offset管理,在内部自行记录和维护 consumer 的offset。
Flink 自己管理offset(手动提交offset),并保持到checkpoint中
API内部集成了Flink Checkpoint 机制, 自动实现了精确一次的处理语义
(类似于Spark的offset位移管理,但实现机制不同)
2. Flink消费端
轻量级快照机制: 一致性checkpoint检查点
Flink采用了一种轻量级快照机制(检查点checkpoint)来保障Exactly-Once的一致性语义。所谓的一致检查点,即在某个时间点上所有任务状态的一份拷贝(快照)。该时间点是所有任务刚好处理完一个相同数据的时间。
- 一致性检查点
间隔时间自动执行分布式一致性检查点(Checkpoints)程序,异步插入barrier检查点分界线,内存状态自动存储为cp进程文件。保证数据Exactly Oncey精确一次处理。
(1) 从source(Input)端开始,JobManager会向每个source(Input)发送检查点barrier消息,启动检查点。在保证所有的source(Input)数据都处理完成后,Flink开始保存具体的一致性检查点checkpoints,并在过程中启用barrier检查点分界线。
(2) 接收数据和barrier消息,两个过程异步进行。在所有的source(Input)数据都处理完成后,开始将自己的检查点(checkpoints)保存到状态后(StateBackend)中,并通知JobManager将Barrier分发到下游
(3) barrier向下游传递时,会进行barrier对齐确认。待barrier都到齐后才进行checkpoints检查点保存。
(4) 重复以上操作,直到整个流程完成。
env.enableCheckPoint( interval = 1000L , CheckPointingMode.EXACTLY_ONCE) //启用checkpoint
env.getCheckPointConfig.setCheckPointTimeout(60000L) //超时时间
env.getCheckPointConfig.setMaxConcurrentCheckPoints(2) //最大并行的CheckPoint数量
env.getCheckPointConfig.setMinPauseBetweenCheckPoint(500) //最小的checkpoint间隔时间
env.getCheckPointConfig.setFailOnCheckPointingErrors(false) //checkpoint失败是否导致job失败
3. 输出端
与上文Spark的输出端Exactly-Once一致性上实现类似,除了目标源需要满足一定条件以外,Flink内置的二阶段提交机制也变相实现了事务一致性。**支持幂等写入、事务写入机制(二阶段提交) **
- 幂等写入
这一块和上文Spark的幂写入特性内容一致,即相同Key/ID 更新写入,数据不变。借助支持主键唯一性约束的存储系统,实现幂等性写入数据,此处将不再继续赘述。
- 事务写入: 二阶段提交 + WAL预写日志
Flink在处理完source端数据接收和operator算子计算过程,待过程中所有的checkpoint都完成后,准备发送数据到sink端,此时启动事务。其中存在两种方式: (1) WAL预写日志: 将计算结果先写入到日志缓存(状态后端/WAL)中,等checkpoint确认完成后一次性写入到sink。(2) 二阶段提交: 对于每个checkpoint创建事务,先预提交数据到sink中,然后等所有的checkpoint全部完成后再真正提交请求到sink, 并把状态改为已确认。
整体思想: 为checkpoint创建事务,等到所有的checkpoint全部真正的完成后,才把计算结果写入到sink中。