1. Module Composition
KafkaController is the cluster control and management module of Kafka; only one broker in the cluster can be the active controller (leader) at any time. It works mainly by registering watchers for various ZooKeeper events, through which it manages cluster membership, partition leader election, rebalancing, and related concerns.
Main components of KafkaController:
Details:
- ControllerEventManager: the controller's event manager. When KafkaController observes a ZooKeeper node-change event, it puts the event into ControllerEventManager's event queue; the event-processing thread inside ControllerEventManager takes events off the queue and calls KafkaController#process() to handle them (a minimal sketch of this pattern follows the list).
- PartitionStateMachine: the partition state machine. Defines and manages the states of partitions.
- ReplicaStateMachine: the replica state machine. Defines and manages the states of replicas.
- TopicDeletionManager: performs deletion of the topics an administrator has marked for deletion. It defines the DeleteTopicsThread, which deletes the pending set of topics asynchronously.
- Handler: the event handlers. When KafkaController initializes, it registers watchers on ZooKeeper nodes, and each node is bound to its own event handler.
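To make the ControllerEventManager pattern above concrete, here is a minimal sketch of a single-threaded event queue. This is illustrative only: SimpleEventManager and its members are invented names, not Kafka's actual classes, which layer epochs, metrics, and event preemption on top of the same basic shape.
```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical, simplified stand-ins for Kafka's ControllerEvent types.
sealed trait Event
case object Startup extends Event
case object ShutdownThread extends Event

// A minimal event manager: callers put events on a blocking queue, and a
// single dedicated thread drains the queue and hands each event to the
// processing function -- the same shape as ControllerEventManager calling
// KafkaController#process().
class SimpleEventManager(process: Event => Unit) {
  private val queue = new LinkedBlockingQueue[Event]()

  private val thread = new Thread(() => {
    var running = true
    while (running) {
      queue.take() match {
        case ShutdownThread => running = false // stop the loop
        case event          => process(event)  // single-threaded processing
      }
    }
  }, "simple-event-thread")

  def put(event: Event): Unit = queue.put(event)
  def start(): Unit = thread.start()
  def shutdown(): Unit = queue.put(ShutdownThread)
}
```
Because a single thread processes every event, the process() function needs no additional synchronization; this is exactly why KafkaController can mutate its context freely inside process().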
2. Initialization Flow
2.1 KafkaController#startup() Method
startup() is defined on KafkaController and is invoked from KafkaServer when the broker starts.
```scala
def startup() = {
  // Register a ZooKeeper state change handler, invoked when the ZK session is re-initialized
  zkClient.registerStateChangeHandler(new StateChangeHandler {
    override val name: String = StateChangeHandlers.ControllerHandler
    // Called after the new ZK session has been initialized
    override def afterInitializingSession(): Unit = {
      // Enqueue a RegisterBrokerAndReelect event to re-elect the controller leader
      eventManager.put(RegisterBrokerAndReelect)
    }
    // Called before the new ZK session is initialized
    override def beforeInitializingSession(): Unit = {
      val queuedEvent = eventManager.clearAndPut(Expire)
      // Block initialization of the new session until the expiration event is being handled,
      // which ensures that all pending events have been processed before creating the new session
      queuedEvent.awaitProcessing()
    }
  })
  // Enqueue the Startup event; eventManager will call KafkaController#process() to handle it.
  // The Startup event performs controller election and the rest of the initialization.
  eventManager.put(Startup)
  eventManager.start()
}
```
2.2 KafkaController#processStartup() Method
processStartup() registers the watcher on the /controller znode and then performs the election and initialization work.
```scala
private def processStartup(): Unit = {
  // Register the /controller znode change handler and check whether the node exists
  zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
  // Run the election
  elect()
}
```
ControllerChangeHandler is the change handler for the /controller znode; when the node is created, deleted, or its data changes, it generates the corresponding event.
```scala
class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  override val path: String = ControllerZNode.path
  // Node created: some other broker has been elected controller; handle a ControllerChange event
  override def handleCreation(): Unit = eventManager.put(ControllerChange)
  // Node deleted: there is currently no controller; handle a Reelect event to trigger re-election
  override def handleDeletion(): Unit = eventManager.put(Reelect)
  // Node data changed: the controller may have changed; handle a ControllerChange event
  override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}
```
2.3 KafkaController#elect() Method
elect() performs the controller election and, if this broker wins, the controller initialization.
```scala
private def elect(): Unit = {
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  /*
   * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
   * it's possible that the controller has already been elected when we get here. This check will prevent the following
   * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
   */
  // If activeControllerId is not -1, another broker is already the controller; nothing more to do
  if (activeControllerId != -1) {
    debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
    return
  }

  try {
    // Create the /controller and /controller_epoch znodes; the uniqueness of the
    // ephemeral /controller znode is what elects the leader
    val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
    controllerContext.epoch = epoch
    controllerContext.epochZkVersion = epochZkVersion
    activeControllerId = config.brokerId

    info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
      s"and epoch zk version is now ${controllerContext.epochZkVersion}")

    // This broker became the controller; run the controller initialization
    onControllerFailover()
  } catch {
    // A controller already exists: call maybeResign(), which performs the controller cleanup
    case e: ControllerMovedException =>
      maybeResign()

      if (activeControllerId != -1)
        debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
      else
        warn("A controller has been elected but just resigned, this will result in another round of election", e)

    // Any other error: trigger a controller move, i.e. clear the controller context and
    // delete the /controller znode, which triggers a new election
    case t: Throwable =>
      error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
        s"Trigger controller movement immediately", t)
      triggerControllerMove()
  }
}
```
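The election rests on ZooKeeper's guarantee that only one client can create a given znode. Below is a minimal sketch of that mechanism using the plain ZooKeeper Java client; it illustrates the idea only and is not Kafka's registerControllerAndIncrementControllerEpoch, which additionally maintains /controller_epoch with a conditional update, nor the exact JSON Kafka stores in the node.
```scala
import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}

// Try to become leader by creating an ephemeral /controller znode.
// Exactly one contender succeeds; the node disappears automatically
// when the winner's session dies, letting the others re-elect.
def tryElect(zk: ZooKeeper, brokerId: Int): Boolean =
  try {
    zk.create(
      "/controller",
      s"""{"brokerid":$brokerId}""".getBytes("UTF-8"),
      ZooDefs.Ids.OPEN_ACL_UNSAFE,
      CreateMode.EPHEMERAL // deleted by ZK when this session expires
    )
    true // we won the election
  } catch {
    case _: KeeperException.NodeExistsException => false // someone else is controller
  }
```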
2.4 KafkaController#onControllerFailover() Method
onControllerFailover() is the core initialization method of the controller. It registers the znode event handlers, deletes stale log-dir and ISR change notifications, initializes the controller context and the topic deletion manager, starts the state machines, initializes partition reassignment, and so on.
```scala
private def onControllerFailover(): Unit = {
  info("Registering handlers")

  // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
  // Register the child-change handlers
  val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
    isrChangeNotificationHandler)
  childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
  // Register the node-change handlers
  val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
  nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)

  info("Deleting log dir event notifications")
  zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)
  info("Deleting isr change notifications")
  zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion)
  // Initialize the controller context
  info("Initializing controller context")
  initializeControllerContext()
  // Fetch the topics pending deletion and initialize the TopicDeletionManager
  info("Fetching topic deletions in progress")
  val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
  info("Initializing topic deletion manager")
  topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

  // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
  // are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
  // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
  // partitionStateMachine.startup().
  // Synchronize metadata with the other brokers
  info("Sending update metadata request")
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)

  // Start the replica state machine and the partition state machine
  replicaStateMachine.startup()
  partitionStateMachine.startup()

  info(s"Ready to serve as the new controller with epoch $epoch")

  // Resume any in-progress partition reassignments
  initializePartitionReassignments()
  topicDeletionManager.tryTopicDeletion()
  val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
  // For the pending partitions, elect the preferred replica as leader
  onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)
  info("Starting the controller scheduler")
  kafkaScheduler.startup()
  // Schedule the automatic leader rebalance task
  if (config.autoLeaderRebalanceEnable) {
    scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
  }

  if (config.tokenAuthEnabled) {
    info("starting the token expiry check scheduler")
    tokenCleanScheduler.startup()
    tokenCleanScheduler.schedule(name = "delete-expired-tokens",
      fun = () => tokenManager.expireTokens,
      period = config.delegationTokenExpiryCheckIntervalMs,
      unit = TimeUnit.MILLISECONDS)
  }
}
```
2.5 KafkaController#onControllerResignation() Method
When the broker changes from controller (leader) to an ordinary follower, onControllerResignation() is called to clean up and reset the controller's state.
```scala
private def onControllerResignation(): Unit = {
  debug("Resigning")
  // de-register listeners
  zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
  zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
  zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
  zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
  unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)

  // shutdown leader rebalance scheduler and reset the global counters
  kafkaScheduler.shutdown()
  offlinePartitionCount = 0
  preferredReplicaImbalanceCount = 0
  globalTopicCount = 0
  globalPartitionCount = 0
  topicsToDeleteCount = 0
  replicasToDeleteCount = 0
  ineligibleTopicsToDeleteCount = 0
  ineligibleReplicasToDeleteCount = 0

  // stop token expiry check scheduler
  if (tokenCleanScheduler.isStarted)
    tokenCleanScheduler.shutdown()

  // de-register partition ISR listener for on-going partition reassignment task
  unregisterPartitionReassignmentIsrChangeHandlers()
  // shutdown partition state machine
  partitionStateMachine.shutdown()
  // de-register the remaining znode listeners
  zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
  unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
  zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
  // shutdown replica state machine
  replicaStateMachine.shutdown()
  zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)

  // shut down the controller's channels to the brokers
  controllerChannelManager.shutdown()
  // reset the controller context
  controllerContext.resetContext()

  info("Resigned")
}
```
3. ZNode Watchers and Event Handling
| ZNode | Watch Type | Handler | ZK Event | Controller Event |
|---|---|---|---|---|
| /controller | node change | ControllerChangeHandler | Create | ControllerChange |
| /controller | node change | ControllerChangeHandler | Delete | Reelect |
| /controller | node change | ControllerChangeHandler | DataChange | ControllerChange |
| /brokers/ids | child change | BrokerChangeHandler | ChildChange | BrokerChange |
| /brokers/ids/[brokerId] | node change | BrokerModificationsHandler | DataChange | BrokerModifications |
| /brokers/topics | child change | TopicChangeHandler | ChildChange | TopicChange |
| /admin/delete_topics | child change | TopicDeletionHandler | ChildChange | TopicDeletion |
| /brokers/topics/[topic] | node change | PartitionModificationsHandler | DataChange | PartitionModifications |
| /admin/reassign_partitions | node change | PartitionReassignmentHandler | Create | ZkPartitionReassignment |
| /admin/preferred_replica_election | node change | PreferredReplicaElectionHandler | Create | ReplicaLeaderElection |
| /isr_change_notification | child change | IsrChangeNotificationHandler | ChildChange | IsrChangeNotification |
| /log_dir_event_notification | child change | LogDirEventNotificationHandler | ChildChange | LogDirEventNotification |
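Every handler in the table follows the same pattern as the ControllerChangeHandler shown in section 2.2: turn a ZooKeeper callback into a controller event on the queue. For example, the broker-membership handler is essentially the following (lightly simplified from the Kafka 2.x source):
```scala
// Watches /brokers/ids for brokers joining or leaving, and turns the ZK
// child-change callback into a BrokerChange controller event.
class BrokerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = BrokerIdsZNode.path
  override def handleChildChange(): Unit = eventManager.put(BrokerChange)
}
```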
4. Replica and Partition State Machines
4.1 Replica State Machine
Replica state transition diagram:
ReplicaState descriptions (the transition rules are collected into a sketch after this list):
- NewReplica: when a topic is created or partitions are reassigned, the controller creates new replicas in this state. A replica in this state can only receive a "become follower" state change request. Reached from NonExistentReplica.
- OnlineReplica: once a replica is started and is part of the partition's assigned replica (AR) set, it is put in the OnlineReplica state. A replica in this state can receive either "become leader" or "become follower" state change requests. Reached from NewReplica, OnlineReplica, or OfflineReplica.
- OfflineReplica: if a replica dies (the broker hosting it goes down), it is moved to the OfflineReplica state. Reached from NewReplica or OnlineReplica.
- ReplicaDeletionStarted: when replica deletion begins, the replica is moved to the ReplicaDeletionStarted state. Reached from OfflineReplica.
- ReplicaDeletionSuccessful: if the delete-replica request succeeds (the response carries no error), the replica is moved to the ReplicaDeletionSuccessful state. Reached from ReplicaDeletionStarted.
- ReplicaDeletionIneligible: if replica deletion fails, the replica is moved to the ReplicaDeletionIneligible state. Reached from ReplicaDeletionStarted.
- NonExistentReplica: once a replica is successfully deleted, it is moved to the NonExistentReplica state. Reached from ReplicaDeletionSuccessful.
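The "reached from" rules above can be collected into a transition table. The sketch below is hand-derived from the list rather than copied from Kafka; the real code expresses the same rules through each ReplicaState's validPreviousStates field, which checkValidReplicaStateChange (seen in the code below) consults.
```scala
sealed trait ReplicaState
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState
case object ReplicaDeletionStarted extends ReplicaState
case object ReplicaDeletionSuccessful extends ReplicaState
case object ReplicaDeletionIneligible extends ReplicaState
case object NonExistentReplica extends ReplicaState

// Each target state maps to the set of states it may legally be reached from,
// exactly as described in the list above.
val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
  NewReplica                -> Set(NonExistentReplica),
  OnlineReplica             -> Set(NewReplica, OnlineReplica, OfflineReplica),
  OfflineReplica            -> Set(NewReplica, OnlineReplica),
  ReplicaDeletionStarted    -> Set(OfflineReplica),
  ReplicaDeletionSuccessful -> Set(ReplicaDeletionStarted),
  ReplicaDeletionIneligible -> Set(ReplicaDeletionStarted),
  NonExistentReplica        -> Set(ReplicaDeletionSuccessful)
)

// An invalid transition (e.g. NewReplica -> ReplicaDeletionStarted) is simply
// logged and skipped, which is what logInvalidTransition does below.
def isValidTransition(from: ReplicaState, to: ReplicaState): Boolean =
  validPreviousStates(to).contains(from)
```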
Replica state transition handling:
```scala
override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
  if (replicas.nonEmpty) {
    try {
      controllerBrokerRequestBatch.newBatch()
      replicas.groupBy(_.replica).foreach { case (replicaId, replicas) =>
        doHandleStateChanges(replicaId, replicas, targetState)
      }
      // Propagate the state changes to the affected brokers
      // (LeaderAndIsr / StopReplica / UpdateMetadata requests)
      controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
    } catch {
      case e: ControllerMovedException =>
        error(s"Controller moved to another broker when moving some replicas to $targetState state", e)
        throw e
      case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
    }
  }
}

private def doHandleStateChanges(replicaId: Int, replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
  replicas.foreach(replica => controllerContext.putReplicaStateIfNotExists(replica, NonExistentReplica))
  // Validate the requested transitions against each replica's current state
  val (validReplicas, invalidReplicas) = controllerContext.checkValidReplicaStateChange(replicas, targetState)
  invalidReplicas.foreach(replica => logInvalidTransition(replica, targetState))

  targetState match {
    case NewReplica =>
      validReplicas.foreach { replica =>
        val partition = replica.topicPartition
        val currentState = controllerContext.replicaState(replica)

        controllerContext.partitionLeadershipInfo.get(partition) match {
          case Some(leaderIsrAndControllerEpoch) =>
            // A replica that is currently the partition leader cannot move to NewReplica
            if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
              val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
              logFailedStateChange(replica, currentState, OfflineReplica, exception)
            } else {
              // Queue a LeaderAndIsrRequest for the broker hosting this replica
              controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
                replica.topicPartition,
                leaderIsrAndControllerEpoch,
                controllerContext.partitionFullReplicaAssignment(replica.topicPartition),
                isNew = true)
              logSuccessfulTransition(replicaId, partition, currentState, NewReplica)
              controllerContext.putReplicaState(replica, NewReplica)
            }
          case None =>
            logSuccessfulTransition(replicaId, partition, currentState, NewReplica)
            controllerContext.putReplicaState(replica, NewReplica)
        }
      }
    case OnlineReplica =>
      validReplicas.foreach { replica =>
        val partition = replica.topicPartition
        val currentState = controllerContext.replicaState(replica)

        currentState match {
          case NewReplica =>
            val assignment = controllerContext.partitionFullReplicaAssignment(partition)
            if (!assignment.replicas.contains(replicaId)) {
              error(s"Adding replica ($replicaId) that is not part of the assignment $assignment")
              val newAssignment = assignment.copy(replicas = assignment.replicas :+ replicaId)
              controllerContext.updatePartitionFullReplicaAssignment(partition, newAssignment)
            }
          case _ =>
            // The replica may already exist; send it the current leader and ISR information
            controllerContext.partitionLeadershipInfo.get(partition) match {
              case Some(leaderIsrAndControllerEpoch) =>
                controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
                  replica.topicPartition,
                  leaderIsrAndControllerEpoch,
                  controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
              case None =>
            }
        }
        logSuccessfulTransition(replicaId, partition, currentState, OnlineReplica)
        controllerContext.putReplicaState(replica, OnlineReplica)
      }
    case OfflineReplica =>
      // Queue a StopReplicaRequest (without deleting data) for the brokers hosting
      // these replicas; ControllerBrokerRequestBatch takes care of sending it
      validReplicas.foreach { replica =>
        controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false)
      }
      val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>
        controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)
      }
      // Remove the replica from the ISR of its partitions
      val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
      updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
        // The partition's replica set has shrunk; notify the remaining replicas
        if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
          val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
          controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
            partition,
            leaderIsrAndControllerEpoch,
            controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
        }
        val replica = PartitionAndReplica(partition, replicaId)
        val currentState = controllerContext.replicaState(replica)
        logSuccessfulTransition(replicaId, partition, currentState, OfflineReplica)
        controllerContext.putReplicaState(replica, OfflineReplica)
      }

      replicasWithoutLeadershipInfo.foreach { replica =>
        val currentState = controllerContext.replicaState(replica)
        logSuccessfulTransition(replicaId, replica.topicPartition, currentState, OfflineReplica)
        controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(replica.topicPartition))
        controllerContext.putReplicaState(replica, OfflineReplica)
      }
    case ReplicaDeletionStarted =>
      validReplicas.foreach { replica =>
        val currentState = controllerContext.replicaState(replica)
        logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionStarted)
        controllerContext.putReplicaState(replica, ReplicaDeletionStarted)
        controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = true)
      }
    case ReplicaDeletionIneligible =>
      validReplicas.foreach { replica =>
        val currentState = controllerContext.replicaState(replica)
        logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionIneligible)
        controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
      }
    case ReplicaDeletionSuccessful =>
      validReplicas.foreach { replica =>
        val currentState = controllerContext.replicaState(replica)
        logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionSuccessful)
        controllerContext.putReplicaState(replica, ReplicaDeletionSuccessful)
      }
    case NonExistentReplica =>
      validReplicas.foreach { replica =>
        val currentState = controllerContext.replicaState(replica)
        val newAssignedReplicas = controllerContext
          .partitionFullReplicaAssignment(replica.topicPartition)
          .removeReplica(replica.replica)

        controllerContext.updatePartitionFullReplicaAssignment(replica.topicPartition, newAssignedReplicas)
        logSuccessfulTransition(replicaId, replica.topicPartition, currentState, NonExistentReplica)
        controllerContext.removeReplicaState(replica)
      }
  }
}
```
4.2 Partition State Machine
Partition state transition diagram:
Partition state descriptions (see the transition-table sketch after this list):
- NonExistentPartition: the partition either has never been created, or was created and later deleted
- NewPartition: the partition has been created and has replicas assigned, but no leader or ISR has been elected yet
- OnlinePartition: once a leader has been elected for the partition, it is in this state
- OfflinePartition: if, after a leader was elected, the leader's broker goes down, the partition moves to the OfflinePartition state
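Analogous to the replica sketch in 4.1, the legal partition transitions can be written as a table. The rule set below is my reading of Kafka's PartitionState definitions, so treat it as an assumption rather than quoted source:
```scala
sealed trait PartitionState
case object NonExistentPartition extends PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Assumed transition rules (mirroring validPreviousStates in Kafka's PartitionState)
val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
  NewPartition         -> Set(NonExistentPartition),
  OnlinePartition      -> Set(NewPartition, OnlinePartition, OfflinePartition),
  OfflinePartition     -> Set(NewPartition, OnlinePartition, OfflinePartition),
  NonExistentPartition -> Set(OfflinePartition)
)
```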
Partition state transition handling:
```scala
override def handleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState,
  partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
  if (partitions.nonEmpty) {
    try {
      controllerBrokerRequestBatch.newBatch()
      val result = doHandleStateChanges(
        partitions,
        targetState,
        partitionLeaderElectionStrategyOpt
      )
      controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
      result
    } catch {
      case e: ControllerMovedException =>
        error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
        throw e
      case e: Throwable =>
        error(s"Error while moving some partitions to $targetState state", e)
        partitions.iterator.map(_ -> Left(e)).toMap
    }
  } else {
    Map.empty
  }
}
```
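As a hypothetical usage sketch (the two-step sequence and OfflinePartitionLeaderElectionStrategy follow the Kafka code, but this is a simplified shape, not a verbatim call site), this is roughly how the controller brings newly created partitions online:
```scala
// Step 1: register the partitions as NewPartition (replicas assigned,
// no leader yet). No election strategy is needed for this transition.
partitionStateMachine.handleStateChanges(newPartitions, NewPartition, None)

// Step 2: move them to OnlinePartition, which triggers leader election via
// the offline-partition strategy (allowUnclean = false restricts the
// election to in-sync replicas).
partitionStateMachine.handleStateChanges(
  newPartitions,
  OnlinePartition,
  Some(OfflinePartitionLeaderElectionStrategy(allowUnclean = false))
)
```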
5. Main Processing Flow
process() dispatches all of the controller's cluster-management events.
```scala
override def process(event: ControllerEvent): Unit = {
  try {
    event match {
      case event: MockEvent =>
        // Used only in test cases
        event.process()
      case ShutdownEventThread =>
        error("Received a ShutdownEventThread event. This type of event is supposed to be handled by ControllerEventThread")
      case AutoPreferredReplicaLeaderElection =>
        processAutoPreferredReplicaLeaderElection()
      case ReplicaLeaderElection(partitions, electionType, electionTrigger, callback) =>
        processReplicaLeaderElection(partitions, electionType, electionTrigger, callback)
      case UncleanLeaderElectionEnable =>
        processUncleanLeaderElectionEnable()
      case TopicUncleanLeaderElectionEnable(topic) =>
        processTopicUncleanLeaderElectionEnable(topic)
      case ControlledShutdown(id, brokerEpoch, callback) =>
        processControlledShutdown(id, brokerEpoch, callback)
      case LeaderAndIsrResponseReceived(response, brokerId) =>
        processLeaderAndIsrResponseReceived(response, brokerId)
      case UpdateMetadataResponseReceived(response, brokerId) =>
        processUpdateMetadataResponseReceived(response, brokerId)
      case TopicDeletionStopReplicaResponseReceived(replicaId, requestError, partitionErrors) =>
        processTopicDeletionStopReplicaResponseReceived(replicaId, requestError, partitionErrors)
      case BrokerChange =>
        processBrokerChange()
      case BrokerModifications(brokerId) =>
        processBrokerModification(brokerId)
      case ControllerChange =>
        processControllerChange()
      case Reelect =>
        processReelect()
      case RegisterBrokerAndReelect =>
        processRegisterBrokerAndReelect()
      case Expire =>
        processExpire()
      case TopicChange =>
        processTopicChange()
      case LogDirEventNotification =>
        processLogDirEventNotification()
      case PartitionModifications(topic) =>
        processPartitionModifications(topic)
      case TopicDeletion =>
        processTopicDeletion()
      case ApiPartitionReassignment(reassignments, callback) =>
        processApiPartitionReassignment(reassignments, callback)
      case ZkPartitionReassignment =>
        processZkPartitionReassignment()
      case ListPartitionReassignments(partitions, callback) =>
        processListPartitionReassignments(partitions, callback)
      case PartitionReassignmentIsrChange(partition) =>
        processPartitionReassignmentIsrChange(partition)
      case IsrChangeNotification =>
        processIsrChangeNotification()
      case Startup =>
        processStartup()
    }
  } catch {
    case e: ControllerMovedException =>
      info(s"Controller moved to another broker when processing $event.", e)
      maybeResign()
    case e: Throwable =>
      error(s"Error processing event $event", e)
  } finally {
    updateMetrics()
  }
}
```