kafka 常用运维命令介绍(一)

一、连接zk

由于kafka的各种元数据都存储在zk,要连接kafka集群也要通过zk获取各个broker的ip端口然后连接broker。因此,大多数kafka自带的运维命令都要指定zk的地址,比如用kafka-topics列出所有topics:

kafka-topics --zookeeper localhost:2181/kafka --list

--zookeeper参数是必须指定的。另外,如果kafka集群启动的时候在配置文件中指定了namespace,记得要在zk的地址后面也要加上kafka所属的namespace。否则kafka就找不到kafka集群的相关元数据了。

由于kafka的元数据都存储在zk,因此掌握好如何查看zk的数据也是运维kafka集群的一个关键。

zkCli 命令

zookeeper安装包一般都会提供zkCli命令来让用户连接zookeeper集群。

./zkCli.sh -timeout 5000 -r -server ip:port

之后进入zk的交互界面,就可以输出相关命令查看zk的数据了。

# 查看kafka集群下所有的brokers id列表
ls /brokers/ids
# 查看 1003 broker的信息
get /brokers/ids/1003

其他zk交互界面的命令这里不多做介绍,zkCli的帮助文档已经写的很清楚了。

二、topic 相关

kafka-topics可以进行和topics相关的一些操作。下面介绍一下如何运用该命令来操作kafka topics。

该命令最终是调用kafka源码中的TopicCommand类来实现的。

列出所有的topic & 获取命令帮助

# 列出帮助文档,英文好的同学基本看帮助文档就可以指定大概怎么使用该命令了
kafka-topics --help
# 列出kafka集群下的所有topics,这里需要指定kafka机器元数据存储所在的zk机器地址,记得如果有namespace,要也加上,否则将连不上kafka集群
 kafka-topics --zookeeper localhost:2181/kafka --list

创建topic

# 创建一个topic为test的topic,并指定分区数为5,副本数为1。这里的副本数不能超过broker的数量,否则会报错
kafka-topics --topic test --zookeeper localhost:2181/kafka --create --replication-factor 1 --partitions 5
# 创建时指定副本在哪个broker上,多个partition之间用逗号分隔,副本之间用":"分割,第一个副本默认是leader
kafka-topics.sh --zookeeper 172.19.0.5:2181 --topic lyt2 --create --replica-assignment 1001:1002,1001:1002,1001:1002

列出所有topic的详情

通过 --describe 参数可以列出我们指定的topics详情,包括 partitions、leader、replicas、isr等。

kafka-topics --zookeeper localhost:2181/kafka --describe test test_yangjb
# 输出
Topic:test  PartitionCount:5    ReplicationFactor:3 Configs:
    Topic: test Partition: 0    Leader: 1001    Replicas: 1001,1002,1003    Isr: 1002,1001,1003
    Topic: test Partition: 1    Leader: 1002    Replicas: 1002,1003,1001    Isr: 1002,1003,1001
    Topic: test Partition: 2    Leader: 1003    Replicas: 1003,1001,1002    Isr: 1002,1001,1003
    Topic: test Partition: 3    Leader: 1001    Replicas: 1001,1003,1002    Isr: 1002,1001,1003
    Topic: test Partition: 4    Leader: 1002    Replicas: 1002,1001,1003    Isr: 1002,1001,1003

下面是一些 使用—describe时可以使用的其他参数

# 只列出修改了默认配置的那些topic。并可以查看修改了哪些topic配置
--topics-with-overrides
# 列出那些目前没有leader的topic
--under-replicated-partitions
# 列出那些正在同步的topic或者同步出现异常的topic
--under-replicated-partitions

删除topic

注意,kafka删除topic是异步的,因此并不是命令返回了topic就已经被成功删除。而是等待后台的删除任务执行成功才真正删除该topic。

kafka-topics --zookeeper localhost:2181/kafka --delete --topic yangjb_test

修改topic相关信息

通过 --alter 参数可以修改topic的信息,能修改的信息包括 partition数量、replica分配情况、topic配置。如果要修改 partition数量时,修改的后的数量一定要比当前的数量大,否则会报错。

