1. ZooKeeper介绍
ZooKeeper 是一个分布式的、开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现。分布式应用程序可以基于它实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作。在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等。
1.1 Broker注册
Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。主要工作:
- 为了记录 broker 的注册信息,在 ZooKeeper 上,专门创建了属于 Kafka 的一个节点,其路径为 /brokers;
- Kafka 的每个 broker 启动时,都会到 ZooKeeper 中进行注册,告诉 ZooKeeper 其 broker.id,在整个集群中,broker.id 应该全局唯一,并在 ZooKeeper 上创建其属于自己的节点,其节点路径为 /brokers/ids/{broker.id};
- 创建完节点后,Kafka 会将该 broker 的 broker.name 及端口号记录到该节点;
- 另外,该 broker 节点属性为临时节点,当 broker 会话失效时,ZooKeeper 会删除该节点,这样,我们就可以很方便的监控到broker 节点的变化,及时调整负载均衡等。
1.2 Topic注册
kafka消息的topic是由zookeeper管理的,在zookeeper中会建立专门的节点来记录这些信息,节点路径为 /brokers/topics/{topic_name}。为了保障数据的可靠性,每个 Topic 的 Partitions 实际上是存在备份的,并且备份的数量由 Kafka 机制中的 replicas 来控制。假设某个 TopicA 被分为 2 个 Partitions,并且存在两个备份,由于这 2 个 Partitions(1-2)被分布在不同的 broker 上,同一个 partiton 与其备份不能(也不应该)存储于同一个 broker 上。
为了保障数据的一致性,ZooKeeper 机制得以引入。基于 ZooKeeper,Kafka 为每一个 partition 找一个节点作为 leader,其余备份作为 follower;接续上图的例子,就 TopicA 的 partition1 而言,如果位于 broker2(Kafka 节点)上的 partition1 为 leader,那么位于 broker1 和 broker4 上面的 partition1 就充当 follower,则有下图:
基于上图的架构,当 producer push 的消息写入 partition(分区) 时,作为 leader 的 broker(Kafka 节点) 会将消息写入自己的分区,同时还会将此消息复制到各个 follower,实现同步。如果,某个follower 挂掉,leader 会再找一个替代并同步消息;如果 leader 挂了,follower 们会选举出一个新的 leader 替代,继续业务,这些都是由 ZooKeeper 完成的。
1.3 consumer 注册
注册新的消费者:
当新的消费者注册到 Kafka 中时,会在 /consumers/{group_id}/ids 节点下创建临时子节点,并记录相关信息。
注册新的消费者分组:
当新的消费者组注册到 ZooKeeper 中时,ZooKeeper 会创建专用的节点来保存相关信息,其节点路径为 ls/consumers/{group_id},其节点下有三个子节点,分别为 [ids, owners, offsets]。
- ids 节点:记录该消费组中当前正在消费的消费者;
- owners 节点:记录该消费组消费的 topic 信息;
- offsets 节点:记录每个 topic 的每个分区的 offset。
监听消费者分组中消费者的变化:
每个消费者都要关注其所属消费者组中消费者数目的变化,即监听 /consumers/{group_id}/ids 下子节点的变化。一单发现消费者新增或减少,就会触发消费者的负载均衡。
1.4 Consumer 负载均衡
Kafka 保证同一 consumer group 中只有一个 consumer 可消费某条消息,实际上,Kafka 保证的是稳定状态下每一个 consumer 实例只会消费某一个或多个特定的数据,而某个 partition 的数据只会被某一个特定的 consumer 实例所消费。这样设计的劣势是无法让同一个 consumer group 里的 consumer 均匀消费数据,优势是每个 consumer 不用都跟大量的 broker 通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个 partition 里的数据是有序的,这种设计可以保证每个 partition 里的数据也是有序被消费。
consumer 数量不等于 partition 数量:
如果某 consumer group 中 consumer 数量少于 partition 数量,则至少有一个 consumer 会消费多个 partition 的数据;如果 consumer 的数量与 partition 数量相同,则正好一个 consumer 消费一个 partition 的数据,而如果 consumer 的数量多于 partition 的数量时,会有部分 consumer 无法消费该 topic 下任何一条消息。
借助 ZooKeeper 实现负载均衡:
关于负载均衡,对于某些低级别的 API,consumer 消费时必须指定 topic 和 partition,这显然不是一种友好的均衡策略。基于高级别的 API,consumer 消费时只需制定 topic,借助 ZooKeeper 可以根据 partition 的数量和 consumer 的数量做到均衡的动态配置。
consumers 在启动时会到 ZooKeeper 下以自己的 conusmer-id 创建临时节点 /consumer/[group-id]/ids/[conusmer-id],并对 /consumer/[group-id]/ids 注册监听事件,当消费者发生变化时,同一 group 的其余消费者会得到通知。当然,消费者还要监听 broker 列表的变化。librdkafka 通常会将 partition 进行排序后,根据消费者列表,进行轮流的分配。
1.5 Producers 负载均衡
对于同一个 topic 的不同 partition,Kafka会尽力将这些 partition 分布到不同的 broker 服务器上,这种均衡策略实际上是基于 ZooKeeper 实现的。在一个 broker 启动时,会首先完成 broker 的注册过程,并注册一些诸如 “有哪些可订阅的 topic” 之类的元数据信息。producers 启动后也要到 ZooKeeper 下注册,创建一个临时节点来监听 broker 服务器列表的变化。由于在 ZooKeeper 下 broker 创建的也是临时节点,当 brokers 发生变化时,producers 可以得到相关的通知,从改变自己的 broker list。其它的诸如 topic 的变化以及broker 和 topic 的关系变化,也是通过 ZooKeeper 的这种 Watcher 监听实现的。
在生产中,必须指定 topic;但是对于 partition,有两种指定方式:
- 明确指定 partition(0-N),则数据被发送到指定 partition;
- 设置为 RD_KAFKA_PARTITION_UA,则 Kafka 会回调 partitioner 进行均衡选取,partitioner 方法需要自己实现。可以轮询或者传入 key 进行 hash。未实现则采用默认的随机方法 rd_kafka_msg_partitioner_random 随机选择。
1.6 消费进度Offset 记录
在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的消费进度 Offset 记录到 ZooKeeper上,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。Offset 在 ZooKeeper 中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值。
Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 topic 中,即:
__consumer_offsets(/brokers/topics/__consumer_offsets)
并且默认提供了 kafka_consumer_groups.sh 脚本供用户查看consumer 信息(命令:sh kafka-consumer-groups.sh –bootstrap-server * –describe –group *)。在当前版本中,offset 存储方式要么存储在本地文件中,要么存储在 broker 端,具体的存储方式取决 offset.store.method 的配置,默认是存储在 broker 端。
1.7 记录 Partition 与 Consumer 的关系
consumer group 下有多个 consumer(消费者),对于每个消费者组(consumer group),Kafka都会为其分配一个全局唯一的 group ID,group 内部的所有消费者共享该 ID。订阅的 topic 下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其它 group)。同时,Kafka 为每个消费者分配一个 consumer ID,通常采用 hostname:UUID 形式表示。
在Kafka中,规定了每个 partition 只能被同组的一个消费者进行消费,因此,需要在 ZooKeeper 上记录下 partition 与 consumer 之间的关系,每个 consumer 一旦确定了对一个 partition 的消费权力,需要将其 consumer ID 写入到 ZooKeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id] 就是一个消息分区的标识,节点内容就是该消息分区 消费者的 consumer ID。
参考资料: