小记一次Kafka集群响应慢问题追查

问题

某一天业务来找我,反映发数据到某一个Kafka集群特别慢。
并且他们提供了一份自己的测试结果,结果显示发送数据到Kafka集群A的平均响应延迟在10ms以内,但是发送到Kafka集群B的平均响应延迟却达到了2000ms+。
这种问题一般是比较头疼的,首先,我们Kafka集群都有监控和报警,通过查看可用性、流量变化、Kafka日志等方式,都没有发现任何异样;其次,响应慢也有可能和用户的使用方式和测试方法有关系。
因此第一步,我决定验证一下问题的存在。

验证问题

kafka/bin目录中,kafka提供了一个写请求性能测试脚本kafka-producer-perf-test.sh
这个脚本会运行kafka中的kafka.perf.ProducerPerformance类,发送消息到kafka并输出CSV报告。
测试命令如下:

kafka/bin/kafka-producer-perf-test.sh --broker-list ${BROKER_LIST} --topics perf-test-topic --show-detailed-stats --messages 10000 --csv-reporter-enabled --metrics-dir ./perf-report

通过分析生成的报告,发现确实有一台节点的响应比较慢:

time min max mean median stddev 95% 99% 99.90%
1 0 0 0 0 0 0 0 0
2 1184.369398 1184.369398 1184.369398 1184.369398 0 1184.369398 1184.369398 1184.369398
3 1184.369398 1308.03076 1246.200079 1246.200079 87.44178764 1308.03076 1308.03076 1308.03076
4 1036.153496 1308.03076 1176.184551 1184.369398 136.1233097 1308.03076 1308.03076 1308.03076
5 1036.153496 1308.03076 1176.184551 1184.369398 136.1233097 1308.03076 1308.03076 1308.03076
6 1036.153496 1308.03076 1170.298591 1168.505053 111.7658942 1308.03076 1308.03076 1308.03076
7 1036.153496 1308.03076 1195.533735 1184.369398 112.0391625 1308.03076 1308.03076 1308.03076
8 1036.153496 1308.03076 1176.72978 1168.505053 110.2893991 1308.03076 1308.03076 1308.03076

可以看到P999分布已经达到了1300ms左右,这显然是不正常的,但是原因是什么呢?

分析

既然日志没有问题,那只能看一下jstack信息了:

"kafka-request-handler-12" daemon prio=10 tid=0x00007fee9c7eb800 nid=0xea5a waiting for monitor entry [0x00007fecfbaf9000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:350)
        - waiting to lock <0x0000000640327150> (a java.lang.Object)
        at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
        at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
        at java.lang.Thread.run(Thread.java:662)
"kafka-request-handler-11" daemon prio=10 tid=0x00007fee9c7e9000 nid=0xea59 waiting for monitor entry [0x00007fecfbbfa000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:350)
        - waiting to lock <0x0000000640327150> (a java.lang.Object)
        at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
        at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
        at java.lang.Thread.run(Thread.java:662)

如上发现jstack中有非常奇怪的信息,很多kafka-request-handler线程都处于阻塞状态。
这里简单解释一下kafka的处理请求线程模型,引用一篇讲Kafka NIO网络通信的文章中的图来说明:

kafka-nio

如图,kafka采用了Java NIO中的selector模型。一个Acceptor线程负责接受请求,多个Processor线程负责处理请求。但实际上Processor线程只是把请求封装成kafka request,然后丢到RequestChannel中(当然也负责读取response并返回,这里不展开)。真正执行请求的是KafkaRequestHandler,即jstack中的kafka-request-handler线程。
所以当kafka-request-handler线程出现大量阻塞的时候,会极大地影响整个节点的响应效率。

关于Java线程中的BLOCKED状态,可以直接看一下Java doc说明:

        /**
         * Thread state for a thread blocked waiting for a monitor lock.
         * A thread in the blocked state is waiting for a monitor lock
         * to enter a synchronized block/method or
         * reenter a synchronized block/method after calling
         * {@link Object#wait() Object.wait}.
         */

可见kafka-request-handler线程是因为抢锁而发生了阻塞。我们根据jstack信息中的kafka.cluster.Partition.appendMessagesToLeader定位到了源码:

  def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
    leaderIsrUpdateLock synchronized {
      val leaderReplicaOpt = leaderReplicaIfLocal()
      leaderReplicaOpt match {
        case Some(leaderReplica) =>
          val log = leaderReplica.log.get
          val info = log.append(messages, assignOffsets = true)
          // we may need to increment high watermark since ISR could be down to 1
          maybeIncrementLeaderHW(leaderReplica)
          info
        case None =>
          throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
            .format(topic, partitionId, localBrokerId))
      }
    }
  }

可以看到这个方法确实是同步的,同步对象是leaderIsrUpdateLock。由于leaderIsrUpdateLock是kafka.cluster.Partition的成员变量,也就是说只有在写同一个topic partition的时候,才会发生互斥等待。
所以发生上面问题的原因只可能是某个topic有大量的写请求,而且这个topic的partition数量不多,导致并发不足。
于是大量该topic的ProduceRequest占用了kafka-request-handler线程池,但是这些线程之间互相抢锁,执行效率比较低,从而导致其他topic的请求无法及时被处理。

解决

通过分析日志和查看监控流量,定位到集群中某个topic的ProduceRequest请求的QPS占了整个集群的80%以上。
通过查看该topic监控指标中的单位时间内的消息条目数(MessagesInPerSec)和单位时间内的发送请求数(ProduceRequestPerSec),可以计算出该Topic平均不到10条消息就会触发一次kafka写请求;再考虑到partition数量,推测应该是业务采用了kafka producer的同步模式,每条消息都触发一次kafka写请求。
解决方法有两种:

  1. 通过在kafka producer config中配置producer.type=async来使用异步发送模式。该模式下client会把消息先放到一个queue中,后台的发送线程会从queue中取出消息,以batch(默认200条)的方式发送到kafka。这种方式提高了吞吐,妥协了时效性(可以配置最大发送间隔,默认5000ms),适合数据量比较大,对延迟不敏感的业务。
  2. 依旧采用默认的同步方式,不过client需要把要发送的消息先缓存到buffer中,然后调用send接口。其实send接口的参数是可变参数,接收的是message列表,def send(messages: KeyedMessage[K, V]*): Unit;但有一些用户不注意,会把自己集合中的一批消息逐条的调用send,给kafka后端带来QPS压力。
  • 错误示例
    val messages = Seq("hello", "world")
    val properties = new Properties()
    // custom properties here.
    val kafkaProducer = new Producer[String, String](new ProducerConfig(new Properties()))

    messages.foreach{m =>
      val keyedMessage = new KeyedMessage[String, String]("topic", null, m)
      kafkaProducer.send(keyedMessage)
    }
  • 正确示例
    val messages = Seq("hello", "world")
    val properties = new Properties()
    // custom properties here.
    val kafkaProducer = new Producer[String, String](new ProducerConfig(new Properties()))

    val keyedMessages = messages.map(m => new KeyedMessage[String, String]("topic", null, m))
    kafkaProducer.send(keyedMessages: _*)

当然,增加topic partition数量也能在一定程度上缓解问题,因为不同partition之间的写请求是不互斥的,但这种方式更像是治标不治本,掩盖了根本问题。

总结

合理地发送网络请求在分布式系统中非常重要,为了提高效率,通常在权衡时效性和吞吐的情况下,以“聚少为多”的方式发送批量的请求。过多的小请求不仅会降低吞吐,还可能会压垮后端的服务。
当然,作为服务提供方,需要通过多租户、限流等方式,避免不正常使用的场景把服务压垮。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,045评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,114评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,120评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,902评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,828评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,132评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,590评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,258评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,408评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,335评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,385评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,068评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,660评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,747评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,967评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,406评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,970评论 2 341

推荐阅读更多精彩内容