绍圣--kafka之消费者(二)

上回消费者已经把拉取消息之前的准备工作已经完成了,接下来就进行消息的拉取了。

轮询流程

首先看看最外层的poll方法

KafkaConsumer.poll

public ConsumerRecords<K, V> poll(long timeout) {

acquire();

try {

long start = time.milliseconds(); // 开始时间

long remaining = timeout; // 剩余时间

// 客户端调用一次poll方法会调用多次的pollOnce方法

// 目的是为了一次调用尽可能的获取到消息进行消费

do {

Map<TopicPartition,List<ConsumerRecord<K, V>>> records = pollOnce(remaining);

if (!records.isEmpty()) {

// 只要有结果就立即返回,不要再等待

// 在返回数据之前,发送下次的拉取(fetch)请求,避免用户在下次获取数据时block,从而更快的返回数据

fetcher.sendFetches();

client.pollNoWakeup(); // 无阻赛的轮询

if (this.interceptors == null)

return new ConsumerRecords<>(records);

else

return this.interceptors.onConsume(new ConsumerRecords<>(records));

}

long elapsed = time.milliseconds() - start; // 一个轮询消耗的时间

remaining = timeout - elapsed; // 更新剩余时间,elapsed的值如果一直没有拉取到数据的情况下只会越来越大

} while (remaining > 0);

return ConsumerRecords.empty();

} finally {

release();

}

}

客户端轮询中会指定一个最长的等待时间,如果超过最长等待时间,还没有拉取到任何数据,就会返回一个空的集合。最长等待时间保证了在这段时间内如果没有数据就会一直去拉取数据,直到超时,如果最长等待时间内某一次拉取到数据,就马上返回。

客户端在每次轮询拉取到数据后,会马上发送一个新的拉取请求,并通过网络轮询把新的拉取请求发送出去后再返回拉取到的数据给客户端。这样一次轮询就会发送两次拉取请求,就会产生新请求的结果,新请求的结果会留给下一次轮询来获取。当前轮询只会获取和处理前一个请求的结果。那么一次轮询中发送的第二个拉取请求,必须满足以下条件:

第一:不能影响现有的主流程,因为要返回拉取的数据给客户端。

第二:不能影响第一次拉取的数据,因为发送了两次拉取请求。第二次的拉取请求的结果不能影响第一次的拉取数据。

基于以上的两个条件:kafka是怎么做的喃?

第一:发送拉取请求后返回的是一个异步请求对象,调用方法会立即返回主流程。因此不会影响主流程。

第二:快速轮询将新的拉取请求发送出去,不会等待获取响应结果(无阻塞)。所以不会去影响第一次拉取的数据返回给客户端。快速轮询:网络层轮询-->选择器轮询-->java的selectNow()。

每次的轮询都会提前发送下一个拉取请求(除了第一个轮询)。一个完整的:发送请求,轮询,获取结果的流程分开在两次轮询中。比如:第二个拉取请求的发送和轮询发生在第一次的轮询里面,而获取第二个请求的结果在第二次轮询里面。

拉取消息

客户端必须和分区的主副本节点进行交互。为了减少客户端和服务端的网络交互次数。客户端不以分配到的分区作为粒度来和服务费进行交互。而是以服务端节点为粒度来创建拉取请求。把各个分区的主副本节点在相同节点上的封装成一个拉取请求,发送到该节点所在的服务端。

一个拉取请求需要指定从分区的什么位置开始拉取消息(消费者的订阅状态(SubscriptionState)保存了分配给消费者所有分区的拉取偏移量),再从集群的元数据信息中获取分区主副本节点信息,就可以构建拉取请求向目标节点发送拉取请求了。

发送拉取请求

发送拉取请求由拉取器(Fetcher)来完成。