# 将partition数量修改成7个
kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --partitions 7
# 通过 --replica-assignment 参数指定新增partition的副本分布情况
# 如果原先的partition数量是3,那么新增的一个分区的副本分布应该在1002和1003
kafka-topics --zookeeper localhost:2181/kafka --topic test -alter --partitions 4 --replica-assignment 1001:1002,1001:1002,1001:1002,1002:1003
# 修改topic test的配置 flush.ms =30000 。
kafka-topics --zookeeper localhost:2181/kafka  --topic test --alter --config flush.ms=30000
# 删除topic test的 flush.ms 配置
kafka-topics --zookeeper localhost:2181/kafka  --topic test --alter --delete-config flush.ms

注意,在后续的kafka版本中,关于topic的配置的修改删除可能会被移到kafka-configs.sh中。官方建议使用kafka-configs来修改topic的配置。

三、分区副本重分配

在数据量大的情况下,各个broker上的数据量经常会不一致,有的broker上数据非常大,有的则很小,为了让数据更均匀的分布在各个broker,我们就要学会对topic的partion进行分区副本重分配。

首先建立一个json文件,用来描述如何分配分区副本。

assign.json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 1,
      "replicas": [
        1002,
        1003
      ]
    },
    {
      "topic": "test",
      "partition": 2,
      "replicas": [
        1003,
        1002
      ]
    }
  ],
  "version": 1
}

文件中只要指定要重新分配副本的分区号就可以,不需要列出所有分区。

提交分区副本重分配任务:

# --execute 参数表示执行
kafka-reassign-partitions --zookeeper localhost:2181/kafka --reassignment-json-file assign.json --execute
# --verify 参数表示查看分区副本重分配任务的执行状态
kafka-reassign-partitions --zookeeper localhost:2181/kafka --reassignment-json-file assign.json --verify

让系统自动帮我们生成重分配json文件:

执行命令之前需要建立一个json文件,告诉系统要重分配哪些分区:

gen.json:

{
  "topics": [
    {
      "topic": "foo"
    }
  ],
  "version": 1
}

接着执行命令

# --generate 表示生成重分配的json文件
# --topics-to-move-json-file 指定要重分配哪些topic
# --broker-list 表示要分配到哪些broker上去
kafka-reassign-partitions --zookeeper localhost:2181/kafka --generate --topics-to-move-json-file gen.json --broker-list 1001,1002,1003

其他参数

# 指定重分配时,在一个broker上,各个日志目录之间复制数据的阈值,最低要求 1 KB/s
# 如果重分配任务正在进行,第二次执行会修改原来设置的阈值
--replica-alter-log-dirs
# 指定重分配时,在不同broker之间传输数据的阈值,最低要求 1 KB/s
# 如果重分配任务正在进行,第二次执行会修改原来设置的阈值
--throttle
# 等待重分配任务开始的超时时间
--timeout

分区副本重分配过程

详情可以看kafka源码的KafkaController#onPartitionReassignment()的方法注解。

RAR = Reassigned replicas,目标要分配的副本情况
OAR = Original list of replicas for partition,原先的副本分配情况
AR = current assigned replicas,当前的副本分配情况

  1. 更新zk处的partition副本配置:AR=RAR+OAR
  2. 向所有RAR+OAR的副本发送元数据更新请求
  3. 将新增的那部分的副本状态设置为NewReplica。也就是 RAR-OAR 那部分副本
  4. 等待所有的副本和leader保持同步。也就是抱着RAR+OAR的副本都在isr中了
  5. 将所有在RAR中的副本状态都设置为OnlineReplica
  6. 在内存中先将AR=RAR
  7. 如果leader不在RAR中,就需要重新竞选leader。采用ReassignedPartitionLeaderSelector选举
  8. 将所有准备移除的副本状态设置为OfflineReplica。也就是OAR-RAR的那部分副本。这时partition的isr会收缩
  9. 将所有准备移除的副本状态设置为NonExistentReplica。这时所在的分区副本数据会被删除。
  10. 将内存中的AR更新到zk
  11. 更新zk的/admin/reassign_partitions路径,移除这个partition
  12. 发送新的元数据到各个broker上

假设当前有OAR = {1, 2, 3}, RAR = {4,5,6},在进行partition reaassigned的过程中会发生如下变化

AR leader/isr 步骤
{1,2,3} 1/{1,2,3} 初始状态
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} 步骤2
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} 步骤4
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} 步骤7
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} 步骤8
{4,5,6} 4/{4,5,6} 步骤10

四、删除某个partition的数据

使用kafka-delete-records命令可以删除指定topic-partition在指定offset之前的所有数据。

该命令是kafka在0.11版本之后才支持的。

