Elasticsearch-BulkProcessor踩坑之源码分析

1. 背景

在开发es入库组件时,使用BulkProcessor处理大批量的数据,遇到了数据入不进es。分析后发现是Bulkprocessor的有些地方没有执行,为此分析了该部分的源码。

2. 思路

分析这类源码,先看下BulkProcessor如何构建的。如下,问题的原因就是BulkProcessor的Listener的函数未成功执行。

BulkProcessor bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
                         }
                })
                // 10000个request触发flush
                .setBulkActions(10000)
                // bulk数据每达到5MB触发flush
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                // 每5秒flush一次
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                // 0代表同步提交即只能提交一个request;
                // 1代表当有一个新的bulk正在累积时,1个并发请求可被允许执行
                .setConcurrentRequests(1)
                // 设置当出现代表ES集群拥有很少的可用资源来处理request时抛出
                // EsRejectedExecutionException造成N个bulk内request失败时
                // 进行重试的策略,初始等待100ms,后面指数级增加,总共重试3次.
                // 不重试设为BackoffPolicy.noBackoff()
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
  • 重要的配置参数

    • 批量请求数量:bulkActions

    • 批量数据大小:bulkSize

      上面这两个属性的代码比较简单

      
      private void executeIfNeeded() {
              this.ensureOpen();
              //达到条件处理bulkRequest
              if (this.isOverTheLimit()) {
                  this.execute();
              }
       }
      
      private boolean isOverTheLimit() {
              if (this.bulkActions != -1 && this.bulkRequest.numberOfActions() >= this.bulkActions) {
                  return true;
              } else {
                  return this.bulkSize != -1L && this.bulkRequest.estimatedSizeInBytes() >= this.bulkSize;
              }
          }
      
  • 并发请求处理量:concurrentRequests

    //根据该配置确定是同步处理器还是异步处理器
    this.bulkRequestHandler = concurrentRequests == 0 ? BulkRequestHandler.syncHandler(client, backoffPolicy, listener) : BulkRequestHandler.asyncHandler(client, backoffPolicy, listener, concurrentRequests);
    
    
  • 刷新间隔时间:flushInterval

    //scheduler设置的是单个worker线程,这是问题的起因!!!
    this.scheduler = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
    

//定时线程池,定期(flushInterval)执行刷新任务(Flush)
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new BulkProcessor.Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
```

 看下定期执行的逻辑(Flush)

``` java
    class Flush implements Runnable {
        Flush() {
        }

        public void run() {
            synchronized(BulkProcessor.this) {
                if (!BulkProcessor.this.closed) {
                    if (BulkProcessor.this.bulkRequest.numberOfActions() != 0) {
                        //处理累计的批量数据
                        BulkProcessor.this.execute();
                    }
                }
            }
        }
  }
