[TOC]
1. 背景和现象
1.1 kafka版本和部署状态
kafka版本
server和client都是0.11.0
部署状态
kafka多个节点(具体多少不清楚,但是肯定不是单节点),zookeeper3个节点。topic的分区副本数为2。具备高可用。
1.2 事件现象
在一次生产事件中,其中一个kafka节点和zk节点因物理机宕机下线,zk和kafka broker恢复后,生产者应用并没有恢复,最终无法发送消息。
此时生产者端的应用业务流程无法继续执行,流程走到producer模块就被Block住,然后每隔10s报错一次。
重启producer之后,应用恢复。
关键日志
2018-11-07 10:52:24,015 [kfkBolt-tbl_qqhis_sq_trans_flow_raw-thread-0] [com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter:65] [ERROR] - produce: fail at seco
nd time.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1057)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:764)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:609)
at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter$1.onCompletion(KafkaProducerWriter.java:57)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:760)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:48)
at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:27)
at com.unionpay.cloudatlas.upstorm.component.SimpleBolt.doOnce(SimpleBolt.java:187)
at com.unionpay.cloudatlas.upstorm.component.SimpleBolt$InnerThread.run(SimpleBolt.java:105)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.
1.3 生产者代码和配置
生产者代码
@Override
public DataRecord doOnce(Record record) {
// TODO Auto-generated method stub
try {
OperCounter.getInstance().increment(Constant.KEY_KAFKA_RECEIVE);
final ProducerRecord<String, Record> proRecord = new ProducerRecord<String, Record>(topicPub, record);
producer.send(proRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
// TODO Auto-generated method stub
// 发送失败
if (exception != null) {
OperCounter.getInstance().increment(Constant.KEY_KAFKA_REIN);
logger.warn("producer send fail and resend.", exception);
try {
producer.send(proRecord).get();
OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
logger.error("produce: fail at second time.\t", e);
OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
} catch (ExecutionException e) {
// TODO Auto-generated catch block
logger.error("produce: fail at second time.\t", e);
OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
} catch (Exception e) {
logger.error("produce: fail at second time.\t", e);
OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
}
} else {
OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL);
}
}
});
return null;
} catch (TimeoutException e) {
logger.error("produce: fail.\t", e);
OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT);
} catch (Exception e) {
logger.error("produce: fail.\t", e);
OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT);
}
return null;
}
生产者配置
bootstrap.servers=${KAFKA_SERVER_IN}
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.unionpay.cloudatlas.galaxy.common.protocol.kafka.RecordKryoSerializer
#max time to wait,default to 60s
max.block.ms=10000
batch.size=65536
buffer.memory=134217728
retries=3
通过上述代码和配置可以看出
- 最大block事件为1000ms,也就是10s
- buffer配置的较大,为134M
- 生产者先是异步发送,如果发送失败,则执行一次同步发送
2. 问题初步定位和分析
2.1 kafka生产者简介
排除服务端的疑点
在最终定位之前,我们怀疑过很多点,比如是不是kafka高可用存在bug、是不是zk出问题了、是不是kafka选主失败等,最终通过生产的其他应用现象推论以及理论分析得出以下基本结论
- kafka本身高可用机制还是比较可靠的,宕机1台节点,server的状态可以快速回复正常
- zookeeper的高可用也没有问题,3个节点的情况下,是允许1个节点下线的,zookeeper服务正常
- 宕机期间以及恢复后,kafka完成了leader节点的选举
总的来说,就是不要怀疑服务端有问题。
当然,“不要怀疑服务端有问题”只是我们定位到了原因之后的后置结论,并不表示故障排查的时候忽略服务端的潜在问题,毕竟不管是硬件资源还是软件质量都可能存在缺陷,尤其是开源产品的发展本身就是一个不断迭代完善的过程。
关于kafka生产者
在说原因之前,还需要说明一下kafka的producer流程。kafka生产者发送消息的粗略流程如下:
- 首先应用调用send发送
- 消息的KV序列化
- 根据分区器决定消息发送到那个分区
- 将消息添加到本地缓冲区,如果缓冲区满,则当前线程block,直到缓冲区有足够的空间或者达到最大阻塞时间(max.block.ms)
- 有一个独立的IO线程负责从缓冲区中将消息发送到服务端
- IO线程收到响应之后,通知producer线程完成了发送,如果需要,调用producer指定的回调函数
注意,从上面的流程我们可以看出,在kafka的高版本客户端(貌似是0.9之后)中,发送消息天然的是一个异步的过程,也就是说,消息发送都是异步方式进行的。而我们如果需要使用同步的方式发送消息,那么我们只能通过KafkaProducer.send返回的Future对象完成,调用Future.get,关键代码如下
//KafkaProducer.send的方法签名
//不提供回调
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
//提供回调
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
当我们调用了Future.get的时候,我们做了什么
上面出问题的代码,使用了同步的方式等待结果,那么同步的get,到底是什么样的操作呢?
先来看下KafkaProducer.send返回的具体Future实现
KafkaProducer.doSend
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
上面的代码中返回的future即FutureRecordMetadata的实例,其实现了Future的get方法
public final class FutureRecordMetadata implements Future<RecordMetadata> {
@Override
public RecordMetadata get() throws InterruptedException, ExecutionException {
//阻塞等待
this.result.await();
if (nextRecordMetadata != null)
return nextRecordMetadata.get();
return valueOrError();
}
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// Handle overflow.
long now = System.currentTimeMillis();
long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
//阻塞等待
boolean occurred = this.result.await(timeout, unit);
if (nextRecordMetadata != null)
return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (!occurred)
throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
return valueOrError();
}
}
可以看到,get里通过result.await阻塞等待,再看看这里的result对应的类ProduceRequestResult
public final class ProduceRequestResult {
private final CountDownLatch latch = new CountDownLatch(1);
/**
* Mark this request as complete and unblock any threads waiting on its completion.
*/
public void done() {
if (baseOffset == null)
throw new IllegalStateException("The method `set` must be invoked before this method.");
this.latch.countDown();
}
/**
* Await the completion of this request
*/
public void await() throws InterruptedException {
latch.await();
}
}
可以看到,其内部的await中调用了CountDownLatch.await进行等待,同时提供了done方法,解除等待的状态。
看到这里就比较清晰了,如果应用通过get方式同步等待结果,其内部实现时使用了CountDownLatch的await方法,当结果返回的时候,IO线程会调用done方法结束等待状态,并且返回结果。我们前面的分析只介绍了如何等待的,至于如何唤醒,将在下文介绍。
2.2 问题定位
通过上面的代码分析,我们几乎可以猜测到问题的出现可能和这里的设计有关——调用了get阻塞等待,但是由于某种原因,导致没有人唤醒等待着的线程。
为了进一步验证我们的想法,在开发环境复现生产事件的情况,当出现上述现象时,通过jstack抓一下线程快照,进一步证实了我们的猜想:
"kafka-producer-network-thread | PRODUCER_VERSION_UP_KAKFA_20181129_195652" daemon prio=10 tid=0x00007fd280253000 nid=0x20a5 waiting on condition [0x00007fd2879d8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000785982e70> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at com.unionpay.arch.bigdata.test.BigDataUPKafkaProducer$MyCallback.onCompletion(BigDataUPKafkaProducer.java:60)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:599)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:575)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:539)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:745)
这里是开发环境复现的代码执行情况,因此Producer代码和生产上的不完全一致,但是逻辑相同。
通过线程快照可以发现,IO线程一致处于await状态,因此后续流程无法执行!
关于CountDownLatch的设计和实现,感兴趣的可以查看相关文档。其实这里说生产者应用被Block,严格意义上来说是不对的,线程状态其实是WAITING,因此这里的Block指的是“代码执行不下去,在当前状态一直堵着”的状态~
因此,问题定位如下
- 在异步发送的的回调里使用了同步的方式再次发送,由于kafka producer的同步发送是阻塞等待,且使用的是不带超时时间的无限期等待(future.get()中未指定超时时间),因此当不被唤醒时会一直wai下去
- kafka生产者的IO线程(实际执行数据发送的线程)是单线程模型,且回调函数是在IO线程中执行的,因此回调函数的阻塞会直接导致IO线程阻塞,于是生产者缓冲区的数据无法被发送
- kafka生产者还在不断的被应用调用,因此缓冲区一直累积并增大,当缓冲区满的时候,生产者线程会被阻塞,最大阻塞时间为max.block.time,如果改时间到达之后还是无法将数据塞入缓冲区,则会抛出一个异常,因此日志中看到达到10s之后,打印出异常栈
- 由于使用了get没有指定超时时间,且该await一直无法被唤醒,因此这种情况会一直持续,在没有人工干预的情况下,永远不会发送成功
生产建议
- kafka生产者推荐使用异步方式发送,并且提供回调以响应发送成功或者失败
- 如果需要使用future.get的方式模拟同步发送,则需要在get里加上合适的超时时间,避免因为不可预知的外部因素导致线程无法被唤醒,即使用Future.get(long timeout)的api而不是不带超时参数的Future.get()
- 不要在异步回调中执行阻塞操作或者耗时比较久的操作,如果有必要可以考虑交给另一个线程(池)去做
3. Future.get为何没有被唤醒
在前面的介绍中,我们定位了问题的原因,但也留下了一些疑问:
- 为何future.get没有被唤醒?
- producer是何时执行了回调操作的?
- 这种情况属于应用使用不当还是kafka的bug?
3.1 HOW:分析思路
想要彻底弄清楚这个问题,恐怕要去好好撸一撸kafka producer的源码了。由于kafka producer的代码非常多,其中有缓冲区操作模块、IO执行模块、元数据更新模块、事务支持模块等很多设计,这里就只从这次的事件问题切入分析,后面如果对kafka producer源码全面分析了之后再专门用几篇文章描述。
那么思路很简单,主要从以下几个方面入手
- 上一节中我们说到,造成wait的原因就是调用了CountDownLatch的await方法,那么何处调用了CountDownLatch的countdown方法?
- 在所有调用了CountDownLatch.countdown的地方,是否包含了对kafka节点下线的处理?也就是说,难道kafka节点下线之后,流程就不会走到countdown了吗?
为了弄清楚以上两个问题,我们先去看看源码。通过对ProduceRequestResult的成员变量CountDownLatch latch分析可以知道,修改其状态的方法只有2个await方法和一个done方法
/**
* Mark this request as complete and unblock any threads waiting on its completion.
*/
public void done() {
if (baseOffset == null)
throw new IllegalStateException("The method `set` must be invoked before this method.");
this.latch.countDown();
}
/**
* Await the completion of this request
*/
public void await() throws InterruptedException {
latch.await();
}
/**
* Await the completion of this request (up to the given time interval)
* @param timeout The maximum time to wait
* @param unit The unit for the max time
* @return true if the request completed, false if we timed out
*/
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return latch.await(timeout, unit);
}
另外还有一个completed方法只是读取状态,不是修改,这里就忽略了
其中两个await方法一个是有时间参数的一个是没有的,对应Future.get()和Future.get(long timeout),是导致阻塞的入口,因此也不用考虑,那么重点就是这个done方法了。
3.2 WHEN:谁调用了done/什么场景下会正常唤醒
通过eclipse提供的工具,可以一层一层追踪出,有哪些地方调用了这个done,基本结论如下,
在Producer中 ,主要有设计到两个逻辑(两个类),其中
- Sender,主要对于异常情况做做一些处理,以唤醒await的线程,包括
- 当链接被强制关闭时
- 当事务管理器中认为需要丢弃时
- 当有过期的数据时
- NetworkClient,主要是处理发送结果,包括
- 当发送后返回失败时
- 当返回消息太大需要切分的时候
- 当发送成功的时候
相应的逻辑和流程可以看具体的源码
Sender中相关逻辑流程图如下
NetworkClient中相关逻辑
安利一个良心在线制图网站 https://www.draw.io/
3.3 WHY:为何会一直wait却没有被唤醒
通过上面的分析,我们梳理了解除线程阻塞(WAIT)的几个场景和时机,然而不幸的是,上面的场景均没有机会被执行:
- 在kafka节点宕机时,同步发送操作的message依然会被加入到生产者缓冲区,因为加入到缓冲区的过程和链路情况是解耦的,因此可以成功被塞到buffer
- 由于是同步的过程,因此塞到buffer之后,发送者便开始了get()的无限期等待,直到有“人”唤醒
- 通过上面的分析我们发现:唤醒该同步等待的操作,都需要在Sender也就是IO线程中执行:要么是由于各种原因觉得这个消息需要abort,要么是收到了正确或者错误的应答(fail or complete or split)。
- 此时奇妙的现象就发生了:同步等待的操作在IO线程,唤醒的操作也是在IO线程,这是同一个线程!也就是说,此刻已经发生了某种意义的“死锁”
- IO线程已经被无限WAITing了,因此buffer中的数据再也无法被发送
- 于是buffer越堆越多,直到达到buffer sizez之后,开始被block
- producer对block进行了控制,每次最大block的时间为max.block.time,然后向上抛出一个异常,于是出现了日志中的现象
综上,这次生产实践的原委基本清楚了。关于producer源码中的细节,后面再细细研读~