官方原文地址:https://kafka.apache.org/0101/documentation.html#distributionimpl
Consumer Offset Tracking
high-level的Consume保持它自己消费过的每个分区的最大的offset并且周期性的提交,所以可以在重启的时候恢复offset信息。Kafka提供在offset manager中保存所有offset的选项。任何Consumer实例都需要发送offset到offset manager。high-level的Consumer自动化的处理offset。如果使用simple consumer,需要自己手动管理offset。在Java simple consumer中现在还不支持,Java simple consumer只能从ZooKeeper提交和获取offset。如果使用Scala simple consumer,你会找到offset manager并且可以明确指定向offset manager提交和获取offset。Consumer通过向Broker发送GroupCoordinatorRequest请求并获取包含offset manager的GroupCoordinatorResponse来获取offset。之后Consumer可以向offset manager提交和获取offset。如果offset manager移动,Consumer需要重新发现。如果你期望手动管理offset,可以查看这些解释如果提交OffsetCommitRequest和OffsetFetchRequest的代码。
当offset manager收到OffsetCommitRequest,将其添加到一个特殊的、压缩的,名为__consumer_offsets的Kafka topic中。offset manager返回一个成功的offset commit的响应给Consumer,当所有的备份都收到offset之后。如果在配置的timeout时间内所有副本没有完成备份,commit offset认为是失败的,将在之后重试(high-consumer将自动执行)。broker周期性的压缩offset信息,因为它只需要保存每个Partition最近的offset信息即可。为了更快的响应获取offset的请求,offset manager也会在内存缓存offset数据。
当offset manager收到fetch offset的请求,它从cache中返回最近commit的offset。如果offset manager是刚启动或者刚成为一些group的offset manager(通过成为一下offset topic的leader partition),它需要加载offset信息。这种情况下,fetch offset request或返回OffsetsLoadInProgress异常,Consumer需要之后重试(high-level consumer会自动处理)。
Migrating offsets from ZooKeeper to Kafka
Kafka较早的版本将offset信息存储在ZooKeeper中。可以通过以下步骤将这些数据迁移到Kafka中:
1. 在Consumer配置中设置 offsets.storage=kafka
和 dual.commit.enabled=true
2. 验证Consumer是否正常
3. 在Consumer配置中设置 dual.commit.enabled=false
4. 验证Consumer是否正常
回滚(从Kafka到ZooKeeper)也可以通过以上的步骤执行,只需要设置offsets.storage=zookeeper
ZooKeeper Directories
以下说明ZooKeeper用于统筹Consumer和Broker的结构和算法。
Notation
当路径中的元素被表示为[xyz]时,这意味着xyz的值不是固定的,实际上对于xyz的每个可能值都有一个ZooKeeper znode。如/topics/[topic]表示/topics下每个topic都有一个对应的目录。[0…5]表示0,1,2,3,4,5的序列。->符号用于指示一个节点的值,如/hello -> world表示/hello存储的值是“world”。
Broker Node Registry
/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)
这是一个所有存在的Broker的节点列表,每个都提供一个唯一标识用于Consumer识别(必须作为配置的一部分)。在启动时,Broker通过在/brokers/ids中创建一个znode来注册自己。使用逻辑上的Broker ID的目的是可以在不影响Consumer的前提下将Broker移动到另外的物理机上。如果尝试注册一个已经存在的ID的Broker会失败。
一旦Broker通过临时节点注册到ZK,注册信息是动态的并且在Broker宕机或者关闭后会丢失(那么通知消费者Broker不再可用)。
Broker Topic Registry
/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)
每个Broker将自己注册到它包含的Topic下面,并保存Topic的partition数量。
Consumers and Consumer Groups
Consumer同样将自己注册到ZK中,为了协调其他的Consumer并且做消费数据的负载均衡。Consumer还可以通过offsets.storage=zookeeper将offset信息也存储在ZK中。但是这个存储在未来的版本中将被废弃。因此建议将offset信息迁移到Kafka中。
多个Consumer可以组成一个集群共同消费一个Topic。一个Group中的每个Consumer实例共享一个group_id。
一个Group中的Consumer尽可能公平的分配partition,每个partition只能被一个group中的一个Consumer消费。
Consumer Id Registry
除了一个group内的所有Consumer实例共享一个groupid,每个Consumer实例还拥有一个唯一的consumerid(hostname:uuid)用于区分不同的实例。Consumer的id注册在以下的目录中。
/consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} (ephemeral node)
每个Consumer将自己注册到Group目录下并创建一个包含id的znode。节点的值包含<topic, #stream=””>的Map。id用于标识group中哪些Consumer是活跃的。节点是临时节点,所以在Consumer进程关闭之后节点会丢失。
Consumer Offsets
Consumer记录每个分区消费过的最大的offset信息。如果配置了offsets.storage=zookeeper,这个数据会被记录到ZK中。
/consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)
Partition Owner registry
每个Partition都被一个group内的一个Consumer消费。这个Consumer必须在消费这个Partition之前建立对这个Partition的所有权。为了建立所有权,Consumer需要将id写入到Partition下面。
/consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)
Cluster Id
Cluster id是唯一且不可变的,用于表示Cluster。Cluster id最长可以拥有22个字符,由[a-zA-Z0-9_-]+组成。从概念上讲,它在Cluster第一次启动的时候生成。
实现层面上,它在Broker(0.10.1或更新的版本)第一次成功启动后产生。Broker在启动时尝试从/cluster/id节点获取cluster id。如果不存在,Broker穿件一个新的cluster id写入到这个节点中。
Broker node registration
Broker节点基本上是相互独立的,所以他们只是发布他们自己拥有的信息。当一个Broker加入时,它将自己注册到broker node,并写入自己的信息(host name和port)。Broker还将自己的Topic和Partition注册到对应的目录中。新Topic会被动态的注册,当他们在Broker上创建的时候。
Consumer registration algorithm
当启动一个Consumer时,它按照如下步骤操作:
1. 在group下注册自己的consumer id
2. 注册监听器用于监听新Consumer的加入和Consumer的关闭,Consumer变更都会触发Partition的分配(负载均衡)。
3. 注册监听器用于监听Broker的加入和关闭,Broker变更都会触发Partition的分配(负载均衡)。
4. 如果Consumer通过filter创建了一个消息流,同样会注册一个监听器用于监听新topic的加入。
5. 强制自己在消费group内重新平衡。
Consumer rebalancing algorithm
Consumer的负载均衡算法允许一个group内的所有Consumer对哪个Consumer消费哪些Partition达成一个共识。Consumer的负载均衡被Broker和同一个Group中的其他Consumer的添加和移除触发。对于给定的topic和group,partitions被平均的分配个consumers。这个设计是为了简化实现。如果我们允许一个分区同时被多个Consumer消费,那么在这个分区上会有冲突,需要一些锁去保证。如果consumer的数量超过了分区数,部分consumer会拿不到任何数据。在充分配算法中,我们分配分区时尽量使consumer需要和最少的Broker通信。
每个Consumer按照如下步骤进行重分配:
1. 对于Ci(Ci表示Consumer Instance)订阅的Topic T
2. PT表示Topic T的所有分区T
3. CG表示group内所有的Consumer
4. 对PT进行排序 (那么相同Broker上的分区会被集中到一起)
5. 对GC排序
6. i表示Ci在CG中的位置,N = size(PT)/size(CG)
7. 分配 i*N to (i+1)*N - 1 给Consumer Ci
8. 从分区所有者注册表中删除Ci拥有的这些条目
9. 将分区添加到所有者的分区注册表中
(我们可能需要重试直到分区原来的拥有者释放分区所有权)
当分区重分配在一个Consumer触发时,重分配应该在同一时间在相同group内的其他Consumer上也触发。