ES5.6 Bulk源码解析

Bulk注册

在启动类BootStrap的start()方法中,启动了node.start()方法。在Node初始化的过程中,加载了一系列的模块和插件,其中包含ActionModel。

 ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
                threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService);
 modules.add(actionModule);

在ActionModel中,注册了我们常用的一些操作action,比如说我们这次解析的BulkAction:

  actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class);
  actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,TransportShardMultiGetAction.class);
  actions.register(BulkAction.INSTANCE, TransportBulkAction.class,TransportShardBulkAction.class);

并且初始化RestHandler:

 registerHandler.accept(new RestMultiTermVectorsAction(settings, restController));
 registerHandler.accept(new RestBulkAction(settings, restController));
 registerHandler.accept(new RestUpdateAction(settings, restController));

在RestBulkAction中规定了我们的查询方式:

  controller.registerHandler(POST, "/_bulk", this);
  controller.registerHandler(PUT, "/_bulk", this);
  controller.registerHandler(POST, "/{index}/_bulk", this);
  controller.registerHandler(PUT, "/{index}/_bulk", this);
  controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
  controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);

接收到请求

RestBulkAction在prepareRequest方法中将我们普通的RestRequest转化为BulkReqest,并通过NodeClient调用:

 channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));

而在NodeClient的bulk中则是调用了NodeClient的doExecute()方法。

doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener)

传入的Action是BulkAction.Instance,request就是上一步封装的BulkRequest,listener则是监听器。

在doExecute方法中,首先将普通的action转化为tansportAction,然后用转化后的tansportAction执行该请求:

transportAction(action).execute(request, listener);

bulkAction转化后变为TransportBulkAction,而TransportBulkAction的execute方法则是调用本身的doExecute()方法。在doExecut()方法中首先将存在和不存在的索引分类:

1)Step 1: collect all the indices in the request
2)Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create that we'll use when we try to run the requests.
3)Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.

然后执行executeBulk()方法,接着在executeBulk中创建一个BulkOperation,并开始执行该BulkOperation:

void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
        final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
    new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
}

在BulkOperation中存在两次遍历Bulk中所有的请求,第一次遍历则将给该请求设置Routing,Mapping等等,如果允许产生ID,则自动生成ID。第二次遍历则是根据shardID将请求分类。ES官网有说到批量处理时让用bulk,原因是bulk处理请求时做了一些底层的优化。这就是一个优化点,将同一个shard的请求集合在一起直接发送到节点对应的shard,避免请求在节点间传递,影响效率。

for (int i = 0; i < bulkRequest.requests.size(); i++) {
    ....
    switch (docWriteRequest.opType()) {
                    case CREATE:
                    case INDEX:
                        IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                        MappingMetaData mappingMd = null;
                        final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                        if (indexMetaData != null) {
                            mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                        }
                        indexRequest.resolveRouting(metaData);
                        indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
            ....
    }
....
}

....
for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest request = bulkRequest.requests.get(i);
            if (request == null) {
                continue;
             }
            String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
            ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
            List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
            shardRequests.add(new BulkItemRequest(i, request));
         }

然后针对不同的shardRequest,分别用shardBulkAction处理:

shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {}

每个shard的处理流程

接下来就是复杂的类继承关系了:

TransportShardBulkAction>TransportWriteAction >TransportReplicationAction>TransportAction

上一步的shardBulkAction.execute()方法则是执行的TransportAction的execute方法。我看得源码版本是5.6版本的,与5.0版本相比,ES增加了一个
TransportWriteAction类,而且在TransportReplicationAction不是直接运行run方法,而是通过transportService的RPC接口在实现功能。具体的流程如下:

1)TransportAction.execute()方法会调用TransportReplicationAction的doExecute()方法

2)在TransportReplicationAction的doExecute()方法中执行ReroutePhase的run方法,run方法中根据请求的shardID获取到primary shardID,同时得到primary shard的NodeID,如果当前节点包含primary shard,则执行performLocalAction方法,否则执行performRemoteAction。

3)performLocalAction和performRemoteAction最终都将执行performAction方法,在performAction中我们可以看到,transportService发送请求:

transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {}

4)transportService接收到请求后用的PrimaryOperationTransportHandler处理,至于PrimaryOperationTransportHandler是在TransportReplicationAction中注册的:

transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
        new PrimaryOperationTransportHandler());

5)PrimaryOperationTransportHandler则是一个primary操作的处理类,在这个类接收到信息之后调用AsyncPrimaryAction处理:

@Override
    public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
        new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
    }

6)在AsyncPrimaryAction中首先获取shard锁,如果成功的获取到锁则调用自身的onresponse()方法,否则将获取操作加入线程池:

            synchronized (this) {
            releasable = tryAcquire();
            if (releasable == null) {
                // blockOperations is executing, this operation will be retried by blockOperations once it finishes
                if (delayedOperations == null) {
                    delayedOperations = new ArrayList<>();
                }
                final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                if (executorOnDelay != null) {
                    delayedOperations.add(
                        new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
                            new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
                } else {
                    delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
                }
                return;
            }
        }

7)在onresponse中,如果该primaryShardReference已经被移动了,则获取到正确的primary shard和nodeID重新发送请求。否则就用primaryShardReference直接处理:

 @Override
    public void onResponse(PrimaryShardReference primaryShardReference) {
        try {
            if (primaryShardReference.isRelocated()) {
                primaryShardReference.close(); // release shard operation lock as soon as possible
                setPhase(replicationTask, "primary_delegation");
                // delegate primary phase to relocation target
                // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
                // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
                final ShardRouting primary = primaryShardReference.routingEntry();
                assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
                DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
                transportService.sendRequest(relocatingNode, transportPrimaryAction,
                    new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId()),
                    transportOptions,
                    new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
                        TransportReplicationAction.this::newResponseInstance) {

                        @Override
                        public void handleResponse(Response response) {
                            setPhase(replicationTask, "finished");
                            super.handleResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            setPhase(replicationTask, "finished");
                            super.handleException(exp);
                        }
                    });
            } else {
                setPhase(replicationTask, "primary");
                final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
                final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData);
                final ActionListener<Response> listener = createResponseListener(primaryShardReference);
                createReplicatedOperation(request,
                        ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
                        primaryShardReference, executeOnReplicas)
                        .execute();
            }
        } catch (Exception e) {
            Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
            onFailure(e);
        }
    }

8)createReplicatedOperation看名字还以为直接就是副本处理了,点进去看了之后才发现是先执行primary,后执行replia。

 primaryResult = primary.perform(request);
    ...
 performOnReplicas(replicaRequest, shards);

主分片处理

主分片的处理调用的是PrimaryShardReference.perform()方法,在该方法中则是调用shardOperationOnPrimary()进行主分片的处理。

shardOperationOnPrimary()方法则是由TransportShardBulkAction来实现的,具体执行的步骤如下:

1)获取节点中所有的索引元数据

2)获取版本号

3)更新mapping

4)调用Engin底层的代码。比如说primary.delete(delete),primary.index(operation)等等。

5)写到tanslog中

副本分片和主分片类似,这里就不做过多解释。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容