Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
简介
FlinkKafkaConsumer
为Flink消费Kafka数据的连接器。在Flink中的角色为数据源。
FlinkKafkaConsumer的继承结构
如下图所示:
我们发现FlinkKafkaConsumer
继承自FlinkKafkaConsumerBase
。FlinkKafkaConsumerBase
又实现了SourceFunction
和RichFunction
接口。接下来我们重点分析它的open
和run
方法。
FlinkKafkaConsumerBase的open方法
该方法包含的内容为FlinkKafkaConsumer
的初始化逻辑。
首先设置提交offset的模式。
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
OffsetCommitMode
是一个枚举类型,具有如下三个值:
- DISABLED:完全禁用offset提交。
- ON_CHECKPOINTS:当checkpoint完成的时候再提交offset。
- KAFKA_PERIODIC:周期性提交offset。
判断OffsetCommitMode的逻辑封装在了OffsetCommitModes.fromConfiguration
方法中。该方法的代码如下:
public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {
if (enableCheckpointing) {
// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else {
// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
}
这段代码逻辑可以总结为:
- 如果启用了checkpoint,并且启用了checkpoint完成时提交offset,返回ON_CHECKPOINTS。
- 如果未启用checkpoint,但是启用了自动提交,返回KAFKA_PERIODIC。
- 其他情况都返回DISABLED。
接下来创建和启动分区发现工具。代码如下所示:
this.partitionDiscoverer = createPartitionDiscoverer(
topicsDescriptor,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks());
this.partitionDiscoverer.open();
subscribedPartitionsToStartOffsets = new HashMap<>();
final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
这段代码中topicsDescriptor为fixedTopics和topicPattern的封装。其中fixedTopics明确指定了topic的名称,称为固定topic。topicPattern为匹配topic名称的正则表达式,用于分区发现。
createPartitionDiscoverer
方法创建了一个KafkaPartitionDiscoverer
对象,主要负责Kafka分区发现。partitionDiscoverer.open()
方法创建出一个KafkaConsumer
。
subscribedPartitionsToStartOffsets
为已订阅的分区列表,这里将它初始化。
partitionDiscoverer.discoverPartitions()
用户获取所有fixedTopics和匹配topicPattern的Topic包含的所有分区信息。该部分代码稍后分析。
接下来open
方法的代码结构如下:
if (restoredState != null) {
// 从快照恢复逻辑...
} else {
// 直接启动逻辑...
}
如果consumer是从快照恢复的,restoredState不为空。反之restoredState为空。
我们首先分析一下从快照恢复的逻辑。代码如下:
// 如果restoredState没有存储某一分区的状态
// 需要重头消费该分区
for (KafkaTopicPartition partition : allPartitions) {
if (!restoredState.containsKey(partition)) {
restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
}
}
for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
// restoredFromOldState判断是否从更早的快照恢复(Flink 1.1和1.2)
// 如果为true,禁用分区发现
if (!restoredFromOldState) {
// seed the partition discoverer with the union state while filtering out
// restored partitions that should not be subscribed by this subtask
// KafkaTopicPartitionAssigner.assign方法返回需要执行消费该分区的子任务ID
// 此处可过滤掉不归该task负责的kafka分区
if (KafkaTopicPartitionAssigner.assign(
restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
== getRuntimeContext().getIndexOfThisSubtask()){
// 存入订阅的分区和起始offset信息
subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
}
} else {
// when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
// in this case, just use the restored state as the subscribed partitions
// 此处为了兼容Flink 1.1和1.2的快照设计
// 直接存入所有订阅的分区和起始offset信息
subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
}
}
// filterRestoredPartitionsWithCurrentTopicsDescriptor含义为是否
// 依照分区发现配置的topic正则表达式过滤分区
if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
// 过滤掉topic名称不符合topicsDescriptor的topicPattern的分区
subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
LOG.warn(
"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
entry.getKey());
return true;
}
return false;
});
}
LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
接下来我们分析下Consumer直接启动的逻辑(不从快照恢复)。
在此之前需要了解下StartupMode
这个枚举类型。该枚举类型有5个值:
- GROUP_OFFSETS:从保存在zookeeper或者是Kafka broker的对应消费者组提交的offset开始消费,这个是默认的配置
- EARLIEST:尽可能从最早的offset开始消费
- LATEST:从最近的offset开始消费
- TIMESTAMP:从用户提供的timestamp处开始消费
- SPECIFIC_OFFSETS:从用户提供的offset处开始消费
然后,Comsumer使用分区发现工具来获取初始的分区。根据StartupMode
来设置它们的起始消费offset。
我们先看SPECIFIC_OFFSETS这种情况。
case SPECIFIC_OFFSETS:
// 如果没有配置具体从哪个offset开始消费,程序抛出异常
if (specificStartupOffsets == null) {
throw new IllegalStateException(
"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
", but no specific offsets were specified.");
}
for (KafkaTopicPartition seedPartition : allPartitions) {
// 获取每个分区指定的消费起始offset
Long specificOffset = specificStartupOffsets.get(seedPartition);
if (specificOffset != null) {
// since the specified offsets represent the next record to read, we subtract
// it by one so that the initial state of the consumer will be correct
// 如果分区配置了offset,设置从offset开始消费
subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
} else {
// default to group offset behaviour if the user-provided specific offsets
// do not contain a value for this partition
// 如果分区没有配置offset,设置从GROUP_OFFSET开始消费
subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
}
}
break;
如果采用TIMESTAMP模式,逻辑如下所示:
case TIMESTAMP:
// 如果没有配置timestamp,程序报错退出
if (startupOffsetsTimestamp == null) {
throw new IllegalStateException(
"Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
", but no startup timestamp was specified.");
}
// 根据timestamp获取分区的offset
// 遍历这些分区
for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
: fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
// 如果无offset,使用LATEST_OFFSET
// 如果获取到了offset,从这个offset开始消费
subscribedPartitionsToStartOffsets.put(
partitionToOffset.getKey(),
(partitionToOffset.getValue() == null)
// if an offset cannot be retrieved for a partition with the given timestamp,
// we default to using the latest offset for the partition
? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
// since the specified offsets represent the next record to read, we subtract
// it by one so that the initial state of the consumer will be correct
: partitionToOffset.getValue() - 1);
}
break;
// 其他情况,使用KafkaTopicPartitionStateSentinel类对应的值作为offset
default:
for (KafkaTopicPartition seedPartition : allPartitions) {
subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
}
接下来的if语句段负责打印一些日志信息。这里就不再分析了。
FlinkKafkaConsumerBase的run方法
run
方法包含了从KafkaConsumer消费数据,和向Flink下游发送数据的逻辑。
首先检查open
方法中初始化的subscribedPartitionsToStartOffsets
是否为null。
if (subscribedPartitionsToStartOffsets == null) {
throw new Exception("The partitions were not set for the consumer");
}
接下来配置成功commit和失败commit数量的监控。
// 设置成功提交计数监控
this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
// 设置失败提交计数监控
this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
// 获取子任务index
final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
// 注册一个提交时的回调函数
this.offsetCommitCallback = new KafkaCommitCallback() {
@Override
public void onSuccess() {
// 提交成功,成功提交计数器加一
successfulCommits.inc();
}
@Override
public void onException(Throwable cause) {
LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
// 提交失败,失败提交计数器加一
failedCommits.inc();
}
};
接下来判断subscribedPartitionsToStartOffsets集合是否为空。如果为空,标记数据源的状态为暂时空闲。
if (subscribedPartitionsToStartOffsets.isEmpty()) {
sourceContext.markAsTemporarilyIdle();
}
下面是获取数据的过程。这里创建了一个KafkaFetcher,负责借助KafkaConsumer API从Kafka broker获取数据。
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
接下来检测running
变量的状态。如果没有running,直接返回。
if (!running) {
return;
}
最后是根据分区发现间隔时间的配置来确定是否启动分区的定时发现任务。
// 如果没有配置分区定时发现间隔时间
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
// 直接启动获取数据任务
kafkaFetcher.runFetchLoop();
} else {
// 否则,启动定期分区发现任务和数据获取任务
runWithPartitionDiscovery();
}
最后我们分析下runWithPartitionDiscovery方法。代码如下:
private void runWithPartitionDiscovery() throws Exception {
final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
// 启动分区发现定时任务
createAndStartDiscoveryLoop(discoveryLoopErrorRef);
// 启动kafka broker数据获取任务
kafkaFetcher.runFetchLoop();
// make sure that the partition discoverer is waked up so that
// the discoveryLoopThread exits
// 使partitionDiscoverer.discoverPartitions()抛异常
// 能够从discoveryLoopThread 返回
partitionDiscoverer.wakeup();
// 等待discoveryLoopThread 执行完毕
joinDiscoveryLoopThread();
// rethrow any fetcher errors
final Exception discoveryLoopError = discoveryLoopErrorRef.get();
if (discoveryLoopError != null) {
throw new RuntimeException(discoveryLoopError);
}
}
我们再跟踪下看看如何启动分区发现定时任务的。
private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
// 创建一个discoveryLoop线程
discoveryLoopThread = new Thread(() -> {
try {
// --------------------- partition discovery loop ---------------------
// throughout the loop, we always eagerly check if we are still running before
// performing the next operation, so that we can escape the loop as soon as possible
while (running) {
if (LOG.isDebugEnabled()) {
LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
}
final List<KafkaTopicPartition> discoveredPartitions;
try {
// 尝试发现新分区,如果方法抛出异常,退出循环
discoveredPartitions = partitionDiscoverer.discoverPartitions();
} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
// the partition discoverer may have been closed or woken up before or during the discovery;
// this would only happen if the consumer was canceled; simply escape the loop
break;
}
// no need to add the discovered partitions if we were closed during the meantime
// 如果没有发现新的分区,或者数据源已关闭之时,没必要再添加新分区
if (running && !discoveredPartitions.isEmpty()) {
kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
}
// do not waste any time sleeping if we're not running anymore
if (running && discoveryIntervalMillis != 0) {
try {
// 睡眠discoveryIntervalMillis时间
Thread.sleep(discoveryIntervalMillis);
} catch (InterruptedException iex) {
// may be interrupted if the consumer was canceled midway; simply escape the loop
break;
}
}
}
} catch (Exception e) {
discoveryLoopErrorRef.set(e);
} finally {
// calling cancel will also let the fetcher loop escape
// (if not running, cancel() was already called)
if (running) {
cancel();
}
}
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
// 启动分区发现定时任务线程
discoveryLoopThread.start();
}
我们再详细研究下上述方法中partitionDiscoverer.discoverPartitions()
的调用,即发现分区的执行过程。代码如下:
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
// 确保没有关闭数据源,也没有wakeup
if (!closed && !wakeup) {
try {
List<KafkaTopicPartition> newDiscoveredPartitions;
// (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
// 如果配置了fixedTopic,获取这些topic的分区
if (topicsDescriptor.isFixedTopics()) {
newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
} else {
// 如果没有配置fixedTopic
// 1. 获取所有topic
List<String> matchedTopics = getAllTopics();
// retain topics that match the pattern
// 2. 逐个排除名字不是fixedTopic,或名字不匹配topicPattern的topic
Iterator<String> iter = matchedTopics.iterator();
while (iter.hasNext()) {
if (!topicsDescriptor.isMatchingTopic(iter.next())) {
iter.remove();
}
}
if (matchedTopics.size() != 0) {
// get partitions only for matched topics
// 3. 如果有匹配的topic,获取他们的分区
newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
} else {
// 否则newDiscoveredPartitions 设置为null
newDiscoveredPartitions = null;
}
}
// (2) eliminate partition that are old partitions or should not be subscribed by this subtask
if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
} else {
Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
KafkaTopicPartition nextPartition;
while (iter.hasNext()) {
nextPartition = iter.next();
// 分区存入discoveredPartitions集合中
// 返回值为分区是否归当前task消费
if (!setAndCheckDiscoveredPartition(nextPartition)) {
iter.remove();
}
}
}
return newDiscoveredPartitions;
} catch (WakeupException e) {
// the actual topic / partition metadata fetching methods
// may be woken up midway; reset the wakeup flag and rethrow
wakeup = false;
throw e;
}
} else if (!closed && wakeup) {
// may have been woken up before the method call
wakeup = false;
throw new WakeupException();
} else {
throw new ClosedException();
}
}
kafkaFetcher的runFetchLoop方法
此方法为FlinkKafkaConsumer获取数据的主入口,通过一个循环来不断获取kafka broker的数据。
@Override
public void runFetchLoop() throws Exception {
try {
// Handover负责在KafkaConsumerThread和KafkaFetcher之前传递数据
final Handover handover = this.handover;
// kick off the actual Kafka consumer
// 启动kafka消费线程,定期从kafkaConsumer拉取数据并转交给handover对象
consumerThread.start();
while (running) {
// this blocks until we get the next records
// it automatically re-throws exceptions encountered in the consumer thread
// 获取handover中的数据
// 如果此时consumerThread尚未把数据交给handover,该方法会阻塞
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
// 获取属于该分区的records
List<ConsumerRecord<byte[], byte[]>> partitionRecords =
records.records(partition.getKafkaPartitionHandle());
//
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
// 反序列化Kafka record为bean
// 此deserializer需要实现KafkaDeserializationSchema接口
final T value = deserializer.deserialize(record);
// 如果数据源已到末尾,停止fetcher循环
if (deserializer.isEndOfStream(value)) {
// end of stream signaled
running = false;
break;
}
// emit the actual record. this also updates offset state atomically
// and deals with timestamps and watermark generation
// 发送数据,更新offset,生成timestamp和watermark
emitRecord(value, partition, record.offset(), record);
}
}
}
}
finally {
// this signals the consumer thread that no more work is to be done
consumerThread.shutdown();
}
// on a clean exit, wait for the runner thread
try {
consumerThread.join();
}
catch (InterruptedException e) {
// may be the result of a wake-up interruption after an exception.
// we ignore this here and only restore the interruption state
Thread.currentThread().interrupt();
}
}
此方法中的collect kafka数据的逻辑在emitRecord
中。我们查看下它的代码:
protected void emitRecord(
T record,
KafkaTopicPartitionState<TopicPartition> partition,
long offset,
ConsumerRecord<?, ?> consumerRecord) throws Exception {
emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp());
}
它调用了emitRecordWithTimestamp
方法,继续查看。
protected void emitRecordWithTimestamp(
T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
if (record != null) {
// 如果不需要发送timestamp和watermark
if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
// fast path logic, in case there are no watermarks generated in the fetcher
// emit the record, using the checkpoint lock to guarantee
// atomicity of record emission and offset state update
// 不能和checkpoint操作并发执行,需要保持更新offset操作的原子性
synchronized (checkpointLock) {
// 此处调用SourceFunction中的sourceContext
// 数据源收集元素逻辑在此
sourceContext.collectWithTimestamp(record, timestamp);
// 更新分区状态的state
partitionState.setOffset(offset);
}
} else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
// 如果是周期发送watermark
emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
} else {
// 反之是PunctuatedWatermark
emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
}
} else {
// if the record is null, simply just update the offset state for partition
synchronized (checkpointLock) {
partitionState.setOffset(offset);
}
}
}
接下来分别分析下emitRecordWithTimestampAndPeriodicWatermark
和emitRecordWithTimestampAndPunctuatedWatermark
处理逻辑。
emitRecordWithTimestampAndPeriodicWatermark方法。
private void emitRecordWithTimestampAndPeriodicWatermark(
T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) {
@SuppressWarnings("unchecked")
final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
// extract timestamp - this accesses/modifies the per-partition state inside the
// watermark generator instance, so we need to lock the access on the
// partition state. concurrent access can happen from the periodic emitter
final long timestamp;
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (withWatermarksState) {
// 获取timestamp
// 因为TimestampsAndPeriodicWatermarksOperator具有定时触发发送watermark的任务,需要考虑并发访问的问题
// 此处用到了timestampExtractor。需要防止并发修改内部状态引发的问题
timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
}
// emit the record with timestamp, using the usual checkpoint lock to guarantee
// atomicity of record emission and offset state update
synchronized (checkpointLock) {
// 收集数据和timestamp
sourceContext.collectWithTimestamp(record, timestamp);
// 更新分区的offset
partitionState.setOffset(offset);
}
}
emitRecordWithTimestampAndPunctuatedWatermark
方法。
private void emitRecordWithTimestampAndPunctuatedWatermark(
T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) {
@SuppressWarnings("unchecked")
final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
// only one thread ever works on accessing timestamps and watermarks
// from the punctuated extractor
// punctuated extractor不用考虑并发访问的问题
// 获取timestamp和watermark
final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
// emit the record with timestamp, using the usual checkpoint lock to guarantee
// atomicity of record emission and offset state update
synchronized (checkpointLock) {
// 收集数据发送给下游
sourceContext.collectWithTimestamp(record, timestamp);
// 设置分区的offset
partitionState.setOffset(offset);
}
// if we also have a new per-partition watermark, check if that is also a
// new cross-partition watermark
if (newWatermark != null) {
// 此处发送watermark
// watermark值为此subtask负责的各个分区的watermark的最小值
updateMinPunctuatedWatermark(newWatermark);
}
}
注:具体periodic watermark 和 punctuated watermark的区别请参考Flink 源码之时间处理
KafkaConsumerThread
KafkaConsumerThread负责在单独的线程中从Kafka中拉取数据到handover。这里我们分析下它的run方法中获取数据的部分。
while (running) {
// check if there is something to commit
// 检查是否则commit过程中
if (!commitInProgress) {
// get and reset the work-to-be committed, so we don't repeatedly commit the same
// 获取需要提交的offset值,以及commit回调函数
// 获取完毕之后需要设置为null,防止反复提交
final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
nextOffsetsToCommit.getAndSet(null);
if (commitOffsetsAndCallback != null) {
log.debug("Sending async offset commit request to Kafka broker");
// also record that a commit is already in progress
// the order here matters! first set the flag, then send the commit command.
// 开始提交过程
commitInProgress = true;
// 异步提交offset
consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}
}
// 为consumer指定新的分区
// 由于分区发现功能的存在,consumer需要添加新发现的分区,否则poll数据会报错
try {
if (hasAssignedPartitions) {
newPartitions = unassignedPartitionsQueue.pollBatch();
}
else {
// if no assigned partitions block until we get at least one
// instead of hot spinning this loop. We rely on a fact that
// unassignedPartitionsQueue will be closed on a shutdown, so
// we don't block indefinitely
newPartitions = unassignedPartitionsQueue.getBatchBlocking();
}
if (newPartitions != null) {
reassignPartitions(newPartitions);
}
} catch (AbortedReassignmentException e) {
continue;
}
if (!hasAssignedPartitions) {
// Without assigned partitions KafkaConsumer.poll will throw an exception
continue;
}
// get the next batch of records, unless we did not manage to hand the old batch over
if (records == null) {
try {
// 从consumer拉取数据
// 这里的pollTimeout可以通过配置flink.poll-timeout参数修改
// pollTimeout默认值为100ms
records = consumer.poll(pollTimeout);
}
catch (WakeupException we) {
continue;
}
}
try {
// 将数据交给handover
handover.produce(records);
records = null;
}
catch (Handover.WakeupException e) {
// fall through the loop
}
}
checkpoint流程
checkpoint流程大部分代码为状态的读写。这里为大家总结下主要的流程,不分析具体的代码。
snapshotState
方法
FlinkKafkaConsumerBase
的snapshotState
方法包含snapshot的流程。包含如下:
- 如果
KafkaFetcher
尚未初始化完毕。需要保存已订阅的topic连同他们的初始offset。 - 如果
KafkaFetcher
已初始化完毕,调用fetcher的snapshotCurrentState
方法。 - 如果
offsetCommitMode
为ON_CHECKPOINTS
类型,还需要将topic和offset写入到pendingOffsetsToCommit
集合中。该集合用于checkpoint成功的时候向Kafka broker提交offset。(offsetCommitMode
不为ON_CHECKPOINTS
和DISABLED
的时候,使用的是自动提交offset的模式)
notifyCheckpointComplete
方法
在所有的operator都快照成功的时候,会向JobManager的CheckpointCoordinator
发送确认消息,然后coordinator会通知各个operator checkpoint已经完成。(详细请参见Flink 源码之快照
)为了保证保证数据不会被遗漏和重复消费,ON_CHECKPOINTS模式运行的FlinkKafkaConsumer
只能在这个时候提交offset到kafka consumer。调用notifyCheckpointComplete
的时候通知kafka consumer,将checkpoint之时保存的各个分区的offset提交给kafka broker。从而保证数据的一致性。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。