kafka生产者使用不当导致的应用挂起/夯住

[TOC]

1. 背景和现象

1.1 kafka版本和部署状态

kafka版本

server和client都是0.11.0

部署状态

kafka多个节点(具体多少不清楚,但是肯定不是单节点),zookeeper3个节点。topic的分区副本数为2。具备高可用。

1.2 事件现象

在一次生产事件中,其中一个kafka节点和zk节点因物理机宕机下线,zk和kafka broker恢复后,生产者应用并没有恢复,最终无法发送消息。

此时生产者端的应用业务流程无法继续执行,流程走到producer模块就被Block住,然后每隔10s报错一次。

重启producer之后,应用恢复。

关键日志

2018-11-07 10:52:24,015 [kfkBolt-tbl_qqhis_sq_trans_flow_raw-thread-0] [com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter:65] [ERROR] - produce:  fail at seco
nd time.   
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1057)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:764)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:609)
        at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter$1.onCompletion(KafkaProducerWriter.java:57)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:760)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
        at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:48)
        at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:27)
        at com.unionpay.cloudatlas.upstorm.component.SimpleBolt.doOnce(SimpleBolt.java:187)
        at com.unionpay.cloudatlas.upstorm.component.SimpleBolt$InnerThread.run(SimpleBolt.java:105)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.

1.3 生产者代码和配置

生产者代码

    @Override
    public DataRecord doOnce(Record record) {
        // TODO Auto-generated method stub
        try {
            OperCounter.getInstance().increment(Constant.KEY_KAFKA_RECEIVE);
            final ProducerRecord<String, Record> proRecord = new ProducerRecord<String, Record>(topicPub, record);
            producer.send(proRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // TODO Auto-generated method stub
                    // 发送失败
                    if (exception != null) {
                        OperCounter.getInstance().increment(Constant.KEY_KAFKA_REIN);
                        logger.warn("producer send fail and resend.", exception);
                        try {
                            producer.send(proRecord).get();
                            OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            logger.error("produce:  fail at second time.\t", e);
                            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
                        } catch (ExecutionException e) {
                            // TODO Auto-generated catch block
                            logger.error("produce:  fail at second time.\t", e);
                            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
                        } catch (Exception e) {
                            logger.error("produce:  fail at second time.\t", e);
                            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
                        }
                    } else {
                        OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL);
                    }
                }
            });
            return null;
        } catch (TimeoutException e) {
            logger.error("produce:  fail.\t", e);
            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT);
        } catch (Exception e) {
            logger.error("produce:  fail.\t", e);
            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT);
        }
        return null;
    }

生产者配置

bootstrap.servers=${KAFKA_SERVER_IN}
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.unionpay.cloudatlas.galaxy.common.protocol.kafka.RecordKryoSerializer
#max time to wait,default to 60s
max.block.ms=10000
batch.size=65536
buffer.memory=134217728
retries=3

通过上述代码和配置可以看出

  • 最大block事件为1000ms,也就是10s
  • buffer配置的较大,为134M
  • 生产者先是异步发送,如果发送失败,则执行一次同步发送

2. 问题初步定位和分析

2.1 kafka生产者简介

排除服务端的疑点

在最终定位之前,我们怀疑过很多点,比如是不是kafka高可用存在bug、是不是zk出问题了、是不是kafka选主失败等,最终通过生产的其他应用现象推论以及理论分析得出以下基本结论

  • kafka本身高可用机制还是比较可靠的,宕机1台节点,server的状态可以快速回复正常
  • zookeeper的高可用也没有问题,3个节点的情况下,是允许1个节点下线的,zookeeper服务正常
  • 宕机期间以及恢复后,kafka完成了leader节点的选举

总的来说,就是不要怀疑服务端有问题。

当然,“不要怀疑服务端有问题”只是我们定位到了原因之后的后置结论,并不表示故障排查的时候忽略服务端的潜在问题,毕竟不管是硬件资源还是软件质量都可能存在缺陷,尤其是开源产品的发展本身就是一个不断迭代完善的过程。

关于kafka生产者

在说原因之前,还需要说明一下kafka的producer流程。kafka生产者发送消息的粗略流程如下:

  • 首先应用调用send发送
  • 消息的KV序列化
  • 根据分区器决定消息发送到那个分区
  • 将消息添加到本地缓冲区,如果缓冲区满,则当前线程block,直到缓冲区有足够的空间或者达到最大阻塞时间(max.block.ms)
  • 有一个独立的IO线程负责从缓冲区中将消息发送到服务端
  • IO线程收到响应之后,通知producer线程完成了发送,如果需要,调用producer指定的回调函数

注意,从上面的流程我们可以看出,在kafka的高版本客户端(貌似是0.9之后)中,发送消息天然的是一个异步的过程,也就是说,消息发送都是异步方式进行的。而我们如果需要使用同步的方式发送消息,那么我们只能通过KafkaProducer.send返回的Future对象完成,调用Future.get,关键代码如下

//KafkaProducer.send的方法签名
//不提供回调
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
//提供回调
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

当我们调用了Future.get的时候,我们做了什么

上面出问题的代码,使用了同步的方式等待结果,那么同步的get,到底是什么样的操作呢?

先来看下KafkaProducer.send返回的具体Future实现

KafkaProducer.doSend

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;

上面的代码中返回的future即FutureRecordMetadata的实例,其实现了Future的get方法

public final class FutureRecordMetadata implements Future<RecordMetadata> {

    @Override
    public RecordMetadata get() throws InterruptedException, ExecutionException {
        //阻塞等待
        this.result.await();
        if (nextRecordMetadata != null)
            return nextRecordMetadata.get();
        return valueOrError();
    }

    @Override
    public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Handle overflow.
        long now = System.currentTimeMillis();
        long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
        //阻塞等待
        boolean occurred = this.result.await(timeout, unit);
        if (nextRecordMetadata != null)
            return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        if (!occurred)
            throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
        return valueOrError();
    }
    
}

可以看到,get里通过result.await阻塞等待,再看看这里的result对应的类ProduceRequestResult

public final class ProduceRequestResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    /**
     * Mark this request as complete and unblock any threads waiting on its completion.
     */
    public void done() {
        if (baseOffset == null)
            throw new IllegalStateException("The method `set` must be invoked before this method.");
        this.latch.countDown();
    }

    /**
     * Await the completion of this request
     */
    public void await() throws InterruptedException {
        latch.await();
    }
}

可以看到,其内部的await中调用了CountDownLatch.await进行等待,同时提供了done方法,解除等待的状态。

看到这里就比较清晰了,如果应用通过get方式同步等待结果,其内部实现时使用了CountDownLatch的await方法,当结果返回的时候,IO线程会调用done方法结束等待状态,并且返回结果。我们前面的分析只介绍了如何等待的,至于如何唤醒,将在下文介绍。

2.2 问题定位

通过上面的代码分析,我们几乎可以猜测到问题的出现可能和这里的设计有关——调用了get阻塞等待,但是由于某种原因,导致没有人唤醒等待着的线程。

为了进一步验证我们的想法,在开发环境复现生产事件的情况,当出现上述现象时,通过jstack抓一下线程快照,进一步证实了我们的猜想:

"kafka-producer-network-thread | PRODUCER_VERSION_UP_KAKFA_20181129_195652" daemon prio=10 tid=0x00007fd280253000 nid=0x20a5 waiting on condition [0x00007fd2879d8000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000785982e70> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
        at com.unionpay.arch.bigdata.test.BigDataUPKafkaProducer$MyCallback.onCompletion(BigDataUPKafkaProducer.java:60)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:599)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:575)
        at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:539)
        at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474)
        at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75)
        at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
        at java.lang.Thread.run(Thread.java:745)

这里是开发环境复现的代码执行情况,因此Producer代码和生产上的不完全一致,但是逻辑相同。

通过线程快照可以发现,IO线程一致处于await状态,因此后续流程无法执行!

关于CountDownLatch的设计和实现,感兴趣的可以查看相关文档。其实这里说生产者应用被Block,严格意义上来说是不对的,线程状态其实是WAITING,因此这里的Block指的是“代码执行不下去,在当前状态一直堵着”的状态~

因此,问题定位如下

  • 在异步发送的的回调里使用了同步的方式再次发送,由于kafka producer的同步发送是阻塞等待,且使用的是不带超时时间的无限期等待(future.get()中未指定超时时间),因此当不被唤醒时会一直wai下去
  • kafka生产者的IO线程(实际执行数据发送的线程)是单线程模型,且回调函数是在IO线程中执行的,因此回调函数的阻塞会直接导致IO线程阻塞,于是生产者缓冲区的数据无法被发送
  • kafka生产者还在不断的被应用调用,因此缓冲区一直累积并增大,当缓冲区满的时候,生产者线程会被阻塞,最大阻塞时间为max.block.time,如果改时间到达之后还是无法将数据塞入缓冲区,则会抛出一个异常,因此日志中看到达到10s之后,打印出异常栈
  • 由于使用了get没有指定超时时间,且该await一直无法被唤醒,因此这种情况会一直持续,在没有人工干预的情况下,永远不会发送成功

生产建议

  • kafka生产者推荐使用异步方式发送,并且提供回调以响应发送成功或者失败
  • 如果需要使用future.get的方式模拟同步发送,则需要在get里加上合适的超时时间,避免因为不可预知的外部因素导致线程无法被唤醒,即使用Future.get(long timeout)的api而不是不带超时参数的Future.get()
  • 不要在异步回调中执行阻塞操作或者耗时比较久的操作,如果有必要可以考虑交给另一个线程(池)去做

3. Future.get为何没有被唤醒

在前面的介绍中,我们定位了问题的原因,但也留下了一些疑问:

  • 为何future.get没有被唤醒?
  • producer是何时执行了回调操作的?
  • 这种情况属于应用使用不当还是kafka的bug?

3.1 HOW:分析思路

想要彻底弄清楚这个问题,恐怕要去好好撸一撸kafka producer的源码了。由于kafka producer的代码非常多,其中有缓冲区操作模块、IO执行模块、元数据更新模块、事务支持模块等很多设计,这里就只从这次的事件问题切入分析,后面如果对kafka producer源码全面分析了之后再专门用几篇文章描述。

那么思路很简单,主要从以下几个方面入手

  • 上一节中我们说到,造成wait的原因就是调用了CountDownLatch的await方法,那么何处调用了CountDownLatch的countdown方法?
  • 在所有调用了CountDownLatch.countdown的地方,是否包含了对kafka节点下线的处理?也就是说,难道kafka节点下线之后,流程就不会走到countdown了吗?

为了弄清楚以上两个问题,我们先去看看源码。通过对ProduceRequestResult的成员变量CountDownLatch latch分析可以知道,修改其状态的方法只有2个await方法和一个done方法

    /**
     * Mark this request as complete and unblock any threads waiting on its completion.
     */
    public void done() {
        if (baseOffset == null)
            throw new IllegalStateException("The method `set` must be invoked before this method.");
        this.latch.countDown();
    }

    /**
     * Await the completion of this request
     */
    public void await() throws InterruptedException {
        latch.await();
    }

    /**
     * Await the completion of this request (up to the given time interval)
     * @param timeout The maximum time to wait
     * @param unit The unit for the max time
     * @return true if the request completed, false if we timed out
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

另外还有一个completed方法只是读取状态,不是修改,这里就忽略了

其中两个await方法一个是有时间参数的一个是没有的,对应Future.get()和Future.get(long timeout),是导致阻塞的入口,因此也不用考虑,那么重点就是这个done方法了。

3.2 WHEN:谁调用了done/什么场景下会正常唤醒

通过eclipse提供的工具,可以一层一层追踪出,有哪些地方调用了这个done,基本结论如下,

在Producer中 ,主要有设计到两个逻辑(两个类),其中

  • Sender,主要对于异常情况做做一些处理,以唤醒await的线程,包括
    • 当链接被强制关闭时
    • 当事务管理器中认为需要丢弃时
    • 当有过期的数据时
  • NetworkClient,主要是处理发送结果,包括
    • 当发送后返回失败时
    • 当返回消息太大需要切分的时候
    • 当发送成功的时候

相应的逻辑和流程可以看具体的源码

Sender中相关逻辑流程图如下

Untitled Diagram.png

NetworkClient中相关逻辑

NetworClient中调用done的地方.png

安利一个良心在线制图网站 https://www.draw.io/

3.3 WHY:为何会一直wait却没有被唤醒

通过上面的分析,我们梳理了解除线程阻塞(WAIT)的几个场景和时机,然而不幸的是,上面的场景均没有机会被执行:

  • 在kafka节点宕机时,同步发送操作的message依然会被加入到生产者缓冲区,因为加入到缓冲区的过程和链路情况是解耦的,因此可以成功被塞到buffer
  • 由于是同步的过程,因此塞到buffer之后,发送者便开始了get()的无限期等待,直到有“人”唤醒
  • 通过上面的分析我们发现:唤醒该同步等待的操作,都需要在Sender也就是IO线程中执行:要么是由于各种原因觉得这个消息需要abort,要么是收到了正确或者错误的应答(fail or complete or split)。
  • 此时奇妙的现象就发生了:同步等待的操作在IO线程,唤醒的操作也是在IO线程,这是同一个线程!也就是说,此刻已经发生了某种意义的“死锁
  • IO线程已经被无限WAITing了,因此buffer中的数据再也无法被发送
  • 于是buffer越堆越多,直到达到buffer sizez之后,开始被block
  • producer对block进行了控制,每次最大block的时间为max.block.time,然后向上抛出一个异常,于是出现了日志中的现象

综上,这次生产实践的原委基本清楚了。关于producer源码中的细节,后面再细细研读~

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,601评论 18 139
  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 11,239评论 4 56
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,708评论 13 425
  • 耿治涛20+5万元。保险公司医院+维修车等7万元。张红卫借2万元。抓紧收回来。
    b538b2ad5da3阅读 168评论 0 0
  • 愿深夜落雪无声时,你不寂寞 愿搁笔踏雪而去路,你不孤独 愿天涯卧雪而眠处,你不害怕 无雪是你 雪落是你 雪化仍是你...
    彼岸风清阅读 170评论 0 0