Generate the assignment
bin/kafka-reassign-partitions.sh --zookeeper ${zookeeper_path} --topics-to-move-json-file topic.json --broker-list "1,2,3" --generate
topic.json, for example:
{
"topics": [{
"topic": "主题名称"
}],
"version": 1
}
Sample output:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[1417718,1417973]},{"topic":"test-topic","partition":3,"replicas":[1417718,1417974]},{"topic":"test-topic","partition":2,"replicas":[1417974,1417718]},{"topic":"test-topic","partition":5,"replicas":[1417974,1417973]},{"topic":"test-topic","partition":1,"replicas":[1417973,1417974]},{"topic":"test-topic","partition":4,"replicas":[1417973,1417718]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[1417973,1417718]},{"topic":"test-topic","partition":3,"replicas":[1417973,1417974]},{"topic":"test-topic","partition":2,"replicas":[1417974,1417973]},{"topic":"test-topic","partition":5,"replicas":[1417974,1417718]},{"topic":"test-topic","partition":1,"replicas":[1417718,1417974]},{"topic":"test-topic","partition":4,"replicas":[1417718,1417973]}]}
kafka.admin.AdminUtils#assignReplicasToBrokers
/**
* There are 2 goals of replica assignment:
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
*
* To achieve this goal, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.
*
* Here is an example of assigning:
* broker-0  broker-1  broker-2  broker-3  broker-4
* p0        p1        p2        p3        p4        (1st replica)
* p5        p6        p7        p8        p9        (1st replica)
* p4        p0        p1        p2        p3        (2nd replica)
* p8        p9        p5        p6        p7        (2nd replica)
* p3        p4        p0        p1        p2        (3rd replica)
* p7        p8        p9        p5        p6        (3rd replica)
*/
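To make the round-robin-plus-shift idea concrete, here is a simplified Java sketch. It is not Kafka's actual Scala implementation; all names are illustrative, and the real code in kafka.admin.AdminUtils additionally randomizes the start index and supports rack-aware placement.
import java.util.*;

// Simplified sketch of round-robin assignment with an increasing shift.
// Names are illustrative; the real logic lives in kafka.admin.AdminUtils.
public class ReplicaAssignmentSketch {

    static Map<Integer, List<Integer>> assign(int nPartitions, int replicationFactor,
                                              List<Integer> brokers, int startIndex) {
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        int nBrokers = brokers.size();
        int nextReplicaShift = startIndex;  // grows each time partitions wrap around the broker list
        for (int p = 0; p < nPartitions; p++) {
            if (p > 0 && p % nBrokers == 0)
                nextReplicaShift++;
            int firstReplicaIndex = (p + startIndex) % nBrokers;  // 1st replica: plain round-robin
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokers.get(firstReplicaIndex));
            for (int r = 0; r < replicationFactor - 1; r++) {
                // remaining replicas: offset from the 1st replica by a shift of at least 1
                int shift = 1 + (nextReplicaShift + r) % (nBrokers - 1);
                replicas.add(brokers.get((firstReplicaIndex + shift) % nBrokers));
            }
            assignment.put(p, replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 10 partitions, replication factor 3, brokers 0..4, start index 0
        // reproduces the table in the comment above.
        assign(10, 3, Arrays.asList(0, 1, 2, 3, 4), 0)
                .forEach((p, rs) -> System.out.println("p" + p + " -> " + rs));
    }
}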
Execute the reassignment
bin/kafka-reassign-partitions.sh --zookeeper ${zookeeper_path} --reassignment-json-file ${reassignment_node_rar_partition_json} --execute
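The reassignment JSON file passed to --execute is typically the "Proposed partition reassignment configuration" block printed by --generate, saved to a file (keep the "Current partition replica assignment" block as well in case you need to roll back). Progress can then be checked with the tool's --verify option:
bin/kafka-reassign-partitions.sh --zookeeper ${zookeeper_path} --reassignment-json-file ${reassignment_node_rar_partition_json} --verify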
kafka.controller.KafkaController#onPartitionReassignment
/**
* This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition
* reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener.
* Reassigning replicas for a partition goes through a few steps listed in the code.
* RAR = Reassigned replicas
* OAR = Original list of replicas for partition
* AR = current assigned replicas
*
* 1. Update AR in ZK with OAR + RAR.
* 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). We do this by forcing an update
* of the leader epoch in zookeeper.
* 3. Start new replicas RAR - OAR by moving replicas in RAR - OAR to NewReplica state.
* 4. Wait until all replicas in RAR are in sync with the leader.
* 5. Move all replicas in RAR to OnlineReplica state.
* 6. Set AR to RAR in memory.
* 7. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr
* will be sent. If not, then leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent.
* In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in
* RAR - OAR back in the isr.
* 8. Move all replicas in OAR - RAR to OfflineReplica state. As part of OfflineReplica state change, we shrink the
* isr to remove OAR - RAR in zookeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr.
* After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR.
* 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica (delete = true) to
* the replicas in OAR - RAR to physically delete the replicas on disk.
* 10. Update AR in ZK with RAR.
* 11. Update the /admin/reassign_partitions path in ZK to remove this partition.
* 12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
*
* For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
* may go through the following transition.
* AR leader/isr
* {1,2,3} 1/{1,2,3} (initial state)
* {1,2,3,4,5,6} 1/{1,2,3} (step 2)
* {1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4)
* {1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7)
* {1,2,3,4,5,6} 4/{4,5,6} (step 8)
* {4,5,6} 4/{4,5,6} (step 10)
*
* Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
* This way, if the controller crashes before that step, we can still recover.
*/
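The steps above revolve around three replica sets. Below is a minimal Java sketch of how they relate, using the OAR = {1,2,3}, RAR = {4,5,6} example from the comment; class and variable names are illustrative, not the controller's actual code.
import java.util.*;

// Minimal sketch of the replica sets the controller computes during reassignment.
public class ReassignmentSets {
    public static void main(String[] args) {
        Set<Integer> oar = new TreeSet<>(Arrays.asList(1, 2, 3)); // original replicas
        Set<Integer> rar = new TreeSet<>(Arrays.asList(4, 5, 6)); // reassigned replicas

        Set<Integer> fullAr = new TreeSet<>(oar);                 // step 1: AR written to ZK = OAR + RAR
        fullAr.addAll(rar);

        Set<Integer> newReplicas = new TreeSet<>(rar);            // step 3: replicas to start = RAR - OAR
        newReplicas.removeAll(oar);

        Set<Integer> oldReplicas = new TreeSet<>(oar);            // steps 8-9: replicas to retire = OAR - RAR
        oldReplicas.removeAll(rar);

        System.out.println("OAR + RAR = " + fullAr);       // [1, 2, 3, 4, 5, 6]
        System.out.println("RAR - OAR = " + newReplicas);  // [4, 5, 6]
        System.out.println("OAR - RAR = " + oldReplicas);  // [1, 2, 3]
    }
}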