7.1 分区分配策略
在 3.1 节中讲述了消费者与消费组的模型,并且在默认分区分配策略的背景下通过案例进 行了具体的分析。 Kafka 提供了消费者客户端参数 partition . assignrr阳1t . strategy 来设 置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为 org.apache.kafka. clients.consumer.RangeAssignor,即采用 RangeAssignor分配策略。除此之外, Kafka还提供了另 外两种分配策略: RoundRobinAssignor 和 StickyAssignor。 消费者客户端参数 partitio口- assignment.strategy 可以配置多个分配策略,彼此之间以逗号分隔。
7.1 .1 RangeAssignor 分配策略
RangeAssignor 分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配, 以保证 分区尽可 能均匀地分配 给所 有的消费者 。 对于每一个主题 , RangeAssignor 策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排 序 , 然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会 被多分配一个分区。假设 n=分区数/消费者数量, m=分区数%消费者数量,那么前 m 个消费者每个分配 n+l 个 分区,后面的(消费者数量-m)个消费者每个分配 n个分区。
除了第 3.1 节的示例,为了更加通俗地讲解 RangeAssignor 策略 , 我们不妨再举一些示例。 假设消费组内有 2个消费者 co和 Cl,都订阅了主题 tO和 tl,并且每个主题都有 4个分区, 那 么订阅的所有分区可以标识为 : tOpO、 tOpl、 t0p2、 t0p3、 tlpO、 tlp1、 tlp2、 tlp3。最终的分配 结果为 :
消费者 CO: tOpO、 tOpl, t lpO、 tlpl
消货者 Cl : t0p2、 t0p3, tlp2、 tlp3
这样分配得很均匀,那么这个分配策略能够一直保持这种良好的特性吗?我们不妨再来看 另一种情况。假设上面例子中 2个主题都只有 3个分区,那么订阅的所有分区可以标识为: tOpO、 tOpl、 t0p2、 tlpO、 tlp 1、 tlp2。最终的分配结果为 :
消费者 CO: tOpO、 tOpL tlpO、 tlpl
消费者 Cl: t0p2, tlp2
可以明显地看到这样的分配并不均匀,如果将类似的情形扩大, 则有可能出现部分消费者 过载的情况。对此我们再来看另一种 RoundRobinAssignor策略的分配效果如何。
7.1.2 RoundRobinAssignor 分配策略
RoundRobinAssignor 分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分 区按照字典序 排序,然后通过轮询方式逐 个将分区依次分配给每个消费者。 RoundRobinAssignor 分配策略对应的 partition.assignment.strategy 参数值为 org.apache.kafka.clients.consumer.RoundRobinAssignor。
如果同一个消费组内所有的消费者的订阅信息都是相同的,那么 RoundRobinAssignor分配 策略 的分区分配会是均匀的。举个例子,假设消费组中有 2 个消费者 co 和 Cl,都订阅了主题 tO 和 tl,并且每个主题都有 3 个分区 , 那么订阅的所有分区可以标识为: tOpO、 tOpl、 t0p2、 tlpO、 tlpl、 tlp2。最终的分配结果为 :
消费者 CO: tOpO、 t0p2, tlpl
消费者 Cl: tOpl, tlpO、 tlp2
如果同一个消费组内的消费者订阅的信息是不相同的,那么在执行分区分配的时候就不是 完全的轮询分配,有可能导致分区分配得不均匀。如果某个消费者没有订阅消费组内的某个主 题,那么在分配分区的时候此消费者将分配不到这个主题的任何分区。
举个例子,假设消费组内有 3个消费者 CCO、 Cl 和 C刀,它们共订阅了 3个主题(tO、 tl、 t刀,这 3个主题分别有 l、 2、 3个分区,即整个消费组订阅了 tOpO、 tlpO、 tlpl、 t2p0、 t2pl、 t2p2这6个分区。 具体而言,消费者co订阅的是主题tO,消费者Cl订阅的是主题tO和tl. 消费者 C2 订阅的是主题 tO、 ti 和 t2, 那么最终的分配结果为 :
消费者CO: 消费者Cl: 消费者C2:
tOpO
tlpO
tlpL t2p0、 t2pL t2p2
可以看到 RoundRobinAssignor策略也不是十分完美,这样分配其实并不是最优解,因为完 全可以将分区 tlpl 分配给消费者 Cl。
7.1.3 StickyAssignor分配策略
我们再来看一下 StickyAssignor分配策略,“sticky”这个单词可以翻译为“黠性的”, Kafka从 0.11.x 版本开始引入这种分配策略,它主要有两个目的 :
Cl )分区的分配要尽可能均匀 。
(2)分区的分配尽可能与上次分配的保持相同。
当两者发生冲突时,第 一个目标优先于第二个目标 。 鉴于这两个目标 , StickyAssignor 分配 策略的具体实现要比 RangeAssignor 和 RoundRobinAssignor 这两种分配策略要复杂得多 。 我们 举例来看一下 StickyAssignor 分配策略的实际效果 。
假设消费组内有3个消费者 (CO、 Cl和C2),它们都订阅了4个主题 (tO、 tl、 t2、 t3) ' 并且每个主题有 2 个 分区 。 也就是说,整个消费组订阅了 tOpO、 tOpl、 tlpO、 tlpl、 t2p0、 t2pl、 t3p0、 t3pl这8个分区。 最终的分配结果如下:
这样初看上去似乎与采用 RoundRobinAssignor分配策略所分配的结果相同,但事实是否真 的如此呢?再假设此时消费者 Cl 脱离了消费组,那么消费组就会执行再均衡操作,进而消费 分区会重新分配。如果采用 RoundRobinAssignor分配策略,那么此时的分配结果如下:
可以看到分配结果中保留了上一次分配中对消费者 co和 C2 的所有分配结果,并将原来消 费者 Cl 的“ 负担”分配给了剩余的两个消费者 co 和 C2, 最终 co 和 C2 的分配还保持了均衡 。
如果发生分区重分配,那么对于同一个分区而言,有可能之前的消费者和新指派的消费者 不是同一个,之前消费者进行到 一半的处理还要在新指派的消费者中再次复现一遍,这显然很 琅费系统资源。 StickyAssignor 分配策略如同其名称中的“ sticky” 一样,让分配策略具备一定 的“勤性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生 。
到目前为止,我们分析的都是消 费者的订阅信息都是相同的情况,我们来看一下订阅信息 不同的情况下的处理。
举个例子,同样消费组内有3个消费者 CCO、 Cl和C2), 集群中有3个主题 CtO、 tl和 t2),这 3 个主题分别有 1、 2、 3 个分区 。 也就是说, 集群中有 tOpO、 tlpO、 tlpl、 t2p0、 t2pl、 t2p2这6个分区。 消费者co订阅了主题tO,消费者Cl订阅了主题tO和tl,消费者C2订阅了 主题tO、 ti和t2。
如果此时采用 RoundRobinAssignor分配策略,那么最终的分配结果如分配清单 7-1 所示(和 讲述 RoundRobinAssignor分配策略时的一样,这样不妨赘述一下):
可以看到这才是一个最优解(消费者 co 没有订阅主题 tl 和 t2,所以不能分配主题 tl 和 t2 中的任何分区给它,对于消费者 Cl 也可同理推断〉 。
StickyAssignor分配策略比另外两者分配策略而言显得更加优异,这个策略的 代码实现也异常复杂 ,
7.1.4 自定义分区分配策略
读者不仅可以任意选用 Kafka提供的 3种分配策略, 还可以自定义分配策略来实现更多可 选的功能 。自定义 的 分配策略 必 须要实 现 org.apache.kafka.clients.consumer.intemals. PartitionAssignor接口。 PartitionAssignor接口的定义如下:
Subscription subscription(Set<String> top工cs); String name() ;
PartitionAssignor接口中定义了两个内部类: Subscription和 Assignment。
Subscription 类用来表示消费者 的订阅 信息,类中有两 个属性: topics 和 userData,分 别表示消费者 的订阅主题列表和用户自 定义信息。 PartitionAssignor 接口通过 subscription()方法 来设置消费者自身相关的 Subscription 信 息,注意到此方法中只有 一 个参数 topics , 与 Subscription 类中的 topics 的相呼应,但并没有体现有关 userData 的参数 。 为了增强用户 对分配结果的控制,可以在 subscription()方法内部添加 一 些影 响 分配的用户自定义信息赋予 userData,比如权重、 IP 地址 、 host 或机架 Crack)等 。
举个例子,在 subscription()方法 中提供机架 信息 ,标识此消费者所部署的机架位置,在分 区分配时可以根据分区的 leader 副本所在的机架位置来实施具体的分配,这样可以让消费者与 所需拉取消息的 broker 节点处于同 一机架 。参考图 7-1, 消费者 consumer!和 brokerl 都部署在 机架 rackl 上,消 费者 consumer2 和 broker2 都部署在机架 rack2 上 。 如果分区的分配不是机架 感知的,那么有可能与图 7”1 (上半部分)中的分配结果一样, consumerl 消费 broker2 中的分 区,而 consumer2 消费 brokerl 中的分区 ; 如果分区的分配是机架感知的 , 那么就会出现图 7-1(下半部分〉的分配结果, consumer!消 费 broker! 中的分区,而 consumer2 消费 broker2 中的 分区,这样相 比前一种情形,既可以减少消费延时,又可以减少跨机架带宽的占用 。
再来说一下 Assignment类,它用来表示分配结果信息, 类中也有两个属性: partitions 和 userData, 分别表示所分配到的分区集合和用户自定义的数据 。 PartitionAssignor 接口中的 onAssignment()方法是在每个消费者收到消费组 leader 分配结果时的回调函数,例如在 StickyAssignor 分配策略中就是通过这个方法保存当前的分配方案,以备在下次消费组再均衡
(rebalance)时可以提供分配参考依据 。
接口中的 name()方法用来提供分配策略的名称,对 Kafka 提供的 3 种分配策略而言, RangeAssignor 对应的 protocol_name 为“ range”, RoundRobinAssignor 对应的 protocol name 为“ roundrobin”, StickyAssignor 对应的 protocol_name 为“ sticky”,所以自定义的分配策略 中 要注意命名的时候不要与己存在的分配策略发生冲突。这个命名用来标识分配策略的名称, 在后面所描述的加入消费组及选举消费组 leader 的时候会有涉及 。
真正的分区分配方案的实现是在 assign()方法中,方法中的参数 metadata表示集群的元数据 信息,而 subscriptions 表示消费组内各个消费者成员的订阅信息,最终方法返 回各个消费者的 分配信息。
Kafka 还提供了一个抽象类 org.apache.kafka.clients.consumer.intemals.AbstractPartitionAssignor, 它可以简化实现 PartitionAssignor 接口的工作,井对 assign()方法进行了详细实现, 其中会将 Subscription中的 us巳rData信息去掉后再进行分配。 Kafka提供的 3种分配策略都继承自这个抽 象类 。 如果开发人员在自定义分区分配策略时需要使用 userData 信息来控制分区分配的结果, 那么就不能直接继承 AbstractPartitionAssignor 这个抽象类,而需 要直接实现 PartitionAssignor 接口 。
下面笔者参考 Kafka 的 RangeAssignor 分配策略来自定义 一个随机 的分配策略,这里笔者称 之为 RandomAssignor,具体代码实现如下:
package chapter7;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
/**
*/
public class RandomAssignor extends AbstractPartitionAssignor {
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
Map<String, List<String>> consumersPerTopic =
consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet()) {
assignment.put(memberId, new ArrayList<>());
}
//针对每一个主题进行分区分配
for (Map.Entry<String, List<String>> topicEntry :
consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List<String> consumersForTopic = topicEntry.getValue();
int consumerSize = consumersForTopic.size();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null) {
continue;
}
//当前主题下的所有分区
List<TopicPartition> partitions =
AbstractPartitionAssignor.partitions(topic,
numPartitionsForTopic);
//将每个分区随机分配给一个消费者
for (TopicPartition partition : partitions) {
int rand = new Random().nextInt(consumerSize);
String randomConsumer = consumersForTopic.get(rand);
assignment.get(randomConsumer).add(partition);
}
}
return assignment;
}
@Override
public String name() {
return "name";
}
private Map<String,List<String>> consumersPerTopic(Map<String,Subscription> consumerMetadata){
Map<String, List<String>> res = new HashMap<>();
for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
String consumerId = subscriptionEntry.getKey();
for (String topic : subscriptionEntry.getValue().topics()) {
put(res, topic, consumerId);
}
}
return res;
}
}
在使用时, 消费者客户端需要添加相 应 的 Properties 参数 , 示例如 下 :
properties .put(ConsumerConfig . PARTITION ASSIGNMENT STRATEGY CONFIG,
RandomAssignor .cl ass.getName ());
这里只是演示如何自定义实现一个分区分配策略, RandomAssignor 的实现并不是特别理 想, 并不见得会比 Kafka 自身提供的 RangeAssignor之类的策略要好。
在第3章中陈述了一个事实: 按照Kafka默认的消费逻辑设定, 一个分区只能被同一个消 费组(ConsumerGroup) 内的一个消费者消费。 但这一设定不是绝对的,我们可以通过自定义 分区分配策略使一个分区可以分配给多个消费者消费。
考虑一种极端情况, 同一消费组 内的任意消费者都可以消费订阅主题的所有分区, 从而实 现了一种“组内广播(消费)”的功能。 针对第3章中图3-4的7个分区和3个消费者的情形,
如果采用组内广播的分配策略 , 那么就会变成图 7-2 中的这种分配结果。
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BroadcastAssignor extends AbstractPartitionAssignor {
@Override
public String name() {
return "broadcast";
}
private Map<String, List<String>> consumersPerTopic(
Map<String, Subscription> consumerMetadata) {
//(具体实现请参考 RandomAssignor 中的 consumersPerTopic()方法)
return null;
}
@Override
public Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<String>> consumersPerTopic =
consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
//Java8
subscriptions.keySet().forEach(memberId ->
assignment.put(memberId, new ArrayList<>()));
//针对每一个主题,为每一个订阅的消费者分配所有的分区
consumersPerTopic.entrySet().forEach(topicEntry->{
String topic = topicEntry.getKey();
List<String> members = topicEntry.getValue();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null || members.isEmpty())
return;
List<TopicPartition> partitions = AbstractPartitionAssignor
.partitions(topic, numPartitionsForTopic);
if (!partitions.isEmpty()) {
members.forEach(memberId ->
assignment.get(memberId).addAll(partitions));
}
});
return assignment;
}
}
注意组内广播的这种实现方式会有一个严重的问题一一默认的消费位移的提交会失效。所 有的消费者都会提交它自身的消费位移到 consumer_offsets 中 , 后提交的消费位移会覆盖前面 提交的消费位移。
假设消费者 consumerl 提交了分区 tpO 的消 费位移为 10, 这时消费者 consumer2 紧接着提 交了同一分区 tpO 的消费位移为 12,如果此时消费者 consumer!由于某些原因重启了 ,那么 consum巳rl 就会从位移 12 之后重新开始消费,这样 consumer! 就丢失了部分消息。
再考虑另一种情况,同样消费者 consumerl 提交了分区 tpO 的消费位移为 10, 这时消费者 consumer2 紧接着提交了同 一分区的消费位移为 8,如果此时消费者 consumerl 由于某些原因重 启了,那么 consumerl 就会从位移 8 之后重新开始消费,这样 consumerl 就重复消费了消息。 很多情形下 , 重复消费少量消息对于上层业务应用来说可以忍受。但是设想这样一种情况 , 消 费组 内的消 费者对于分区 tpO 的 消费位移都在 100000 之后了, 此时 又有一 个新的消 费者 consumer3 加入进来,消费了部分消息之后提交了 tpO 的消费位移为 9, 那么此时原消费组内的 任何消 费者重启都会从这个消 费位移 9 之后再开始重新消费 ,这样大量的重复消息会让上层业 务应用猝不及防,同样会造成计算资源的浪费 。
针对上述这种情况,如果要真正实现组内广播,则需要自己保存每个消费者的消费位移 。 笔者的实践经验是,可以通过将消费位移保存到本地文件或数据库中等方法来实现组内广播的 位移提交。
7.2 消费者协调器和组协调器
了解了 Kafka 中消费者的分区分配策略之后是否会有这样的疑问:如果消费者客户端中配 置了两个分配策略,那么以哪个为准呢?如果有 多个消费者,彼此所配置的分配策略并不完全 相同,那么以哪个为准?多个消费者之间的分区分配是需要协同的,那么这个协同的过程又是 怎样的呢?这一切都是交由消费者协调器( ConsumerCoordinator )和组协调器(GroupCoordinator)来完成的,它们之间使用 一套组协调协议进行交互 。
7.2.1 旧版消费者客户端的问题
消费者协调器和组协调器的概念是针对新版的消费者客户端而言的, Kafka 建立之初并没 有它们。旧版的消费者客户端是使用 ZooKeeper 的监听器( Watcher〕来实现这些功能的。
每个消费组(<group>)在 ZooKeeper 中都维护了 一个/ consumers/<group>/ids 路径, 在此路径下使用临时节点记录隶属于此消费组的消费者的唯 一标识( consumerldString) , consumerldString由消费者启动时创建。消费者的唯一标识由 consumer.id+主机名+时间戳+UUID 的部分信息构成,其中 consumer.id 是旧版消 费 者客户端中的配置,相当于新版客户端中的 client.id。比如某个消费者的唯一标识为 consumerld localhost-1510734527562-64b377f5, 那么其中 consumerId 为指定的 consumer.id, localhost 为计算机的主机名, 1510734527562 代表时间戳,而 64b377f5 表示 UUID 的部分信息 。
参考图 7-4,与/consumers/<group>/工ds 同级的还有两个节点: owners 和 offsets, /consumers/<group>/owner 路径下记录了分区和消费者的对应关系,/ consumers/ <group>/offsets 路径下记录了 此消 费组在分区中对应的消费位移。
每个 broker、 主题和分区在 ZooKeeper 中也都对应一个路径 : /brokers/ids/<id>记录 了 host、 port及分配在此 broker上的主题分区列表; /brokers/topics/<topic>记录了每 个分区的 leader 副本、 ISR 集合等信息 。/brokers/topics/<topic>/partitions/ <partition>/state记录了当前 leader副本、 leader epoch等信息。
每个消费者在启动时都会在/ consumers/<group>/ids 和/brokers/ids 路径上注册 一个监听器。当/consumers/<group>/ids 路径下的子节点发生变化时,表示消费组中的消 费者发生了变化;当/ brokers/ids 路径下的子节点发生变化时,表示 broker 出现了增减 。 这 样通过 ZooKeeper所提供的 Watcher, 每个消费者就可以监昕消费组和 Kafka集群的状态了 。
这种方式下每个消费者对 ZooKeeper 的相关路径分别进行监听, 当触发再均衡操作时, 一 个消费组下的所有消费者会同时进行再均衡操作,而消费者之间并不知道彼此操作的结果,这 样可能导致 Kafka工作在一个不正确的状态。 与此同时,这种严重依赖于 ZooKeeper集群的做 法还有两个 比较严重的问题 。
(1)羊群效应(HerdEffect) : 所谓的羊群效应是指 ZooK巳eper中一个被监听的节点变化, 大量的 Watcher 通知被发送到客户端 , 导致在通知期间的其他操作延迟,也有可能发生类似死 锁的情况 。
(2)脑裂问题( Split Brain) : 消费者进行再均衡操作时每个消费者都与 ZooKeeper 进行 通信以判断消费者或 broker变化的情况,由于 ZooKeeper本身的特性,可能导致在同一时刻各 个消 费者获取的状态不一致 , 这样会导致异常 问题 发生。
7.2.2 再均衡的原理
新版的消费者客户端对此进行了重新设计 , 将全部消费组分成多个子集 , 每个消费组的子 集在服务端对应一个 GroupCoordinator 对其进行管理, GroupCoordinator 是 Kafka 服务端 中用于 管理消费组的组件。而消费者客户端中的 ConsumerCoordinator组件负责与 GroupCoordinator进 行交互 。
Consum巳rCoordinator 与 GroupCoordinator 之间最重要 的职 责就是负责执行消费者再 均衡的 操作,包括前面提及的分区分配的工作也是在再均衡期间完成的。就目前而言 , 一共有 如下几 种情形会触发再均衡的操作 :
- 有新的消 费者 加入 消费组。
- 有消费者若机下线 。 消费者并不一定需要真正下线,例如遇到长时间的 GC、网络延 迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时, GroupCoordinator 会认 为消费者己经下线。
- 有消费者主动退出消费组(发送 LeaveGroupRequest 请求) 。 比如客户端调用了 unsubscrible()方法取消对某些主题 的订阅 。
- 消费组所对应的 GroupCoorinator节点发生了变更。
- 消费组内所订阅的任一主题或者主题的分区数量发生变化。
下面就以一个简单的例子来讲解一下再均衡操作的具体内容 。当有消费者加入消费组时,消费者、消费组及组协调器之间会经历 一 下 几个 阶段。
第一阶段( FIND_COORDINATOR)
消费者需要确定它所属的消费组对应的 GroupCoordinator所在的 brok巳r,并创建与该 broker 相互通信的网络连接 。 如果消 费者 己经保存了与消费组对应的 GroupCoordinator 节点的信息, 并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则, 就需要向集群中 的某个 节点发送 FindCoordinatorRequest请求来查找对应的 GroupCoordinator, 这里的“某个节点” 并 非是集群中的任意节点,而是负载最小的节点,即 2.2.2 节中的leastLoadedNode。
如图 7-5 所示, FindCoordinatorRequest请求体中只有两个域( Field) : coordinator key 和 coord工nator_typeo coordinator_key 在这里就是消费组的名称 , 即 groupid, coordinator type 置为 0。
Kafka 在收到 FindCoordinatorRequest 请求之后,会根据 coordinator_key (也就是 groupld)查找对应的 GroupCoordinator 节点,如果找到对应的 GroupCoordinator 则会返回其相 对应的 node_id、 host 和 port 信息。
具体查找 GroupCoordinator的方式是先根据消费组 groupid的晗希值计算_consumer_offsets 中的分区编号,具体算法如代码清单 7-1 所示。 :
代码清单 7-1 消费组所对应的分区号的计算方式 Utils.abs(groupid.hashCode ) % groupMetadataTopicPartit 工 onCount
其中 groupid.hashCode 就是使用 Java 中 String 类的 hashCode()方法获得的, groupMetadataTopicPartitio口Cou口t 为主题 consumer一offsets 的分区个数,这个可以 通过 broker端参数 offsets.topic.num.partitions 来配置,默认值为 50。
找到对应的 consumer offsets 中的分区之后,再寻找此分区 leader副本所在的 broker节点, 该 broker节点即为这个 groupld所对应的 GroupCoordinator节点。消费者 groupId最终的分区分 配方案及组内消费者所提交的消费位移信息都会发送给此分区 leader副本所在的 broker节点, 让此 broker节点既扮演 GroupCoordinator 的角色,又扮演保存分区分配方案和组内消费者位移 的角色,这样可以省去很多不必要的中间轮转所带来的开销 。
第二阶段(JOIN GROUP)
在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的 消费者会向 GroupCoordinator 发送 JoinGroupRequest请求,并处理响应。
如图 7-6所示, JoinGroupR巳quest的结构包含多个域:
- group_id 就是消费组的 id,通常也表示为 groupld。
- sessioηtimout 对应消费端参数 sess工on.timeout.ms,默认值为 10000,即10 秒 。 GroupCoordinator 超过 session_timeout 指 定 的时间内没有收到心跳报文则 认为此消 费者已经下线。
- rebalance timeout 对应消费端参数 max .poll . interval.ms , 默认值 为 300000,即 5 分钟 。表示当消费组再平衡的时候, GroupCoordinator 等待各个消费者 重新加入的最长等待时间 。
- member id 表示 GroupCoordinator 分配给消费者的 id 标识。 消费者第一次发送 JoinGroupRequest请求的时候此字段设置为 nulla
- protocol_type 表示消费组实现的协议,对于消费者而言此字段值为“consumer”。
JoinGroupRequest 中的 group protocols 域为数组类型,其中可以囊括多个分区分配策 略,这个主要取决于消 费者客户端参数 pa:::-titio口 .assignment. strategy 的配置。 如果 配置了多种策略,那么 JoinGroupRequest 中就会包含多个 protocol name 和 protocol metadata。其中 protocol name 对应于 PartitionAssignor 接口中的 name()方法,我们在讲 述消费者分区分配策略的时候提及过相关内容(参考 7.1.4 节) 。 而 protocol metadata 和 PartitionAssignor接口中的 subscription()方法有直接关系, protocol_metadata 是一个 bytes 类型,其实质上还可以更细粒度地划分为 version、 topics 和 user data,如图 7-7 所示。
version 占 2个字节,目前其值固定为 0; topics 对应 PartitionAssignor接口的 subscription() 方法返回值类型 Subscription 中的 topics,代表一个主题列表; user_data 对应 Subscription 中 的 userData,可以为空 。
如果是原有的消费者重新加入消费组,那么在真正发送 JoinGroupRequest 请求之前还要执 行一些准备工作:
(1)如果消费端参数enable.auto.commit设置为true(默认值也为true), 即开启自 动提交位移功能,那么在请求加入消费组之前需要向 GroupCoordinator 提交消费位移。这个过 程是阻塞执行的,要么成功提交消费位移,要么超时。
(2)如果消 费者添加了自定义的再均衡监听器( ConsumerRebalanceListener),那么此时 会调用 onPartitionsRevoked()方法在重新加入消费组之前实施自定义的规则逻辑,比如清除一些 状态,或者提交消费位移 等。
(3)因为是重新加入消费组,之前与 GroupCoordinator节点之间的心跳检测也就不需要了, 所以在成功地重新加入消费组之前 需要禁止 心跳检测的 运作。
消费者在发送 JoinGroupRequest 请求之后会阻塞等待 Kafka 服务端的响应。服务端在收到JainGroupRequest 请求 后会交由 GroupCoordinator 来进行处理 。 GroupCoordinator 首先 会对 JoinGroupRequest 请求做合法性校验,比如 group 工d 是否为空、当前 broker 节点是否是请求 的消费者组所对应的组协调器、 rebalance timeout 的值是否在合理的范围之内。如果消费 者是第一次请求加入消费组,那么 JoinGroupRequest 请求中的 member_id 值为 null,即没有 它自身的唯一标志,此时组协调器负责为此消费者生成一个 member id。这个生成的算法很 简单,具体如以下伪代码所示。
Stringmemberid= clientid+ ”+ UUID.randomUUID().toStr工ng();
其中 clientld 为消费者客户端的 clientld,对应请求头中的 client id。由此可见消费者的member 工d 由 clientld 和 UUID 用“-” 字符拼接而成。
选举消费纽的 leader
GroupCoordinator 需要为消费组内的消费者选举出一个消费组的 leader,这个选举的算法也 很简单,分两种情况分析。如果消费组内还没有 leader,那么第一个加入消费组的消费者即为 消费组的 leader。如果某一时刻 leader 消费者由于某些原因退出了消费组,1 那么会重新选举一 个新的 leader,这个重新选举 leader 的过程又更“随 意”了,相关代码如下 :
//scala code.
pr工vate val members = new mutable.HashMap[String, MemberMetadata] var leaderid = members.keys.head
解释一下这 2 行代码:在 GroupCoordinator 中消费者的信息是以 HashMap 的形式存储的,其中 key 为消 费者的 member id,而 value 是消 费者相关的元数据信息。 leaderld 表示 leader 消费者的 member id,它的取值为 HashMap 中的第一个键值对的 key,这种选举的方式基本 上和随机无异。 总体上来说,消费组的 l巳ader选举过程是很随意的。
选举分区分配某咯
每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者呈报上来的 各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配 。 这个分区分配的选 举并非由 leader消费者决定,而是根据消费组内的各个消费者投票来决定的。这里所说的 “根据组内的各个消费者投票来决定”不是指 GroupCoordinator 还要再与各个消费者进行进一步交 互,而是根据各个消费者呈报的分配策略来实施。最终选举的分配策略基本上可以看作被各个 消费者支持的最多的策略,具体的选举过程如下:
(1)收集各个消费者支持的所有分配策略,组成候选集 candidates。 (2)每个消费者从候选集 candidates 中找出第一个自身支持的策略,为这个策略投上一票。
(3)计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
如果有消费者并不支持选出的分配策略,那么就会报出异常 IllegalArgumentException: Member does not supp。此 protocol。 需要注意的是,这里所说的“消费者所支持的分配策略”是 指 partition.assignment.strategy 参数配置的策略,如果这个参数值只配置了 RangeAssignor, 那么这个消费者客户端只支持 RangeAssignor 分配策略,而不是消费者客户端 代码中实现的 3 种分配策略及可能的自定义分配策略 。
在此之后, Kafka 服务端就要发送 JoinGroupResponse 响应给各个消费者, leader 消费者和 其他普通消费者收到的响应内容并不相同,首先我们看一下 JoinGroupResponse 的具体结构,如 图 7-8 所 示。
JoinGroupRespons巳包含了多个域,其中 ge口eratio口一工d 用来标识当前消费组的年代信息,避免受到过期请求的影响。 leader 工d 表示消费组 leader 消费者的 member id。
Kafka 发送给普通消费者的 JoinGroupResponse 中的 members 内容为空,而只有 leader 消 费者的 JoinGroupResponse 中的 members 包含有效数据。members 为数组类型,其中包含各 个成员信息 。 member_metadata 为消费者的订阅信息,与 JoinGroupRequest 中的 protocol metadata 内容相同,不同的是 JoinGroupR巳quest可以包含多个<protocol 口ame, protocol metadata>的键值对,在收到 JoinGroupRequest 之后 , GroupCoordinator 已经选举 出唯一的分配策略。也就是说, protocol name 己经确定( group protocol 〉 , 那么对应 的 protocol metadata 也就确定了,最终各个消费者收到的 JoinGroupResponse 响应中的 member_metadata 就是这个确定了的 protocol_metadata。 由此可见, Kafka 把分区分配 的具体分配交还给客户端,自身并不参与具体的分配细节,这样即使以后分区分配的策略发生 了变更,也只需要重启消费端的应用即可,而不需要重启服务端。
第三阶段( SYNC GROUP)
leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之 后需要将分配的方案同步给各个消费者,此时 leader 消费者并不是直接和其余的普通消费者同 步分配方案,而是通过 GroupCoordinator 这个“中间人”来负责转发同步分配方案的。 在第三 阶段,也就是同步阶段, 各个消 费者会 向 GroupCoordinator 发送 SyncGroupRequest 请求来同步 分配方案,如图 7-11 所示。
我们再来看一下SyncGroupRequest 请求的具体结构 ,如 图 7-12 所示 。 SyncGroupRequest 中的 group id、 generation id 和 member id 前面都有涉及,这里不再赘述 。 只有 leader 消费者发送的 SyncGroupRequest 请求中才包含具体的分区分配方案,这个分配方案保存在 group ass工gnment 中,而其余消费者发送的 SyncGroupRequest请求中的 group assignment 为空。
group assignment是一个数组类型,其中包含了各个消费者对应的具体分配方案 :
member id 表示消费者的唯一标识,而 member assignment 是与消费者对应的分配方案,它还可以做更具体的划分,
服务端在收到消费者发送的 SyncGroupRequest 请求之后 会交 由 GroupCoordinator 来负责具 体的逻辑处理。 GroupCoordinator 同样会先对 SyncGroupRequest 请求做合法性校验,在此之后 会将从 leader 消费者发送过来的分配方案提取出来,连同整个消费组的元数据信息一起存入 Kafka 的 consumer offsets 主题中 , 最后发送响应给各个消费者 以提供给各个消费者各自所属 的分配方案。
这里所说的响应就是指 SyncGroupRequest 请求对应 的 SyncGroupResponse, SyncGroupResponse
的内容很简单,里面包含的就是消费者对应的所属分配方案, SyncGroupResponse 的结构如图
7-14 所示,具体字段的释义可以从前面的内容中推测出来,这里就不赘述了。
当消费者收到所属的分配方案之后会调用 PartitionAssignor 中的 onAssignment()方法。随后再调用 ConsumerRebalanceListener 中的 OnPartitionAssigned()方法 。 之后开启 心跳任务 , 消费者定期 向服 务端的 GroupCoordinator 发送 HeartbeatRequest 来确定彼此在线。
消费组元数据信息
我们知道消费者客户端提交的消费位移会保存在Kafka的_consumer_offsets主题中,这里也一样,只不过保存的是消费组的元数据信息(GroupMetadata)。
图 7-15 中对应的就是消费组元数据信息的具体内容格式,上面是消息的 key,下面是消息 的value。可以看到 key和 value中都包含 version字段,用来标识具体的 key和 value的版本信 息 , 不同的版本对应的内容格式可能并不相同,就目前版本而言, key 的 version 为 2,而 value 的 version 为 1, 读者在理解时其实可以忽略这个字段而探究其他具备特定含义的 内容。 key 中除了 versio口就是 group 宇段,它表示消费组的名称,和 JoinGroupRequest 或 SyncGroupRequest 请求中的 group_id 是同 一个东西 。 虽然 key 中包含了 version 字段, 但确定这条信息所要存储的分区还是根据单独的 group 字段来计算的,这样就可以保证消费组 的元数据信息与消费组对应的 GroupCoordinator 处于同 一个 broker 节点上,省去了中间轮转的 开销。
value 中包含的内容有很多,可以参照和 JoinGroupRequest 或 SyncGroupRequest 请求中的内容来理解,具体各个字段的释义如下 。
- protocol type:消费组实现的协议,这里的值为“ consumer”。 * generation:标识当前消费组的年代信息,避免收到过期请求的影响。 protocol : 消费组选取的分区分配策略。
- leader: 消费组的 lead巳r消费者的名称。
- members: 数组类型,其中包含了消费组的各个消费者成员信息,图 7-15 中右边部分 就是消费者成员的具体信息,每个具体字段都 比较容易辨 别, 需要着重说明的是 subscription 和 assignment 这两个字段, 分别代码消费者的订阅信息和分配信息
第四阶段( HEARTBEAT)
进入这个阶段之后,消 费组中的所有消费者就会处于正常工作状态。在正式消费之前 ,消 费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了 GroupCoordinator,并且GroupCoordinator将其保存到了Kafka内部的一consumer_offsets主题中, 此时消费者可以通过 OffsetFetchRequest 请求获取上次提交的消 费位移并 从此处继续消费 。
消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送 心跳 , 就被认为是活跃的 ,说明它还在读 取分区中的消息。 心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停 止发送心跳的时间足够长,则整个会话就被判定为过期, GroupCoordinator 也会认为这个消费者 己经死亡,就会触发一次再均衡行为。消费者的心跳间隔时间由参数 heartbeat.interval.ms 指定,默认值为 3000,即 3 秒 , 这个参数必须比 session . timeout.ms 参数设定的值要小, 一般情况下 heartbeat. interval.ms 的配置值不能超过 session.timeout.ms配置值的 1/3。这个参数可以调整得更低,以控制正常 重新平衡 的预期时间 。
消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送 心跳 , 就被认为是活跃的 ,说明它还在读 取分区中的消息。 心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停 止发送心跳的时间足够长,则整个会话就被判定为过期, GroupCoordinator 也会认为这个消费者 己经死亡,就会触发一次再均衡行为。消费者的心跳间隔时间由参数 heartbeat.interval.ms 指定,默认值为 3000,即 3 秒 , 这个参数必须比 session . timeout.ms 参数设定的值要小, 一般情况下 heartbeat. interval.ms 的配置值不能超过 session.timeout.ms配置值的 1/3。这个参数可以调整得更低,以控制正常 重新平衡 的预期时间 。
如果一个消费者发生崩溃,并停止读取消息 , 那么 GroupCoordinator 会等待一小段时间 , 确认这个消费者死亡之后才会触发再均衡。在这一小段 时间内, 死掉的消费者井不会读取分区 里的消息。这个一小段时间由 session . timeout.ms 参数控制,该参数的配置值必须在 broker 端参数 group.m工n.sessio口.timeout.ms (默认值为 6000,即 6 秒)和 group.max. session. timeout. ms (默认值为 300000,即 5 分钟)允许的范围内。
还有一个参数 max.poll.interval.ms,它用来指定使用消费者组管理时 poll()方法调 用之 间的 最大延迟 ,也就是消费者在获取更多消息之前可以空闲的时间量的上限。如果 此超时 时间期满之前 poll()没有调用, 则消费者被视为失败,并且分组将重新平衡, 以便将分区重新分 配给别的成员。
除了被动退 出消费组,还可 以使用 Leav巳GroupRequest 请求主动退出消费组,比如客户端调用了 unsubscrible()方法取消对某些主题的订阅,这个比较简单,这里就不再赘述了 。
7.3 _consumer_offsets 剖析
位移提交是使用消费者客户端过程中一个比较“讲究”的操作, 3.2.5 节也 使用了较大的篇 幅来介绍它。位移提交的内容最终会保存到 Kafka 的内部主题 consumer offsets 中,对于主题
consumer offsets 的深度掌握也可以让我们更好地理解和使用好位移提交。
一般情况下,当集群中第一次有消费者消费消息时会自动创建主题 consumer offsets,不 过它的副本因子还受 offsets.topic.replication.factor参数的约束,这个参数的默认值为 3 (下载安 装的包中此值可能为 1),分区数可以通过 offsets.topic.num.partitions参数设置,默认为 50。客 户端提交消费位移是使用。他etCommitRequest 请求实现的, OffsetCommitRequest 的结构如图 7-16 所示 。
如果已经掌握了 6.1 节和] 7.2 节的内容,那么就很容易理解 OffsetCommitRequest 的结构 。 请求体第一层中的 group id、 generation_id 和 member_id 在前面的内容中已经介绍过 多次了, retention time 表示当前提交的消费位移所能保留的时长,不过对于消费者而言 这个值保持为 I。也就是说,按照 broker 端的配置 offsets . retention . minutes 来确定 保留时长 。 offsets . retention .minutes 的默认值为 10080,即 7 天,超过这个时间后消 费位移的信息就会被删除(使用墓碑消息和日志压缩策略) 。 注意这个参数在 2.0.0 版本之前的 默认值为 1440,即 1 天,很多关于消费位移的异常也是由这个参数的值配置不当造成的 。 有些 定时消费的任务在执行完某次消费任务之后保存了消费位移,之后隔了一段时间再次执行消费任务,如果这个问隔时间超过 offsets.retent工on.minutes 的配置值,那么原先的位移信 息就会丢失, 最后只能根据客户端参数 auto . offset.reset 来决定开始消费的位置,遇到 这种情况时就需要根据实际情况来调配 offsets.retention.minutes 参数 的值 。
OffsetCommitRequest 中的其余字段大抵也是按照分区的粒度来划分消费位移的 : topic 表 示主题名称, partition 表示分区编号等。注意这里还有一个 metadata 字段。在 3.2.5 节中 讲到手动位移提交时提到了可以通过 Map<TopicPartition, OffsetAndMetadata> offsets 参数来指 定要提交的分区位移
同消费组的元数据信息 一样,最终提交的消费位移也会以消息的形式发送至主题 _consumer_offsets,与消费位移对应的消息也只定义了 key 和 value 字段的具体内容,它不依 赖于具体版本的消息格式,以此做到与具体的消息格式无关 。
图 7”17 中展示了消费位移对应的消息内容格式,上面是消息的 key,下面是消息的 value。
可以看到 key 和 value 中都包含了 version 宇段 ,这个用来标识具体的 key 和 value 的版本信 息,不同的版本对应的内容格式可能并不相同 。就目前版本 而 言 , key 和 value 的 version 值 都为 l。 key 中除了 version 字段还有 group、 topic、 partition 字段,分别表示消费组 的 groupId、 主题名称和分区编号。虽然 key 中包含了 4 个字段,但最终确定这条消息所要存储 的分区还是根据单独的 group 字段来计算的,这样就可以保证消费位移信息与消费组对应的 GroupCoordinator 处于同 一个 broker 节点上,省去了中间轮转的开销,这 一点 与消费组的元数 据信息的存储是一样的 。
value 中包含了 5 个字段,除 version 宇段外,其余的 offset、 metadata、 commit
timestamp、 expire timestamp 宇段分别表示消费位移、自定义的元数据信息、位移提交 到 Kafka 的时间戳、消费位移被判定为超时的时间戳 。其 中 offset 和 metadata 与 OffsetCommitRequest 请求体中的 offset 和 metadata 对应,而 expire timestamp 和 OffsetCommitRequest 请求体中的 retention time 也有关联, commit timestamp 值与 offsets . retention .minutes 参数值之和即为 expire_timestamp (默认情况下)。
在处理完消费位移之后, Kafka返回 OffsetCommitResponse给客户端 ,OffsetCommitResponse 的结构如图 7-18 所示 。 OffsetCornmitResponse 中各个域的具体含义可以通过前面内容中推断出 来,这里就不再赘述了 。
我们可以通过kafka-console-consumer.sh脚本来查看 consumeroffsets中的内容,不过要设 定 formatter 参数为 kafka.coordinator.group.GroupMetadataManager$0ffsetsMessageForrnatter。 假设我们要查看消费组“consumerGroupid” 的位移提交信息 , 首先可 以根据代码清单 7-1 中的 计算方式得出分区编号为 20, 然后查看这个分区中的消息,相关示例如下:
有时候在查看主题 consumer offsets 中的内容时有可能出现下面这种情况:
[consumerGroupid, topic-offsets,21]: :null
这说明对应的消费位移己经过期了 。在 Kafka 中有一个名为“ delete-expired-group-metadata” 的定时任务来负责清理过期的消费位移,这个定时任务的执行周期由参数 offsets . retention.check.interval . ms 控制,默认值为 600000,即 10 分钟。
事务
7.4.1 消息传输保障
一般而言,消息中间件的消息传输保障有 3个层级,分别如下。
(1)at most once:至多 一次。消息可能会丢失,但绝对不会重复传输。
(2)at least once: 最少一次。消息绝不会丢失,但可能会重复传输。
(3) exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。
Kafka 的消息传输保障机制非常直观 。 当生产者向 Kafka 发送消息时,一旦消息被成功提 交到日志文件,由于多副本机制的存在,这条消息就不会丢失。如果生产者发送消息到 Kafka 之后,遇到了网络问题而造成通信中断,那么生产者就无法判断该消息是否己经提交。虽然 Kafka 无法确定网络故障期间发生了什么,但生产者可以进行多次重试来确保消息已经写入 Kafka, 这个重试的过程中有可能会造成消息的重复写入,所以这里 Kafka 提供的消息传输保障为 at least once。
对消费者而言,消费者处理消息和提 交消费位移 的顺序在很大程度上决定了消费者提供哪 一种消息传输保障 。 如果消费者在拉取完消息之后 ,应用逻辑先处理消息后提交消费位移 ,那 么在消息处理之后且在位移提交之前消费者看机了,待它重新上线之后,会从上一次位移提交 的位置拉取,这样就出现了重复消费,因为有部 分消息已经处理过了只是还没来得及提交消费 位移,此时就对应 at least once。如果消费者在拉完消息之后,应用逻辑先提交消费位移后进行消息处理,那么在位移提交之后且在消息处理完成之前消费者岩机了,待它重新上线之后,会 从己经提交的位移处开始重新消费,但之前尚有部分消息未进行消 费,如此就会发生消 息丢失, 此时就对应 atmost once。
Kafka 从 0.11.0.0 版本开始引 入了军等和事务这两个特性,以此来实现 EOS ( exactly once semantics,精确 一次 处理语义) 。
7.4.2 幕等
所谓的幕等,简单地说就是对接口的多次调用所产生的结果和调用 一次是一致 的 。生产者 在进行重试的时候有可能会重复写入消息,而使用 Kafka 的幕等性功能之后就可以避免这种情况。
开启幕等性功能的方式很简单,只需要显式地将生产者客户端参数 enable.idempotence 设置为 true 即可(这个参数的默认值为 false),参考如下:
properties .put(ProducerConfig .ENABLE_IDEMPOTENCE CONFIG, true);
或者
properties.put ("enable . idempotence ” , true);
不过如果要确保军等性功能正常,还需要确保生产者客户端的 retries 、 acks 、 max.in. flight.requests.per. connect工on 这几个参数不被配置错。实际上在使用幕等 性功能的时候,用户完全可以不用配置(也不建议配置)这几个 参数。
如果用户显式地指定了 retries 参数,那么这个参数的值必须大于 0, 否则会报出 ConfigException:
如果用户没有显式地指定 retries 参数,那么 KafkaProducer 会将它置为 Integer.MAX_ VALUE。 同时还需要保证 max.in.flight.requests.per . connection 参数的值不能大 于 5 (这个参数的值默认为 5, 在 2.2.1 节中有相关的介绍),否则也会报出 ConfigException:
如果用户还显式地指定了 acks 参数,那么还需要保证这个参数 的值为一1 (all),如果不 为 1 (这个参数的值默认为 1' 2.3 节中有相关的介绍),那么 也会报出 ConfigException:
org.apache.kafka.common.config.ConfigException: Must set acks to all in order
to use the jdempotent producer . Otherwise we cannot guarantee idempotence .
如果用户没有显式地指定这个 参数 ,那么 KafkaProducer 会将它置为 1。 开启幕等性功能之 后 ,生 产者就可以如同未开启幕等 时 一样发送消息了。
为了实现生产者的幕等性,Kafka为此引入了 producerid(以下简称 PID)和序列号(sequence number)这两个概念,这两个概念其实在 5.2.5 节中就讲过,分别对应 v2 版的日志格式中 RecordBatch 的 producer id 和 first seqe口ce 这两个宇段(参考图 5-7)。每个新的生产 者实例在初始化的时候都会被分配一个 PID,这个 PID 对用户而言是完全透明的 。对于每个 PID, 消息发送到的每一个分区都有对应的序列号,这些序列号从 0 开始单调递增。生产者每发送一 条消息就会将<PID, 分区>对应的序列号的值加 l。
broker 端会在内存中为 每一对<PID,分区>维护一个序列号。对于收到的每一条消息,只有 当它的序列号的值(SN new)比broker端中维护的对应的序列号的值(SN old)大 1(即 SN new =SN old+1)时, broker才会接收它。 如果SN new<SN old+I, 那么说明消息被重复写入, broker 可以直接将其丢弃 。 如果 SN new> SN old + l,那么说明中间有数据尚未写入, 出现了 乱序,暗示可能有消息丢失,对应的生产者会抛出 OutOfOrderSequenceException,这个异常是 一个严重的异常,后续的诸如 send()、 beginTransaction()、 commitTransaction()等方法的调用都 会抛出 Illega!StateException 的异常 。
引入序列号来实现幕等也只 是针对每一对<PID, 分区>而言的,也就是说, Kafka 的霖等只 能保证单个生产者会话( session)中单分区的事等 。
ProducerRecord<String , String> record
=new ProducerRecord<>(topic, "key", ”msg” ) ;
producer . send(record) ;
producer . send (record ) ;
注意,上面示例中发送了两条相同的消息,不过这仅仅是指消息 内 容相同,但对 Kafka 而 言是两条不同 的消息,因为会为这两条消息分配不同的序列号 。 Kafka 并不会保证消息 内容的 罪等。
7.4.3 事务
军等性并不能跨多个分区运作,而事务可以弥补这个缺陷。事务可以保证对多个分区写操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。
对流式应用( Stream Processing Applications )而 言 , 一 个典型的 应用模 式为“ consume transform-produce” 。在这种模式下消费和生产 并存: 应用程序从某个主题中消费消息 , 然后经 过一系列转换后写入另一个主题 ,消费者可能在提交消费位移的过程中出现问题而导致重复消 费, 也有可能生产者重复生产消息 。 Kafka 中的事务可以使应用程序将消费消息、生产消息 、 提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区 。
为了实现事务,应用程序必须提供唯一的 transactionalld,这个 transactionalld 通过客户端 参数 transact工onal.id 来显式设置,参考如下 :
properties.put(ProducerConfig.TRANSACTIONAL ID CONFIG,”transactionId”);
或者:
properties .put (”transactional .id”,”transactionid”),
事务要求生产者开启幕等特性,因 此通过将 transactional . id 参数设置为非空从而开 启事务特性的同时 需要将 enable.idempotence 设置为 true ( 如果未显式设置 , 则 KafkaProducer 默认会将它 的值设 置 为 true) ,如果用 户显式地将 enable . idempotence 设置 为 false,则会报出 ConfigException:
transactionalld 与 PID 一一对应,两者之间所不同的是 transactionalld 由用户显式设置, 而 PID是由 Kafka内部分配的。另外,为了保证新的生产者启动后具有相同 transactionalld的旧生 产者能够立即失效 ,每个生产者通过 transactionalld 获取 PID 的 同时,还会获取一个单调递增的 producer epoch (对应下面 要讲述 的 KafkaProducer.initTransactions()方法〉。如果使用 同 一 个 transactionalld 开启两个生产者,那么前 一个 开启的生产者会报出如下的错误:
org . apache . kafka . common . errors . ProducerFencedExcept工on : Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalid, or the producer’s transaction has been expired by the broker.
从生产者 的角度分析,通过事务 , Kafka 可以保证跨生产者会话的消息幕等发送,以及跨 生产者会话的事务恢复 。 前者表示具有相同 transactionalld 的新生产者实例被创建且工作的时候,旧的且拥有相同 transactionalld 的生产者实例将不再工作。后者指当某个生产者实例君机后, 新的生产者实例可以保证任何未完成的旧事务要么被提交( Commit),要么被中止( Abo时), 如此可以使新的生产者实例从一个正常的状态开始工作。
而从消费者的角度分析, 事务能保证的语义相对偏弱。出于以下原因, Kafka 并不能保证 己提交的事务中的所有消息都能够被消 费 :
- 对采用日志压缩策略的主题而言,事务中的某些消息有可能被清理(相同 key 的消息, 后写入的消息会覆盖前面写入的消息)。
- 事务中消息可能分布在同一个分区的多个日志分段( LogSegment)中,当老的日志分 段被删除时,对应的消息可能会丢失。
- 消费者可以通过 seekO方法访问任意 offset 的消息,从而可能遗漏事务中的部分消息。
- 消费者在消费时可能没有分配到事务内的所有分区,如 此它也就不能读取事务中的所 有消息。
initTransactions()方法用来初始化事务,这个方法能够执行的前提是配置了 transactionalld, 如果没有则会报出 IllegalStateException:
beginTransaction()方法用来开启 事务: sendOffsetsToTransaction()方法为消费者提供在事务 内的位移提交 的操作; commitTransaction()方法用来提交事务 : abortTransaction()方法用来中止 事务 ,类似于事务回滚 。
package chapter7;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* 代码清单7-2
*/
public class TransactionOnlySend {
public static final String topic = "topic-transaction";
public static final String brokerList = "localhost:9092";
public static final String transactionId = "transactionId";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.initTransactions();
producer.beginTransaction();
try {
//处理业务逻辑并创建ProducerRecord
ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
producer.send(record1);
ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
producer.send(record2);
ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");
producer.send(record3);
//处理一些其它逻辑
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.abortTransaction();
}
producer.close();
}
}
在消费端有一个参数 isolation.level,与事务有着莫大的关联,这个参数的默认值为“ read uncommitted”, 意思是说消费端应用可 以看到(消费到)未提交的事务, 当然对于己提 交的事务也是可见的。这个参数还可以设置为“ read committed”,表示消费端应用不可以看到 尚未提交的事务内的消息。举个例子,如果生产者开启事务并向某个分区值发送 3 条消息 msgl 、 msg2 和 msg3,在执行 commitTransaction()或 abortTransaction()方法前,设置为“read_committed” 的消费端应用是消费不到这些消息的,不过在 KafkaConsumer 内部会缓存这些消息,直到生产 者执行 commitTransaction()方法之后它才能将这些消息推送给消费端应用。反之,如果生产者 执行了 abortTransaction()方法,那么 KafkaConsumer 会将这些缓存的消息丢弃而不推送给消费 端应用。
日志文件中除了普通的消息,还有 一种消息专门用来标志 一个事务 的结束,它就是控制消息( Contro!Batch)。控制消息一共有两种类型 : COMMIT 和 ABORT,分别用来表征事务己经 成功提交或己经被成功中止。 KafkaConsumer 可以通过这个控制消息来判断对应的事务是被提 交了还是被中止了,然后结合参数 isolation.level 配置的隔离级别来决定是否将相应的消 息返回给消费端应用,如图 7-19所示。注意 Contro!Batch对消费端应用不可见,后面还会对它 有更加详细的介绍。
本节开头就提及了 consume-transform-produce 这种应用模式 ,这里还涉及在代码清单 7-2 中尚未使用的 s巳ndOffsetsToTransaction()方法。该模式的具体结构如图 7-20 所示。与此对应的 应用示例如代码清单 7-3 所示。
package chapter7;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.*;
/**
* 代码清单7-3
*/
public class TransactionConsumeTransformProduce {
public static final String brokerList = "10.198.197.73:9092";
public static Properties getConsumerProperties(){
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
return props;
}
public static Properties getProducerProperties(){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
return props;
}
public static void main(String[] args) {
//初始化生产者和消费者
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(getConsumerProperties());
consumer.subscribe(Collections.singletonList("topic-source"));
KafkaProducer<String, String> producer =
new KafkaProducer<>(getProducerProperties());
//初始化事务
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty()) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
//开启事务
producer.beginTransaction();
try {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords
= records.records(partition);
for (ConsumerRecord<String, String> record :
partitionRecords) {
//do some logical processing.
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("topic-sink", record.key(),
record.value());
//消费-生产模型
producer.send(producerRecord);
}
long lastConsumedOffset = partitionRecords.
get(partitionRecords.size() - 1).offset();
offsets.put(partition,
new OffsetAndMetadata(lastConsumedOffset + 1));
}
//提交消费位移
producer.sendOffsetsToTransaction(offsets,"groupId");
//提交事务
producer.commitTransaction();
} catch (ProducerFencedException e) {
//log the exception
//中止事务
producer.abortTransaction();
}
}
}
}
}
注意 : 在使用 KafkaConsumer 的时 候要将 enable .auto .commit 参数设置为 false,代码里也不能手动提交消费位移 。
为了实现事务的功能, Kafka还引入了事务协调器 CTransactionCoordinator)来负责处理事 务,这一点可以类比一下组协调器( GroupCoordinator) 。每一个生产者都会被指派一个特定的 TransactionCoordinator,所有的事务逻辑包括分派 PID 等都是由 TransactionCoordinator 来负责 实施的。 TransactionCoordinator 会将事务状态持久化到内部主题 位ansaction state 中 。下面就以最复杂 的 consume-transform-produce 的流程 (参考图 7-21 )为例来分析 Kafka 事务的 实现原 理。
1 查找 TransactionCoordinator
TransactionCoordinator 负责分配 PID 和管理事务,因此生产者要做的第一件事情就是找出 对应的 TransactionCoordinator 所在 的 broker 节点 。与查找 GroupCoordinator 节点 一样 ,也是通 过 FindCoordinatorRequest请求来实现的,只不过 FindCoordinatorRequest 中的 coordinator_ type 就由原来的 0变成了 1,由此来表示与事务相关联(FindCoordinatorRequest请求的具体结 构参考图 7-5)。
Kafka 在收到 FindCoorinatorRequest 请求之后 , 会根据 coord工nator_key (也就是 transactionalld)查找对应的TransactionCoordinator节点。如果找到,则会返回其相对应的node id、 host 和 port 信息。具体查找 TransactionCoordinator 的方式是根据 transactionalld 的 哈希值计算主 题 transaction state中的分区编号, 具体算法如代码清单 7-4所示。
代码:青单 7-4 计算分区编号
Utils.abs(transactionalid.hashCode) % transactionTopicPartit工onCount
其中 transactionTopicPartitionCount 为主题一transaction_state 中的分区个数 , 这 个可以通过 brok巳r端参数 transaction.state.log.num.partitions 来配置,默认值为 50 。
找到对应的分区之后,再寻找此分区 leader 副本所在 的 broker 节点,该 broker 节点即为这 个 transactionalld对应的 TransactionCoordinator节点。细心的读者可以发现,这一整套的逻辑和 查找 GroupCoordinator 的逻辑如出 一辙(参考 7.2.2 节) 。
- 获取 PID
在找到 TransactionCoordinator 节点之后,就需要为当前生产者分配一个 PID 了 。凡是开启 了罪等性功能的生产者都必须执行这个操作,不需要考虑该生 产者是否还开启了事务。生产者 获取 PID 的操作是通过 InitProducerldRequest 请求来实现的, InitProducerldRequest 请求体结构 如图 7-22 所示,其中 transactional id 表示 事务 的 transactiona!Id, transaction timeout ms 表示 TransactionCoordinaor等待事务状态更新的超时时间,通过生产者客户端参 数 transact工on . timeout .ms 配置,默认值为 60000。
保存 PID
生产者的 InitProducerldRequest请求会被发送给 TransactionCoordinator。 注意,如果未开启 事务特性 而 只开启幕等特性 , 那么 InitProducerldRequest 请求可以发送给任意的 broker。当 TransactionCoordinator 第一次收到包含该 transactiona!Id 的 InitProduc巳rldRequest 请求时,它会 把 transactiona!Id 和对应的 PID 以消息(我们习 惯性地把这类消息称为“事务日志消息”〉的形 式保存到主题 transaction state 中,如图 7-21 步骤 2.1 所示 。这样可以保证<transaction Id, PID> 的对应关系被持久化,从而保证即使 TransactionCoordinator 右机该对应关系也不会丢失 。 存储 到主题 transaction state 中的具体内容格式如图 7-23 所示 。
其中 transaction status 包含 Empty(O)、 Ongoing(l)、 PrepareCommit(2) 、 PrepareAbort(3)、 CompleteCommit(4)、 CompleteAbort(S)、 Dead(6)这 几种状态 。在存入主题
transaction state 之前,事务日志消息同样会根据单独的 transactiona!Id 来计算要发送的分区, 算法同代码清单 7-4 一样。
与InitProducerldRequest 对应的 InitProducerldResponse 响应体结构如图 7-24 所示,
3. 开启事务
通过 KafkaProduc町的 beginTransactionO方法可以开启一个事务, 调用该方法后,生产者本 地会标记己经开启了 一个新的事务 ,只有在生产者发送第一条消息之后 TransactionCoordinator 才会认为该事务 己经开启 。
4 . Consume-Transform-Produce
这个阶段囊括了整个事务的数据处理过程,其中还涉及多种请求
5. 提交或者中止事务
一旦数据被写入成功,我 们 就可以调用 KafkaProducer 的 commitTransaction()方法或 abortTransaction()方法来结束当前 的 事务 。