Kafka基本操作
1. 查看Topic
--list命令(列举所有主题):
kafka-topics.sh –list –zookeeper server-1:2181,server-2:2181
--describe命令(不指定topic参数则查看所有主题信息,若指定topic参数则查看特定主题的信息):
kafka-topics.sh –describe –zookeeper server-1:2181,server-2:2181
#查看指定主题已覆盖的配置:
Kafka-topics.sh –describe –zookeeper server-1:2181,server-2:2181,server-3:2181–topics-with-overrides–topic config-test
2.创建Topic
命令:
kafka-topics.sh –create–zookeeper server-1:2181,server-2:2181,server-3:2181 --replication-factor 2 –partitions3 –topic kafka-action
此时会在${log.dir}目录下创建相应的分区文件目录,副本分别分布在不同的节点上。
同时登陆ZooKeeper客户端查看所创建的主题元数据信息,登陆ZK之后,“kafka-action”元数据信息如下:
Ls /brokers/topics/kafka-action/partitions
[0,1,2]
ls /brokers/topics/kafka-action
{“version”:1,”partitions”:{“2”:[3,1], “1”:[2,3],”0”:[1,2]}}
注意:
1)创建topic时,主要要指定副本数、分区数等,如果不指定副本和分区数,默认分别创建${num.partitions}个分区数和${default.replcation.factor}个副本。
2)当生产者向一个还未创建的topic发送消息时,会自动创建该副本,但是前提是:auto.create.topics.enable=true
3. 修改Topic
当创建一个主题之后,可以通过alter命令对主题进行修改,包括修改主题级别的配置、增加主题分区、修改副本分配方案、修改主题Offset等。
命令:
kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –config max.message.bytes=204800
修改max.message.bytes参数值,某些版本使用kafka-topics.sh脚本修改参数可以成功,但是可能会提示该脚本已过时,在后续版本可能会被移除,推荐使用kafka-configs.sh脚本修改参数。
kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –configsegment.bytes=209715200
修改segment.bytes参数后,可通过topics-with-overrides查看主题config-test已覆盖的配置信息。
删除参数segment.bytes配置:
kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –delete-configsegment.bytes
再次通过topics-with-overrides查看主题config-test已覆盖的配置信息,此时已不包含segment.bytes。
增加分区:
Kafka并不支持减少分区的操作,只能为一个主题增加分区。
假设当前分区为3个,要增加至5个:
kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –partitions 5
4.删除Topic
删除Kafka主题,一般有以下两种方式:
[if !supportLists](1) [endif]手动删除各节点${log.dir}目录下该主题分区文件夹,同时登录ZK客户端删除待删除主题对应的节点,主题元数据保存在/brokers/topics和/config/topics目录下。
[if !supportLists](2) [endif]执行kafka-topics.sh脚本进行删除,若希望通过该脚本彻底删除主题,则需要保证在启动Kafka时所加载的server.properties文件中配置delete.topic.enable=true,该配置默认为false。否则执行该脚本并未真正删除主题,而是在ZK的/admin/delete_topics目录下创建一个与待删除主题同名的节点,将该主题标记为删除状态。
5.启动生产者发送消息
启动一个向主题kafka-action发送消息的生产者,同时指定每条消息包含有Key:
Kafka-console-producer.sh –broker-listserver-1:9092,server-2:9092,server-3:9092 –topic kafka-action –property parse.key=true
该命令执行后,控制台等待客户端输入消息。由于没有指定消息Key与消息净荷(payload)之间的分隔符,默认是以制表符分隔。可以通过配置项key.separator指定。
Kafka-console-producer.sh –broker-list server-1:9092,server-2:9092,server-3:9092–topic kafka-action –property parse.key=true –property key.separator=’ ’
在控制台分别输入一批消息,消息key与消息实际数据之间以空格分隔。然后执行以下命令,验证消息是否发送成功。
Kafka-run-class.shkafka.tools.GetOffsetShell –broker-list server-1:9092,server-2:9092,server-3:9092–topic kafka-action –time -1
以上命令用于查看某个主题各分区对应消息偏移量。可以通过partitions参数指定一个或多个分区,多个分区之间以逗号分隔,若不指定则默认查看该主题所有分区;time参数表示查看在指定时间之前的数据,支持-1(latest)、-2(earliest)两个时间选项,默认取值为-1.
执行以上命令输出结果如下,共3列,分别表示主题名、分区编号、消息偏移量:
Kafka-action:2:6
Kafka-action:1:6
Kafka-action:0:4
通过结果信息可知,总共产生了16条消息,3个分区按编号从大到小依次有6条、6条、4条消息。
6.查看消息
Kafka生产的消息以二进制的形式在文件中,为了便于查看消息内容,Kafka提供了一个查看日志文件的工具类kafka.tools.DumpLogSegments.命令如下:
Kafka-run-class.shkafka.tools.DumpLogSegments –files ./././00000000000000.log
7.启动消费者接受消息