```

Flush线程的逻辑很简单,就是处理已有批量的数据。
  • 回退重试策略:backoffPolicy
Retry.on(EsRejectedExecutionException.class).policy(this.backoffPolicy).withSyncBackoff(this.client, bulkRequest);

在插入es失败时发起重试,可以配置重试策略,如指数回退,常量回退等,不是重点,略过。

  • 回调函数:listener

    既然是入库,最重要的当然是把数据入进去,处理批量请求的最重要的函数是

        private void execute() {
            BulkRequest bulkRequest = this.bulkRequest;
            long executionId = this.executionIdGen.incrementAndGet();
            this.bulkRequest = new BulkRequest();
            this.bulkRequestHandler.execute(bulkRequest, executionId);
        }
    
        
    private static class AsyncBulkRequestHandler extends BulkRequestHandler {
              private AsyncBulkRequestHandler(Client client, BackoffPolicy backoffPolicy, Listener listener, int concurrentRequests) {
                super(client);
                this.backoffPolicy = backoffPolicy;
    
                assert concurrentRequests > 0;
    
                this.listener = listener;
                this.concurrentRequests = concurrentRequests;
                //创建同步信号量
                this.semaphore = new Semaphore(concurrentRequests);
            }
        
     public void execute(final BulkRequest bulkRequest, final long executionId) {
                boolean bulkRequestSetupSuccessful = false;
                boolean acquired = false;
    
                try {
                    //处理前执行以下逻辑
                    this.listener.beforeBulk(executionId, bulkRequest);
                    //获取信号量
                    this.semaphore.acquire();
                    acquired = true;
                    Retry.on(EsRejectedExecutionException.class).policy(this.backoffPolicy).withAsyncBackoff(this.client, bulkRequest, new ActionListener<BulkResponse>() {
                        public void onResponse(BulkResponse response) {
                            try {
      //正常处理后执行以下逻辑
                                AsyncBulkRequestHandler.this.listener.afterBulk(executionId, bulkRequest, response);
                            } finally {
                                //释放信号量(很重要!!!未按预期执行会导致信号量未能释放而阻塞)
                                AsyncBulkRequestHandler.this.semaphore.release();
                            }
    
                        }
    
                        public void onFailure(Exception e) {
                            try {
                           //异常处理后执行以下逻辑   
                                AsyncBulkRequestHandler.this.listener.afterBulk(executionId, bulkRequest, e);
                            } finally {
                                //释放信号量 (很重要!!!未按预期执行会导致信号量未能释放而阻塞)
                                AsyncBulkRequestHandler.this.semaphore.release();
                            }
    
                        }
                    });
                    bulkRequestSetupSuccessful = true;
                } catch (InterruptedException var11) {
                    ... 
                } finally {
                    if (!bulkRequestSetupSuccessful && acquired) {
                        this.semaphore.release();
                    }
    
                }
    
            }
    }
    
    

    在这里不得不多提一嘴,execute方法中的onResponse方法的回调函数正确执行很重要,否则会导致阻塞。

3. 死锁问题分析

这个算是5.4.1的bug了。死锁原因是,在scheduler中只有一个worker线程。Flush方法是同步方法(sychronized(BulkProcessor.this)),但是在internallAdd()方法也会被阻塞在这个锁上,这个bulk请求就会发送失败,触发Retry逻辑。由于internalAdd方法占锁,导致线程的Flush方法阻塞,所以当retry逻辑尝试重试会失败因为Flush阻塞了唯一的线程,所以Flush不能继续,一直等待internalAdd结束。internalAdd逻辑未能结束因为一直在等Retry逻辑结束,而Retry逻辑未能结束是因为在等待Flush逻辑结束。

首先走查代码分析这个缺陷。

//定义了单个线程,该线程被Flush和Retry逻辑共享
this.scheduler = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));

//该线程会执行Flush逻辑
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new BulkProcessor.Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);

接下来看下Flush逻辑

//
    class Flush implements Runnable {
        Flush() {
        }

        public void run() {
            //会加锁!!!
            synchronized(BulkProcessor.this) {
                if (!BulkProcessor.this.closed) {
                    if (BulkProcessor.this.bulkRequest.numberOfActions() != 0) {
                        BulkProcessor.this.execute();
                    }
                }
            }
        }
    }
  • 添加索引数据请求

     bulkProcessor.add(new IndexRequest("twitter", "_doc", "1").source(/* your doc here */));
    
    //在添加数据时会hold住对象锁(和Flush hold的是同一把锁),如果失败会阻塞Flush逻辑
    private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
            ensureOpen();
            bulkRequest.add(request, payload);
            executeIfNeeded();
        }
    
    

​ InternalAdd过程会占用锁,此时如果失败,就会去调用Retry线程去重试。而Retry线程需要等待Flush结束才能进入execute方法执行重试的过程。但是Flush的锁被添加数据的时候(internalAdd)占据了,因而导致整个死锁,如下图所示:


死锁图.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容