Elasticsearch Shard Split Analysis (Part 3)

The previous article analyzed how Elasticsearch turns a RESTful request into an index-creation task; this one analyzes how that task is executed. As discussed earlier, TieBreakingPrioritizedRunnable implements the run method, so task execution necessarily begins there. Let's walk through it step by step, starting from its run function.

TieBreakingPrioritizedRunnable

public void run() {
    synchronized (this) {
        // mark the task as started. This is needed for synchronization with the timeout handling
        // see #scheduleTimeout()
        started = true;
        FutureUtils.cancel(timeoutFuture);
    }
    runAndClean(runnable);
}

First, started is set to true, marking the task as started; this flag synchronizes with the timeout handling in scheduleTimeout, and the pending timeout future is cancelled.
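
The interplay with the timeout can be made concrete with a small standalone sketch (a hypothetical class using plain JDK executors, not the ES implementation): both sides synchronize on the task, so the timeout callback fires only if run() has not claimed the task first.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TieBreakingPrioritizedRunnable's started/timeout handshake.
class TimeoutTaskSketch implements Runnable {
    private static final ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor();

    private boolean started = false;
    private ScheduledFuture<?> timeoutFuture;

    void scheduleTimeout(long millis) {
        synchronized (this) {
            if (started == false) {
                timeoutFuture = TIMER.schedule(() -> {
                    synchronized (this) {
                        if (started == false) {
                            System.out.println("timed out before the task started");
                        }
                    }
                }, millis, TimeUnit.MILLISECONDS);
            }
        }
    }

    @Override
    public void run() {
        synchronized (this) {
            started = true;                  // claim the task before the timeout can fire
            if (timeoutFuture != null) {
                timeoutFuture.cancel(false); // the FutureUtils.cancel(timeoutFuture) step
            }
        }
        System.out.println("task body runs");
    }
}

Execution then continues into the runAndClean method.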

private void runAndClean(Runnable run) {
    try {
        run.run();
    } finally {
        runnable = null;
        timeoutFuture = null;
    }
}

This invokes run on the run argument, which is TieBreakingPrioritizedRunnable's runnable member. So what type is that runnable? Recall that before a command is executed it is first wrapped; that wrapping happens in PrioritizedEsThreadPoolExecutor's wrapRunnable function, the same function that produced the TieBreakingPrioritizedRunnable in the first place. Let's look at wrapRunnable again.

protected Runnable wrapRunnable(Runnable command) {
        if (command instanceof PrioritizedRunnable) {
            if ((command instanceof TieBreakingPrioritizedRunnable)) {
                return command;
            }
            Priority priority = ((PrioritizedRunnable) command).priority();
            return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), priority, insertionOrder.incrementAndGet());
        } else if (command instanceof PrioritizedFutureTask) {
            return command;
        } else { // it might be a callable wrapper...
            if (command instanceof TieBreakingPrioritizedRunnable) {
                return command;
            }
            return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), Priority.NORMAL, insertionOrder.incrementAndGet());
        }
    }

Notice that when constructing the TieBreakingPrioritizedRunnable, the command is wrapped once more via super.wrapRunnable. The super here refers to the parent class of PrioritizedEsThreadPoolExecutor, namely EsThreadPoolExecutor.

EsThreadPoolExecutor

protected Runnable wrapRunnable(Runnable command) {
        return contextHolder.preserveContext(command);
    }

This calls the preserveContext function on contextHolder, which is a ThreadContext object.

ThreadContext

public Runnable preserveContext(Runnable command) {
        if (command instanceof ContextPreservingAbstractRunnable) {
            return command;
        }
        if (command instanceof ContextPreservingRunnable) {
            return command;
        }
        if (command instanceof AbstractRunnable) {
            return new ContextPreservingAbstractRunnable((AbstractRunnable) command);
        }
        return new ContextPreservingRunnable(command);
    }

What is ultimately returned here is a ContextPreservingRunnable object. So back in TieBreakingPrioritizedRunnable's runAndClean method, the run argument is that ContextPreservingRunnable.

ContextPreservingRunnable

public void run() {
    boolean whileRunning = false;
    try (ThreadContext.StoredContext ignore = stashContext()) {
        ctx.restore();
        whileRunning = true;
        in.run();
        whileRunning = false;
    } catch (IllegalStateException ex) {
        if (whileRunning || threadLocal.closed.get() == false) {
            throw ex;
        }
        // if we hit an ISE here we have been shutting down
        // this comes from the threadcontext and barfs if
        // our threadpool has been shutting down
    }
}

After stashing and restoring the thread context, this simply calls run on the in member, which is the originally wrapped command: the UpdateTask, a subclass of BatchedTask.
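
As an aside, the context preservation is easy to demonstrate in isolation. A minimal sketch (ES 6.x classes; the demo class itself is hypothetical) showing that a header stored at submission time is visible when the preserved runnable later executes on a pool thread:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextDemo {
    public static void main(String[] args) {
        ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
        threadContext.putHeader("request-id", "42");       // set on the submitting thread
        Runnable task = () ->
            // restored by ContextPreservingRunnable, so this prints 42, not null
            System.out.println(threadContext.getHeader("request-id"));
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(threadContext.preserveContext(task)); // the same wrapping as above
        pool.shutdown();
    }
}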

BatchedTask

public void run() {
    runIfNotProcessed(this);
}

TaskBatcher

void runIfNotProcessed(BatchedTask updateTask) {
        // if this task is already processed, it shouldn't execute other tasks with same batching key that arrived later,
        // to give other tasks with different batching key a chance to execute.
        if (updateTask.processed.get() == false) {
            final List<BatchedTask> toExecute = new ArrayList<>();
            final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
            synchronized (tasksPerBatchingKey) {
                LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
                if (pending != null) {
                    for (BatchedTask task : pending) {
                        if (task.processed.getAndSet(true) == false) {
                            logger.trace("will process {}", task);
                            toExecute.add(task);
                            processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
                        } else {
                            logger.trace("skipping {}, already processed", task);
                        }
                    }
                }
            }

            if (toExecute.isEmpty() == false) {
                final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
                    String tasks = updateTask.describeTasks(entry.getValue());
                    return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
                }).reduce((s1, s2) -> s1 + ", " + s2).orElse("");

                run(updateTask.batchingKey, toExecute, tasksSummary);
            }
        }
    }

This first determines which tasks need to run: all pending tasks sharing the same batching key are drained in one pass and atomically marked as processed, and the run method is then invoked to execute that batch.
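
To make the batching semantics concrete, here is a small standalone sketch (hypothetical classes, not the actual ES code) of the same drain-and-claim pattern: the first task of a batching key to reach a worker drains every pending task with that key, so later arrivals that were already claimed become no-ops when their own turn on the executor comes.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

class BatchingSketch {
    static class Task {
        final AtomicBoolean processed = new AtomicBoolean(false);
        final String source;
        Task(String source) { this.source = source; }
    }

    private final Map<Object, LinkedHashSet<Task>> tasksPerBatchingKey = new HashMap<>();

    void runIfNotProcessed(Object batchingKey, Task trigger) {
        if (trigger.processed.get()) {
            return; // someone else's batch already ran this task
        }
        List<Task> toExecute = new ArrayList<>();
        synchronized (tasksPerBatchingKey) {
            LinkedHashSet<Task> pending = tasksPerBatchingKey.remove(batchingKey);
            if (pending != null) {
                for (Task task : pending) {
                    if (task.processed.getAndSet(true) == false) {
                        toExecute.add(task); // claimed exactly once
                    }
                }
            }
        }
        for (Task task : toExecute) {
            System.out.println("executing " + task.source); // stand-in for the batch run
        }
    }
}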

Batcher

protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
    ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
    List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
    runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
}

Batcher extends TaskBatcher, so execution lands in Batcher's run method. The batchingKey is first cast to a ClusterStateTaskExecutor; in our case the batchingKey is the IndexCreationTask object created at the very beginning (it serves as its own executor, as we will see shortly). A TaskInputs object is then constructed, and MasterService's runTasks method is invoked.

MasterService

protected void runTasks(TaskInputs taskInputs) {
        final String summary = taskInputs.summary;
        if (!lifecycle.started()) {
            logger.debug("processing [{}]: ignoring, master service not started", summary);
            return;
        }

        logger.debug("processing [{}]: execute", summary);
        // first obtain the current cluster state
        final ClusterState previousClusterState = state();
        // if the local node is no longer master and the task may only run on the master, refuse to execute it
        if (!previousClusterState.nodes().isLocalNodeElectedMaster() && taskInputs.runOnlyWhenMaster()) {
            logger.debug("failing [{}]: local node is no longer master", summary);
            taskInputs.onNoLongerMaster();
            return;
        }

        long startTimeNS = currentTimeInNanos();
        // this is where the tasks actually execute; the results are returned via TaskOutputs
        TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS);
        taskOutputs.notifyFailedTasks();

        if (taskOutputs.clusterStateUnchanged()) {
            taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
            logger.debug("processing [{}]: took [{}] no change in cluster state", summary, executionTime);
            warnAboutSlowTaskIfNeeded(executionTime, summary);
        } else {
            ClusterState newClusterState = taskOutputs.newClusterState;
            if (logger.isTraceEnabled()) {
                logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
            } else if (logger.isDebugEnabled()) {
                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
            }
            try {
                ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
                // new cluster state, notify all listeners
                final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
                if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                    String nodeSummary = nodesDelta.shortSummary();
                    if (nodeSummary.length() > 0) {
                        logger.info("{}, reason: {}", summary, nodeSummary);
                    }
                }

                logger.debug("publishing cluster state version [{}]", newClusterState.version());
                try {
                    clusterStatePublisher.accept(clusterChangedEvent, taskOutputs.createAckListener(threadPool, newClusterState));
                } catch (Discovery.FailedToCommitClusterStateException t) {
                    final long version = newClusterState.version();
                    logger.warn(
                        (Supplier<?>) () -> new ParameterizedMessage(
                            "failing [{}]: failed to commit cluster state version [{}]", summary, version),
                        t);
                    taskOutputs.publishingFailed(t);
                    return;
                }

                taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState);

                try {
                    taskOutputs.clusterStatePublished(clusterChangedEvent);
                } catch (Exception e) {
                    logger.error(
                        (Supplier<?>) () -> new ParameterizedMessage(
                            "exception thrown while notifying executor of new cluster state publication [{}]",
                            summary),
                        e);
                }
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
                logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})", summary,
                    executionTime, newClusterState.version(),
                    newClusterState.stateUUID());
                warnAboutSlowTaskIfNeeded(executionTime, summary);
            } catch (Exception e) {
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
                final long version = newClusterState.version();
                final String stateUUID = newClusterState.stateUUID();
                final String fullState = newClusterState.toString();
                logger.warn(
                    (Supplier<?>) () -> new ParameterizedMessage(
                        "failed to publish updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
                        executionTime,
                        version,
                        stateUUID,
                        summary,
                        fullState),
                    e);
                // TODO: do we want to call updateTask.onFailure here?
            }
        }
    }

Let's first look at the calculateTaskOutputs function.

public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) {
        ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
        ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
        return new TaskOutputs(taskInputs, previousClusterState, newClusterState, getNonFailedTasks(taskInputs, clusterTasksResult),
            clusterTasksResult.executionResults);
    }

Then step into the executeTasks function.

protected ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, long startTimeNS, ClusterState previousClusterState) {
        ClusterTasksResult<Object> clusterTasksResult;
        try {
            List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
            clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
            if (previousClusterState != clusterTasksResult.resultingState &&
                previousClusterState.nodes().isLocalNodeElectedMaster() &&
                (clusterTasksResult.resultingState.nodes().isLocalNodeElectedMaster() == false)) {
                throw new AssertionError("update task submitted to MasterService cannot remove master");
            }
        } catch (Exception e) {
            ...
        }

        ...

        return clusterTasksResult;
    }

Some code has been elided from this function. The tasks are executed here by calling execute on taskInputs' executor member; that executor is in fact the IndexCreationTask object. Two arguments are passed in: the previous cluster state, and the inputs list, whose contents are also the IndexCreationTask object. Let's first look at IndexCreationTask's class hierarchy.

static class IndexCreationTask extends AckedClusterStateUpdateTask<ClusterStateUpdateResponse> 

public abstract class AckedClusterStateUpdateTask<Response> extends ClusterStateUpdateTask implements AckedClusterStateTaskListener

public abstract class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener

As you can see, IndexCreationTask is a task, a listener, a config, and an executor all at once. Recall ClusterService's submitStateUpdateTask function:

public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener>
        void submitStateUpdateTask(String source, T updateTask) {
        submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
    }

public <T> void submitStateUpdateTask(String source, T task,
                                          ClusterStateTaskConfig config,
                                          ClusterStateTaskExecutor<T> executor,
                                          ClusterStateTaskListener listener) {
        submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
    }

When this function is invoked, the four trailing arguments are all the same IndexCreationTask object! This single-object pattern is the standard way cluster-state updates are submitted, as the following sketch shows.
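
A minimal sketch, assuming a ClusterService named clusterService is in scope; the task body here is a hypothetical no-op:

clusterService.submitStateUpdateTask("demo-task", new ClusterStateUpdateTask() {
    @Override
    public ClusterState execute(ClusterState currentState) {
        return currentState; // returning the same state takes the "unchanged" path in runTasks
    }

    @Override
    public void onFailure(String source, Exception e) {
        // invoked if execution fails or the local node is no longer master
    }
});

Back to the main thread of the analysis: following executor.execute, we first land in ClusterStateUpdateTask's execute method.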

ClusterStateUpdateTask

public final ClusterTasksResult<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks) throws Exception {
        ClusterState result = execute(currentState);
        return ClusterTasksResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);
    }

This in turn calls the single-argument execute, which IndexCreationTask overrides:

IndexCreationTask

public ClusterState execute(ClusterState currentState) throws Exception {
            Index createdIndex = null;
            String removalExtraInfo = null;
            IndexRemovalReason removalReason = IndexRemovalReason.FAILURE;
            try {
                ...
                // obtain the request's recoverFrom index; this property came up in the previous article (look back to see how it gets set)
                final Index recoverFromIndex = request.recoverFrom();
                // settings builder for the target index
                Settings.Builder indexSettingsBuilder = Settings.builder();
                // now, put the request settings, so they override templates
                // metadata builder for the target index
                final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());

                final int routingNumShards;
                if (recoverFromIndex == null) {
                    Settings idxSettings = indexSettingsBuilder.build();
                    routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
                } else {
                    assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
                        : "index.number_of_routing_shards should be present on the target index on resize";
                    final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
                    // the target index's routingNumShards must be taken from the source index
                    routingNumShards = sourceMetaData.getRoutingNumShards();
                }
                // remove the setting it's temporary and is only relevant once we create the index
                indexSettingsBuilder.remove(IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.getKey());
                tmpImdBuilder.setRoutingNumShards(routingNumShards);

                if (recoverFromIndex != null) {
                    assert request.resizeType() != null;
                    // preparation: validate the request and copy some settings from the source index to the target; if the source index has not been set to block writes, this step throws
                    prepareResizeIndexSettings(
                        currentState, mappings.keySet(), indexSettingsBuilder, recoverFromIndex, request.index(), request.resizeType());
                }
                final Settings actualIndexSettings = indexSettingsBuilder.build();
                tmpImdBuilder.settings(actualIndexSettings);

                if (recoverFromIndex != null) {
                    /*
                     * We need to arrange that the primary term on all the shards in the shrunken index is at least as large as
                     * the maximum primary term on all the shards in the source index. This ensures that we have correct
                     * document-level semantics regarding sequence numbers in the shrunken index.
                     */
                    final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
                    final long primaryTerm =
                        IntStream
                            .range(0, sourceMetaData.getNumberOfShards())
                            .mapToLong(sourceMetaData::primaryTerm)
                            .max()
                            .getAsLong();
                    for (int shardId = 0; shardId < tmpImdBuilder.numberOfShards(); shardId++) {
                        tmpImdBuilder.primaryTerm(shardId, primaryTerm);
                    }
                }
                // Set up everything, now locally create the index to see that things are ok, and apply
                final IndexMetaData tmpImd = tmpImdBuilder.build();
                ActiveShardCount waitForActiveShards = request.waitForActiveShards();
                if (waitForActiveShards == ActiveShardCount.DEFAULT) {
                    waitForActiveShards = tmpImd.getWaitForActiveShards();
                }
                if (waitForActiveShards.validate(tmpImd.getNumberOfReplicas()) == false) {
                    throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() +
                        "]: cannot be greater than number of shard copies [" +
                        (tmpImd.getNumberOfReplicas() + 1) + "]");
                }
                // create the index here (on the master) to validate it can be created, as well as adding the mapping
                // create the IndexService corresponding to the index and add it to IndicesService's indices map
                final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList());
                createdIndex = indexService.index();
                // now add the mappings
                
                final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index())
                    .settings(actualIndexSettings)
                    .setRoutingNumShards(routingNumShards);

                // the index is in the OPEN state at this point
                indexMetaDataBuilder.state(request.state());

                final IndexMetaData indexMetaData;
                try {
                    indexMetaData = indexMetaDataBuilder.build();
                } catch (Exception e) {
                    removalExtraInfo = "failed to build index metadata";
                    throw e;
                }

                indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetaData.getIndex(),
                    indexMetaData.getSettings());

                // add the target index's IndexMetaData to the indices member of the MetaData object; that member is a map maintaining the metadata of every index in the cluster
                MetaData newMetaData = MetaData.builder(currentState.metaData())
                    .put(indexMetaData, false)
                    .build();

                logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}",
                    request.index(), request.cause(), templateNames, indexMetaData.getNumberOfShards(),
                    indexMetaData.getNumberOfReplicas(), mappings.keySet());

                ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
                if (!request.blocks().isEmpty()) {
                    for (ClusterBlock block : request.blocks()) {
                        blocks.addIndexBlock(request.index(), block);
                    }
                }
                blocks.updateBlocks(indexMetaData);
                // apply the new MetaData to the cluster, producing a cluster state that now contains the target index
                ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();

                // the state built above contains the target index's metadata, but the routing table has no routing info for it yet; this step updates the routing table
                if (request.state() == State.OPEN) {
                    RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
                        .addAsNew(updatedState.metaData().index(request.index()));
                    updatedState = allocationService.reroute(
                        ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
                        "index [" + request.index() + "] created");
                }
                removalExtraInfo = "cleaning up after validating index on master";
                removalReason = IndexRemovalReason.NO_LONGER_ASSIGNED;
                return updatedState;
            } finally {
                if (createdIndex != null) {
                    // Index was already partially created - need to clean up
                    indicesService.removeIndex(createdIndex, removalReason, removalExtraInfo);
                }
            }
        }

This function is very long; many details are omitted above, leaving only the most important steps: assemble the settings, validate them, and then create the index via IndicesService's createIndex function.
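
One detail from the code above worth pinning down: routingNumShards is fixed when an index is created (via index.number_of_routing_shards) and, on a resize, is inherited from the source index. A hypothetical helper, under my reading of those constraints, capturing which split factors are legal:

// Hypothetical summary of the constraint: a split is legal only if the target shard
// count is a multiple of the source shard count and still divides routingNumShards.
class SplitFactorCheck {
    static boolean canSplit(int routingNumShards, int numSourceShards, int numTargetShards) {
        return numTargetShards % numSourceShards == 0   // targets are a multiple of sources
            && routingNumShards % numTargetShards == 0; // and still divide routingNumShards
    }
}
// An index created with 1 shard and index.number_of_routing_shards = 4
// can be split into 2 or 4 shards, but not 3.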

IndicesService

public synchronized IndexService createIndex(
            final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners) throws IOException {
        ensureChangesAllowed();
        if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
            throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
        }
        final Index index = indexMetaData.getIndex();
        if (hasIndex(index)) {
            throw new ResourceAlreadyExistsException(index);
        }
        List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
        final IndexEventListener onStoreClose = new IndexEventListener() {
            @Override
            public void onStoreClosed(ShardId shardId) {
                indicesQueryCache.onClose(shardId);
            }
        };
        finalListeners.add(onStoreClose);
        finalListeners.add(oldShardsStats);
        final IndexService indexService =
                createIndexService(
                        "create index",
                        indexMetaData,
                        indicesQueryCache,
                        indicesFieldDataCache,
                        finalListeners,
                        indexingMemoryController);
        boolean success = false;
        try {
            indexService.getIndexEventListener().afterIndexCreated(indexService);
            indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
            success = true;
            return indexService;
        } finally {
            if (success == false) {
                indexService.close("plugins_failed", true);
            }
        }
    }

A few index-level listeners are registered here before execution moves into the createIndexService function.

private synchronized IndexService createIndexService(final String reason,
                                                         IndexMetaData indexMetaData,
                                                         IndicesQueryCache indicesQueryCache,
                                                         IndicesFieldDataCache indicesFieldDataCache,
                                                         List<IndexEventListener> builtInListeners,
                                                         IndexingOperationListener... indexingOperationListeners) throws IOException {
        final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopeSetting);
        logger.debug("creating Index [{}], shards [{}]/[{}] - reason [{}]",
            indexMetaData.getIndex(),
            idxSettings.getNumberOfShards(),
            idxSettings.getNumberOfReplicas(),
            reason);

        final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
        for (IndexingOperationListener operationListener : indexingOperationListeners) {
            indexModule.addIndexOperationListener(operationListener);
        }
        pluginsService.onIndexModule(indexModule);
        for (IndexEventListener listener : builtInListeners) {
            indexModule.addIndexEventListener(listener);
        }
        return indexModule.newIndexService(
                nodeEnv,
                xContentRegistry,
                this,
                circuitBreakerService,
                bigArrays,
                threadPool,
                scriptService,
                client,
                indicesQueryCache,
                mapperRegistry,
                indicesFieldDataCache,
                namedWriteableRegistry
        );
    }

The IndexService object is finally created here. We won't dig into the details of index creation yet; for now it is enough to know that in Elasticsearch every index corresponds to one IndexService object. Each index also has a UUID, and the per-index IndexService objects are maintained in the indices member of IndicesService, which is simply a map keyed by that UUID. In createIndex, once the IndexService has been created, the following line adds the newly created index to the indices map:

indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
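
Note that the map is replaced rather than mutated: every update publishes a fresh immutable copy, so readers never take a lock. A minimal sketch of the same pattern with plain JDK types (the real code uses ES's MapBuilder):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Copy-on-write registry: writers serialize and publish a new immutable snapshot,
// readers are lock-free against the volatile reference.
class CopyOnWriteRegistry<V> {
    private volatile Map<String, V> indices = Collections.emptyMap();

    synchronized void put(String uuid, V service) {
        Map<String, V> copy = new HashMap<>(indices);
        copy.put(uuid, service);
        indices = Collections.unmodifiableMap(copy); // publish the new snapshot
    }

    V get(String uuid) {
        return indices.get(uuid);
    }
}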

Back in IndexCreationTask's execute function: once the index service has been created and the cluster state updated, the routing table must be amended. Let's analyze in detail how the routing information is built, since this matters for understanding the split process. First the routing table of the updated cluster state is fetched; note that at this point it does not yet contain any routing information for the target index. A routing-table builder is then obtained, and the new index is added to the routing table via its addAsNew method.

RoutingTable.Builder

public Builder addAsNew(IndexMetaData indexMetaData) {
    if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
        IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
                .initializeAsNew(indexMetaData);
        add(indexRoutingBuilder);
    }
    return this;
}

The argument passed in is the target index's IndexMetaData. Since the target index is in the OPEN state at this point, a new IndexRoutingTable.Builder is created for it, and its initializeAsNew method is called.

IndexRoutingTable.Builder

public Builder initializeAsNew(IndexMetaData indexMetaData) {
    return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
}

private Builder initializeEmpty(IndexMetaData indexMetaData, UnassignedInfo unassignedInfo) {
    assert indexMetaData.getIndex().equals(index);
    if (!shards.isEmpty()) {
        throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
    }
    for (int shardNumber = 0; shardNumber < indexMetaData.getNumberOfShards(); shardNumber++) {
        ShardId shardId = new ShardId(index, shardNumber);
        final RecoverySource primaryRecoverySource;
        if (indexMetaData.inSyncAllocationIds(shardNumber).isEmpty() == false) {
            // we have previous valid copies for this shard. use them for recovery
            primaryRecoverySource = StoreRecoverySource.EXISTING_STORE_INSTANCE;
        } else if (indexMetaData.getResizeSourceIndex() != null) {
            // this is a new index but the initial shards should be recovered from another index
            primaryRecoverySource = LocalShardsRecoverySource.INSTANCE;
        } else {
            // a freshly created index with no restriction
            primaryRecoverySource = StoreRecoverySource.EMPTY_STORE_INSTANCE;
        }
        IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
        for (int i = 0; i <= indexMetaData.getNumberOfReplicas(); i++) {
            boolean primary = i == 0;
            indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, primary,
                primary ? primaryRecoverySource : PeerRecoverySource.INSTANCE, unassignedInfo));
        }
        shards.put(shardNumber, indexShardRoutingBuilder.build());
    }
    return this;
}

For each shard of the target index, this first determines the recovery source of its primary. Because a split sets a resize source index, primaryRecoverySource is LocalShardsRecoverySource.INSTANCE. By now it should be clear: every primary shard of the target index is recovered from local shards. In other words, whichever machine hosts a source shard will also host the target shards split from it. There may be relocations later, but at creation time the target index's primary shards and the corresponding source shards are guaranteed to be on the same machine. After the primary's recovery source is determined, the replicas' recovery type is set to PeerRecoverySource.INSTANCE. Note that at this stage only each shard's recovery type has been set; which node each shard will ultimately live on has not been decided. Finally, each shard's routing table is put into the shards member. Back in addAsNew, the new index's routing table is then added to the cluster routing table. The routing-table hierarchy is therefore:

RoutingTable -> IndexRoutingTable -> IndexShardRoutingTable
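
To make this hierarchy concrete, here is a minimal sketch (assuming a ClusterState named state is in scope and the ES 6.x API, where both table classes are Iterable) that walks all three levels:

// Walk the hierarchy: RoutingTable -> IndexRoutingTable -> IndexShardRoutingTable -> ShardRouting.
// `state` is an assumed ClusterState; the index name is a placeholder.
RoutingTable routingTable = state.routingTable();
IndexRoutingTable indexTable = routingTable.index("target_index");  // one table per index
for (IndexShardRoutingTable shardTable : indexTable) {              // one table per shard id
    for (ShardRouting shard : shardTable) {                         // primary + replica copies
        System.out.println(shard.shardId() + " primary=" + shard.primary()
            + " state=" + shard.state() + " node=" + shard.currentNodeId());
    }
}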

Finally, back to IndexCreationTask's execute method: once the routing table has been generated, allocationService's reroute method is called.

AllocationService

public ClusterState reroute(ClusterState clusterState, String reason) {
        return reroute(clusterState, reason, false);
    }

protected ClusterState reroute(final ClusterState clusterState, String reason, boolean debug) {
        RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
        // shuffle the unassigned nodes, just so we won't have things like poison failed shards
        routingNodes.unassigned().shuffle();
        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
            clusterInfoService.getClusterInfo(), currentNanoTime());
        allocation.debugDecision(debug);
        reroute(allocation);
        if (allocation.routingNodesChanged() == false) {
            return clusterState;
        }
        return buildResultAndLogHealthChange(clusterState, allocation, reason);
    }

Step straight into the private reroute function:

private void reroute(RoutingAllocation allocation) {
        assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";

        // now allocate all the unassigned to available nodes
        if (allocation.routingNodes().unassigned().size() > 0) {
            removeDelayMarkers(allocation);
            gatewayAllocator.allocateUnassigned(allocation);
        }

        shardsAllocator.allocate(allocation);
        assert RoutingNodes.assertShardStats(allocation.routingNodes());
    }

At this point shardsAllocator's allocate method is invoked to assign all the shards. The details of the allocation algorithm are out of scope here, but each candidate placement is vetted by a chain of deciders, sketched below; the one that matters for splits is ResizeAllocationDecider.
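
Here is a small standalone sketch (hypothetical types, mirroring my understanding of AllocationDeciders' semantics) of how the chain combines individual verdicts:

import java.util.List;
import java.util.function.Function;

// One NO vetoes the placement outright, THROTTLE defers it,
// and the shard may be placed only if no decider objects.
class DeciderChainSketch {
    enum Verdict { YES, THROTTLE, NO }

    static Verdict canAllocate(List<Function<String, Verdict>> deciders, String shardAndNode) {
        Verdict combined = Verdict.YES;
        for (Function<String, Verdict> decider : deciders) {
            Verdict verdict = decider.apply(shardAndNode);
            if (verdict == Verdict.NO) {
                return Verdict.NO;           // short-circuit on the first veto
            } else if (verdict == Verdict.THROTTLE) {
                combined = Verdict.THROTTLE; // remember the delay, but keep asking
            }
        }
        return combined;
    }
}

With that combining rule in mind, let's look at the ResizeAllocationDecider class.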

ResizeAllocationDecider

public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        final UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
        if (unassignedInfo != null && shardRouting.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
            // we only make decisions here if we have an unassigned info and we have to recover from another index ie. split / shrink
            final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
            Index resizeSourceIndex = indexMetaData.getResizeSourceIndex();
            assert resizeSourceIndex != null;
            if (allocation.metaData().index(resizeSourceIndex) == null) {
                return allocation.decision(Decision.NO, NAME, "resize source index [%s] doesn't exists", resizeSourceIndex.toString());
            }
            IndexMetaData sourceIndexMetaData = allocation.metaData().getIndexSafe(resizeSourceIndex);
            if (indexMetaData.getNumberOfShards() < sourceIndexMetaData.getNumberOfShards()) {
                // this only handles splits so far.
                return Decision.ALWAYS;
            }

            // select the source shard
            ShardId shardId = IndexMetaData.selectSplitShard(shardRouting.id(), sourceIndexMetaData, indexMetaData.getNumberOfShards());
            // find the routing entry of the source shard's active primary
            ShardRouting sourceShardRouting = allocation.routingNodes().activePrimary(shardId);
            if (sourceShardRouting == null) {
                return allocation.decision(Decision.NO, NAME, "source primary shard [%s] is not active", shardId);
            }
            if (node != null) { // we might get called from the 2 param canAllocate method..
                if (node.node().getVersion().before(ResizeAction.COMPATIBILITY_VERSION)) {
                    return allocation.decision(Decision.NO, NAME, "node [%s] is too old to split a shard", node.nodeId());
                }
                // return YES only if the node holding the source shard's primary is the node currently being considered
                if (sourceShardRouting.currentNodeId().equals(node.nodeId())) {
                    return allocation.decision(Decision.YES, NAME, "source primary is allocated on this node");
                } else {
                    return allocation.decision(Decision.NO, NAME, "source primary is allocated on another node");
                }
            } else {
                return allocation.decision(Decision.YES, NAME, "source primary is active");
            }
        }
        return super.canAllocate(shardRouting, node, allocation);
    }

When deciding whether a given shard may be allocated to a node, Elasticsearch consults many AllocationDeciders, and the shard can be placed on that node only if all of them approve. In the split scenario the decisive one is ResizeAllocationDecider: it dictates that every primary shard of the target index may live only on the node holding the corresponding primary shard of the source index. For example, if the primary of the source index's shard 0 is on node 3, then the primaries of all target shards split from shard 0 must also be on node 3.
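
That shard correspondence comes from IndexMetaData.selectSplitShard. A hypothetical re-implementation of just its mapping (the real method also validates shard counts and routingNumShards) makes the arithmetic explicit:

// Target shard X recovers from source shard X / routingFactor,
// where routingFactor = numTargetShards / numSourceShards.
class SplitShardMath {
    static int selectSplitSourceShard(int targetShardId, int numSourceShards, int numTargetShards) {
        if (numTargetShards % numSourceShards != 0) {
            throw new IllegalArgumentException(
                "the number of target shards must be a multiple of the number of source shards");
        }
        int routingFactor = numTargetShards / numSourceShards;
        return targetShardId / routingFactor;
    }
}
// Splitting 2 shards into 6 (routingFactor = 3): target shards 0, 1, 2 recover from
// source shard 0; target shards 3, 4, 5 recover from source shard 1.
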
Once all shard routing decisions have been made, index creation is complete. Control returns to calculateTaskOutputs, which produces the new cluster state, wraps it in a TaskOutputs object, and hands it back to MasterService's runTasks method. There a ClusterChangedEvent is generated, and the cluster-state change is published to the rest of the cluster via clusterStatePublisher.
At this point the index has been created. But where does the data behind the index come from? The next article will answer that question.
