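1. Background
While building a component that writes data into Elasticsearch, I used BulkProcessor to handle large batches and ran into a problem: the data never made it into ES. Analysis showed that parts of BulkProcessor were not being executed, so I read through the relevant source code.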
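2. Approach
To analyze this kind of source code, start with how a BulkProcessor is built, shown below. The root of the problem turned out to be that the callbacks of the BulkProcessor Listener were not being executed successfully.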
``` java
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) {
            }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) {
            }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) {
            }
        })
        // Flush once 10000 requests have accumulated
        .setBulkActions(10000)
        // Flush whenever the bulk data reaches 5 MB
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
        // Flush every 5 seconds
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        // 0 means synchronous submission: only a single request may execute;
        // 1 means one concurrent request may execute while a new bulk is accumulating
        .setConcurrentRequests(1)
        // Retry policy for the case where one or more requests in a bulk fail with
        // EsRejectedExecutionException, which indicates that the ES cluster had too few
        // resources available to process the request: wait 100 ms initially, increase
        // exponentially, and retry up to 3 times.
        // To disable retries, use BackoffPolicy.noBackoff()
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();
```
Important configuration parameters:
- Bulk action count: bulkActions
- Bulk data size: bulkSize

The code for these two properties is fairly simple:
``` java
private void executeIfNeeded() {
    this.ensureOpen();
    // Execute the bulkRequest once a threshold is reached
    if (this.isOverTheLimit()) {
        this.execute();
    }
}

private boolean isOverTheLimit() {
    if (this.bulkActions != -1 && this.bulkRequest.numberOfActions() >= this.bulkActions) {
        return true;
    } else {
        return this.bulkSize != -1L && this.bulkRequest.estimatedSizeInBytes() >= this.bulkSize;
    }
}
```
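The two thresholds can be tried out without Elasticsearch. The following is a minimal sketch using only JDK classes; `ThresholdBuffer` and all of its members are hypothetical names for illustration, and only the shape of the `isOverTheLimit` check is taken from the excerpt above.

``` java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the threshold logic; not Elasticsearch code.
class ThresholdBuffer {
    private final int maxActions;   // -1 disables the count limit
    private final long maxBytes;    // -1 disables the size limit
    private List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int flushCount = 0;

    ThresholdBuffer(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    synchronized void add(String doc) {
        pending.add(doc);
        pendingBytes += doc.getBytes(StandardCharsets.UTF_8).length;
        if (isOverTheLimit()) {
            flush();
        }
    }

    // Same structure as the check above: either limit triggers a flush.
    private boolean isOverTheLimit() {
        if (maxActions != -1 && pending.size() >= maxActions) {
            return true;
        }
        return maxBytes != -1 && pendingBytes >= maxBytes;
    }

    // Swap in a fresh buffer, as execute() does with a new BulkRequest.
    private void flush() {
        flushCount++;
        pending = new ArrayList<>();
        pendingBytes = 0;
    }

    synchronized int flushCount() { return flushCount; }

    synchronized int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        ThresholdBuffer byCount = new ThresholdBuffer(3, -1);
        for (int i = 0; i < 7; i++) {
            byCount.add("doc" + i);
        }
        // prints "2 flushes, 1 pending"
        System.out.println(byCount.flushCount() + " flushes, " + byCount.pendingCount() + " pending");
    }
}
```

With a limit of 3 actions and 7 documents, the buffer flushes after the third and sixth document and keeps the seventh pending. That leftover is exactly what the time-based flush is for.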
- Concurrent request count: concurrentRequests
``` java
// This setting decides whether a synchronous or an asynchronous handler is used
this.bulkRequestHandler = concurrentRequests == 0
        ? BulkRequestHandler.syncHandler(client, backoffPolicy, listener)
        : BulkRequestHandler.asyncHandler(client, backoffPolicy, listener, concurrentRequests);
```
- Flush interval: flushInterval
``` java
// The scheduler is created with a single worker thread; this is the root cause of the problem!
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
        EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
// Scheduled thread pool: runs the flush task (Flush) periodically, every flushInterval
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new BulkProcessor.Flush(),
        flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
```
Now look at the logic that runs periodically (Flush):
``` java
class Flush implements Runnable {
    Flush() {
    }

    public void run() {
        synchronized (BulkProcessor.this) {
            if (!BulkProcessor.this.closed) {
                if (BulkProcessor.this.bulkRequest.numberOfActions() != 0) {
                    // Process the bulk data accumulated so far
                    BulkProcessor.this.execute();
                }
            }
        }
    }
}
```
The Flush task is simple: it sends whatever bulk data has accumulated so far.
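The time-based flush can also be illustrated with JDK classes alone. This is a rough sketch, not Elasticsearch code: `PeriodicFlushDemo`, its fields, and the 50 ms interval are assumptions chosen for the example. It uses the same `scheduleWithFixedDelay` call on a one-thread pool and takes a lock around the flush, as `Flush.run()` does.

``` java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a periodic flush; not Elasticsearch code.
class PeriodicFlushDemo {
    private final Object lock = new Object();
    private List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private final CountDownLatch firstFlush = new CountDownLatch(1);

    void add(String doc) {
        synchronized (lock) {
            pending.add(doc);
        }
    }

    // Mirrors Flush.run(): take the lock, flush only if something is pending.
    void flushIfNotEmpty() {
        synchronized (lock) {
            if (!pending.isEmpty()) {
                flushed.add(pending);
                pending = new ArrayList<>();
                firstFlush.countDown();
            }
        }
    }

    // Returns the number of docs in the first flushed batch, or -1 on timeout.
    static int firstBatchSize() {
        PeriodicFlushDemo demo = new PeriodicFlushDemo();
        // Both docs are added before scheduling, so the first flush sees both.
        demo.add("a");
        demo.add("b");
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            scheduler.scheduleWithFixedDelay(demo::flushIfNotEmpty, 50, 50, TimeUnit.MILLISECONDS);
            if (!demo.firstFlush.await(5, TimeUnit.SECONDS)) {
                return -1;
            }
            synchronized (demo.lock) {
                return demo.flushed.get(0).size();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "first flush carried 2 docs"
        System.out.println("first flush carried " + firstBatchSize() + " docs");
    }
}
```

Neither a count nor a size threshold is involved here; the two documents are sent purely because the interval elapsed.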
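- Backoff retry policy: backoffPolicy
``` java
Retry.on(EsRejectedExecutionException.class).policy(this.backoffPolicy).withSyncBackoff(this.client, bulkRequest);
```
When a write to ES is rejected, a retry is issued. The retry policy is configurable (exponential backoff, constant backoff, and so on). It is not the focus here, so I skip it.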
- Callback: listener

Since this is about ingestion, what matters most is getting the data in. The most important functions for processing a bulk request are:
``` java
private void execute() {
    BulkRequest bulkRequest = this.bulkRequest;
    long executionId = this.executionIdGen.incrementAndGet();
    this.bulkRequest = new BulkRequest();
    this.bulkRequestHandler.execute(bulkRequest, executionId);
}

private static class AsyncBulkRequestHandler extends BulkRequestHandler {
    private AsyncBulkRequestHandler(Client client, BackoffPolicy backoffPolicy, Listener listener, int concurrentRequests) {
        super(client);
        this.backoffPolicy = backoffPolicy;
        assert concurrentRequests > 0;
        this.listener = listener;
        this.concurrentRequests = concurrentRequests;
        // Create the semaphore used for synchronization
        this.semaphore = new Semaphore(concurrentRequests);
    }

    public void execute(final BulkRequest bulkRequest, final long executionId) {
        boolean bulkRequestSetupSuccessful = false;
        boolean acquired = false;
        try {
            // Runs before the bulk request is processed
            this.listener.beforeBulk(executionId, bulkRequest);
            // Acquire a permit
            this.semaphore.acquire();
            acquired = true;
            Retry.on(EsRejectedExecutionException.class).policy(this.backoffPolicy).withAsyncBackoff(this.client, bulkRequest, new ActionListener<BulkResponse>() {
                public void onResponse(BulkResponse response) {
                    try {
                        // Runs after the bulk request completes normally
                        AsyncBulkRequestHandler.this.listener.afterBulk(executionId, bulkRequest, response);
                    } finally {
                        // Release the permit (important: if this does not run as expected,
                        // the permit is never released and later calls block)
                        AsyncBulkRequestHandler.this.semaphore.release();
                    }
                }

                public void onFailure(Exception e) {
                    try {
                        // Runs after the bulk request fails
                        AsyncBulkRequestHandler.this.listener.afterBulk(executionId, bulkRequest, e);
                    } finally {
                        // Release the permit (important: if this does not run as expected,
                        // the permit is never released and later calls block)
                        AsyncBulkRequestHandler.this.semaphore.release();
                    }
                }
            });
            bulkRequestSetupSuccessful = true;
        } catch (InterruptedException var11) {
            ...
        } finally {
            if (!bulkRequestSetupSuccessful && acquired) {
                this.semaphore.release();
            }
        }
    }
}
```
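One point deserves emphasis: the callbacks passed in by execute (onResponse, and likewise onFailure) must actually run. If they do not, the permit is never released and later calls block in semaphore.acquire().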
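The effect of a callback that never fires can be shown with a bare `java.util.concurrent.Semaphore`. This is a minimal sketch under stated assumptions: `SemaphoreLeakDemo` and its `execute` method are hypothetical, and `tryAcquire()` is used in place of the blocking `acquire()` so that the demo reports the problem and terminates instead of hanging.

``` java
import java.util.concurrent.Semaphore;

// Hypothetical illustration of a leaked permit; not Elasticsearch code.
class SemaphoreLeakDemo {
    /**
     * Simulates one bulk execution. Returns false if no permit is available,
     * which is the situation where a blocking acquire() would wait forever.
     */
    static boolean execute(Semaphore permits, boolean callbackFires) {
        if (!permits.tryAcquire()) {
            return false;
        }
        if (callbackFires) {
            // Stands in for the finally block of onResponse/onFailure.
            permits.release();
        }
        // If the callback never fires, the permit stays taken.
        return true;
    }

    public static void main(String[] args) {
        Semaphore permits = new Semaphore(1); // like concurrentRequests = 1
        System.out.println(execute(permits, true));  // true: permit taken and returned
        System.out.println(execute(permits, false)); // true: permit taken, never returned
        System.out.println(execute(permits, true));  // false: no permit left
    }
}
```

With a single permit, one lost callback is enough to stall every later bulk request.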
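3. Deadlock analysis
This is arguably a bug in 5.4.1. The root cause is that the scheduler has only one worker thread. Flush synchronizes on the processor (synchronized(BulkProcessor.this)), and internalAdd() is a synchronized method on the same object, so the two contend for the same lock. When a bulk request sent from internalAdd() fails, the Retry logic kicks in. Because internalAdd() still holds the lock, the Flush task blocks on it and, while blocked, occupies the scheduler's only thread. The retry cannot run because Flush is sitting on that thread; Flush cannot continue until internalAdd() finishes; internalAdd() cannot finish because it is waiting for the retry to complete. The wait is circular: internalAdd waits for Retry, Retry waits for Flush, and Flush waits for internalAdd.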
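The circular wait described above can be reproduced with JDK classes only. This is a hedged sketch, not Elasticsearch code: `SingleThreadDeadlockDemo`, `retryCompletes`, and the 500 ms timeout are assumptions made for the demo. The calling thread plays internalAdd and holds the lock, one scheduled task plays Flush and blocks on that lock, and a second scheduled task plays Retry. A timed wait replaces the unbounded wait so that the program can report the stall and exit.

``` java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical reproduction of the wait cycle; not Elasticsearch code.
class SingleThreadDeadlockDemo {
    /** Returns true if the "retry" task got to run while the lock was held. */
    static boolean retryCompletes(int schedulerThreads) {
        final Object lock = new Object(); // stands in for BulkProcessor.this
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(schedulerThreads);
        CountDownLatch flushStarted = new CountDownLatch(1);
        CountDownLatch retryDone = new CountDownLatch(1);
        try {
            synchronized (lock) { // "internalAdd" holds the lock
                // "Flush": occupies a scheduler thread and blocks on the lock.
                scheduler.schedule(() -> {
                    flushStarted.countDown();
                    synchronized (lock) {
                        // would call execute() here
                    }
                }, 0, TimeUnit.MILLISECONDS);
                flushStarted.await();

                // "Retry": scheduled on the same scheduler.
                scheduler.schedule(retryDone::countDown, 0, TimeUnit.MILLISECONDS);

                // "internalAdd" waits for the retry while still holding the lock.
                // The real wait is unbounded; the timeout exists only for the demo.
                return retryDone.await(500, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  retry completed = " + retryCompletes(1));
        System.out.println("2 threads: retry completed = " + retryCompletes(2));
    }
}
```

With one scheduler thread the retry never starts within the timeout, because that thread is stuck in the Flush task. With two threads the retry runs on the spare thread, which is why the single worker is the critical detail.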
First, walk through the code to analyze this defect.
``` java
// A single thread is defined, and it is shared by the Flush and Retry logic
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
        EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
// This thread runs the Flush logic
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new BulkProcessor.Flush(),
        flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
```
Next, look at the Flush logic:
``` java
class Flush implements Runnable {
    Flush() {
    }

    public void run() {
        // Takes the lock!
        synchronized (BulkProcessor.this) {
            if (!BulkProcessor.this.closed) {
                if (BulkProcessor.this.bulkRequest.numberOfActions() != 0) {
                    BulkProcessor.this.execute();
                }
            }
        }
    }
}
```
- Adding an index request
``` java
bulkProcessor.add(new IndexRequest("twitter", "_doc", "1").source(/* your doc here */));

// Adding data holds the object lock (the same lock that Flush takes);
// if the request fails, the Flush logic is blocked
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
    ensureOpen();
    bulkRequest.add(request, payload);
    executeIfNeeded();
}
```
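internalAdd holds the lock while it runs. If the request fails at that point, the Retry logic is invoked to retry it. The retry, however, has to wait for Flush to finish before it can enter execute and resend the request. Flush in turn cannot finish, because the lock it needs is held by the add path (internalAdd). The result is a deadlock, as shown in the figure below: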