首先需要编写删除offset描述json文件:

delete.json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 24
    }
  ],
  "version": 1
}

上面的json文件表示删除topic是test的0号parition的24之前的所有offset,也就是1-23这些offset的数据都会被删除掉。

kafka-delete-records --bootstrap-server 127.0.0.1:9092 --offset-json-file delete.json

五、全局&topic配置修改

通过kafka-configs命令,我们可以修改broker的配置,以及topic的配置、client和user的配置。

配置更新原理

kafka-configs命令修改配置后会被写到对应的zookeeper的节点上持久化,之后kafka集群重启后还会加载这些配置,并覆盖配置文件的那些配置。也就是说,如果在此处设置了某个配置项,之后在配置文件中对这个配置项的改动都不会起作用,因为被覆盖了。

用该命令修改了配置后,可以在zk的节点下看到对应的配置内容。

节点目录一般是 /config/entityType/entityName,entityType可以是brokers、topics、users、clients。entityName表示具体的名称,比如broker的id,topic的名称等。

比如要看0号broker修改过的配置项,可以在zk交互界面中输入

# /kafka 是命名空间
get /kafka/config/brokers/0

修改broker配置

# 将0号broker的配置 log.cleaner.backoff.ms修改成1000,flush.ms 也修改成1000
# --alter 表示要修改配置项
# --add-config 后面跟着要修改的配置项
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --add-config log.cleaner.backoff.ms=1000,flush.ms=1000 --alter
# 删除0号broker 对 log.cleaner.backoff.ms的配置
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --delete-config log.cleaner.backoff.ms --alter
# 列出0号broker修改过的配置项
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --describe

修改topic的配置

# 将test这个topic的 delete.retention.ms修改成1000,flush.ms 也修改成1000
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --add-config delete.retention.ms=1000,flush.ms=1000 --alter
# 删除test这个topic的 delete.retention.ms和flush.ms配置项
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --delete-config delete.retention.ms,flush.ms --alter
# 列出 test这个topic修改过的配置项
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --describe

修改client的配置

这里的client是指客户端,也就是produer或者consumer。客户端支持修改的配置有

# 请求限制
request_percentage
# 推送消息时的流量控制
producer_byte_rate
# 消费时的流量控制
consumer_byte_rate

通过指定clientId我们可以控制指定客户端的配置,从而控制他们的流量不会超过我们设定的值

# 设置 客户端id 为test的 producer_byte_rate和consumer_byte_rate为1024
kafka-configs --zookeeper 127.0.0.1:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name test

六、查看broker上磁盘的使用情况

在0.11版本中,新增一个命令kafka-log-dirs可以查看broker的磁盘使用情况。

该命令可以从两个维度观察磁盘的使用情况,一个是指定broker id,查看该broker的数据目录的各个topic parition的占用大小。还可以直接指定topic,查看这些topic的partition在各个broker上的使用情况。甚至可以两个过滤条件一起用,同时指定brokerId和topic。

# 查看0、1号broker上各个topic partition的磁盘使用情况
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --broker-list 0,1 --describe
# 查看topic:test 在各个broker上的磁盘使用情况
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --topic-list test --describe
# 查看topic test 在0号broker上的磁盘使用情况
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --topic-list test --broker-list 0 --describe

输出示例:

{
  "version": 1,
  "brokers": [
    {
      "broker": 1001,
      "logDirs": [
        {
          "logDir": "/kafka/kafka-logs-7da01186c90a",
          "error": null,
          "partitions": [
            {
              "partition": "test-4",
              "size": 0,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "test-0",
              "size": 0,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    }
  ]
}

七、使用kafka-preferred-replica-election进行leader选举

当我们查看某个topic partition时,会输出该partiton replica的列表,其中replica列表的第一个replica被kafka称为preferred replica。

Topic: test Partition: 0    Leader: 1002    Replicas: 1001,1002,1003    Isr: 1002,1001,1003

上面的test partition-0中,1001就是那个preferred replica。在大多情况下,preferred replica一般就是leader,但是有些情况可能不是。因此,kafka提供了kafka-preferred-replica-election来将preferred replica选举成leader。

首先我们需要编辑prefered.json 文件:

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    }
  ]
}

该文件告诉工具我们要对topic test的partition-0进行preferred选举:

kafka-preferred-replica-election --zookeeper 127.0.0.1:2181 --path-to-json-file prefered.json
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342