Replica的状态
Replica有7种状态:
- NewReplica: 在partition reassignment期间KafkaController创建New replica;
- OnlineReplica: 当一个replica变为一个parition的assingned replicas时, 其状态变为OnlineReplica, 即一个有效的OnlineReplica. Online状态的parition才能转变为leader或isr中的一员;
- OfflineReplica: 当一个broker down时, 上面的replica也随之die, 其状态转变为Onffline;
- ReplicaDeletionStarted: 当一个replica的删除操作开始时,其状态转变为ReplicaDeletionStarted;
- ReplicaDeletionSuccessful: Replica成功删除后,其状态转变为ReplicaDeletionSuccessful;
- ReplicaDeletionIneligible: Replica成功失败后,其状态转变为ReplicaDeletionIneligible;
- NonExistentReplica: Replica成功删除后, 从ReplicaDeletionSuccessful状态转变为NonExistentReplica状态.
- 状态转换图:
ReplicaStateMachine
- 所在文件: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
-
startup
: 启动ReplicaStateMachine
-
initializeReplicaState
: 初始化每个replica的状态, 如果replica所在的broker是live状态,则此replica的状态为OnlineReplica
for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
val topic = topicPartition.topic
val partition = topicPartition.partition
assignedReplicas.foreach { replicaId =>
val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
controllerContext.liveBrokerIds.contains(replicaId) match {
case true => replicaState.put(partitionAndReplica, OnlineReplica)
case false =>
replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
}
}
}
- 处理可以转换到
Online
状态的Replica,handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
, 并且发送LeaderAndIsrRequest
到各broker nodes:handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
case OnlineReplica =>
assertValidPreviousStates(partitionAndReplica,
List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
replicaState(partitionAndReplica) match {
case NewReplica =>
// add this replica to the assigned replicas list for its partition
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
if(!currentAssignedReplicas.contains(replicaId))
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
case _ =>
// check if the leader for this partition ever existed
controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
case Some(leaderIsrAndControllerEpoch) =>
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
replicaAssignment)
replicaState.put(partitionAndReplica, OnlineReplica)
case None => // that means the partition was never in OnlinePartition state, this means the broker never
}
}
replicaState.put(partitionAndReplica, OnlineReplica)
- 监听broker改变
private def registerBrokerChangeListener() = {
zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
}
- 处理borker的改变事件
BrokerChangeListener()
:
针对broker的上下线,分别回调controller.onBrokerStartup
或controller.onBrokerFailure
val curBrokerIds = currentBrokerList.map(_.toInt).toSet
val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
if(newBrokerIds.size > 0)
controller.onBrokerStartup(newBrokerIds.toSeq)
if(deadBrokerIds.size > 0)
controller.onBrokerFailure(deadBrokerIds.toSeq)