(九)kafka常见操作
A. 基础操作
-
创建主题
- 命令:
./kafka-topics.sh --create --zookeeper xxx.xxx.xxx.xxx:2181 --replication-factor 2 --partitions 3 --topic test
- 注意事项:新版的kafka的kafka-topics脚本也用--bootstrap-server替代了--zookeeper,直接代入kafka的ip和端口(比如localhost:9092)。
- 命令:
-
查看主题
- 命令:
./kafka-topics.sh --describe --topic test --zookeeper xxx.xxx.xxx.xxx:2181
- 命令:
-
模拟生产者
- 命令:
./kafka-console-producer.sh --broker-list xxx.xxx.xxx.xxxx:9092 --topic test
- 命令:
-
模拟消费者
- 命令:
./kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxxx:9092 --topic test --from-beginning
- 注意事项:该脚本下,只支持新的--bootstrap-server字段。
- 命令:
B. 集群扩容节点
-
背景概述
- kafka cluster若需要增加节点,必须将原有topic进行重新分区分配(主要是原有分区的数据迁移),不然会导致broker端负载不均衡(具体内容可参考第六章)。
- 原集群共有3台brokers(0,1,2),现增加2台brokers(3 & 4)。原有topics两个,分别为push_event & third_event,replicas设置为2。
-
操作步骤
-
预准备工作
- 需要重新分区分配的topic,将其写入对应的json文本,move.json的具体内容为
{ "topics":[ { "topic": "push_event" }, { "topic": "third_event" } ] }
-
执行算法脚本获取分配方案
- 使用kafka的kafka-reassign-partitions.sh脚本,将第一步的move.json作为参数获取新的分区分配方案。
- 具体命令为
./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --topics-to-move-json-file move.json --broker-list "0,1,2,3,4" --generate
,其中broker list中的内容就是具体的broker id。
-
执行分配方案
执行完第二步后会返回分配结果,由两部分组成current方案 & proposed方案。
-
current方案作为备份为回滚做准备,存储为backup.json(存储时,第一行省略)。
Current partition replica assignment { "version":1, "partitions":[ {"topic":"open_push_event","partition":2,"replicas":[0,1]}, {"topic":"open_push_event","partition":4,"replicas":[2,1]}, {"topic":"open_push_event","partition":3,"replicas":[1,0]}, {"topic":"open_push_event","partition":0,"replicas":[1,2]}, {"topic":"open_push_event","partition":1,"replicas":[2,0]} ] }
-
proposed方案则是本次更新分区的方案,存储为update.json(存储时,第一行省略)。
Proposed partition reassignment configuration { "version":1, "partitions":[ {"topic":"think_tank","partition":2,"replicas":[2,0]}, {"topic":"think_tank","partition":4,"replicas":[4,2]}, {"topic":"think_tank","partition":3,"replicas":[3,1]}, {"topic":"think_tank","partition":0,"replicas":[0,3]}, {"topic":"think_tank","partition":1,"replicas":[1,4]} ] }
使用kafka-reassign-partitions.sh执行分区分配,传入update.json作为具体参数,命令为
./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --reassignment-json-file update.json --execute
,若数据较大则该动作会持续较久。
-