场景
最近, 运维部门的同事碰到一个问题, 向Kafka 中 某个Topic 发送消息总是失败. 调查下来发现, Zookeeper 中记录的该Topic 的Partition Leader 是一个非法的Broker. 也就是说Zookeeper 中, 记录了错误的Kafka 信息.
重启Kafka 和Zookeeper 后, 该问题得到了解决.
在本篇文章中, 会对Kafka 中Zookeer 的具体作用做一个深入的剖析, 包括理论和实战部分.
理论
概览
Zookeeper 主要用来跟踪Kafka 集群中的节点状态, 以及Kafka Topic, message 等等其他信息. 同时, Kafka 依赖于Zookeeper, 没有Zookeeper 是不能运行起来Kafka 的.
- Controller 选举:
- Controller 是一个特殊的Broker, 其负责所有Partition 的leader/follower 关系.
- Zookeeper 负责从Broker 中选举出一个作为Controller, 并确保其唯一性. 同时, 当Controller 宕机时, 选举一个新的.
- 集群 membership:
- 记录集群中都有哪些活跃着的Broker.
- Topic 配置:
- 有哪些Topic, Topic 都有哪些Partition, Replica 存放在哪里, Leader 是谁.
- 配额(0.9.0+):
- 记录每个客户能够读写的数据量.
- ACLs(0.9.0+):
- 记录对Topic 的读写控制.
- high-level consumer(已废弃):
- 记录consumer group 及其成员和offset 信息.
再谈Zookeeper
Zookeeper 向客户端提供了对一种数据结构访问的服务. 该数据结构类似于Unix 文件系统, 可以认为是数据数或者有继承层次的命名空间.
数据结构的路径使用Unix 惯例. 例如/A/B/C
用来表示节点C 的路径, 其中, 节点C 的父节点是节点B, 节点B 的父节点是节点A.
客户端通过Zookeeper 可对该数据结构进行CRUD 操作.
与Unix 的文件系统不同的是, Zookeeper 中的节点分为永久节点和临时节点, 同时, Zookeeper 中的节点是不能重命名的.
除此之外, Zookeeper 提供了节点add/remove 的watch 功能, 用来在节点更新时获得通知.
实践
环境搭建
- 下载Kafka, 同时为了研究Zookeeper 的作用, 需要单独下载Zookeeper.
- 按照Kafka 官方的Quickstart 指南, 运行起来一个Kafka 的集群, 集群内含有3个Broker实例, 同时有一个Zookeeper 实例. 其中, Kafka 的目录我们命名为Kafka 控制台(其bin 目录下有Kafka 的管理工具).
- 在Zookeeper 的目录下, 运行命令
./zkCli.sh
来启动来连接到Zookeeper 实例中, 以监视其状态. 此控制台我们命名为ZK 监视端.
初探Zookeeper 目录结构
在ZK 的监视端, 运行ls /
, 可以列出所有的顶级节点, 主要包含以下的节点:
- cluster.
- brokers.
- controller.
- consumer.
- config.
- isr_change_notification.
通过字面意思能够理解其记录的内容. 需要说明的是isr_change_notification 节点. 在Kafka 中, Leader 和Follower 的数据同步遵循的是"最终一致"原则, 也就是数据同步会有延迟, 但保证最终数据的一致性. isr 是'in-sync' replicas 的缩写, 代表的是与Leader 数据已经通过过的replica, 它会作为重选Leader 时作为判断依据.
运行ls /brokers/ids
命令, 可以看到输出了[0, 1, 2] , 代表了集群中现有的3台Broker.
创建Topic
-
在Kafka 控制台创建一个Topic, 该Topic 拥有一个Partition 和3个replica.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic-01
在ZK 监视端, 通过
ls /brokers/topics
可以看到出现了topic-01 主题.-
在ZK 监视端, 通过
get /brokers/topics/topic-01
可以获取topic-01 主题的值.{"version":1,"partitions":{"0":[0,1,2]}}
-
在ZK 监视端, 通过
ls /brokers/topics/topic-01/partitions
来查看该Topic 的Partition.[0]
可以看出, 该Topic 只有一个Partition.
-
在ZK 监视端, 通过
get /brokers/topics/topic-01/partitions/0/state
来获取Topic Partition 的信息.{"controller_epoch":11,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}
可以看出, 该Topic 有3个replica, 而且当前状态都是isr(也就是已处于最新状态). 同时, 当前的Leader 是Broker 0.
关闭Leader
在kafka 控制台, 通过
bin/kafka-server-stop.sh
关闭Leader, 也就是Broker 0.-
在ZK 监视端, 通过
get /brokers/topics/topic-01/partitions/0/state
来获取Topic Partition 的信息.{"controller_epoch":15,"leader":1,"version":1,"leader_epoch":7,"isr":[1,2]}
可以看出, 系统进行了Leader 的重新选举, 新的Leader 为Broker 1. 当前的isr 队列中有Broker 1 和Broker 2.
关闭Zookeeper
-
通过Kafka 控制台produce 和consume 数据. 此刻功能一切正常.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-01
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topic-01
-
在Kafka 控制台, 通过
bin/zookeeper-server-stop.sh
来关闭Zookeeper 实例.此时, 如果Kafka Broker 的控制台窗口还没有关闭, 可以看到Broker 在不停地尝试去获取Zookeeper 的连接.
通过Kafka 控制台, 仍然能够正常地produce 和consume 数据.
-
在Kafka 控制台, 创建新的Topic 时, 会出现以下的异常:
[2017-01-06 11:23:23,477] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
创建的客户端会不停尝试, 并一直打印出该异常信息.
如果此时, 通过Kafka 控制台重新启动Zookeeper, 可以发现新建Topic 成功返回. 并且Broker 也成功与Zookeeper 重建了连接,并进行了Topic Partition 和Replica 的同步操作.
额外发现
离开了Zookeeper, Kafka 不能对Topic 进行新增操作, 但是仍然可以produce 和consume 消息.
PS: 在运行过程中, 如果先关闭掉了Zookeeper, 然后再去关闭Kafka, 会发现Kafka 后台一直结束不掉, 这是因为Kafka 会被block 在与Zookeeper 的重连过程中. 解决方法是重启Zookeeper , 然后先关闭Kafka 再关闭Zookeeper.
PPS: 如果想了解Zookeeper 和Kafka 的其它技术细节, 可以移步我的系列文章[Zookeeper 与Kafka].