public void sendFetches() {

// 创建拉取请求,每一个节点都对应一个请求

for (Map.Entry<Node,FetchRequest> fetchEntry : createFetchRequests().entrySet()) {

final FetchRequest request = fetchEntry.getValue();

final Node fetchTarget = fetchEntry.getKey();

// 每个拉取请求的key表示请求发送的目标节点

// 发送拉取请求,并设置相应的Listener

// 请求处理成功,按照tp维度加入completedFetches集合

client.send(fetchTarget, ApiKeys.FETCH, request) .addListener(new RequestFutureListener<ClientResponse>() {

/** * 拉取器处理拉取结果 */

@Override public void onSuccess(ClientResponse resp) {

FetchResponse response = new FetchResponse(resp.responseBody());

if (!matchesRequestedPartitions(request, response)) { return; }

Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet()); FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

// 因为拉取请求包括多个分区,所以返回结果也有多个分区

for (Map.Entry<TopicPartition,FetchResponse.PartitionData> entry : response.responseData().entrySet()) {

TopicPartition partition = entry.getKey();

// 订阅状态中保存的拉取偏移量

long fetchOffset = request.fetchData().get(partition).offset;

FetchResponse.PartitionData fetchData = entry.getValue();

// 把完成拉取的请求响应添加到队列中,以便在下一次客户端轮询中获取拉取消息 completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));

}

sensors.fetchLatency.record(resp.requestLatencyMs()); sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());

}

@Override

public void onFailure(RuntimeException e) {

log.debug("Fetch request to {} failed", fetchTarget, e);

}

});

}

}

// 通过客户端轮询把拉取请求发送出去

client.poll(pollTimeout, now, new PollCondition() {

@Override

public boolean shouldBlock() {

// 是否有完成的拉取请求

return !fetcher.hasCompletedFetches();

}

});

拉取请求的发送采用异步和轮询的方式,client.send方法不会被阻塞,而是返回一个异步对象,并为异步对象添加一个监听器,处理服务端返回的响应结果。

客户端发送到服务端的拉取请求可能包含多个分区,那么拉取请求的响应也会包含多个分区的数据。在处理拉取请求响应时会按分区整理数据。添加到全局变量:completedFetches(ConcurrentLinkedQueue<CompletedFetch>)中。

最终返回给消费者的消息格式是:Map<TopicPartition,List<ConsumerRecord>>。

拉取请求响应结果类图:


拉取请求响应结果类图

客户端调用fetchedRecords方法获取拉取请求拉取的结果。

public Map<TopicPartition,List<ConsumerRecord<K, V>>> fetchedRecords() {

// <分区信息,list<ConsumerRecord>>

Map<TopicPartition,List<ConsumerRecord<K, V>>> drained = new HashMap<>();

// maxPollRecords在max.poll.records中设置值,返回给客户端的条数

// completedFetches可能拉取保存了全部的信息,但是在客户端真正消费消息的时候,一次可能只消费其中的一部分,可以根据客户端的性能来配置

int recordsRemaining = maxPollRecords;

while (recordsRemaining > 0) {

if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {

CompletedFetch completedFetch = completedFetches.poll(); // 当一个 nextInLineRecords 处理完,就从completedFetches处理下一个完成的Fetch请求

if (completedFetch == null)

break;

nextInLineRecords = parseFetchedData(completedFetch); // 解析拉取到的数据

} else {

recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);

}

}

return drained;

}

private int append(Map<TopicPartition,List<ConsumerRecord<K, V>>> drained, PartitionRecords<K, V> partitionRecords, int maxRecords) {

if (partitionRecords.isEmpty())

return 0;

if (!subscriptions.isAssigned(partitionRecords.partition)) { // 判断这个分区是不是分配给这个消费者(有可能分配的分区发生了变化)

log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);

} else {

long position = subscriptions.position(partitionRecords.partition); // 获取分区的拉取偏移量

if (!subscriptions.isFetchable(partitionRecords.partition)) { // tp不能消费了,比如调用 pause log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);

} else if (partitionRecords.fetchOffset == position) { // 判断服务器返回消息中的拉取偏移量和消费端本地保存的拉取偏移量是否一样。也就是拉取是按顺序拉的

// 获取该 tp 对应的records,并更新 partitionRecords 的 fetchOffset(用于判断是否顺序) List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);

// 计算出下一次发送拉发请求时的拉取偏移量

long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;

List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition); // 正常情况下,一个node只会发送一个request,一般只会有一个

if (records == null) {

records = partRecords;

drained.put(partitionRecords.partition, records);

} else {

records.addAll(partRecords);

}

// 更新订阅状态分区状态的拉取偏移量,即positiion变量 subscriptions.position(partitionRecords.partition, nextOffset);

return partRecords.size();

} else {

}

}

partitionRecords.discard();

return 0;

}

处理拉取请求响应数据:

处理拉取请求响应数据

上面说了在一次轮询中会发送两次拉取请求,虽然第二次发送拉取请求后是无阻塞的快速轮询,但是第二次请求也有可能会立即产生响应结果,先于第一次的拉取请求的结果回来。这样就会对第一次的响应结果产生影响。结合上面的代码(append方法)中会更新消费者订阅状态中拉取偏移量的值。那么如果第二次拉取请求的结果先于第一次回来,更新拉取偏移量。等第一次拉取请求的结果回来后,也会去更新订阅状态中拉取偏移量。这次拉取偏移量就会更新成老的拉取位置。等第三次再发起拉取请求时,就会使用老的拉取位置去拉取数据,这样就会出现消费重复数据。所以为了保证同一次轮询里面两个拉取请求的结果数据不会互相混淆,必须确保第一个请求获取到结果后,才允许发送第二个请求。

那么拉取器是怎么防止这点的喃?


消费客户端轮询获取拉取请求的结果

通过上面的流程图就要看出,kafka的解决办法很简单(这就是代码的艺术)。重点就是判断临时变量是否有数据,如果有数据,就是上一次发送拉取请求的结果回来了,就可以发送下一次的拉取请求了。全局变量的设计也是一个关键点:队列。在全局变量队列中的数据就可以慢慢被客户端消费,因为后面的拉取请求一定会是在队列的后面(FIFO),前一次拉取请求的结果一定会在后面。这样就不会乱序。好巧妙的设计!

将全局变量添加到临时变量返回给客户端的时候,kafka做了一个限制,添加到临时变量里面的只是一部分消息,这是因为要保证客户端的处理性能。假如把全局变量的数据全部返回给客户端,客户端的处理性能跟不上,就会影响客户端下一次去轮询数据(消费者客户端是自己控制轮询)。这样会让服务端认为客户端宕机,把客户端断掉。后面客户端就会重新连接服务端。影响整体性能。fetchedRecords方法中使用的maxPollRecords变量就是控制返回的消息个数。

public Map<TopicPartition,List<ConsumerRecord<K, V>>> fetchedRecords() {

// <分区,list<ConsumerRecord>>

Map<TopicPartition ,List<ConsumerRecord<K, V>>> drained = new HashMap<>();

// maxPollRecords在max.poll.records中设置值,返回给客户端的条数

// completedFetches可能拉取保存了全部的信息,但是在客户端真正消费消息的时候,一次可能只消费其中的一部分,可以根据客户端的性能来配置

int recordsRemaining = maxPollRecords;

while (recordsRemaining > 0) {

if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {

CompletedFetch completedFetch = completedFetches.poll(); // 当一个 nextInLineRecords 处理完,就从 completedFetches 处理下一个完成的 Fetch 请求

if (completedFetch == null)

break;

nextInLineRecords = parseFetchedData(completedFetch); // 解析拉取到的数据

} else {

recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);

}

}

return drained;

}

全局变量包括了分配给消费者所有分区的数据,而每个分区可能需要调用多次获取记录的方法才会全部返回,kafka限制了如果分区在全局变量中或者是在正在处理的记录集中,那么在创建下一次的拉取请求时就不会把此分区加入到拉取请求的分区中。

private List<TopicPartition> fetchablePartitions() {

// 分配给消费者的所有分区(在订阅状态中获取)

List<TopicPartition> fetchable = subscriptions.fetchablePartitions();

if (nextInLineRecords != null && !nextInLineRecords.isEmpty())

// 在正在处理的记录集对应的分区--移除

fetchable.remove(nextInLineRecords.partition);

for (CompletedFetch completedFetch : completedFetches)

// 在完成拉取全局变量中但是还没有被消费的分区--移除 fetchable.remove(completedFetch.partition);

return fetchable;

}

参考资料:

Kafka技术内幕:图文详解Kafka源码设计与实现

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • 此篇开始进入kafka的另外一侧:消费者。kafka中的消费者比生产者要复杂的多,里面涉及到的消费组,偏移量等概念...
    绍圣阅读 1,894评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 学习kafka有一段时间了。关于它里面的知识还是需要总结一下,一来是能让自己对kafka能有一个比较成型的理解,二...
    绍圣阅读 1,070评论 0 3
  • 话说上回中,KafkaProducer已经将生产的记录追加到了RecordAccumulator中。那么接下来的事...
    绍圣阅读 892评论 2 1
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,291评论 1 15