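The previous chapters analyzed the components that KafkaController relies on. Starting with this chapter, we walk through KafkaController's individual functions:
KafkaController startup flow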
- Register for ZooKeeper's SessionExpiration notification via registerSessionExpirationListener. When the session expires and a new session has been established, the controller election is run again:
    def handleNewSession() {
      info("ZK expired; shut down all controller components and try to re-elect")
      inLock(controllerContext.controllerLock) {
        onControllerResignation()
        controllerElector.elect
      }
    }
- Start the ZookeeperLeaderElector via controllerElector.startup. If the current broker is elected Controller, the onControllerFailover callback is triggered:
    // read the controller epoch from ZooKeeper, then increment it
    readControllerEpochFromZookeeper()
    incrementControllerEpoch(zkUtils.zkClient)
    // register the ZooKeeper listeners before reading state from ZooKeeper
    registerReassignedPartitionsListener()
    registerIsrChangeNotificationListener()
    registerPreferredReplicaElectionListener()
    partitionStateMachine.registerListeners()
    replicaStateMachine.registerListeners()
    // load brokers, topics, partitions, leaders, ISR, etc. from ZooKeeper
    initializeControllerContext()
    replicaStateMachine.startup()
    partitionStateMachine.startup()
    brokerState.newState(RunningAsController)
    maybeTriggerPartitionReassignment()
    maybeTriggerPreferredReplicaElection()
    // send partition leadership info to all live brokers
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    if (config.autoLeaderRebalanceEnable) {
      autoRebalanceScheduler.startup()
      autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
        5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
    }
    deleteTopicManager.start()
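- Update the controller epoch stored in ZooKeeper;
- Register ZooKeeper notifications for changes to the broker/topic nodes;
- Initialize the ControllerContext, mainly by reading brokers, topics, partitions, ISR, partition leaders, replicas, and related information from ZooKeeper;
- Start the ReplicaStateMachine;
- Start the PartitionStateMachine;
- Send the state of all partitions (leader, ISR, replicas, epoch, etc.) to all live brokers;
- If automatic leader rebalancing is enabled, start the AutoRebalanceScheduler;
- Start the TopicDeletionManager.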
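
To make the ordering above easier to follow, here is a minimal, self-contained sketch of the startup flow. It is a toy model, not Kafka's code: FakeZk, ToyElector, and ToyController are hypothetical names invented for illustration, ZooKeeper is replaced by in-memory fields, and each failover step is only recorded as a string rather than executed. The assumption is simply that the first broker to call elect wins, and that an expired session frees the leadership slot.

```scala
import scala.collection.mutable.ListBuffer

// Toy model only: none of these classes are Kafka's real ones.
object ControllerStartupSketch {

  /** Hypothetical in-memory stand-in for the controller state kept in ZooKeeper. */
  final class FakeZk {
    var controllerEpoch: Int = 0
    var leaderId: Option[Int] = None

    /** Simulates session expiry: the leadership slot of that broker disappears. */
    def expireSession(brokerId: Int): Unit =
      if (leaderId.contains(brokerId)) leaderId = None
  }

  /** Hypothetical elector: the first broker to claim the empty slot becomes leader. */
  final class ToyElector(zk: FakeZk, brokerId: Int, onBecomingLeader: () => Unit) {
    def elect(): Boolean = {
      if (zk.leaderId.isEmpty) {
        zk.leaderId = Some(brokerId)
        onBecomingLeader()
      }
      zk.leaderId.contains(brokerId)
    }
  }

  final class ToyController(zk: FakeZk, val brokerId: Int, autoRebalance: Boolean) {
    val steps: ListBuffer[String] = ListBuffer.empty[String]
    var epoch: Int = 0
    private val elector = new ToyElector(zk, brokerId, () => onControllerFailover())

    /** Startup: register the session-expiration listener, then try to get elected. */
    def startup(): Boolean = {
      steps += "registerSessionExpirationListener"
      elector.elect()
    }

    /** Runs only on the broker that wins the election; order mirrors the list above. */
    def onControllerFailover(): Unit = {
      epoch = zk.controllerEpoch + 1
      zk.controllerEpoch = epoch
      steps ++= Seq(
        "incrementControllerEpoch",
        "registerListeners",
        "initializeControllerContext",
        "replicaStateMachine.startup",
        "partitionStateMachine.startup",
        "sendUpdateMetadataRequest")
      if (autoRebalance) steps += "autoRebalanceScheduler.startup"
      steps += "deleteTopicManager.start"
    }

    /** Called once a new session exists: drop controller state, then re-elect. */
    def handleNewSession(): Boolean = {
      steps += "onControllerResignation"
      elector.elect()
    }
  }

  def main(args: Array[String]): Unit = {
    val zk = new FakeZk
    val broker1 = new ToyController(zk, brokerId = 1, autoRebalance = true)
    broker1.startup()
    broker1.steps.foreach(println)
  }
}
```

Two points the sketch tries to highlight: the epoch is bumped before anything else happens, so every later request from the new controller carries a newer epoch than its predecessor's, and the listeners are registered before the context is loaded, so changes that occur during loading are not missed.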