上回消费者已经把拉取消息之前的准备工作已经完成了,接下来就进行消息的拉取了。
轮询流程
首先看看最外层的poll方法
KafkaConsumer.poll
public ConsumerRecords<K, V> poll(long timeout) {
acquire();
try {
long start = time.milliseconds(); // 开始时间
long remaining = timeout; // 剩余时间
// 客户端调用一次poll方法会调用多次的pollOnce方法
// 目的是为了一次调用尽可能的获取到消息进行消费
do {
Map<TopicPartition,List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
// 只要有结果就立即返回,不要再等待
// 在返回数据之前,发送下次的拉取(fetch)请求,避免用户在下次获取数据时block,从而更快的返回数据
fetcher.sendFetches();
client.pollNoWakeup(); // 无阻赛的轮询
if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
long elapsed = time.milliseconds() - start; // 一个轮询消耗的时间
remaining = timeout - elapsed; // 更新剩余时间,elapsed的值如果一直没有拉取到数据的情况下只会越来越大
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}
客户端轮询中会指定一个最长的等待时间,如果超过最长等待时间,还没有拉取到任何数据,就会返回一个空的集合。最长等待时间保证了在这段时间内如果没有数据就会一直去拉取数据,直到超时,如果最长等待时间内某一次拉取到数据,就马上返回。
客户端在每次轮询拉取到数据后,会马上发送一个新的拉取请求,并通过网络轮询把新的拉取请求发送出去后再返回拉取到的数据给客户端。这样一次轮询就会发送两次拉取请求,就会产生新请求的结果,新请求的结果会留给下一次轮询来获取。当前轮询只会获取和处理前一个请求的结果。那么一次轮询中发送的第二个拉取请求,必须满足以下条件:
第一:不能影响现有的主流程,因为要返回拉取的数据给客户端。
第二:不能影响第一次拉取的数据,因为发送了两次拉取请求。第二次的拉取请求的结果不能影响第一次的拉取数据。
基于以上的两个条件:kafka是怎么做的喃?
第一:发送拉取请求后返回的是一个异步请求对象,调用方法会立即返回主流程。因此不会影响主流程。
第二:快速轮询将新的拉取请求发送出去,不会等待获取响应结果(无阻塞)。所以不会去影响第一次拉取的数据返回给客户端。快速轮询:网络层轮询-->选择器轮询-->java的selectNow()。
每次的轮询都会提前发送下一个拉取请求(除了第一个轮询)。一个完整的:发送请求,轮询,获取结果的流程分开在两次轮询中。比如:第二个拉取请求的发送和轮询发生在第一次的轮询里面,而获取第二个请求的结果在第二次轮询里面。
拉取消息
客户端必须和分区的主副本节点进行交互。为了减少客户端和服务端的网络交互次数。客户端不以分配到的分区作为粒度来和服务费进行交互。而是以服务端节点为粒度来创建拉取请求。把各个分区的主副本节点在相同节点上的封装成一个拉取请求,发送到该节点所在的服务端。
一个拉取请求需要指定从分区的什么位置开始拉取消息(消费者的订阅状态(SubscriptionState)保存了分配给消费者所有分区的拉取偏移量),再从集群的元数据信息中获取分区主副本节点信息,就可以构建拉取请求向目标节点发送拉取请求了。
发送拉取请求
发送拉取请求由拉取器(Fetcher)来完成。
public void sendFetches() {
// 创建拉取请求,每一个节点都对应一个请求
for (Map.Entry<Node,FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
final FetchRequest request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();
// 每个拉取请求的key表示请求发送的目标节点
// 发送拉取请求,并设置相应的Listener
// 请求处理成功,按照tp维度加入completedFetches集合
client.send(fetchTarget, ApiKeys.FETCH, request) .addListener(new RequestFutureListener<ClientResponse>() {
/** * 拉取器处理拉取结果 */
@Override public void onSuccess(ClientResponse resp) {
FetchResponse response = new FetchResponse(resp.responseBody());
if (!matchesRequestedPartitions(request, response)) { return; }
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet()); FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
// 因为拉取请求包括多个分区,所以返回结果也有多个分区
for (Map.Entry<TopicPartition,FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
// 订阅状态中保存的拉取偏移量
long fetchOffset = request.fetchData().get(partition).offset;
FetchResponse.PartitionData fetchData = entry.getValue();
// 把完成拉取的请求响应添加到队列中,以便在下一次客户端轮询中获取拉取消息 completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
}
sensors.fetchLatency.record(resp.requestLatencyMs()); sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
}
@Override
public void onFailure(RuntimeException e) {
log.debug("Fetch request to {} failed", fetchTarget, e);
}
});
}
}
// 通过客户端轮询把拉取请求发送出去
client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
// 是否有完成的拉取请求
return !fetcher.hasCompletedFetches();
}
});
拉取请求的发送采用异步和轮询的方式,client.send方法不会被阻塞,而是返回一个异步对象,并为异步对象添加一个监听器,处理服务端返回的响应结果。
客户端发送到服务端的拉取请求可能包含多个分区,那么拉取请求的响应也会包含多个分区的数据。在处理拉取请求响应时会按分区整理数据。添加到全局变量:completedFetches(ConcurrentLinkedQueue<CompletedFetch>)中。
最终返回给消费者的消息格式是:Map<TopicPartition,List<ConsumerRecord>>。
拉取请求响应结果类图:
客户端调用fetchedRecords方法获取拉取请求拉取的结果。
public Map<TopicPartition,List<ConsumerRecord<K, V>>> fetchedRecords() {
// <分区信息,list<ConsumerRecord>>
Map<TopicPartition,List<ConsumerRecord<K, V>>> drained = new HashMap<>();
// maxPollRecords在max.poll.records中设置值,返回给客户端的条数
// completedFetches可能拉取保存了全部的信息,但是在客户端真正消费消息的时候,一次可能只消费其中的一部分,可以根据客户端的性能来配置
int recordsRemaining = maxPollRecords;
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
CompletedFetch completedFetch = completedFetches.poll(); // 当一个 nextInLineRecords 处理完,就从completedFetches处理下一个完成的Fetch请求
if (completedFetch == null)
break;
nextInLineRecords = parseFetchedData(completedFetch); // 解析拉取到的数据
} else {
recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
}
}
return drained;
}
private int append(Map<TopicPartition,List<ConsumerRecord<K, V>>> drained, PartitionRecords<K, V> partitionRecords, int maxRecords) {
if (partitionRecords.isEmpty())
return 0;
if (!subscriptions.isAssigned(partitionRecords.partition)) { // 判断这个分区是不是分配给这个消费者(有可能分配的分区发生了变化)
log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
} else {
long position = subscriptions.position(partitionRecords.partition); // 获取分区的拉取偏移量
if (!subscriptions.isFetchable(partitionRecords.partition)) { // tp不能消费了,比如调用 pause log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
} else if (partitionRecords.fetchOffset == position) { // 判断服务器返回消息中的拉取偏移量和消费端本地保存的拉取偏移量是否一样。也就是拉取是按顺序拉的
// 获取该 tp 对应的records,并更新 partitionRecords 的 fetchOffset(用于判断是否顺序) List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
// 计算出下一次发送拉发请求时的拉取偏移量
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition); // 正常情况下,一个node只会发送一个request,一般只会有一个
if (records == null) {
records = partRecords;
drained.put(partitionRecords.partition, records);
} else {
records.addAll(partRecords);
}
// 更新订阅状态分区状态的拉取偏移量,即positiion变量 subscriptions.position(partitionRecords.partition, nextOffset);
return partRecords.size();
} else {
}
}
partitionRecords.discard();
return 0;
}
处理拉取请求响应数据:
上面说了在一次轮询中会发送两次拉取请求,虽然第二次发送拉取请求后是无阻塞的快速轮询,但是第二次请求也有可能会立即产生响应结果,先于第一次的拉取请求的结果回来。这样就会对第一次的响应结果产生影响。结合上面的代码(append方法)中会更新消费者订阅状态中拉取偏移量的值。那么如果第二次拉取请求的结果先于第一次回来,更新拉取偏移量。等第一次拉取请求的结果回来后,也会去更新订阅状态中拉取偏移量。这次拉取偏移量就会更新成老的拉取位置。等第三次再发起拉取请求时,就会使用老的拉取位置去拉取数据,这样就会出现消费重复数据。所以为了保证同一次轮询里面两个拉取请求的结果数据不会互相混淆,必须确保第一个请求获取到结果后,才允许发送第二个请求。
那么拉取器是怎么防止这点的喃?
通过上面的流程图就要看出,kafka的解决办法很简单(这就是代码的艺术)。重点就是判断临时变量是否有数据,如果有数据,就是上一次发送拉取请求的结果回来了,就可以发送下一次的拉取请求了。全局变量的设计也是一个关键点:队列。在全局变量队列中的数据就可以慢慢被客户端消费,因为后面的拉取请求一定会是在队列的后面(FIFO),前一次拉取请求的结果一定会在后面。这样就不会乱序。好巧妙的设计!
将全局变量添加到临时变量返回给客户端的时候,kafka做了一个限制,添加到临时变量里面的只是一部分消息,这是因为要保证客户端的处理性能。假如把全局变量的数据全部返回给客户端,客户端的处理性能跟不上,就会影响客户端下一次去轮询数据(消费者客户端是自己控制轮询)。这样会让服务端认为客户端宕机,把客户端断掉。后面客户端就会重新连接服务端。影响整体性能。fetchedRecords方法中使用的maxPollRecords变量就是控制返回的消息个数。
public Map<TopicPartition,List<ConsumerRecord<K, V>>> fetchedRecords() {
// <分区,list<ConsumerRecord>>
Map<TopicPartition ,List<ConsumerRecord<K, V>>> drained = new HashMap<>();
// maxPollRecords在max.poll.records中设置值,返回给客户端的条数
// completedFetches可能拉取保存了全部的信息,但是在客户端真正消费消息的时候,一次可能只消费其中的一部分,可以根据客户端的性能来配置
int recordsRemaining = maxPollRecords;
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
CompletedFetch completedFetch = completedFetches.poll(); // 当一个 nextInLineRecords 处理完,就从 completedFetches 处理下一个完成的 Fetch 请求
if (completedFetch == null)
break;
nextInLineRecords = parseFetchedData(completedFetch); // 解析拉取到的数据
} else {
recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
}
}
return drained;
}
全局变量包括了分配给消费者所有分区的数据,而每个分区可能需要调用多次获取记录的方法才会全部返回,kafka限制了如果分区在全局变量中或者是在正在处理的记录集中,那么在创建下一次的拉取请求时就不会把此分区加入到拉取请求的分区中。
private List<TopicPartition> fetchablePartitions() {
// 分配给消费者的所有分区(在订阅状态中获取)
List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
// 在正在处理的记录集对应的分区--移除
fetchable.remove(nextInLineRecords.partition);
for (CompletedFetch completedFetch : completedFetches)
// 在完成拉取全局变量中但是还没有被消费的分区--移除 fetchable.remove(completedFetch.partition);
return fetchable;
}
参考资料:
Kafka技术内幕:图文详解Kafka源码设计与实现